CDC

概述

​ Change data capture(CDC)功能提供高效的、分布式、行级别的数据变化推送。

​ 可以通过CDC将变化推送到下游处理,比如你可能希望将数据的所有内容发送到全文检索,分析型数据库或者大数据系统。

​ CDC的一个核心特性为变化推送(changefeed),通过设置被检视的表格,changefeed会将表格中所有行的每次变化以特定的格式(JSON或Avro)发送记录,发送到配置的Kafka连接中。

顺序保证

  • 节点错误或网络分裂等问题出现时可能使得重复发送changefeed,这可以保证changefeed至少交付一次。

  • 带有时间戳的变化被交付后,不会再有对应行带有更早时间戳的变化被发送。

  • 如果一行数据在一则事务中被多次修改,只会发送最后的变化结果。

  • 数据到Kafka的分区中依据数据库中行的主键进行划分。

  • 使用选项UPDATED在发送时为每一行添加一个”updated”时间戳。也可以用RESOLVED选项发送周期性的”resolved”时间戳信息到每一个Kafka分区。一个”resolved”的时间戳可以保证更早时间戳的行不会被发送到该分区。

  • 刨去重复发送的情况,一行数据的发出顺序与事务更新该行的顺序相同。但是不保证不同行的数据变化总与事务更新它们的顺序相同。根据”updated”时间戳可以比较两行变化的发生顺序,这适用于同一集群中任意两个部分(如不同的表格间,不同的节点间)。”resolved”时间戳可以比较不同时间戳并缓存这些记录,来保证强顺序性和全局一致性。

带有列回填的模式变化

​ 当被检视的行发生了带有列回填的模式变化(例如,添加一个带有默认值的列,添加一个计算的列,添加一个NOT NULL的列,删除一列)时,changefeed将在回填阶段重复发送一些变化。结束后,用一个新的模式输出所有受检视的行。使用Avro时,被模式变化回填的行也会被重复发送。

举一个带有列回填的模式变化的例子,创建一个changefeed像下面这样发送信息:

[1] {“id”: 1, “name”: “Petee H”} [2] {“id”: 2, “name”: “Carl”} [3] {“id”: 3, “name”: “Ernie”}

添加一列到受检视的表格中:

>ALTER TABLE office_dogs ADD COLUMN likes_treats BOOL DEFAULT TRUE;

在使用新的模式输出记录前,重复发送三条changefeed信息:

[1] {“id”: 1, “name”: “Petee H”}

[2] {“id”: 2, “name”: “Carl”}

[3] {“id”: 3, “name”: “Ernie”}

[1] {“id”: 1, “name”: “Petee H”}Duplicate

[2] {“id”: 2, “name”: “Carl”} # Duplicate

[3] {“id”: 3, “name”: “Ernie”} # Duplicate

[1] {“id”: 1, “likes_treats”: true, “name”: “Petee H”}

[2] {“id”: 2, “likes_treats”: true, “name”: “Carl”}

[3] {“id”: 3, “likes_treats”: true, “name”: “Ernie”}

使用rangefeed减少发送延迟

​ 原本在创建changefeed后,要通过定期发送请求获得最近的变更。现在它可以保持一个长时间的连接(即rangefeed),在变化发生的时候推送这些记录。这将减少行变化的等待时间,也减少受检视的表格在一些工作负载中发生的事务重启。

​ 要打开rangefeed,设置cluster setting中的kv.rangefeed.enabled到true。如果这个设置开启,任何的创建changefeed都将会报错。注意目前开启rangefeed会有一点性能消耗(大约5%-10%的延迟提高),无论是否在changefeed中使用了rangefeed。

如果遇到了问题,可以回退之前的设置,将kv.changefeed.push.enabled设为false。在未来的版本这个设置将会移除。

注意:

​ 为已经存在的changefeed启动rangefeed必须先重启changefeed。暂停并恢复job实现重新启用changefeed。

​ cluster setting的kv.closed_timestamp.target_duration设置可以用在集群推送的changefeed中。已完成的时间戳将至少在这个设定的时间长度后,减少时间长度会导致集群中事务的多次重启,继而影响性能。

配置changefeed

可以创建,暂停,恢复,取消,监控,调试changefeed。

创建

创建一个changefeed:

>CREATE CHANGEFEED FOR TABLE name INTO ‘scheme://host:port’;

更多详情,查看CREATE CHANGEFEED。

暂停

暂停一个changefeed:

>PAUSE JOB job_id;

更多详情,查看PAUSE JOB。

恢复

恢复一个暂停的changefeed:

>RESUME JOB job_id;

更多详情,查看RESUME JOB。

取消

取消一个changefeed:

>CANCEL JOB job_id;

更多详情,查看CANCEL JOB。

监控changefeed

​ changefeed进程以一个高水位的时间戳暴露给用户,随着changefeed的运行而后移。这保证了所有此前的或者正在本时间戳的所有变化全部发送。你可以像这样监控changefeed:

  • 在Admin UI上的Changefeed Dashboard

  • 在Admin UI的Jobs page,将鼠标悬停在高水位的时间戳上查看系统时间。

  • 使用ZNBase_internal.jobs:

> SELECT * FROM ZNBase_internal.jobs WHERE job_id = <job_id>;

job_id | job_type | description | … | high_water_timestamp | error | coordinator_id

+——————————+——————+————————————————————————————————————+ … +————————————————+———-+————————+ 383870400694353921 | CHANGEFEED | CREATE CHANGEFEED FOR TABLE office_dogs2 INTO ‘kafka://localhost:9092’ | … | 1537279405671006870.0000000000 |

在统计数据changefeed.max_behind_nanos上设置警报,追踪changefeed的高水位标记时间戳是否存在落后于集群垃圾回收窗口的风险。更多的内容,查看监控与报警。

注意:

在另一个changefeed结束时可以使用高水位时间戳启动一个新的changefeed。

调试changefeed

​ 使用日志信息调试changefeed使用中的连接问题(比如,kafka: client has run out of available brokers to talk to (Is your cluster reachable?)) 寻找日志中存在kafka-producer的行来调试:

I190312 18:56:53.535646 585 vendor/github.com/Shopify/sarama/client.go:123 [kafka-producer] Initializing new client I190312 18:56:53.535714 585 vendor/github.com/Shopify/sarama/client.go:724 [kafka-producer] client/metadata fetching metadata for all topics from broker localhost:9092 I190312 18:56:53.536730 569 vendor/github.com/Shopify/sarama/broker.go:148 [kafka-producer] Connected to broker at localhost:9092 (unregistered) I190312 18:56:53.537661 585 vendor/github.com/Shopify/sarama/client.go:500 [kafka-producer] client/brokers registered new broker #0 at 172.16.94.87:9092 I190312 18:56:53.537686 585 vendor/github.com/Shopify/sarama/client.go:170 [kafka-producer] Successfully initialized new client |

使用范例

创建一个连接到Kafka的changefeed

在本示例中,你将为一个单节点集群创建changefeed连接到Kafka sink。

  1. 启动服务:

$ bini start —insecure —listen-addr=localhost —background

  1. 下载并解压Confluent Open Source platform(包含Kafka服务)

  2. 将解压文件移动到confluent-<version>文件夹并启动服务:

$ ./bin/confluent start

只需要zookeeper和kafka两个文件。confluent的使用如遇问题,请查看他们的文档。

  1. 创建一个Kafka topic:

$ ./bin/kafka-topics \ —create \ —zookeeper localhost:2181 \ —replication-factor 1 \ —partitions 1 \ —topic office_dogs

  1. 以root用户打开内置的SQL client:

$ ZNBase sql —insecure

  1. 打开cluster setting的kv.rangefeed.enabled:

> SET CLUSTER SETTING kv.rangefeed.enabled = true;

  1. 创建一个数据库:

> CREATE DATABASE cdc_demo;

  1. 设置为默认数据库:

> SET DATABASE cdc_demo;

  1. 创建表格并添加数据:

> CREATE TABLE office_dogs ( id INT PRIMARY KEY, name STRING);

> INSERT INTO office_dogs VALUES (1, ‘Petee’), (2, ‘Carl’);

> UPDATE office_dogs SET name = ‘Petee H’ WHERE id = 1;

  1. 启动changefeed:

> CREATE CHANGEFEED FOR TABLE office_dogs INTO ‘kafka://localhost:9092’;

job_id

+——————————+

360645287206223873

这将在后台启动changefeed并返回job_id。changefeed会写入到Kafka。

  1. 在一个新的终端进入解压缩的文件夹confluent-<version>并启动查看Kafka topic:

$ ./bin/kafka-console-consumer \ —bootstrap-server=localhost:9092 \ —property print.key=true \ —from-beginning \ —topic=office_dogs $ [1] {“id”: 1, “name”: “Petee H”} [2] {“id”: 2, “name”: “Carl”}

注意初始化扫描显示changefeed启动时的表格状态(因此初始化的”Petee”被忽略掉)。

  1. 回到SQL client插入更多数据:

> INSERT INTO office_dogs VALUES (3, ‘Ernie’);

  1. 回到查看Kafka topic的终端,可以看到下面的输出:

$ [3] {“id”: 3, “name”: “Ernie”}

  1. 完成后退出SQL shell(\q)

  2. 停止服务,执行以下命令:

$ ZNBase quit —insecure

  1. 停止Kafka,进入解压缩的confluent-<version>文件夹并停止Confluent:

$ ./bin/confluent stop FEED

CREATE CHANGEFEED

CREATE CHANGEFEED 语句创建一个新的数据变化推送,提供行级别的推送服务。

权限需求

changefeed只能被超级用户创建。例如admin角色的成员。admin角色默认存在root用户作为成员。

语法格式

CDC - 图1

参数说明

参数描述
table_name表格名或以逗号分隔的表格名列表,用来创建changefeed
sinksink的位置。URI的schema标明类型。更多信息见下方Sink URI
option/value可选的选项和对应的值。见下方选项。

Sink URI

Sink URI的基本格式如下:

‘[scheme]😕/[host]“[port]?[query_parameters]‘

Scheme可以是kafka或者下面的任何的云存储sink。

1、Kafka

Kafka sink URI示例:

‘kafka://broker.address.com:9092?topic_prefix=bar_&tls_enable=true&ssl_user=petee&sasl_password=bones’

查询参数包括:

参数值类型描述
topicprefixSTRING在所有的topic名前添加前缀。 例如,CREATE CHANGEFEED FOR TABLE foo INTO ‘kafka://…?topic_prefix=bar’将会发送在topic为bar_foo下而不是foo下。
tls_enabled=trueBOOL允许 Kafka 连接使用TLS。可以和ca_cert一起使用。
ca_certSTRING编码的 ca_cert 文件。 如果要编码ca.cert文件,执行base64 -w 0 ca.cert
sasl_enabledBOOL如果为 true,则使用 SASL/PLAIN 认证。需要 sasl_user 和 sasl_pasword。
sasl_userSTRINGSASL 的用户名。
sasl_passwordSTRINGSASL 的密码。

2、云存储sink

用云存储sink将changefeed数据发送到OLAP数据库或者大数据系统,而无需通过Kafka进行传输。

注意:目前云存储 sink 只支持 JSON 格式,并且使用发送换行符分割的JSON文件。一个云存储sink(即AWS S3) URI的例子:

‘experimental-s3://test-s3encryption/test?AWS_ACCESS_KEY_ID=ABCDEFGHIJKLMNOPQ&AWS_SECRET_ACCESS_KEY=LS0tLS1CRUdJTiBDRVJUSUZ’

注意:云存储的scheme应该使用前缀experimental-。

下面的云存储都可以用作 sink:

[scheme]😕/[host]/[path]?[parameters]
位置Scheme主机参数
Amazon S3S3Bucket名AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN
AzureazureN/AAZURE_ACCOUNT_KEY, AZURE_ACCOUNT_NAME
Google CloudgsBucket名AUTH (optional; default, implicit, 或 specified), CREDENTIALS
HTTPhttp远端主机N/A
NFS/Localnodelocal空或 nodeIDN/A
S3-compatible servicesS3Bucket名AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN, AWS_REGION (可选), AWS_ENDPOINT

警告:

如果写到多节点的集群的nodelocal本地存储,则单个数据文件将被写入任意节点extern目录,且有可能无法按照预期工作。为了正确运行,每个节点必须使用—external-io-dir选项,指向相同的NFS或其他网络支持的共享存储。

注意:

位置的参数经常包含需要以URI形式编码的特殊的字符,可以用Javascript的encodeURIComponent函数或者Go语言的url.QueryEscape函数编码。其他语言也会提供类似的函数转转义这些特殊字符。

注意:

如果你的环境需要一个HTTP或者HTTPS代理服务器进行Go语言的相关连接,你可以在启动服务前设置基础的HTTP_PROXY和HTTPS_PROXY环境变量。

  • 如果AUTH未指定参数,则cluster setting的cloudstorage.gs.default.key选项非空,否则AUTH将使用implicit。如果AUTH参数是implicit,所有GCS连接会使用Google的默认认证策略。如果AUTH参数是default,cloudstorage.gs.default.key需要被设为授权过程中使用的服务账户文件的内容。如果AUTH参数是specified,则根据每个语句对GCS连接进行身份验证,允许JSON键对象在CREDENTIALS参数中被发送。JSON键要基于64位编码。

  • 你可以用Caddy或者ngix创建自己的HTTP服务。通过设置cloudstorage.http.customer_ca,一个常规的根CA可以追加到系统的默认CA,从HTTPS URL确认证书的时候起到作用。

  • NFS 驱动上的文件系统备份位置取决于启动节点—external-io-dir指定的路径。如果该标志设置为disable,则禁止从本地目录和NFS驱动器导入。

  • NFS/Local的主机可以为空,也可以为nodeID。如果指定nodeID,也忽略(即,可以向任何节点发送,并且看上去像本地的文件的输入输出)。但是nodeID未来可能会有用。

    3、文件URL例子

位置例子
Amazon S3s3://acme-co/employees.sql?AWS_ACCESS_KEY_ID=123&AWS_SECRET_ACCESS_KEY=456
Azureazure://employees.sql?AZURE_ACCOUNT_KEY=123&AZURE_ACCOUNT_NAME=acme-co
Google Cloudgs://acme-co/employees.sql
HTTPhttp://localhost:8080/employees.sql
NFS/Localnodelocal:///emloyees.sql nodelocal://2/employees.sql

选项

选项描述
updatedN/A包含每行更新的时间戳。 如果提供一个cursor,”updated”时间戳将与发出行的 MVCC 时间戳匹配,并且没有初始化扫描。如果没有提供cursor,changefeed将会初始化扫描(changefeed创建的时候),初始化扫描发出的每个更改记录的”updated”时间戳将为初始化扫描的时间戳。相似的,当为schema更改回填时,将在新的schema有效时将”updated”时间戳设置为第一个时间戳。
resolvedINTERVAL定期将resolved的时间戳发送到changefeed。可选择设置发送resolved时间戳之间的最小持续时间。如果未指定,则发送所有 resolved时间戳。 例:resolved=’10s’
envelopekey_only/wrapped用key_only选项只发送键不发送值,可以更快地了解键的变化情况。 默认:envelope=wrapped
cursorTimestamp发送指定时间戳后的所有变化,但是不会先发送目前的表状态。如果cursor没有被指定,changefeed开始后先进行初始化扫描并发送当前的值,然后移动,发送扫描后的所有变化。 如果在一个特定的 cursor 后面开始一个changefeed,cursor不能在垃圾回收窗口(gc.ttlseconds)的设定之前;否则changefeed将会错误。如果使用默认的垃圾回收设置,则无法创建过去25小时以前的changefeed。 前面的changefeed全部结束了,可以用cursor开始一个新的changefeed。 例子: CURSOR=’1536242855577149065.0000000000’
formatjson/experimental_avro发送记录的格式。目前Avro格式的支持为实验性的。 查看下面的表格,对应格式到Avro。 默认:format=json
confluent_schema_registrySchema Registry address使用experimental_avro的时候需要Schema Registry地址。
key_in_valueN/A使sink中被删除的行主键可找回(大部分的消息中都有键和值)。这些sink自动使用key_in_value(目前只针对云存储sink)。
diffN/A使用diff,可以显示出before的状态,即操作之前的状态。 示例(json格式): CREATE CHANGEFEED FOR TABLE name   INTO ‘kafka://host:port’   WITH diff; Avro格式添加diff参数方法一致。
typeN/A使用type,可以显示出每次的操作的类型,包括delete,update和insert。 注意:在使用type时,必须要有diff参数,如果仅仅使用type,就会报错; 示例(json格式):CREATE CHANGEFEED FOR TABLE name   INTO ‘kafka://host:port’   WITH diff,type; Avro格式添加type参数方一致。

1、Avro限制

目前对 Avro 的支持是实验性的。下面是 Avro 不支持的 SQL 类型和值:

  • Decimal 必须要明确精度。

  • 带有 NaN 或者无限的 Decimal 类型的值在 Avro 中不能被重写。

注意:

为了避免 NaN 或者无限的值,添加一个CHECK的约束来防止这些值插入到decimal的列中。

  • TIME, DATE, INTERVAL, UUID, INET, ARRAY, JSONB, BIT和校对的STRING目前在Avro中还不支持。

2、Avro类型

下面是到Avro的类型转换:

类型Avro 类型Avro 逻辑类型
INTLONG
BOOLBOOLEAN
FLOATDOUBLE
STRINGSTRING
DATEINTDATE
TIMELONGTIME-MICROS
TIMESTAMPLONGTIME-MICROS
TIMESTAMPZLONGTIME-MICROS
DECIMALBYTESDECIMAL
UUIDSTRING
INETSTRING
JSONBSTRING

响应

信息(即键和值)发送到一个Kafka的topic被指定到一个envelope。默认的格式是 wrapped,输出信息由以下组成:

  • 键:一行数据由行的主键字段构成键(如,JSON 的[1]或 Avro 的{“id”:{“long”:1}})
  • 值:

​ 1、三种可能的字段之一:

​ 1)、after:包含行在更新后的状态(或DELETE后的null状态)

​ 2)、updated:包含更新后的时间戳

​ 3)、resolved:用于表示已完成的时间戳记录。这些记录不包含after值,因为本函数仅作为一个检查点。

​ 2、 INSERT和UPDATE,表示目前行状态为被插入的或被更新的。

​ 3、DELETE操作,null状态。

实例:

语句返回
INSERT INTO office_dogs VALUES (1, ‘Petee’);JSON: [1] {“after”: {“id”: 1, “name”: “Petee”}} Avro: {“id”:{“long”:1}} {“after”:{“office_dogs”:{“id”:{“long”:1},”name”:{“string”:”Petee”}}}}
DELETE FROM office_dogs WHERE name = ‘Petee’;JSON: [1] {“after”: null} Avro: {“id”:{“long”:1}} {“after”:null}

语法示例

  1. 创建一个连接到 Kafka 的 changefeed

> CREATE CHANGEFEED FOR TABLE name INTO ‘kafka://host:port’ WITH updated, resolved;

+——————————+

| job_id |

+——————————+

| 360645287206223873 |

+——————————+

(1 row) |

​ 创建一个连接到Kafka的changefeed,包含before功能

> CREATE CHANGEFEED FOR TABLE name INTO ‘kafka://host:port’ WITH updated, resolved,diff,type;

+——————————+

| job_id |

+——————————+

| 360645287206223873 |

+——————————+

(1 row) |

json格式显示,使用type时,必须有diff

2、使用 Avro 创建一个连接到 Kafka 的 changefeed

> CREATE CHANGEFEED FOR TABLE name INTO ‘kafka://host:port’ WITH format = experimental_avro, confluent_schema_registry = <schema_registry_address>;

+——————————+

| job_id |

+——————————+

| 360645287206223873 |

+——————————+

(1 row) |

使用 Avro 创建一个连接到 Kafka 的 changefeed,包含before功能

> CREATE CHANGEFEED FOR TABLE name INTO ‘kafka://host:port’ WITH format = experimental_avro, confluent_schema_registry = <schema_registry_address>,diff,type;

+——————————+

| job_id |

+——————————+

| 360645287206223873 |

使用type时,必须有diff

3、管理 changefeed

​ 使用如下的语句暂停,继续,取消一个 changefeed。

1)暂停一个 changefeed

> PAUSE JOB job_id;

更多信息查看 PAUSE JOB。

2)继续一个暂停的 changefeed

> RESUME JOB job_id;

更多信息查看 RESUME JOB。

3)取消一个 changefeed

> CANCEL JOB job_id;

更多信息查看 CANCEL JOB。

4)从另一个结束的地方开始一个新的 changefeed

找到结束的 changefeed 最后的时间戳:

> SELECT * FROM crdb_internal.jobs WHERE job_id = <job_id>;

job_id | job_type | … | high_water_timestamp | error | coordinator_id

+——————————+——————+ … +————————————————+———-+————————+

383870400694353921 | CHANGEFEED | … | 1537279405671006870.0000000000 | |

用 high_water_timestamp 开始一个新的 changefeed:

> CREATE CHANGEFEED FOR TABLE name INTO ‘kafka//host:port’ WITH cursor = ‘<high_water_timestamp>’;

注意:由于提供了cursor,不会进行初始化扫描。