Apache SparkとTalend:パフォーマンスと調整

Apache SparkとTalend:パフォーマンスと調整

  • Petros Nomikos
    I have 3 years of experience with installation, configuration, and troubleshooting of Big Data platforms such as Cloudera, MapR, and HortonWorks. I joined Talend in 2014, and prior to Talend I held positions as manager of technical support, and project manager for data warehouse implementations. In my current role, I assist companies in understanding how to implement Talend in their Big Data Ecosystem.

このブログシリーズでは、すでに2つの記事でTalendとApache Sparkに関して説明してきました。

このブログシリーズのこれまでの記事をまだ読んでいない場合は、最初に第1部「TalendとApache Spark:技術的な手引きと概要」と第2部「Talend Sparkジョブ vs. spark-submitの構成:2つの違いとは?」をお読みください。

Apache Sparkに関するシリーズの最初の2回は、Talendとspark-submitの類似点、およびTalendのSparkジョブで使用可能な構成オプションの概要について説明しました。

このブログでは、Apache Sparkのパフォーマンスと調整について説明します。これは、Talendユーザーに限らず、Apache Sparkを使用しているほとんどのユーザーに共通の議論です。最初のSparkジョブを開発して実行するときには、常に次のような疑問が浮かびます。

  • Sparkジョブにはいくつのエクゼキューターを割り当てる必要があるのか?
  • 各エグゼキューターに必要なメモリー量は?
  • コアをいくつ使用する必要があるのか?
  • 一部のSparkジョブは10GB程度のデータを処理するのに何時間もかかるが、この問題をどうやって解決できるか?

このブログでは、これらの質問を取り上げ、答えと洞察を提供します。その前に、このブログで使用されるいくつかの重要な概念を紹介します。

パーティション:パーティションは分散データセットの一部です。デフォルトのHDFSブロックサイズで作成されます。Sparkはパーティションを利用してデータセットを並列処理します。

タスク:タスクは、エクゼキューター内で実行できる作業単位です。

コア:コアは、エクゼキューター内で実行できるSpark内の並列タスクの数を決定する、CPU内の処理単位です。

Sparkエグゼキューター:ワーカーノード上で開始され、メモリーまたはディスク内でジョブのサブミットを実行するプロセスです。

アプリケーションマスター:各YARNアプリケーションは、リソースマネージャーからリソースを要求する責任を持つアプリケーションマスタープロセスをスピンアップします。リソースが割り当てられると、プロセスはノードマネージャーと連携して、ノードマネージャー内で必要なコンテナーを起動します。

Sparkの調整

最初に、Talend内のApache Sparkジョブを調整する方法を検討しましょう。前述のように、Talend Sparkジョブには、[Spark Configuration]タブがあり、ここで調整プロパティを設定できます。Talendでは、これはデフォルトでは常にオフになっています。

このセクションには、アプリケーションのマスターとエグゼキューターが使用するメモリーとコア、およびジョブが要求するエグゼキューターの数を設定するオプションがあります。このセクションで値を指定する際の主な疑問は、「アプリケーションマスターやエグゼキューターがパフォーマンスを向上させるために必要なコア数またはメモリー数をどのように決定するのか」ということです。

Sparkジョブのコア数を選択する方法

この時点では、先に進む前に考慮しなければならない要素がいくつかあります。

  1. データセットのサイズ
  2. ジョブが完了する必要がある時間枠
  3. ジョブが実行している処理とアクション

これらの要素を念頭に置いて、パフォーマンスを最大化するようにジョブを構成し始めることができます。まずアプリケーションマスターの調整から始めましょう。アプリケーションマスターの場合、リソースのオーケストレーションを行うだけで、処理は行わないため、デフォルト値をそのまま使用できます。つまり、メモリーやコアの値を大きくする必要はありません。

次のステップは、エクゼキューター用にメモリーとコアを構成することです。ここでの主な問題は、エクゼキューター、メモリー、コアをいくつ使うべきかということです。その答えを見つけるために、それぞれに32コアと120GBのメモリーを使用する6つのワーカーノードを持つHadoopクラスターがあるとします。おそらく頭に浮かぶ最初の考えは、エグゼキューターごとに持つことができる同時タスクが多いほど、パフォーマンスが向上するということです。これについて調べると、Hadoopディストリビューションのパフォーマンスチューニングガイド(Clouderaの例はこちら)では、1エクゼキューターあたり5コアを超えるとHDFS I/Oが低下することがわかります。したがって、高いパフォーマンスのためのコアの最適値は5です。

次に、いくつのエグゼキューターを起動したいのかを見てみましょう。コアとノードの数に基づいて、この数を簡単に判断できます。前述したように、5コアがエグゼキューターごとに使用するのに最適な数です。さて、ノードあたりの32個のコアのそれぞれから、ノードで実行されているオペレーティングシステムとHadoopデーモンで必要とされているために、ジョブに使用できないものを削除する必要があります。Hadoopクラスター管理ツールはすでにこれを行っているので、ノードあたりのSparkジョブに使用できるコアの数を簡単に判断できます。

この計算を行った後、使用可能なノードあたり30コアが残っているとしましょう。5コアがエグゼキューターあたりの最適な数であるとすでに決定したので、ノードあたり最大6つのエグゼキューターを実行できることを意味します。簡単に特定できましたね!

最後に、使用可能なメモリー量を計算します。上記のハードウェア仕様に基づいて、ノードごとに120GBのメモリーがあることがわかりますが、コアについて説明したときに述べたように、オペレーティングシステムが一部を使用する必要があるため、ジョブ用にそのメモリーをすべて使用できません。ここでも、Hadoopクラスター管理ツールは、メモリーのうちどれだけをジョブに使用できるかを判断できます。オペレーティングシステムとHadoopデーモンに2GBのメモリーが必要な場合、Sparkジョブに使用するために118GBのメモリーが残ります。ノードごとに6つのエクゼキューターを持つことができると決定済みなので、エクゼキューターごとに最大約20GBのメモリーを使用できることになります。ただし、これは100%正しいわけではありません。各エクゼキューターが持つメモリーオーバーヘッドも計算する必要があるためです。前のブログで、オーバーヘッドのデフォルトは384MBであると述べました。これを20GBから差し引くと、1エグゼキューターあたり最大19GBを指定できると言えます。

クラスターリソースの動的割り当て vs. 固定割り当て

上記の数値は、Sparkジョブ内のクラスターリソースの固定または動的割り当てに使用できます。両者の違いは動的割り当てです。動的割り当てでは、使用されるエクゼキューターの最初の数、それほどワークロードがない場合にジョブが使用できる最低限のエクゼキューター、より多くの処理能力が必要な場合の最大数を指定できます。ジョブのためにクラスターのすべてのリソースを使うことができれば素晴らしいですが、その処理能力をクラスター上で実行される他のジョブと共有する必要があります。そのため、Talend Sparkジョブの調整を検討するために先に定義した要因を検討する際に、要件として特定したものに基づいて、最大値の何パーセントを使用可能か決定します。

ジョブを構成したので、次は実際にジョブを実行しましょう。上記で定義した最大設定でもSparkジョブが完了までに時間がかかることがわかった場合は、最大のパフォーマンスを引き出すために、ジョブに戻り、さらにいくつかの設定を確認する必要があります。

Sparkのパフォーマンス

まず、Sparkのジョブで2つのテーブルを結合しましょう。Sparkジョブの最適化を開始する前に検討した要因の1つは、データセットのサイズです。テーブルのサイズを確認し、1つが50GBで、もう1つが100MBであると判断したら、Talendコンポーネントのレプリケート結合を利用しているかどうかを確認する必要があります。

レプリケート結合

大きなテーブルと小さなテーブルを結合して小さなテーブルからすべてのエグゼキューターにデータをブロードキャストする場合、レプリケート結合(マップサイド結合とも呼ばれます)が広く使用されます。この場合、小さいデータセットはメモリーに収まるので、レプリケート結合を使用してすべてのエグゼキューターにブロードキャストし、Sparkジョブのパフォーマンスを最適化できます。

テーブルデータはエクゼキューターレベルでサイドデータと結合される必要があるので、より小さいデータセットをすべてのエクゼキューターにブロードキャストすることによって、より大きなテーブルのデータがネットワークを介して送信されるのを回避できます。Sparkのパフォーマンス上の問題の多くは、ネットワーク上で大量のデータが送信されるために発生します。以下に示すように、tMapコンポーネントの[Use replicated join]オプションを有効にすることで、Talendジョブ内でこれを簡単に確認できます。これにより、ルックアップテーブルのデータがすべてのエグゼキューターにブロードキャストされます。

 

次のステップは、コストのかかる再計算を実行している処理がジョブに含まれていないかを確認することです。

Sparkのキャッシュ

再計算について確認するため、顧客の購入データをファイルにロードする簡単な例で考えてみましょう。このデータから、次の2つのメトリクスを取得します。

  • 合計顧客数
  • 購入商品数

この場合、Sparkキャッシュを使用しないと、上記の各操作でデータがロードされます。これは、コストのかかる再計算を発生させるため、パフォーマンスに影響します。このデータセットをジョブで使用する必要があることがわかっているので、Sparkキャッシュを使用して後で使用するためにメモリーにキャッシュし、再ロードし続けないようにするのが最善です。

Talend Sparkジョブで、これはtCacheIntCacheOutのコンポーネントで行われます。これは、TalendのApache Sparkパレットで利用可能であり、異なるオプションを提供するSparkキャッシングメカニズムを利用できるようにするものです。

また、データをディスクのみにキャッシュするかどうかを選択すると、キャッシュされたデータをメモリー、ディスク、またはその両方についてシリアル化するオプションも表示されます。最後に、キャッシュされたデータを他の2つのノードに複製するように選択することもできます。最も使用されるオプションは、速度で勝るシリアル化なしのメモリーですが、キャッシュされたRDDがメモリーに収まらず、ディスクに書き出したくない場合は、データセットが消費するスペースを減らすためにシリアル化が選択されます。ただし、この場合はパフォーマンスに影響を与える追加のオーバーヘッドが発生します。このため、オプションを検討し、ニーズに最も適したものを選択する必要があります。

 

この後でもパフォーマンスの問題が解決しない場合は、Spark History Webインターフェイスで何が起こっているのかを確認する必要があります。前のブログで述べたように、Talendの[Spark Configuration]の[Spark History]セクションでSparkロギングを有効にすることができます。Sparkロギングは、ジョブが終了した後もログを保存してSpark History Web Interfaceから利用できるようにすることで、Sparkジョブに関する問題のトラブルシューティングに役立ちます。SparkジョブでSparkイベントログを有効にすることはベストプラクティスであり、パフォーマンスの問題をより簡単にトラブルシューティングできます。

 

Sparkイベントログを有効にしたら、Spark History Webインターフェイスに移動します。ジョブのアプリケーション番号を確認するときに、以下のタブが表示されています。

上記のSpark UIで、[Stages]タブで、ジョブのパフォーマンスに影響を与えているものを特定し、詳細に移動して、以下のような動作が見られるかどうかを確認します。

 

10個のエグゼキューターを割り当てた後でも、1つだけがほとんどのデータを処理し、残りはアイドル状態です。では、なぜこれが起こるのでしょうか。それに答えるには、問題が存在するジョブのステージを特定する必要があります。例では、圧縮ファイルからデータを読み込むというSparkジョブの部分でこれが起こっていることがわかります。アーカイブファイルはデフォルトでは読み込み時にパーティション分割されないため、読み込む各アーカイブファイルに対して1つのパーティションを持つRDDが作成されるため、この動作が見られます。その圧縮ファイルがBZIP2のように分割可能なアーカイブ形式であり、読み込み時に分割できる場合は、tFileInputDelimitedの詳細設定でプロパティ「Set Minimum Partitions」を有効にし、次に出発点としてエグゼキューターと同じ数のパーティションを最小値に設定します。

しかし、GZIPのように読み取り時に再分割できないアーカイブファイルの場合は、tPartitionコンポーネントを使用して明示的に再分割できます。以下に示すように、このコンポーネントを使用すると、ファイルを再分割して、エグゼキューター間で負荷を均等に分散できるようになります。

読み取り時のパーティション化は、以下のプロパティにより、tJDBCコンポーネントを使用してデータベースから読み取るときにも使用できます。

上記のように、再分割は特定の状況でのみ適用できます。データセットが結合に使用しているキーに偏っていると判断した場合は、別の方法を使用する必要があります。それでは、データの偏りをどのように特定できるのでしょうか? これは、パーティション別にデータセットを調べ、結合に使用しているキーの間でデータがどのようにグループ化されているかを確認します。以下は、パーティションごとに偏ったデータセットの例です。

この場合、別のキーで再分割できない場合は、Talend Sparkジョブを改善するためのほかの方法を検討する必要があります。広く使用されている1つの手法は「ソルティング」と呼ばれます。ソルティングを使用すると、パーティションごとのデータの分散を均等にするために、実際のキーに偽のキーを追加します。これは、以下の例のようにSparkジョブのtmapコンポーネントを介して実行できます。

上記のように、tmapレベルで偽のキーを数値の乱数として追加し、実際のキーと共にルックアップデータセットに結合し、そこにも偽のキーを追加しました。結合は、実際のキーとディストリビューション用に生成した偽のキーに基づいて行われるため、Sparkでデータセットを結合するときにパフォーマンスに影響を与える可能性があるパーティションの偏りを避けるのに役立ちます。

まとめ

Talend Sparkジョブのパフォーマンスを向上し、調整するために使用できる手法は多数あります。このブログでの説明を参考に、TalendでのSparkジョブの構築を成功させてください。

参考文献:

https://spark.apache.org/docs/latest/tuning.html

https://www.cloudera.com/documentation/enterprise/latest/topics/admin_spark_tuning1.html

https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-tuning.html

https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.6.4/bk_spark-component-guide/content/ch_tuning-spark.html

ディスカッションに参加

0 Comments

コメントを残す

Your email address will not be published. Required fields are marked *