背景

canal 1.1.1版本之后, 内置增加客户端数据同步功能, Client适配器整体介绍: [[ClientAdapter]]

canal adapter 的 Elastic Search 版本支持6.x.x以上, 如需其它版本的es可替换依赖重新编译client-adapter.elasticsearch模块

ElasticSearch适配器

1 修改启动器配置: application.yml

  1. canal.conf:
  2. canalServerHost: 127.0.0.1:11111
  3. batchSize: 500
  4. syncBatchSize: 1000
  5. retries: 0
  6. timeout:
  7. mode: tcp
  8. srcDataSources:
  9. defaultDS:
  10. url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true
  11. username: root
  12. password: 121212
  13. canalAdapters:
  14. - instance: example
  15. groups:
  16. - groupId: g1
  17. outerAdapters:
  18. -
  19. key: exampleKey
  20. name: es6 # or es7
  21. hosts: 127.0.0.1:9300 # es 集群地址, 逗号分隔
  22. properties:
  23. mode: transport # or rest # 可指定transport模式或者rest模式
  24. # security.auth: test:123456 # only used for rest mode
  25. cluster.name: elasticsearch # es cluster name

adapter将会自动加载 conf/es 下的所有.yml结尾的配置文件

2 适配器表映射文件

修改 conf/es/mytest_user.yml文件:

  1. dataSourceKey: defaultDS # 源数据源的key, 对应上面配置的srcDataSources中的值
  2. outerAdapterKey: exampleKey # 对应application.yml中es配置的key
  3. destination: example # cannal的instance或者MQ的topic
  4. groupId: # 对应MQ模式下的groupId, 只会同步对应groupId的数据
  5. esMapping:
  6. _index: mytest_user # es 的索引名称
  7. _type: _doc # es 的type名称, es7下无需配置此项
  8. _id: _id # es 的_id, 如果不配置该项必须配置下面的pk项_id则会由es自动分配
  9. # pk: id # 如果不需要_id, 则需要指定一个属性为主键属性
  10. # sql映射
  11. sql: "select a.id as _id, a.name as _name, a.role_id as _role_id, b.role_name as _role_name,
  12. a.c_time as _c_time, c.labels as _labels from user a
  13. left join role b on b.id=a.role_id
  14. left join (select user_id, group_concat(label order by id desc separator ';') as labels from label
  15. group by user_id) c on c.user_id=a.id"
  16. # objFields:
  17. # _labels: array:; # 数组或者对象属性, array:; 代表以;字段里面是以;分隔的
  18. # _obj: object # json对象
  19. etlCondition: "where a.c_time>='{0}'" # etl 的条件参数
  20. commitBatch: 3000 # 提交批大小

sql映射说明:

sql支持多表关联自由组合, 但是有一定的限制:

  1. 主表不能为子查询语句
  2. 只能使用left outer join即最左表一定要是主表
  3. 关联从表如果是子查询不能有多张表
  4. 主sql中不能有where查询条件(从表子查询中可以有where条件但是不推荐, 可能会造成数据同步的不一致, 比如修改了where条件中的字段内容)
  5. 关联条件只允许主外键的’=’操作不能出现其他常量判断比如: on a.role_id=b.id and b.statues=1
  6. 关联条件必须要有一个字段出现在主查询语句中比如: on a.role_id=b.id 其中的 a.role_id 或者 b.id 必须出现在主select语句中

Elastic Search的mapping 属性与sql的查询值将一一对应(不支持 select *), 比如: select a.id as _id, a.name, a.email as _email from user, 其中name将映射到es mapping的name field, _email将 映射到mapping的_email field, 这里以别名(如果有别名)作为最终的映射字段. 这里的_id可以填写到配置文件的 _id: _id映射.

2.1.单表映射索引示例sql:

  1. select a.id as _id, a.name, a.role_id, a.c_time from user a

该sql对应的es mapping示例:

  1. {
  2. "mytest_user": {
  3. "mappings": {
  4. "_doc": {
  5. "properties": {
  6. "name": {
  7. "type": "text"
  8. },
  9. "role_id": {
  10. "type": "long"
  11. },
  12. "c_time": {
  13. "type": "date"
  14. }
  15. }
  16. }
  17. }
  18. }
  19. }

2.2.单表映射索引示例sql带函数或运算操作:

  1. select a.id as _id, concat(a.name,'_test') as name, a.role_id+10000 as role_id, a.c_time from user a

函数字段后必须跟上别名, 该sql对应的es mapping示例:

  1. {
  2. "mytest_user": {
  3. "mappings": {
  4. "_doc": {
  5. "properties": {
  6. "name": {
  7. "type": "text"
  8. },
  9. "role_id": {
  10. "type": "long"
  11. },
  12. "c_time": {
  13. "type": "date"
  14. }
  15. }
  16. }
  17. }
  18. }
  19. }

2.3.多表映射(一对一, 多对一)索引示例sql:

  1. select a.id as _id, a.name, a.role_id, b.role_name, a.c_time from user a
  2. left join role b on b.id = a.role_id

注:这里join操作只能是left outer join, 第一张表必须为主表!!

该sql对应的es mapping示例:

  1. {
  2. "mytest_user": {
  3. "mappings": {
  4. "_doc": {
  5. "properties": {
  6. "name": {
  7. "type": "text"
  8. },
  9. "role_id": {
  10. "type": "long"
  11. },
  12. "role_name": {
  13. "type": "text"
  14. },
  15. "c_time": {
  16. "type": "date"
  17. }
  18. }
  19. }
  20. }
  21. }
  22. }

2.4.多表映射(一对多)索引示例sql:

  1. select a.id as _id, a.name, a.role_id, c.labels, a.c_time from user a
  2. left join (select user_id, group_concat(label order by id desc separator ';') as labels from label
  3. group by user_id) c on c.user_id=a.id

注:left join 后的子查询只允许一张表, 即子查询中不能再包含子查询或者关联!!

该sql对应的es mapping示例:

  1. {
  2. "mytest_user": {
  3. "mappings": {
  4. "_doc": {
  5. "properties": {
  6. "name": {
  7. "type": "text"
  8. },
  9. "role_id": {
  10. "type": "long"
  11. },
  12. "c_time": {
  13. "type": "date"
  14. },
  15. "labels": {
  16. "type": "text"
  17. }
  18. }
  19. }
  20. }
  21. }
  22. }

2.5.其它类型的sql示例:

  • geo type
    1. select ... concat(IFNULL(a.latitude, 0), ',', IFNULL(a.longitude, 0)) AS location, ...
  • 复合主键
    1. select concat(a.id,'_',b.type) as _id, ... from user a left join role b on b.id=a.role_id
  • 数组字段
    1. select a.id as _id, a.name, a.role_id, c.labels, a.c_time from user a
    2. left join (select user_id, group_concat(label order by id desc separator ';') as labels from label
    3. group by user_id) c on c.user_id=a.id
    配置中使用:
    1. objFields:
    2. labels: array:;
  • 对象字段
    1. select a.id as _id, a.name, a.role_id, c.labels, a.c_time, a.description from user a
    配置中使用:
    1. objFields:
    2. description: object
    其中a.description字段内容为json字符串
  • 父子文档索引

es/customer.yml

  1. ......
  2. esMapping:
  3. _index: customer
  4. _type: _doc
  5. _id: id
  6. relations:
  7. customer_order:
  8. name: customer
  9. sql: "select t.id, t.name, t.email from customer t"

es/order.yml

  1. esMapping:
  2. _index: customer
  3. _type: _doc
  4. _id: _id
  5. relations:
  6. customer_order:
  7. name: order
  8. parent: customer_id
  9. sql: "select concat('oid_', t.id) as _id,
  10. t.customer_id,
  11. t.id as order_id,
  12. t.serial_code as order_serial,
  13. t.c_time as order_time
  14. from biz_order t"
  15. skips:
  16. - customer_id

mapping示例:

  1. {
  2. "mappings":{
  3. "_doc":{
  4. "properties":{
  5. "id": {
  6. "type": "long"
  7. },
  8. "name": {
  9. "type": "text"
  10. },
  11. "email": {
  12. "type": "text"
  13. },
  14. "order_id": {
  15. "type": "long"
  16. },
  17. "order_serial": {
  18. "type": "text"
  19. },
  20. "order_time": {
  21. "type": "date"
  22. },
  23. "customer_order":{
  24. "type":"join",
  25. "relations":{
  26. "customer":"order"
  27. }
  28. }
  29. }
  30. }
  31. }
  32. }

3 启动ES数据同步

启动canal-adapter启动器

  1. bin/startup.sh

验证

  1. 新增mysql mytest.user表的数据, 将会自动同步到es的mytest_user索引下面, 并会打出DML的log
  2. 修改mysql mytest.role表的role_name, 将会自动同步es的mytest_user索引中的role_name数据
  3. 新增或者修改mysql mytest.label表的label, 将会自动同步es的mytest_user索引中的labels数据