Money Forward Developers Blog

株式会社マネーフォワード公式開発者向けブログです。技術や開発手法、イベント登壇などを発信します。サービスに関するご質問は、各サービス窓口までご連絡ください。

20230215130734

モダンデータスタックでデータ分析基盤の改善〜可用性と保守性もアップ!〜

はじめに

こんにちは。CTO室 分析基盤部の長谷川(shase)です。

私が所属するチームでは、データ分析基盤の開発と運用を行っています。

今回は私が入社以来(といっても半年弱程度なのですが)やってきた、データパイプラインの整理についてご紹介したいと思います。

この記事が想定する読者

事業会社でデータ分析基盤の開発と運用に携わる、データエンジニア、データアナリストなどの職種の方にとって参考になればと思い執筆しました。

記事に出てくる社内用語の補足

  • セキュアデータ基盤は、センシティブなデータが含まれる基盤でアクセス可能なメンバーが制限されています。また、特定の踏み台からしかアクセスすることができません。

  • カジュアルデータ基盤は、センシティブなデータが含まれていない代わりに、通常のオフィスネットワークからアクセスすることができる環境です。

抱えていた課題

私が入社した時点で、カジュアルデータ分析基盤(センシティブなデータが含まれないデータ分析基盤)向けのデータパイプライン(図でいうところのデータパイプライン#1のJenkins + Embulk)は以下のような課題を抱えていました。

  • データ量の増大により、日次のETL処理(Extract/Transform/Load)が、24時間を超えてしまいそうな状況であったこと。

  • Jenkins/Embulk がホストされている環境の技術スタックが、EOLを迎えていたこと。また、今後の全社的な方針により、同じ状態でのリプレイスやアップグレードが困難であったこと。

  • プロダクトDBはもともとモノリシックなアーキテクチャであったが、マイクロサービス化、新規プロダクトの開発により、ETL対象のデータベースが増えていったが、それに対応できるコンピューティングリソースが確保できなかったこと(新規のデータ追加が出来ない状態であった)

などです。

データ分析基盤の継続性に関わる問題が発生しており、この課題解決は急務でした。

今回紹介する構成

今回の記事では以下のような構成の話となっています。

解決策

元々、セキュア基盤というセンシティブデータを含むデータ分析基盤が存在し、そちらは別のETLの仕組みで、データを取り込んでいました。

プロダクトデータベースからのETLが2系統ある状態は、保守性の面でも、作業の工数の面でも大変都合が悪く、ETL を統合することにしました。

これが今回の主題のデータバイプラインの整理です。

ただ、セキュアデータ基盤向けのETLに統合するにしても、カジュアルデータ基盤向けの何らかのデータ転送の仕組みが必要です。

そのため、図の「データパイプライン#2」の仕組みを新たに実装することにしました。

Cloud ComposerDAG で実装されており、社内では「Stork」と呼んでいます(データを運ぶコウノトリをイメージして名付けました)。

Cloud Composer を採用したのは、既存のデータパイプラインで採用されており、チームにとっての導入コストが小さかった為です。

実装イメージ

以下は実装イメージです。

.

├── builders.py

├── stork_foobar_dataset_daily.py

├── source_datasets

│   └── foobar_dataset

│       └── foobar_table.sql

│       └── foobar_table2.sql

└── target_schema

    └── foobar_dataset

        └── foobar_table.json

        └── foobar_table2.json

以下がメインとなる DAG の実装イメージになります。

  • source_datasets 配下に セキュア基盤から SELECT する SQLファイルを追加し、target_schema 配下の json ファイルで、カジュアル基盤に書き込む際のスキーマファイルを指定します。

  • builder.py には、Operator を生成するメソッドが定義されており、メインのファイルからそれらを呼び出すことによって、ワークフローを定義しています。

  • テーブルごとにまとまった処理を task_group にすることで、全体の見通しを良くしています。

  • セキュアデータ基盤のデータをそのまま転送せず、あえてクエリ(SQL)を書くようにしているのは、セキュアデータ基盤はセンシティブデータが含まれるのですが、それをカジュアルデータ基盤にそのまま転送することができないからです(カラムレベルで転送可否を判断する必要があります)。

  • データの転送と同時に、メタデータの転送も実現しています。

  • カジュアルデータ基盤とセキュアデータ基盤のリージョンが違うなどの関係で、一旦GCSを経由した転送になっています。

import os

import airflow

from airflow import DAG

from airflow.utils.task_group import TaskGroup

from stork import builders

dag_path = "/path/to/dag"

sql_path = dag_path + "/source_datasets"

template_searchpath = [dag_path + "/source_datasets"]

source_dataset = "foobar_dataset"

target_dataset = "foobar_dataset"

temporary_dataset = "temporary"

args = {

    "owner": "data-infra",

    "start_date": airflow.utils.dates.days_ago(2)

}

parent_name = "stork_foobar_dataset_daily"

dag = DAG(

    parent_name,

    default_args=args,

    schedule_interval=#省略

    template_searchpath=template_searchpath,

)

with dag:

    dag_start = builders.build_start_dummy_operator(#省略)

    dag_end = builders.build_end_dummy_operator(#省略)

    post_process_start = builders.build_start_dummy_operator(#省略)

    post_process_end = builders.build_end_dummy_operator(#省略)

    copy_dataset_description = builders.build_copy_dataset_description_operator(#省略)

    path = sql_path + "/" + source_dataset

    sqls = os.listdir(path)

    for sql in sqls:

        table = os.path.splitext(sql)[0]

        with TaskGroup("taskgroup_section_" + table, tooltip="taskgroup_section") as taskgroup_section:

            start_operator = builders.build_start_dummy_operator(#省略)

            bq_operator = builders.build_bq_operator(#省略)

            bq_to_gcs_operator = builders.build_bq_to_gcs_operator(#省略)

            move_gcs_operator = builders.build_move_gcs_operator(#省略)

            gcs_to_bq_operator = builders.build_gcs_to_bq_table_replace_operator_with_description(#省略)

           bq_table_delete_operator = builders.build_bq_table_delete_operator(#省略)

           copy_table_description_operator = builders.build_copy_table_description_operator(#省略)

           end_operator = builders.build_end_dummy_operator(#省略)

            (

                start_operator

                >> bq_operator

                >> bq_to_gcs_operator

                >> move_gcs_operator

                >> gcs_to_bq_operator

                >> bq_table_delete_operator

                >> copy_table_description_operator

                >> end_operator

            )

        (

            dag_start

            >> taskgroup_section

            >> post_process_start

            >> copy_dataset_description

            >> post_process_end

            >> dag_end

        )

SQLフォーマッタの導入

SQLファイル(jinja2テンプレート)が300ファイルぐらい、生まれたので、SQLフォーマッタを導入しました。

BigQuery に対応しているフォーマッタとして、今回は sqlfluff を導入しています。

典型的なSQLファイルが以下のようなものです。

-- Transfer : diff

CREATE OR REPLACE TABLE `foobar-project.{{ params.temporary_dataset }}.{{ params.temporary_table }}_{{ ds_nodash }}` AS

SELECT

    TIMESTAMP,

    ID,

    FOOBAR,

    CREATED_AT,

    UPDATED_AT

FROM

    `foobar-project.{{ params.dataset }}.{{ params.table }}`

WHERE

    CREATED_AT BETWEEN '{{ macros.ds_add(ds, -1) }} 15:00:00' AND '{{ ds }} 14:59:59'

ORDER BY

    CREATED_AT;

jinja2 テンプレート形式のため、実際に sqlfluff を適用させるには、パラメータをダミーのようなもので埋める必要があります。

以下に、.sqlfluff (設定ファイル)のサンプルを掲載します。

[sqlfluff:templater:jinja:context]

ds="2022-01-02"

params=""

params.dataset="foo"

params.table="foo"

params.temporary_dataset="foo"

params.temporary_table="foo"

ds_nodash="20220101"

[sqlfluff:templater:jinja]

library_path = sqlfluff_libs

固定値のようなパラメータであれば、 [sqlfluff:templater:jinja:context] のような記載でよいのですが、問題は、 {{ macros.ds_add(ds, -1) }} のような、メソッド定義している箇所です。

こういった場合、 [sqlfluff:templater:jinja] セクションに、 library_path = sqlfluff_libs のような形で、macro のディレクトリを指定し、以下のようなコードを用意する必要がありました。

sqlfluff_libs

├── __init__.py

└── macros.py
def ds_add(x: str, y: int) -> str:

    return "2022-01-01"

sqlfluff の細かいルールについては、現在検討中ですが、ルールをツールで反映できるのは大変魅力的だと感じています。

移行作業

既存の仕組み(データパイプライン#1)で転送されているテーブル数は概算で300テーブルほどありました。

このテーブル群の大半をまず、新しい転送の仕組みで転送させるようにしたのが、先月の9月になります。

この時点で同時稼働しています。

今月に入って、やっと、データ利用者と協力しながら、順次既存の仕組みで転送しているテーブル群の停止を行っているところです。

Cloud Composer のチューニング

今回のデータパイプラインで、300テーブルほど転送するようになり、Cloud Composr の DAG と Task が増え、DAG 解析時間が伸びるようになってしまいました。

以下の画像は、Cloud Composer の MONITORING で、Total parse time for all DAG files を確認したものです。元々、30分程度時間がかかっていたのが読み取れます。

Airflow Scheduler の リソース不足が確認できたので、Scheduler の CPU 割当を増やした結果、以下の画像のように DAG 解析時間が低下したのが読み取れます。

Cloud Composer も以前に比べ、監視周りが強化され、こういったチューニングが行いやすくなってきました。

おわりに

モダンデータスタック(Cloud Composer / Airflow など)を採用し、データパイプラインの整理(統廃合)をおこなうことによって

  • プロダクトDBからのETLの工数が半減

  • スケーラビリティの確保し、可用性を向上させることに成功

  • 転送時間を削減し、転送量を増やすことができるようになった

これらを実現することができました。

また、私の所属している分析基盤部では、データ分析環境向上のための、複数のプロジェクトが進行中です。

ご興味があれば、ぜひカジュアル面談などで紹介させてください!!

私の Meety です!!


マネーフォワードでは、エンジニアを募集しています。 ご応募お待ちしています。

【会社情報】 ■Wantedly株式会社マネーフォワード福岡開発拠点関西開発拠点(大阪/京都)

【SNS】 ■マネーフォワード公式noteTwitter - 【公式】マネーフォワードTwitter - Money Forward Developersconnpass - マネーフォワードYouTube - Money Forward Developers