Change data capture (CDC) Beta

Change data capture (CDC) can be used to asynchronously stream data changes from a YugabyteDB cluster to external systems like message queues and OLAP warehouses. The data changes in YugabyteDB are detected, captured, and then output to the specified target. In the steps below, you will use a local YugabyteDB cluster to stream data changes to stdout using the CDC API.

If you haven’t installed YugabyteDB yet, do so first by following the Quick Start guide.

Prerequisites

Java

A JRE (or JDK) for Java 8 or later must be installed. JDK and JRE installers for Linux, macOS, and Windows can be downloaded from OpenJDK, AdoptOpenJDK, or Azul Systems.

1. Add a database table

Start your local YugabyteDB cluster and run ysqlsh to connect to the service.

  $ ./bin/ysqlsh

Add a table named products to the default yugabyte database.

  CREATE TABLE products(
    id bigserial PRIMARY KEY,
    created_at timestamp,
    category text,
    ean text,
    price float,
    quantity int default(5000),
    rating float,
    title text,
    vendor text
  );

2. Download the YugabyteDB CDC Connector

Download the CDC Connector JAR file (yb-cdc-connector.jar).

  $ wget -O yb-cdc-connector.jar https://github.com/yugabyte/yb-kafka-connector/blob/master/yb-cdc/yb-cdc-connector.jar?raw=true

3. Stream the log output to stdout

Run the command below to start logging an output stream of data changes from the YugabyteDB products table to stdout.

  $ java -jar yb-cdc-connector.jar \
      --table_name yugabyte.products \
      --log_only

The connector accepts the following parameters:

  • --table_name: Specifies the namespace and table, where namespace is the database (YSQL) or keyspace (YCQL).
  • --master_addrs: Specifies the addresses of all of the YB-Master servers. The default value is 127.0.0.1:7100, which is why the example above omits it. If you are using a 3-node local cluster, specify a comma-delimited list of the addresses of all of your YB-Master servers.
  • --log_only: Restricts logging to the console (stdout).
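If you launch the connector from a script, for example against a 3-node local cluster, the flags above can be assembled programmatically. The Python sketch below is only an illustration. `build_cdc_command` is a hypothetical helper and is not part of the connector. The addresses 127.0.0.1:7100, 127.0.0.2:7100, and 127.0.0.3:7100 are assumed for a local 3-node cluster, so substitute the YB-Master addresses of your own cluster.

```python
import shlex


def build_cdc_command(table_name, master_addrs=None, log_only=True,
                      jar="yb-cdc-connector.jar"):
    """Hypothetical helper: build the argument list for the CDC connector.

    table_name   -- "<namespace>.<table>", e.g. "yugabyte.products"
    master_addrs -- iterable of "host:port" strings; None omits the flag
                    so the connector falls back to its default (127.0.0.1:7100)
    """
    if "." not in table_name:
        raise ValueError("table_name must have the form <namespace>.<table>")
    args = ["java", "-jar", jar, "--table_name", table_name]
    if master_addrs:
        # Several YB-Master servers are passed as one comma-delimited value.
        args += ["--master_addrs", ",".join(master_addrs)]
    if log_only:
        # Restrict output to stdout, as in the example above.
        args.append("--log_only")
    return args


# Assumed addresses for a 3-node local cluster.
cmd = build_cdc_command(
    "yugabyte.products",
    ["127.0.0.1:7100", "127.0.0.2:7100", "127.0.0.3:7100"],
)
print(shlex.join(cmd))
```

Passing the returned list to `subprocess.run(cmd)` would start the connector in the foreground. The sketch only builds the arguments, so you can inspect the command before running it.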

4. Insert values and observe

In another terminal shell, use ysqlsh to insert a row into the products table, then observe the change in your stdout output stream.

  INSERT INTO products (id, category, created_at, ean, price, rating, title, vendor)
  VALUES (14, 'Widget', '2017-12-31T14:41:56.870Z', '8833419218504', 25.09876359271891, 4.0, 'Awesome Concrete Shoes', 'McClure-Lockman');

The connector logs output similar to the following to stdout:
  2020-01-16 14:52:01,597 [INFO|org.yb.cdc.LogClient|LogClient] time: 6468465138045198336
  operation: WRITE
  key {
    key: "id"
    value {
      int64_value: 14
    }
  }
  changes {
    key: "created_at"
    value {
      int64_value: 568046516870000
    }
  }
  changes {
    key: "category"
    value {
      string_value: "Widget"
    }
  }
  changes {
    key: "ean"
    value {
      string_value: "8833419218504"
    }
  }
  changes {
    key: "price"
    value {
      double_value: 25.09876359271891
    }
  }
  changes {
    key: "quantity"
    value {
      int32_value: 5000
    }
  }
  changes {
    key: "rating"
    value {
      double_value: 4.0
    }
  }
  changes {
    key: "title"
    value {
      string_value: "Awesome Concrete Shoes"
    }
  }
  changes {
    key: "vendor"
    value {
      string_value: "McClure-Lockman"
    }
  }
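To consume these records from another program, a small parser can turn each logged change into a dictionary. The Python sketch below infers the record format from the sample output above, not from a published specification. The helper names (`parse_cdc_record`, `pg_timestamp`, `hybrid_time_utc`) are hypothetical.

The sketch also makes two assumptions:

  • Timestamp columns such as created_at are encoded as microseconds since 2000-01-01, PostgreSQL's internal epoch. This interpretation reproduces the 2017-12-31T14:41:56.870 value inserted above.
  • The time field is a YugabyteDB hybrid time: physical microseconds since the Unix epoch, shifted left above a 12-bit logical counter.

```python
import datetime
import re

# Abridged copy of the sample record shown above (first three columns only).
SAMPLE = """\
2020-01-16 14:52:01,597 [INFO|org.yb.cdc.LogClient|LogClient] time: 6468465138045198336
operation: WRITE
key {
  key: "id"
  value {
    int64_value: 14
  }
}
changes {
  key: "created_at"
  value {
    int64_value: 568046516870000
  }
}
changes {
  key: "category"
  value {
    string_value: "Widget"
  }
}
"""

FIELD = re.compile(r"^(\w+):\s*(.*)$")   # e.g. `int64_value: 14`
TIME = re.compile(r"\btime:\s*(\d+)$")   # trailing `time: <n>` on the log line


def convert(kind, raw):
    """Convert a typed value (int64_value, double_value, ...) to Python."""
    if kind.startswith("int"):
        return int(raw)
    if kind == "double_value":
        return float(raw)
    if raw.startswith('"') and raw.endswith('"'):
        return raw[1:-1]  # escape sequences are not decoded in this sketch
    return raw


def parse_cdc_record(text):
    """Hypothetical parser: one logged change -> time, operation, key, changes."""
    record = {"time": None, "operation": None, "key": {}, "changes": {}}
    depth, section, column = 0, None, None
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line:
            continue
        if line.endswith("{"):
            if depth == 0:
                section = line[:-1].strip()  # "key" or "changes"
            depth += 1
            continue
        if line == "}":
            depth -= 1
            continue
        time_match = TIME.search(line)
        if depth == 0 and time_match:
            record["time"] = int(time_match.group(1))
            continue
        field = FIELD.match(line)
        if not field:
            continue
        name, raw = field.groups()
        if depth == 0 and name == "operation":
            record["operation"] = raw
        elif depth == 1 and name == "key":
            column = convert("string_value", raw)  # column name
        elif depth == 2:
            record.setdefault(section, {})[column] = convert(name, raw)
    return record


def pg_timestamp(micros):
    # Assumption: microseconds since 2000-01-01 (PostgreSQL's internal epoch).
    return datetime.datetime(2000, 1, 1) + datetime.timedelta(microseconds=micros)


def hybrid_time_utc(ht):
    # Assumption: hybrid time = physical microseconds << 12 | logical counter.
    physical_us = ht >> 12
    return datetime.datetime(1970, 1, 1) + datetime.timedelta(microseconds=physical_us)


rec = parse_cdc_record(SAMPLE)
print(rec["operation"], rec["key"])              # WRITE {'id': 14}
print(pg_timestamp(rec["changes"]["created_at"]))  # 2017-12-31 14:41:56.870000
print(hybrid_time_utc(rec["time"]))              # 2020-01-16 22:52:01.593066
```

Both decoded times are naive datetimes in UTC. Any escaping rules or value types that do not appear in the sample output are not handled by this sketch.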