こんにちは!@karupaneruraです。 自分のことはPerlの人として知ってもらっている方が多いかと思いますが、今回はJavaの記事を書いてみます。
今回の記事では、「Apache Beamについて調べたけどよくわからなかった」「Apache Beamはなんとなく分かるような気がするけどサンプルコードを多少書き換える範囲から抜け出しきれない」「理解をもう一歩進めて複雑なパイプラインも記述できるようになりたい」という方に向けて、どのようにBeam SDKをつかってどのようなことができるのか、その概観を掴んでもらえるようなところを目標にしています。
なお、分かりやすさのために説明を一部端折ることで不正確な部分が一部出てくるかと思いますが、この概観を掴んだあとに公式ドキュメントを読むなどして補完・修正していただけますと幸いです。
Apache Beamのご紹介
特にApache Beamを知らない方に向けて、Apache Beamに関して簡単に紹介します。
Apache Beam(以後、Beam)はストリーム処理/バッチ処理を記述するためのフレームワークで、Beamを利用して記述したデータ処理パイプラインを様々な実行基盤の上で実行することができます。
Beamは主にETLに使われることが多いかと思いますが他にも様々な用途に利用することができます。 たとえば、複数のAPIから得た情報を突き合わせて一定の条件下である処理を実行する、といった素朴に作るとスケーラブルに実装することが難しいような処理もBeamではパイプラインとして比較的簡単に記述することが出来ます。
また、BeamではJavaやPythonなどの各種プログラミング言語でデータ処理のパイプラインを記述することができます。 また、それがどのような処理基盤の上で実行されるのかをほとんど考えずに本質的な処理に集中して記述することができます。 たとえば、複数サーバーを使って並列処理をしたい場合でもそのことをパイプライン上で表現するには単に実行単位をコード上で分割するだけで済みます。
Beamの実行基盤は様々なものがありますが、Google Cloud Dataflow(以後、Dataflow)が最も代表的なのではないでしょうか。 Dataflowは大変スケーラビリティの高いBeamの実行基盤です。オートスケーリングに対応しており、ちゃんとスケールできるように記述してさえいれば処理するデータの規模に応じて自動でスケールアウトしてくれます。
このようにBeamはとても便利なフレームワークですが、当然ながらBeamを使って自由自在にパイプラインを作るためには何をどのように利用すればそれが実現できるのかを正しく理解しなければなりません。 しかし、入門のための資料やリファレンスはそれなりに充実しているものの、実際には見た目以上に独特な挙動をするために最初は戸惑ってしまう事があると思います。
そこで、今回の記事ではBeam Java SDKが提供してくれる道具がそれぞれどのような役割を担っているのか、それらはどのように動くのか、それらをどのように利用すればよいのか、といったところを解説します。 Beam SDKのより詳細な概念などに関しては他の資料をご参照ください。
基本的な3つの概念
Beam SDKをつかってパイプラインを記述する際に、ある3つの重要な概念が登場します。
Pipeline
とPCollection
、そしてPTransform
です。
Pipeline
とはその名前のとおり、データ処理を実行するためのパイプラインを示す概念です。
Pipeline.create
で作成します。最初は空っぽで、何もしないパイプラインになっています。
例えるならば、空っぽのソースコードを用意する行為に相当します。
Pipeline pipeline = Pipeline.create(options);
このPipeline
に対して何らかの処理をapply
することで、そのパイプラインでどのような処理を行うのかを表現します。
例えるならば、ソースコードに関数呼び出しを追加する行為に相当します。
pipeline.apply(transform);
この際にapply
するものがPTransform
です。
JavaではGenericsを使って表現されており、PTransform<InputT, OutputT>
となっています。
PTransform
はかなり抽象的な概念ですが、代表的な例を挙げるといわゆるMap関数やFilter関数がわかりやすいのではないでしょうか。
仮に型変数T
とR
があるとするとそれぞれ、Map関数はPTransform<PCollection<T>, PCollection<R>>
であり、同様にFilter関数はPTransform<PCollection<T>, PCollection<T>>
のように表現できます。(突然PCollection
が登場していますが、いったん気にせずにこの概念をふわっとさせたままPTransform
の説明を続けます。)
PTransform
で表現できるものはそれだけではなく、パイプラインの始まりと終わりをどうするのか、といったことも定義することができます。
この場合、それぞれPBegin
とPDone
で表現します。
たとえば、整数の数列を読み込む処理はPTransform<PBegin, PCollection<Integer>>
であり、
それをもとに副作用を発生させるなどしてパイプラインを終端する処理はPTransform<PCollection<Integer>, PDone>
と表現できます。
つまり、InputT
にPTransform<InputT, OutputT>
を適用(apply
)したらOutputT
になるということを表現しているのです。
そのため、パイプラインはPBegin
からPDone
までのあいだにどのようなPTransform
を適用(apply
)するのか、といった考え方ですべてを記述します。
これを聞くと、上記の例を含むほとんどのサンプルコードでpipeline.apply
からパイプラインが始まっていることから矛盾を感じるかもしれませんが、実はpipeline.apply
とは、PBegin.in(pipeline).apply
の省略形なのでこれは単にPBegin
が隠蔽されているだけに過ぎません。
ここで、PCollection<T>
に立ち戻ってみると、これはまさにパイプラインの過程で扱う型T
の数列を抽象的に表現していることが分かるかと思います。
たとえば、データを読み込み加工しその内容に応じて副作用を発生させて終端するパイプラインは、PBegin
->PCollection<T>
->PDone
のような過程をたどります。
以下の例はParDo
を利用しているためPDone
は登場しませんが、代わりにPCollection<Void>
で代用しています。(これまたParDo
が突然登場していますがいったん気にしないで進みます。)
Pipeline pipeline = Pipeline.create(options);
pipeline.apply("Read Integer", Create.of(1, 2, 3)) // PCollection<Integer>
.apply("Print Integer", ParDo.of(new DoFn<String, Void>() { // PCollection<Void>
@ProcessElement
public void processElement(@Element Integer value) {
System.out.println(String.valueOf(value));
}
}));
このように、PDone
は必ずしも利用する必要はなく、単にPCollection<T>
でパイプラインを終わらせても何も問題はありません。
PDone
は単に、このパイプラインの先にはいかなるデータも流れない、ということを示すだけのものに過ぎません。
PTransformの実態
さて、ここまでの話を理解すると、BeamでパイプラインをつくるということはPTransform
をどのように適用していくかを考えることと同義であることがわかります。
では、PTransform
を作るにはどうしたらよいのでしょうか?
InputT
にPTransform<InputT, OutputT>
を適用(apply
)したらOutputT
になるということは前述の通りですが、では抽象クラスとしてのPTransform
がそのような機能を持っているのかというと実はそんなことはありません。
では、一体どのようにしてPTransform
を構成したらよいのか。
実は最小単位としてParDo
というPTransform
の実装があり、それを直接的あるいは間接的に利用することでほとんどすべてのPTransform
は構成されています。
そのため、ParDo
を理解することがPTransform
を理解するための近道になっています。
ParDo
先程もしれっと登場したParDo
ですが、これが実に様々な機能を持っています。
機能の詳細は公式ドキュメントに譲るとして、ここではParDo
で何を行うことができるのかを簡単に解説します。
ParDo
で具体的に何を行うかはDoFn
という抽象クラスの実装で表現します。
一般的には先の例のように匿名クラスとしてその実装を行うのが一般的です。
さて、このDoFn
ですが抽象クラスであるものの抽象メソッドの定義はありません。
先程の例のprocessElement
も単にDoFn
を継承した匿名クラスの独自の実装として定義されているだけです。
そうであるにも関わらず、これは実際にうまく動作します。
実はParDo
はDoFn
の実装上のメソッドにある@ProcessElement
などのアノテーションによって、それが何をするメソッドなのかを見分けているのです。
そのため、インターフェースや抽象メソッドを実装していなくてもうまく動くようになっています。
これはアノテーションではなくインターフェースや抽象メソッドでよいのではないか?と思うかもしれません。
しかし、@ProcessElement
だけを切り取っても実に多様なインターフェースを持つことができるようになっており、これを実現するためにはアノテーションを使って表現するしかないのだろうと考えられます。
これも、より詳細について知りたい方は公式ドキュメントを参照してください。
なお、Map関数のような基本的なTransformについては、
公式のカタログ
に色々載っています。
たとえば、Map関数相当の操作であればMapElements
、そしてFilter関数相当の操作であればFilter
が利用できます。先程登場したParDo
の詳しい説明もここから辿って読むことができます。
Serializable
ところで、これまでに説明した概念の説明だけでは、サンプルコードを見てもこれらのPTransform
がなぜ分散実行できるようになるのか全く分からないと思います。
実はPTransform
はJavaのSerializable
インターフェースを実装しており、BeamはSerializable
を使うことによって分散実行を実現しているのです。
つまり、全てのPTransform
はSerializable
でなければなりません。
たとえば先程のDoFn
を実装した匿名クラスがSerializable
でないメンバーを持っている場合、NoSerializableException
がthrow
されます。
どうしてもSerializable
にできない場合には、たとえばstaticメンバとして持つ(ただし、スレッドセーフになるように注意が必要です)、transient
修飾子を利用してシリアライズ対象から外すなどの対応策が考えられます。
MapElements
などにラムダを渡す場合もそのラムダはSerializable
でなければなりません。
これをサポートするために、SerializableFunction
インターフェースやSimpleFunction
抽象クラスがBeam SDKによって提供されています。
Coder
さて、PTransform
がSerializable
でなければならないのなら、パイプラインを流れるデータもSerializable
でなければならないのでは?と疑問に思うかと思います。
実際のところ、ほとんどのデータはSerializable
で扱われますが、それはCoder
という仕組みによって抽象化されています。
Coder
はパイプラインを持つデータをどのようにシリアライズ/デシリアライズして受け渡すのかを定義するものです。
実際にはこれはPCollection
ごとに持つことになりますが、ほとんどの場合は自動的にCoder
が設定されるため、これを意識しなければならない場面は多くありません。
しかし、Beamが正確にCoder
を設定できないケースが稀に存在します。そのようなときには手動でCoder
を設定する必要があります。
PCollection
のsetCoder
を利用してCoder
を設定してあげましょう。
この際、パイプラインを流れるデータがSerializableであれば単にSerializableCoder
を利用すれば良いでしょう。
型の指定にはTypeDescriptor
というクラスを利用して行います。
単純なクラスの場合はTypeDescriptor.of(Hoge.class)
のように単にClass<T>
からTypeDescriptor<T>
を作ってあげればよいです。
コンテナ型の場合、一般的な型であればTypeDescriptors
に主要なものは実装されているため、これを直接使うのが便利です。
そうでない型の場合は、new TypeDescriptor<Hoge<Fuga, Piyo>>() {}
のように匿名クラスを作って利用すれば大丈夫です。
TypeDescriptor
の使い方は、先程すこし紹介したMapElements
などでも利用することがあるので覚えておくとよいでしょう。
ちなみに、KotlinなどのJava以外のJVMで動く言語では、このあたりの型について一部に違いがあるために、一見正しいように見えてもうまく動作しないという問題が起こりがちです。
たとえば、自分の知っている問題のひとつとして、KotlinのIterable<T>
はIterable<? extends T>
と解釈されてしまいうまく扱えないというものがあります。
KotlinやScalaなども良い言語であることは個人的にはよく知っていますが、このような問題を避けるためにはJavaを選択したほうが賢明かもしれません。それらの言語とJVMについてよほど詳しい場合は別ですが……。
集計処理
Beamではパイプライン処理を行うことができるとこれまで説明してきましたが、実はBeamを使うとパイプラインに流れてくるデータを集計することもできます。
先程のDoFn
の例でも見たように自由な処理を記述することができるので、当然ながら集計もできるというわけです。
その際、パイプラインで扱うデータは2つのパターンがあり、たとえばいわゆるPub/Subのようなものからデータを受け取って処理するようなパイプラインの終わりが定義できない場合と、ファイルを読み込んで全行に対して処理するような終わりが定義できる場合とのパターンに分けて考えることができます。 これをそれぞれ、BeamにおいてはUnboundedとBoundedと呼びます。
基本的に集計処理はBoundedなPCollection
に対してしか行うことができません。
Unboundedでは終わりが定義できないため、どこまで集計するかを定義できないと集計できないためです。
それでは困るので、BeamにおいてはUnboundedなものをあたかもBoundedに変換したかのように扱える仕組みが存在します。それがWindowingとTriggerです。
WindowingとTrigger
先程、終わりが定義できないので集計できないという説明をしましたが、WindowingはUnboundedなものをBoundedな単位に分割する手段といえます。
Windowingの手段は一般的なものがいくつか提供されていますが、一番わかりやすいのはFixedWindows
でしょうか。
これは、たとえば10秒毎といった単位の時間を指定してUnbounded Sourceを分割することができます。この分割した単位をWindow
と呼びます。
もう少し詳しい説明をすると、パイプラインを流れるデータはなんらかのWindow
に属します。何もWindowingしていない場合はGlobalWindow
に属しています。
つまり、Windowingとは実際にはWindow
を分割する処理であるといえます。
TriggerはこのWindowをどこで終わらせるのか、ということを決めるための概念になります。 分散処理されるパイプライン処理においてはデータの到着が遅延するというケースはありえるため、いつまで遅延データを待つのかを決めるのがTriggerの最も基本的なユースケースです。(このためにWatermarkという概念を考える必要がありますが、今回は割愛します。) TriggerはWindowingにおいて基本的にセットで考える概念ですが、基本的にはBeamがデフォルトで提供する挙動で問題がない場合がほとんどです。
ほか、たとえばWindowingで区切られた前のデータも含めて集計するといったことをしたい場合にも活用することができますが、このあたりの応用的なユースケースに関しては公式ドキュメントを参照してもらうのがよいでしょう。
KV
集計を行う際に、たとえばユーザー毎に集計を行うといったケースがあると思います。
そのような場合に集計のキーと実際のデータを抽象的に表現するために、KV
というクラスが提供されています。
これは素朴にキーとそれに対応する値のペアを持つだけの非常に簡単なクラスです。
たとえば、あるデータをユーザーごとに集計するためには、そのデータを扱うPCollection<T>
に対してWithKeys.of((SerializableFunction) entry -> entry.getUserId())
というPTransform
を適用するとPCollection<KV<String, T>>
に変換することができます。
なお、KV
を手で扱う場合はKV.of
を利用するとよいでしょう。
GroupBy
Beamで利用可能な集計処理にはいくつかありますが、最も基本的と思われるGroupBy
だけ簡単に紹介します。
これは、KV
のキーの部分が共通する値ごとにまとめてIterable
にしてくれます。
たとえば、以下のようなデータが流れてきた場合は:
a: 1
b: 2
a: 3
c: 4
d: 5
b: 6
以下のようにまとめてくれます:
a: [1, 3]
b: [2, 6]
c: [4]
d: [5]
つまり、型としてはPCollection<KV<K, V>>
をPCollection<KV<K, Iterable<V>>>
に変換してくれるものと考えることができます。
あとは、単純にこれを処理するPTransform
を用意すれば任意の集計が可能です。
ほかにも、Combine
やCoGroupBy
などいくつかの手段が標準で提供されています。
詳しくは公式ドキュメントを参照してみてください。
おわりに
以上でおおまかな概観が掴めたのではないかと思います。 これをもとにとりあえず触ってみて、触り心地を確かめてみると学習のモチベーションになるのではないかと思います。
さらなる理解を深めるためには公式ドキュメントを参照することも不可欠ですので、いくつかキーとなるドキュメントを紹介します。
具体的にどのようにパイプラインを設計していけばよいかは Design Your Pipeline が最もわかりやすいかと思います。 基本的な概念は Basics of the Beam model で学ぶとよいでしょう。詳細に関しては Beam Programming Guide と各クラスのリファレンスを参照するとよいかと思います。
Beam SDKの概観が少しでも伝わってBeamを利用する際の一助になれば幸いです。
最後まで読んでいただき、ありがとうございます!
この記事をシェアしていただける方はこちらからお願いします。