Apache Beam in Action: Same Code, Several Execution Engines

In my previous article, I provided a quick introduction to Apache Beam, a new distributed processing tool currently being incubated at the ASF. Apache Beam provides an abstraction layer that lets developers focus on their pipeline code, using the Beam programming model. Because the implementation is agnostic to the underlying runtime technology, you can switch between execution engines quickly and easily.

With Apache Beam 0.2.0-incubating just released, it's a perfect time to dig into the key features the technology provides. In this article I'll walk through a first pipeline use case, and then show how to execute exactly the same pipeline code on different execution engines.

Context: GDELT analyses

We are going to create a pipeline to analyze data from the GDELT project, which monitors the world's broadcast, print, and web news from nearly every corner of every country in over 100 languages, and identifies the people, locations, organizations, counts, themes, sources, emotions, quotes, images, and events driving our global society every second of every day. GDELT publishes daily CSV files containing one line per event.

For example, an event from the CSV file looks like:

Our goal with Apache Beam is to read each event (each line), extract its location code (JPN in the example above), and group the events per location. We will then be able to simply count the number of events per location. The location of the CSV file should be an option of the pipeline.

To do this, we first create a "wrapper" class. It will contain the pipeline options definition and a main method that runs the pipeline:

public class EventsByLocation {

}

Next, create an inner interface to describe the pipeline options:

private interface Options extends PipelineOptions {

    String GDELT_EVENTS_URL = "http://data.gdeltproject.org/events/";

    @Description("GDELT file date")
    @Default.InstanceFactory(GDELTFileFactory.class)
    String getDate();
    void setDate(String value);

    @Description("Input Path")
    String getInput();
    void setInput(String value);

    @Description("Output Path")
    String getOutput();
    void setOutput(String value);

    class GDELTFileFactory implements DefaultValueFactory<String> {
        public String create(PipelineOptions options) {
            SimpleDateFormat format = new SimpleDateFormat("yyyyMMdd");
            return format.format(new Date());
        }
    }
}

Our Options interface simply extends PipelineOptions. We use the annotations provided by Apache Beam to describe each option (description, default value, …).
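For reference, a static default can also be declared directly with the @Default.String annotation instead of a DefaultValueFactory. The option below is purely hypothetical and only illustrates the annotations; it is not used by the pipeline:

// Hypothetical option, shown only to illustrate a static default (not part of the pipeline)
@Description("Working directory for temporary files")
@Default.String("/tmp")
String getWorkDir();
void setWorkDir(String value);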

Then, we add the main method that will build the actual pipeline and run it:

public static void main(String[] args) throws Exception {

}

In the main method, we first load the pipeline options using our Options interface:

Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
if (options.getInput() == null) {
    options.setInput(Options.GDELT_EVENTS_URL + options.getDate() + ".export.CSV.zip");
}
if (options.getOutput() == null) {
    options.setOutput("/tmp/gdelt-" + options.getDate());
}

We can now create the pipeline using these options:

Pipeline pipeline = Pipeline.create(options);

Let’s now create the different steps in our pipeline.

The first step is a source: we read the GDELT CSV file using TextIO.

.apply("GDELTFile", TextIO.Read.from(options.getInput()))

TextIO gives us a PCollection of lines (String) contained in the file.

We now add a second step: a DoFn that parses and splits each line to extract the location.

.apply("ExtractLocation", ParDo.of(new DoFn<String, String>() {
    @ProcessElement
    public void processElement(ProcessContext c) {
        String[] fields = c.element().split("\\t+");
        if (fields.length > 22) {
            if (fields[21].length() > 2) {
                // keep only the two-character location code
                c.output(fields[21].substring(0, 2));
            } else {
                c.output(fields[21]);
            }
        } else {
            c.output("NA");
        }
    }
}))

We now have a PCollection of locations. Before counting, we clean up and filter out malformed locations:

.apply("Filtering", Filter.by(new SerializableFunction<String, Boolean>() {
    public Boolean apply(String input) {
        if (input.equals("NA")) {
            return false;
        }
        if (input.startsWith("-")) {
            return false;
        }
        if (input.length() != 2) {
            return false;
        }
        return true;
    }
}))

We can now count the events per location by simply counting identical location elements in the PCollection:

.apply("CountPerLocation", Count.<String>perElement())

We now have a PCollection of key/value pairs (location and count). The next step formats it as a PCollection of Strings using the "location: count" format:

.apply("StringFormat", MapElements.via(new SimpleFunction<KV<String, Long>, String>() {
    public String apply(KV<String, Long> input) {
        return input.getKey() + ": " + input.getValue();
    }
}))

The final step writes the result to an output file:

.apply("Results", TextIO.Write.to(options.getOutput()));

Our pipeline is ready to be run!

pipeline.run();

Your class should look like this:

package org.apache.beam.samples;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.*;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.values.KV;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.text.SimpleDateFormat;
import java.util.Date;

public class EventsByLocation {

    private static final Logger LOG = LoggerFactory.getLogger(EventsByLocation.class);

    /**
     * Specific pipeline options.
     */
    private interface Options extends PipelineOptions {
        String GDELT_EVENTS_URL = "http://data.gdeltproject.org/events/";

        @Description("GDELT file date")
        @Default.InstanceFactory(GDELTFileFactory.class)
        String getDate();
        void setDate(String value);

        @Description("Input Path")
        String getInput();
        void setInput(String value);

        @Description("Output Path")
        String getOutput();
        void setOutput(String value);

        class GDELTFileFactory implements DefaultValueFactory<String> {
            public String create(PipelineOptions options) {
                SimpleDateFormat format = new SimpleDateFormat("yyyyMMdd");
                return format.format(new Date());
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
        if (options.getInput() == null) {
            options.setInput(Options.GDELT_EVENTS_URL + options.getDate() + ".export.CSV.zip");
        }
        if (options.getOutput() == null) {
            options.setOutput("/tmp/gdelt-" + options.getDate());
        }
        LOG.info(options.toString());

        Pipeline pipeline = Pipeline.create(options);
        pipeline
                .apply("GDELTFile", TextIO.Read.from(options.getInput()))
                .apply("ExtractLocation", ParDo.of(new DoFn<String, String>() {
                    @ProcessElement
                    public void processElement(ProcessContext c) {
                        String[] fields = c.element().split("\\t+");
                        if (fields.length > 22) {
                            if (fields[21].length() > 2) {
                                // keep only the two-character location code
                                c.output(fields[21].substring(0, 2));
                            } else {
                                c.output(fields[21]);
                            }
                        } else {
                            c.output("NA");
                        }
                    }
                }))
                .apply("Filtering", Filter.by(new SerializableFunction<String, Boolean>() {
                    public Boolean apply(String input) {
                        if (input.equals("NA")) {
                            return false;
                        }
                        if (input.startsWith("-")) {
                            return false;
                        }
                        if (input.length() != 2) {
                            return false;
                        }
                        return true;
                    }
                }))
                .apply("CountPerLocation", Count.<String>perElement())
                .apply("StringFormat", MapElements.via(new SimpleFunction<KV<String, Long>, String>() {
                    public String apply(KV<String, Long> input) {
                        return input.getKey() + ": " + input.getValue();
                    }
                }))
                .apply("Results", TextIO.Write.to(options.getOutput()));

        pipeline.run();
    }

}

Execution engines abstraction

In order to build and execute our pipeline, we package our class in an Apache Maven project.

This Maven project is pretty simple as we need only two dependencies:

● the Apache Beam Java SDK itself

● the SLF4J dependency, as we use a logger

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <groupId>org.apache.beam.samples</groupId>
    <artifactId>EventsByLocation</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <dependencies>
        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-sdks-java-core</artifactId>
            <version>0.2.0-incubating</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.13</version>
        </dependency>
    </dependencies>

</project>

Note that we didn’t define anything related to the runner.

Here, we want to demonstrate one key Beam feature: the same code running on different execution runtimes. To do that, we will use Maven profiles. Each profile defines the dependency set of a specific runner. We will then be able to execute our pipeline (exactly the same code) on a target runner simply by activating a profile and specifying a JVM argument that identifies the runner.

Direct runner

Let's start with the Direct runner. This is the preferred runner for testing: it executes the pipeline on several threads in the local JVM.

It's pretty easy to use as it requires only a single dependency. So, we create a Maven profile with the Direct runner dependency:

<profile>
    <id>direct-runner</id>
    <activation>
        <activeByDefault>true</activeByDefault>
    </activation>
    <dependencies>
        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-runners-direct-java</artifactId>
            <version>0.2.0-incubating</version>
        </dependency>
    </dependencies>
</profile>

We can now run our pipeline on this runner. For that, we use our direct-runner profile (active by default) and the --runner=DirectRunner JVM argument:

mvn compile exec:java -Dexec.mainClass=org.apache.beam.samples.EventsByLocation -Pdirect-runner -Dexec.args="--runner=DirectRunner --input=/home/dataset/gdelt/2014-2016/201605*.zip --output=/tmp/gdelt/output/"
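Because the Direct runner executes the pipeline in-process, it is also the natural choice for unit tests. Below is a minimal sketch (not part of the original article) using TestPipeline and PAssert from the Beam SDK, assuming the Direct runner and JUnit are on the test classpath:

import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.junit.Test;

public class EventsByLocationTest {

    @Test
    public void testCountPerLocation() {
        // assumed to resolve to the Direct runner available on the classpath
        TestPipeline pipeline = TestPipeline.create();

        PCollection<KV<String, Long>> counts = pipeline
                .apply(Create.of("JP", "US", "JP"))
                .apply(Count.<String>perElement());

        // assert on the content of the resulting PCollection
        PAssert.that(counts).containsInAnyOrder(KV.of("JP", 2L), KV.of("US", 1L));

        pipeline.run();
    }
}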

Apache Spark runner

The Spark runner requires more dependencies (due to the Apache Spark runtime). So, again, we create a Maven profile to define them: the Beam Spark runner, the Apache Spark engine itself, and the Jackson dependencies Spark requires.


<profile>
    <id>spark-runner</id>
    <dependencies>
        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-runners-spark</artifactId>
            <version>0.2.0-incubating</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>1.6.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.10</artifactId>
            <version>1.6.2</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-annotations</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.module</groupId>
            <artifactId>jackson-module-scala_2.10</artifactId>
            <version>2.7.2</version>
        </dependency>
    </dependencies>
</profile>

As with the Direct runner, we can use the spark-runner profile and the --runner=SparkRunner JVM argument to execute our pipeline on Apache Spark. Basically, it performs the equivalent of a spark-submit:

mvn compile exec:java -Dexec.mainClass=org.apache.beam.samples.EventsByLocation -Pspark-runner -Dexec.args="--runner=SparkRunner --input=/home/dataset/gdelt/2014-2016/201605*.zip --output=/tmp/gdelt/output/"
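By default this executes Spark in local mode. To target an actual Spark cluster, the Spark runner exposes runner-specific options; a hedged example using the sparkMaster option (assuming the runner's SparkPipelineOptions, with a placeholder master URL) could look like:

mvn compile exec:java -Dexec.mainClass=org.apache.beam.samples.EventsByLocation -Pspark-runner -Dexec.args="--runner=SparkRunner --sparkMaster=spark://<spark-master-host>:7077 --input=/home/dataset/gdelt/2014-2016/201605*.zip --output=/tmp/gdelt/output/"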

Apache Flink runner

The Apache Flink runner is easier to use than the Spark runner as it's packaged as a shaded jar that embeds all required dependencies (including Flink itself).

So, the flink-runner profile contains just one dependency:

<profile>
    <id>flink-runner</id>
    <dependencies>
        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-runners-flink_2.10</artifactId>
            <version>0.2.0-incubating</version>
        </dependency>
    </dependencies>
</profile>

As before, we can run our pipeline on a Flink cluster using the flink-runner profile and the --runner=FlinkRunner JVM argument:

mvn compile exec:java -Dexec.mainClass=org.apache.beam.samples.EventsByLocation -Pflink-runner -Dexec.args="--runner=FlinkRunner --input=/home/dataset/gdelt/2014-2016/201605*.zip --output=/tmp/gdelt/output/"

Google Dataflow Runner

Finally, we can run our pipeline on the Google Cloud Dataflow platform, leveraging features provided by the service (such as dynamic scaling).

The Google Cloud Dataflow runner is also easy to use as it is packaged in a shaded jar. So the google-cloud-dataflow-runner profile defines just one dependency:

<profile>
    <id>google-cloud-dataflow-runner</id>
    <dependencies>
        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
            <version>0.2.0-incubating</version>
        </dependency>
    </dependencies>
</profile>

As usual, using the google-cloud-dataflow-runner profile and the --runner=DataflowRunner JVM argument, our pipeline will be executed on the Google Cloud Dataflow platform:

mvn compile exec:java -Dexec.mainClass=org.apache.beam.samples.EventsByLocation -Pgoogle-cloud-dataflow-runner -Dexec.args="--runner=DataflowRunner --input=/home/dataset/gdelt/2014-2016/201605*.zip --output=/tmp/gdelt/output/"
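In practice, the Dataflow runner also requires Google Cloud specific options, typically a project ID and a Cloud Storage staging location, and input/output locations the service can read and write. A hedged example with placeholder values (your project ID and bucket) might look like:

mvn compile exec:java -Dexec.mainClass=org.apache.beam.samples.EventsByLocation -Pgoogle-cloud-dataflow-runner -Dexec.args="--runner=DataflowRunner --project=<your-gcp-project> --stagingLocation=gs://<your-bucket>/staging/ --input=gs://<your-bucket>/gdelt/201605*.zip --output=gs://<your-bucket>/gdelt/output/"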

Conclusion

In this article, we saw how to execute exactly the same code (without any change to the pipeline definition) on different execution engines such as Apache Spark, Apache Flink, and Google Cloud Dataflow. With Apache Beam, we can switch from one engine to another simply by changing the Maven profile and the runner argument. In a future article, we will take a deeper look at Beam IOs: the concepts (sources, sinks, watermarks, splits, …) and how to use and write a custom IO.

Post by Jean-Baptiste Onofre (@jbonofre)

About Jean-Baptiste

ASF Member, PMC for Apache Karaf, PMC for Apache ServiceMix, PMC for Apache ACE, PMC for Apache Syncope, Committer for Apache ActiveMQ, Committer for Apache Archiva, Committer for Apache Camel, Contributor for Apache Falcon
