Kafka Connector to MySQL Source – Confluent JDBC Driver – Example

Kafka Connector to MySQL Source

In this Kafka tutorial, we shall learn to set up a connector that imports data from a MySQL Database and listens for new rows.

To set up a Kafka Connector to a MySQL Database source, follow this step-by-step guide:

  1. Install Confluent Open Source Platform.

    Refer to the Install Confluent Open Source Platform guide.

  2. Download MySQL connector for Java.

    MySQL Connector for Java is the JDBC driver that the Connector needs to connect to the MySQL Database. Download mysql-connector-java-5.1.42-bin.jar from https://dev.mysql.com/downloads/connector/j/5.1.html.

  3. Copy MySQL Connector Jar.

    Add the jar to the existing Kafka Connect JDBC jars. On Ubuntu, they are located in /usr/share/java/kafka-connect-jdbc.
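
    The copy can be sketched as a small shell function. The function name install_jar is hypothetical and used only for illustration. The jar name and the target directory are the ones given above, and on most systems the copy into /usr/share/java needs root privileges.

```shell
# install_jar JAR DEST_DIR
# Hypothetical helper: copies the driver jar next to the Kafka Connect JDBC jars.
install_jar() {
    jar="$1"
    dest="$2"
    # Fail early with a clear message instead of a bare cp error.
    [ -f "$jar" ] || { echo "jar not found: $jar" >&2; return 1; }
    [ -d "$dest" ] || { echo "directory not found: $dest" >&2; return 1; }
    cp "$jar" "$dest"/
}

# Usage on Ubuntu (run from a root shell if the directory is not writable):
# install_jar mysql-connector-java-5.1.42-bin.jar /usr/share/java/kafka-connect-jdbc
```

    Checking both paths first makes a wrong download location or a missing connector installation obvious before anything is copied.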

  4. Configure Data Source Properties.

    Create the file /etc/kafka-connect-jdbc/source-quickstart-mysql.properties to hold the source connector configuration.

    The following are the configuration values that you might need to adjust for your MySQL database:
    connection.url
    connection.url=jdbc:mysql://127.0.0.1:3306/<DatabaseName>?user=<username>&password=<password>
    <username> and <password> are the credentials with which you log in to the MySQL Database.
    incrementing.column.name

    The name of the strictly incrementing column in the tables of your database, used to detect new rows. An empty value indicates that the column should be autodetected by looking for an auto-incrementing column. This column may not be nullable. If you don’t have a column with these properties, you can alter one of the columns with the following SQL commands. The key is added first because MySQL requires an auto-increment column to be defined as a key.

    ALTER TABLE <table_name> ADD PRIMARY KEY (<column_name>);
    ALTER TABLE <table_name> MODIFY COLUMN <column_name> INT NOT NULL AUTO_INCREMENT;

    topic.prefix
    The prefix to prepend to table names to generate the name of the Kafka topic to publish data to or, in the case of a custom query, the full name of the topic to publish to.
    Example: if topic.prefix=test-mysql-jdbc- and your database has a table named students, the Connector publishes its messages to the topic test-mysql-jdbc-students.

  5. Start Zookeeper, Kafka and Schema Registry.

    To start Zookeeper, Kafka and Schema Registry, run the following confluent command. Starting schema-registry also starts the services it depends on, Zookeeper and Kafka.

    $ confluent start schema-registry
     
  6. Start the standalone connector.

    Run the following command to start the standalone connector.

    $ /usr/bin/connect-standalone /etc/schema-registry/connect-avro-standalone.properties /etc/kafka-connect-jdbc/source-quickstart-mysql.properties

    The Connector should start successfully.

  7. Start a Console Consumer.

    To verify the messages posted to the topic, start a consumer that subscribes to the topic named test-mysql-jdbc-students, where students is the table name and test-mysql-jdbc- is the topic.prefix. Run the following command to start the consumer.

    $ /usr/bin/kafka-avro-console-consumer --topic test-mysql-jdbc-students --zookeeper localhost:2181 --from-beginning

    You may replace test-mysql-jdbc-students with the name that your configuration and tables in the MySQL Database generate.

  8. Add a row to the MySQL Table.

    Let us add a row to the MySQL table students and check whether the Console Consumer receives the message.
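
    The schema of the students table is not shown in this tutorial, so the following sketch assumes two hypothetical columns, name and marks, next to the auto-incrementing column. The function name build_insert_sql is also only illustrative. Adjust the statement to the columns of your own table.

```shell
# Build an INSERT statement for the students table.
# ASSUMPTION: the columns "name" and "marks" are hypothetical.
build_insert_sql() {
    # $1 = student name (plain text without quote characters)
    # $2 = marks as an integer
    printf "INSERT INTO students (name, marks) VALUES ('%s', %d);\n" "$1" "$2"
}

# Print the statement so it can be checked before it is run.
build_insert_sql "Sue" 75

# To run it against MySQL, with the same placeholders as in connection.url:
# mysql -u <username> -p <DatabaseName> -e "$(build_insert_sql "Sue" 75)"
```

    The auto-incrementing column is left out of the statement on purpose, so that MySQL assigns the next value and the Connector sees the row as new.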

    The consumer then receives the message.

     

Conclusion

In this Apache Kafka Tutorial – Kafka Connector to MySQL Source, we have learnt to set up a Connector that imports data to Kafka from a MySQL Database source using the Confluent JDBC Connector and the MySQL Connector for Java driver.