MoneyForward Developers Blog

株式会社マネーフォワード公式開発者向けブログです。技術や開発手法、イベント登壇などを発信します。サービスに関するご質問は、各サービス窓口までご連絡ください。

dev

内製データカタログによる分析基盤の改善〜ミニマムなメタデータ管理で分析を前に進めよう〜

この記事は、Money Forward Engineering 1 Advent Calendar 2022 7日目の投稿です。

6日目は koyoさんで i18n YAML ファイル中の日本語/英語のズレを検知する単体テスト でした。

本日は私が「メタデータ管理の最初の一歩」について書いていきたいと思います。


はじめに

初めまして、CTO室分析基盤部のnakamoriです。

私は22新卒でこの分析基盤部に配属されてから半年、データ分析基盤の開発と運用を行っています。

今回は入社して一番大きなプロジェクトだったメタデータ管理についてお話ししようと思います。

この記事が想定する読者

  • 分析基盤であるDWH(データウェアハウス)にデータが蓄積されてきたが、メタデータ管理をまだ行っていない人
  • データカタログツールを導入しようか迷っている人

抱えていた課題

弊社マネーフォワードは40を超えるサービスを抱えるSaaSの会社であり、主に複数のサービスをまたぐ分析で以下の問題が発生していました。

分析時に仕様確認に時間がかかってしまう

皆さんの分析チームは分析する際に、分析したい項目はどのテーブルのどのカラムに格納されているかがわかる状態になっているでしょうか?

弊社は長らく中央集権的なデータ管理を行うことでデータの管理や分析を行いやすくしてきました。詳しくは以下の記事をご覧ください。

#4 マネフォが実践する「MF on SSOT」な分析基盤の構成

このような仕組みの中では、データを整備・分析する人にプロダクトのドメイン知識が欠けてしまうことがあり、分析を推進する上で問題となっていました。

例えば、 is_active などの抽象的なネーミングはカラム名からユーザーが有効化されたかどうかを示していることがわかったとしても、どの条件を満たせばユーザーが有効化されるのかまで示されないと分析には利用できません。

特に、サービスごとに定義が違っていたりするものもあり、現在も調査に手間や時間がかかってしまうこともあります。

調査結果の保存方法が確立していない

仮に調査でデータに関しての情報がわかったとしても、そのデータが格納する場所が今までは用意されていませんでした。

弊社では情報共有ツールとしてkibelaを利用しているのでkibelaに資料を投稿する人が多く、人によってGoogle スプレッドシートGoogle ドキュメントを使ったりしていました。

ですが、資料が共有される環境が整っていないため、別の部署の人が同じデータを分析する際はまた調査から始めなければなりませんでした。

テーブル数が膨大なため、そのままの状態ではデータエンジニアが一元管理するには難しく仕組み作りの部分でも難航していました。

結果

最終的には以下の取り組みに落ち着きました

  • データカタログツールを導入せずに内製化した
  • 入力する情報はビジネスメタデータのみ
    • データ生成プロセスなどのテクニカルメタデータは簡素化のためスコープ外とした
  • 最初のメタデータの回収は2種類に絞った
    • 放置されていたメタデータ管理ツールをスクレイピングしたもの
    • セキュアデータ基盤とカジュアルデータ基盤間のパイプライン設定時に用意する仕様書

今回紹介するものの構成図

主にGitHubGCPを用いて作られており、社内では「Subako」と呼ばれております。

構成図

解決策

システムとしては、GitHubメタデータを蓄積しそこからGCSからCloud Functionsに送り、BQのSchemaを取得し書き換えます。

GCSにファイルの変更があるとそれをトリガーにCloud Functionsを実行できるため、日次で全ファイルをGCSに送り直すことでSubakoを実現しています。

Cloud Functionsの世代ですが、最初は1世代を利用していましたがテーブル数が増えていき要求される同時実行数が増えてきたので2世代目を利用するようにしました。

GitHubにデータを蓄積する理由としては以下があります。

  • BigQueryのみに格納してしまうとメタデータが取り出しづらくなってしまう
  • Cloud Composerで作成しているテーブルは更新時に説明が消えてしまうため、定期的に書き込めるように外部に保管する必要があった
  • GitHubの編集システムを利用できる
    • 変更履歴が見やすい
    • 分析基盤部の承認フローがあるおかげで書き込みにある程度の品質が担保される

実装イメージ

ファイル構成

こちらがメインのフォルダ構造のイメージになります。

metadata
  ┣ projectA
  ┃  ┣ dataset1
  ┃  ┃  ┣ dataset_description.yaml
  ┃  ┃  ┣ hugahugatable1.yaml
  ┃  ┃  ┗ hugahugatable2.yaml
  ┃  ┣ dataset2
  ┃  ┃  ┗ dataset_description.yaml
  ┃  ┗ dataset3
  ┃     ┗ dataset_description.yaml
  ┃
  ┣ projectB
  ┃  ┗ dataset1
  ┃     ┣ dataset_description.yaml
  ┃     ┗ hogehogetable1.yaml
  .
  .
  .

project → dataset → table の構造にすることでData Lakeをそのまま反映した構造をとることができます。

また、各datasetフォルダでdetaset_description.yamlというものを持つようにしており、このファイルはdatasetの説明を反映しています。

各フォルダの中身はこうなっています。ファイルは2種類あります。

dataset_description.yaml
# ファイル名はそのままでお願いします
project: プロジェクト名
dataset: データセット名
labels:
  timezone: #jst or utc 
dataset_description: |-
  こちらにデータセットに関する説明をお願いします。 
  改行が利用できます。
  
  詳細を記載した記事:
  連絡用チャンネル:
  メンション先:
${table_name}.yaml
# ファイル名をテーブル名に変更お願いします
project: プロジェクト名
dataset: データセット名
table: テーブル名
table_description: |-
  テーブルの説明をお願いします。
  改行が反映されます。
columns:
  # カラムの説明に改行は反映されません
  - column: カラム名その1
    description: |-
      カラムその1に対する説明
  - column: カラム名その2
    description: |-
      カラムその2に対する説明

YAMLの改行の記号を|-としているのは文末の改行を消すためです。もし静的サイトジェネレータなどを使ってwebサイトで見られるようにしたときに改行が邪魔になると思って消しています。

最初はいろいろな要素を盛り込もうとしていましたが、最終的にデータの説明、いわゆるビジネスメタデータのみを書き込むことにしました。

また、現在はlabelの書き込みにも反映できるようになっており、これによりtableの性質を瞬時に把握することもできます。

データが入った時のイメージ

コード部分

今回はCloud Functionsのmain.pyのみ掲載します。

行っていることは3つです。

  • BigQuery API Clientを用いて対象のテーブルのSchemaを取る
  • GCPに格納してあるyamlと比較してdescriptionがあれば埋める
  • update_table or update_datasetでテーブルのSchemaを変更する
def send_schema(event, context):
    """Triggered by cloud storage.
        Args:
            event (dict): Event payload and .
            context (google.cloud.functions.Context): Metadata for the event.
    """
    
    str_footer = \
        "\n" + "created by Subako" + "\n" + \
        "更新日時:" + str(datetime.datetime.now())

    event_type = context.event_type

    bucket_name = event['bucket']
    file_name = event['name']

    if event_type == "google.storage.object.finalize":
        client = storage.Client()
        bucket = client.get_bucket(bucket_name)
        blob_yaml = bucket.blob(file_name)

        obj = yaml.safe_load(blob_yaml.download_as_string())
        client = bigquery.Client()

        project_id = obj["project"]
        dataset_id = obj["dataset"]
        if "dataset_description" in obj:
            dataset_description = obj["dataset_description"]
            dataset_address = f"{project_id}.{dataset_id}"

            # Make an API request.
            dataset_bigquery = client.get_dataset(dataset_address)
            dataset_bigquery.description = dataset_description + str_footer
            if "labels" in obj:
                dataset_bigquery.labels = obj["labels"]
                client.update_dataset(dataset_bigquery, ["description", "labels"])
            else:
                client.update_dataset(dataset_bigquery, ["description"])

            print(f"Dataset {dataset_address} updated.")
        else:
            table_id = obj["table"]
            table_description = obj["table_description"]
            list_descriptions = []
            if "columns" in obj:
                list_descriptions = obj['columns']
            dict_descriptions_from_list = {}
            for entity in list_descriptions:
                dict_descriptions_from_list[entity["column"]] = entity["description"]

            table_address = f"{project_id}.{dataset_id}.{table_id}"

            # Make an API request.
            table_bigquery = client.get_table(table_address)

            # change table description
            table_bigquery.description = table_description + str_footer

            # change labels
            if "labels" in obj:
                table_bigquery.labels = obj["labels"]

            # make schemas to change column description
            list_schema = []
            for index in range(len(table_bigquery.schema)):
                entity_schema = table_bigquery.schema[index]
                if (column_name := entity_schema.name) in dict_descriptions_from_list:
                    list_schema.append(
                        bigquery.SchemaField(
                            name=entity_schema.name,
                            field_type=entity_schema.field_type,
                            mode=entity_schema.mode,
                            description=dict_descriptions_from_list[column_name],
                            fields=entity_schema.fields,
                            policy_tags=entity_schema.policy_tags
                        )
                    )
                else:
                    list_schema.append(table_bigquery.schema[index])

            # set schema on table
            table_bigquery.schema = list_schema

            # write option you want to change on second input
            if "labels" in obj:
                client.update_table(table_bigquery, ["description", "schema", "labels"])
            else:
                client.update_table(table_bigquery, ["description", "schema"])

            print(f"Table {table_address} updated.")

一つの関数で全部やろうとしているので、内容がてんこ盛りになってしまいました。

ちなみに、datasetのプロパティを更新するには以下のIAM権限が必要です。

  • bigquery.datasets.update
  • bigquery.datasets.get

以下の事前定義された役割には、データセットのプロパティを更新するために必要なIAM権限が含まれています。

  • roles/bigquery.dataOwner
  • roles/bigquery.admin

移行作業

Dmemo

元々、弊社ではOSSDmemoというメタデータ管理ツールを利用していました。DBを連携するとスキーマが登録され、そこに対して説明を書き込んでいくといういわゆるPULL型のツールです。

連携したら自動でやってくれるのは良い機能なのですが、連携して放置という状況が相次ぎメタデータに関しての信頼度が失われつつありました。

そこで、Dmemoの内容をSubakoに移し分析チームに確認してもらって情報の鮮度を確認しました。分析チームはメタデータがあっているかどうかを確認するだけでよかったため、一から記入するケースと比べて労力を減らすことができました。

新規データの注入

弊社の分析基盤は基本的に2つあります。

  • セキュアデータ基盤
    • センシティブなデータが含まれる基盤でアクセス可能なメンバーが制限されている
    • 特定の踏み台からしかアクセスすることができない
  • カジュアルデータ基盤
    • センシティブなデータが含まれていない
    • 通常のオフィスネットワークからアクセスすることができる

そして、近日セキュアデータ基盤からカジュアルデータ基盤へのデータバイプラインをStorkというものに刷新しました。

詳しくは以下の記事をご覧ください。 モダンデータスタックでデータ分析基盤の改善〜可用性と保守性もアップ!〜

このパイプラインではカラム単位で転送可否を決めているのですが、その際にスプレッドシートを用いてその情報の機密度を調査していました。

このスプレッドシートをSubakoに書き込むことで、新規の転送予定のものはデータが格納されていくことになります。

とはいえ、どちらの方法においても手動なので一つ一つ頑張るしかないという状況です。

よかったこと

プロジェクトが一息ついた今だから話せますが、メタデータ、あるとかなり便利です。 個人的によかったことは以下の3つになります。

作業効率の向上

データの仕様を把握する時間は状況によって不安定なため、依頼を受けても対応にかかる時間が読めませんでした。そのようなケースを減らすことで迅速に必要な人にデータを届けることができます。

データニーズの把握

プラットフォームを提供することで分析者からのメタデータに対するニーズを拾うことができ、どのデータを必要としているかを知ることができました。

これによって今後のデータ拡充の戦略なども立てることができデータの利活用を推進していきたい分析基盤部としては嬉しかったです。

アナリティクスエンジニアのチームとの連携

これは自分にとって一番嬉しかったです。

データエンジニアとアナリティクスエンジニアは作業内容は近いもののお互いが何を行っているか把握できていない現状があります。Subakoを通じてやり取りを行うことで、コミュニケーションの強化につながりました。

今後

このプロジェクトを通じてメタデータ管理はリバースETLの一環だなと感じました。

テーブルのスキーマをとってきて、メタデータを埋め込んで元のスキーマに返してあげるという処理がデータエンジニアっぽくて地味な作業ですが好きになれました。

分析基盤の近くに分析のために必要な情報を集め、プラットフォームの中で完結させてしまうということは分析を推進する上で必要なことの一つです。

これを読んで興味を持った方が、小さなところから始めてくださると書いた自分としてとても嬉しいです。

当たり前ですが手動でメタデータを書くのは結構大変でした。ですので、これから新しい種類の情報を書き込む場合は自動でとってこれるものに絞っていこうかなと考えています。

また、良いメタデータ管理ツールが現れたら同時並行で使っていこうと思うので、アンテナを高く張って今後の動向に備えたいと思います。

この記事を読んだみなさんも是非「メタデータ管理」に取り組んでみてください。

最後に

また、私の所属している分析基盤部では、データ分析環境向上のための、複数のプロジェクトが進行中です。

いろんな活躍のチャンスがありますので、ぜひカジュアル面談などで紹介させてください!

気になった方は、以下のページからご応募ください。

【データエンジニア】CTO室(データ分析基盤グループ)_東京

それでは、最後まで読んでいただきありがとうございました!


マネーフォワードでは、エンジニアを募集しています。 ご応募お待ちしています。

【会社情報】 ■Wantedly株式会社マネーフォワード福岡開発拠点関西開発拠点(大阪/京都)

SNS】 ■マネーフォワード公式noteTwitter - 【公式】マネーフォワードTwitter - Money Forward Developersconnpass - マネーフォワードYouTube - Money Forward Developers