DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,支持包括 MySQL、SQL Server、Oracle、PostgreSQL、HDFS、Hive、HBase、OTS、ODPS 等各种异构数据源之间高效的数据同步功能。DataX 采用了框架 + 插件 的模式。目前已开源,代码托管在 GitHub。

    DataX 数据同步效率较高,满足大多数场景下的异构数据库间的数据同步需求。同时其配置灵活,支持字段级别的配置,可以轻松应对因异构数据库迁移到 TiDB 而产生的一些改动。

    方案设计如图所示

    5.4 SqlServer 到 TiDB(DATAX) - 图1

    第一阶段:切换支持 TiDB 的应用上线之前,把 SQL Server 数据库中的全量数据用 DataX 同步到 TiDB 库中。为避免对线上业务产生影响,可以选择备份库,或者在业务低峰期操作。

    第二阶段:把全量同步改为增量同步,利用 UpdateTime 字段(或其它条件,根据实际业务灵活调整)作为同步 Where 条件,进行增量覆盖式同步。 t_sync_record 表中记录每张表上次增量任务的执行时间。

    第三阶段:支持 TiDB 的应用上线以后,增量同步切换读写数据源改为逆向增量同步,将新数据近实时地写回 SQL Server 数据库。一旦上线之后出现需要回退的情况,可随时切回 SQL Server,待修复之后再次上线。

    具体操作步骤:

    第一步:部署 DataX

    下载

    1. wget http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz

    解压

    1. tar -zxvf datax.tar.gz

    自检

    1. python {YOUR_DATAX_HOME}/bin/datax.py {YOUR_DATAX_HOME}/job/job.json

    第二步:编写 DataX 数据同步 Job(Json格式)

    (1)全量同步Job

    vi full.json

    1. {
    2. "job": {
    3. "setting": {
    4. "speed": {
    5. #数据分片,分片数据可同时进行同步
    6. "channel": 128
    7. }
    8. },
    9. "content": [{
    10. #SQL Server配置
    11. "reader": {
    12. "name": "sqlserverreader",
    13. "parameter": {
    14. #数据库用户名
    15. "username": "${srcUserName}",
    16. #数据库密码
    17. "password": "${srcPassword}",
    18. #需要迁移的列名,* 表示全部列
    19. "column": ["*"]
    20. "connection": [{
    21. #需要迁移的表名
    22. "table": ["${tableName}"],
    23. 数据库 jdbc 连接
    24. "jdbcUrl": ["${srcUrl}"]
    25. }]
    26. }
    27. },
    28. #TiDB配置
    29. "writer": {
    30. "name": "tidbwriter",
    31. "parameter": {
    32. #数据库用户名
    33. "username": "${desUserName}",
    34. #数据库密码
    35. "password": "${desPassword}",
    36. #使用 Replace 语法
    37. "writeMode": "replace",
    38. #目标表列名,* 表示全部列
    39. "column": ["*"],
    40. "connection": [{
    41. #数据库 jdbc 连接
    42. "jdbcUrl": "${desUrl}",
    43. #目标表名
    44. "table": ["${tableName}"]
    45. }],
    46. #本次迁移开始前执行的sql-作数据迁移日志使用
    47. "preSql": [
    48. "replace into t_sync_record(table_name,start_date,end_date) values('@table',now(),null)"],
    49. #本次迁移完成后执行的 sql- 作数据迁移日志使用
    50. "postSql": [
    51. "update t_sync_record set end_date=now() where table_name='@table' " ]
    52. }
    53. }
    54. }]
    55. }
    56. }

    (2)增量同步 Job

    vi increase.json

    1. {
    2. "job": {
    3. "setting": {
    4. "speed": {
    5. #数据分片,分片数据可同时进行同步
    6. "channel": 128
    7. }
    8. },
    9. "content": [{
    10. #SQL Server配置
    11. "reader": {
    12. "name": "sqlserverreader",
    13. "parameter": {
    14. #数据库用户名
    15. "username": "${srcUserName}",
    16. #数据库密码
    17. "password": "${srcPassword}",
    18. #需要迁移的列名,* 表示全部列
    19. "column": ["*"],
    20. "connection": [{
    21. #需要迁移的表名
    22. "table": ["${tableName}"],
    23. 数据库 jdbc 连接
    24. "jdbcUrl": ["${srcUrl}"]
    25. }],
    26. #抓取一个时间窗口的增量数据
    27. "where": "updateTime >= '${syncTime}'"
    28. }
    29. },
    30. #TiDB配置
    31. "writer": {
    32. "name": "tidbwriter",
    33. "parameter": {
    34. #数据库用户名
    35. "username": "${desUserName}",
    36. #数据库密码
    37. "password": "${desPassword}",
    38. #使用 Replace 语法
    39. "writeMode": "replace",
    40. #目标表列名,* 表示全部列
    41. "column": ["*"],
    42. "connection": [{
    43. #数据库 jdbc 连接
    44. "jdbcUrl": "${desUrl}",
    45. #目标表名
    46. "table": ["${tableName}"]
    47. }],
    48. #本次迁移开始前执行的sql-作数据迁移日志使用
    49. "preSql": [
    50. "replace into t_sync_record(table_name,start_date,end_date) values('@table',now(),null)"],
    51. #本次迁移完成后执行的 sql- 作数据迁移日志使用
    52. "postSql": [
    53. "update t_sync_record set end_date=now() where table_name='@table' " ]
    54. }
    55. }
    56. }]
    57. }
    58. }

    (3)编写运行DataX Job的Shell执行脚本

    vi datax_excute_job.sh

    1. #!/bin/bash
    2. source /etc/profile
    3. srcUrl="Reader Sql Server 地址"
    4. srcUserName="Sql Server 账号"
    5. srcPassword="Sql Server 密码"
    6. desUrl="Writer TiDB 地址"
    7. desUserName="TiDB 账号"
    8. desPassword="TiDB 密码"
    9. # 同步开始时间
    10. defaultsyncUpdateTime="2020-03-03 18:00:00.000"
    11. # 同步周期(秒)
    12. sleepTime="N"
    13. tableName="Table1,Table2,..."
    14. # 循环次数标识,-1为一直循环,其他按输入次数循环
    15. flg=-1
    16. while [ "$flg" -gt 0 -o "$flg" -eq -1 ]
    17. do
    18. #更新时间设置为上次循序执行的时间
    19. if [ "" = "$preRunTime" ]; then
    20. syncTime=$defaultsyncUpdateTime
    21. else
    22. syncTime=$preRunTime
    23. fi
    24. #记录下本次循环执行时间,供下次循环使用
    25. preRunTime=$(date -d +"%Y-%m-%d %T")".000";
    26. echo $syncTime
    27. echo $preRunTime
    28. echo $flg
    29. python {YOUR_DATAX_HOME}/bin/datax.py -p "-DsyncTime='${syncTime}' -DtableName='${tableName}' -DsrcUrl='${srcUrl}' -DsrcUserName='${srcUserName}' -DsrcPassword='${srcPassword}' -DdesUrl='${desUrl}' -DdesUserName='${desUserName}' -DdesPassword='${desPassword}'" {YOUR_DATAX_HOME}/job/increase.json
    30. if [ -1 -lt "$flg" ]; then
    31. let "flg-=1"
    32. fi
    33. sleep $sleepTime
    34. done

    (4)执行 Shell 脚本

    1. chmod +x datax_excute_job.sh
    2. nohup ./datax_excute_job.sh > info.file 2>&1 &

    至此,核心脚本和操作都已完成,可通过修改参数配置,达到自己不同的需求,还可以配合数据对比服务,以期达到将应用程序从 SQL Server 顺利迁移到 TiDB 的目的。