A First for Apache Beam

At Talend, we like to be first. Back in 2014, we made a bet on Apache Spark for our Talend Data Fabric platform, and it paid off beyond our expectations. Since then, most of our competitors have tried to catch up…

Last year, we announced that we were joining efforts with Google, PayPal, DataTorrent, data Artisans and Cloudera to work on Apache Beam, which has since become an Apache Top-Level Project.

On January 23, 2017, we released Winter ’17, our latest integration platform, which included Talend Data Preparation on Big Data. In this blog post, I’d like to drill a little deeper into the technology and architecture behind it, and into how we are leveraging Apache Beam for scale and runtime agility.

1) Architecture

a) Overview

Figure 1 below shows the high-level architecture of Talend Data Preparation on Big Data, covering both the application layer and the backend server side.

You’ll notice the Beam JobServer and, more specifically, the Beam Compiler, which generates an Apache Beam pipeline from the JSON document, as well as the Beam runners, where we specify the set of properties for the target Apache Beam runner (Spark, Flink, Apex or Google Cloud Dataflow).

Note that in our Winter ’17 version, the only Apache Beam runner we support for the full run is Spark.
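To give a feel for what "specifying properties for a runner target" can look like, here is a minimal sketch in plain Java. It assumes the common Beam convention of passing options as `--name=value` arguments, with `--runner` selecting the runner class. The class `RunnerArgsSketch`, its `Target` enum and the `toArgs` helper are hypothetical names made up for this illustration, and `sparkMaster` is used only as an example property; the post does not show how the JobServer actually passes these settings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not Talend code: turns a runner target and its
// properties into Beam-style "--name=value" command-line arguments.
final class RunnerArgsSketch {

    /** Runner targets named in the post, mapped to Beam runner class names. */
    enum Target {
        SPARK("SparkRunner"),
        FLINK("FlinkRunner"),
        APEX("ApexRunner"),
        DATAFLOW("DataflowRunner");

        final String runnerName;

        Target(String runnerName) {
            this.runnerName = runnerName;
        }
    }

    static List<String> toArgs(Target target, Map<String, String> properties) {
        List<String> args = new ArrayList<>();
        args.add("--runner=" + target.runnerName);
        // Sort the keys so the generated arguments are deterministic.
        for (Map.Entry<String, String> e : new TreeMap<>(properties).entrySet()) {
            args.add("--" + e.getKey() + "=" + e.getValue());
        }
        return args;
    }

    public static void main(String[] argv) {
        Map<String, String> props = new TreeMap<>();
        props.put("sparkMaster", "yarn"); // illustrative property only
        System.out.println(toArgs(Target.SPARK, props));
    }
}
```

In a real Beam program, arguments of this shape are typically handed to `PipelineOptionsFactory.fromArgs(...)`, which is what makes switching runners a configuration change rather than a code change.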

 

Figure 1. Talend Data Preparation with Apache Beam runtime

 

b) Workflow

Figure 2. From preparation DSL to Apache Beam pipeline

 

The Beam Compiler is invoked to transform the DSL into an optimized Beam pipeline in which the source, the sink, and the various actions are defined.

2) Details: What Gets Generated

a) Preparation DSL: JSON Document

Figure 3. Build your Data Preparation Recipe

As you apply your cleaning and enrichment steps, Talend Data Preparation generates a recipe which then gets transformed into a JSON document.

In the JSON example below, the input is a .csv file stored in HDFS. The file contains only two string columns, and we applied the “uppercase” function to the first column (a1).

{
  "input": {
    "dataset": {
      "format": "CSV",
      "path": "/tmp/input",
      "fieldDelimiter": ";",
      "recordDelimiter": "\n",
      "type": "HdfsDataset",
      "@definitionName": "HdfsDataset"
    },
    "datastore": {
      "@definitionName": "HdfsDatastore",
      "username": "testuser",
      "type": "HdfsDatastore"
    },
    "properties": {
      "type": "HdfsInput"
    }
  },
  "preparation": {
    "name": "sample_prep",
    "rowMetadata": {
      "columns": [
        {
          "id": "0000",
          "name": "a1",
          "type": "string"
        },
        {
          "id": "0001",
          "name": "a2",
          "type": "string"
        }
      ]
    },
    "actions": [
      {
        "action": "uppercase",
        "parameters": {
          "column_id": "0000",
          "scope": "column",
          "column_name": "a1"
        }
      }
    ]
  },
  "output": {
    "dataset": {
      "format": "CSV",
      "path": "/tmp/output",
      "fieldDelimiter": ";",
      "recordDelimiter": "\n",
      "@definitionName": "HdfsDataset",
      "type": "HdfsDataset"
    },
    "datastore": {
      "@definitionName": "HdfsDatastore",
      "username": "testuser",
      "type": "HdfsDatastore"
    },
    "properties": {
      "type": "HdfsOutput"
    }
  },
  "authentication": {
    "principal": "USER@REALM.COM",
    "realm": "REALM.COM",
    "useKeytab": true,
    "keytabPath": "/keytabs/mykeytab.keytab",
    "kinitPassword": "nothing"
  }
}
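
To make the recipe above concrete, here is a small plain-Java sketch of what the “uppercase” action amounts to for a single record: split the line on the `fieldDelimiter` (`;`), uppercase the targeted column (index 0 for column `0000`), and join the fields back together. The class `UppercaseActionSketch` and its `apply` method are hypothetical names for illustration only; the actual action is implemented inside Talend Data Preparation and executed as part of a Beam pipeline, which this sketch does not attempt to reproduce.

```java
import java.util.Locale;
import java.util.regex.Pattern;

// Hypothetical illustration of the "uppercase" action on one delimited record.
final class UppercaseActionSketch {

    static String apply(String record, String fieldDelimiter, int columnIndex) {
        // Quote the delimiter so characters such as "|" are not read as regex syntax;
        // the -1 limit keeps trailing empty fields.
        String[] fields = record.split(Pattern.quote(fieldDelimiter), -1);
        if (columnIndex >= 0 && columnIndex < fields.length) {
            fields[columnIndex] = fields[columnIndex].toUpperCase(Locale.ROOT);
        }
        return String.join(fieldDelimiter, fields);
    }

    public static void main(String[] args) {
        // Column "a1" is the first field; "a2" is left untouched.
        System.out.println(apply("hello;world", ";", 0));
    }
}
```

Only the first field changes, so a record such as `hello;world` becomes `HELLO;world`.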

b) The Beam Compiler

Below is an excerpt of the code that creates the Apache Beam pipeline from the various Talend components:

// Talend-specific imports are omitted in this excerpt.
import java.util.Iterator;

import org.apache.beam.sdk.Pipeline;

public class RuntimeFlowBeamCompiler {

    public Pipeline compile(RuntimeFlowBeamCompilerContext bcc) {
        RuntimeFlow runtimeFlow = bcc.getRuntimeFlow();
        // Start to create the Beam pipeline from the RuntimeFlow.
        PipelineSpec<RuntimeComponent, RuntimeLink, RuntimePort> pipelineSpec =
                bcc.getPipelineSpec();
        ...
        // Create the Beam pipeline to build the job into.
        Pipeline pipeline = Pipeline.create(bcc.getBeamPipelineOptions());
        ...
        RuntimeFlowBeamJobContext ctx =
                new RuntimeFlowBeamJobContext(pipelineSpec, pipeline, ...);
        // Compile the components in topological order, so that each component
        // is compiled after the components that feed it.
        Iterator<RuntimeComponent> components = pipelineSpec.topologicalSort().toIterator();
        components.forEachRemaining(component -> compileComponent(ctx, component));
        // Return the resulting Beam pipeline.
        return ctx.getPipeline();
    }
}
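
The compiler above visits components in topological order, meaning a component is only compiled once everything upstream of it has been. As a rough illustration of that idea, here is a self-contained sketch using Kahn's algorithm over a map of component name to downstream component names. This is an assumption for illustration: the post does not show how `PipelineSpec.topologicalSort()` is implemented, and `TopologicalOrderSketch` and its `sort` method are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of topological ordering (Kahn's algorithm).
final class TopologicalOrderSketch {

    /** @param downstream maps each component to the components it feeds. */
    static List<String> sort(Map<String, List<String>> downstream) {
        // Count incoming links for every component seen as a key or a target.
        Map<String, Integer> inDegree = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> e : downstream.entrySet()) {
            inDegree.putIfAbsent(e.getKey(), 0);
            for (String target : e.getValue()) {
                inDegree.merge(target, 1, Integer::sum);
            }
        }
        // Components with no incoming links (sources) are ready immediately.
        Deque<String> ready = new ArrayDeque<>();
        for (Map.Entry<String, Integer> e : inDegree.entrySet()) {
            if (e.getValue() == 0) {
                ready.add(e.getKey());
            }
        }
        List<String> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            String current = ready.remove();
            order.add(current);
            List<String> targets =
                    downstream.getOrDefault(current, Collections.<String>emptyList());
            for (String target : targets) {
                // A target becomes ready once all of its inputs have been emitted.
                if (inDegree.merge(target, -1, Integer::sum) == 0) {
                    ready.add(target);
                }
            }
        }
        if (order.size() != inDegree.size()) {
            throw new IllegalStateException("The component graph contains a cycle");
        }
        return order;
    }

    public static void main(String[] args) {
        Map<String, List<String>> flow = new LinkedHashMap<>();
        flow.put("HdfsInput", Collections.singletonList("uppercase"));
        flow.put("uppercase", Collections.singletonList("HdfsOutput"));
        System.out.println(sort(flow));
    }
}
```

For the sample recipe, the only valid order is the input, then the uppercase action, then the output, which matches the source, actions and sink layout described earlier.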

c) The Beam JobServer

Below is an excerpt of the code that validates and runs the actual Apache Beam pipeline:

public class BeamJobController {

    // Populated by validate() and reused by runJob().
    private RuntimeFlowBeamCompilerContext pipelineSpecContext = null;

    public JobValidation validate(Config config) {
        Config dslConfig = config.getConfig("job");
        String dsl = dslConfig.root().render(renderOpts);
        // Create the compiler context.
        pipelineSpecContext = new RuntimeFlowBeamCompilerContext(dsl);
        // Validate before execution.
        try {
            pipelineSpecContext.validate();
            return JobValid;
        } catch (Exception e) { // exception type elided in the original; Exception is a stand-in
            return SparkJobInvalid(e.getCause().toString());
        }
    }

    public void runJob(Config config) {
        RuntimeFlowBeamCompiler bc = ...
        // Pre-compilation
        RuntimeFlowBeamCompilerContext optimizedPipelineSpecContext =
                bc.preCompile(pipelineSpecContext);
        // Compilation
        Pipeline compiledBeamPipeline = bc.compile(optimizedPipelineSpecContext);
        // Post-compilation
        Pipeline optimizedCompiledBeamPipeline =
                bc.postCompile(optimizedPipelineSpecContext, compiledBeamPipeline);
        // Run the Beam pipeline and block until it finishes.
        try {
            optimizedCompiledBeamPipeline.run().waitUntilFinish();
        } catch (Exception e) { // exception type and handling elided in the original
            ...
        }
    }
}
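
One detail worth noticing in the controller is that `runJob` depends on state created by `validate`, and that the run itself is a chain of three stages: pre-compilation, compilation and post-compilation. The sketch below mirrors that contract in plain Java, under simplifying assumptions: the DSL and each intermediate result are represented as strings, each stage is a `UnaryOperator<String>`, and the class `JobLifecycleSketch` is a hypothetical name. It is not the JobServer implementation, and the explicit guard against running before validation is an addition for illustration.

```java
import java.util.function.UnaryOperator;

// Hypothetical sketch of a validate-then-run lifecycle with three compile stages.
final class JobLifecycleSketch {

    // Stands in for the compiler context created during validation.
    private String validatedDsl;

    /** Accepts any non-blank DSL; real validation would inspect the document. */
    boolean validate(String dsl) {
        if (dsl == null || dsl.trim().isEmpty()) {
            return false;
        }
        validatedDsl = dsl;
        return true;
    }

    /** Applies the stages in order: pre-compile, compile, post-compile. */
    String run(UnaryOperator<String> preCompile,
               UnaryOperator<String> compile,
               UnaryOperator<String> postCompile) {
        if (validatedDsl == null) {
            throw new IllegalStateException("validate() must succeed before run()");
        }
        String optimizedSpec = preCompile.apply(validatedDsl);
        String compiled = compile.apply(optimizedSpec);
        return postCompile.apply(compiled);
    }

    public static void main(String[] args) {
        JobLifecycleSketch job = new JobLifecycleSketch();
        if (job.validate("{\"job\": {}}")) {
            System.out.println(job.run(
                    spec -> spec + " -> pre-compiled",
                    spec -> spec + " -> compiled",
                    pipeline -> pipeline + " -> post-compiled"));
        }
    }
}
```

Keeping the stages as separate steps makes it possible to optimize the specification before a Beam pipeline exists and to adjust the pipeline afterwards, which is how the excerpt above is organized.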

Talend Data Preparation is the first Talend Big Data application that leverages the portability and richness of Apache Beam. As we move forward, Apache Beam’s footprint will continue to grow as part of Talend’s technology strategy, and the backend presented in this blog post will be reused by other applications in both batch and streaming contexts, where Apache Beam and its runners will be used to their full extent. Stay tuned for more information!
