Apache Kafka

In this tutorial, we are going to use the Kafka Connect-based Sink Connector for YugabyteDB to store events from Apache Kafka into YugabyteDB using the YCQL API.

1. Start local cluster

Start a YugabyteDB cluster on your local machine. Check that you are able to connect to YugabyteDB using cqlsh by doing the following.

  $ ./bin/cqlsh localhost
  Connected to local cluster at 127.0.0.1:9042.
  [cqlsh 5.0.1 | Cassandra 3.9-SNAPSHOT | CQL spec 3.4.2 | Native protocol v4]
  Use HELP for help.
  cqlsh>

Create the table that will store the Kafka events.

  cqlsh> CREATE KEYSPACE IF NOT EXISTS demo;
  cqlsh> CREATE TABLE demo.test_table (key text, value bigint, ts timestamp, PRIMARY KEY (key));
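Each Kafka event is expected to carry one field per column of demo.test_table. As an illustration only, the Python 3 sketch below checks a JSON event against that schema before you send it. The helper name check_event, the rule that the JSON fields must exactly match the column names, and the encoding of ts as epoch milliseconds are assumptions drawn from the sample events later in this tutorial, not documented connector behavior.

```python
import json

# bigint in YCQL is a signed 64-bit integer.
BIGINT_MIN, BIGINT_MAX = -(2**63), 2**63 - 1


def check_event(line: str) -> dict:
    """Parse one JSON event and check that it fits demo.test_table.

    Hypothetical helper: the expected fields mirror the columns
    key text, value bigint, ts timestamp (ts assumed to be epoch ms).
    """
    event = json.loads(line)
    if set(event) != {"key", "value", "ts"}:
        raise ValueError(f"unexpected fields: {sorted(event)}")
    if not isinstance(event["key"], str):
        raise ValueError("key must be a string (text column)")
    for field in ("value", "ts"):
        v = event[field]
        # bool is a subclass of int in Python, so reject it explicitly.
        if isinstance(v, bool) or not isinstance(v, int):
            raise ValueError(f"{field} must be an integer")
        if not BIGINT_MIN <= v <= BIGINT_MAX:
            raise ValueError(f"{field} is outside the 64-bit range")
    return event
```

For example, check_event('{"key" : "A", "value" : 1, "ts" : 1541559411000}') returns the parsed dictionary, while an event whose value is the string "1" raises ValueError.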

2. Download Apache Kafka

Download Kafka from the Apache Kafka downloads page. This tutorial uses Apache Kafka 2.0.0 built for Scala 2.11 (kafka_2.11-2.0.0).

  $ mkdir -p ~/yb-kafka && cd ~/yb-kafka
  $ wget http://apache.cs.utah.edu/kafka/2.0.0/kafka_2.11-2.0.0.tgz
  $ tar xvfz kafka_2.11-2.0.0.tgz && cd kafka_2.11-2.0.0

3. Install the Kafka Sink Connector for YugabyteDB

Clone the yb-kafka-connector git repository.

  $ cd ~/yb-kafka
  $ git clone https://github.com/yugabyte/yb-kafka-connector.git
  $ cd yb-kafka-connector/

Build the repository with Maven to produce the connector jar.

  $ mvn clean install -DskipTests

The build places the connector jar, yb-kafka-connnector-1.0.0.jar, in the ./target directory. Copy this jar to the libs directory of your Kafka installation.

  $ cp ./target/yb-kafka-connnector-1.0.0.jar ~/yb-kafka/kafka_2.11-2.0.0/libs/

Go to the Kafka libs directory and download the additional jars that the connector depends on, including the driver for the YCQL API.

  $ cd ~/yb-kafka/kafka_2.11-2.0.0/libs/
  $ wget http://central.maven.org/maven2/io/netty/netty-all/4.1.25.Final/netty-all-4.1.25.Final.jar
  $ wget http://central.maven.org/maven2/com/yugabyte/cassandra-driver-core/3.2.0-yb-18/cassandra-driver-core-3.2.0-yb-18.jar
  $ wget http://central.maven.org/maven2/com/codahale/metrics/metrics-core/3.0.1/metrics-core-3.0.1.jar
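To double-check the two previous steps, the Python 3 sketch below reports which of the jars named above are missing from the Kafka libs directory. The helper name missing_jars is hypothetical, and the default path assumes the ~/yb-kafka layout used in this tutorial.

```python
from pathlib import Path

# Jar file names copied from the steps above.
REQUIRED_JARS = [
    "yb-kafka-connnector-1.0.0.jar",
    "netty-all-4.1.25.Final.jar",
    "cassandra-driver-core-3.2.0-yb-18.jar",
    "metrics-core-3.0.1.jar",
]


def missing_jars(libs_dir: str) -> list:
    """Return the required jars that are not regular files in libs_dir."""
    libs = Path(libs_dir).expanduser()
    return [name for name in REQUIRED_JARS if not (libs / name).is_file()]


if __name__ == "__main__":
    missing = missing_jars("~/yb-kafka/kafka_2.11-2.0.0/libs")
    print("all jars present" if not missing else f"missing: {missing}")
```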

4. Start ZooKeeper and Kafka

Now you can start ZooKeeper and Kafka as shown below.

  $ cd ~/yb-kafka/kafka_2.11-2.0.0
  $ ./bin/zookeeper-server-start.sh config/zookeeper.properties &
  $ ./bin/kafka-server-start.sh config/server.properties &
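Before going further, you can optionally confirm that ZooKeeper (port 2181), Kafka (port 9092), and YCQL (port 9042) are accepting connections. The Python 3 sketch below uses only the standard library; is_listening is a hypothetical helper name, and the ports are the defaults used in this tutorial. A successful TCP connection only shows that something is listening on the port, not that the service is healthy.

```python
import socket


def is_listening(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, unreachable hosts, and timeouts.
        return False


if __name__ == "__main__":
    # Default ports used in this tutorial.
    for name, port in [("ZooKeeper", 2181), ("Kafka", 9092), ("YCQL", 9042)]:
        status = "up" if is_listening("localhost", port) else "not reachable"
        print(f"{name} on port {port}: {status}")
```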

Now create the Kafka topic whose messages will be persisted to the YugabyteDB table created earlier.

  $ ./bin/kafka-topics.sh --create \
      --zookeeper localhost:2181 \
      --replication-factor 1 \
      --partitions 1 \
      --topic test_topic

5. Start Kafka Sink Connector for YugabyteDB

At this point, YugabyteDB’s YCQL API is running on port 9042, with the test_table table created in the demo keyspace, and Kafka is running on port 9092, with the test_topic topic created. We are ready to start the connector.

  $ ./bin/connect-standalone.sh \
      ~/yb-kafka/yb-kafka-connector/resources/examples/kafka.connect.properties \
      ~/yb-kafka/yb-kafka-connector/resources/examples/yugabyte.sink.properties

The yugabyte.sink.properties file used above (and shown below) contains the configuration this sink needs to work correctly. For your own application, change the topic, keyspace, and table settings in this file to match your Kafka topic and YugabyteDB table.

  # Sample yugabyte sink properties.
  name=yugabyte-sink
  connector.class=com.yb.connect.sink.YBSinkConnector
  topics=test_topic
  yugabyte.cql.keyspace=demo
  yugabyte.cql.tablename=test_table
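A topic name that differs between the topic you created and the topics setting is an easy mistake to make. The Python 3 sketch below parses a properties file in this simple key=value form and checks that test_topic is among the configured topics. parse_properties is a hypothetical helper that handles only the subset of the Java properties format used here (no line continuations, escapes, or ':' separators).

```python
def parse_properties(text: str) -> dict:
    """Parse simple key=value lines, skipping blank lines and # or ! comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, sep, value = line.partition("=")
        if sep:  # ignore lines without '=' in this simplified sketch
            props[key.strip()] = value.strip()
    return props


SAMPLE = """\
# Sample yugabyte sink properties.
name=yugabyte-sink
connector.class=com.yb.connect.sink.YBSinkConnector
topics=test_topic
yugabyte.cql.keyspace=demo
yugabyte.cql.tablename=test_table
"""

props = parse_properties(SAMPLE)
# The Kafka Connect `topics` setting is a comma-separated list.
topics = [t.strip() for t in props["topics"].split(",")]
assert "test_topic" in topics
# Target table in keyspace.table form.
print(props["yugabyte.cql.keyspace"] + "." + props["yugabyte.cql.tablename"])
```

Run against the sample above, the script prints demo.test_table.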

6. Produce events to Kafka

We can now produce some events into Kafka using the kafka-console-producer.sh utility that ships with Kafka.

  $ ~/yb-kafka/kafka_2.11-2.0.0/bin/kafka-console-producer.sh \
      --broker-list localhost:9092 \
      --topic test_topic

Enter the following events, one per line.

  {"key" : "A", "value" : 1, "ts" : 1541559411000}
  {"key" : "B", "value" : 2, "ts" : 1541559412000}
  {"key" : "C", "value" : 3, "ts" : 1541559413000}
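Instead of typing the events by hand, you can generate them. The Python 3 sketch below prints JSON lines with the same fields and values as the events above (json.dumps uses slightly different spacing, which does not change the JSON). The function make_events, its step_ms parameter, and the file name make_events.py are hypothetical.

```python
import json

# First timestamp used in the sample events, in epoch milliseconds.
BASE_TS_MS = 1541559411000


def make_events(keys, base_ts_ms=BASE_TS_MS, step_ms=1000):
    """Yield one JSON line per key: value counts up from 1, ts advances by step_ms."""
    for i, key in enumerate(keys):
        yield json.dumps({"key": key, "value": i + 1, "ts": base_ts_ms + i * step_ms})


if __name__ == "__main__":
    for line in make_events(["A", "B", "C"]):
        print(line)
```

Because kafka-console-producer.sh reads one message per line from standard input, piping the script's output into it should have the same effect as typing the lines:

  $ python3 make_events.py | ~/yb-kafka/kafka_2.11-2.0.0/bin/kafka-console-producer.sh \
      --broker-list localhost:9092 --topic test_topic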

7. Verify events in YugabyteDB

The events above should now show up as rows in the YugabyteDB table.

  cqlsh> SELECT * FROM demo.test_table;
   key | value | ts
  -----+-------+---------------------------------
     A |     1 | 2018-11-07 02:56:51.000000+0000
     C |     3 | 2018-11-07 02:56:53.000000+0000
     B |     2 | 2018-11-07 02:56:52.000000+0000
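The ts column shows the epoch-millisecond values from the events rendered as UTC timestamps. The Python 3 sketch below reproduces that conversion so you can match each row to the event that produced it; ms_to_cqlsh_ts is a hypothetical helper, and its output format only mimics the cqlsh display above. The rows come back as A, C, B rather than in insertion order, most likely because YCQL distributes rows by a hash of the primary key and an unrestricted SELECT does not sort by key.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def ms_to_cqlsh_ts(ts_ms: int) -> str:
    """Convert epoch milliseconds to a UTC string in the cqlsh style shown above."""
    # timedelta keeps the arithmetic exact (no float rounding of milliseconds).
    return (EPOCH + timedelta(milliseconds=ts_ms)).strftime("%Y-%m-%d %H:%M:%S.%f%z")


if __name__ == "__main__":
    for ts in (1541559411000, 1541559412000, 1541559413000):
        print(ts, "->", ms_to_cqlsh_ts(ts))
```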