blog

DeNAのエンジニアが考えていることや、担当しているサービスについて情報発信しています

2023.12.05 技術記事

Apache Beamの概観を理解する

by karupanerura

#beam #java

こんにちは!@karupaneruraです。 自分のことはPerlの人として知ってもらっている方が多いかと思いますが、今回はJavaの記事を書いてみます。

今回の記事では、「Apache Beamについて調べたけどよくわからなかった」「Apache Beamはなんとなく分かるような気がするけどサンプルコードを多少書き換える範囲から抜け出しきれない」「理解をもう一歩進めて複雑なパイプラインも記述できるようになりたい」という方に向けて、どのようにBeam SDKをつかってどのようなことができるのか、その概観を掴んでもらえるようなところを目標にしています。

なお、分かりやすさのために説明を一部端折ることで不正確な部分が一部出てくるかと思いますが、この概観を掴んだあとに公式ドキュメントを読むなどして補完・修正していただけますと幸いです。

Apache Beamのご紹介

特にApache Beamを知らない方に向けて、Apache Beamに関して簡単に紹介します。

Apache Beam(以後、Beam)はストリーム処理/バッチ処理を記述するためのフレームワークで、Beamを利用して記述したデータ処理パイプラインを様々な実行基盤の上で実行することができます。

Beamは主にETLに使われることが多いかと思いますが他にも様々な用途に利用することができます。 たとえば、複数のAPIから得た情報を突き合わせて一定の条件下である処理を実行する、といった素朴に作るとスケーラブルに実装することが難しいような処理もBeamではパイプラインとして比較的簡単に記述することが出来ます。

また、BeamではJavaやPythonなどの各種プログラミング言語でデータ処理のパイプラインを記述することができます。 また、それがどのような処理基盤の上で実行されるのかをほとんど考えずに本質的な処理に集中して記述することができます。 たとえば、複数サーバーを使って並列処理をしたい場合でもそのことをパイプライン上で表現するには単に実行単位をコード上で分割するだけで済みます。

Beamの実行基盤は様々なものがありますが、Google Cloud Dataflow(以後、Dataflow)が最も代表的なのではないでしょうか。 Dataflowは大変スケーラビリティの高いBeamの実行基盤です。オートスケーリングに対応しており、ちゃんとスケールできるように記述してさえいれば処理するデータの規模に応じて自動でスケールアウトしてくれます。

このようにBeamはとても便利なフレームワークですが、当然ながらBeamを使って自由自在にパイプラインを作るためには何をどのように利用すればそれが実現できるのかを正しく理解しなければなりません。 しかし、入門のための資料やリファレンスはそれなりに充実しているものの、実際には見た目以上に独特な挙動をするために最初は戸惑ってしまう事があると思います。

そこで、今回の記事ではBeam Java SDKが提供してくれる道具がそれぞれどのような役割を担っているのか、それらはどのように動くのか、それらをどのように利用すればよいのか、といったところを解説します。 Beam SDKのより詳細な概念などに関しては他の資料をご参照ください。

基本的な3つの概念

Beam SDKをつかってパイプラインを記述する際に、ある3つの重要な概念が登場します。 PipelinePCollection、そしてPTransformです。

Pipelineとはその名前のとおり、データ処理を実行するためのパイプラインを示す概念です。 Pipeline.create で作成します。最初は空っぽで、何もしないパイプラインになっています。 例えるならば、空っぽのソースコードを用意する行為に相当します。

Pipeline pipeline = Pipeline.create(options);

このPipelineに対して何らかの処理をapplyすることで、そのパイプラインでどのような処理を行うのかを表現します。 例えるならば、ソースコードに関数呼び出しを追加する行為に相当します。

pipeline.apply(transform);

この際にapplyするものがPTransformです。 JavaではGenericsを使って表現されており、PTransform<InputT, OutputT>となっています。

PTransformはかなり抽象的な概念ですが、代表的な例を挙げるといわゆるMap関数やFilter関数がわかりやすいのではないでしょうか。

仮に型変数TRがあるとするとそれぞれ、Map関数はPTransform<PCollection<T>, PCollection<R>>であり、同様にFilter関数はPTransform<PCollection<T>, PCollection<T>>のように表現できます。(突然PCollectionが登場していますが、いったん気にせずにこの概念をふわっとさせたままPTransformの説明を続けます。)

PTransformで表現できるものはそれだけではなく、パイプラインの始まりと終わりをどうするのか、といったことも定義することができます。 この場合、それぞれPBeginPDoneで表現します。

たとえば、整数の数列を読み込む処理はPTransform<PBegin, PCollection<Integer>>であり、 それをもとに副作用を発生させるなどしてパイプラインを終端する処理はPTransform<PCollection<Integer>, PDone>と表現できます。

つまり、InputTPTransform<InputT, OutputT>を適用(apply)したらOutputTになるということを表現しているのです。

そのため、パイプラインはPBeginからPDoneまでのあいだにどのようなPTransformを適用(apply)するのか、といった考え方ですべてを記述します。 これを聞くと、上記の例を含むほとんどのサンプルコードでpipeline.applyからパイプラインが始まっていることから矛盾を感じるかもしれませんが、実はpipeline.applyとは、PBegin.in(pipeline).applyの省略形なのでこれは単にPBeginが隠蔽されているだけに過ぎません。

ここで、PCollection<T>に立ち戻ってみると、これはまさにパイプラインの過程で扱う型Tの数列を抽象的に表現していることが分かるかと思います。 たとえば、データを読み込み加工しその内容に応じて副作用を発生させて終端するパイプラインは、PBegin->PCollection<T>->PDoneのような過程をたどります。 以下の例はParDoを利用しているためPDoneは登場しませんが、代わりにPCollection<Void>で代用しています。(これまたParDoが突然登場していますがいったん気にしないで進みます。)

Pipeline pipeline = Pipeline.create(options);
pipeline.apply("Read Integer", Create.of(1, 2, 3)) // PCollection<Integer>
    .apply("Print Integer", ParDo.of(new DoFn<String, Void>() { // PCollection<Void>
         @ProcessElement
         public void processElement(@Element Integer value) {
            System.out.println(String.valueOf(value));
         }
    }));

このように、PDoneは必ずしも利用する必要はなく、単にPCollection<T>でパイプラインを終わらせても何も問題はありません。 PDoneは単に、このパイプラインの先にはいかなるデータも流れない、ということを示すだけのものに過ぎません。

PTransformの実態

さて、ここまでの話を理解すると、BeamでパイプラインをつくるということはPTransformをどのように適用していくかを考えることと同義であることがわかります。 では、PTransformを作るにはどうしたらよいのでしょうか?

InputTPTransform<InputT, OutputT>を適用(apply)したらOutputTになるということは前述の通りですが、では抽象クラスとしてのPTransformがそのような機能を持っているのかというと実はそんなことはありません。 では、一体どのようにしてPTransformを構成したらよいのか。

実は最小単位としてParDoというPTransformの実装があり、それを直接的あるいは間接的に利用することでほとんどすべてのPTransformは構成されています。 そのため、ParDoを理解することがPTransformを理解するための近道になっています。

ParDo

先程もしれっと登場したParDoですが、これが実に様々な機能を持っています。 機能の詳細は公式ドキュメントに譲るとして、ここではParDoで何を行うことができるのかを簡単に解説します。

ParDoで具体的に何を行うかはDoFnという抽象クラスの実装で表現します。 一般的には先の例のように匿名クラスとしてその実装を行うのが一般的です。

さて、このDoFnですが抽象クラスであるものの抽象メソッドの定義はありません。 先程の例のprocessElementも単にDoFnを継承した匿名クラスの独自の実装として定義されているだけです。 そうであるにも関わらず、これは実際にうまく動作します。

実はParDoDoFnの実装上のメソッドにある@ProcessElementなどのアノテーションによって、それが何をするメソッドなのかを見分けているのです。 そのため、インターフェースや抽象メソッドを実装していなくてもうまく動くようになっています。

これはアノテーションではなくインターフェースや抽象メソッドでよいのではないか?と思うかもしれません。 しかし、@ProcessElementだけを切り取っても実に多様なインターフェースを持つことができるようになっており、これを実現するためにはアノテーションを使って表現するしかないのだろうと考えられます。

これも、より詳細について知りたい方は公式ドキュメントを参照してください。

なお、Map関数のような基本的なTransformについては、 公式のカタログ に色々載っています。 たとえば、Map関数相当の操作であればMapElements、そしてFilter関数相当の操作であればFilterが利用できます。先程登場したParDoの詳しい説明もここから辿って読むことができます。

Serializable

ところで、これまでに説明した概念の説明だけでは、サンプルコードを見てもこれらのPTransformがなぜ分散実行できるようになるのか全く分からないと思います。 実はPTransformはJavaのSerializableインターフェースを実装しており、BeamはSerializableを使うことによって分散実行を実現しているのです。

つまり、全てのPTransformSerializableでなければなりません。 たとえば先程のDoFnを実装した匿名クラスがSerializableでないメンバーを持っている場合、NoSerializableExceptionthrowされます。 どうしてもSerializableにできない場合には、たとえばstaticメンバとして持つ(ただし、スレッドセーフになるように注意が必要です)、transient修飾子を利用してシリアライズ対象から外すなどの対応策が考えられます。

MapElementsなどにラムダを渡す場合もそのラムダはSerializableでなければなりません。 これをサポートするために、SerializableFunctionインターフェースやSimpleFunction抽象クラスがBeam SDKによって提供されています。

Coder

さて、PTransformSerializableでなければならないのなら、パイプラインを流れるデータもSerializableでなければならないのでは?と疑問に思うかと思います。 実際のところ、ほとんどのデータはSerializableで扱われますが、それはCoderという仕組みによって抽象化されています。

Coderはパイプラインを持つデータをどのようにシリアライズ/デシリアライズして受け渡すのかを定義するものです。 実際にはこれはPCollectionごとに持つことになりますが、ほとんどの場合は自動的にCoderが設定されるため、これを意識しなければならない場面は多くありません。 しかし、Beamが正確にCoderを設定できないケースが稀に存在します。そのようなときには手動でCoderを設定する必要があります。 PCollectionsetCoderを利用してCoderを設定してあげましょう。

この際、パイプラインを流れるデータがSerializableであれば単にSerializableCoderを利用すれば良いでしょう。 型の指定にはTypeDescriptorというクラスを利用して行います。

単純なクラスの場合はTypeDescriptor.of(Hoge.class)のように単にClass<T>からTypeDescriptor<T>を作ってあげればよいです。 コンテナ型の場合、一般的な型であればTypeDescriptorsに主要なものは実装されているため、これを直接使うのが便利です。 そうでない型の場合は、new TypeDescriptor<Hoge<Fuga, Piyo>>() {} のように匿名クラスを作って利用すれば大丈夫です。 TypeDescriptorの使い方は、先程すこし紹介したMapElementsなどでも利用することがあるので覚えておくとよいでしょう。

ちなみに、KotlinなどのJava以外のJVMで動く言語では、このあたりの型について一部に違いがあるために、一見正しいように見えてもうまく動作しないという問題が起こりがちです。 たとえば、自分の知っている問題のひとつとして、KotlinのIterable<T>Iterable<? extends T>と解釈されてしまいうまく扱えないというものがあります。 KotlinやScalaなども良い言語であることは個人的にはよく知っていますが、このような問題を避けるためにはJavaを選択したほうが賢明かもしれません。それらの言語とJVMについてよほど詳しい場合は別ですが……。

集計処理

Beamではパイプライン処理を行うことができるとこれまで説明してきましたが、実はBeamを使うとパイプラインに流れてくるデータを集計することもできます。 先程のDoFnの例でも見たように自由な処理を記述することができるので、当然ながら集計もできるというわけです。

その際、パイプラインで扱うデータは2つのパターンがあり、たとえばいわゆるPub/Subのようなものからデータを受け取って処理するようなパイプラインの終わりが定義できない場合と、ファイルを読み込んで全行に対して処理するような終わりが定義できる場合とのパターンに分けて考えることができます。 これをそれぞれ、BeamにおいてはUnboundedとBoundedと呼びます。

基本的に集計処理はBoundedなPCollectionに対してしか行うことができません。 Unboundedでは終わりが定義できないため、どこまで集計するかを定義できないと集計できないためです。

それでは困るので、BeamにおいてはUnboundedなものをあたかもBoundedに変換したかのように扱える仕組みが存在します。それがWindowingとTriggerです。

WindowingとTrigger

先程、終わりが定義できないので集計できないという説明をしましたが、WindowingはUnboundedなものをBoundedな単位に分割する手段といえます。

Windowingの手段は一般的なものがいくつか提供されていますが、一番わかりやすいのはFixedWindowsでしょうか。 これは、たとえば10秒毎といった単位の時間を指定してUnbounded Sourceを分割することができます。この分割した単位をWindowと呼びます。

もう少し詳しい説明をすると、パイプラインを流れるデータはなんらかのWindowに属します。何もWindowingしていない場合はGlobalWindowに属しています。 つまり、Windowingとは実際にはWindowを分割する処理であるといえます。

TriggerはこのWindowをどこで終わらせるのか、ということを決めるための概念になります。 分散処理されるパイプライン処理においてはデータの到着が遅延するというケースはありえるため、いつまで遅延データを待つのかを決めるのがTriggerの最も基本的なユースケースです。(このためにWatermarkという概念を考える必要がありますが、今回は割愛します。) TriggerはWindowingにおいて基本的にセットで考える概念ですが、基本的にはBeamがデフォルトで提供する挙動で問題がない場合がほとんどです。

ほか、たとえばWindowingで区切られた前のデータも含めて集計するといったことをしたい場合にも活用することができますが、このあたりの応用的なユースケースに関しては公式ドキュメントを参照してもらうのがよいでしょう。

KV

集計を行う際に、たとえばユーザー毎に集計を行うといったケースがあると思います。 そのような場合に集計のキーと実際のデータを抽象的に表現するために、KVというクラスが提供されています。 これは素朴にキーとそれに対応する値のペアを持つだけの非常に簡単なクラスです。

たとえば、あるデータをユーザーごとに集計するためには、そのデータを扱うPCollection<T>に対してWithKeys.of((SerializableFunction) entry -> entry.getUserId())というPTransformを適用するとPCollection<KV<String, T>>に変換することができます。 なお、KVを手で扱う場合はKV.ofを利用するとよいでしょう。

GroupBy

Beamで利用可能な集計処理にはいくつかありますが、最も基本的と思われるGroupByだけ簡単に紹介します。 これは、KVのキーの部分が共通する値ごとにまとめてIterableにしてくれます。

たとえば、以下のようなデータが流れてきた場合は:

a: 1
b: 2
a: 3
c: 4
d: 5
b: 6

以下のようにまとめてくれます:

a: [1, 3]
b: [2, 6]
c: [4]
d: [5]

つまり、型としてはPCollection<KV<K, V>>PCollection<KV<K, Iterable<V>>>に変換してくれるものと考えることができます。 あとは、単純にこれを処理するPTransformを用意すれば任意の集計が可能です。

ほかにも、CombineCoGroupByなどいくつかの手段が標準で提供されています。 詳しくは公式ドキュメントを参照してみてください。

おわりに

以上でおおまかな概観が掴めたのではないかと思います。 これをもとにとりあえず触ってみて、触り心地を確かめてみると学習のモチベーションになるのではないかと思います。

さらなる理解を深めるためには公式ドキュメントを参照することも不可欠ですので、いくつかキーとなるドキュメントを紹介します。

具体的にどのようにパイプラインを設計していけばよいかは Design Your Pipeline が最もわかりやすいかと思います。 基本的な概念は Basics of the Beam model で学ぶとよいでしょう。詳細に関しては Beam Programming Guide と各クラスのリファレンスを参照するとよいかと思います。

Beam SDKの概観が少しでも伝わってBeamを利用する際の一助になれば幸いです。

最後まで読んでいただき、ありがとうございます!
この記事をシェアしていただける方はこちらからお願いします。

recruit

DeNAでは、失敗を恐れず常に挑戦し続けるエンジニアを募集しています。