Talend Sparkジョブ vs. spark-submitの構成:2つの違いとは?

Talend Sparkジョブ vs. spark-submitの構成:2つの違いとは?

  • Petros Nomikos
    I have 3 years of experience with installation, configuration, and troubleshooting of Big Data platforms such as Cloudera, MapR, and HortonWorks. I joined Talend in 2014, and prior to Talend I held positions as manager of technical support, and project manager for data warehouse implementations. In my current role, I assist companies in understanding how to implement Talend in their Big Data Ecosystem.

前回のブログ、「TalendとApache Spark:技術的な手引きと概要」では、Talend Sparkジョブとspark-submitの対応について説明しました。このブログ記事では、Apache spark-submitとの比較でTalend Sparkの構成を引き続き評価していきます。最初に、Talend Sparkジョブでの[Spark Configuration]タブのオプションをspark-submitに引数として渡す先にマッピングする方法を検討し、それらの使用について説明します。

コマンドの違い

お使いの環境でApache Sparkジョブ(Sparkが正常に機能することを確認するために使用される、Hadoopクラスターでデフォルトとして提供されるApache Sparkサンプルジョブなど)を実行するときは、次のコマンドを使用します。

export HADOOP_CONF_DIR=XXX./bin/spark-submit  --class org.apache.spark.examples.SparkPi --master yarn --deploy-mode client  --executor-memory 5G --num-executors 10 /path/to/examples.jar 1000

上記の2つのコマンドは、spark-submitジョブがクラスター構成ファイルを読み込むディレクトリーを設定します。次に、Sparkサンプルジョブを実行するために、10のエクゼキューターと5Gのメモリーを使用して、クライアントモードによりYARNクラスター上でSparkを実行するspark-submitコマンドを発行します。

次に、同じSparkサンプルジョブがTalendでどのように実行されるのかを見てみましょう。TalendでSparkサンプルジョブ(上記のようなもの)を実行すると、すべてのSpark構成情報が実行タブ内の次のタブに入力されます。

Figure 1

ここでいくつか疑問が生まれます。Talendに入力した情報は、Sparkジョブを実行するために端末に入力した情報にどのように対応するのか? どのくらいの数のエグゼキューターとメモリーを要求したのかを、どうやって知ることができるのか? トラブルシューティングについてはどうか? これらの質問すべてに答えていきます。

まず、このブログで使用されるspark-submitのオプションをいくつか紹介します。Apache Sparkドキュメントによると、これらはspark-submitスクリプトに渡すことができる一般的なオプションです。

--class:これは、Sparkアプリケーションの主なエントリーポイントです。

--master:このオプションでは、SparkマスターをスタンドアロンのSparkとして使用するか、YARN上でSparkを使用するかを指定します。

--deploy-mode:前のブログで述べたように、これは利用可能な2つのYARNモードに移り、Sparkドライバーの展開方法を詳述します。

--conf:このオプションでは、ジョブに使用させる追加のSpark構成(たとえば、spark.executor.userClassPathFirst=true)を渡します。

--application-jar:これは、Apache Sparkが実行するSparkのコンパイル済みコードを配置した場所のパスを指します。

--application-arguments:このオプションでは、Sparkコードに固有の引数を渡します。

では、Talend Sparkジョブ内で上記のオプションがどのように使用されるのかを見てみましょう。実行タブの[Spark Configuration]タブでは、設定可能なさまざまなオプションが論理的に次のカテゴリに分類されています。

  1. クラスターのバージョン
  2. 構成
  3. 認証
  4. 調整
  5. Sparkの履歴

クラスターのバージョン

[Cluster Version]カテゴリーのTalendジョブにある最初のオプションの1つから始めましょう。これは[Spark Mode]オプションです。

このオプションでは、SparkマスターをYARNに接続するのか、スタンドアロンのSparkを使用するのかを指定できます。このオプションは、前述のspark-submitオプションについて説明した「--deploy-mode」と「--master」オプションに対応します。たとえば、Talendで[Spark Mode]に[YARN Client]を選択した場合、これはspark-submitで「--master yarn --deploy-mode client」を指定することに相当します。また、このドロップダウンボックスで[Standalone]モードを選択すると、spark-submitと同様に、TalendからSparkマスターURLの情報を入力するように求められます。これは、spark-submitで「--master spark://127.0.0.1:7077」という引数を渡すことに対応します。

構成

Talendの[Configuration]カテゴリーでは、以下の情報が要求されます。

最初に示される一連のチェックボックスでは、リソースマネージャー、リソースマネージャースケジューラーのアドレス、ジョブ履歴のアドレス、およびステージングディレクトリーに関する情報を入力するように求められます。

spark-submitを使用すると、HADOOP_CONF_DIRを通じてこれらすべての情報がSparkジョブに注入されます。spark-submitスクリプトを実行する前にこれを環境変数として設定するか、/etc/environmentまたは/etc/profileで永続的な環境変数として設定することができます。これらの環境変数はすべて、spark-submitを実行するときにSparkジョブによって提供される環境シェルスクリプトにも設定されます。そのファイルの名前はspark-env.shであり、Sparkホストの「/etc/spark/conf」ディレクトリーの下に配置されています。クラスター内のこの設定ファイルの例は次のとおりです。

次のチェックボックスでは、Hadoopホームディレクトリー(Sparkジョブから必要とされることがあります)を定義するかどうかを尋ねられます。spark-submitジョブでも、この情報は同じ方法で渡されますが、環境変数名はHADOOP_HOMEです。Talend Sparkジョブでのこのチェックボックスの処理は、spark-submitスクリプトの「spark-env.sh」ファイルが実行するものであり、Sparkジョブの実行時にこれらの値を読み込みます。

Talend内のSpark構成で構成カテゴリーを完成させる最後のオプションは、Sparkドライバーのホスト名またはIPアドレスの定義です。これは、Sparkジョブが実行されるシステムが内部IPアドレスおよび外部IPアドレスを使用している場合や、SparkマスターとエグゼキューターがSparkドライバーに接続しようとしたときに問題を引き起こす可能性のあるホスト名解決の問題がある場合に便利です。

デフォルトでは、このオプションが指定されていないと、ローカルホスト名を使用してそのIPアドレスを解決しようとします。前回のブログで述べたように、Talendは現在YARNクライアントモードを使用しているため、Sparkドライバーは常にSparkジョブの起動元のシステム上で実行されます。これをspark-submitで提供されるオプションにマッピングします。これは、「--conf」を使用して指定し、次にキー/値ペア「spark.driver.host=127.0.0.1」を提供します。これで、[Spark Configuration]タブの構成サブメニューの下にあるオプションのマッピングが完了しました。

認証

[Authentication]カテゴリーでは、Hadoopクラスターで使用される認証方法を選択するためのオプションが提供されます。

このカテゴリーにチェックを入れない場合、単純な認証がクラスターによって使用されていると見なされ、そこで指定したユーザー名を使用してHadoopクラスターに接続しようとします。spark-submitの場合、この情報は、サブミットを実行するアプリケーションのSpark構成に入力されます。

次に、[Use Kerberos authentication]オプションをチェックすると、次の情報を追加するように求められます。

最初の2つのフィールドは、リソースマネージャーとジョブ履歴サービスによって使用されるサービスプリンシパル名です。「keytab」を使用するオプションがチェックされていない場合、ジョブが実行されると、実行されるシステムでKerberosチケットのキャッシュを探し、また、ジョブを開始したユーザーに固有のキャッシュで有効なKerberosチケットを探します。

「keytab」オプションをチェックした場合は、使用されるkeytabと、発行先のユーザーのプリンシパル名を指定する必要があります。これにより、ジョブが開始されると、そのkeytabに基づいて、ジョブで使用されるプリンシパル向けにKerberosチケットが生成されます。spark-submitの場合は、Kerberosを認証に使用するようにコードで設定したSpark構成をアプリケーションで渡します。ただし、spark-submitを実行する前に、キータブを使用していない場合は、Kerberosのkinitコマンドを実行してチケットを生成する必要があります。また、キータブを使用している場合は、必要なフラグを指定してkeytabコマンドを実行してチケットを生成するか、またはSparkアプリケーションコード内でkeytabからログインするように指定します。

調整

Talendの[Tuning]カテゴリーに移ります。ここで提供される[Set tuning properties]オプションは、デフォルトではオフになっています。[Set tuning properties]をチェックすると、自動的に以下のオプションが表示されます。

これらのオプションがspark-submitにどのように対応するか見ていきましょう。

ここでの最初のオプション、[Set application master tuning properties]では、YARNアプリケーションマスターが使用する必要があるメモリーの量とコアの数を設定できます。

YARNアプリケーションマスターインスタンスの目的は、リソースマネージャーからリソースのネゴシエーションを行い、その後、ノードマネージャーと通信してリソースの使用率を監視し、コンテナーを実行することです。このオプションを設定しない場合は、デフォルトでYARNアプリケーションマスターに512mと1コアが割り当てられます。Talendの場合、Sparkサブミットのオプションとして渡す方法は、「--conf」オプションを使用してから、「spark.yarn.am.memory=512m,spark.yarn.am.cores=1」というキー/値ペアをそれに渡します。

さらに、エクゼキューターの数、各エクゼキューターのメモリー量、エクゼキューターごとのコア数など、追加設定を指定できます。また、次のオプションでエクゼキューターごとに割り当てることができるオーバーヘッドメモリーの量も設定できます。

デフォルト値はエクゼキューターメモリーあたり1g、エクゼキューターあたり1コア、エクゼキューターメモリーオーバーヘッドは、使用されるエクゼキューターメモリーの10%(最小は384m)となり、2つのエクゼキューターが必要とされます。spark-submitのオプションとしては、2つの異なる実行方法があります。上記の「--executor-memory 5G --num-executors 10」のspark-submitコマンドの例のように使用するか、または「--conf」オプションを使用してそれらを渡してから「spark.executor.instances=2, spark.executor.cores=1, spark.executor.memory=2, spark.yarn.executor.memoryOverhead=384m」というキー/値ペアを使用します。

次のオプションは、YARNリソースの割り当てに関するものです。

ここでのオプションは自動、固定、動的ですが、これらはエクゼキューターの割り当て方法を指します。

Autoのまま変更しない場合、YARNによって割り当てられたデフォルト(2つのエグゼキューター)が使用されるため、エクゼキューターの数を設定するオプションが表示されなくなります。これをFixedに設定すると、ジョブが要求するエクゼキューターの数を設定するオプションが表示されます。最後のオプションはDynamicです。これは、Sparkが提供するメカニズムを使用して、実行時に必要に応じてSparkジョブに割り当てられたエグゼキューターを動的に調整する機能を提供します。これは、実行中のアプリケーションが必要に応じて追加のエグゼキューターを要求し、使用されていないときにYARNに戻すことができることを意味します。このオプションを選択すると、以下の構成が表示されます。

最初にYARNから要求するエクゼキューターの数を選択し、Sparkによって実行されたときのジョブのワークロードに応じて、ジョブが持つことができるエクゼキューターの最小値と最大値を指定できます。spark-submitで[Dynamic]オプションを渡すには、「--conf」オプションを使用してから、キー/値ペア「spark.dynamicAllocation.enabled=true, spark.shuffle.service.enabled=true」を使用します。Sparkのドキュメント(https://spark.apache.org/docs/1.6.1/job-scheduling.html#dynamic-resource-allocation target="_blank")によると、この機能を使用するには、これら2つのプロパティが必要です。

Talendの場合、[Spark Configuration]タブ内の調整カテゴリーでは、次のチェックボックスは[Set Web UI port]です。これを選択すると、ポートを指定できます(デフォルトは4040)。このオプションの目的は、Sparkアプリケーションが実行されているときに、SparkドライバーがWeb UIを起動することです。このWeb UIは、実行中のSparkジョブを監視し、ジョブの実行を検査するために使用できます。このオプションを選択していない場合は、上記のデフォルトポートから開始し、開いているポートが見つかるまでポート番号を増やし続けます。Sparkジョブを実行しているシステムでポート4040が利用できないことを把握していて、開いているポートを見つけようとしているSparkアプリケーションの代わりに特定のポートを使用するよう指定したい場合に、このオプションが使われます。spark-submitでこのオプションを設定する場合は、「--conf」オプションを使用してから、「spark.ui.port=4041」のキー/値ペアを使用します。

次のオプションは[Broadcast Factory]です。ここでは、いくつかの選択肢が提供されます。

では、「Broadcast Factory」は何をするのでしょうか。Sparkアプリケーションでのブロードキャストの役割は、クラスター内のエグゼキューター間で変数をブロードキャストすることです。これによって、1つのノードですべての処理を実行する代わりに、変数を迅速かつ効率的に分散できます。ここでは3つのオプションが提供されます。最初のオプションは自動で、これを選択するとデフォルトが使用されます。2番目と3番目のオプションでは、ブロードキャストファクトリーとしてTorrentまたはHTTPを選択できます。spark-submitでは、「--conf」オプションを使用してこれを渡し、デフォルトを使用したくない場合は、「spark.broadcast.factory=org.apache.spark.broadcast.TorrentBroadcastFactory」のキー/値ペアを使用します。この場合、通常はTorrentが使用されます。

[Tuning]カテゴリーの最後のオプションは、Sparkシリアル化のカスタマイズに関するものです。

Sparkのドキュメント(https://spark.apache.org/docs/latest/tuning.html#data-serialization)にも記載されているように、Sparkでのシリアル化の重要性は、分散環境でパフォーマンスを向上させるためにエグゼキューター間でデータをシリアル化することです。デフォルトでは、このオプションが選択されていない場合、Talendは最も効率的と見なされるKryoシリアル化として使用されるように設定します。Sparkで同一のオプションを使用する場合は、「--conf」オプションを使用して、キー/値ペア「spark.serializer=org.apache.spark.serializer.KryoSerializer」を指定します。このオプションがspark-submitで指定されていない場合はデフォルトのJavaシリアライザーが使用され、Spark SQL Thrift Serverが使用されている場合はデフォルトでKryoが使用されます。

Sparkの履歴

では、最後の[Spark History]カテゴリーに移りましょう。Sparkロギングを有効にすると、次のオプションが示されます。

イベントログを有効にすると、ジョブ履歴ファイルをSpark Historyサーバーで読み込むことができるHDFS内のディレクトリーを指定し、履歴サーバーのアドレスを指定することができます。spark-submitでこれを有効にする場合は、「--conf」オプションに対してキー/値ペア「spark.eventLog.enabled=true,spark.eventLog.dir=hdfs://namenode_host:namenode_port/user/spark/applicationHistory,spark.yarn.historyServer.address=http://spark_history_server:history_port」を渡す必要があります。

追加の構成

[Spark Configuration]タブでさまざまなカテゴリーを設定しましたが、使用できるオプションが3つ残っていることがわかります。その1つは[Spark “scratch” directory]です。このオプションは、アプリケーションの実行中にSparkジョブが開始されるシステムのローカルディスク上で使用されるスクラッチディレクトリーを指定します。spark-submitを使用して、「--conf」を使用してから「spark.local.dir=/tmp」を渡します。何も指定しないと、デフォルトで/tmpディレクトリーが使用されます。

次のオプションは、Sparkのチェックポイントを有効にするために使用されます。これはSparkジョブが失敗した場合に特定の時点から回復する能力を与えます。有効にすると、ローカルファイルシステムまたはHDFSでディレクトリーを指定して、ジョブの進行に合わせて保存できるようになります。spark-submitで有効にする場合は、Sparkドキュメントでも指摘されているように、Sparkコード内で実行する必要があります。Sparkドキュメントに例が記載されています(https://spark.apache.org/docs/1.6.0/streaming-programming-guide.html#checkpointing)。

最後のオプションは詳細プロパティです。このオプションでは、アプリケーションに渡すSparkプロパティをキー/値ペアで追加できます。これはspark-submitを使用する場合と同じことです。その場合は、「--conf」オプションを使用して渡します。

HadoopクラスターとSpark Gatewayノードの1つを詳しく見ると、spark-submitを実行するときに使用されるspark-defaults.confというファイル内で、前述のデフォルト選択の多くがすでに指定されていることがわかります。このファイルは/etc/spark/confの下にあります。ファイルを開くと、ここに記載されているほとんどのプロパティが表示されます。spark-submitでオプションとして渡すことによって、前述したようにオーバーライドすることも可能です。以下に例を示します。

まとめ

Talendには、Sparkアプリケーションの構成に使用できるさまざまなオプションがすべて用意されています。チェックボックスやドロップダウンの選択を使用して、使用するオプションや使用するデフォルトを簡単に指定できます。Talend Sparkジョブで使用できるさまざまな設定を確認し、環境に合わせて構成・最適化するのがいかに簡単か、実際に体験してください。

参考文献

https://spark.apache.org/docs/latest/submitting-applications.html

https://spark.apache.org/docs/1.6.0/streaming-programming-guide.html#checkpointing

https://spark.apache.org/docs/latest/tuning.html#data-serialization

https://spark.apache.org/docs/1.6.1/job-scheduling.html#dynamic-resource-allocation

ディスカッションに参加

0 Comments

コメントを残す

Your email address will not be published. Required fields are marked *