Using Kafka Connect JDBC Source: a PostgreSQL® example
Find out how to use Apache Kafka® Connect to update an old app-to-db design to use up-to-date tech tools without disrupting the original solution.
If we go back a few years, the typical data pipeline was an app creating events and pushing them to a backend database. Data was then propagated to downstream applications via dedicated ETL flows at regular intervals, usually daily.
In these modern times, Apache Kafka® has become the default data platform. Apps write events to Kafka, which then distributes them in near-real-time to downstream sinks like databases or cloud storage. Apache Kafka® Connect, a framework to stream data into and out of Apache Kafka, represents a further optimisation that makes the ingestion and propagation of events just a matter of configuration files.
What if we're facing an old app-to-database design? How can we bring it up-to-date and include Kafka in the game? Instead of batch exporting to the database at night, we can add Kafka to the existing system. Kafka Connect lets us integrate with an existing system and make use of more modern tech tools, without disrupting the original solution.
One way to do this is to use the Kafka Connect JDBC Connector. This post will walk you through an example of sourcing data from an existing table in PostgreSQL® and populating a Kafka topic with only the changed rows. This is a great approach for many use cases. But when no additional query load on the source system is allowed, you could also make use of change data capture solutions based on tools like Debezium. As we'll see later, Aiven provides Kafka Connect as a managed service for both options, so you can start your connectors without the hassle of managing a dedicated cluster.
This blog post provides an example of the Kafka Connect JDBC Source based on a PostgreSQL database. A more detailed explanation of the connector is provided in our documentation: Create a JDBC source connector from PostgreSQL® to Apache Kafka®.
In our example, we first create a PostgreSQL database to act as backend data storage for our imaginary application. Then we create a Kafka cluster with Kafka Connect and show how any new or modified row in PostgreSQL appears in a Kafka topic.
Creating the PostgreSQL Source system
We'll create the whole setup using the Aiven Command Line Interface. Follow the instructions in its documentation to install the avn command and log in.
Once you've logged in to the Aiven client, we can create a PostgreSQL database with the following avn command in our terminal:
avn service create pg-football \
  -t pg \
  --cloud google-europe-west3 \
  -p business-4
This command creates a PostgreSQL database (that's what -t pg does) named pg-football in the google-europe-west3 region. The selected plan, which drives the amount of resources available and the associated billing, is business-4.
The create command returns immediately: Aiven has received the request and started creating the instance. We can wait for the database to be ready with the following command:
avn service wait pg-football
The wait command can be executed against any Aiven instance, and returns only when the service is in the RUNNING state.
Time to Scout Football Players
Now let's create our playground: we are a football scouting agency, checking players all over the world, and our app pushes the relevant data to a PostgreSQL table. Let's log in to PostgreSQL from the terminal:
avn service cli pg-football
Our agency doesn't do a great job at scouting: all we're able to capture is the player's name, nationality and an is_retired flag showing their activity status.
We create a simple football_players table containing the above information together with two control columns:
- created_at, keeping the record's creation time
- modified_at, for the row's last modification time
These two columns will later be used by the Kafka Connect connector to select the recently changed rows.
Now it's time to create the table from the PostgreSQL client:
CREATE TABLE football_players (
    name VARCHAR(50) PRIMARY KEY,
    nationality VARCHAR(255) NOT NULL,
    is_retired BOOLEAN DEFAULT false,
    created_at TIMESTAMP NOT NULL DEFAULT NOW(),
    modified_at TIMESTAMP
);
The created_at field will work as expected immediately, with the DEFAULT NOW() definition.
The modified_at column, on the other hand, requires a bit more tuning to be usable: we'll need to create a trigger that sets the current timestamp on updates. The following SQL can be executed from the PostgreSQL client:
CREATE OR REPLACE FUNCTION change_modified_at()
RETURNS TRIGGER
LANGUAGE PLPGSQL
AS $$
BEGIN
    NEW.modified_at := NOW();
    RETURN NEW;
END;
$$;

CREATE TRIGGER modified_at_updates
    BEFORE UPDATE ON football_players
    FOR EACH ROW
    EXECUTE PROCEDURE change_modified_at();
The first statement creates the change_modified_at function that will later be used by the modified_at_updates trigger.
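As an optional sanity check (an extra step, not required for the rest of the post), we can confirm that the trigger has been registered on the table by querying PostgreSQL's pg_trigger catalog:

-- list the user-defined triggers attached to football_players
SELECT tgname
FROM pg_trigger
WHERE tgrelid = 'football_players'::regclass
  AND NOT tgisinternal;

The query should return a single row containing modified_at_updates.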
Football Scouting App at Work
We can now simulate our football scouting app behaviour by manually inserting three rows in the football_players table from the PostgreSQL client with:
INSERT INTO football_players (name, nationality, is_retired) VALUES ('Andrea Pirlo','Italian', true);
INSERT INTO football_players (name, nationality, is_retired) VALUES ('Cristiano Ronaldo','Portuguese', false);
INSERT INTO football_players (name, nationality, is_retired) VALUES ('Megan Rapinoe','American', true);
We can verify that the created_at column is successfully populated in PostgreSQL with:
SELECT * FROM football_players;
This will output:
       name        | nationality | is_retired |         created_at         | modified_at
-------------------+-------------+------------+----------------------------+-------------
 Andrea Pirlo      | Italian     | t          | 2021-03-11 10:35:52.04076  |
 Cristiano Ronaldo | Portuguese  | f          | 2021-03-11 10:35:52.060104 |
 Megan Rapinoe     | American    | t          | 2021-03-11 10:35:52.673554 |
(3 rows)
Perfect, the app is working when inserting new rows. If only we could have an update to an existing row...
Well, this was somewhat expected: Juventus FC went out of the Champions League and needed new energy in the midfield. We can update the relevant row with:
UPDATE football_players SET is_retired=false WHERE name='Andrea Pirlo';
We can check that modified_at is correctly populated by issuing the same SELECT * FROM football_players; statement in the PostgreSQL client and checking the following output:
       name        | nationality | is_retired |         created_at         |         modified_at
-------------------+-------------+------------+----------------------------+----------------------------
 Cristiano Ronaldo | Portuguese  | f          | 2021-03-11 10:35:52.060104 |
 Megan Rapinoe     | American    | t          | 2021-03-11 10:35:52.673554 |
 Andrea Pirlo      | Italian     | f          | 2021-03-11 10:35:52.04076  | 2021-03-11 10:39:49.198286
(3 rows)
Ok, we recreated the original setup: our football scouting app is correctly storing data in the football_players table. In the old days, the extraction of that data was delegated to an ETL flow running overnight and pushing it to the downstream applications. Now, as per our original aim, we want to include Apache Kafka in the game, so... let's do it!
Creating a Kafka environment
As stated initially, our goal is to base our data pipeline on Apache Kafka without having to change the existing setup. We don't have a Kafka environment available right now, but we can easily create one using Aiven's CLI from the terminal with the following avn command:
avn service create kafka-football \
  -t kafka \
  --cloud google-europe-west3 \
  -p business-4 \
  -c kafka.auto_create_topics_enable=true \
  -c kafka_connect=true
The command creates an Apache Kafka instance (-t kafka) in google-europe-west3 with the business-4 plan.
Additionally, it enables topic auto-creation (-c kafka.auto_create_topics_enable=true) so our applications can create topics on the fly, without forcing us to create them beforehand.
Finally, it enables Kafka Connect (-c kafka_connect=true) on the same Kafka instance. We can use the avn service wait command mentioned above to pause until the Kafka cluster is in the RUNNING state.
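For example, reusing the wait command we saw for the PostgreSQL service:

avn service wait kafka-football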
Note that for Kafka instances on the startup plans, you'll need to create a standalone Kafka Connect service instead. For production systems, we also recommend a standalone Kafka Connect, following the separation of concerns principle.
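As a sketch of that alternative (the service name and plan below are placeholders; check the Aiven documentation for the exact plan names and for linking the Connect service to your Kafka service via a service integration), a dedicated Kafka Connect service follows the same CLI pattern used above:

# service name and plan are placeholders, adjust to your needs
avn service create kafka-connect-football \
  -t kafka_connect \
  --cloud google-europe-west3 \
  -p startup-4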
Connecting the dots
The basic building blocks are ready: our source system, the pg-football PostgreSQL database with its football_players table, and the kafka-football Apache Kafka instance are up and running. It's now time to connect the two, creating a new event in Kafka every time a row is inserted or modified in PostgreSQL. That can be achieved by creating a Kafka Connect JDBC source connector.
Create a JSON configuration file
Start by creating a JSON configuration file like the following:
{ "name": "pg-timestamp-source", "connector.class": "io.aiven.connect.jdbc.JdbcSourceConnector", "connection.url": "jdbc:postgresql://<HOSTNAME>:<PORT>/<DATABASE>?sslmode=require", "connection.user": "<PG_USER>", "connection.password": "<PG_PASSWORD>", "table.whitelist": "football_players", "mode": "timestamp", "timestamp.column.name":"modified_at,created_at", "poll.interval.ms": "2000", "topic.prefix": "pg_source_" }
Where the important parameters are:

- name: the name of the Kafka Connect connector, in our case pg-timestamp-source
- connection.url: the connection URL pointing to the PostgreSQL database, in the form jdbc:postgresql://<HOSTNAME>:<PORT>/<DATABASE>?<ADDITIONAL_PARAMETERS>. We can build it from the dbname, host and port output of the following avn command:

avn service get pg-football --format '{service_uri_params}'

- connection.user and connection.password: the PostgreSQL credentials; the default avnadmin credentials are available as the user and password output of the avn command above
- table.whitelist: the list of tables to source from PostgreSQL, in our case football_players
- mode: the Kafka Connect JDBC mode. Three modes are available: bulk, incrementing and timestamp. For this post we'll use timestamp (a sketch of the query this mode issues is shown after this list). For a more detailed description of the modes, please refer to the help article
- timestamp.column.name: the list of timestamp column names. The value for this setting should be modified_at,created_at, since modified_at will contain the most recent update timestamp and, in case of a null value, we can rely on the created_at column
- poll.interval.ms: the time between database polls
- topic.prefix: the prefix for the topic name; the full topic name will be the concatenation of topic.prefix and the PostgreSQL table name
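To give a feeling of what timestamp mode does, the query the connector runs at every poll looks roughly like the following (a simplified sketch, not the exact SQL the connector generates; the real query also applies an upper time bound and uses the offset it stores internally):

-- simplified illustration of a timestamp-mode poll:
-- pick up rows whose change timestamp is newer than the last stored offset.
-- The literal timestamp below is just a placeholder for that stored offset.
SELECT *
FROM football_players
WHERE COALESCE(modified_at, created_at) > '2021-03-11 10:39:49'
ORDER BY COALESCE(modified_at, created_at) ASC;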
Start the JDBC connector
After storing the above JSON in a file named kafka_jdbc_config.json, we can now start the Kafka Connect JDBC connector in our terminal with the following command:
avn service connector create kafka-football @kafka_jdbc_config.json
We can verify the status of the Kafka Connect connector with the following avn command:
avn service connector status kafka-football pg-timestamp-source
Note that the last parameter, pg-timestamp-source, in the avn command above refers to the Kafka Connect connector name defined in the name setting of the kafka_jdbc_config.json configuration file. If all settings are correct, the above command will show our healthy connector in the RUNNING state:
{ "status": { "state": "RUNNING", "tasks": [ { "id": 0, "state": "RUNNING", "trace": "" } ] } }
Check the data in Kafka with kcat
The data should now have landed in Apache Kafka. How can we check it?
We can use kcat, a nice command line utility.
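If you don't have it installed yet, kcat is available from most package managers; for example (package names are indicative and may vary by platform, with older distributions still shipping it as kafkacat):

# macOS, via Homebrew
brew install kcat
# Debian/Ubuntu, where the package may still be called kafkacat
sudo apt-get install kafkacat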
Once kcat is installed, we'll need to set up the connection to our Kafka environment.
Aiven by default enables SSL certificate based authentication. The certificates are available from the Aiven console for manual download, but with the Aiven CLI you can avoid the clicking and fetch them with the following avn command in our terminal:
mkdir -p kafkacerts
avn service user-creds-download kafka-football \
  -d kafkacerts \
  --username avnadmin
These commands create a kafkacerts folder (if it doesn't exist already) and download into it the ca.pem, service.cert and service.key SSL certificates required to connect.
The last missing piece of information that kcat needs is where to find our Kafka instance, in terms of hostname and port. This information can be displayed in our terminal with the following avn command:
avn service get kafka-football --format '{service_uri}'
Once we've collected the required info, we can create a kcat.config file with the following entries:
bootstrap.servers=<KAFKA_SERVICE_URI>
security.protocol=ssl
ssl.key.location=kafkacerts/service.key
ssl.certificate.location=kafkacerts/service.cert
ssl.ca.location=kafkacerts/ca.pem
Remember to substitute <KAFKA_SERVICE_URI> with the output of the avn service get command mentioned above.
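Before consuming any data, we can optionally check that the configuration is correct by asking kcat to list the cluster metadata with the -L flag:

kcat -F kcat.config -L

If the certificates and the service URI are right, this prints the broker and the available topics.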
Now we are ready to read the topic from Kafka by pasting the following command in our terminal:
kcat -F kcat.config -C -t pg_source_football_players
Note that we are using kcat in consumer mode (flag -C), reading from the topic pg_source_football_players, which is the concatenation of the topic.prefix setting in Kafka Connect and the name of our football_players PostgreSQL table.
As expected, since the connector is working, kcat will output the three messages present in the Kafka topic, matching the three rows in the football_players PostgreSQL table:
{"name":"Cristiano Ronaldo","nationality":"Portuguese","is_retired":false,"created_at":1615458952060,"modified_at":null} {"name":"Megan Rapinoe","nationality":"American","is_retired":true,"created_at":1615458952673,"modified_at":null} {"name":"Andrea Pirlo","nationality":"Italian","is_retired":false,"created_at":1615458952040,"modified_at":1615459189198} % Reached end of topic pg_source_football_players [0] at offset 3
Updating the listings
Now, let's see if our football scouts around the world can fetch some news for us.
Wow, we found a new talent named Enzo Gorlami, and Cristiano Ronaldo officially retired today from professional football (please be aware this post is not reflecting football reality). Let's push both pieces of news to PostgreSQL:
INSERT INTO football_players (name, nationality, is_retired) VALUES ('Enzo Gorlami','Italian', false);
UPDATE football_players SET is_retired=true WHERE name='Cristiano Ronaldo';
We can verify that the data is correctly stored in the database:
defaultdb=> select * from football_players;
       name        | nationality | is_retired |         created_at         |         modified_at
-------------------+-------------+------------+----------------------------+----------------------------
 Megan Rapinoe     | American    | t          | 2021-03-11 10:35:52.673554 |
 Andrea Pirlo      | Italian     | f          | 2021-03-11 10:35:52.04076  | 2021-03-11 10:39:49.198286
 Enzo Gorlami      | Italian     | f          | 2021-03-11 11:09:49.411885 |
 Cristiano Ronaldo | Portuguese  | t          | 2021-03-11 10:35:52.060104 | 2021-03-11 11:11:36.790781
(4 rows)
And in kcat we receive the following two updates:
{"name":"Enzo Gorlami","nationality":"Italian","is_retired":false,"created_at":1615460989411,"modified_at":null} % Reached end of topic pg_source_football_players [0] at offset 4 {"name":"Cristiano Ronaldo","nationality":"Portuguese","is_retired":true,"created_at":1615458952060,"modified_at":1615461096790} % Reached end of topic pg_source_football_players [0] at offset 5
Further reading
If you want to know more about Aiven, Kafka, Kafka Connect or PostgreSQL, check the references below:
- Kafka Connect JDBC Source PostgreSQL
- An introduction to PostgreSQL
- List of Aiven supported Kafka Connect Connectors
- Kafka Connect Preview
- Aiven Command Line Interface
Wrapping up
This blog post showed how to easily integrate PostgreSQL and Apache Kafka with a fully managed, config-file-driven Kafka Connect JDBC connector. We used a timestamp-based approach to retrieve the rows changed since the previous poll and push them to a Kafka topic, at the cost of some additional query load on the source database.
An alternative method is represented by Change Data Capture solutions like Debezium, which, in the case of PostgreSQL, reads changes directly from the WAL files, avoiding any additional query load on the source database. A guide on how to set up CDC for Aiven PostgreSQL is provided in Create a Debezium source connector from PostgreSQL® to Apache Kafka®.
--
Not using Aiven services yet? Sign up now for your free trial at https://console.aiven.io/signup!
In the meantime, make sure you follow our changelog and RSS feeds or our LinkedIn and Twitter accounts to stay up-to-date with product and feature-related news.