Apache Beamを使用したデータ処理ジョブの開発

Apache Beamを使用したデータ処理ジョブの開発

  • Alexey Romanenko
    Alexey Romanenko is Open Source Engineer in Talend (France) with more than 15 years of experience in software development. During his career, he has been working on different projects, like high-load web services, web search engines and cloud storage. Also, he developed and presented a course devoted to Hadoop/Cloud technologies for students. Recently, he joined the Apache Beam project as a new contributor. He spends his spare time with his family and he likes to play ice hockey.

このブログ投稿は、Apache Beamに関するシリーズの第1部です。

皆さんはApache Beamについて詳しくご存じでしょうか? あまり知らなくとも、決して恥ずべきことではありません。Apache Software Foundationによって開発され、2016年6月に最初にリリースされた新しいプロジェクトであるApache Beamは、まだデータ処理の世界では比較的なじみの薄いものです。実際のところ、私も最近になって使い込むようになってから、Apache Beamについて学ぶことが楽しくなり、その素晴らしさがわかってきました。

Apache Beamは、バッチとストリーミングのデータ処理ジョブを実装し、さまざまなIOのセットを使用して任意の実行エンジンで実行するための簡単な方法を提供する統一プログラミングモデルです。有望に聞こえますが、まだわかりにくいと感じるでしょうか? Apache Beamに一連のブログ記事を掲載することにしたのはこのためです。今回と次回のブログでは、具体的な例を使用して、Apache Beamを使用したデータ処理ジョブのユースケースを紹介します。

今回のトピックはバッチ処理です。次の例を見てみましょう。自動車販売店での一定期間内の自動車販売を分析したいとします(たとえば、各ブランドの自動車の販売台数など)。これは、データセットが制限され(有限量のデータ)、更新されない(過去に売上が発生した)ことを意味します。この場合のデータ分析にはバッチプロセスを使用できます。

入力データとして、以下の形式の販売車のテキストログがあります。

id,brand_name,model_name,sales_number

例:
1,Toyota,Prius,3
2,Nissan,Sentra,2
3,Ford,Fusion,4

初めてBeamアプリケーションの実装を開始する前に、常に使用される中心的概念について認識しておく必要があります。Beamには、パイプライン、PCollection、PTransformの3つの主な概念があります。

  • パイプラインは、データ処理タスク全体のワークフローを最初から最後までカプセル化します。
  • PCollectionは、BeamがPTransform間でデータを転送するために使用する分散データセットの抽象化です。
  • PTransformは、入力データ(入力PCollection)を操作して出力データ(出力PCollection)を生成するプロセスです。通常、最初と最後のPTransformは、制限されている(バッチ処理)または制限されていない(ストリーミング処理)データを入出力する方法を表します。

簡単にするために、パイプラインはワークフロー全体を表すDAG(有向非巡回グラフ)として、PTransformはノード(データを変換する)として、PCollectionsはこのグラフのエッジとして考えることができます。詳細については、「Beam Programming Guide」を参照してください。

例に戻って、提供されたデータセットを処理する最初のパイプラインを実装してみましょう。

パイプラインの作成

まず、新しいパイプラインを作成します。

Pipeline pipeline = Pipeline.create();

次に、pipeline.apply()メソッドを使って新しいPTransformを作成しましょう。これはテキストファイルからデータを読み込み、文字列の新しいPCollectionを作成します。これを行うには、Beamですでに実装されているIOの1つであるTextIOを使用します。TextIOはテキストファイルの1行ずつの読み書きを可能にします。さまざまなファイルシステムの使用、ファイルパターンのサポート、ファイルのストリーミングなど、ほかにも多くの機能があります。詳しくは、Apache Beamのドキュメントを参照してください。

apply(TextIO.read().from(/path/to/input/file))

このPTransformの出力は、PCollectionの新しいインスタンスであり、集合の各エントリは入力ファイルのテキスト行です。

結果としてブランド別の総販売数を求めたいので、それらをグループ化する必要があります。したがって、次のステップでは、すべての行を解析し、keyがブランド名でvalueが販売数となるキー/値ペアを作成します。前のPTransformからの出力PCollectionが、ここでは入力PCollectionになります。

このステップでは、Beamの内部PTransformMapElements)を使用して、提供されるSimpleFunctionインターフェイスの実装を使用して、入力項目ごとに新しいキー/値ペアを作成します。

次に、別のBeamの変換であるGroupByKeyを使用してブランド別に販売数をグループ化します。出力結果となるキー/値のPCollectionは、keyがブランド名で、valueがそのブランドの反復可能な売上高の集まりです。

.apply(GroupByKey.<String, Integer>create())

 

これで、ParDo変換の独自の実装を使用して、ブランド別の自動車販売数を集計する準備が整いました。

パイプラインを完成させるために、別のIO変換を適用し、文字列のPCollectionを取得してテキストファイルに書き込みます。

.apply(TextIO.write().to(/path/to/output/dir).withoutSharding());

最後に、作成したパイプラインを実行します。

pipeline.run();

とても簡単ですね。これが、最小限のコードで複雑なデータ処理パイプラインを作成できるようにするApache Beamの力です。

Hadoopに精通している方は、このパイプラインが以下の点で何かに似ていることにお気づきかと思います。

  • テキストデータを1行ずつ読み込んで解析し、新しいキー/値ペアを作成する(Map
  • 次に、これらのキー/値をキーでグループ化する(GroupBy
  • 最後に、ユーザー関数を適用して1つのキーのすべての値を反復処理する(Reduce

そうです! このシンプルなパイプラインは、従来のMapReduceジョブで実行できます。しかし、Beamの方が大幅に単純で明確です(Javaであるにもかかわらず!)。別の変換を追加してパイプラインを拡張する場合も、それほど複雑化することはありません。

パイプラインの構築と実行

前述したように、Beamパイプラインはさまざまなランナー(処理エンジン)で実行できます。

  • Direct Runner
  • Apache Apex
  • Apache Flink
  • Apache Gearpump
  • Apache Spark
  • Google Cloud Dataflow

これを行うには、MavenまたはGradleプロジェクト構成に対応する依存関係を追加するだけです。これには、各ランナーで実行するために、パイプラインコードを調整したり書き直したりする必要がないというメリットがあります。さらに、以前に必要なランナーの依存関係がすべて含まれていた場合は、jarを再コンパイルする必要はありません。使用するランナーを選択するだけでいいのです。

Direct Runnerは、通常はパイプラインをテストするために使用されるローカルランナーです。Javaを使用するときは、pom.xmlでDirect Runnerへの依存関係を指定する必要があります。


<dependency>
   <groupId>org.apache.beam</groupId>
   <artifactId>beam-runners-direct-java</artifactId>
   <version>2.3.0</version>
   <scope>runtime</scope>
</dependency>


その後、プロジェクトをコンパイルする必要があります。
# mvn clean package

そして、ダイレクトランナーでパイプラインを実行します。
# mvn exec:java -Dexec.mainClass=org.apache.beam.tutorial.analytic.SalesPerCarsBrand -Pdirect-runner -Dexec.args="--runner=DirectRunner”

たとえば、入力ファイルに次のデータが含まれているとします。
# cat /tmp/beam/cars_sales_log
1,Toyota,Prius,3
2,Nissan,Sentra,2
1,Toyota,Yaris,4
3,Ford,Fusion,5
3,Ford,Kuga,3

すると、最終結果は次のようになります。
# cat /tmp/beam/cars_sales_report
Toyota: 7
Nissan: 2
Ford: 8

サポートされているすべてのランナーのリストと指示、使用方法は、このページをご覧ください。

最後に、この例で使用したすべてのコードは、すべてGitHubレポジトリ(https://github.com/aromanenko-dev/beam-tutorial)に公開されています。

このブログ記事シリーズの次回は、Beamでのストリーミングデータ処理について説明します。無制限のデータソースを使用したデータ分析タスクの例を取り上げ、Beamの機能を確認します。

ディスカッションに参加

0 Comments

コメントを残す

Your email address will not be published. Required fields are marked *