月別: April 2018

Apache Beamを使用したデータ処理ジョブの開発

このブログ投稿は、Apache Beamに関するシリーズの第1部です。 皆さんはApache Beamについて詳しくご存じでしょうか? あまり知らなくとも、決して恥ずべきことではありません。Apache Software Foundationによって開発され、2016年6月に最初にリリースされた新しいプロジェクトであるApache Beamは、まだデータ処理の世界では比較的なじみの薄いものです。実際のところ、私も最近になって使い込むようになってから、Apache Beamについて学ぶことが楽しくなり、その素晴らしさがわかってきました。 Apache Beamは、バッチとストリーミングのデータ処理ジョブを実装し、さまざまなIOのセットを使用して任意の実行エンジンで実行するための簡単な方法を提供する統一プログラミングモデルです。有望に聞こえますが、まだわかりにくいと感じるでしょうか? Apache Beamに一連のブログ記事を掲載することにしたのはこのためです。今回と次回のブログでは、具体的な例を使用して、Apache Beamを使用したデータ処理ジョブのユースケースを紹介します。 今回のトピックはバッチ処理です。次の例を見てみましょう。自動車販売店での一定期間内の自動車販売を分析したいとします(たとえば、各ブランドの自動車の販売台数など)。これは、データセットが制限され(有限量のデータ)、更新されない(過去に売上が発生した)ことを意味します。この場合のデータ分析にはバッチプロセスを使用できます。 入力データとして、以下の形式の販売車のテキストログがあります。 id,brand_name,model_name,sales_number 例: 1,Toyota,Prius,3 2,Nissan,Sentra,2 3,Ford,Fusion,4 初めてBeamアプリケーションの実装を開始する前に、常に使用される中心的概念について認識しておく必要があります。Beamには、パイプライン、PCollection、PTransformの3つの主な概念があります。 パイプラインは、データ処理タスク全体のワークフローを最初から最後までカプセル化します。 PCollectionは、BeamがPTransform間でデータを転送するために使用する分散データセットの抽象化です。 PTransformは、入力データ(入力PCollection)を操作して出力データ(出力PCollection)を生成するプロセスです。通常、最初と最後のPTransformは、制限されている(バッチ処理)または制限されていない(ストリーミング処理)データを入出力する方法を表します。 簡単にするために、パイプラインはワークフロー全体を表すDAG(有向非巡回グラフ)として、PTransformはノード(データを変換する)として、PCollectionsはこのグラフのエッジとして考えることができます。詳細については、「Beam Programming Guide」を参照してください。 例に戻って、提供されたデータセットを処理する最初のパイプラインを実装してみましょう。 パイプラインの作成 まず、新しいパイプラインを作成します。 Pipeline pipeline = Pipeline.create(); 次に、pipeline.apply()メソッドを使って新しいPTransformを作成しましょう。これはテキストファイルからデータを読み込み、文字列の新しいPCollectionを作成します。これを行うには、Beamですでに実装されているIOの1つであるTextIOを使用します。TextIOはテキストファイルの1行ずつの読み書きを可能にします。さまざまなファイルシステムの使用、ファイルパターンのサポート、ファイルのストリーミングなど、ほかにも多くの機能があります。詳しくは、Apache Beamのドキュメントを参照してください。 apply(TextIO.read().from(“/path/to/input/file”)) このPTransformの出力は、PCollectionの新しいインスタンスであり、集合の各エントリは入力ファイルのテキスト行です。 結果としてブランド別の総販売数を求めたいので、それらをグループ化する必要があります。したがって、次のステップでは、すべての行を解析し、keyがブランド名でvalueが販売数となるキー/値ペアを作成します。前のPTransformからの出力PCollectionが、ここでは入力PCollectionになります。 このステップでは、Beamの内部PTransform(MapElements)を使用して、提供されるSimpleFunctionインターフェイスの実装を使用して、入力項目ごとに新しいキー/値ペアを作成します。 次に、別のBeamの変換であるGroupByKeyを使用してブランド別に販売数をグループ化します。出力結果となるキー/値のPCollectionは、keyがブランド名で、valueがそのブランドの反復可能な売上高の集まりです。 .apply(GroupByKey.<String, Integer>create())   これで、ParDo変換の独自の実装を使用して、ブランド別の自動車販売数を集計する準備が整いました。 パイプラインを完成させるために、別のIO変換を適用し、文字列のPCollectionを取得してテキストファイルに書き込みます。 .apply(TextIO.write().to(“/path/to/output/dir”).withoutSharding()); 最後に、作成したパイプラインを実行します。 pipeline.run(); とても簡単ですね。これが、最小限のコードで複雑なデータ処理パイプラインを作成できるようにするApache Beamの力です。 Hadoopに精通している方は、このパイプラインが以下の点で何かに似ていることにお気づきかと思います。 テキストデータを1行ずつ読み込んで解析し、新しいキー/値ペアを作成する(Map) 次に、これらのキー/値をキーでグループ化する(GroupBy) […]