背景

因为alibaba的特殊业务,比如:

  • 同步数据同时,需要同步数据关联的文件 (需要做数据join)
  • 同步会员数据,敏感字段信息不能同步到美国站点. (需要做数据过滤)
  • 两地机房的数据库可能为异构数据库,(需要做字段类型,名字等转化.)
    为解决这些业务,otter引入了映射规则这一概念,用于描述这样的一种同步业务的关系,其粒度可以精确到一张表,或者是一整个库.

映射规则

表映射

otter中每个pipeline可以设置多个映射规则,每个映射规则描述一个数据库同步的内容,比如源表是哪张,同步到哪张目标表。

映射规则配置 - 图1

权重的概念

可以先看下:Otter数据入库算法 , 因为otter采用了pk hash的并行载入算法,会将原先binlog中的事务进行打散做并行处理提升同步性能。原先事务被拆散后,通过这个权重概念,来提供“业务上类事务功能".

举个实际点的例子来看:

  • 比如有两张表,product(记录产品的属性信息)和product_detail(记录产品的详情信息),product_detail上有个字段为product_id与product进行关联. 目前为1:1的关系
  • 业务上插入数据可以通过事务,先插入product,然后插入product_detail,最后一起提交到数据库. 正常,页面上通过查询用户的product表的数据,发现有产品记录,然后通过product_id去查询product_detail表,查到后才显示产品页面
  • 假如同步到目标库后,打散事务后,同步过程如果先插入了product表,后插入product_detail表,这时如果有用户访问该产品记录,可能就会遇到product_detail不存在的情况,从而导致页面出错.
  • 所以,我们通过权重定义,比如将product_detail权重定义为5,将product定义为10。 otter会优先同步权重低的表数据,最终可以保证查询product有数据后,product_detail一定有数据,避免业务出错.

视图映射

如何进入视图编辑:

点击下一步后,进入视图编辑页面:

映射规则配置 - 图2

说明:

  • 映射规则配置页面,可以选择视图模式为:include或exclude,代表正向匹配或者逆向排除.
  • 视图配置页面,只支持存在的数据表(因为要获取数据表结构,所以.*等正则的数据表不支持配置该功能)
  • 视图配置列表,左右选中列表会按照顺序进行对应,做映射时需按照顺序进行选择.
    举个例子:

    如果要排除表字段A的同步,则只需要选择为exclude模式,然后视图编辑页面选择左右皆选择A字段即可,点击保存.

字段组映射

首先解释一下,需要字段组同步的需求.

  • 文件同步. 一条记录对应的图片,可能会有一个或者多个字段,比如会有image_path,image_version来决定图片,所以我们可以定义这两个字段为一组,只要满足组内任意一个字段的变更,就会认为需要文件同步.
  • 数据上的组同步,比如国家,省份,城市,可能在数据库为三个字段. 如果是双A同步,两地同时修改这些字段,但业务上可能在A地修改了国家为美国,在B地修改为省份为浙江,然后一同步,最终就会变成美国,浙江这样的情况. 这种情况可以通过group来解决,将国家,省份,城市做一个group,组内任何一个字段发生了变更,其余字段会做为整体一起变更.
    再来看一下配置:(点击视图编辑页面的下一步,即可进入)

映射规则配置 - 图3 说明:

  • 也可不配置视图,单独配置字段组,此时可选择的字段即为当前所有字段(映射规则按照同名映射).

高级映射

主要分为两类:

  • 文件同步
  • 自定义数据同步
    具体代码扩展方式和配置可参见: Otter扩展性

    配置方式:

映射规则配置 - 图4

文件同步

首先解释一下文件同步的需求,阿里巴巴国际站业务,主要模式为对外贸易,卖家基本在国内,买家在国外. 所以,目前我们的机房部署为杭州和美国各一个,卖家访问杭州机房,买家访问美国机房。卖家在国内发布产品和图片,通过otter同步到美国,同步产品数据记录的同时,同样需要把图片同步过去,保证买家用户的访问体验. 所以,基于这个业务场景,衍生出了文件同步的需求.

所以,做文件同步的几个前提:

  • 异地同步 (需要部署为两个node,S/E和T/L分为两地. )
  • 数据触发文件同步 (数据库记录做为类似文件索引信息,不支持单独同步文件)
  • 本地文件同步 (需要同步的文件需要和node部署在一台机器上或者通过nfs mount,如果要同步 公司自带的分布式文件系统的数据,otter需要做额外扩展)
    文件同步准备工作:

  • 准备两台机器,分别部署上两个node

  • 配置channel/pipeline同步,配置映射规则
  • 编写FileResolver解析类,根据binlog中的变更数据,转化为文件的路径信息. 例子:TestFileResolver
  1. public class TestFileResolver extends AbstractFileResolver {
  2. public FileInfo[] getFileInfo(Map<String, String> rowMap) {
  3.     // 基本步骤:
  4.     // 1. 获取binlog中的变更字段,比如组成文件有多个字段组成version+path
  5.     // 2. 基于字段内容,构造一个文件路径,目前开源版本只支持本地文件的同步.(如果是网络文件,建议进行NFS mount到ndde机器).
  6.     // 3. 返回FileInfo数组,(目前不支持目录同步,如果是目录需要展开为多个FileInfo的子文件),如果不需要同步,则返回null.
  7.     String path = rowMap.get("FIELD"); //注意为大写
  8.     FileInfo fileInfo = null;
  9.     if (StringUtils.isNotEmpty(path)) {
  10.         fileInfo = new FileInfo(path);
  11.         return new FileInfo[] { fileInfo };
  12.     } else {
  13.         return null;
  14.     }
  15. }
  16.  
  17. }

自定义数据同步

通过前面的字段视图映射,或许可以解决80%的需求,但总会有一小撮的特殊用户,希望可以自定义自己的同步数据内容,所以otter引入了自定义数据同步为EventProcessor,允许你任意改变整个同步过程中的数据内容.

可以支持的需求:

  • 根据字段内容,判断是否需要屏蔽本记录同步
  • 动态增加/减少字段
  • 动态修改字段内容
  • 动态改变事件类型(Insert/Update/Delete)
    几点注意:

  • EventProcessor主要是在E模块进行数据处理,也就是EventProcessor处理后的数据,会再次经过视图配置,字段组映射,文件同步,最后进入Transform处理.

  • EventProcessor修改数据中的schema/table name需要谨慎,因为会继续后续的E/T/L流程,所以需要保证修改后的name在映射规则列表里有配置,否则同步会出错.
    一个例子:(比如我想将源库的每条binlog变更,记录到一个日志表binlog,映射规则配置为.*所有表的同步)

代码:TestEventProcessor

  1. public class TestEventProcessor extends AbstractEventProcessor {
  2. public boolean process(EventData eventData) {
  3.     // 基本步骤:
  4.     // 1. 获取binlog中的变更字段
  5.     // 2. 根据业务逻辑进行判断,如果需要忽略本条数据同步,直接返回false,否则返回true
  6.     // 3. 根据业务逻辑进行逻辑转化,比如可以修改整个EventData数据.  
  7.     // 本文例子:源库的每条binlog变更,记录到一个日志表binlog
  8.     // create table test.binlog(
  9.     //        id bigint(20) auto_increment,
  10.     //        oschema varchar(256),
  11.     //        otable varchar(256),
  12.     //        gtime varchar(32)
  13.     //        ovalue text,
  14.     //        primary key(id);
  15.     //    )
  16.     // 在process处理中,可以修改EventData的任何数据,达到数据转换的效果, just have fun.
  17.     JSONObject col = new JSONObject();
  18.     JSONArray array = new JSONArray();
  19.     for (EventColumn column : eventData.getColumns()) {
  20.         JSONObject obj = this.doColumn(column);
  21.         array.add(obj);
  22.     }
  23.     for (EventColumn key : eventData.getKeys()) {
  24.         JSONObject obj = this.doColumn(key);
  25.         array.add(obj);
  26.     }
  27.     // 记录原始的表信息
  28.     col.put("schema", eventData.getSchemaName());
  29.     col.put("table", eventData.getTableName());
  30.     col.put("columns", array);
  31.     col.put("dml", eventData.getEventType());
  32.     col.put("exectime", eventData.getExecuteTime());
  33.     // 构造新的主键
  34.     EventColumn id = new EventColumn();
  35.     id.setColumnValue(eventData.getSchemaName());
  36.     id.setColumnType(Types.BIGINT);
  37.     id.setColumnName("id");
  38.     // 构造新的字段
  39.     EventColumn schema = new EventColumn();
  40.     schema.setColumnValue(eventData.getSchemaName());
  41.     schema.setColumnType(Types.VARCHAR);
  42.     schema.setColumnName("oschema");
  43.     EventColumn table = new EventColumn();
  44.     table.setColumnValue(eventData.getTableName());
  45.     table.setColumnType(Types.VARCHAR);
  46.     table.setColumnName("otable");
  47.     EventColumn ovalue = new EventColumn();
  48.     ovalue.setColumnValue(col.toJSONString());
  49.     ovalue.setColumnType(Types.VARCHAR);
  50.     ovalue.setColumnName("ovalue");
  51.     EventColumn gtime = new EventColumn();
  52.     gtime.setColumnValue(eventData.getExecuteTime() + "");
  53.     gtime.setColumnType(Types.VARCHAR);
  54.     gtime.setColumnName("gtime");
  55.     // 替换为新的字段和主键信息
  56.     List<EventColumn> cols = new ArrayList<EventColumn>();
  57.     cols.add(schema);
  58.     cols.add(table);
  59.     cols.add(gtime);
  60.     cols.add(ovalue);
  61.     eventData.setColumns(cols);
  62.     List<EventColumn> keys = new ArrayList<EventColumn>();
  63.     keys.add(id);
  64.     eventData.setKeys(keys);
  65.     //修改数据meta信息
  66.     eventData.setEventType(EventType.INSERT);
  67.     eventData.setSchemaName("test");
  68.     eventData.setTableName("binlog");
  69.     return true;
  70. }
  71. private JSONObject doColumn(EventColumn column) {
  72.     JSONObject obj = new JSONObject();
  73.     obj.put("name", column.getColumnName());
  74.     obj.put("update", column.isUpdate());
  75.     obj.put("key", column.isKey());
  76.     if (column.getColumnType() != Types.BLOB && column.getColumnType() != Types.CLOB) {
  77.         obj.put("value", column.getColumnValue());
  78.     } else {
  79.         obj.put("value", "");
  80.     }
  81.     return obj;
  82. }
  83.  
  84. }

原文: https://github.com/alibaba/otter/wiki/%E6%98%A0%E5%B0%84%E8%A7%84%E5%88%99%E9%85%8D%E7%BD%AE