Apache Beam: Tutorial and Beginners' Guide

Maintaining different technologies is always a big challenge for both developers and businesses, and it is especially pressing in the big data world. There are so many big data technologies, such as Hadoop, Apache Spark, and Apache Flink, that it is easy to get lost. Which tool is best for real-time streaming? Is the speed of one particular tool sufficient for our use case? How should different data sources be integrated? If these questions come up often in your company, you may want to consider Apache Beam.

What is Apache Beam? It is a programming model for defining and executing both batch and streaming data processing pipelines. The history of Apache Beam started in 2016, when Google donated the Google Cloud Dataflow SDK and a set of data connectors for Google Cloud Platform to the Apache Software Foundation, starting the Apache incubator project. It did not take long for Apache Beam to graduate, becoming a new Top-Level Project in early 2017. Since then, the project has grown significantly, both in features and in its surrounding community.


Apache Beam Overview

How does Apache Beam work? First, you choose your favorite programming language from the set of provided SDKs; currently, you can choose Java, Python, or Go. Using your chosen language, you write a pipeline, which specifies where the data comes from, what operations need to be performed, and where the results should be written. Then, you choose a data processing engine on which the pipeline will be executed. Beam supports a wide range of data processing engines (in Beam's terminology: runners), including Google Cloud Dataflow, Apache Flink, Apache Spark, and many others. Of course, you can also execute your pipeline locally, which is especially useful for testing and debugging.
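
To make this concrete, here is a minimal sketch of that workflow using the Python SDK (the input file name input.txt and the output prefix output are just placeholders): a source, a transform, and a sink, executed locally unless you configure another runner.

import apache_beam as beam

# A minimal pipeline: where the data comes from, what to do with it,
# and where to write the results. With no options set, Beam runs it
# locally on the DirectRunner.
with beam.Pipeline() as p:
  (p
   | 'Read' >> beam.io.ReadFromText('input.txt')
   | 'Uppercase' >> beam.Map(str.upper)
   | 'Write' >> beam.io.WriteToText('output'))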

And why is Apache Beam so useful? Before Apache Beam appeared, there was no unified API in the big data world; frameworks like Hadoop, Flink, and Spark each provided their own way to define data processing pipelines. Beam lets you write your application once and run it on any supported engine, saving both cost and time.

Here are the types of use cases where Beam can prove its value:

  1. Moving data between different storage media
  2. Transforming data into a more desirable format
  3. Real-time data processing (e.g. detecting anomalies in real-time data)

Concepts

Pipeline

A Pipeline is a definition of your data processing task. All components are defined in the scope of the Pipeline. This is also where you provide execution options that tell Beam where and how to run.
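
For illustration, here is a sketch of constructing a Pipeline with execution options (the flag value below is only an example; a value such as DataflowRunner or FlinkRunner would hand the same pipeline to a different engine):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Execution options tell Beam where and how to run.
# DirectRunner executes the pipeline locally.
options = PipelineOptions(['--runner=DirectRunner'])

with beam.Pipeline(options=options) as p:
  # All components are defined within the Pipeline's scope.
  p | beam.Create([1, 2, 3]) | beam.Map(print)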

PCollection

A PCollection represents a data set that a Beam pipeline works on. The data set can be either bounded or unbounded: it is bounded when it comes from a fixed source, e.g. a file or a database table, and unbounded when new data can arrive at any moment. PCollections are the inputs and outputs of every PTransform.
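
As a quick illustration, a bounded PCollection can be created directly from an in-memory list with beam.Create; an unbounded one would typically come from a streaming source such as Pub/Sub or Kafka.

import apache_beam as beam

with beam.Pipeline() as p:
  # A bounded PCollection built from a fixed, in-memory data set.
  words = p | beam.Create(['to', 'be', 'or', 'not', 'to', 'be'])

  # PCollections are the inputs and outputs of every PTransform.
  lengths = words | beam.Map(len)
  lengths | beam.Map(print)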

PTransform

A PTransform is a single data processing operation. A PTransform takes one or more PCollections as input, performs a specified operation on each element of the PCollection, and returns zero or more PCollections as output.

Beam offers the following built-in basic PTransforms:

  1. ParDo
  2. GroupByKey
  3. CoGroupByKey
  4. Combine
  5. Flatten
  6. Partition

Users are also encouraged to use these transforms as a starting point for writing their own. A short sketch of a few of them follows.
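
Here is a rough sketch of GroupByKey, Flatten, and Partition on toy in-memory data (the values and labels are made up purely for illustration):

import apache_beam as beam

with beam.Pipeline() as p:
  pairs = p | 'Pairs' >> beam.Create([('a', 1), ('b', 2), ('a', 3)])

  # GroupByKey: collects all values that share a key,
  # e.g. ('a', 1) and ('a', 3) end up grouped under 'a'.
  grouped = pairs | beam.GroupByKey()

  # Flatten: merges several PCollections of the same type into one.
  more = p | 'More' >> beam.Create([('c', 4)])
  merged = (pairs, more) | beam.Flatten()

  # Partition: splits one PCollection into a fixed number of parts,
  # here by whether the value is even or odd.
  evens, odds = merged | beam.Partition(lambda kv, n: kv[1] % 2, 2)

  grouped | 'PrintGroups' >> beam.Map(print)
  evens | 'PrintEvens' >> beam.Map(print)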


A code example

Let's now walk through an example pipeline written with the Python SDK, which reads a text file and calculates the frequency of letters in the text. For simplicity, I limited it to the letters of the English alphabet. Here is the code:

from __future__ import print_function
from string import ascii_lowercase

import apache_beam as beam


class CalculateFrequency(beam.DoFn):
  def process(self, element, total_characters):
    letter, counts = element
    yield letter, '{:.2%}'.format(counts / float(total_characters))


def run():
  with beam.Pipeline() as p:
    letters = (p | beam.io.ReadFromText('romeojuliet.txt')
                 | beam.FlatMap(lambda line: (ch for ch in line.lower()
                                              if ch in ascii_lowercase))
                 | beam.Map(lambda x: (x, 1)))

    counts = (letters | beam.CombinePerKey(sum))

    total_characters = (letters | beam.MapTuple(lambda x, y: y)
                                | beam.CombineGlobally(sum))

    (counts | beam.ParDo(CalculateFrequency(),
                         beam.pvalue.AsSingleton(total_characters))
            | beam.Map(lambda x: print(x)))


if __name__ == '__main__':
  run()

Let's now go through the step-by-step explanation.

1.

letters = (p | beam.io.ReadFromText('romeojuliet.txt')

First, we specify the source of the data. The ReadFromText transform returns a PCollection, which contains all lines from the file.

2.

| beam.FlatMap(lambda line: (ch for ch in line.lower()
                             if ch in ascii_lowercase))

This step processes all lines and emits the English lowercase letters, each as a single element.

3.

| beam.Map(lambda x: (x, 1)))

For each letter, we return a two-tuple containing the letter and the number one. The Map transform is like FlatMap, except that its callable returns exactly one element per input element.
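
To make that difference concrete, here is a tiny contrast between the two on made-up input:

import apache_beam as beam

with beam.Pipeline() as p:
  lines = p | beam.Create(['ab', 'cd'])

  # Map emits exactly one output element per input element:
  # ('ab', 1) and ('cd', 1).
  pairs = lines | 'Pair' >> beam.Map(lambda s: (s, 1))

  # FlatMap emits zero or more output elements per input element:
  # 'a', 'b', 'c', 'd'.
  chars = lines | 'Chars' >> beam.FlatMap(lambda s: list(s))

  pairs | 'PrintPairs' >> beam.Map(print)
  chars | 'PrintChars' >> beam.Map(print)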

4.

counts = (letters | beam.CombinePerKey(sum))

This step takes all the pairs that have the same key (in our case, the same letter) and calculates the sum of the ones. The results are emitted to the counts PCollection.
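
Conceptually, CombinePerKey behaves like a GroupByKey followed by combining each group's values, although CombinePerKey allows runners to pre-combine values before shuffling, so it is usually preferred. A roughly equivalent version of this step would be:

counts = (letters | beam.GroupByKey()
                  | beam.MapTuple(lambda letter, ones: (letter, sum(ones))))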

5.

total_characters = (letters | beam.MapTuple(lambda x, y: y)
                            | beam.CombineGlobally(sum))

CombineGlobally takes all elements from the input PCollection and applies sum to them. Because sum, a Python built-in, works on numbers only, we must first drop the letter from each tuple.
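
As a side note, since each element of letters contributes exactly one to the total, the same number could also be computed with Beam's built-in Count combiner, which makes the MapTuple step unnecessary. A possible drop-in replacement for this step:

total_characters = letters | beam.combiners.Count.Globally()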

6.

(counts | beam.ParDo(CalculateFrequency(),
                     beam.pvalue.AsSingleton(total_characters))
        | beam.Map(lambda x: print(x)))

Now we are able to calculate the frequency. Our transform takes the counts PCollection as its main input and total_characters as a side input (wrapped in AsSingleton so that it arrives as a single value). For each count record, it simply divides the count by total_characters. The results are then printed to the screen:

(u'n', '6.19%')
(u'o', '8.20%')
(u'l', '4.58%')
(u'm', '3.29%')
(u'j', '0.27%')
(u'k', '0.81%')
(u'h', '6.60%')
(u'i', '6.42%')
(u'f', '2.00%')
(u'g', '1.77%')
(u'd', '3.74%')
(u'e', '11.89%')
(u'b', '1.66%')
(u'c', '2.05%')
(u'a', '7.78%')
(u'z', '0.03%')
(u'x', '0.13%')
(u'y', '2.50%')
(u'v', '1.01%')
(u'w', '2.47%')
(u't', '9.12%')
(u'u', '3.42%')
(u'r', '6.20%')
(u's', '6.33%')
(u'p', '1.46%')
(u'q', '0.06%')

Summary

We have seen that Apache Beam is a project that aims to unify multiple data processing engines and SDKs around a single programming model. Many features are not yet supported by every runner; however, Beam is still under active development. One of the most notable efforts is the Portability Framework, whose vision is to run pipelines written in any SDK on any runner.

Additional resources

If you are interested in the data processing challenges you may face when using Beam, I recommend reading the articles The World Beyond Batch: Streaming 101 and The World Beyond Batch: Streaming 102. If you have any questions about Apache Beam, do not hesitate to contact us directly or get in touch with the developer communities.

Kamil Wasilewski

Software Engineer
