Kafka Connector to MySQL Source with Confluent JDBC
A Kafka Connect MySQL source connector reads rows from a MySQL database table and writes them to Kafka topics. In this Kafka tutorial, we shall set up the Confluent JDBC Source Connector, add the MySQL JDBC driver, run the connector in standalone mode, and verify the imported rows with an Avro console consumer.
This example uses mode=incrementing with a MySQL column named rollno. Kafka Connect polls the table and imports rows whose rollno value is greater than the last value it has already processed.
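The polling behaviour described above can be sketched roughly as follows. This is only an illustration and not the connector's actual implementation: it uses Python's built-in sqlite3 module as a stand-in for MySQL, and the function name poll_new_rows is hypothetical.

```python
import sqlite3

def poll_new_rows(conn, last_rollno):
    """Return rows whose rollno is greater than last_rollno, plus the new offset."""
    rows = conn.execute(
        "SELECT rollno, name, marks FROM students WHERE rollno > ? ORDER BY rollno",
        (last_rollno,),
    ).fetchall()
    # The offset only moves forward when new rows were found.
    new_offset = rows[-1][0] if rows else last_rollno
    return rows, new_offset

# In-memory SQLite database standing in for the MySQL students table.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE students ("
    "rollno INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT, marks INTEGER)"
)
conn.executemany(
    "INSERT INTO students (name, marks) VALUES (?, ?)",
    [("John", 84), ("Arjun", 84)],
)

# First poll: nothing has been processed yet, so both rows are returned.
first_batch, offset = poll_new_rows(conn, last_rollno=0)

# A new row arrives; the second poll returns only that row.
conn.execute("INSERT INTO students (name, marks) VALUES ('Prasanth', 77)")
second_batch, offset = poll_new_rows(conn, offset)

print(first_batch)
print(second_batch)
```

The key idea is that each poll filters on the tracking column and remembers the highest value it has seen, which is why the column has to keep increasing for new rows.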
MySQL to Kafka flow in this JDBC source connector example
- MySQL contains the source table named students.
- MySQL Connector/J lets Kafka Connect open a JDBC connection to MySQL.
- Confluent JDBC Source Connector polls the table.
- Kafka receives the rows in a topic named from topic.prefix plus the table name.
- Schema Registry is used because the example verifies messages with kafka-avro-console-consumer.
With the configuration used below, topic.prefix=test-mysql-jdbc- and the table is students, so the generated topic is test-mysql-jdbc-students. For connector properties and version-specific details, refer to the official Confluent JDBC Source Connector documentation. For managed connectors, see the Confluent Cloud MySQL Source connector.
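As a small illustration of the naming rule, the following sketch joins a prefix and a table name. The helper topic_for_table and the second table name teachers are made up for this example.

```python
def topic_for_table(topic_prefix, table_name):
    """Build a topic name as described in this tutorial: prefix followed by table name."""
    return f"{topic_prefix}{table_name}"

# 'teachers' is a hypothetical second table, used only to show the pattern.
for table in ["students", "teachers"]:
    print(topic_for_table("test-mysql-jdbc-", table))
```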
Before configuring Kafka Connect for a MySQL source table
- Install Apache Kafka or Confluent Platform.
- Install the Kafka Connect JDBC connector.
- Keep MySQL running and reachable from the Kafka Connect host.
- Create a MySQL user that can read the source database and table.
- Copy MySQL Connector/J to the Kafka Connect JDBC plugin path.
- Use a table with a suitable tracking column, such as an auto-increment primary key for mode=incrementing.
1. Install Confluent Platform and Kafka Connect JDBC connector
Refer to Install Confluent Open Source Platform.
The commands in this tutorial follow the older local Confluent Platform style. Newer installations may use different service commands, but the MySQL JDBC source connector configuration remains similar.
2. Download MySQL Connector/J for the Kafka Connect worker
MySQL Connector/J, the MySQL JDBC driver for Java, is required by the connector to connect to the MySQL database. Download the driver jar, mysql-connector-java-5.1.42-bin.jar, from https://dev.mysql.com/downloads/connector/j/5.1.html.
For a new setup, use a MySQL Connector/J version compatible with your MySQL server and Java runtime. Current packages may use a name such as mysql-connector-j-x.x.x.jar; the older jar name is kept here for the original example.
3. Copy MySQL Connector/J into the Kafka Connect JDBC plugin path
Add the jar to the existing Kafka Connect JDBC jars. On Ubuntu, the location is /usr/share/java/kafka-connect-jdbc.
sudo cp mysql-connector-j*.jar /usr/share/java/kafka-connect-jdbc/
Restart the Kafka Connect worker after copying the driver. If the driver is missing from the plugin path, the connector usually fails with a driver class or “no suitable driver” error when it tries to connect to MySQL.
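Before restarting the worker, it can help to confirm that the driver jar really is in the plugin directory. The sketch below is one way to check this; the function name find_mysql_driver_jars is hypothetical, and the directory is the Ubuntu location mentioned in this tutorial, which may differ on your installation.

```python
from pathlib import Path

def find_mysql_driver_jars(plugin_dir):
    """Return MySQL Connector/J jar files found directly inside plugin_dir."""
    directory = Path(plugin_dir)
    if not directory.is_dir():
        return []
    # Matches both mysql-connector-java-5.1.42-bin.jar and mysql-connector-j-x.x.x.jar.
    return sorted(directory.glob("mysql-connector-j*.jar"))

if __name__ == "__main__":
    jars = find_mysql_driver_jars("/usr/share/java/kafka-connect-jdbc")
    if jars:
        print("Found driver:", ", ".join(jar.name for jar in jars))
    else:
        print("No MySQL Connector/J jar found in the plugin path")
```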
4. Prepare a MySQL students table for incrementing mode
The sample connector uses mode=incrementing. The tracking column must be numeric, non-null, and strictly increasing for newly inserted rows.
CREATE DATABASE IF NOT EXISTS studentsDB;
USE studentsDB;
CREATE TABLE IF NOT EXISTS students (
rollno INT NOT NULL AUTO_INCREMENT,
name VARCHAR(100),
marks INT,
PRIMARY KEY (rollno)
);
If your table does not have an incrementing column, consider another JDBC source mode such as timestamp or timestamp+incrementing, provided the table has reliable timestamp columns.
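To make the choice between modes concrete, the following sketch builds the mode-related properties for each option. It assumes the JDBC source connector property names mode, incrementing.column.name and timestamp.column.name; the helper mode_properties and the column name updated_at are hypothetical, and the sketch insists on explicit column names for clarity.

```python
SUPPORTED_MODES = ("incrementing", "timestamp", "timestamp+incrementing")

def mode_properties(mode, incrementing_column=None, timestamp_column=None):
    """Return the properties that select how new rows are detected."""
    if mode not in SUPPORTED_MODES:
        raise ValueError(f"unsupported mode in this sketch: {mode}")
    props = {"mode": mode}
    if "incrementing" in mode:
        if not incrementing_column:
            raise ValueError(f"mode={mode} needs an incrementing column")
        props["incrementing.column.name"] = incrementing_column
    if "timestamp" in mode:
        if not timestamp_column:
            raise ValueError(f"mode={mode} needs a timestamp column")
        props["timestamp.column.name"] = timestamp_column
    return props

# 'updated_at' is a hypothetical timestamp column; the students table above does not have one.
for key, value in mode_properties("timestamp+incrementing", "rollno", "updated_at").items():
    print(f"{key}={value}")
```

The printed lines have the same key=value shape as the connector properties file, so they can be compared against your own configuration.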
5. Configure source-quickstart-mysql.properties for the MySQL JDBC source
Create a file, /etc/kafka-connect-jdbc/source-quickstart-mysql.properties, with the following content.
name=test-source-mysql-jdbc-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://127.0.0.1:3306/studentsDB?user=arjun&password=password
mode=incrementing
incrementing.column.name=rollno
topic.prefix=test-mysql-jdbc-
Following are the configuration values that you might need to adjust for your MySQL database:
- connection.url
connection.url=jdbc:mysql://127.0.0.1:3306/<DatabaseName>?user=<username>&password=<password>
where username and password are the user credentials with which you log in to the MySQL database.
- incrementing.column.name
The name of the strictly incrementing column in the tables of your database to use to detect new rows. An empty value indicates that the column should be autodetected by looking for an auto-incrementing column. This column must not be nullable. If you don't have a column with these properties, you may convert an existing integer column with the following SQL commands. MySQL requires an AUTO_INCREMENT column to be part of a key, so add the primary key first.
ALTER TABLE <table_name> ADD PRIMARY KEY (<column_name>);
ALTER TABLE <table_name> MODIFY COLUMN <column_name> INT NOT NULL AUTO_INCREMENT;
- topic.prefix
Prefix to prepend to table names to generate the name of the Kafka topic to publish data to, or in the case of a custom query, the full name of the topic to publish to.
Example: If your topic.prefix=test-mysql-jdbc- and you have a table named students in your database, the topic name to which the connector publishes the messages would be test-mysql-jdbc-students.
Connector properties that control MySQL polling and Kafka topics
| Property | Use in this MySQL source connector |
|---|---|
| connector.class | Uses io.confluent.connect.jdbc.JdbcSourceConnector. |
| connection.url | JDBC URL for the MySQL database. Protect credentials in shared environments. |
| mode | Controls how rows are detected. This example uses incrementing. |
| incrementing.column.name | Column used to identify new rows in incrementing mode. |
| topic.prefix | Prefix added to table names to create Kafka topic names. |
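One way to keep credentials out of the JDBC URL is to pass them through the connector's separate connection.user and connection.password properties. The sketch below renders such a properties file; the helper build_connector_properties and the environment variable name MYSQL_PASSWORD are assumptions made for illustration.

```python
import os

def build_connector_properties(database, user, password, host="127.0.0.1", port=3306):
    """Render connector properties with credentials kept outside the JDBC URL."""
    props = {
        "name": "test-source-mysql-jdbc-autoincrement",
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "tasks.max": "1",
        "connection.url": f"jdbc:mysql://{host}:{port}/{database}",
        "connection.user": user,
        "connection.password": password,
        "mode": "incrementing",
        "incrementing.column.name": "rollno",
        "topic.prefix": "test-mysql-jdbc-",
    }
    return "\n".join(f"{key}={value}" for key, value in props.items()) + "\n"

# MYSQL_PASSWORD is a hypothetical environment variable; the fallback matches the tutorial value.
text = build_connector_properties(
    "studentsDB", "arjun", os.environ.get("MYSQL_PASSWORD", "password")
)
print(text)
```

The password still ends up in the generated file, so the file permissions on the Connect host continue to matter.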
6. Start ZooKeeper, Kafka and Schema Registry for the Avro example
To start ZooKeeper, Kafka and Schema Registry, run the following confluent command:
$ confluent start schema-registry
Starting zookeeper
zookeeper is [UP]
Starting kafka
kafka is [UP]
Starting schema-registry
schema-registry is [UP]
Schema Registry is required here because this tutorial verifies the records with the Avro console consumer. If your worker uses JSON converters, use a matching JSON consumer command instead.
7. Start the MySQL JDBC source connector in standalone mode
Run the following command to start the connector in standalone mode.
$ /usr/bin/connect-standalone /etc/schema-registry/connect-avro-standalone.properties /etc/kafka-connect-jdbc/source-quickstart-mysql.properties
The Connector should start successfully.
Standalone mode is suitable for this local example because the worker and connector properties are passed as files. In shared Kafka Connect environments, distributed mode is usually used and connector configuration is submitted through the Kafka Connect REST API.
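For distributed mode, the same connector settings are sent as JSON to the worker's REST API with a POST to /connectors. The following sketch only builds such a request and does not send it. It assumes a worker listening on http://localhost:8083, which is the usual default but may differ in your setup, and the function name build_create_connector_request is hypothetical.

```python
import json
import urllib.request

def build_create_connector_request(connect_url, name, config):
    """Build (but do not send) a POST /connectors request for a distributed worker."""
    payload = json.dumps({"name": name, "config": config}).encode("utf-8")
    return urllib.request.Request(
        url=f"{connect_url.rstrip('/')}/connectors",
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )

# Same settings as source-quickstart-mysql.properties, expressed as a JSON-ready dict.
config = {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:mysql://127.0.0.1:3306/studentsDB?user=arjun&password=password",
    "mode": "incrementing",
    "incrementing.column.name": "rollno",
    "topic.prefix": "test-mysql-jdbc-",
}
request = build_create_connector_request(
    "http://localhost:8083", "test-source-mysql-jdbc-autoincrement", config
)
print(request.get_method(), request.full_url)
# To submit it to a running worker, call urllib.request.urlopen(request).
```

Note that the connector name moves out of the properties and becomes the top-level name field of the JSON body.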
8. Verify MySQL rows in the Kafka topic with Avro console consumer
To verify the messages posted to the topic, start a consumer that subscribes to the topic named test-mysql-jdbc-students. Here, students is the table name and test-mysql-jdbc- is the topic.prefix.
Run the following command to start a consumer:
/usr/bin/kafka-avro-console-consumer --topic test-mysql-jdbc-students --zookeeper localhost:2181 --from-beginning
You may replace test-mysql-jdbc-students with the name that your configuration and tables in the MySQL Database generate.
root@tutorialkart:~# /usr/bin/kafka-avro-console-consumer --topic test-mysql-jdbc-students --zookeeper localhost:2181 --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
{"name":{"string":"John"},"rollno":1,"marks":{"int":84}}
{"name":{"string":"Arjun"},"rollno":2,"marks":{"int":84}}
{"name":{"string":"Prasanth"},"rollno":3,"marks":{"int":77}}
{"name":{"string":"Adarsh"},"rollno":4,"marks":{"int":78}}
{"name":{"string":"Raja"},"rollno":5,"marks":{"int":94}}
{"name":{"string":"Sai"},"rollno":6,"marks":{"int":84}}
{"name":{"string":"Ross"},"rollno":7,"marks":{"int":54}}
{"name":{"string":"Monica Gellar"},"rollno":8,"marks":{"int":86}}
{"name":{"string":"Lee"},"rollno":9,"marks":{"int":98}}
{"name":{"string":"Bruce Wane"},"rollno":10,"marks":{"int":92}}
{"name":{"string":"Jack"},"rollno":11,"marks":{"int":82}}
{"name":{"string":"Priya"},"rollno":12,"marks":{"int":88}}
{"name":{"string":"Amy"},"rollno":13,"marks":{"int":84}}
For newer Kafka clients, use --bootstrap-server instead of the older --zookeeper option.
kafka-avro-console-consumer \
--bootstrap-server localhost:9092 \
--topic test-mysql-jdbc-students \
--from-beginning
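In the consumer output, name and marks appear wrapped as {"string":"John"} and {"int":84}. This is how Avro's JSON encoding prints a non-null value of a union type, which is what nullable columns typically map to, while the NOT NULL rollno column is printed as a plain number. The sketch below flattens one such line for easier reading; the helper unwrap_avro_json is hypothetical and deliberately simple, so it would also flatten a genuine single-key nested object.

```python
import json

def unwrap_avro_json(record):
    """Flatten single-key union wrappers such as {"string": "John"} into plain values."""
    flat = {}
    for field, value in record.items():
        if isinstance(value, dict) and len(value) == 1:
            # A non-null union value is printed as {"<type name>": value}.
            value = next(iter(value.values()))
        flat[field] = value
    return flat

line = '{"name":{"string":"John"},"rollno":1,"marks":{"int":84}}'
print(unwrap_avro_json(json.loads(line)))
```

For the sample line this prints {'name': 'John', 'rollno': 1, 'marks': 84}.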
9. Insert a new MySQL row and confirm Kafka receives it
Let us add a row to the MySQL table students and check whether the console consumer receives the message.
mysql> use studentsDB;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A
Database changed
mysql> INSERT INTO students (name, marks) VALUES ('Sastri',88);
Query OK, 1 row affected (0.06 sec)
The consumer receives the message:
root@tutorialkart:~# /usr/bin/kafka-avro-console-consumer --topic test-mysql-jdbc-students --zookeeper localhost:2181 --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
{"name":{"string":"John"},"rollno":1,"marks":{"int":84}}
{"name":{"string":"Arjun"},"rollno":2,"marks":{"int":84}}
{"name":{"string":"Prasanth"},"rollno":3,"marks":{"int":77}}
{"name":{"string":"Adarsh"},"rollno":4,"marks":{"int":78}}
{"name":{"string":"Raja"},"rollno":5,"marks":{"int":94}}
{"name":{"string":"Sai"},"rollno":6,"marks":{"int":84}}
{"name":{"string":"Ross"},"rollno":7,"marks":{"int":54}}
{"name":{"string":"Monica Gellar"},"rollno":8,"marks":{"int":86}}
{"name":{"string":"Lee"},"rollno":9,"marks":{"int":98}}
{"name":{"string":"Bruce Wane"},"rollno":10,"marks":{"int":92}}
{"name":{"string":"Jack"},"rollno":11,"marks":{"int":82}}
{"name":{"string":"Priya"},"rollno":12,"marks":{"int":88}}
{"name":{"string":"Amy"},"rollno":13,"marks":{"int":84}}
{"name":{"string":"Sastri"},"rollno":14,"marks":{"int":88}}
The new record appears with rollno value 14. Because the connector stores its offset, the next poll reads only rows with a higher incrementing value.
Important behavior of the MySQL JDBC source connector
- Incrementing mode reads new higher IDs. Updates to old rollno values are not emitted again.
- Deletes are not captured. The JDBC source connector polls query results; it does not read the MySQL binary log.
- Topic names depend on table names. The prefix test-mysql-jdbc- plus students gives test-mysql-jdbc-students.
- Converters decide message format. Avro, JSON, and String converters require matching consumer tools.
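The first two points above can be demonstrated with a small simulation. It is a sketch of the idea only, with a Python list standing in for the MySQL table and a hypothetical function poll_incrementing standing in for the connector's poll.

```python
def poll_incrementing(table, last_seen):
    """Return copies of rows with rollno above last_seen, plus the new offset."""
    new_rows = sorted(
        (dict(row) for row in table if row["rollno"] > last_seen),
        key=lambda row: row["rollno"],
    )
    offset = new_rows[-1]["rollno"] if new_rows else last_seen
    return new_rows, offset

# A list of dicts standing in for the MySQL students table.
table = [
    {"rollno": 1, "name": "John", "marks": 84},
    {"rollno": 2, "name": "Arjun", "marks": 84},
]
emitted, offset = poll_incrementing(table, last_seen=0)    # both rows are emitted

table[0]["marks"] = 90    # UPDATE of an already processed row
del table[1]              # DELETE of an already processed row
after_changes, offset = poll_incrementing(table, offset)   # nothing has rollno above 2

table.append({"rollno": 3, "name": "Prasanth", "marks": 77})  # INSERT with a higher rollno
after_insert, offset = poll_incrementing(table, offset)    # only rollno 3 is emitted

print(len(emitted), len(after_changes), len(after_insert))
```

The update and the delete produce no output because neither creates a row with a rollno above the stored offset; only the insert does.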
Troubleshooting Kafka Connect MySQL JDBC source connector issues
| Issue | What to check |
|---|---|
| MySQL driver or no suitable driver error | Copy MySQL Connector/J to the Kafka Connect JDBC plugin path and restart the worker. |
| No records in the topic | Check the table name, topic prefix, connector logs, and whether new rows match the selected mode. |
| Updates are not appearing | mode=incrementing is for new higher IDs. Use timestamp-based mode if the table supports it. |
| Consumer cannot read Avro messages | Verify Schema Registry and the worker converter configuration. |
| Connection refused or access denied | Test the MySQL host, port, username, password, and privileges from the Connect host. |
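For the last row of the table, a quick first check is whether the MySQL port can be reached at all from the Connect host. The sketch below tests only TCP reachability and says nothing about usernames, passwords, or privileges; the function name mysql_port_reachable is hypothetical, and the host and port are the values used in this tutorial.

```python
import socket

def mysql_port_reachable(host, port=3306, timeout=3.0):
    """Return True if a TCP connection to host:port can be opened within timeout seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

if __name__ == "__main__":
    print("MySQL reachable:", mysql_port_reachable("127.0.0.1", 3306))
```

If this returns False, look at the host, port, and firewall settings before investigating credentials.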
Kafka Connector to MySQL Source FAQs
Does the Kafka JDBC Source Connector capture MySQL updates and deletes?
Not with this incrementing configuration. It reads rows with new higher incrementing values. Updates to already processed rows and deletes are not captured as separate events.
Which MySQL column should be used for incrementing.column.name?
Use a numeric, non-null column that always increases for new rows. An auto-increment primary key is the common choice.
Why is the Kafka topic named test-mysql-jdbc-students?
The connector combines topic.prefix with the source table name. Here, test-mysql-jdbc- plus students becomes test-mysql-jdbc-students.
Can this MySQL source connector run without Schema Registry?
Yes, if the Kafka Connect worker uses non-Avro converters such as JSON converters. Schema Registry is used in this tutorial because the verification command uses an Avro console consumer.
Should this connector run in standalone or distributed mode?
Standalone mode is convenient for a local tutorial. Distributed mode is better for shared Connect clusters because connectors are managed through the Kafka Connect REST API.
QA checklist for the Kafka MySQL JDBC connector tutorial
- The MySQL database, user, and password in connection.url match the local setup, and the source table exists in that database.
- MySQL Connector/J is in the Kafka Connect JDBC plugin path.
- The rollno column is numeric, non-null, and increasing.
- The expected Kafka topic name is built from topic.prefix and the MySQL table name.
- The consumer command matches the converter used by the Kafka Connect worker.
- This JDBC polling connector is not a full MySQL CDC connector.
Conclusion: importing MySQL rows into Kafka with JDBC Source Connector
In this Apache Kafka tutorial, Kafka Connector to MySQL Source, we have learnt to set up a connector that imports data into Kafka from a MySQL database source using the Confluent JDBC connector and the MySQL Connector/J driver.
The essential steps are to install the JDBC connector, place MySQL Connector/J in the Kafka Connect plugin path, configure source-quickstart-mysql.properties, start the connector, and verify the generated Kafka topic. For this example, mode=incrementing works because the students table has a strictly increasing rollno column.
TutorialKart.com