Sparkでのジョブの実行

このチュートリアルでは、Sparkフレームワークを使用してビッグデータバッチジョブを作成し、HDFSからデータを読み込み、ソートしてコンソールに表示する方法を紹介します。

このチュートリアルでは、Talend Data Fabric Studioバージョン6とHadoopクラスターのCloudera CDHバージョン5.4を使用します。また、「クラスター接続メタデータの作成」というタイトルのチュートリアルで作成されたHDFS接続メタデータを再利用します。

1. Sparkフレームワークを使って新しいビッグデータバッチジョブを作成する

ビッグデータ処理のため、Talend StudioではSparkまたはMapReduce上で実行するバッチジョブとストリーミングジョブを作成できます。ここでは、Spark上で実行されるビッグデータバッチジョブを作成します。

  1. [Integration]パースペクティブが選択されていることを確認します。
  2. Hadoopクラスター接続とHDFS接続メタデータがプロジェクトリポジトリに作成されていることを確認します。
  3. リポジトリで、[Job Designs]を展開し、[Big Data Batch]を右クリックして、[Create Big Data Batch Job]をクリックします。
  4. [Name]フィールドにReadHDFS_Sparkと入力します。[Framework]リストでSparkが選択されていることを確認します。[Purpose]フィールドにRead and sort customer dataと入力し、[Description]フィールドにRead and sort customer data stored in HDFS from a Big Data Batch Job running on Sparkと入力し、[Finish]をクリックします。

ジョブは、リポジトリの[Job Designs] > [Big Data Batch]の下に表示され、ジョブデザイナーで開きます。

2. HDFSメタデータ定義を使用して、HDFSへの接続とSparkでの実行を構成する

YARNとは異なり、SparkはHDFS、Amazon S3、Cassandraなどのさまざまなファイルストレージシステムに接続できます。HDFSからデータを読み取るには、まずHDFSへの接続を構成する必要があります。そのためには、リポジトリにあるHDFS接続メタデータを使用します。メタデータはSparkでJobの実行を構成する場合にも役立ちます。

  1. リポジトリで[Metadata] > [HadoopCluster] > [MyHadoopCluster] > [HDFS]をクリックし、[MyHadoopCluster_HDFS]をクリックしてジョブデザイナーにドラッグします。[Components]リストで[tHDFSConfiguration]を選択し、[OK]をクリックします。[Hadoop Configuration Update Confirmation]ウィンドウが開きます。
  2. StudioがSpark構成を更新してクラスターメタデータに対応するようにするには、OKをクリックします。
  3. [Run]ビューで、[Spark Configuration]をクリックして、リポジトリで使用可能なHDFS接続メタデータを使用して実行が構成されていることを確認します。

ジョブは、Spark Standaloneモード、Spark Standaloneモード、またはSpark on YARNモードに構成できます。ローカルモードは、設計段階でジョブをテストするために使用されます。Spark StandaloneまたはSpark on YARNの選択は、クラスターにインストールされているSparkのバージョンによって異なります。このチュートリアルでは、Spark on YARNを使用します。

この構成のためにStudioが使用するHDFS接続メタデータは、クラスターのバージョン、ディストリビューション、およびリソースマネージャーアドレスも構成します。

3. HDFSからのデータ読み取りのためにtFileInputDelimitedコンポーネントを構成する

これで、HDFSに接続でき、ジョブがクラスターで実行されるように構成されました。Sparkでジョブを実行するため、tFileInputDelimitedコンポーネントを使用することで、さまざまなファイルストレージシステムからデータを読み取ることができます。

  1. ジョブデザイナーで[tFileInputDelimited]を追加します。
  2. tFileInputDelimitedコンポーネントの[Component]ビューを開くには、コンポーネントをダブルクリックします。
  3. [Storage]パネルで、tHDFSConfigurationコンポーネントがストレージ構成コンポーネントとして選択されていることを確認します。
  4. スキーマエディターを開くには、[Edit schema]をクリックします。
  5. スキーマに列を追加するには、[+]アイコンを3回クリックして、列名をCustomerIDFirstNameLastNameとして入力します。
  6. [CustomerID]列のタイプを変更するには、[Type]フィールドをクリックし、[Integer]をクリックします。[OK]をクリックしてスキーマを保存します。

    代替方法:リポジトリからのメタデータを使用してスキーマを構成します。詳細な手順は、チュートリアル「メタデータの作成と使用」をご覧ください。
  7. 読み取るファイルの場所を指定するには、[Folder/File]フィールドの横にある[…]をクリックし、user/student/CustomersDataを参照して見つけ、[OK]をクリックします。

tFileInputDelimitedコンポーネントが、HDFSから顧客データを読み取るように構成されました。

AVRO、JSON、XMLなど、ほかのファイルタイプもサポートされており、ファイルを区切る必要はありません。

4. 顧客IDの値に基づいて顧客データを昇順にソートする

  1. [tSortRow]を追加します。
  2. MyHadoopCluster_HDFSという名前のtFileInputDelimitedコンポーネントを、Mainを使用してtSortRowコンポーネントに接続します。
  3. tSortRowコンポーネントの[Component]ビューを開くには、コンポーネントをダブルクリックします。
  4. スキーマを構成するには、[Sync columns]をクリックします。
  5. [Criteria]テーブルに新しい条件を追加するには、[+]アイコンをクリックします。[Schema]列にCustomerIDと入力します。[sort num or alpha?]列で[num]を選択し、[Order asc or desc?]列で[asc]を選択します。

これで、tSortRowコンポーネントが構成されました。

5. tLogRowコンポーネントを使用し、ソートされたデータをコンソールに表示する

  1. tLogRowコンポーネントを追加し、[Main]を使用してtSortRowコンポーネントに接続します。
  2. tLogRowコンポーネントの[Component]ビューを開くには、コンポーネントをダブルクリックします。
  3. [Mode]パネルで[Table]を選択します。

これで、ジョブを実行する準備ができました。ジョブは、HDFSからデータを読み取り、ソートしてコンソールに表示します。

6.ジョブを実行し、コンソールで結果を確認する

ジョブを実行するには、[Run]ビューで[Basic Run]タブを開き、[Run]をクリックします。ジョブデザイナーでは、実行終了時のジョブの割合が100%になります。

ソートされたデータがコンソールに表示されます。