Enabling change data capture from MySQL to Apache Kafka® with Debezium

Implement a change data capture workflow in Apache Kafka®, a key component of any organization with high data integrity requirements.

Change Data Capture (CDC) is the process of tracking the changes happening in one or more database tables in a source system and propagating them to a target technology. The objective of this tutorial is to create a CDC flow from a source table in a MySQL database to Apache Kafka® via the Kafka Connect Debezium Source connector.

Debezium 2.5

This article describes the configuration for Debezium version 2.5 and later.

Change Data Capture from MySQL to Apache Kafka via Kafka Connect

To set up a change data capture process that tracks the changes from a MySQL database to Apache Kafka, we can use Kafka Connect, a framework that integrates Apache Kafka with external systems.

There are two types of Kafka Connect connectors we can use to set up a CDC process between MySQL and Apache Kafka:

  • The JDBC Source connector, a query-based approach which periodically pulls the latest changes from MySQL into Kafka.
  • The Debezium Source connector, which reads from a MySQL binary log (binlog) and streams the changes to Apache Kafka.

While both connectors can move MySQL data into Apache Kafka, the Debezium connector tracks the changes in streaming mode (not in a query-based batch mode). This allows it to enrich the extracted data with useful metadata like transaction IDs, timestamps, and pre/post-update details.
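To make the enrichment concrete, here is a minimal sketch of the envelope Debezium emits for an update; the field names follow Debezium's standard change-event envelope, but the concrete values are invented for illustration:

```python
# A minimal sketch of a Debezium change-event envelope for an UPDATE.
# Field names follow Debezium's standard envelope; the values are invented.
update_event = {
    "before": {"id": 1, "username": "Francesco"},  # row state before the change
    "after": {"id": 1, "username": "Fran"},        # row state after the change
    "op": "u",                                     # r=snapshot read, c=create, u=update, d=delete
    "ts_ms": 1733226275527,                        # when the connector processed the event
    "source": {                                    # metadata about the origin of the change
        "connector": "mysql",
        "db": "defaultdb",
        "table": "users",
        "file": "binlog.000289",                   # binlog file the event was read from
        "pos": 197,                                # position within the binlog file
    },
}

# The before/after pair gives consumers the pre/post-update details directly.
changed = {k: v for k, v in update_event["after"].items()
           if update_event["before"].get(k) != v}
print(changed)  # {'username': 'Fran'}
```

A query-based connector would only see the latest row state, so none of the `before`, `op`, or binlog-position information would be available.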

The following tutorial guides you through setting up a CDC process using the Debezium connector.

Environment setup

The first step in setting up a CDC process is to create the source MySQL database and the target Apache Kafka cluster with Kafka Connect enabled. We'll use Aiven for MySQL and Aiven for Apache Kafka® to do this.

  1. Create an Aiven account
  2. Install the Aiven Command Line Interface (CLI)
  3. Log in to the Aiven CLI using:
avn user login

Create an Aiven for MySQL service named demo-mysql-source:

avn service create demo-mysql-source \
  --service-type mysql \
  --plan free-1-5gb \
  --cloud aws-eu-west-1

Create an Aiven for Apache Kafka service named demo-kafka with Kafka Connect and Schema Registry enabled:

avn service create demo-kafka \
  --service-type kafka \
  --plan business-4 \
  --cloud aws-eu-west-1 \
  -c kafka_connect=true \
  -c schema_registry=true \
  -c kafka.auto_create_topics_enable=true

Wait for the services to start with:

avn service wait demo-mysql-source
avn service wait demo-kafka

Create and populate the MySQL table

Once the services are up and running, we can create the table in MySQL in which we'll track the changes. To create and populate the table:

Get the Aiven for MySQL connection details:

avn service get demo-mysql-source --format '{service_uri_params}'

Use the mysql command to connect to the database. Replace the placeholders for <MYSQL_USERNAME>, <MYSQL_PASSWORD>, <MYSQL_HOSTNAME>, <MYSQL_PORT> and <MYSQL_DATABASE_NAME> with the values taken from the command above. Alternatively, you can find the pre-filled mysql command from the Quick Connect button in the Aiven Console service overview.

mysql --user <MYSQL_USERNAME> \
  --password=<MYSQL_PASSWORD> \
  --host <MYSQL_HOSTNAME> \
  --port <MYSQL_PORT> \
  <MYSQL_DATABASE_NAME>

Note

You can install the mysql command line tool using the MySQL installation guide.

Create a table called users with a numeric id column and a username string column, and populate it with three rows for Francesco, Ana, and Floor:

create table users (id serial primary key, username varchar(100));
insert into users (username) values ('Francesco'),('Ana'),('Floor');

Verify that the above commands are successful by checking the users table with:

select * from users;

The result should be similar to the following:

+----+-----------+
| id | username  |
+----+-----------+
|  1 | Francesco |
|  2 | Ana       |
|  3 | Floor     |
+----+-----------+

Create a Kafka Connect Connector

To create the CDC pipeline, we need to define the settings of the Kafka Connect Debezium Source connector. These settings contain:

  • The source MySQL connection parameters, which allow the connector to fetch the data
  • The target Apache Kafka connection parameters, which allow the connector to store the history of the table structure in a topic. This step is optional, but tracking DDL changes is useful if you need to replicate them to a target MySQL database.
  • The target Apache Kafka schema registry connection parameters, which allow the connector to define schemas and store the data in Apache Avro™ format.

Get the connection parameters

As done in the section above, we can use the avn service get command to retrieve the Aiven for MySQL parameters:

avn service get demo-mysql-source --format '{service_uri_params}'

We can do the same to retrieve the Apache Kafka Schema Registry parameters needed for the schema integration. You can fetch the connection details with:

avn service get demo-kafka --json | jq '.connection_info.schema_registry_uri'

Note

The above command uses jq to parse the avn command output and retrieve the connection URI, which will be in the format https://<SCHEMA_REGISTRY_USER>:<SCHEMA_REGISTRY_PASSWORD>@<APACHE_KAFKA_HOST>:<SCHEMA_REGISTRY_PORT>. You'll need to parse this information to fill the Kafka Connect connector configuration file in the following section.
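If you'd rather not split the URI by hand, the standard library can do it; a small sketch (the credentials and hostname below are made-up examples following the format above):

```python
from urllib.parse import urlparse

# Example schema registry URI in the format returned by the avn command;
# the credentials and host are made up for illustration.
uri = "https://avnadmin:secret@demo-kafka.example.aivencloud.com:13145"

parsed = urlparse(uri)
schema_registry_user = parsed.username      # -> <SCHEMA_REGISTRY_USER>
schema_registry_password = parsed.password  # -> <SCHEMA_REGISTRY_PASSWORD>
kafka_host = parsed.hostname                # -> <APACHE_KAFKA_HOST>
schema_registry_port = parsed.port          # -> <SCHEMA_REGISTRY_PORT>

print(schema_registry_user, kafka_host, schema_registry_port)
```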

To retrieve the Apache Kafka endpoint, which we'll use to track the DDL changes in a topic:

avn service get demo-kafka --format '{service_uri}'

Define the Debezium source connector configuration file

In a file named mysql_source_deb_connector.json, add the following JSON to define the Debezium connector configuration:

{
  "name": "<CONNECTOR_NAME>",
  "connector.class": "io.debezium.connector.mysql.MySqlConnector",
  "database.hostname": "<MYSQL_HOST>",
  "database.port": "<MYSQL_PORT>",
  "database.user": "<MYSQL_USER>",
  "database.password": "<MYSQL_PASSWORD>",
  "database.dbname": "<MYSQL_DATABASE_NAME>",
  "database.ssl.mode": "<MYSQL_SSL_MODE>",
  "database.server.id": "<UNIQUE_ID>",
  "topic.prefix": "<KAFKA_TOPIC_PREFIX>",
  "table.include.list": "<MYSQL_TABLES>",
  "tasks.max": "<NR_TASKS>",
  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter.schema.registry.url": "https://<APACHE_KAFKA_HOST>:<SCHEMA_REGISTRY_PORT>",
  "key.converter.basic.auth.credentials.source": "USER_INFO",
  "key.converter.schema.registry.basic.auth.user.info": "<SCHEMA_REGISTRY_USER>:<SCHEMA_REGISTRY_PASSWORD>",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "https://<APACHE_KAFKA_HOST>:<SCHEMA_REGISTRY_PORT>",
  "value.converter.basic.auth.credentials.source": "USER_INFO",
  "value.converter.schema.registry.basic.auth.user.info": "<SCHEMA_REGISTRY_USER>:<SCHEMA_REGISTRY_PASSWORD>",
  "schema.history.internal.kafka.topic": "<HISTORY_TOPIC_NAME>",
  "schema.history.internal.kafka.bootstrap.servers": "<APACHE_KAFKA_HOST>:<APACHE_KAFKA_PORT>",
  "schema.history.internal.producer.security.protocol": "SSL",
  "schema.history.internal.producer.ssl.keystore.type": "PKCS12",
  "schema.history.internal.producer.ssl.keystore.location": "/run/aiven/keys/public.keystore.p12",
  "schema.history.internal.producer.ssl.keystore.password": "password",
  "schema.history.internal.producer.ssl.truststore.location": "/run/aiven/keys/public.truststore.jks",
  "schema.history.internal.producer.ssl.truststore.password": "password",
  "schema.history.internal.producer.ssl.key.password": "password",
  "schema.history.internal.consumer.security.protocol": "SSL",
  "schema.history.internal.consumer.ssl.keystore.type": "PKCS12",
  "schema.history.internal.consumer.ssl.keystore.location": "/run/aiven/keys/public.keystore.p12",
  "schema.history.internal.consumer.ssl.keystore.password": "password",
  "schema.history.internal.consumer.ssl.truststore.location": "/run/aiven/keys/public.truststore.jks",
  "schema.history.internal.consumer.ssl.truststore.password": "password",
  "schema.history.internal.consumer.ssl.key.password": "password",
  "include.schema.changes": "true"
}

We need to replace the following values:

  • <CONNECTOR_NAME> is the name we're giving this connector. For this tutorial, set it to mysql_source_deb_connector.
  • The MySQL connection details from the previous section: <MYSQL_USER>, <MYSQL_PASSWORD>, <MYSQL_HOST>, <MYSQL_PORT>, <MYSQL_DATABASE_NAME> and <MYSQL_SSL_MODE>
  • <UNIQUE_ID> a number, which must be different for each connector running. For this tutorial, set it to 12345.
  • <KAFKA_TOPIC_PREFIX> will be used as a prefix for all the Kafka topics that this connector sends events to. For this tutorial, set it to example.
  • <MYSQL_TABLES> is the list of MySQL tables to capture changes from. This parameter is optional. The default is the empty string, which means capture all tables. For this tutorial, set it to defaultdb.users, the one table we have. (Note that there's also a parameter called table.exclude.list for the opposite effect.)
  • <NR_TASKS> should be set to 1, since the MySQL connector always uses a single task. This is actually the default value, so the tasks.max parameter can also be omitted.
  • <HISTORY_TOPIC_NAME> is the name of the Apache Kafka topic that contains the history of schema changes. For this tutorial, set it to ddl_history.
  • The Apache Kafka schema registry details from the previous section: <APACHE_KAFKA_HOST>:<SCHEMA_REGISTRY_PORT> and <SCHEMA_REGISTRY_USER>:<SCHEMA_REGISTRY_PASSWORD>
  • The Apache Kafka details from the previous section: <APACHE_KAFKA_HOST>:<APACHE_KAFKA_PORT>

See the Debezium MySQL connector documentation for more information.

Start the Debezium source connector

Once the parameters are substituted, we can start the Debezium source connector with:

avn service connector create demo-kafka @mysql_source_deb_connector.json

Since we set the name parameter to mysql_source_deb_connector, the above starts a connector with that name.

We can check its status with:

avn service connector status demo-kafka mysql_source_deb_connector

The status should be RUNNING.

{
  "status": {
    "state": "RUNNING",
    "tasks": [
      {
        "id": 0,
        "state": "RUNNING",
        "trace": ""
      }
    ]
  }
}
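The status check is easy to script, for example when waiting for the connector to come up in automation. A sketch that parses the JSON above and requires both the connector and all of its tasks to be RUNNING:

```python
import json

# Status payload in the shape returned by `avn service connector status`.
status_json = """
{ "status": { "state": "RUNNING",
              "tasks": [ { "id": 0, "state": "RUNNING", "trace": "" } ] } }
"""

status = json.loads(status_json)["status"]

# Healthy only if the connector AND every task report RUNNING.
connector_running = (
    status["state"] == "RUNNING"
    and all(task["state"] == "RUNNING" for task in status["tasks"])
)
print(connector_running)  # True
```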

Check the data in Apache Kafka with kcat

If the connector is up and running, we can check that the data is flowing from MySQL to Apache Kafka with kcat. Get the kcat parameters and download the certificates needed to connect with:

avn service connection-info kcat demo-kafka -u avnadmin -W

The above command stores the certificates and key (ca.pem, service.crt and service.key) in the current local folder and returns the command to execute.

To verify the data in Apache Kafka we can use the following command, substituting the placeholders <KAFKA_HOST>, <KAFKA_PORT>, <KAFKA_SCHEMA_REGISTRY_USR>, <KAFKA_SCHEMA_REGISTRY_PWD>, and <KAFKA_SCHEMA_REGISTRY_PORT>.

kcat -b <KAFKA_HOST>:<KAFKA_PORT> \
  -X security.protocol=SSL \
  -X ssl.ca.location=ca.pem \
  -X ssl.key.location=service.key \
  -X ssl.certificate.location=service.crt \
  -C -t example.defaultdb.users \
  -s avro \
  -r https://<KAFKA_SCHEMA_REGISTRY_USR>:<KAFKA_SCHEMA_REGISTRY_PWD>@<KAFKA_HOST>:<KAFKA_SCHEMA_REGISTRY_PORT>

Note

The above command is consuming (-C flag) from the topic example.defaultdb.users (-t flag) in Apache Avro format (-s flag). The topic name is determined by the concatenation of the connector parameter topic.prefix (example) and the table name (defaultdb.users).
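The topic-name derivation can be expressed directly, which is handy when scripting consumers for several tables:

```python
# Topic name = topic.prefix + "." + database + "." + table,
# using the values from this tutorial.
topic_prefix = "example"
database = "defaultdb"
table = "users"

topic = f"{topic_prefix}.{database}.{table}"
print(topic)  # example.defaultdb.users
```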

After executing the kcat command, the data in Apache Kafka should be similar to the following:

{"id": 1}{"before": null, "after": {"Value": {"id": 1, "username": {"string": "Francesco"}}}, "source": {"version": "2.5.0.Final.Aiven", "connector": "mysql", "name": "example", "ts_ms": 1733226275000, "snapshot": {"string": "first"}, "db": "defaultdb", "sequence": null, "table": {"string": "users"}, "server_id": 0, "gtid": null, "file": "binlog.000289", "pos": 197, "row": 0, "thread": null, "query": null}, "op": "r", "ts_ms": {"long": 1733226275527}, "transaction": null}
{"id": 2}{"before": null, "after": {"Value": {"id": 2, "username": {"string": "Ana"}}}, "source": {"version": "2.5.0.Final.Aiven", "connector": "mysql", "name": "example", "ts_ms": 1733226275000, "snapshot": {"string": "true"}, "db": "defaultdb", "sequence": null, "table": {"string": "users"}, "server_id": 0, "gtid": null, "file": "binlog.000289", "pos": 197, "row": 0, "thread": null, "query": null}, "op": "r", "ts_ms": {"long": 1733226275530}, "transaction": null}
{"id": 3}{"before": null, "after": {"Value": {"id": 3, "username": {"string": "Floor"}}}, "source": {"version": "2.5.0.Final.Aiven", "connector": "mysql", "name": "example", "ts_ms": 1733226275000, "snapshot": {"string": "last"}, "db": "defaultdb", "sequence": null, "table": {"string": "users"}, "server_id": 0, "gtid": null, "file": "binlog.000289", "pos": 197, "row": 0, "thread": null, "query": null}, "op": "r", "ts_ms": {"long": 1733226275531}, "transaction": null}
% Reached end of topic example.defaultdb.users [0] at offset 3

Not only do we get the three values stored in the users table, but we also get additional useful metadata like when the insert/update/deletion happened (in the ts_ms parameter) and the position in the binlog file.
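A downstream consumer typically branches on the op field to rebuild the table state. A minimal sketch of that dispatch, with simplified event dicts mirroring the payloads above:

```python
# Simplified change events mirroring the payloads above
# (snapshot read of Francesco, then Carlo created and later deleted).
events = [
    {"op": "r", "before": None, "after": {"id": 1, "username": "Francesco"}},
    {"op": "c", "before": None, "after": {"id": 4, "username": "Carlo"}},
    {"op": "d", "before": {"id": 4, "username": "Carlo"}, "after": None},
]

# Debezium op codes: r = snapshot read, c = create, u = update, d = delete.
state = {}
for event in events:
    if event["op"] in ("r", "c", "u"):
        row = event["after"]
        state[row["id"]] = row                    # upsert the new row state
    elif event["op"] == "d":
        state.pop(event["before"]["id"], None)    # remove the deleted row

print(sorted(state))  # [1]
```

Applying the event stream in order reproduces the current content of the source table, which is the essence of replicating via CDC.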

If we now add a new row to the MySQL users table with:

insert into users(username) values ('Carlo');

We should see it immediately pop up in the Apache Kafka topic:

{"id": 4}{"before": null, "after": {"Value": {"id": 4, "username": {"string": "Carlo"}}}, "source": {"version": "2.5.0.Final.Aiven", "connector": "mysql", "name": "example", "ts_ms": 1733226538000, "snapshot": {"string": "false"}, "db": "defaultdb", "sequence": null, "table": {"string": "users"}, "server_id": 710603388, "gtid": {"string": "3f6f07d7-b0a4-11ef-9d29-0adcce978e09:39"}, "file": "binlog.000290", "pos": 420, "row": 0, "thread": {"long": 20671}, "query": null}, "op": "c", "ts_ms": {"long": 1733226538394}, "transaction": null}
% Reached end of topic example.defaultdb.users [0] at offset 4

If we now remove the row containing Carlo with:

delete from users where username='Carlo';

We see the corresponding delete ("op": "d") in Apache Kafka:

{"id": 4}{"before": {"Value": {"id": 4, "username": {"string": "Carlo"}}}, "after": null, "source": {"version": "2.5.0.Final.Aiven", "connector": "mysql", "name": "example", "ts_ms": 1733226572000, "snapshot": {"string": "false"}, "db": "defaultdb", "sequence": null, "table": {"string": "users"}, "server_id": 710603388, "gtid": {"string": "3f6f07d7-b0a4-11ef-9d29-0adcce978e09:40"}, "file": "binlog.000290", "pos": 725, "row": 0, "thread": {"long": 20671}, "query": null}, "op": "d", "ts_ms": {"long": 1733226572691}, "transaction": null}
{"id": 4}
% Reached end of topic example.defaultdb.users [0] at offset 6

Altering the schema

Where is the MySQL schema stored? There are two topics:

  • one called ddl_history (named by the schema.history.internal.kafka.topic parameter) which stores the schema history for Debezium's internal use, so that the connector can resume work after a restart or crash
  • the other called example (based on the topic.prefix parameter) where the connector stores all the DDLs needed to replicate the state in another MySQL database.

We can fetch the latter topic with kcat:

kcat -b <KAFKA_HOST>:<KAFKA_PORT> \
  -X security.protocol=SSL \
  -X ssl.ca.location=ca.pem \
  -X ssl.key.location=service.key \
  -X ssl.certificate.location=service.crt \
  -C -t example \
  -s avro \
  -r https://<KAFKA_SCHEMA_REGISTRY_USR>:<KAFKA_SCHEMA_REGISTRY_PWD>@<KAFKA_HOST>:<KAFKA_SCHEMA_REGISTRY_PORT>

We get the full list of DDLs needed to replicate the state into a new MySQL database.

{"databaseName": ""}{"source": {"version": "2.5.0.Final.Aiven", "connector": "mysql", "name": "example", "ts_ms": 1733226274272, "snapshot": {"string": "true"}, "db": "", "sequence": null, "table": null, "server_id": 0, "gtid": null, "file": "binlog.000289", "pos": 197, "row": 0, "thread": null, "query": null}, "ts_ms": 1733226275042, "databaseName": {"string": ""}, "schemaName": null, "ddl": {"string": "SET character_set_server=utf8mb4, collation_server=utf8mb4_0900_ai_ci"}, "tableChanges": []}
{"databaseName": "defaultdb"}{"source": {"version": "2.5.0.Final.Aiven", "connector": "mysql", "name": "example", "ts_ms": 1733226275044, "snapshot": {"string": "true"}, "db": "defaultdb", "sequence": null, "table": {"string": "users"}, "server_id": 0, "gtid": null, "file": "binlog.000289", "pos": 197, "row": 0, "thread": null, "query": null}, "ts_ms": 1733226275086, "databaseName": {"string": "defaultdb"}, "schemaName": null, "ddl": {"string": "DROP TABLE IF EXISTS `defaultdb`.`users`"}, "tableChanges": [{"type": "DROP", "id": "\"defaultdb\".\"users\"", "table": null}]}
{"databaseName": "defaultdb"}{"source": {"version": "2.5.0.Final.Aiven", "connector": "mysql", "name": "example", "ts_ms": 1733226275092, "snapshot": {"string": "true"}, "db": "defaultdb", "sequence": null, "table": null, "server_id": 0, "gtid": null, "file": "binlog.000289", "pos": 197, "row": 0, "thread": null, "query": null}, "ts_ms": 1733226275098, "databaseName": {"string": "defaultdb"}, "schemaName": null, "ddl": {"string": "DROP DATABASE IF EXISTS `defaultdb`"}, "tableChanges": []}
{"databaseName": "defaultdb"}{"source": {"version": "2.5.0.Final.Aiven", "connector": "mysql", "name": "example", "ts_ms": 1733226275098, "snapshot": {"string": "true"}, "db": "defaultdb", "sequence": null, "table": null, "server_id": 0, "gtid": null, "file": "binlog.000289", "pos": 197, "row": 0, "thread": null, "query": null}, "ts_ms": 1733226275107, "databaseName": {"string": "defaultdb"}, "schemaName": null, "ddl": {"string": "CREATE DATABASE `defaultdb` CHARSET utf8mb4 COLLATE utf8mb4_0900_ai_ci"}, "tableChanges": []}
{"databaseName": "defaultdb"}{"source": {"version": "2.5.0.Final.Aiven", "connector": "mysql", "name": "example", "ts_ms": 1733226275107, "snapshot": {"string": "true"}, "db": "defaultdb", "sequence": null, "table": null, "server_id": 0, "gtid": null, "file": "binlog.000289", "pos": 197, "row": 0, "thread": null, "query": null}, "ts_ms": 1733226275109, "databaseName": {"string": "defaultdb"}, "schemaName": null, "ddl": {"string": "USE `defaultdb`"}, "tableChanges": []}
{"databaseName": "defaultdb"}{"source": {"version": "2.5.0.Final.Aiven", "connector": "mysql", "name": "example", "ts_ms": 1733226275124, "snapshot": {"string": "true"}, "db": "defaultdb", "sequence": null, "table": {"string": "users"}, "server_id": 0, "gtid": null, "file": "binlog.000289", "pos": 197, "row": 0, "thread": null, "query": null}, "ts_ms": 1733226275197, "databaseName": {"string": "defaultdb"}, "schemaName": null, "ddl": {"string": "CREATE TABLE \"users\" (\n \"id\" bigint unsigned NOT NULL AUTO_INCREMENT,\n \"username\" varchar(100) DEFAULT NULL,\n PRIMARY KEY (\"id\"),\n UNIQUE KEY \"id\" (\"id\")\n)"}, "tableChanges": [{"type": "CREATE", "id": "\"defaultdb\".\"users\"", "table": {"Table": {"defaultCharsetName": {"string": "utf8mb4"}, "primaryKeyColumnNames": {"array": ["id"]}, "columns": [{"name": "id", "jdbcType": -5, "nativeType": null, "typeName": "BIGINT UNSIGNED", "typeExpression": {"string": "BIGINT UNSIGNED"}, "charsetName": null, "length": null, "scale": null, "position": 1, "optional": {"boolean": false}, "autoIncremented": {"boolean": true}, "generated": {"boolean": true}, "comment": null, "defaultValueExpression": null, "enumValues": null}, {"name": "username", "jdbcType": 12, "nativeType": null, "typeName": "VARCHAR", "typeExpression": {"string": "VARCHAR"}, "charsetName": {"string": "utf8mb4"}, "length": {"int": 100}, "scale": null, "position": 2, "optional": {"boolean": true}, "autoIncremented": {"boolean": false}, "generated": {"boolean": false}, "comment": null, "defaultValueExpression": null, "enumValues": null}], "comment": null}}}]}
% Reached end of topic example [0] at offset 6

Warning

The response also contains DROP TABLE and DROP DATABASE statements; replaying them blindly could delete other tables present in the same target database.
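If you do replay the DDL stream against a database that already holds data, you may want to filter out the destructive statements first. A hedged sketch, using a simplified version of the DDL strings seen in the example topic:

```python
# DDL statements as they appear in the `example` topic (simplified; the
# final CREATE TABLE body is elided for brevity).
ddls = [
    "SET character_set_server=utf8mb4, collation_server=utf8mb4_0900_ai_ci",
    "DROP TABLE IF EXISTS `defaultdb`.`users`",
    "DROP DATABASE IF EXISTS `defaultdb`",
    "CREATE DATABASE `defaultdb` CHARSET utf8mb4 COLLATE utf8mb4_0900_ai_ci",
    "USE `defaultdb`",
    'CREATE TABLE "users" (...)',
]

# Keep only non-destructive statements before replaying them on a target
# database that may already contain data.
safe_ddls = [ddl for ddl in ddls if not ddl.upper().startswith("DROP ")]

for ddl in safe_ddls:
    print(ddl)
```

A simple prefix check like this is deliberately conservative; review the filtered statements before executing them on any shared database.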

If we now add a column to the source MySQL database with:

alter table users add column country varchar(3);

We get a new entry in the example topic defining the DDL change we just implemented:

{"databaseName": "defaultdb"}{"source": {"version": "2.5.0.Final.Aiven", "connector": "mysql", "name": "example", "ts_ms": 1733226759134, "snapshot": {"string": "false"}, "db": "defaultdb", "sequence": null, "table": {"string": "users"}, "server_id": 710603388, "gtid": {"string": "3f6f07d7-b0a4-11ef-9d29-0adcce978e09:41"}, "file": "binlog.000291", "pos": 274, "row": 0, "thread": null, "query": null}, "ts_ms": 1733226759153, "databaseName": {"string": "defaultdb"}, "schemaName": null, "ddl": {"string": "alter table users add column country varchar(3)"}, "tableChanges": [{"type": "ALTER", "id": "\"defaultdb\".\"users\"", "table": {"Table": {"defaultCharsetName": {"string": "utf8mb4"}, "primaryKeyColumnNames": {"array": ["id"]}, "columns": [{"name": "id", "jdbcType": -5, "nativeType": null, "typeName": "BIGINT UNSIGNED", "typeExpression": {"string": "BIGINT UNSIGNED"}, "charsetName": null, "length": null, "scale": null, "position": 1, "optional": {"boolean": false}, "autoIncremented": {"boolean": true}, "generated": {"boolean": true}, "comment": null, "defaultValueExpression": null, "enumValues": null}, {"name": "username", "jdbcType": 12, "nativeType": null, "typeName": "VARCHAR", "typeExpression": {"string": "VARCHAR"}, "charsetName": {"string": "utf8mb4"}, "length": {"int": 100}, "scale": null, "position": 2, "optional": {"boolean": true}, "autoIncremented": {"boolean": false}, "generated": {"boolean": false}, "comment": null, "defaultValueExpression": null, "enumValues": null}, {"name": "country", "jdbcType": 12, "nativeType": null, "typeName": "VARCHAR", "typeExpression": {"string": "VARCHAR"}, "charsetName": {"string": "utf8mb4"}, "length": {"int": 3}, "scale": null, "position": 3, "optional": {"boolean": true}, "autoIncremented": {"boolean": false}, "generated": {"boolean": false}, "comment": null, "defaultValueExpression": null, "enumValues": null}], "comment": null}}}]}
% Reached end of topic example [0] at offset 7

Conclusion

The Debezium connector for Apache Kafka tracks, in real time, all the changes happening in one or more MySQL tables into a series of topics by reading the binlog. The messages in Apache Kafka contain the update data and are enriched with metadata like transaction IDs and timestamps. Furthermore, the dedicated topic tracking DDL changes gives additional control over the table structures, which is useful when the database structure needs to be replicated elsewhere.
