Money Forward Developers Blog

株式会社マネーフォワード公式開発者向けブログです。技術や開発手法、イベント登壇などを発信します。サービスに関するご質問は、各サービス窓口までご連絡ください。

20230215130734

SRE Kaigi 2026 「月間数億レコードのアクセスログ基盤を無停止・低コストでAWS移行せよ!アプリケーションエンジニアのSREチャレンジ💪」の発表内容と補足

こんにちは、クラウド経費 テックリードの宮村(みやむー)@miyamura.koyo です。

2026年1月31日に開催されるSRE Kaigi 2026で、「月間数億レコードのアクセスログ基盤を無停止・低コストでAWS移行せよ!アプリケーションエンジニアのSREチャレンジ💪」というタイトルで発表させていただきます!

2026.srekaigi.net

発表では30分という時間制限もあり、技術的な詳細を全てお伝えできないため、テックブログを執筆しました。

本記事では、発表では説明しきれない実装詳細を中心に補足していきます。

なお紹介しているサンプルコードについては、実際のシステムで使われているコードとは一部異なるため、参考にする際は適宜読み替えてください。

発表資料

当日の発表資料はこちらです。

p34 アクセスログ書き込み実装(インフラ編)

発表資料のp34で触れたアクセスログ書き込み実装について、より詳しく補足します。

Firehose + Glue の設定

FirehoseでAWS Glueを用いてParquet + Snappy圧縮をするためのTerraform定義は以下です。

resource "aws_glue_catalog_database" "firehose_glue_db" {
  name  = "xxx"
}

resource "aws_glue_catalog_table" "firehose_glue_table" {
  name          = "xxx"
  database_name = aws_glue_catalog_database.firehose_glue_db.name

  # ここにスキーマを定義する
  storage_descriptor {
    columns {
      name = "uuid"
      type = "string"
    }

    # ...
  }
}
resource "aws_kinesis_firehose_delivery_stream" "log_transfer_fhstream" {
  # ...

  data_format_conversion_configuration {
    enabled = true

    # 入力は JSON
    input_format_configuration {
      deserializer {
        open_x_json_ser_de {}
      }
    }
    # 出力は Parquet + Snappy
    output_format_configuration {
      serializer {
        parquet_ser_de {
          compression = "SNAPPY"
        }
      }
    }
    schema_configuration {
      database_name = aws_glue_catalog_database.firehose_glue_db.name
      table_name    = aws_glue_catalog_table.firehose_glue_table.name
      role_arn      = "xxx"
    }
  }
}

詳しくはTerraformの公式ドキュメントを参照してください。

ちなみにAWS Glueクローラーでテーブル定義を自動生成する方法もありますが、今回はアクセスログのスキーマが固定なので、手動でGlueテーブル定義をすることにしました。

ステージング環境のみリソースを定義する

Terraformでステージング(以下stg)環境のみリソースを定義する場合は count が使用できます。

resource "aws_s3_bucket" "stg_only" {
  # environment が "stg" の場合のみ 1つ作成、それ以外は 0 (作成しない)
  count = var.environment == "stg" ? 1 : 0

  # ...
}

この場合、参照する際に aws_s3_bucket.stg_only[0] のように、配列形式でアクセスが必要になるので、注意です。

cwagentconfig の設定

CloudWatchエージェントの設定ファイル cwagentconfig で特定のログファイルを監視する設定は以下です。

{
  "logs": {
    "logs_collected": {
      "files": {
        "collect_list": [
          {
            "file_path": "xxx.log",
            "log_group_name": "xxx-loggroup",
            "log_stream_name": "xxx-logstream",
            "timestamp_format": "xxx"
          }
        ]
      }
    }
  }
}

ちなみにp44の force_flush_interval の設定を行う場合は以下のように記述します。

{
  "logs": {
    // デフォルトは5s
    "force_flush_interval": 1,
    "logs_collected": {
      // ...
    }
  }
}

詳しくは以下の公式ドキュメントを参照してください。

docs.aws.amazon.com

p35 アクセスログ書き込み実装(アプリケーション編)

発表資料のp35で触れたアプリケーション側の実装について補足します。

AppAccessLog::Logger の実装

アクセスログを出力するための専用のLoggerクラスを実装しました。 1行あたり1つのJSON文字列にするために、カスタムフォーマッターを自作するなど、工夫しています。

require 'logger'
require 'singleton'

class AppAccessLog::Logger
  # インスタンス生成コストを下げるためのシングルトン
  include Singleton

  ACCESS_LOG_PATH = Rails.root.join('xxx.log')

  def initialize
    # skip_header を指定することで余計なコメントが入らないようにする
    @logger = Logger.new(ACCESS_LOG_PATH, skip_header: true)
    @logger.formatter = AppAccessLog::Formatter.new
  end

  def log(message)
    # AppAccessLog::Formatter によって info メソッドにしてもタグはつかない
    # @see AppAccessLog::Formatter
    @logger.info(message)
  end
end
class AppAccessLog::Formatter < Logger::Formatter
  # @override
  # 1行あたり1つのJSON文字列にするためにフォーマッターを改造
  def call(_serverity, _time, _progname, msg)
    "#{msg}\n"
  end
end

Logger.new の引数に skip_header をつけない場合、ファイル作成時に # Logfile created on xxx by xxx のようなコメント行がついてしまい、取り込み側で考慮が必要になるため必須です。

skip_header については、るりまにも記載されています。

docs.ruby-lang.org

ちなみに、元々は記載がなかったので、私がプルリクエストを出しました 💪

github.com

消しやすさを考慮したリファクタリングや実装

旧実装のアクセスログ書き込みは以下のようになっていました。

def access_log_write
# ...数十行の実装
end

それを以下のように、旧実装と新実装で実装を一部共通化せずにリファクタ・追加実装することで、削除時に消しやすくなるように工夫しました。

def write_access_log
  # 最終的にこの行だけ消せば良い
  write_old_access_log(args)
  # 削除する場合は単にこのメソッドを削除して、内容を展開するだけで良い
  write_new_access_log(args)
end

p37 アクセスログ検索管理画面実装(インフラ編)

発表資料のp37で触れた検索管理画面のインフラについて補足します。

IRSA(IAM Roles for Service Accounts)について

EKS環境でAthenaにアクセスするためには、適切な権限管理が必要です。ここでIRSAを活用しました。

IRSAについては、詳しくは以下の公式ドキュメントをご参照ください。

docs.aws.amazon.com

ここで実際に付与したIAMロールは以下になります。相当試行錯誤したので、同じような課題にハマっている方いたら参考にしてもらえればと思います。

data "aws_iam_policy_document" "irsa_for_access_log" {
  statement {
    sid    = "AllowOperateAthena"
    effect = "Allow"
    actions = [
      "athena:StartQueryExecution",
      "athena:GetQueryExecution",
      "athena:GetQueryResults"
    ]
    resources = [
      aws_athena_workgroup.athena_workgroup.arn,
      "arn:aws:athena:${local.region}:${local.account_id}:datacatalog/*"
    ]
  }

  statement {
    sid    = "AllowAccessAthenaQueryResultsS3"
    effect = "Allow"
    actions = [
      "s3:GetBucketLocation",
      "s3:GetObject",
      "s3:ListBucket",
      "s3:PutObject"
    ]
    resources = [
      aws_s3_bucket.athena_query_results.arn,
      "${aws_s3_bucket.athena_query_results.arn}/*"
    ]
  }

  statement {
    sid    = "AllowAccessAthenaDataSourceS3"
    effect = "Allow"
    actions = [
      "s3:ListBucket",
      "s3:GetObject"
    ]
    resources = [
      aws_s3_bucket.access_log_bucket.arn,
      "${aws_s3_bucket.access_log_bucket.arn}/app_access_log/*",
    ]
  }

  statement {
    sid    = "AllowAccessGlueDataCatalog"
    effect = "Allow"
    actions = [
      "glue:GetDatabase",
      "glue:GetTableVersion",
      "glue:GetTable"
    ]
    resources = [
      "arn:aws:glue:${local.region}:${local.account_id}:database/database_name",
      "arn:aws:glue:${local.region}:${local.account_id}:table/table_name/*",
      "arn:aws:glue:${local.region}:${local.account_id}:catalog"
    ]
  }
}

Athena のテーブル定義

Athenaのテーブル定義は以下のようにしました。

Parquet形式のデータを読み込めるようにするのと、パーティションの定義を日単位で設定するようにしています。

CREATE EXTERNAL TABLE IF NOT EXISTS database_name.table_name (
  uuid string
  -- ...
)
PARTITIONED BY (`datepath` string)
STORED AS PARQUET
LOCATION 's3://bucket_name/'
TBLPROPERTIES (
  'parquet.compression' = 'SNAPPY',
  'projection.enabled' = 'true',
  'projection.datepath.type' = 'date',
  'projection.datepath.range' = '202x/xx/xx,NOW',
  'projection.datepath.format' = 'yyyy/MM/dd',
  'projection.datepath.interval' = '1',
  'projection.datepath.interval.unit' = 'DAYS',
  'storage.location.template' = 's3://bucket_name/app_access_log/$${datepath}'
);

p39 アクセスログ検索管理画面実装(アプリケーション編)

発表資料のp39で触れたアプリケーション側の実装について補足します。

クエリビルダーの実装

Athenaが採用しているPrestoに対応したSQLを文字列結合で組み立てるのは保守性が低いため、専用のクエリビルダーを実装しました。

管理画面なので、SQLインジェクション対策は最低限で十分なのですが、念の為、文字列のエスケープなどを行なっています。

また、Athenaにはパラメータクエリの機能があります。

docs.aws.amazon.com

パラメータクエリを用いることで、実際の引数の値を後で指定でき、ユーザー入力に対してもより堅牢なSQLの実行を行うことができます。

"SELECT * FROM access_logs WHERE user_name = ?"

これらを考慮した実装は概ね以下のような形で行なっています。

# Ruby 3.2以降のDataクラスを使用
AthenaClient::Query = Data.define(:query_string, :execution_parameters)
# Athena のSQLクエリを組み立てるクエリビルダークラス
# 最低限の実装を行っている。`OR` や `NOT` などは現時点で使用してないので未対応
class AthenaClient::QueryBuilder
  # @param database [String]
  # @param table [String]
  def initialize(database:, table:)
    @database = database
    @table = table
    @select_columns = []
    @where_conditions = []
    @execution_parameters = []
    @order_by_columns = []
    @limit = nil
  end

  # @param columns [String, Array<String>]
  def select(*columns)
    @select_columns.concat(columns)

    self
  end

  # @param condition [String]
  # @param values [String, Array<String>]
  # @example where('column1 = ?', 'value1')
  # @example where('column1 >= ?', 'value1')
  # @example where('column2 IN (?, ?)', ['value1', 'value2'])
  def where(condition, *values)
    @where_conditions << condition

    values.each do |value|
      @execution_parameters << quote(value)
    end

    self
  end

  # @param column [String]
  # @param value [String]
  # @param match_condition [Symbol] :prefix, :suffix, :full. デフォルト :full.
  # @example where_like('column1', 'value1', :prefix)
  def where_like(column, value, match_condition = :full)
    raise ArgumentError.new("Invalid match condition: #{match_condition}. Use :prefix, :suffix, or :full.") unless %i(prefix suffix full).include?(match_condition)

    sanitized_value = case match_condition
                      when :prefix
                        "#{ApplicationRecord.sanitize_sql_like(value)}%"
                      when :suffix
                        "%#{ApplicationRecord.sanitize_sql_like(value)}"
                      when :full
                        "%#{ApplicationRecord.sanitize_sql_like(value)}%"
                      end

    # @note 明示的なエスケープが必要
    @where_conditions << "#{column} LIKE ? ESCAPE '\\'"

    @execution_parameters << quote(sanitized_value)

    self
  end

  # @param column [String]
  # @param direction [Symbol] :asc or :desc
  # @raise [ArgumentError] directionが:ascまたは:descでない場合に発生
  def order(column, direction = :asc)
    unless %i(desc asc).include?(direction)
      raise ArgumentError.new("Invalid order direction: #{direction}. Use :asc or :desc.")
    end

    direction_string = direction == :desc ? 'DESC' : 'ASC'
    @order_by_columns << "#{column} #{direction_string}"

    self
  end

  # @param value [Integer] 返される行数の上限
  # @raise [ArgumentError] valueが正の整数でない場合に発生
  def limit(value)
    unless value.is_a?(Integer) && value.positive?
      raise ArgumentError.new("Limit must be a positive number, got: #{value}")
    end

    @limit = value

    self
  end

  # @return [AthenaClient::Query]
  def to_query
    select_clause = @select_columns.empty? ? '*' : @select_columns.join(', ')

    query = "SELECT #{select_clause} FROM #{@database}.#{@table}"
    query += " WHERE #{@where_conditions.join(' AND ')}" unless @where_conditions.empty?
    query += " ORDER BY #{@order_by_columns.join(', ')}" unless @order_by_columns.empty?
    query += " LIMIT #{@limit.to_i}" if @limit

    AthenaClient::Query.new(query_string: query, execution_parameters: @execution_parameters)
  end

  private

  # Athena は Presto SQL ベースなので、Presto のルールに従ってエスケープする必要がある。
  # 基本的な型に対して最低限のクオート処理を用意している。
  # 必要に応じて拡張してください。
  def quote(value)
    case value
    when String
      "'#{value}'"
    when nil     then "NULL"
    when Numeric then value.to_s
    else raise TypeError.new("can't quote #{value.class.name}")
    end
  end
end

AthenaClientの実装

Athenaとの通信を抽象化するクライアントクラスを実装しました。AWS SDKを使ってAthenaからデータを取得する場合、実行リクエストを行い、実行結果ステータスのポーリングを行い、1000件ずつ取得するという3ステップの実装が必要になります。また返ってきた値を整形する必要があります。これらの処理を AthenaClient クラスとして抽象化することで使いやすく、テストしやすい実装としました。

また設定は AthenaClient::Config として別で切り出しています。

実装は概ね以下のように形で行なっています。

class AthenaClient::Config
  attr_reader :region, :work_group, :database, :access_log_table, :credentials

  def initialize(region:, work_group:, database:, access_log_table:, credentials:)
    @region = region
    @work_group = work_group
    @database = database
    @access_log_table = access_log_table
    @credentials = credentials
  end

  class << self
    # 環境変数などから値を取得
    def build
      new(
        region: "xxx",
        work_group: "xxx",
        database: "xxx",
        access_log_table: "xxx",
        credentials: Aws::AssumeRoleWebIdentityCredentials.new(
          role_arn: "xxx",
          web_identity_token_file: "xxx",
          role_session_name: "xxx"
        )
      )
    end
  end
end
# Ruby 3.2以降のDataクラスを使用
AthenaClient::Result = Data.define(:scanned_bytes, :rows)
class AthenaClient
  class Error < StandardError; end
  class TimeoutError < Error; end
  class QueryExecutionError < Error; end

  # @note `status` は QUEUED, RUNNING, SUCCEEDED, FAILED, CANCELLED の5種類
  # @see https://docs.aws.amazon.com/athena/latest/APIReference/API_QueryExecutionStatus.html
  COMPLETE_STATUSES = %w(SUCCEEDED FAILED CANCELLED).freeze
  # アプリの性質に合わせて適宜カスタマイズ
  MAX_RETRY_COUNT = 20
  SLEEP_INTERVAL_SEC = 1

  # @param config [AthenaClient::Config]
  def initialize(config:)
    @config = config
    @client = Aws::Athena::Client.new(
      region: config.region,
      credentials: config.credentials
    )
  end

  # @param query [AthenaClient::Query]
  # @return [AthenaClient::Result]
  def execute!(query:)
    async_execute_query(query:)
    .then { await_query_result!(query_execution_id: _1) }
    .then { to_result(query_execution_output: _1) }
  end

  private

  # @note より安全にSQLを実行するため execution_parameters を用いる
  def async_execute_query(query:)
    @client.start_query_execution(
      query_string: query.query_string,
      execution_parameters: query.execution_parameters.presence,
      work_group: @config.work_group
    ).query_execution_id
  end

  def await_query_result!(query_execution_id:)
    query_execution_output = polling(query_execution_id:)

    raise_if_query_execution_failed!(query_execution_output:)

    query_execution_output
  end

  def polling(query_execution_id:)
    1.upto(MAX_RETRY_COUNT) do
      query_execution_output = @client.get_query_execution(query_execution_id:)
      state = query_execution_output.query_execution.status.state

      return query_execution_output if COMPLETE_STATUSES.include?(state)

      sleep SLEEP_INTERVAL_SEC
    end

    raise TimeoutError.new("Query execution timed out after #{MAX_RETRY_COUNT} attempts")
  end

  def raise_if_query_execution_failed!(query_execution_output:)
    unless query_execution_output.query_execution.status.state == 'SUCCEEDED'
      error_message = query_execution_output.query_execution.status.state_change_reason

      raise QueryExecutionError.new("Query execution failed: #{error_message}")
    end
  end

  def to_result(query_execution_output:)
    AthenaClient::Result.new(
      scanned_bytes: query_execution_output.query_execution.statistics.data_scanned_in_bytes,
      rows: get_all_rows(query_execution_output)
    )
  end

  # @note `get_query_results` は一度にヘッダー行を含めて最大1000件(= 999件のデータ)までしか取得できないので、それ以上のデータは繰り返し取得する
  # @see https://docs.aws.amazon.com/athena/latest/APIReference/API_GetQueryResults.html
  def get_all_rows(query_execution_output)
    all_rows = []
    next_token = nil

    loop do
      query_result = @client.get_query_results(
        query_execution_id: query_execution_output.query_execution.query_execution_id,
        next_token: next_token
      )

      all_rows.concat(query_result.result_set.rows)
      next_token = query_result.next_token

      break unless next_token
    end

    format_rows(all_rows)
  end

  def format_rows(all_rows)
    headers, *rows = all_rows
    headers = headers.data.map(&:var_char_value)
    rows.map do |row|
      row_data = row.data.map(&:var_char_value)
      headers.zip(row_data).to_h
    end
  end
end

p44 データ欠損の原因と対策

発表資料のp44で触れた欠損の原因について、補足します。

Sidecar Containers

Kubernetes 1.29以降 SidecarContainers 機能が導入され、メインコンテナとサイドカーコンテナのライフサイクルを同期させることができるようになりました。 これまでは「メインコンテナより前に起動し、メインコンテナより後に終了する」ことを実現したい場合は自前で実装しなければなりませんでしたが、これをk8sのネイティブ機能として利用できるようになりました。

詳しくは以下の公式ドキュメントを参照してください。

kubernetes.io

kubernetes.io

なおサイドカーコンテナがメインコンテナより後に終了することで、Pod全体で見ると起動終了に時間がかかることになります。 そのため terminationGracePeriodSeconds を十分な猶予持たせるようにするなど、各種パラメータも同時に見直す必要があるので、設定の際は注意してください。

CloudWatch エージェントのソースコード・issue調査

AIを活用しながら以下ソースコードを読んでみました。

github.com

内部的には telegraf というメトリクス収集・転送エージェントをベースにしていて、なおかつAWS独自の拡張が加えたfork版を使用しているようです。

また、いくつかGraceful Shutdownに関するissueもありました。結果的には今回の問題とは直接関係ありませんでしたが force_flush_interval などの設定を活用しつつ、コンテナの終了時にログ収集から転送までにある程度余裕を持って終了処理を行わない場合、TERMシグナルを適切に送信してもログが欠損する可能性がありそうでした。

今回は後述する、アクセスログを書き込むSidekiqコンテナを、アクセスログ書き込みが終わってから十分な猶予を持って終了するようにしたことで対応できましたが、もし同様の問題に遭遇した方がいたら参考にしてみてください。

github.com

github.com

p45~46 Sidekiq コンテナの安全な終了

発表資料のp45〜46で触れた、Sidekiqコンテナの安全な終了について解説します。

Sidekiq の Graceful Shutdown

Sidekiqには、実行中のジョブを完了させてから終了するGraceful Shutdownの仕組みがあります。

TSTPシグナルを送ると新規ジョブの受付を停止します。TERMシグナルを送るとTSTPシグナルと同様に新規ジョブの受付を終了しつつ、実行中のジョブが完了するかタイムアウト時間(デフォルト25s)後にプロセスが終了します。

github.com

Use TSTP + TERM to guarantee shutdown within a time period. Best practice is to send TSTP at the start of a deploy and TERM at the end of a deploy.

と記述があるように、ベストプラクティスとしては、デプロイ開始時にTSTPシグナルを送り、デプロイ終了時にTERMシグナルを送ることが推奨されています。

k8s上では、マニフェストを以下のように記述することで、Podの終了前にTSTPシグナルを送ることができます。念のため一定時間sleepを入れることもできます。なお、TERMシグナルについてはk8sが自動的に送信してくれるため、特に設定は不要です。

lifecycle:
  preStop:
    exec:
      command:
      - /bin/sh
      - -c
      - kill -TSTP $(ps aux | grep sidekiq | grep busy | awk '{ print $2 }'); sleep 40

exec コマンドの重要性

exec コマンドを用いることで、新たに子プロセスを作成せず、現在のプロセスでコマンドを実行できます。 これによって、シェルスクリプトがプロセスをラップしなくなるため、TERMシグナルが正しく伝達されるようになります。

exec を用いない場合

言葉で説明するとわかりづらいのでサンプルコードで説明します。 以下のようなシェルスクリプト sample.sh とRubyスクリプト sample.rb を用意します。

まず exec をつけない場合の挙動を確認します。

#!/bin/bash
# sample.sh
echo 'start'
# exec を用いて起動していない!
bundle exec ruby sample.rb
# sample.rb
# SIGTERM が送られてくるまで待つ。送られてきたら "hi!" と返して継続する。
Signal.trap(:TERM) do
  puts "hi!"
end

loop do
  sleep 1
end

sample.sh を実行します。

$ chmod +x sample.sh
$ ./sample.sh
start

別のターミナルで ps コマンドを実行するとシェルスクリプトとRubyスクリプトで、それぞれPIDが分かれていることが分かります。ここで親の sample.sh プロセスに対してTERMシグナルを送信してみます。

$ ps | grep sample | grep -v grep
77781 ttys014    0:00.01 /bin/bash ./sample.sh
77782 ttys014    0:00.41 /Users/miyamura.koyo/.rbenv/versions/3.3.8/bin/ruby sample.rb

$ kill -TERM 77781

すると sample.sh プロセスは終了しますが、sample.rb プロセスにはTERMシグナルが届いておらず、そのまま動き続けてしまいます。

$ ./sample.sh
start
zsh: terminated  ./sample.sh

$ ps | grep sample | grep -v grep
77782 ttys014    0:00.42 /Users/miyamura.koyo/.rbenv/versions/3.3.8/bin/ruby sample.rb

exec を用いた場合

これを防ぐために、exec を用いてシェルスクリプトを以下のように修正します。

#!/bin/bash
# sample2.sh
echo 'start'
# exec を用いて起動する
exec bundle exec ruby sample.rb

sample2.sh を実行します。

$ chmod +x sample2.sh
$ ./sample2.sh
start

別のターミナルで ps コマンドを実行すると、Rubyスクリプトのみがプロセスとして存在していることが分かります。ここでTERMシグナルを送信してみます。

$ ps | grep sample | grep -v grep
55831 ttys014    0:00.46 /Users/miyamura.koyo/.rbenv/versions/3.3.8/bin/ruby sample.rb
$ kill -TERM 55831

そうすると、今度はRubyスクリプトにTERMシグナルが届き、"hi!" と表示されます。Rubyスクリプトに対してTERMシグナルが正しく伝達されていることが分かります。

$ ./sample2.sh
start
hi!

このように exec を用いることで、シグナルが正しく伝達されるようになります。

k8s上のマニフェストの記述方法との関連

コンテナの起動をシェルスクリプト経由で行っている場合は、exec を用いないと、k8sから送信されるTERMシグナルがアプリケーションプロセスに届かないため危険です。

spec:
  containers:
  - name: batch
    image: batch-image
    command:
    - /scripts/entrypoint_batch.sh
#!/bin/bash

# exec 必須
exec bundle exec sidekiq ...

ちなみに、以下のように、command で直接アプリケーションを起動している場合は exec は不要です。

spec:
  containers:
  - name: batch
    image: batch-image
    command: [
      'sh',
      '-c',
      'bundle exec sidekiq ...'
    ]

まとめ

SRE Kaigi 2026での発表内容を、技術的詳細を中心に補足しました。

月間数億レコード規模のアクセスログ基盤の移行は、技術的にもチャレンジングなプロジェクトでしたが、一つ一つ課題を解決していくことで、無停止かつ低コストでの移行を実現できました。

また、SREのロールではない私がこのプロジェクトに挑戦できたのは、組織を横断して協力してくれた多くのメンバーのおかげです。

改めて、SRE Kaigi 2026で発表の機会をいただけたこと、そして福岡から東京への参加を快く了承してくれた福岡開発拠点の方々に感謝します!