Mercari Engineering Blog

We're the software engineers behind Mercari. Check out our blog to see the tech that powers our marketplace.

TB越えのMySQL 巨大テーブルを 1日で BigQueryへLOADする

こんにちは!!

私はメルカリでSREをしている k-oguma ( ktykogm ) です。

ちょうど1年くらい前にジョインしました。

よろしくお願いします!

今日は、タイトルの件で対応した方法をご紹介したいと思います。

それはある日突然やってきた

ある日、ETL作業 (データ分析基盤運用)の依頼がUSチームからやってきました。

要件は次のようなものでした。

  • 1.4TB サイズの MySQL innodb tableを1つをBigQueryに上げる
    • 約1年分。期間指定。
  • 期限数日、なる早
  • 対象のcoloumsは全て
  • 期間指定以外のデータ加工は不要

...やっていき!

TL;DR

  • 1日でTB越え (18億行以上) のMySQLのtableを BigQuery に LOAD することが出来た
  • Digdag, Embulk, Shell scriptで一気にloopでretry等も対応して送れた
  • 複雑ではないので応用が効きやすい
  • 結論
    • Digdagだけでも出来たかもだけど Digdag + Embulk は使いやすいし本当に良い
    • やり方だけ見たい方は、 #Embulk の項目からお読みください

BigQueryへLOADさせる方法を考える

今回、巨大なtableを高速且つ信頼性を高く簡単にBQへLOADできる方法 を模索しました。

BigQueryにLOADさせるには、様々な方法があります。 Dataflow, Airflow, Google Cloud Composer, Digdag, Embulk, GCS (CSV, Avro, etc), etc

今回は、それほど難しいデータの抽出等の処理は必要ありません。 よって、DataflowやAirflow/Composer等ではなく、DigdagとEmbulkで簡単且つ迅速に対応出来そうです。

使用したのは、以下になります。

しかし、一気にこのサイズをBQにLOADすることは出来ませんし、一度にLOADさせる量が大きすぎると進行状況がなかなか見えないのも問題です。

進捗状況が見える + 失敗がすぐに検出できてretry*1等ができるようにすることも必要でした。

初期の検討

その対象table は以下のようなschemaでした。

+--------------+---------------------+------+-----+---------+----------------+
| Field        | Type                | Null | Key | Default | Extra          |
+--------------+---------------------+------+-----+---------+----------------+
| id           | bigint(20) unsigned | NO   | PRI | NULL    | auto_increment |

...<snip>

| data         | text                | YES  |     | NULL    |                |
| created      | datetime            | NO   |     | NULL    |                |
+--------------+---------------------+------+-----+---------+----------------+

そこで最初のプランは以下を検討しました。

id を元に数万件を並列で処理させたものを for_range> および for_each> で回す

しかし、その方法には以下の問題がありました。

  • id が連番ではなかった
    • auto_increment ですが、全てのRecordがそのまま残っているtableではなかった
      • 他のtableからFKされているなどリレーションを持ったtableだったのも関係
    • ばらつきが生じてBQのLimit (1日の更新は1000回まで https://cloud.google.com/bigquery/quotas )を超えてしまい、Errorになることもあった
    • これでは速度が出ない
  • for_range>for_each> 組み合わせても速度が出ない
  • id を 細切りにして for_range> に渡しても自動的にloopさせるのに難があった
    • for_range> のfromとtoにnullが入るといった問題も発生

見直し

上記のやり方だと問題があることが分かったので、以下のように変更しました。

  • id だと問題があるので、 created で日付を見つつ一定期間づつ並列で最速に回す
  • BQ Limit (1,000/day) に引っかからないけれど全期間一気に処理できるようにする

最初、created ではなく、 id にしたのは、そう... INDEXがありません でした。

そこで、作業用のslaveのtableだけ created に ADD INDEXしちゃいます。 (これらも出来るための作業用DB Slave)

+--------------+---------------------+------+-----+---------+----------------+
| Field        | Type                | Null | Key | Default | Extra          |
+--------------+---------------------+------+-----+---------+----------------+
| id           | bigint(20) unsigned | NO   | PRI | NULL    | auto_increment |

...<snip>

| data         | text                | YES  |     | NULL    |                |
| created      | datetime            | NO   | MUL | NULL    |                |
+--------------+---------------------+------+-----+---------+----------------+

こうなりました。

これで、embulk-input-mysql pluginの WHERE句 にて created を検索する時の時間を少なく出来ます。

Embulk

Embulkのliquid ファイルを以下のように用意しました。 これは、後に説明するDigdagから動かすためのEmbulk です。

XXXXXX-bulk.yml.liquid

in:
  type: mysql
  host: localhost
  user: XXXXX_user
  password: {{ env.MYSQL_PASS }}
  database: XXXXX
  incremental: true
  incremental_column: [created]
  table: XXXXX
  select: "*"
  where: "'{{ env.S }}' <= created AND created <= '{{ env.E }}'"
  options: {useLegacyDatetimeCode: false, serverTimezone: UTC}

out:
  type: bigquery
  mode: append
  table: XXXXX
  auth_method: json_key
  json_keyfile:  /path/to/your/directory/serviceAccount.json
  project: XXXXXX
  dataset: us_XXXXXXX
  compression: GZIP
  auto_create_table: true

Embulk 説明

  • in:
    • where:
      • ここでcreated の日付で範囲指定をして一回LOAD分の抽出をしています
    • incremental_column:
      • ここでは対象が created ですが、WHERE の絞り込み対象のcolumn 名が異なる場合はそれに合わせてください
  • out:
    • json_keyfile:
      • serverAccount.json は、別途 GCP consoleで取得しておく必要があります
    • mode:
      • append にした理由は、replaceだと上書きしてしまうからです
  • 共通
    • default_timeset:
      • 今回USだったので、Embulkの defaultである UTC でしたので省いています

Digdag

上記を動かすDigdag のDAG (Workflow) ファイルは、以下のように用意しました。

XXXXXX.dag

_export:
  plugin:
    repositories:
      - https://jitpack.io
    dependencies:
      - com.github.szyn:digdag-slack:0.1.2
  webhook_url: https://hooks.slack.com/services/${ YOUR_WEBHOOK_TOKEN }
  workflow_name: slack
  ENV: develop
  _start_day: ${start_day}
  _end_day: ${end_day}

timezone: UTC

+setup:
  echo>: start ${session_time}

+prepare:
  _retry:
    limit: 3
    interval: 10
    interval_type: exponential

+repeat:
  _export:
    start: ${moment(_start_day).format("YYYY-MM-DD HH:mm:ss")}
    end  : ${moment(_end_day)  .format("YYYY-MM-DD HH:mm:ss")}
  loop>: ${moment(end).diff(moment(start), 'day') + 1 }
  _do:
    _export:
      start_date: ${moment(start).add(i, 'days').format('YYYY-MM-DD HH:mm:ss')}
      end_date: ${moment(start).add(i, 'days').format('YYYY-MM-DD 23:59:59')}
    +first_day:
      if>: ${i == 0}
      _do:
        sh>: S="${start}" E="${end_date}" embulk run XXXXXX-bulk.yml.liquid
    +not_first_day:
      if>: ${i != 0}
      _do:
        sh>: S="${start_date}" E="${end_date}" embulk run XXXXXX-bulk.yml.liquid

  _parallel: true
  _error:
    slack>: danger-template.yml

+slack:
  slack>: good-template.yml

+teardown:
  echo>: finish ${session_time}

Digdag 説明

  • webhook_url:
    • Slack通知用に事前にWebhookのTOKENを取得して設定します
    • Digdag-slack を使用して成功と失敗を通知されるようにしています
  • +repeat:
    • これがWorkflow のJob本体です
    • DATE_TIME format で開始日と終了日を得ます
    • moment()...
      • Moment.js を参照してください。変更する場合はMoment.jsの記法に合わせる形です
    • loop>:
      • この処理により、指定した期間の中で + 1 dayをloopさせて足し続けて長期間でも処理を回し続けられるようにしています
    • first_day:
      • loopの初回だけ(${i == 0})は、${moment(start).add(i, 'days') で +1 day される前の日付 = ${start} から開始である必要があります

Digdag呼び出し処理

2017-11-20 から 2018-12-17 まで実施の例です。

Dry-run

試すときは、echo print debugで良いですね!

$ START_DAY='2017-11-20'; for ((i=1; i<= 131; i++));do END_DAY=$(date --date="$START_DAY 2 days" +%Y-%m-%d);echo digdag run -p start_day=$START_DAY -p end_day=$END_DAY -a XXXXXX.dig ;START_DAY=$(date --date="$START_DAY 3 days" +%Y-%m-%d); done | head -5
digdag run -p start_day=2017-11-20 -p end_day=2017-11-22 -a XXXXXX.dig
digdag run -p start_day=2017-11-23 -p end_day=2017-11-25 -a XXXXXX.dig
digdag run -p start_day=2017-11-26 -p end_day=2017-11-28 -a XXXXXX.dig
digdag run -p start_day=2017-11-29 -p end_day=2017-12-01 -a XXXXXX.dig
digdag run -p start_day=2017-12-02 -p end_day=2017-12-04 -a XXXXXX.dig
$ START_DAY='2017-11-20'; for ((i=1; i<= 131; i++));do END_DAY=$(date --date="$START_DAY 2 days" +%Y-%m-%d);echo digdag run -p start_day=$START_DAY -p end_day=$END_DAY -a XXXXXX.dig ;START_DAY=$(date --date="$START_DAY 3 days" +%Y-%m-%d); done | tail -2
digdag run -p start_day=2018-12-12 -p end_day=2018-12-14 -a XXXXXX.dig
digdag run -p start_day=2018-12-15 -p end_day=2018-12-17 -a XXXXXX.dig

いざ、実行

まず前提として、tmux 等や nohup + background処理にして長時間実行でも切断されないようにしておきます。

次に先ほどdry-runで試した処理を echo 無しで実行です!

 $ START_DAY='2017-11-20'; for ((i=1; i<=131; i++));do END_DAY=$(date --date="$START_DAY 2 days" +%Y-%m-%d);digdag run -p start_day=$START_DAY -p end_day=$END_DAY -a XXXXXX.dig ;START_DAY=$(date --date="$START_DAY 3 days" +%Y-%m-%d);done

これで、1日ごとにBQへLOADするEmbulk処理を3日先までまとめて並列で実行しています。

そのまま帰宅して、翌日くらいに見てみましょう。

補足: もっと高速化させたいなら

例えば、上記のDidgag 説明にある loop>:+ 1 の数値を増やして、こちらの END_DAY や 最後の START_DAY に入る n days の数値を増やせば、Embulkの一度にLOADさせる対象を増やすことが出来てLoopが減るので速度が早まる可能性があります。 しかしその反動で、一回の処理に時間が掛かるようになるため、確認が遅くなって色々試しづらかったのもあったので、今回は 1 day * 3 づつLOAD させています。

終わったあとは

確認してみます。

% bq query 'SELECT min(created) as start_date, max(created) as end_date FROM [project.dataset.table]'
Waiting on bqjob_r7bc1e8862af918df_00000167decc19a5_1 ... (0s) Current status: DONE
+---------------------+---------------------+
|     start_date      |      end_date       |
+---------------------+---------------------+
| 2017-11-20 23:05:45 | 2018-12-17 23:59:59 |
+---------------------+---------------------+
% bq query 'SELECT COUNT(1) FROM [project.dataset.table]'
Waiting on bqjob_r2b8104e26b860aad_00000167ded120dc_1 ... (5s) Current status: DONE
+------------+
|    f0_     |
+------------+
| 1840266370 |
+------------+

全部で最終的にBQにLOADさせたRecord数は 18億4,026万6370件 でした。

たったこれだけで完了です!!

簡単で早くて最高ですね!!

最後に

最後に宣伝ですが、 SRE Loungeというコミュニティの運営にも私は携わらせていただいています。 来年 1/18(金) 19:00から #7 が開催されます!

メルカリが会場スポンサーとしてサポートさせていただきます。 私も登壇させていただく予定ですので、お時間ご興味ある方はぜひ以下のリンクからお申し込みいただけると幸いです。

sre-lounge.connpass.com

平成最後の年末ですね。今年は暖冬とは言われていますが 寒くなってきたので、みなさま風邪など引かないように!

それではまた!! ノシ

参考にしたURL

*1:失敗で中途半端なRecordが存在しうることと、重複する可能性があるのもダメなので、実際にDigdagの_retryとEmbulkのmode: appedがそういった問題を起こさないか(atomic性があるか)は検証しました。mode: append mode now expresses a transactional append, and mode: append_direct is one which is not transactional. とのことなので、Caused by: org.jruby.exceptions.RaiseException: (ConnectionFailed) Failed to open TCP connection to www.googleapis.com:443 (execution expired) などのErrorをわざと途中で発生させたり、再度同じ日付を繰り返し実行したりしてretryが正常に行われることを確認しました。