Mercari Engineering Blog

We're the software engineers behind Mercari. Check out our blog to see the tech that powers our marketplace.

Apache Avro に入門した

この記事は MERPAY TECH OPENNESS MONTH の 1 日目の記事です。

メルペイでソフトウェアエンジニアやっている @syu_cream です。 メルペイバックエンドシステムにおけるデータ・ログ収集と活用を促進するためのシステム DataPlatform の仕事をしています。

本記事ではこの DataPlatform で用いているデータフォーマット Apache Avro について、簡単な紹介とメルペイにおける導入背景、そして利用事例を示していきます。 ビッグデータ処理基盤においてフォーマット選定は頭を悩まされる課題かと思います。 ログフォーマット選定やスキーマ管理について、少しでもなにか得られるものがあれば幸いです。

Apache Avro の簡単な紹介

Apache Avro はデータがバイナリエンコードされる、軽量で柔軟なデータフォーマットです。 筆者としては BigQuery や Apache Kafka などビッグデータ処理基盤でよく用いられている印象があります。 またよく比較される他のフォーマットとして、 Protocol BuffersMessagePack が挙げられます。

Apache Avro の特徴として以下が挙げられます。

  • リッチなデータ構造の表現(ネストされた型や配列、UNION、 別型への変換がサポートされる)
  • スキーマ付きファイルのサポート
  • スキーマハンドリングが柔軟(事前にパーサ生成することも、インラインでスキーマを読んでパースすることも可能)
  • 後方互換性への配慮
  • BigQuery などの DWH(Data WareHouse) や Apache Spark などのデータ処理ミドルウェアでのサポート
  • avro-tools や便利ライブラリを公式に提供

また Apache Avro では公式で C, C++, C#, Java, JavaScript, Perl, PHP, Python, Ruby 実装が提供されているほか、サードパーティでは Go や Scala などのライブラリも存在するため、様々な言語で Avro エンコードされたデータを扱うことが容易になっています。 余談ですがこの中でも特に Java のサポートが手厚く、シリアライズ・デシリアライズの他にも便利ライブラリが提供されています。

メルペイ DataPlatform 、 Apache Avro に入門する

この Apache Avro なのですが、以前公開した記事で少し触れましたが、メルペイ DataPlatform ではストリームデータパイプラインの中間表現として用いられています。

tech.mercari.com

ここでは Apache Avro を採用するまでの経緯について簡単に触れておきます。

2018 年 8 月頃、その頃はまだプロトタイプ実装しか存在しなかった時期なのですが、システム内部で用いるデータの中間表現をどうするか問題が湧き上がりました。 当時は(今もなのですが) DataPlatform が収集したデータは最終的に BigQuery に蓄積するようにしています。 さらにメッセージキューやデータレイク用のオブジェクトストレージになるべく後続の処理や BigQuery への再ロードがしやすい形式でデータを保持できるようにしたい要望が上がりました。

この時検討したフォーマットは以下の通りでした。

この中で、スキーマ情報を含めて後続の処理によりリッチな情報を受け渡したかったので JSON は候補から外しました。 また Parquet その他列形式でデータを格納するカラムナフォーマット も候補から除外しました。 BigQuery がカラムナでデータを保持するため重要性がそれほど高くなく、かつカラムナに変換する処理は別途行うと割り切り実装をシンプルにするためです。

残る候補の中から Apache Avro を選定した理由としては、 Object Container Files 形式を利用することで各メッセージに型情報を簡単に含め、処理することが可能である点が大きいです。 この形式のファイルとしてオブジェクトストレージにログを保存することで、後で再取り込みしたい際にスキーマ情報をファイル中の先頭のメタデータだけ読めば完結できます。 また、複数段階に分割された ETL(Extract, Transform, Load) 処理をするにあたり、この形式でスキーマ情報を付与することで各処理でデータとスキーマを個別に扱う手間が省けます。

Apache Avro と Protocol Buffers 相互運用

これらの経緯より親和性が高い Apache Avro を選定した訳ですが、一方ログを生成する各マイクロサービスではサービス間のメッセージ定義に Protocol Buffers を多用しています。 このノウハウと Protocol Buffers 向けに作られた社内のエコシステムを活用するため、各マイクロサービスからストリームデータパイプラインに出力するログは Protocol Buffers で定義できるようにしています。 これに関する経緯は昨年の builderscon にて LT でほんの少し触れています。

builderscon.io

しかし Protocol Buffers でエンコードされたままのメッセージだと、スキーマ情報を付与してファイルとしてデータを保存しておいたり、 BigQuery に再取り込みするなどの作業が難しくなります。 なんとか Protocol Buffers でエンコードされたメッセージを Apache Avro の形式に変換して容易に相互運用可能にしたい。 この課題を解決する方法が、幸いにも Apache Avro コミュニティが提供する avro-protobuf という便利ライブラリによってもたらされました。

avro-protobuf について

avro-protobuf では、 Apache Avro でエンコードされたバイナリを Protocol Buffers のメッセージ型オブジェクトとして読み出す ProtobufDatumReader と、その逆で Protocol Buffers のメッセージ型オブジェクトを Apache Avro のバイナリに書き出す ProtobufDatumWriter が提供されています。

Scala で簡単にシリアライザ・デシリアライザを書くならたとえば以下のようにできると思います。 なおここで提示するコードは一部記述を簡略化しています。

package com.merpay.data.serde.avro.protobuf

import java.io.ByteArrayOutputStream
import org.apache.avro.io.EncoderFactory
import org.apache.avro.protobuf.{ProtobufData, ProtobufDatumWriter}

case class AvroProtobufSerializer()
...
  private val avroSchema = ProtobufData.get().getSchema(classOf[MerpayEventLog])
  private val datumWriter = new ProtobufDatumWriter[T](avroSchema)

  def serialize(data: MerpayEventLog): Array[Byte] = {
    val outStream = new ByteArrayOutputStream()
    val encoder = EncoderFactory.get().binaryEncoder(outStream, null)

    datumWriter.write(data, encoder)
    encoder.flush()
    outStream.close()

    outStream.toByteArray
  }
}
package com.merpay.data.serde.avro.protobuf

import org.apache.avro.io.DecoderFactory
import org.apache.avro.protobuf.{ProtobufData, ProtobufDatumReader}

case class AvroProtobufDeserializer() {

  private val avroSchema = ProtobufData.get().getSchema(classOf[MerpayEventLog])
  val datumReader = new ProtobufDatumReader[MerpayEventLog](avroSchema)

  def deserialize(data: Array[Byte]): MerpayEventLog = {
    datumReader.read(null, DecoderFactory.get().binaryDecoder(data, null))
  }
}

Apache Avro の GenericDatumWriter を用いて GenericRecord オブジェクトを Apache Avro のバイナリに変換してから前述の AvroProtobufDeserializer を用いると GenericRecord オブジェクトを Protocol Buffers のメッセージオブジェクトに変換することが可能です。

package com.merpay.data.serde.protobuf.avro

import java.io.ByteArrayOutputStream

import org.apache.avro.generic.{GenericDatumWriter, GenericRecord}
import org.apache.avro.io.EncoderFactory
import org.apache.avro.protobuf.ProtobufData

case class ProtobufAvroSerializer() {

  private val avroSchema = ProtobufData.get().getSchema(classOf[MerpayEventLog])
  private val datumWriter = new GenericDatumWriter[GenericRecord](avroSchema)
  private val avroProtobufdeser = AvroProtobufDeserializer()

  def serialize(data: GenericRecord): Array[Byte] = {
    // Avro GenericRecord -> Avro bytes
    val avroByteStream = new ByteArrayOutputStream()
    val encoder = EncoderFactory.get().binaryEncoder(avroByteStream, null)
    datumWriter.write(data, encoder)
    encoder.flush()
    avroByteStream.close()
    val avroBytes = avroByteStream.toByteArray

    // Avro bytes -> Protobuf bytes
    val protobufObj = avroProtobufdeser.deserialize(avroBytes)
    val protobufByteStream = new ByteArrayOutputStream()
    protobufObj.writeTo(protobufByteStream)
    protobufByteStream.close()
    protobufByteStream.toByteArray
  }
}

同様に Protocol Buffers のメッセージオブジェクトのバイナリを一旦 Protocol Buffers のメッセージオブジェクトに変換した後、先述の AvroProtobufSerializer で Apache Avro のバイナリに変換してから、 GenericDatumReader で読み出すことで GenericRecord オブジェクトを得るという逆変換もすることが可能です。

package com.merpay.data.serde.protobuf.avro

import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory
import org.apache.avro.protobuf.ProtobufData

case class ProtobufAvroDeserializer() {

  private val avroSchema = ProtobufData.get().getSchema(classOf[MerpayEventLog])
  private val datumReader = new GenericDatumReader[GenericRecord](avroSchema)
  private val avroProtobufSer = AvroProtobufSerializer()

  override def deserialize(data: Array[Byte]): GenericRecord = {
    // Protobuf bytes -> Protobuf obj -> Avro bytes
    val protobufObj = MerpayEventLog.parseFrom(data)
    val avroBytes = avroProtobufSer.serialize(protobufObj)

    // Avro bytes -> Avro GenericRecord
    datumReader.read(null, DecoderFactory.get().binaryDecoder(avroBytes, null))
  }
}

これらにより、二段階の変換処理を行うコストは生じてしまいますが、既存の資産を活用しつつ Protocol Buffers と Apache Avro の相互運用が可能になります。

avro-protobuf の課題とコントリビュート

さて、avro-protobuf が便利に使えそうなのですが、我々が使い始めた当初は以下のような問題がありました。

  1. ProtobufData.get().getSchema() が内部的に static メソッドを reflection で参照しに行く実装で、これを迂回するのが不可能
  2. Protocol Buffer の java_multiple_files オプションに未対応
  3. Protocol Buffer 標準の型に対する Logical Type の扱いが不可能

  4. は先述の MerpayEventLog というメッセージ型に限った SerDe の実装であれば問題ないのですが、任意の Protocol Buffers のメッセージ型を扱えるようにしたい場合課題になってきます。

  5. com.google.protobuf.Timestamp などこのオプションを使っている型を扱おうとすると内部的に無限ループに入る実装になっていました。
  6. は付加的なものですが、前述の com.google.protobuf.Timestamp などの型を Apache Avro の Logical Type として扱う機構がなく、拡張性に欠ける部分がありました。

これらの問題を解決するため、個々に JIRA へのレポーティングや機能追加パッチの作成を行いました。 上記3点の問題は我々によって実装・修正され、 master ブランチでは解消された状態になっています。

  1. https://github.com/apache/avro/pull/466
  2. https://github.com/apache/avro/pull/463
  3. https://github.com/apache/avro/pull/482

Apache Avro はリリースサイクルが遅いようで、これらのライブラリが Maven Central Repository で提供されるのにも時間がかかります。 実際に本記事執筆段階では最後のリリースバージョン 1.8.2 は 2017 年 5 月になっております。 その上 master ブランチと 1.8.2 には差分が多く、おそらく次バージョンとなる 1.9.0 を安定稼働させるのは直近は難しいのではないかと考えられます。

そこで今回は上記が正式にリリースされるまでの一時しのぎとして、 master ブランチの修正を任意に取り込みつつも既存の 1.8.2 依存環境を崩さない自社向け avro-protobuf を用意することにしました。 これは実装としては以下の fork したリポジトリに公開しています。

github.com

この実装を社内向け JFrog Artifactory に公開するようにしています。

おわりに

以上が Apache Avro のメルペイでの導入背景と利用事例です。 この構成にもフォーマット変換をすることによるリスクやコストを避けるため Apache Avro に統一する、事前ビルドが必要なく動的にスキーマを更新可能にするスキーマレジストリのような機構を用意するなど改善の余地がありそうです。 プロダクトや周辺システムの成長に従い、今後も改善していければと考えています。

余談ですが、なんとこの記事を執筆している最中に約 2 年リリースが無かった Apache Avro の進展があり、 1.9.0 がリリースされる兆し が出てきました! この件において、自社向けにパッチを当ててビルドするのが必要でなくなる日も近いのかも知れません。

さて、 MERPAY TECH OPENNESS MONTH の 2 日目の記事は @sinmetal による「メルペイにおけるGoogle Cloud Spannerの取り組み」になります。 引き続きお楽しみください!!