Enabling change data capture from MySQL to Apache Kafka® with Debezium
Implement a change data capture workflow in Apache Kafka®, a key component of any organization with high data integrity requirements.
Change Data Capture (CDC) is the process of tracking the changes happening in one or more database tables in a source system and propagating them to a target technology. The objective of this tutorial is to create a CDC flow from a source table in a MySQL database to Apache Kafka® via the Kafka Connect Debezium Source connector.
Debezium 2.5
This article describes the configuration for Debezium version 2.5 and later.
Change Data Capture from MySQL to Apache Kafka via Kafka Connect
To set up a change data capture process that tracks changes from a MySQL database to Apache Kafka, we can use Kafka Connect, a framework that integrates Apache Kafka with external systems.
There are two types of Kafka Connect connectors we can use to set up a CDC process between MySQL and Apache Kafka:
- The JDBC Source connector, a query-based approach which periodically pulls the latest changes from MySQL into Kafka.
- The Debezium Source connector, which reads from a MySQL binary log (binlog) and streams the changes to Apache Kafka.
While both connectors can move MySQL data into Apache Kafka, the Debezium connector tracks the changes in streaming mode rather than in a query-based batch mode. This allows us to enhance the extracted data with useful metadata such as transaction IDs, timestamps, and the state of a row before and after an update.
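To make the difference between the two approaches concrete, here is a toy Python sketch. It does not use MySQL, Kafka, or Debezium; the change list and the function names are hypothetical and only illustrate why a single poll can miss intermediate updates and deletes that a change log retains.

```python
# Toy comparison of query-based polling and log-based capture.
# Everything here is illustrative and runs purely in memory.

# Ordered changes applied to a hypothetical "users" table between two polls.
changes = [
    ("insert", 1, "Francesco"),
    ("update", 1, "Franco"),
    ("insert", 2, "Ana"),
    ("delete", 2, None),
]


def apply_change(table, change):
    """Apply one change to an in-memory table (dict of id -> username)."""
    op, row_id, username = change
    if op == "delete":
        table.pop(row_id, None)
    else:
        table[row_id] = username


def poll_based(changes):
    """Query-based capture: only the table state at poll time is visible."""
    table = {}
    for change in changes:
        apply_change(table, change)
    # A single poll after all the changes returns the surviving rows only.
    return sorted(table.items())


def log_based(changes):
    """Log-based capture: one event per change, with before and after values."""
    table = {}
    events = []
    for change in changes:
        op, row_id, _ = change
        before = table.get(row_id)
        apply_change(table, change)
        events.append(
            {"op": op, "id": row_id, "before": before, "after": table.get(row_id)}
        )
    return events


polled = poll_based(changes)
events = log_based(changes)
print(polled)       # the intermediate update and the deleted row are not visible
print(len(events))  # one event per change
```

In this sketch the poll returns a single row, while the log-based version keeps all four changes together with their before and after values.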
The following tutorial guides you through setting up a CDC process using the Debezium connector.
Environment setup
The first step in setting up a CDC process is to create the source MySQL database and the target Apache Kafka cluster with Kafka Connect enabled. We'll use Aiven for MySQL and Aiven for Apache Kafka® to do this.
- Create an Aiven account
- Install the Aiven Command Line Interface (CLI)
- Log in to the Aiven CLI using:
avn user login
Create an Aiven for MySQL service named demo-mysql-source:
avn service create demo-mysql-source \
  --service-type mysql \
  --plan free-1-5gb \
  --cloud aws-eu-west-1
Create an Aiven for Apache Kafka service named demo-kafka with Kafka Connect and Schema Registry enabled:
avn service create demo-kafka \
  --service-type kafka \
  --plan business-4 \
  --cloud aws-eu-west-1 \
  -c kafka_connect=true \
  -c schema_registry=true \
  -c kafka.auto_create_topics_enable=true
Wait for the services to start with:
avn service wait demo-mysql-source
avn service wait demo-kafka
Create and populate the MySQL table
Once the services are up and running, we can create the table in MySQL in which we'll track the changes. To create and populate the table:
Get the Aiven for MySQL connection details:
avn service get demo-mysql-source --format '{service_uri_params}'
Use the mysql command to connect to the database. Replace the placeholders for <MYSQL_USERNAME>, <MYSQL_PASSWORD>, <MYSQL_HOSTNAME>, <MYSQL_PORT> and <MYSQL_DATABASE_NAME> with the values taken from the command above. Alternatively, you can find the pre-filled mysql command from the Quick Connect button in the Aiven Console service overview.
mysql --user <MYSQL_USERNAME> \
  --password=<MYSQL_PASSWORD> \
  --host <MYSQL_HOSTNAME> \
  --port <MYSQL_PORT> \
  <MYSQL_DATABASE_NAME>
Create a table called users with a numeric id column and a username string column, and populate it with three rows for Francesco, Ana, and Floor:
create table users (id serial primary key, username varchar(100));
insert into users (username) values ('Francesco'),('Ana'),('Floor');
Verify that the above commands are successful by checking the users table with:
select * from users;
The result should be similar to the below:
+----+-----------+
| id | username  |
+----+-----------+
|  1 | Francesco |
|  2 | Ana       |
|  3 | Floor     |
+----+-----------+
Create a Kafka Connect Connector
To create the CDC pipeline, we need to define the settings of the Kafka Connect Debezium Source connector. These settings contain:
- The source MySQL connection parameters, which allow the connector to fetch the data
- The target Apache Kafka connection parameters, which allow the connector to store the versioning of the table structure. This step is optional, but it might be useful to track DDL changes to be able to replicate changes to a target MySQL database.
- The target Apache Kafka schema registry connection parameters, which allow the connector to define schemas and store the data in Apache Avro™ format.
Get the connection parameters
As done in the section above, we can use the avn service get command to retrieve the Aiven for MySQL parameters:
avn service get demo-mysql-source --format '{service_uri_params}'
We can do the same to retrieve the Apache Kafka Schema Registry parameters needed for the schema integration. You can fetch the connection details with:
avn service get demo-kafka --json | jq '.connection_info.schema_registry_uri'
Note
The above command uses jq to parse the avn command output and retrieve the connection URI, which has the format https://<SCHEMA_REGISTRY_USER>:<SCHEMA_REGISTRY_PASSWORD>@<APACHE_KAFKA_HOST>:<SCHEMA_REGISTRY_PORT>. You'll need to split this URI into its parts to fill in the Kafka Connect connector configuration file in the following section.
To retrieve the Apache Kafka endpoint, which we'll use to track the DDL changes in a topic:
avn service get demo-kafka --format '{service_uri}'
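The schema registry URI returned earlier has to be split into the URL and the credentials that the connector configuration expects. A minimal Python sketch of that step follows, using only the standard library. The host name, port, and password in the example are made up, and decoding percent-encoded credentials with unquote is an assumption about how the URI is encoded.

```python
from urllib.parse import unquote, urlsplit


def split_registry_uri(uri):
    """Split a schema registry URI into the pieces used by the converter settings."""
    parts = urlsplit(uri)
    return {
        # Value for *.converter.schema.registry.url (credentials removed)
        "url": f"{parts.scheme}://{parts.hostname}:{parts.port}",
        # Value for *.converter.schema.registry.basic.auth.user.info
        "user_info": f"{unquote(parts.username)}:{unquote(parts.password)}",
    }


# Hypothetical URI in the documented format; replace it with the real one.
example = split_registry_uri("https://avnadmin:s3cret@demo-kafka.example.com:12345")
print(example["url"])
print(example["user_info"])
```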
Define the Debezium source connector configuration file
In a file named mysql_source_deb_connector.json, add the following JSON configuration to define the Debezium connector configuration:
{
  "name": "<CONNECTOR_NAME>",
  "connector.class": "io.debezium.connector.mysql.MySqlConnector",
  "database.hostname": "<MYSQL_HOST>",
  "database.port": "<MYSQL_PORT>",
  "database.user": "<MYSQL_USER>",
  "database.password": "<MYSQL_PASSWORD>",
  "database.dbname": "<MYSQL_DATABASE_NAME>",
  "database.ssl.mode": "<MYSQL_SSL_MODE>",
  "database.server.id": "<UNIQUE_ID>",
  "topic.prefix": "<KAFKA_TOPIC_PREFIX>",
  "table.include.list": "<MYSQL_TABLES>",
  "tasks.max": "<NR_TASKS>",
  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter.schema.registry.url": "https://<APACHE_KAFKA_HOST>:<SCHEMA_REGISTRY_PORT>",
  "key.converter.basic.auth.credentials.source": "USER_INFO",
  "key.converter.schema.registry.basic.auth.user.info": "<SCHEMA_REGISTRY_USER>:<SCHEMA_REGISTRY_PASSWORD>",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "https://<APACHE_KAFKA_HOST>:<SCHEMA_REGISTRY_PORT>",
  "value.converter.basic.auth.credentials.source": "USER_INFO",
  "value.converter.schema.registry.basic.auth.user.info": "<SCHEMA_REGISTRY_USER>:<SCHEMA_REGISTRY_PASSWORD>",
  "schema.history.internal.kafka.topic": "<HISTORY_TOPIC_NAME>",
  "schema.history.internal.kafka.bootstrap.servers": "<APACHE_KAFKA_HOST>:<APACHE_KAFKA_PORT>",
  "schema.history.internal.producer.security.protocol": "SSL",
  "schema.history.internal.producer.ssl.keystore.type": "PKCS12",
  "schema.history.internal.producer.ssl.keystore.location": "/run/aiven/keys/public.keystore.p12",
  "schema.history.internal.producer.ssl.keystore.password": "password",
  "schema.history.internal.producer.ssl.truststore.location": "/run/aiven/keys/public.truststore.jks",
  "schema.history.internal.producer.ssl.truststore.password": "password",
  "schema.history.internal.producer.ssl.key.password": "password",
  "schema.history.internal.consumer.security.protocol": "SSL",
  "schema.history.internal.consumer.ssl.keystore.type": "PKCS12",
  "schema.history.internal.consumer.ssl.keystore.location": "/run/aiven/keys/public.keystore.p12",
  "schema.history.internal.consumer.ssl.keystore.password": "password",
  "schema.history.internal.consumer.ssl.truststore.location": "/run/aiven/keys/public.truststore.jks",
  "schema.history.internal.consumer.ssl.truststore.password": "password",
  "schema.history.internal.consumer.ssl.key.password": "password",
  "include.schema.changes": "true"
}
We need to replace the following values:
- <CONNECTOR_NAME> is the name we're giving this connector. For this tutorial, set it to mysql_source_deb_connector.
- The MySQL connection details from the previous section (the same values used in the mysql command above): <MYSQL_HOST>, <MYSQL_PORT>, <MYSQL_USER>, <MYSQL_PASSWORD>, <MYSQL_DATABASE_NAME> and <MYSQL_SSL_MODE>.
- <UNIQUE_ID> is a number, which must be different for each connector running. For this tutorial, set it to 12345.
- <KAFKA_TOPIC_PREFIX> will be used as a prefix for all the Kafka topics that this connector sends events to. For this tutorial, set it to example.
- <MYSQL_TABLES> is the list of MySQL tables to capture changes from. This parameter is optional. The default is the empty string, which means capture all tables. For this tutorial, set it to defaultdb.users, the one table we have. (Note that there's also a parameter called table.exclude.list for the opposite effect.)
- <NR_TASKS> should be set to 1, since the MySQL connector always uses a single task. This is actually the default value, so the tasks.max parameter can also be omitted.
- <HISTORY_TOPIC_NAME> is the name of the Apache Kafka topic that contains the history of schema changes. For this tutorial, set it to ddl_history.
- The Apache Kafka schema registry details from the previous section: <APACHE_KAFKA_HOST>:<SCHEMA_REGISTRY_PORT> and <SCHEMA_REGISTRY_USER>:<SCHEMA_REGISTRY_PASSWORD>.
- The Apache Kafka details from the previous section: <APACHE_KAFKA_HOST>:<APACHE_KAFKA_PORT>.
See the Debezium MySQL connector documentation for more information.
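Substituting the placeholders by hand is error-prone, so it can help to fill them in programmatically and check that none are left over. The following Python sketch shows one way to do that on a trimmed-down copy of the configuration. The helper names (fill, unresolved) and the MySQL host and port values are hypothetical; the other values are the ones suggested for this tutorial.

```python
import json
import re

# Trimmed-down template; the full file has more keys with the same placeholder style.
template = {
    "name": "<CONNECTOR_NAME>",
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "<MYSQL_HOST>",
    "database.port": "<MYSQL_PORT>",
    "database.server.id": "<UNIQUE_ID>",
    "topic.prefix": "<KAFKA_TOPIC_PREFIX>",
    "table.include.list": "<MYSQL_TABLES>",
    "schema.history.internal.kafka.topic": "<HISTORY_TOPIC_NAME>",
}

values = {
    "CONNECTOR_NAME": "mysql_source_deb_connector",
    "MYSQL_HOST": "mysql.example.com",  # hypothetical host
    "MYSQL_PORT": "3306",               # hypothetical port
    "UNIQUE_ID": "12345",
    "KAFKA_TOPIC_PREFIX": "example",
    "MYSQL_TABLES": "defaultdb.users",
    "HISTORY_TOPIC_NAME": "ddl_history",
}

PLACEHOLDER = re.compile(r"<([A-Z_]+)>")


def fill(template, values):
    """Replace every <NAME> placeholder that has a value; leave unknown ones as-is."""
    return {
        key: PLACEHOLDER.sub(lambda m: values.get(m.group(1), m.group(0)), text)
        for key, text in template.items()
    }


def unresolved(config):
    """Return the sorted names of placeholders still present in the configuration."""
    return sorted({name for text in config.values() for name in PLACEHOLDER.findall(text)})


config = fill(template, values)
missing = unresolved(config)
print(json.dumps(config, indent=2))
print("unresolved placeholders:", missing)
```

Writing the result of json.dumps to mysql_source_deb_connector.json only when the list of unresolved placeholders is empty avoids submitting a half-filled configuration.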
Start the Debezium source connector
Once the parameters are substituted, we can start the Debezium source connector with:
avn service connector create demo-kafka @mysql_source_deb_connector.json
Since we set the name parameter to mysql_source_deb_connector, the above starts a connector with that name.
We can check its status with:
avn service connector status demo-kafka mysql_source_deb_connector
The status should be RUNNING.
{
  "status": {
    "state": "RUNNING",
    "tasks": [
      {
        "id": 0,
        "state": "RUNNING",
        "trace": ""
      }
    ]
  }
}
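If the status check is part of a script, the JSON above can be inspected programmatically. The Python sketch below assumes the status output has exactly the shape shown above; the function names and the FAILED example are hypothetical and only illustrate the check.

```python
import json


def task_states(status_json):
    """Return the connector state and a list of (task id, task state) pairs."""
    status = json.loads(status_json)["status"]
    return status["state"], [(task["id"], task["state"]) for task in status.get("tasks", [])]


def is_healthy(status_json):
    """True when the connector and every one of its tasks report RUNNING."""
    connector_state, tasks = task_states(status_json)
    return (
        connector_state == "RUNNING"
        and len(tasks) > 0
        and all(state == "RUNNING" for _, state in tasks)
    )


# Same shape as the status output shown above.
running = '{"status": {"state": "RUNNING", "tasks": [{"id": 0, "state": "RUNNING", "trace": ""}]}}'
# Hypothetical failure case, for illustration only.
failed = '{"status": {"state": "RUNNING", "tasks": [{"id": 0, "state": "FAILED", "trace": "..."}]}}'

print(is_healthy(running))
print(is_healthy(failed))
```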
Check the data in Apache Kafka with kcat
If the connector is up and running, we can check that the data is flowing from MySQL to Apache Kafka with kcat. Get the kcat parameters and download the certificates needed to connect with:
avn service connection-info kcat demo-kafka -u avnadmin -W
The above command will store the three certificates (ca.pem, service.crt and service.key) in the current local folder and return the command to execute.
To verify the data in Apache Kafka we can use the following command, substituting the placeholders for <KAFKA_HOST>, <KAFKA_PORT>, <KAFKA_SCHEMA_REGISTRY_USR>, <KAFKA_SCHEMA_REGISTRY_PWD>, and <KAFKA_SCHEMA_REGISTRY_PORT>.
kcat -b <KAFKA_HOST>:<KAFKA_PORT> \
  -X security.protocol=SSL \
  -X ssl.ca.location=ca.pem \
  -X ssl.key.location=service.key \
  -X ssl.certificate.location=service.crt \
  -C -t example.defaultdb.users \
  -s avro \
  -r https://<KAFKA_SCHEMA_REGISTRY_USR>:<KAFKA_SCHEMA_REGISTRY_PWD>@<KAFKA_HOST>:<KAFKA_SCHEMA_REGISTRY_PORT>
Note
The above command consumes (-C flag) from the topic example.defaultdb.users (-t flag) in Apache Avro format (-s flag). The topic name is the concatenation of the connector parameter topic.prefix (example) and the qualified table name (defaultdb.users), joined by a dot.
After executing the kcat command, the data in Apache Kafka should be similar to the following:
{"id": 1}{"before": null, "after": {"Value": {"id": 1, "username": {"string": "Francesco"}}}, "source": {"version": "2.5.0.Final.Aiven", "connector": "mysql", "name": "example", "ts_ms": 1733226275000, "snapshot": {"string": "first"}, "db": "defaultdb", "sequence": null, "table": {"string": "users"}, "server_id": 0, "gtid": null, "file": "binlog.000289", "pos": 197, "row": 0, "thread": null, "query": null}, "op": "r", "ts_ms": {"long": 1733226275527}, "transaction": null}
{"id": 2}{"before": null, "after": {"Value": {"id": 2, "username": {"string": "Ana"}}}, "source": {"version": "2.5.0.Final.Aiven", "connector": "mysql", "name": "example", "ts_ms": 1733226275000, "snapshot": {"string": "true"}, "db": "defaultdb", "sequence": null, "table": {"string": "users"}, "server_id": 0, "gtid": null, "file": "binlog.000289", "pos": 197, "row": 0, "thread": null, "query": null}, "op": "r", "ts_ms": {"long": 1733226275530}, "transaction": null}
{"id": 3}{"before": null, "after": {"Value": {"id": 3, "username": {"string": "Floor"}}}, "source": {"version": "2.5.0.Final.Aiven", "connector": "mysql", "name": "example", "ts_ms": 1733226275000, "snapshot": {"string": "last"}, "db": "defaultdb", "sequence": null, "table": {"string": "users"}, "server_id": 0, "gtid": null, "file": "binlog.000289", "pos": 197, "row": 0, "thread": null, "query": null}, "op": "r", "ts_ms": {"long": 1733226275531}, "transaction": null}
% Reached end of topic example.defaultdb.users [0] at offset 3
Not only do we get the three rows stored in the users table, but we also get additional useful metadata, such as when each change happened (in the ts_ms parameter) and its position in the binlog file.
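To show how a consumer might read these fields, here is a small Python sketch that summarises one change event. It assumes the event has already been decoded into a Python dict with the same shape as the kcat output, including the single-key wrappers such as {"string": ...} that appear around some values. The names unwrap, summarise, and WRAPPERS are hypothetical, the mapping of op codes to labels reflects the values seen in this tutorial plus "u" for updates, and the sketch assumes that source.name matches the topic.prefix setting, as it does in the sample output.

```python
from datetime import datetime, timedelta, timezone

OPERATIONS = {"r": "snapshot read", "c": "create", "u": "update", "d": "delete"}
# Single-key wrappers seen in the decoded output above.
WRAPPERS = {"string", "long", "int", "boolean", "Value"}
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def unwrap(value):
    """Recursively strip wrappers such as {"string": "x"} or {"Value": {...}}."""
    if isinstance(value, dict):
        if len(value) == 1:
            (key, inner), = value.items()
            if key in WRAPPERS:
                return unwrap(inner)
        return {key: unwrap(inner) for key, inner in value.items()}
    return value


def summarise(event):
    """Extract topic name, operation, row, capture time, and binlog position."""
    event = unwrap(event)
    source = event["source"]
    return {
        "topic": f"{source['name']}.{source['db']}.{source['table']}",
        "operation": OPERATIONS.get(event["op"], "unknown"),
        "row": event["after"] if event["after"] is not None else event["before"],
        "captured_at": EPOCH + timedelta(milliseconds=event["ts_ms"]),
        "binlog_position": (source["file"], source["pos"]),
    }


# First record from the output above, trimmed to the fields used here.
event = {
    "before": None,
    "after": {"Value": {"id": 1, "username": {"string": "Francesco"}}},
    "source": {"name": "example", "db": "defaultdb", "table": {"string": "users"},
               "file": "binlog.000289", "pos": 197},
    "op": "r",
    "ts_ms": {"long": 1733226275527},
}

summary = summarise(event)
for field, value in summary.items():
    print(field, "=", value)
```

A column that is itself named after one of the wrapper keys would be unwrapped by mistake, so a production consumer would rely on the Avro schema instead of this heuristic.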
If we now add a new row to the MySQL users table with:
insert into users(username) values ('Carlo');
We should see it immediately pop up in the Apache Kafka topic:
{"id": 4}{"before": null, "after": {"Value": {"id": 4, "username": {"string": "Carlo"}}}, "source": {"version": "2.5.0.Final.Aiven", "connector": "mysql", "name": "example", "ts_ms": 1733226538000, "snapshot": {"string": "false"}, "db": "defaultdb", "sequence": null, "table": {"string": "users"}, "server_id": 710603388, "gtid": {"string": "3f6f07d7-b0a4-11ef-9d29-0adcce978e09:39"}, "file": "binlog.000290", "pos": 420, "row": 0, "thread": {"long": 20671}, "query": null}, "op": "c", "ts_ms": {"long": 1733226538394}, "transaction": null}
% Reached end of topic example.defaultdb.users [0] at offset 4
If we now remove the row containing Carlo with:
delete from users where username='Carlo';
We see the corresponding delete ("op": "d") in Apache Kafka:
{"id": 4}{"before": {"Value": {"id": 4, "username": {"string": "Carlo"}}}, "after": null, "source": {"version": "2.5.0.Final.Aiven", "connector": "mysql", "name": "example", "ts_ms": 1733226572000, "snapshot": {"string": "false"}, "db": "defaultdb", "sequence": null, "table": {"string": "users"}, "server_id": 710603388, "gtid": {"string": "3f6f07d7-b0a4-11ef-9d29-0adcce978e09:40"}, "file": "binlog.000290", "pos": 725, "row": 0, "thread": {"long": 20671}, "query": null}, "op": "d", "ts_ms": {"long": 1733226572691}, "transaction": null}
{"id": 4}
% Reached end of topic example.defaultdb.users [0] at offset 6
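A downstream consumer can rebuild the current table contents by replaying these events in order. The Python sketch below illustrates the idea on simplified records. It assumes each record is a (key, value) pair that has already been decoded and flattened, and it treats the trailing key-only record after the delete as a tombstone, which is how the sample output appears to behave. The function name apply_events is hypothetical.

```python
def apply_events(records):
    """Replay (key, value) change records into a dict of id -> row."""
    state = {}
    for key, value in records:
        if value is None:
            # Key-only record (tombstone): the row was already removed by the delete event.
            continue
        if value["op"] == "d":
            state.pop(key["id"], None)
        else:
            # "r" (snapshot read), "c" (create) and "u" (update) all carry the new row.
            state[key["id"]] = value["after"]
    return state


# Simplified version of the events shown in this section.
records = [
    ({"id": 1}, {"op": "r", "before": None, "after": {"id": 1, "username": "Francesco"}}),
    ({"id": 2}, {"op": "r", "before": None, "after": {"id": 2, "username": "Ana"}}),
    ({"id": 3}, {"op": "r", "before": None, "after": {"id": 3, "username": "Floor"}}),
    ({"id": 4}, {"op": "c", "before": None, "after": {"id": 4, "username": "Carlo"}}),
    ({"id": 4}, {"op": "d", "before": {"id": 4, "username": "Carlo"}, "after": None}),
    ({"id": 4}, None),
]

state = apply_events(records)
print(sorted(state))  # ids of the rows that remain after the replay
```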
Altering the schema
Where is the MySQL schema stored? There are two topics:
- one called ddl_history (named by the schema.history.internal.kafka.topic parameter), which stores the schema for Debezium's inner workings so that Debezium can resume work if it crashes
- the other called example (based on the topic.prefix parameter), where the connector stores all the DDLs needed to replicate the state in another MySQL database.
We can fetch the latter topic with kcat:
kcat -b <KAFKA_HOST>:<KAFKA_PORT> \ -X security.protocol=SSL \ -X ssl.ca.location=ca.pem \ -X ssl.key.location=service.key \ -X ssl.certificate.location=service.crt \ -C -t example \ -s avro \ -r https://<KAFKA_SCHEMA_REGISTRY_USR>:<KAFKA_SCHEMA_REGISTRY_PWD>@<KAFKA_HOST>:<KAFKA_SCHEMA_REGISTRY_PORT>
We get the full list of DDLs needed to replicate the state into a new MySQL database.
{"databaseName": ""}{"source": {"version": "2.5.0.Final.Aiven", "connector": "mysql", "name": "example", "ts_ms": 1733226274272, "snapshot": {"string": "true"}, "db": "", "sequence": null, "table": null, "server_id": 0, "gtid": null, "file": "binlog.000289", "pos": 197, "row": 0, "thread": null, "query": null}, "ts_ms": 1733226275042, "databaseName": {"string": ""}, "schemaName": null, "ddl": {"string": "SET character_set_server=utf8mb4, collation_server=utf8mb4_0900_ai_ci"}, "tableChanges": []}
{"databaseName": "defaultdb"}{"source": {"version": "2.5.0.Final.Aiven", "connector": "mysql", "name": "example", "ts_ms": 1733226275044, "snapshot": {"string": "true"}, "db": "defaultdb", "sequence": null, "table": {"string": "users"}, "server_id": 0, "gtid": null, "file": "binlog.000289", "pos": 197, "row": 0, "thread": null, "query": null}, "ts_ms": 1733226275086, "databaseName": {"string": "defaultdb"}, "schemaName": null, "ddl": {"string": "DROP TABLE IF EXISTS `defaultdb`.`users`"}, "tableChanges": [{"type": "DROP", "id": "\"defaultdb\".\"users\"", "table": null}]}
{"databaseName": "defaultdb"}{"source": {"version": "2.5.0.Final.Aiven", "connector": "mysql", "name": "example", "ts_ms": 1733226275092, "snapshot": {"string": "true"}, "db": "defaultdb", "sequence": null, "table": null, "server_id": 0, "gtid": null, "file": "binlog.000289", "pos": 197, "row": 0, "thread": null, "query": null}, "ts_ms": 1733226275098, "databaseName": {"string": "defaultdb"}, "schemaName": null, "ddl": {"string": "DROP DATABASE IF EXISTS `defaultdb`"}, "tableChanges": []}
{"databaseName": "defaultdb"}{"source": {"version": "2.5.0.Final.Aiven", "connector": "mysql", "name": "example", "ts_ms": 1733226275098, "snapshot": {"string": "true"}, "db": "defaultdb", "sequence": null, "table": null, "server_id": 0, "gtid": null, "file": "binlog.000289", "pos": 197, "row": 0, "thread": null, "query": null}, "ts_ms": 1733226275107, "databaseName": {"string": "defaultdb"}, "schemaName": null, "ddl": {"string": "CREATE DATABASE `defaultdb` CHARSET utf8mb4 COLLATE utf8mb4_0900_ai_ci"}, "tableChanges": []}
{"databaseName": "defaultdb"}{"source": {"version": "2.5.0.Final.Aiven", "connector": "mysql", "name": "example", "ts_ms": 1733226275107, "snapshot": {"string": "true"}, "db": "defaultdb", "sequence": null, "table": null, "server_id": 0, "gtid": null, "file": "binlog.000289", "pos": 197, "row": 0, "thread": null, "query": null}, "ts_ms": 1733226275109, "databaseName": {"string": "defaultdb"}, "schemaName": null, "ddl": {"string": "USE `defaultdb`"}, "tableChanges": []}
{"databaseName": "defaultdb"}{"source": {"version": "2.5.0.Final.Aiven", "connector": "mysql", "name": "example", "ts_ms": 1733226275124, "snapshot": {"string": "true"}, "db": "defaultdb", "sequence": null, "table": {"string": "users"}, "server_id": 0, "gtid": null, "file": "binlog.000289", "pos": 197, "row": 0, "thread": null, "query": null}, "ts_ms": 1733226275197, "databaseName": {"string": "defaultdb"}, "schemaName": null, "ddl": {"string": "CREATE TABLE \"users\" (\n \"id\" bigint unsigned NOT NULL AUTO_INCREMENT,\n \"username\" varchar(100) DEFAULT NULL,\n PRIMARY KEY (\"id\"),\n UNIQUE KEY \"id\" (\"id\")\n)"}, "tableChanges": [{"type": "CREATE", "id": "\"defaultdb\".\"users\"", "table": {"Table": {"defaultCharsetName": {"string": "utf8mb4"}, "primaryKeyColumnNames": {"array": ["id"]}, "columns": [{"name": "id", "jdbcType": -5, "nativeType": null, "typeName": "BIGINT UNSIGNED", "typeExpression": {"string": "BIGINT UNSIGNED"}, "charsetName": null, "length": null, "scale": null, "position": 1, "optional": {"boolean": false}, "autoIncremented": {"boolean": true}, "generated": {"boolean": true}, "comment": null, "defaultValueExpression": null, "enumValues": null}, {"name": "username", "jdbcType": 12, "nativeType": null, "typeName": "VARCHAR", "typeExpression": {"string": "VARCHAR"}, "charsetName": {"string": "utf8mb4"}, "length": {"int": 100}, "scale": null, "position": 2, "optional": {"boolean": true}, "autoIncremented": {"boolean": false}, "generated": {"boolean": false}, "comment": null, "defaultValueExpression": null, "enumValues": null}], "comment": null}}}]}
% Reached end of topic example [0] at offset 6
Warning
The response also contains DROP TABLE and DROP DATABASE statements, which might delete other tables present in the same target database if they are replayed as-is.
If we now add a column to the users table in the source MySQL database with:
alter table users add column country varchar(3);
We get a new entry in the example topic defining the DDL change we just implemented:
{"databaseName": "defaultdb"}{"source": {"version": "2.5.0.Final.Aiven", "connector": "mysql", "name": "example", "ts_ms": 1733226759134, "snapshot": {"string": "false"}, "db": "defaultdb", "sequence": null, "table": {"string": "users"}, "server_id": 710603388, "gtid": {"string": "3f6f07d7-b0a4-11ef-9d29-0adcce978e09:41"}, "file": "binlog.000291", "pos": 274, "row": 0, "thread": null, "query": null}, "ts_ms": 1733226759153, "databaseName": {"string": "defaultdb"}, "schemaName": null, "ddl": {"string": "alter table users add column country varchar(3)"}, "tableChanges": [{"type": "ALTER", "id": "\"defaultdb\".\"users\"", "table": {"Table": {"defaultCharsetName": {"string": "utf8mb4"}, "primaryKeyColumnNames": {"array": ["id"]}, "columns": [{"name": "id", "jdbcType": -5, "nativeType": null, "typeName": "BIGINT UNSIGNED", "typeExpression": {"string": "BIGINT UNSIGNED"}, "charsetName": null, "length": null, "scale": null, "position": 1, "optional": {"boolean": false}, "autoIncremented": {"boolean": true}, "generated": {"boolean": true}, "comment": null, "defaultValueExpression": null, "enumValues": null}, {"name": "username", "jdbcType": 12, "nativeType": null, "typeName": "VARCHAR", "typeExpression": {"string": "VARCHAR"}, "charsetName": {"string": "utf8mb4"}, "length": {"int": 100}, "scale": null, "position": 2, "optional": {"boolean": true}, "autoIncremented": {"boolean": false}, "generated": {"boolean": false}, "comment": null, "defaultValueExpression": null, "enumValues": null}, {"name": "country", "jdbcType": 12, "nativeType": null, "typeName": "VARCHAR", "typeExpression": {"string": "VARCHAR"}, "charsetName": {"string": "utf8mb4"}, "length": {"int": 3}, "scale": null, "position": 3, "optional": {"boolean": true}, "autoIncremented": {"boolean": false}, "generated": {"boolean": false}, "comment": null, "defaultValueExpression": null, "enumValues": null}], "comment": null}}}]}
% Reached end of topic example [0] at offset 7
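Before replaying the statements from the example topic against another database, it can be useful to separate the destructive ones from the rest. The Python sketch below shows one possible approach on the DDL strings seen in this section. The function name split_ddl and the regular expression are hypothetical, and treating only DROP and TRUNCATE as destructive is an assumption that a real migration would need to review.

```python
import re

# Statements starting with these keywords are treated as destructive (an assumption).
DESTRUCTIVE = re.compile(r"^\s*(DROP|TRUNCATE)\b", re.IGNORECASE)


def split_ddl(events):
    """Split schema change events into (safe, destructive) lists of DDL strings."""
    safe, destructive = [], []
    for event in events:
        ddl = event["ddl"]
        if isinstance(ddl, dict):
            # Decoded Avro output wraps the statement as {"string": "..."}.
            ddl = ddl.get("string")
        if not ddl:
            continue
        (destructive if DESTRUCTIVE.match(ddl) else safe).append(ddl)
    return safe, destructive


# DDL values copied from the schema change events shown above.
events = [
    {"ddl": {"string": "SET character_set_server=utf8mb4, collation_server=utf8mb4_0900_ai_ci"}},
    {"ddl": {"string": "DROP TABLE IF EXISTS `defaultdb`.`users`"}},
    {"ddl": {"string": "DROP DATABASE IF EXISTS `defaultdb`"}},
    {"ddl": {"string": "CREATE DATABASE `defaultdb` CHARSET utf8mb4 COLLATE utf8mb4_0900_ai_ci"}},
    {"ddl": {"string": "USE `defaultdb`"}},
    {"ddl": {"string": "alter table users add column country varchar(3)"}},
]

safe, destructive = split_ddl(events)
print("needs review:", destructive)
print("safe to replay:", len(safe), "statements")
```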
Conclusion
The Debezium connector for Apache Kafka reads the MySQL binlog and tracks, in real time, all the changes happening in one or more MySQL tables, writing them into a series of topics. The messages in Apache Kafka contain the changed data and are enriched with metadata such as transaction IDs and timestamps. Furthermore, a dedicated topic that tracks DDL changes gives additional control over the table structures, which is useful for replicating the database structure when needed.
You can dig more with the following resources: