こんにちは、クラウド経費 テックリードの宮村(みやむー)@miyamura.koyo です。
2026年1月31日に開催されるSRE Kaigi 2026で、「月間数億レコードのアクセスログ基盤を無停止・低コストでAWS移行せよ!アプリケーションエンジニアのSREチャレンジ💪」というタイトルで発表させていただきます!
発表では30分という時間制限もあり、技術的な詳細を全てお伝えできないため、テックブログを執筆しました。
本記事では、発表では説明しきれない実装詳細を中心に補足していきます。
なお紹介しているサンプルコードについては、実際のシステムで使われているコードとは一部異なるため、参考にする際は適宜読み替えてください。
発表資料
当日の発表資料はこちらです。
p34 アクセスログ書き込み実装(インフラ編)
発表資料のp34で触れたアクセスログ書き込み実装について、より詳しく補足します。
Firehose + Glue の設定
FirehoseでAWS Glueを用いてParquet + Snappy圧縮をするためのTerraform定義は以下です。
resource "aws_glue_catalog_database" "firehose_glue_db" { name = "xxx" } resource "aws_glue_catalog_table" "firehose_glue_table" { name = "xxx" database_name = aws_glue_catalog_database.firehose_glue_db.name # ここにスキーマを定義する storage_descriptor { columns { name = "uuid" type = "string" } # ... } }
resource "aws_kinesis_firehose_delivery_stream" "log_transfer_fhstream" { # ... data_format_conversion_configuration { enabled = true # 入力は JSON input_format_configuration { deserializer { open_x_json_ser_de {} } } # 出力は Parquet + Snappy output_format_configuration { serializer { parquet_ser_de { compression = "SNAPPY" } } } schema_configuration { database_name = aws_glue_catalog_database.firehose_glue_db.name table_name = aws_glue_catalog_table.firehose_glue_table.name role_arn = "xxx" } } }
詳しくはTerraformの公式ドキュメントを参照してください。
- aws_glue_catalog_table | Resources | hashicorp/aws | Terraform | Terraform Registry
- aws_kinesis_firehose_delivery_stream | Resources | hashicorp/aws | Terraform | Terraform Registry
ちなみにAWS Glueクローラーでテーブル定義を自動生成する方法もありますが、今回はアクセスログのスキーマが固定なので、手動でGlueテーブル定義をすることにしました。
ステージング環境のみリソースを定義する
Terraformでステージング(以下stg)環境のみリソースを定義する場合は count が使用できます。
resource "aws_s3_bucket" "stg_only" { # environment が "stg" の場合のみ 1つ作成、それ以外は 0 (作成しない) count = var.environment == "stg" ? 1 : 0 # ... }
この場合、参照する際に aws_s3_bucket.stg_only[0] のように、配列形式でアクセスが必要になるので、注意です。
cwagentconfig の設定
CloudWatchエージェントの設定ファイル cwagentconfig で特定のログファイルを監視する設定は以下です。
{ "logs": { "logs_collected": { "files": { "collect_list": [ { "file_path": "xxx.log", "log_group_name": "xxx-loggroup", "log_stream_name": "xxx-logstream", "timestamp_format": "xxx" } ] } } } }
ちなみにp44の force_flush_interval の設定を行う場合は以下のように記述します。
{ "logs": { // デフォルトは5s "force_flush_interval": 1, "logs_collected": { // ... } } }
詳しくは以下の公式ドキュメントを参照してください。
p35 アクセスログ書き込み実装(アプリケーション編)
発表資料のp35で触れたアプリケーション側の実装について補足します。
AppAccessLog::Logger の実装
アクセスログを出力するための専用のLoggerクラスを実装しました。 1行あたり1つのJSON文字列にするために、カスタムフォーマッターを自作するなど、工夫しています。
require 'logger' require 'singleton' class AppAccessLog::Logger # インスタンス生成コストを下げるためのシングルトン include Singleton ACCESS_LOG_PATH = Rails.root.join('xxx.log') def initialize # skip_header を指定することで余計なコメントが入らないようにする @logger = Logger.new(ACCESS_LOG_PATH, skip_header: true) @logger.formatter = AppAccessLog::Formatter.new end def log(message) # AppAccessLog::Formatter によって info メソッドにしてもタグはつかない # @see AppAccessLog::Formatter @logger.info(message) end end
class AppAccessLog::Formatter < Logger::Formatter # @override # 1行あたり1つのJSON文字列にするためにフォーマッターを改造 def call(_serverity, _time, _progname, msg) "#{msg}\n" end end
Logger.new の引数に skip_header をつけない場合、ファイル作成時に # Logfile created on xxx by xxx のようなコメント行がついてしまい、取り込み側で考慮が必要になるため必須です。
skip_header については、るりまにも記載されています。
ちなみに、元々は記載がなかったので、私がプルリクエストを出しました 💪
消しやすさを考慮したリファクタリングや実装
旧実装のアクセスログ書き込みは以下のようになっていました。
def access_log_write # ...数十行の実装 end
それを以下のように、旧実装と新実装で実装を一部共通化せずにリファクタ・追加実装することで、削除時に消しやすくなるように工夫しました。
def write_access_log # 最終的にこの行だけ消せば良い write_old_access_log(args) # 削除する場合は単にこのメソッドを削除して、内容を展開するだけで良い write_new_access_log(args) end
p37 アクセスログ検索管理画面実装(インフラ編)
発表資料のp37で触れた検索管理画面のインフラについて補足します。
IRSA(IAM Roles for Service Accounts)について
EKS環境でAthenaにアクセスするためには、適切な権限管理が必要です。ここでIRSAを活用しました。
IRSAについては、詳しくは以下の公式ドキュメントをご参照ください。
ここで実際に付与したIAMロールは以下になります。相当試行錯誤したので、同じような課題にハマっている方いたら参考にしてもらえればと思います。
data "aws_iam_policy_document" "irsa_for_access_log" { statement { sid = "AllowOperateAthena" effect = "Allow" actions = [ "athena:StartQueryExecution", "athena:GetQueryExecution", "athena:GetQueryResults" ] resources = [ aws_athena_workgroup.athena_workgroup.arn, "arn:aws:athena:${local.region}:${local.account_id}:datacatalog/*" ] } statement { sid = "AllowAccessAthenaQueryResultsS3" effect = "Allow" actions = [ "s3:GetBucketLocation", "s3:GetObject", "s3:ListBucket", "s3:PutObject" ] resources = [ aws_s3_bucket.athena_query_results.arn, "${aws_s3_bucket.athena_query_results.arn}/*" ] } statement { sid = "AllowAccessAthenaDataSourceS3" effect = "Allow" actions = [ "s3:ListBucket", "s3:GetObject" ] resources = [ aws_s3_bucket.access_log_bucket.arn, "${aws_s3_bucket.access_log_bucket.arn}/app_access_log/*", ] } statement { sid = "AllowAccessGlueDataCatalog" effect = "Allow" actions = [ "glue:GetDatabase", "glue:GetTableVersion", "glue:GetTable" ] resources = [ "arn:aws:glue:${local.region}:${local.account_id}:database/database_name", "arn:aws:glue:${local.region}:${local.account_id}:table/table_name/*", "arn:aws:glue:${local.region}:${local.account_id}:catalog" ] } }
Athena のテーブル定義
Athenaのテーブル定義は以下のようにしました。
Parquet形式のデータを読み込めるようにするのと、パーティションの定義を日単位で設定するようにしています。
CREATE EXTERNAL TABLE IF NOT EXISTS database_name.table_name ( uuid string -- ... ) PARTITIONED BY (`datepath` string) STORED AS PARQUET LOCATION 's3://bucket_name/' TBLPROPERTIES ( 'parquet.compression' = 'SNAPPY', 'projection.enabled' = 'true', 'projection.datepath.type' = 'date', 'projection.datepath.range' = '202x/xx/xx,NOW', 'projection.datepath.format' = 'yyyy/MM/dd', 'projection.datepath.interval' = '1', 'projection.datepath.interval.unit' = 'DAYS', 'storage.location.template' = 's3://bucket_name/app_access_log/$${datepath}' );
p39 アクセスログ検索管理画面実装(アプリケーション編)
発表資料のp39で触れたアプリケーション側の実装について補足します。
クエリビルダーの実装
Athenaが採用しているPrestoに対応したSQLを文字列結合で組み立てるのは保守性が低いため、専用のクエリビルダーを実装しました。
管理画面なので、SQLインジェクション対策は最低限で十分なのですが、念の為、文字列のエスケープなどを行なっています。
また、Athenaにはパラメータクエリの機能があります。
パラメータクエリを用いることで、実際の引数の値を後で指定でき、ユーザー入力に対してもより堅牢なSQLの実行を行うことができます。
"SELECT * FROM access_logs WHERE user_name = ?"
これらを考慮した実装は概ね以下のような形で行なっています。
# Ruby 3.2以降のDataクラスを使用 AthenaClient::Query = Data.define(:query_string, :execution_parameters)
# Athena のSQLクエリを組み立てるクエリビルダークラス # 最低限の実装を行っている。`OR` や `NOT` などは現時点で使用してないので未対応 class AthenaClient::QueryBuilder # @param database [String] # @param table [String] def initialize(database:, table:) @database = database @table = table @select_columns = [] @where_conditions = [] @execution_parameters = [] @order_by_columns = [] @limit = nil end # @param columns [String, Array<String>] def select(*columns) @select_columns.concat(columns) self end # @param condition [String] # @param values [String, Array<String>] # @example where('column1 = ?', 'value1') # @example where('column1 >= ?', 'value1') # @example where('column2 IN (?, ?)', ['value1', 'value2']) def where(condition, *values) @where_conditions << condition values.each do |value| @execution_parameters << quote(value) end self end # @param column [String] # @param value [String] # @param match_condition [Symbol] :prefix, :suffix, :full. デフォルト :full. # @example where_like('column1', 'value1', :prefix) def where_like(column, value, match_condition = :full) raise ArgumentError.new("Invalid match condition: #{match_condition}. Use :prefix, :suffix, or :full.") unless %i(prefix suffix full).include?(match_condition) sanitized_value = case match_condition when :prefix "#{ApplicationRecord.sanitize_sql_like(value)}%" when :suffix "%#{ApplicationRecord.sanitize_sql_like(value)}" when :full "%#{ApplicationRecord.sanitize_sql_like(value)}%" end # @note 明示的なエスケープが必要 @where_conditions << "#{column} LIKE ? ESCAPE '\\'" @execution_parameters << quote(sanitized_value) self end # @param column [String] # @param direction [Symbol] :asc or :desc # @raise [ArgumentError] directionが:ascまたは:descでない場合に発生 def order(column, direction = :asc) unless %i(desc asc).include?(direction) raise ArgumentError.new("Invalid order direction: #{direction}. Use :asc or :desc.") end direction_string = direction == :desc ? 'DESC' : 'ASC' @order_by_columns << "#{column} #{direction_string}" self end # @param value [Integer] 返される行数の上限 # @raise [ArgumentError] valueが正の整数でない場合に発生 def limit(value) unless value.is_a?(Integer) && value.positive? raise ArgumentError.new("Limit must be a positive number, got: #{value}") end @limit = value self end # @return [AthenaClient::Query] def to_query select_clause = @select_columns.empty? ? '*' : @select_columns.join(', ') query = "SELECT #{select_clause} FROM #{@database}.#{@table}" query += " WHERE #{@where_conditions.join(' AND ')}" unless @where_conditions.empty? query += " ORDER BY #{@order_by_columns.join(', ')}" unless @order_by_columns.empty? query += " LIMIT #{@limit.to_i}" if @limit AthenaClient::Query.new(query_string: query, execution_parameters: @execution_parameters) end private # Athena は Presto SQL ベースなので、Presto のルールに従ってエスケープする必要がある。 # 基本的な型に対して最低限のクオート処理を用意している。 # 必要に応じて拡張してください。 def quote(value) case value when String "'#{value}'" when nil then "NULL" when Numeric then value.to_s else raise TypeError.new("can't quote #{value.class.name}") end end end
AthenaClientの実装
Athenaとの通信を抽象化するクライアントクラスを実装しました。AWS SDKを使ってAthenaからデータを取得する場合、実行リクエストを行い、実行結果ステータスのポーリングを行い、1000件ずつ取得するという3ステップの実装が必要になります。また返ってきた値を整形する必要があります。これらの処理を AthenaClient クラスとして抽象化することで使いやすく、テストしやすい実装としました。
また設定は AthenaClient::Config として別で切り出しています。
実装は概ね以下のように形で行なっています。
class AthenaClient::Config attr_reader :region, :work_group, :database, :access_log_table, :credentials def initialize(region:, work_group:, database:, access_log_table:, credentials:) @region = region @work_group = work_group @database = database @access_log_table = access_log_table @credentials = credentials end class << self # 環境変数などから値を取得 def build new( region: "xxx", work_group: "xxx", database: "xxx", access_log_table: "xxx", credentials: Aws::AssumeRoleWebIdentityCredentials.new( role_arn: "xxx", web_identity_token_file: "xxx", role_session_name: "xxx" ) ) end end end
# Ruby 3.2以降のDataクラスを使用 AthenaClient::Result = Data.define(:scanned_bytes, :rows)
class AthenaClient class Error < StandardError; end class TimeoutError < Error; end class QueryExecutionError < Error; end # @note `status` は QUEUED, RUNNING, SUCCEEDED, FAILED, CANCELLED の5種類 # @see https://docs.aws.amazon.com/athena/latest/APIReference/API_QueryExecutionStatus.html COMPLETE_STATUSES = %w(SUCCEEDED FAILED CANCELLED).freeze # アプリの性質に合わせて適宜カスタマイズ MAX_RETRY_COUNT = 20 SLEEP_INTERVAL_SEC = 1 # @param config [AthenaClient::Config] def initialize(config:) @config = config @client = Aws::Athena::Client.new( region: config.region, credentials: config.credentials ) end # @param query [AthenaClient::Query] # @return [AthenaClient::Result] def execute!(query:) async_execute_query(query:) .then { await_query_result!(query_execution_id: _1) } .then { to_result(query_execution_output: _1) } end private # @note より安全にSQLを実行するため execution_parameters を用いる def async_execute_query(query:) @client.start_query_execution( query_string: query.query_string, execution_parameters: query.execution_parameters.presence, work_group: @config.work_group ).query_execution_id end def await_query_result!(query_execution_id:) query_execution_output = polling(query_execution_id:) raise_if_query_execution_failed!(query_execution_output:) query_execution_output end def polling(query_execution_id:) 1.upto(MAX_RETRY_COUNT) do query_execution_output = @client.get_query_execution(query_execution_id:) state = query_execution_output.query_execution.status.state return query_execution_output if COMPLETE_STATUSES.include?(state) sleep SLEEP_INTERVAL_SEC end raise TimeoutError.new("Query execution timed out after #{MAX_RETRY_COUNT} attempts") end def raise_if_query_execution_failed!(query_execution_output:) unless query_execution_output.query_execution.status.state == 'SUCCEEDED' error_message = query_execution_output.query_execution.status.state_change_reason raise QueryExecutionError.new("Query execution failed: #{error_message}") end end def to_result(query_execution_output:) AthenaClient::Result.new( scanned_bytes: query_execution_output.query_execution.statistics.data_scanned_in_bytes, rows: get_all_rows(query_execution_output) ) end # @note `get_query_results` は一度にヘッダー行を含めて最大1000件(= 999件のデータ)までしか取得できないので、それ以上のデータは繰り返し取得する # @see https://docs.aws.amazon.com/athena/latest/APIReference/API_GetQueryResults.html def get_all_rows(query_execution_output) all_rows = [] next_token = nil loop do query_result = @client.get_query_results( query_execution_id: query_execution_output.query_execution.query_execution_id, next_token: next_token ) all_rows.concat(query_result.result_set.rows) next_token = query_result.next_token break unless next_token end format_rows(all_rows) end def format_rows(all_rows) headers, *rows = all_rows headers = headers.data.map(&:var_char_value) rows.map do |row| row_data = row.data.map(&:var_char_value) headers.zip(row_data).to_h end end end
p44 データ欠損の原因と対策
発表資料のp44で触れた欠損の原因について、補足します。
Sidecar Containers
Kubernetes 1.29以降 SidecarContainers 機能が導入され、メインコンテナとサイドカーコンテナのライフサイクルを同期させることができるようになりました。
これまでは「メインコンテナより前に起動し、メインコンテナより後に終了する」ことを実現したい場合は自前で実装しなければなりませんでしたが、これをk8sのネイティブ機能として利用できるようになりました。
詳しくは以下の公式ドキュメントを参照してください。
なおサイドカーコンテナがメインコンテナより後に終了することで、Pod全体で見ると起動終了に時間がかかることになります。
そのため terminationGracePeriodSeconds を十分な猶予持たせるようにするなど、各種パラメータも同時に見直す必要があるので、設定の際は注意してください。
CloudWatch エージェントのソースコード・issue調査
AIを活用しながら以下ソースコードを読んでみました。
内部的には telegraf というメトリクス収集・転送エージェントをベースにしていて、なおかつAWS独自の拡張が加えたfork版を使用しているようです。
また、いくつかGraceful Shutdownに関するissueもありました。結果的には今回の問題とは直接関係ありませんでしたが force_flush_interval などの設定を活用しつつ、コンテナの終了時にログ収集から転送までにある程度余裕を持って終了処理を行わない場合、TERMシグナルを適切に送信してもログが欠損する可能性がありそうでした。
今回は後述する、アクセスログを書き込むSidekiqコンテナを、アクセスログ書き込みが終わってから十分な猶予を持って終了するようにしたことで対応できましたが、もし同様の問題に遭遇した方がいたら参考にしてみてください。
p45~46 Sidekiq コンテナの安全な終了
発表資料のp45〜46で触れた、Sidekiqコンテナの安全な終了について解説します。
Sidekiq の Graceful Shutdown
Sidekiqには、実行中のジョブを完了させてから終了するGraceful Shutdownの仕組みがあります。
TSTPシグナルを送ると新規ジョブの受付を停止します。TERMシグナルを送るとTSTPシグナルと同様に新規ジョブの受付を終了しつつ、実行中のジョブが完了するかタイムアウト時間(デフォルト25s)後にプロセスが終了します。
Use TSTP + TERM to guarantee shutdown within a time period. Best practice is to send TSTP at the start of a deploy and TERM at the end of a deploy.
と記述があるように、ベストプラクティスとしては、デプロイ開始時にTSTPシグナルを送り、デプロイ終了時にTERMシグナルを送ることが推奨されています。
k8s上では、マニフェストを以下のように記述することで、Podの終了前にTSTPシグナルを送ることができます。念のため一定時間sleepを入れることもできます。なお、TERMシグナルについてはk8sが自動的に送信してくれるため、特に設定は不要です。
lifecycle: preStop: exec: command: - /bin/sh - -c - kill -TSTP $(ps aux | grep sidekiq | grep busy | awk '{ print $2 }'); sleep 40
exec コマンドの重要性
exec コマンドを用いることで、新たに子プロセスを作成せず、現在のプロセスでコマンドを実行できます。
これによって、シェルスクリプトがプロセスをラップしなくなるため、TERMシグナルが正しく伝達されるようになります。
exec を用いない場合
言葉で説明するとわかりづらいのでサンプルコードで説明します。
以下のようなシェルスクリプト sample.sh とRubyスクリプト sample.rb を用意します。
まず exec をつけない場合の挙動を確認します。
#!/bin/bash # sample.sh echo 'start' # exec を用いて起動していない! bundle exec ruby sample.rb
# sample.rb # SIGTERM が送られてくるまで待つ。送られてきたら "hi!" と返して継続する。 Signal.trap(:TERM) do puts "hi!" end loop do sleep 1 end
sample.sh を実行します。
$ chmod +x sample.sh $ ./sample.sh start
別のターミナルで ps コマンドを実行するとシェルスクリプトとRubyスクリプトで、それぞれPIDが分かれていることが分かります。ここで親の sample.sh プロセスに対してTERMシグナルを送信してみます。
$ ps | grep sample | grep -v grep 77781 ttys014 0:00.01 /bin/bash ./sample.sh 77782 ttys014 0:00.41 /Users/miyamura.koyo/.rbenv/versions/3.3.8/bin/ruby sample.rb $ kill -TERM 77781
すると sample.sh プロセスは終了しますが、sample.rb プロセスにはTERMシグナルが届いておらず、そのまま動き続けてしまいます。
$ ./sample.sh start zsh: terminated ./sample.sh $ ps | grep sample | grep -v grep 77782 ttys014 0:00.42 /Users/miyamura.koyo/.rbenv/versions/3.3.8/bin/ruby sample.rb
exec を用いた場合
これを防ぐために、exec を用いてシェルスクリプトを以下のように修正します。
#!/bin/bash # sample2.sh echo 'start' # exec を用いて起動する exec bundle exec ruby sample.rb
sample2.sh を実行します。
$ chmod +x sample2.sh $ ./sample2.sh start
別のターミナルで ps コマンドを実行すると、Rubyスクリプトのみがプロセスとして存在していることが分かります。ここでTERMシグナルを送信してみます。
$ ps | grep sample | grep -v grep 55831 ttys014 0:00.46 /Users/miyamura.koyo/.rbenv/versions/3.3.8/bin/ruby sample.rb $ kill -TERM 55831
そうすると、今度はRubyスクリプトにTERMシグナルが届き、"hi!" と表示されます。Rubyスクリプトに対してTERMシグナルが正しく伝達されていることが分かります。
$ ./sample2.sh
start
hi!
このように exec を用いることで、シグナルが正しく伝達されるようになります。
k8s上のマニフェストの記述方法との関連
コンテナの起動をシェルスクリプト経由で行っている場合は、exec を用いないと、k8sから送信されるTERMシグナルがアプリケーションプロセスに届かないため危険です。
spec: containers: - name: batch image: batch-image command: - /scripts/entrypoint_batch.sh
#!/bin/bash # exec 必須 exec bundle exec sidekiq ...
ちなみに、以下のように、command で直接アプリケーションを起動している場合は exec は不要です。
spec: containers: - name: batch image: batch-image command: [ 'sh', '-c', 'bundle exec sidekiq ...' ]
まとめ
SRE Kaigi 2026での発表内容を、技術的詳細を中心に補足しました。
月間数億レコード規模のアクセスログ基盤の移行は、技術的にもチャレンジングなプロジェクトでしたが、一つ一つ課題を解決していくことで、無停止かつ低コストでの移行を実現できました。
また、SREのロールではない私がこのプロジェクトに挑戦できたのは、組織を横断して協力してくれた多くのメンバーのおかげです。
改めて、SRE Kaigi 2026で発表の機会をいただけたこと、そして福岡から東京への参加を快く了承してくれた福岡開発拠点の方々に感謝します!