engineering

November 05, 2020   |   6min read

Apache Beam: Tutorial and Beginners' Guide

Maintaining different technologies is always a big challenge for both developers and business. This is especially important in the big data world that is constantly expanding. There are so many big data technologies like Hadoop, Apache Spark, Apache Flink, etc. that it is easy to get lost. Which tool is the best for real-time streaming? Is the speed of one particular tool enough in our use case? How should you integrate different data sources? If these are the questions that often appear in your company, you may want to consider Apache Beam.

So what is Apache Beam? It’s a programming model to define and execute both batch and streaming data processing pipelines. The history of Apache Beam started in 2016 when Google donated the Google Cloud Dataflow SDK and a set of data connectors to access Google Cloud Platform to the Apache Software Foundation. This started the Apache incubator project. It did not take long until Apache Beam graduated, becoming a new Top-Level Project in the early 2017. Since then, the project has experienced significant growth both in its features and surrounding community.

In this article you’ll find an overview of the functionalities and features that Apache Beam offers, the glossary of the basic concepts and a Apache Beam code example.

open source engineer

Apache Beam Overview

How does Apache Beam work? First, you need to choose your favorite programming language from a set of provided SDKs. Currently, you can choose Java, Python or Go. Using your chosen language, you can write a pipeline, which specifies where does the data come from, what operations need to be performed, and where should the results be written. Then, you choose a data processing engine in which the pipeline is going to be executed. Beam supports a wide range of data processing engines (using Apache Beam’s terminology: runners), including Google Cloud Dataflow, Apache Flink, Apache Spark and many others. Of course, you can execute your pipeline locally. This is especially useful in case of testing and debugging.

And why is Apache Beam so useful? Before Apache Beam appeared, there had been no unified API in the big data world. Frameworks like Hadoop, Flink and Spark provided their own way to define data processing pipelines. Beam lets you write your application once, saving the cost and time.

Here are the types of use cases where Apache Beam can prove its value:

  • Moving data between different storage media
  • Transforming data into a more desirable format
  • Real-time data processing (e.g. detecting anomalies in real-time data)

Concepts

Pipeline

A Pipeline is a definition of your data processing task. All components are defined in the scope of the Pipeline. This is also a place where you provide execution options that tell Beam where and how to run.

PCollection

A PCollection stands for a data set that Beam’s pipeline works on. The data set can be either bounded or unbounded. We say the data set is bounded when it came from a fixed source, e.g. from a file or a database table. The data set is unbounded when new data can arrive at any moment. PCollections are the inputs and outputs for each PTransform.

PTransform

A PTransform is a single data processing operation. A PTransform takes one or more PCollections as input, performs a specified operation on each element in PCollection and returns zero or more PCollections as output.

Beam offers the following build-in basic PTransforms:

  • ParDo
  • GroupByKey
  • CoGroupByKey
  • Combine
  • Flatten
  • Partition

The user is also encouraged to use these transforms as a starting point for writing his or her own transforms.

software developer

A code example

Let’s now show an example of a pipeline written in Python SDK which reads a text file and calculates the frequency of letters in text. For simplicity, I limited it to the letters from the English alphabet only. Here is the code:

import sys
from string import ascii_lowercase
from typing import Tuple

import apache_beam as beam
from apache_beam.transforms import combiners


class CalculateFrequency(beam.DoFn):
  def process(self, element, total_characters):
    letter, counts = element
    yield letter, counts / total_characters


def run():
  with beam.Pipeline(argv=sys.argv) as p:
    letters = (
        p
        | beam.io.ReadFromText(
            'gs://apache-beam-samples/shakespeare/romeoandjuliet.txt')
        | beam.FlatMap(
            lambda line: (ch for ch in line.lower()
                          if ch in ascii_lowercase)).with_output_types(str))

    total_characters = letters | combiners.Count.Globally()
    counts = (
        letters
        | beam.Map(lambda ch: (ch, 1)).with_output_types(Tuple[str, int])
        | beam.CombinePerKey(sum))

    _ = (
        counts
        | beam.ParDo(
            CalculateFrequency(),
            beam.pvalue.AsSingleton(total_characters)).with_output_types(
                Tuple[str, float])
        | beam.MapTuple(lambda letter, freq: print(f'{letter}: {freq:.2%}')))


if __name__ == '__main__':
  run()

Let’s now go through the step-by-step explanation.

1.

letters = (
    p
    | beam.io.ReadFromText(
        'gs://apache-beam-samples/shakespeare/romeoandjuliet.txt')

First, we specify the source of the data. The ReadFromText transform returns a PCollection, which contains all lines from the file. Apache Beam can read files from the local filesystem, but also from a distributed one. In this example, Apache Beam will read the data from the public Google Cloud Storage bucket.

2.

    | beam.FlatMap(
        lambda line: (ch for ch in line.lower()
                      if ch in ascii_lowercase)).with_output_types(str))

This step processes all lines and emits English lowercase letters, each of them as a single element. You may wonder what with_output_types does. Apache Beam’s Python SDK has its own type annotations system, which enables type checking at pipeline construction time. When combined with the --runtime/_type/_check pipeline option, it also enables type checking at pipeline execution. Our transformations are pretty straightforward, but for more complex ones, typehints could be a real lifesaver if there are some bugs in the code.

3.

total_characters = letters | combiners.Count.Globally()

This is where we create one of two new PCollections from the input PCollection. Because we want to calculate the frequency of letters, for each letter, we need two values—the number of occurrences of the letter and the total number of all letters. This code fragment involves combining. The combining operation combines all elements in the pipeline into one value using a provided combiner. In this case, we simply count all letters.

4.

counts = (
    letters
    | beam.Map(lambda ch: (ch, 1)).with_output_types(Tuple[str, int])

For each letter return a two-tuple containing the letter and one.

5.

    | beam.CombinePerKey(sum))

Take all the pairs that have the same key (in our case—the same letter) and calculate the sum of ones. The results are emitted to the counts PCollection.

6.

_ = (
    counts
    | beam.ParDo(
        CalculateFrequency(),
        beam.pvalue.AsSingleton(total_characters)).with_output_types(
            Tuple[str, float])

Now we are able to calculate the frequency. Our transform takes two PCollections: counts and total_characters. For each count record it simply divides count by total_characters.

7.

    | beam.MapTuple(lambda letter, freq: print(f'{letter}: {freq:.2%}')))

The final step transforms the results into a more user-friendly format. MapTuple is like a standard Map, but it expects tuple inputs. Each tuple is unpacked into multiple arguments.

The results are then shown on the screen:

Results

r: 6.13%
o: 8.33%
m: 3.19%
e: 12.10%
a: 7.82%
n: 6.13%
d: 3.72%
j: 0.29%
u: 3.57%
l: 4.69%
i: 6.54%
t: 9.18%
s: 6.20%
p: 1.47%
c: 2.19%
f: 1.91%
v: 1.05%
y: 2.46%
g: 1.73%
b: 1.61%
k: 0.78%
h: 6.36%
w: 2.36%
z: 0.03%
q: 0.06%
x: 0.12%

Summary

We have seen that Apache Beam is a project that aims to unify multiple data processing engines and SDKs around one single model. Many of the features are not yet compatible with all runners, however, Beam is still under active development. One of the most remarkable efforts is Portability Framework, with a vision of running pipelines on any SDK on any runner.

Additional resources

I recommend reading The World Beyond Batch Streaming 101 and The World Beyond Batch Streaming 102 articles if you’re interested in some data processing challenges that you can face using Apache Beam. If you have any questions about the Apache Beam, do not hesitate to contact us directly or get in touch with the developer communities.

Kamil Wasilewski

Software Engineer

Did you enjoy the read?

If you have any questions, don’t hesitate to ask!