Data refinement in action: Apache Flink® for data transformation into Google BigQuery

Learn how to draw meaningful conclusions from ecommerce data using Apache Flink® as a data pipeline into Google BigQuery

In today's companies, data flows across many technologies and formats, so it's essential to be able to reshape ongoing streams of information before they reach their target destination. Typical use cases include removing sensitive information, cleaning up irrelevant data, converting bulky JSON objects into organized structures, or altering the shape of the data to gain new insights.

A broader, and perhaps more important, use case for such streaming transformation is building a comprehensive 360-degree view of the data. This brings a new challenge: with so much information packed into those events, untangling them to find what's meaningful can be a problem.

Streaming analytics at Crab Mart

Today, we'll explore one such scenario together. Imagine you're a data engineer at Crab Mart, a brick-and-mortar store that has expanded online to sell groceries in the top 10 cities across the US. Your role involves evaluating seasonal promotions and ensuring that products are appropriately priced and timed for sale. To achieve this, you want to create a data pipeline that helps you understand sales patterns by location and time, enabling you to draw meaningful conclusions.

Details about each individual purchase from all the shops go directly into Apache Kafka®, a distributed event store. Once the data is there, it is used for a variety of purposes, including the analytical scenarios you're interested in.

Given the substantial volume of raw information, you've opted to sink the data into Google BigQuery — a data warehousing solution for running complex analytical queries and addressing Crab Mart's biggest questions.

Google BigQuery works best when it receives clean, prepared data, so your aim is to minimize joins and transformations within Google BigQuery itself: you want to do all the cleaning before the data arrives in the warehouse.

This is where the strengths of Apache Flink® and its Flink to BigQuery sink connector come into play. By using Apache Flink you can cleanse and restructure the event data into records that BigQuery can query quickly and efficiently.

The complete data flow looks like this:

Our goal is to convert this detailed JSON object for each of the purchases:

{ "id": 7100374005816377000, "city": "New York City", "purchase_day": 1607221440, "total_purchases": 36, "purchases": [ { "id": 7100374005814829000, "city": "New York City", "item": "Grapes (1kg)", "price": 5.99, "influencer": null }, { "id": 7100374005814829000, "city": "New York City", "item": "Conditioner (500ml)", "price": 4.99, "influencer": null }, .... ] }

Into distinct product entries stored in a BigQuery table:

Preparing the services

For our simulation we'll be using the following building blocks:

  • Apache Kafka as storage for the arriving sales events,
  • Google BigQuery, where the data for analytical queries will ultimately reside,
  • Apache Flink, a stream-processing framework that connects Kafka and BigQuery,
  • A data generator to simulate shopping events.

To demonstrate a smooth and easy integration between the elements above we'll use Aiven - the trusted open source data platform for everyone. If you haven't registered for it yet, start here and receive free credits for running the simulation described in this article.

Once you've registered, head to Aiven's console. This is where we'll create Aiven for Apache Kafka® and Aiven for Apache Flink® services.

Kafka service

Start by creating a new Aiven for Apache Kafka service. Select your preferred cloud and geographical location.

For our purposes, we'll need a topic: CDC-input. You can easily create it in your Apache Kafka cluster by following the instructions in our documentation.

Our next step is to set up the Apache Flink service. This can be done using the Aiven console. The smallest plan available in the chosen region will be enough for the purpose of the tutorial.

Preparing BigQuery

Before moving on, make sure you have a Google Cloud account.

In Google Cloud, we need to do a few things to get ready for data:

First, create a new service account. Give it a name and description, skipping the optional steps:

Once the service account is created, generate a key to access Google Cloud services. We'll need this key to integrate Apache Flink with BigQuery. There are different key types available, but you need the JSON one:

Download the key and keep it somewhere safe. We'll come back to this key in the following step, but while we're in the Google Cloud console, let's take care of a couple more things!

To store the data in BigQuery, we'll need a dataset and a table.
For the dataset, choose a name and your preferred location type - I kept it Multi-region:

Within the created dataset, add a new table and define its structure. Your BigQuery table should have the following fields (a DDL equivalent is shown right after the list):

  • city of type STRING
  • purchase_day of type TIMESTAMP
  • id of type BIGNUMERIC
  • influencer of type STRING
  • item of type STRING
  • price of type NUMERIC
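
If you prefer SQL to clicking through the console, the same table can be created with a DDL statement in the BigQuery query editor. This is only a sketch: it assumes the project ID aiven-devrel-demo and the dataset name e_commerce_analytics_dataset that we'll reference later in the Flink sink definition, so substitute your own names:

-- Assumes the project and dataset names used later in this article; replace with your own
CREATE TABLE `aiven-devrel-demo.e_commerce_analytics_dataset.sold_groceries` (
    city STRING,
    purchase_day TIMESTAMP,
    id BIGNUMERIC,
    influencer STRING,
    item STRING,
    price NUMERIC
);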

Finally, on the Google Cloud side, we need to grant permissions to the service account created above so that it can add data to the table. For this, go to Sharing > Permissions and add a new principal. Enter the name of the service account that you created earlier.

For the role, find "BigQuery Data Owner" and select it from the list.
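
If you'd rather script this step, BigQuery also supports granting roles with a SQL GRANT statement. The sketch below reuses the dataset name from this walkthrough and a hypothetical service account named crab-mart-flink; adjust both to match your setup:

-- The service account name here is a hypothetical placeholder
GRANT `roles/bigquery.dataOwner`
ON SCHEMA `aiven-devrel-demo.e_commerce_analytics_dataset`
TO "serviceAccount:crab-mart-flink@aiven-devrel-demo.iam.gserviceaccount.com";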

With the Google Cloud side prepared, let's return to the Aiven platform.

Adding an external service to Aiven

To be able to send data from Aiven to BigQuery, we'll need to define an integration between the systems. With the information about your Google Cloud account in hand:

  • Head to the Aiven console,
  • Navigate to Integration endpoints,
  • Select Google Cloud BigQuery.

Provide a suitable endpoint name, your Google Cloud project ID (if you're not sure, you can also find it in the project_id field of the JSON key), and paste the JSON key from your service account:

After creating the endpoint, we can now use it with our Apache Flink instance. But first, we need some data!

Generating sample data

We'll use a grocery generator that you can clone from the grocery-generator repository.

Install the dependencies by running:

pip install -r requirements.txt

To configure access to your Apache Kafka service:

  • Rename config-example.py to config.py, then replace the placeholders with your Aiven for Apache Kafka URI.
  • Create a folder called certificates and store these Apache Kafka certificates in it: ca.pem, service.cert and service.key.

All this information can be taken from your Aiven for Apache Kafka service page:

Now it's time to run the scripts and generate the data:

python makeData.py

After a minute or two, details about grocery purchases will start flowing into the Apache Kafka topic. You can also see the data in the topic by fetching messages through the Aiven console:

Now that we have data in our Apache Kafka service, let's shift our focus to Apache Flink.

Navigate to your Aiven for Apache Flink service to create a new data pipeline. We'll need two integrations: one for Apache Kafka to source the data, and another for BigQuery to sink the transformed records.

For Apache Kafka, access the Create service integration tab and select your Aiven for Apache Kafka service:

For BigQuery, go to Create external integration endpoint, and you'll find the previously added BigQuery integration:

Add both integrations to your Apache Flink service.

Next, create a new Apache Flink application. Choose a suitable name and create the application:

Each Apache Flink application consists of three components: source definition, sink definition, and transformation statement.

Our data source is the Apache Kafka topic that we need to map to an Apache Flink table.
For this, you need to do two things:

  • Choose your Aiven for Apache Kafka service from the list of integrated services.
  • Define a SQL statement that creates a table, specifying the fields to retrieve from the Apache Kafka topic, as well as the topic name, the data format, and the offset from which to start reading:
CREATE TABLE basket_table (
    city STRING,
    id BIGINT,
    purchase_day BIGINT,
    total_purchases INT,
    purchases ARRAY<ROW<
        city STRING,
        id BIGINT,
        influencer STRING,
        item STRING,
        price DECIMAL(10, 2)
    >>
) WITH (
    'connector' = 'kafka',
    'properties.bootstrap.servers' = '',
    'topic' = 'CDC-input',
    'value.format' = 'json',
    'scan.startup.mode' = 'earliest-offset'
)

To make sure everything is set up correctly and to see an example of the arriving data, click the Run button. This returns a sample of records from the Apache Kafka topic, rendered as a Flink table:

Include the source table and click Next to move ahead.
Next we need to define the destination for the data processed by Flink: BigQuery. As with Apache Kafka, there are two steps: select BigQuery from the list of integrated services and write the SQL definition for the table:

CREATE TABLE sold_groceries (
    city STRING,
    purchase_day TIMESTAMP(3),
    id BIGINT,
    influencer STRING,
    item STRING,
    price DECIMAL(10, 2)
) WITH (
    'connector' = 'bigquery',
    'project-id' = 'aiven-devrel-demo',
    'dataset' = 'e_commerce_analytics_dataset',
    'table' = 'sold_groceries'
)

Make sure that the project-id, dataset and table names, as well as the structure of the table, match what you created in BigQuery.

The final step is to create a statement that transforms the input data coming from the source and sends it to the destination. Here we use SQL to insert values from basket_table into the BigQuery table sold_groceries, and CROSS JOIN UNNEST to generate a separate row for each item in the purchases array of basket_table:

INSERT INTO sold_groceries
SELECT
    basket_table.city,
    TO_TIMESTAMP(FROM_UNIXTIME(basket_table.purchase_day)),
    p.id,
    p.influencer,
    p.item,
    p.price
FROM basket_table
CROSS JOIN UNNEST(purchases) AS p(city, id, influencer, item, price)

We're almost ready. Time to deploy the Flink application and see our data moving to BigQuery. Click to create a deployment and then choose to create it without a savepoint:

Once you create a deployment, go to the Flink UI by clicking Flink Web UI in the console and entering the username and password provided in the Connection information:

The Flink UI is a convenient tool for observing the status of Flink jobs and tracing any errors:

Getting the results

Once your job is running, go back to your table in BigQuery to see the data and run the analytical queries you need!
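
For example, a query along these lines (using the project, dataset and table names from this walkthrough) shows daily revenue per city; treat it as a starting point rather than a definitive report:

-- Daily items sold and revenue per city, highest revenue first
SELECT
    city,
    DATE(purchase_day) AS sale_date,
    COUNT(*) AS items_sold,
    ROUND(SUM(price), 2) AS revenue
FROM `aiven-devrel-demo.e_commerce_analytics_dataset.sold_groceries`
GROUP BY city, sale_date
ORDER BY revenue DESC;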

Conclusions

In this article, we've explored how to use Apache Flink and its Flink to BigQuery sink connector to transform data and send it to Google BigQuery. In a handful of steps, we configured BigQuery, Aiven for Apache Kafka and Aiven for Apache Flink, and created a new data pipeline.

Interested in learning more? Check out these resources: