As part of a POC of Talend v6.1 Big Data capabilities, I was asked by one of our long-time customers, a major e-commerce company, to present a solution for aggregating huge files of clickstream data on Hadoop.
The input data was a giant clickstream file (larger than 100GB, or even terabytes) from a website. Our goal was to aggregate the file and add a session ID column. We needed to create session records for all users (user IDs) and for all their clicks that were made within 30 minutes of their previous clicks on the site.
If the user had not been active in the last 30 minutes, you would create a new session ID. Otherwise, you’d use the same session ID.
The company had already implemented a solution for this problem using a combination of three different languages with an early version of Talend Studio: Java, Hive, and Python UDF.
With the coming of Talend v6 the company was interested in testing the new Talend Big Data capabilities.
The goals were to:
- Build the same logic to achieve the same results
- Reduce the multi-language complexity (no UDFs)
- Make the job graphical and as easy to read as possible (significantly reduce the code, as the previous solution was very code-heavy)
- Improve or retain the current performance
To solve this use case, I decided to create a Talend Apache Spark batch job. Talend provides a graphical interface that generates code, in this case, based on the Spark Java API. How did we go about solving this clickstream use case with Talend and Spark? This case leads us to tackle Spark shuffle, partitioning, and Spark secondary sort.
While working on this Spark Job, I needed a Spark expert to review the way I was building it. I turned to a friend and Big Data expert, Gilad Moscovitch. He wasn’t familiar with Talend, so he wanted to write his own Spark code and then compare it to my Talend job. Gilad started designing the solution with Spark. After each step, he’d ask me if Talend supported the method he was using.
1. Partition the data into userID partitions
After having all of a user’s clicks in the same partition, you can create the user sessions in parallel, with no additional shuffling.
a. The Spark method: use partitionBy(new HashPartitioner(100)) on a key-value RDD
b. The Talend method: use the “tPartition” component, which is the same as a Spark “partitionBy.”
2. Sort each partition by user ID and event time (secondary sort)
This step is essential for understanding where a session starts and ends. You go through each user’s clicks until you find one made at least 30 minutes after the previous one.
At this point, an interesting question came up for us: How can we keep the data partitioned and sorted?
That’s a challenge. When we sort the entire data set, we shuffle in order to get sorted RDDs and create new partitions, which are different than the partitions we got from Step 1. And what if we do the opposite?
Sort first by creation time and then partition the data? We’ll encounter the same problem. The re-partitioning will cause a shuffle and we’ll lose the sort. How can we avoid that?
Partition→sort = losing the original partitioning
Sort→partition = losing the original sort
There’s a solution for that in Spark. In order to partition and sort in Spark, you can use repartitionAndSortWithinPartitions.
The secondary sort is based on a composite key of the two columns and of a custom partitioner that is only partitioned by the first column. We get all the values of each real key sitting within the same partition. Now we get Spark to sort all the partition elements by the combined key. With the method that does both partitioning and sorting, along with a custom partitioner (repartitionAndSortWithinPartitions(partitioner), we no longer need Step 1.
The last thing we need to do is supply a custom comparator for the combined key, because we can’t expect Spark to know how to sort the combination of number and date (the tuple is kind of a new type).
So how do we set the comparator? We create a new class as our combined key, along with an implementation of a custom comparator.
Gilad thought there was no chance Talend would support all of this complicated functionality, and I was a bit skeptical, too.
But after some research and assistance from the Talend R&D team, we found the solution. As mentioned earlier, Talend has a tPartition component that supports sorting alongside the partitioning transformation. This is using the exact same method that we would have used in Spark (repartitionAndSortWithinPartitions).
In this screenshot, you can see that I chose a combined partition key and set a custom partitioner. In this case, in Talend Studio, it is easier than just writing Spark because we don’t need to create a dedicated class with a comparator for the secondary sort. Talend Studio understands the type of combined key. We still need to create a custom partitioner (view the video at the end of this blog for details on that).
3. Create the sessions
For each partition, in parallel, execute a code segment for all clicks and create a session field.
In Spark, it’s a simple fold call with inner logic. It’s the same in Studio, which offers a tJavaRow for custom Java code that runs after the partitioning and sorting.
In order to know if this job scales, I created a test file with 900M records (31.3 GB). I ran this job a few times with different Spark configurations. I tried different combinations of - number of executors, cores per executor, memory per executor and number of partitions on tPartition.
I notice the job scales linearly. Running on 10 executors with 10GB ram each, the job ran for 9.3 minutes. When I scaled it up to 20 executors with the same configuration, it took only 4.9 minutes. This linear trend gives us a good indication that this job could also handle larger input given a bigger cluster with more resources.
To summarize, Spark is a great platform for this use case. It can perform distributed aggregation on large amounts of clickstream data and create aggregated session keys that require sorting.
Talend-Spark integration supports all that’s required, plus it lets you enjoy the benefits of a mature, rich ETL tool.
This post is based on an article I coauthored with Gilad Moscovitch, a senior consultant at UXC Professional Solutions in Melbourne, Australia, and a great source of help on all Big Data issues. I also want to thank Ryan Skraba from the Talend R&D team. He helped me put the final touches on this job. For more details, see the original blog post, Spark ClickStream with Talend, and this YouTube video I created on how to implement this use case with Spark and Talend.