engineering

November 03, 2020   |   5min read

Getting Started with Snowflake and Apache Beam on Google Dataflow

As the title suggests, this tutorial is going to show you how to get started building data processing pipelines on Google Cloud Dataflow using Apache Beam together with Snowflake and the recently built SnowflakeIO connector.

Ok, it sounds sooo cool….

But what is Apache Beam? What is Google Dataflow? What is Snowflake? What is SnowflakeIO?

Apache Beam is an open-source, unified model for defining both batch and streaming data processing pipelines. Behind the scenes, Beam uses one of the supported distributed processing back-ends, such as Apache Flink, Apache Spark, or Google Cloud Dataflow. For a more detailed introduction to Apache Beam, you can check our previous blog post: Apache Beam: Tutorial and Beginners’ Guide.

Google Cloud Dataflow is a fully managed cloud-based data processing service for both batch and streaming pipelines. Currently, Apache Beam is the most popular way of writing data processing pipelines for Google Dataflow.

Snowflake is a data platform which was built for the cloud and runs on AWS, Azure, or Google Cloud Platform. Snowflake acts as a data warehouse, data lake, database, or as a secure data exchange.

SnowflakeIO is a built-in I/O connector inside Apache Beam which was recently developed by Polidea. There are a few features worth pointing out about the Snowflake connector. First, it supports batch as well as streaming data processing pipelines. Second, it’s possible to write pipelines in Java as well as Python, thanks to the multi-SDK connector efforts inside Apache Beam. Third, it supports runtime options, which allow the creation of reusable Dataflow templates. Finally, it can work with Google Cloud Storage, Azure Blob Storage, or Amazon S3. More information about the Snowflake connector can be found in the official Apache Beam documentation under the Snowflake I/O connector section.
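To give a feel for the API, here is a minimal read sketch in Java, loosely based on the SnowflakeIO section of the Apache Beam documentation. The placeholders in angle brackets are assumptions you would replace with your own values, and exact method names can differ slightly between Beam versions.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.snowflake.SnowflakeIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;

public class SnowflakeReadSketch {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Connection settings; replace the placeholders with your own account details.
    SnowflakeIO.DataSourceConfiguration dataSource =
        SnowflakeIO.DataSourceConfiguration.create()
            .withUsernamePasswordAuth("<USERNAME>", "<PASSWORD>")
            .withServerName("<ACCOUNT>.snowflakecomputing.com")
            .withDatabase("<DATABASE>")
            .withSchema("<SCHEMA>");

    // SnowflakeIO stages data as CSV files on a bucket, which is why a staging
    // bucket and a storage integration are needed in addition to the connection.
    PCollection<String> rows =
        pipeline.apply(
            SnowflakeIO.<String>read()
                .withDataSourceConfiguration(dataSource)
                .fromTable("<TABLE>")
                .withStagingBucketName("gs://<BUCKET NAME>/")
                .withStorageIntegrationName("<STORAGE INTEGRATION NAME>")
                // CsvMapper turns one staged CSV line (already split into parts) into a value.
                .withCsvMapper((SnowflakeIO.CsvMapper<String>) parts -> String.join(",", parts))
                .withCoder(StringUtf8Coder.of()));

    pipeline.run();
  }
}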

Overview

At a high level, we are going to extend the official Apache Beam WordCount tutorial by adding SnowflakeIO. We will build two pipelines to demonstrate writing to and reading from Snowflake. The first pipeline is going to read some books, count words using Apache Beam on Google Dataflow, and finally save those counts into Snowflake, as shown in picture 1. The second pipeline is going to read the previously saved counts from Snowflake and save them into a bucket, as shown in picture 2.

Picture 1: The first pipeline reads some books, counts words using Apache Beam on Google Dataflow, and saves the counts into Snowflake.

Picture 2: The second pipeline reads the previously saved counts from Snowflake and saves them into a bucket.
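As a preview of the write side, here is a rough sketch of how word counts (as KV<String, Long> pairs produced by the standard WordCount transforms) could be handed to SnowflakeIO. This is an illustration under assumed names, not the exact code from the example repository:

import org.apache.beam.sdk.io.snowflake.SnowflakeIO;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

public class WriteCountsSketch {

  // Maps one (word, count) pair onto a row of column values for the target table.
  static SnowflakeIO.UserDataMapper<KV<String, Long>> countsMapper() {
    return (SnowflakeIO.UserDataMapper<KV<String, Long>>)
        element -> new Object[] {element.getKey(), element.getValue()};
  }

  // Writes the counts to a Snowflake table via files staged on the GCS bucket.
  static void writeCounts(
      PCollection<KV<String, Long>> counts, SnowflakeIO.DataSourceConfiguration dataSource) {
    counts.apply(
        "Write counts to Snowflake",
        SnowflakeIO.<KV<String, Long>>write()
            .withDataSourceConfiguration(dataSource)
            .to("<SNOWFLAKE TABLE NAME>")
            .withStagingBucketName("gs://<BUCKET NAME>/")
            .withStorageIntegrationName("<SNOWFLAKE STORAGE INTEGRATION NAME>")
            .withUserDataMapper(countsMapper()));
  }
}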

How are we going to achieve that?

  1. Setting up Snowflake and Google Cloud Platform
  2. Running the WordCount example

Setting up Snowflake and Google Cloud Platform

Please note that the following steps assume that you’re starting from scratch, so if you already have existing Snowflake and Google Cloud Platform accounts, you’re free to use them. Just be aware that you will need the ACCOUNTADMIN role to set up a storage integration in Snowflake (if you don’t already have one) and the ability to set privileges on GCS buckets on Google Cloud Platform (if not already set up).

  1. Create a Snowflake account by going to the Snowflake trial website. After this step, you should have access to the console as shown in the screenshot below.

[Screenshot: the Snowflake web console]

  2. Create a new user with a default role and warehouse by executing:
CREATE USER test_user PASSWORD='test_user' DEFAULT_ROLE=SYSADMIN DEFAULT_WAREHOUSE=COMPUTE_WH;
  3. Grant the ACCOUNTADMIN role to the created user by executing:
GRANT ROLE ACCOUNTADMIN TO USER test_user;
  4. Create a new Snowflake database by executing:
CREATE DATABASE test_database;
  5. Create a Google Cloud Platform account, then create a new GCP project and a GCS bucket. Remember the names of the created project and bucket.
  6. Create a Snowflake storage integration object by executing the commands below. The first command changes the session role to ACCOUNTADMIN, which is required to create a storage integration object. After this step, you can switch back to your previous session role.
USE ROLE ACCOUNTADMIN;

CREATE OR REPLACE STORAGE INTEGRATION integration_test
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = GCS
ENABLED = TRUE
STORAGE_ALLOWED_LOCATIONS = ('gcs://<BUCKET NAME>/');
Note: replace <BUCKET NAME> with the name of the bucket you created.
  7. Retrieve the Cloud Storage service account by following Step 2: Retrieve the Cloud Storage Service Account for your Snowflake Account in the Snowflake documentation.
  8. Authorize Snowflake to operate on your bucket by following Step 3: Grant the Service Account Permissions to Access Bucket Objects.
  9. Set up gcloud on your computer by following Using the Google Cloud SDK installer.

Running the WordCount example

  1. Clone the BeamSnowflake-GettingStarted repository by executing:
git clone https://github.com/PolideaInternal/BeamSnowflake-GettingStarted.git
  2. Go to the project folder by executing:
cd BeamSnowflake-GettingStarted
  3. Run the example by executing the following command with your own variables (these flags map onto a Beam pipeline options interface; a sketch follows after this list):
./gradlew run -PmainClass=batching.WordCountExample --args=" \
    --inputFile=gs://apache-beam-samples/shakespeare/* \
    --output=gs://<BUCKET NAME>/counts \
    --serverName=<SNOWFLAKE SERVER NAME> \
    --username=<SNOWFLAKE USERNAME> \
    --password=<SNOWFLAKE PASSWORD> \
    --database=<SNOWFLAKE DATABASE> \
    --schema=<SNOWFLAKE SCHEMA> \
    --table=<SNOWFLAKE TABLE NAME>  \
    --storageIntegrationName=<SNOWFLAKE STORAGE INTEGRATION NAME> \
    --stagingBucketName=<GCS BUCKET NAME> \
    --runner=DataflowRunner \
    --project=<GCP PROJECT NAME> \
    --gcpTempLocation=<GCS TEMP LOCATION> \
    --region=<GCP REGION> \
    --appName=<OPTIONAL: DATAFLOW JOB NAME PREFIX>"

In my case:

  ./gradlew run -PmainClass=batching.WordCountExample --args=" \
      --inputFile=gs://apache-beam-samples/shakespeare/* \
      --output=gs://test-bucket/counts \
      --serverName=xxxxxx.us-central1.gcp.snowflakecomputing.com \
      --username=test_user \
      --password=test_user \
      --database=test_database \
      --schema=public \
      --table=words  \
      --storageIntegrationName=integration_test \
      --stagingBucketName=gs://test-bucket/ \
      --runner=DataflowRunner \
      --project=project-test \
      --gcpTempLocation=gs://test-bucket/temp \
      --region=us-central1 \
      --appName=WordCountExample"
  4. Wait a few minutes.
  5. Go to the Dataflow console to check the submitted jobs:

[Screenshot: submitted jobs in the Dataflow console]

  6. Go to the Snowflake console to check the saved counts by executing:
SELECT * FROM test_database.public.words;

[Screenshot: word counts returned by the query in the Snowflake console]

  7. Go to the created bucket to check the created files:

[Screenshot: count files created in the GCS bucket]
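For reference, the command-line flags passed in step 3 map onto a Beam pipeline options interface defined in the example repository. The sketch below shows roughly what such an interface looks like; the getter names and the ValueProvider usage (which is what enables runtime options and reusable Dataflow templates) are illustrative assumptions, not the repository's actual code. The remaining flags (username, password, database, schema, and the standard Dataflow options) follow the same pattern.

import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.options.ValueProvider;

// Hypothetical options interface; the real one lives in the example repository.
public interface WordCountSnowflakeOptions extends PipelineOptions {

  @Description("Path of the file(s) to read, e.g. gs://apache-beam-samples/shakespeare/*")
  @Validation.Required
  ValueProvider<String> getInputFile();
  void setInputFile(ValueProvider<String> value);

  @Description("Snowflake server name, e.g. <account>.<region>.gcp.snowflakecomputing.com")
  @Validation.Required
  String getServerName();
  void setServerName(String value);

  @Description("Snowflake table that the word counts are written to")
  @Validation.Required
  String getTable();
  void setTable(String value);

  @Description("Name of the storage integration created in Snowflake for the GCS bucket")
  @Validation.Required
  String getStorageIntegrationName();
  void setStorageIntegrationName(String value);

  @Description("GCS bucket used by SnowflakeIO for staging files, e.g. gs://<bucket>/")
  @Validation.Required
  String getStagingBucketName();
  void setStagingBucketName(String value);
}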

That’s it for the tutorial. Thanks for sticking around to the end! All of the code is available on GitHub. If you have any questions regarding the development of data processing pipelines or have a project in mind, don’t hesitate to contact us.

Paweł Urbanowicz

Software Engineer
