月別: April 2018

Apache Beamを使用したデータ処理ジョブの開発

このブログ投稿は、Apache Beamに関するシリーズの第1部です。 皆さんはApache Beamについて詳しくご存じでしょうか? あまり知らなくとも、決して恥ずべきことではありません。Apache Software Foundationによって開発され、2016年6月に最初にリリースされた新しいプロジェクトであるApache Beamは、まだデータ処理の世界では比較的なじみの薄いものです。実際のところ、私も最近になって使い込むようになってから、Apache Beamについて学ぶことが楽しくなり、その素晴らしさがわかってきました。 Apache Beamは、バッチとストリーミングのデータ処理ジョブを実装し、さまざまなIOのセットを使用して任意の実行エンジンで実行するための簡単な方法を提供する統一プログラミングモデルです。有望に聞こえますが、まだわかりにくいと感じるでしょうか? Apache Beamに一連のブログ記事を掲載することにしたのはこのためです。今回と次回のブログでは、具体的な例を使用して、Apache Beamを使用したデータ処理ジョブのユースケースを紹介します。 今回のトピックはバッチ処理です。次の例を見てみましょう。自動車販売店での一定期間内の自動車販売を分析したいとします(たとえば、各ブランドの自動車の販売台数など)。これは、データセットが制限され(有限量のデータ)、更新されない(過去に売上が発生した)ことを意味します。この場合のデータ分析にはバッチプロセスを使用できます。 入力データとして、以下の形式の販売車のテキストログがあります。 id,brand_name,model_name,sales_number 例: 1,Toyota,Prius,3 2,Nissan,Sentra,2 3,Ford,Fusion,4 初めてBeamアプリケーションの実装を開始する前に、常に使用される中心的概念について認識しておく必要があります。Beamには、パイプライン、PCollection、PTransformの3つの主な概念があります。 パイプラインは、データ処理タスク全体のワークフローを最初から最後までカプセル化します。 PCollectionは、BeamがPTransform間でデータを転送するために使用する分散データセットの抽象化です。 PTransformは、入力データ(入力PCollection)を操作して出力データ(出力PCollection)を生成するプロセスです。通常、最初と最後のPTransformは、制限されている(バッチ処理)または制限されていない(ストリーミング処理)データを入出力する方法を表します。 簡単にするために、パイプラインはワークフロー全体を表すDAG(有向非巡回グラフ)として、PTransformはノード(データを変換する)として、PCollectionsはこのグラフのエッジとして考えることができます。詳細については、「Beam Programming Guide」を参照してください。 例に戻って、提供されたデータセットを処理する最初のパイプラインを実装してみましょう。 パイプラインの作成 まず、新しいパイプラインを作成します。 Pipeline pipeline = Pipeline.create(); 次に、pipeline.apply()メソッドを使って新しいPTransformを作成しましょう。これはテキストファイルからデータを読み込み、文字列の新しいPCollectionを作成します。これを行うには、Beamですでに実装されているIOの1つであるTextIOを使用します。TextIOはテキストファイルの1行ずつの読み書きを可能にします。さまざまなファイルシステムの使用、ファイルパターンのサポート、ファイルのストリーミングなど、ほかにも多くの機能があります。詳しくは、Apache Beamのドキュメントを参照してください。 apply(TextIO.read().from(“/path/to/input/file”)) このPTransformの出力は、PCollectionの新しいインスタンスであり、集合の各エントリは入力ファイルのテキスト行です。 結果としてブランド別の総販売数を求めたいので、それらをグループ化する必要があります。したがって、次のステップでは、すべての行を解析し、keyがブランド名でvalueが販売数となるキー/値ペアを作成します。前のPTransformからの出力PCollectionが、ここでは入力PCollectionになります。 このステップでは、Beamの内部PTransform(MapElements)を使用して、提供されるSimpleFunctionインターフェイスの実装を使用して、入力項目ごとに新しいキー/値ペアを作成します。 次に、別のBeamの変換であるGroupByKeyを使用してブランド別に販売数をグループ化します。出力結果となるキー/値のPCollectionは、keyがブランド名で、valueがそのブランドの反復可能な売上高の集まりです。 .apply(GroupByKey.<String, Integer>create())   これで、ParDo変換の独自の実装を使用して、ブランド別の自動車販売数を集計する準備が整いました。 パイプラインを完成させるために、別のIO変換を適用し、文字列のPCollectionを取得してテキストファイルに書き込みます。 .apply(TextIO.write().to(“/path/to/output/dir”).withoutSharding()); 最後に、作成したパイプラインを実行します。 pipeline.run(); とても簡単ですね。これが、最小限のコードで複雑なデータ処理パイプラインを作成できるようにするApache Beamの力です。 Hadoopに精通している方は、このパイプラインが以下の点で何かに似ていることにお気づきかと思います。 テキストデータを1行ずつ読み込んで解析し、新しいキー/値ペアを作成する(Map) 次に、これらのキー/値をキーでグループ化する(GroupBy) […]


Apache SparkとTalend:パフォーマンスと調整

このブログシリーズでは、すでに2つの記事でTalendとApache Sparkに関して説明してきました。 このブログシリーズのこれまでの記事をまだ読んでいない場合は、最初に第1部「TalendとApache Spark:技術的な手引きと概要」と第2部「Talend Sparkジョブ vs. spark-submitの構成:2つの違いとは?」をお読みください。 Apache Sparkに関するシリーズの最初の2回は、Talendとspark-submitの類似点、およびTalendのSparkジョブで使用可能な構成オプションの概要について説明しました。 このブログでは、Apache Sparkのパフォーマンスと調整について説明します。これは、Talendユーザーに限らず、Apache Sparkを使用しているほとんどのユーザーに共通の議論です。最初のSparkジョブを開発して実行するときには、常に次のような疑問が浮かびます。 Sparkジョブにはいくつのエクゼキューターを割り当てる必要があるのか? 各エグゼキューターに必要なメモリー量は? コアをいくつ使用する必要があるのか? 一部のSparkジョブは10GB程度のデータを処理するのに何時間もかかるが、この問題をどうやって解決できるか? このブログでは、これらの質問を取り上げ、答えと洞察を提供します。その前に、このブログで使用されるいくつかの重要な概念を紹介します。 パーティション:パーティションは分散データセットの一部です。デフォルトのHDFSブロックサイズで作成されます。Sparkはパーティションを利用してデータセットを並列処理します。 タスク:タスクは、エクゼキューター内で実行できる作業単位です。 コア:コアは、エクゼキューター内で実行できるSpark内の並列タスクの数を決定する、CPU内の処理単位です。 Sparkエグゼキューター:ワーカーノード上で開始され、メモリーまたはディスク内でジョブのサブミットを実行するプロセスです。 アプリケーションマスター:各YARNアプリケーションは、リソースマネージャーからリソースを要求する責任を持つアプリケーションマスタープロセスをスピンアップします。リソースが割り当てられると、プロセスはノードマネージャーと連携して、ノードマネージャー内で必要なコンテナーを起動します。 Sparkの調整 最初に、Talend内のApache Sparkジョブを調整する方法を検討しましょう。前述のように、Talend Sparkジョブには、[Spark Configuration]タブがあり、ここで調整プロパティを設定できます。Talendでは、これはデフォルトでは常にオフになっています。 このセクションには、アプリケーションのマスターとエグゼキューターが使用するメモリーとコア、およびジョブが要求するエグゼキューターの数を設定するオプションがあります。このセクションで値を指定する際の主な疑問は、「アプリケーションマスターやエグゼキューターがパフォーマンスを向上させるために必要なコア数またはメモリー数をどのように決定するのか」ということです。 Sparkジョブのコア数を選択する方法 この時点では、先に進む前に考慮しなければならない要素がいくつかあります。 データセットのサイズ ジョブが完了する必要がある時間枠 ジョブが実行している処理とアクション これらの要素を念頭に置いて、パフォーマンスを最大化するようにジョブを構成し始めることができます。まずアプリケーションマスターの調整から始めましょう。アプリケーションマスターの場合、リソースのオーケストレーションを行うだけで、処理は行わないため、デフォルト値をそのまま使用できます。つまり、メモリーやコアの値を大きくする必要はありません。 次のステップは、エクゼキューター用にメモリーとコアを構成することです。ここでの主な問題は、エクゼキューター、メモリー、コアをいくつ使うべきかということです。その答えを見つけるために、それぞれに32コアと120GBのメモリーを使用する6つのワーカーノードを持つHadoopクラスターがあるとします。おそらく頭に浮かぶ最初の考えは、エグゼキューターごとに持つことができる同時タスクが多いほど、パフォーマンスが向上するということです。これについて調べると、Hadoopディストリビューションのパフォーマンスチューニングガイド(Clouderaの例はこちら)では、1エクゼキューターあたり5コアを超えるとHDFS I/Oが低下することがわかります。したがって、高いパフォーマンスのためのコアの最適値は5です。 次に、いくつのエグゼキューターを起動したいのかを見てみましょう。コアとノードの数に基づいて、この数を簡単に判断できます。前述したように、5コアがエグゼキューターごとに使用するのに最適な数です。さて、ノードあたりの32個のコアのそれぞれから、ノードで実行されているオペレーティングシステムとHadoopデーモンで必要とされているために、ジョブに使用できないものを削除する必要があります。Hadoopクラスター管理ツールはすでにこれを行っているので、ノードあたりのSparkジョブに使用できるコアの数を簡単に判断できます。 この計算を行った後、使用可能なノードあたり30コアが残っているとしましょう。5コアがエグゼキューターあたりの最適な数であるとすでに決定したので、ノードあたり最大6つのエグゼキューターを実行できることを意味します。簡単に特定できましたね! 最後に、使用可能なメモリー量を計算します。上記のハードウェア仕様に基づいて、ノードごとに120GBのメモリーがあることがわかりますが、コアについて説明したときに述べたように、オペレーティングシステムが一部を使用する必要があるため、ジョブ用にそのメモリーをすべて使用できません。ここでも、Hadoopクラスター管理ツールは、メモリーのうちどれだけをジョブに使用できるかを判断できます。オペレーティングシステムとHadoopデーモンに2GBのメモリーが必要な場合、Sparkジョブに使用するために118GBのメモリーが残ります。ノードごとに6つのエクゼキューターを持つことができると決定済みなので、エクゼキューターごとに最大約20GBのメモリーを使用できることになります。ただし、これは100%正しいわけではありません。各エクゼキューターが持つメモリーオーバーヘッドも計算する必要があるためです。前のブログで、オーバーヘッドのデフォルトは384MBであると述べました。これを20GBから差し引くと、1エグゼキューターあたり最大19GBを指定できると言えます。 クラスターリソースの動的割り当て vs. 固定割り当て 上記の数値は、Sparkジョブ内のクラスターリソースの固定または動的割り当てに使用できます。両者の違いは動的割り当てです。動的割り当てでは、使用されるエクゼキューターの最初の数、それほどワークロードがない場合にジョブが使用できる最低限のエクゼキューター、より多くの処理能力が必要な場合の最大数を指定できます。ジョブのためにクラスターのすべてのリソースを使うことができれば素晴らしいですが、その処理能力をクラスター上で実行される他のジョブと共有する必要があります。そのため、Talend Sparkジョブの調整を検討するために先に定義した要因を検討する際に、要件として特定したものに基づいて、最大値の何パーセントを使用可能か決定します。 ジョブを構成したので、次は実際にジョブを実行しましょう。上記で定義した最大設定でもSparkジョブが完了までに時間がかかることがわかった場合は、最大のパフォーマンスを引き出すために、ジョブに戻り、さらにいくつかの設定を確認する必要があります。 Sparkのパフォーマンス まず、Sparkのジョブで2つのテーブルを結合しましょう。Sparkジョブの最適化を開始する前に検討した要因の1つは、データセットのサイズです。テーブルのサイズを確認し、1つが50GBで、もう1つが100MBであると判断したら、Talendコンポーネントのレプリケート結合を利用しているかどうかを確認する必要があります。 レプリケート結合 大きなテーブルと小さなテーブルを結合して小さなテーブルからすべてのエグゼキューターにデータをブロードキャストする場合、レプリケート結合(マップサイド結合とも呼ばれます)が広く使用されます。この場合、小さいデータセットはメモリーに収まるので、レプリケート結合を使用してすべてのエグゼキューターにブロードキャストし、Sparkジョブのパフォーマンスを最適化できます。 テーブルデータはエクゼキューターレベルでサイドデータと結合される必要があるので、より小さいデータセットをすべてのエクゼキューターにブロードキャストすることによって、より大きなテーブルのデータがネットワークを介して送信されるのを回避できます。Sparkのパフォーマンス上の問題の多くは、ネットワーク上で大量のデータが送信されるために発生します。以下に示すように、tMapコンポーネントの[Use replicated join]オプションを有効にすることで、Talendジョブ内でこれを簡単に確認できます。これにより、ルックアップテーブルのデータがすべてのエグゼキューターにブロードキャストされます。   次のステップは、コストのかかる再計算を実行している処理がジョブに含まれていないかを確認することです。 Sparkのキャッシュ […]