Apache Beamを使用したデータ処理ジョブの開発 – ストリーミングパイプライン

Apache Beamを使用したデータ処理ジョブの開発 – ストリーミングパイプライン

  • Alexey Romanenko
    Alexey Romanenko is Open Source Engineer in Talend (France) with more than 15 years of experience in software development. During his career, he has been working on different projects, like high-load web services, web search engines and cloud storage. Also, he developed and presented a course devoted to Hadoop/Cloud technologies for students. Recently, he joined the Apache Beam project as a new contributor. He spends his spare time with his family and he likes to play ice hockey.

前回のブログでは、Apache Beamを使用したデータ処理ジョブの開発を取り上げました。今回は、ビッグデータ処理で現在最もニーズが高まっているストリーミングデータの処理を取り上げます。

バッチとストリーミングの一番の違いは、入力データソースの種類です。データセットが(規模が非常に大きいとは言え)限定的であり、処理中に更新されないのであれば、バッチパイプラインを使用するでしょう。この場合の入力には、ファイル、データベーステーブル、オブジェクトストレージ内のオブジェクトなど、どのようなソースでも使用できます。ここで強調したいのは、バッチでは処理中はデータが不変であり、入力レコードの数が一定であると仮定している点です。なぜ、これが重要なのでしょうか。それは、ファイルであっても、常に追加されたり変更されたりしていれば、データストリームが無制限になる可能性があるためです。したがって、ストリーミングのアプローチでデータを扱う必要があります。このように、限定的かつ不変のデータについては、バッチパイプラインを構築する必要があります。

一方で、データセットが限定的でない(絶え間なく送り込まれている)、または可変である場合は、処理がより複雑になります。このようなソースの例としては、メッセージシステム(Apache Kafkaなど)、ディレクトリ内の新しいファイル(Webサーバーのログなど)、リアルタイムのデータを収集しているシステム(IoTセンサーなど)が挙げられます。これらのソースに共通するのは、常に新しいデータを待たなければならない、ということです。確かに、データを(一定の時間またはサイズごとに)分割してバッチとして処理することも可能です。しかし、消費したデータセット全体に関数を適用したり、そのためにパイプライン全体を作成したりするのはかなり難しい作業です。幸い、Apache Spark、Apache Flink、Apache Apex、Google DataFlow などのストリーミングエンジンを使用することで、このようなデータ処理が容易になります。Apache Beamは、これらすべてのエンジンに対応しています。また、コードを変更せずに異なるエンジンで同じパイプラインを実行できます。さらに、最小限の変更で同じパイプラインをバッチモードでもストリーミングモードでも使用できます。入力ソースを適切に設定するだけで、魔法のようにすぐに実行できるのです。少し前の、バッチジョブをストリーミングジョブに書き換えていた時代から考えれば、夢のような話です。

理屈はこのくらいにして、例を使って最初のストリーミングコードを書いてみましょう。以下の例では、Kafka(バインドされていないソース)からデータを読み込み、単純なデータ処理を実行して、結果を再度Kafkaに書き出します。


ここでは、地図上の何らかのオブジェクト(車など)を示す地理座標(XとY)のデータを使用します。データはリアルタイムで到着する無制限のストリームであり、そこから特定区域内のデータのみを選択したいと思います。つまり、Kafkaトピックからテキストデータを消費し、解析し、制限を加えてフィルタリングした後、別のKafkaトピックに書き込む必要があります。この処理をApache Beamでどのように行うのか見ていきましょう。


すべてのKafkaメッセージには、次の形式のテキストデータが含まれます:
id,x,y

それぞれの値の意味は、次のとおりです。
id – uniqオブジェクトの一意のID
x, y - 地図上の座標(整数)

データの形式が有効でない場合にレコードをスキップするように、処理する必要があります。

パイプラインの作成

前回のブログで取り上げたバッチ処理と同様に、パイプラインを作成します。

Pipeline pipeline = Pipeline.create(options);

コマンドラインオプションをパイプラインに渡すためのOptionsオブジェクトは、細かく指定できます。詳細については、Githubで例の全体を参照してください。

次に、Kafla入力トピックからデータを読み込みます。前述のとおり、Apache BeamはさまざまなIOコネクターを提供しており、KafkaIOもその1つです。そこで、指定されたKafkaトピックからの受信メッセージを消費し、次のステップに伝えるバインドされていない新しいPTransformを作成します。

pipeline.apply(
    KafkaIO.<Long, String>read()
        .withBootstrapServers(options.getBootstrap())
        .withTopic(options.getInputTopic())
        .withKeyDeserializer(LongDeserializer.class)
        .withValueDeserializer(StringDeserializer.class))


デフォルトでは、KafkaIOはすべての消費対象メッセージをKafkaRecordオブジェクトにカプセル化します。ただし、次の変換では、新しく作成されたDoFnオブジェクトを使用してペイロード(文字列の値)の取得だけを実行します。

.apply(
    ParDo.of(
        new DoFn<KafkaRecord<Long, String>, String>() {
            @ProcessElement
            public void processElement(ProcessContext processContext) {
                KafkaRecord<Long, String> record = processContext.element();
                processContext.output(record.getKV().getValue());
            }
        }
    )
)


このステップが終わったらレコードをフィルタリングしますが(上記の初期タスクを参照)、その前に文字列の値を定義された形式に従って解析します。これにより、Beamの内部変換Filterで使用される単一の機能オブジェクトに値をカプセル化できます。

.apply(
    "FilterValidCoords",
    Filter.by(new FilterObjectsByCoordinates(
        options.getCoordX(), options.getCoordY()))
)


.次に、KafkaIOを含む様々なIOコネクターで使用できる内部Beam KVクラスを使用して新しい鍵/値のペアを作成し、フィルタリング済みメッセージをKafkaに再び書き込む準備をします。

.apply(
    "ExtractPayload",
    ParDo.of(
        new DoFn<String, KV<String, String>>() {
           @ProcessElement
           public void processElement(ProcessContext c) throws Exception {
                c.output(KV.of("filtered", c.element()));
           }
        }
    )
)


最後の変換は、Kafkaにメッセージを書き込むために必要です。ここでは、単純にKafkaIO.write()(シンクの実装)を使用します。読み込みについては、Kafkaブートストラップサーバー、出力トピック名、鍵/値のシリアライザーなどの必要なオプションを、この変換の構成に含める必要があります。

.apply(
    "WriteToKafka",
    KafkaIO.<String, String>write()
        .withBootstrapServers(options.getBootstrap())
        .withTopic(options.getOutputTopic())
        .withKeySerializer(org.apache.kafka.common.serialization.StringSerializer.class)
        .withValueSerializer(org.apache.kafka.common.serialization.StringSerializer.class)
);


最後に、いつものようにパイプラインを実行します。

pipeline.run();

前回のブログに比べて少し複雑に見えるかもしれませんが、お気づきのとおり、パイプラインをストリーミングに対応させるために特別なことは何もしていません。その役割をApache Beamデータモデルの実装が全面的に担うことで、Beamユーザーはバッチ処理とストリーミング処理を簡単に切り替えることができます。

パイプラインの構築と実行

Beam KafkaIOを使用できるようにするために、必要な依存関係を追加しましょう。

<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-kafka</artifactId>
<version>2.4.0</version>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.1.0</version>
</dependency>


次に、jarをビルドしてDirectRunnerで実行し、動作を確認します。

# mvn clean package
# mvn exec:java -Dexec.mainClass=org.apache.beam.tutorial.analytic.FilterObjects -Pdirect-runner -Dexec.args="--runner=DirectRunner"

必要に応じて、exec.argsオプションにより、パイプラインで使用されているほかの引数を追加します。また、Beamパイプラインの実行前に、Kafkaサーバーが利用可能であり、適切に指定されていることを確認してください。最後に、Mavenコマンドでパイプラインが起動され、手動で止めるまで永続的に実行されます(最大実行時間を任意に指定することも可能です)。これで、データはストリーミングモードで継続的に処理されます。

As usual, all code of this example is published on this github repository.

今回も、この例で使用したコードは、すべてgithubリポジトリーで公開されています。

Happy streaming!

Join The Conversation

0 Comments

Leave a Reply

Your email address will not be published. Required fields are marked *