clickhouse-copier

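Copies data from tables in one cluster to tables in another (or the same) cluster.

You can run multiple clickhouse-copier instances on different servers to perform the same job. ZooKeeper is used to synchronize the processes.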

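After starting, clickhouse-copier:

  • Connects to ZooKeeper and receives:

    • Copying jobs.
    • The state of the copying jobs.
  • Performs the jobs.

    Each running process chooses the "closest" shard of the source cluster and copies the data into the destination cluster, resharding the data if necessary.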

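clickhouse-copier tracks changes in ZooKeeper and applies them on the fly.

To reduce network traffic, we recommend running clickhouse-copier on the same server where the source data is located.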

Running clickhouse-copier

The utility should be run manually:

    clickhouse-copier --daemon --config zookeeper.xml --task-path /task/path --base-dir /path/to/dir

Parameters:

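  • daemon: Starts clickhouse-copier in daemon mode.
  • config: The path to the zookeeper.xml file, which contains the parameters for connecting to ZooKeeper.
  • task-path: The path to the ZooKeeper node. This node is used for synchronizing clickhouse-copier processes and for storing tasks. Tasks are stored in $task-path/description.
  • task-file: Optional. The path to a file with the task configuration, used for the initial upload to ZooKeeper.
  • task-upload-force: Forces the upload of task-file even if the node already exists.
  • base-dir: The path to logs and auxiliary files. When it starts, clickhouse-copier creates clickhouse-copier_YYYYMMHHSS_<PID> subdirectories in $base-dir. If this parameter is omitted, the directories are created in the directory where clickhouse-copier was launched.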
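
The parameters above can be assembled programmatically, for example when the same job is launched on several servers. The following is a minimal sketch, not part of clickhouse-copier itself: the helper name build_copier_command is hypothetical, and it uses only the options listed above. It builds the argument list and prints it without running anything.

```python
import shlex


def build_copier_command(config, task_path, base_dir=None, daemon=True, task_file=None):
    """Assemble a clickhouse-copier argument list (hypothetical helper).

    Only options documented in the parameter list are used.
    """
    argv = ["clickhouse-copier"]
    if daemon:
        argv.append("--daemon")
    argv += ["--config", config, "--task-path", task_path]
    if task_file is not None:
        # Optional: task configuration for the initial upload to ZooKeeper.
        argv += ["--task-file", task_file]
    if base_dir is not None:
        # Optional: when omitted, the launch directory is used.
        argv += ["--base-dir", base_dir]
    return argv


command = build_copier_command("zookeeper.xml", "/task/path", base_dir="/path/to/dir")
print(shlex.join(command))
```

The resulting list can be passed to subprocess.run on a host where clickhouse-copier is installed; keeping it as a list avoids shell quoting problems with paths.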

Format of zookeeper.xml

    <yandex>
        <logger>
            <level>trace</level>
            <size>100M</size>
            <count>3</count>
        </logger>
        <zookeeper>
            <node index="1">
                <host>127.0.0.1</host>
                <port>2181</port>
            </node>
        </zookeeper>
    </yandex>
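
If the ZooKeeper node list changes often, the file can be generated rather than edited by hand. The sketch below is one possible approach using Python's standard library; the function name build_zookeeper_config is hypothetical, and the logger values simply repeat the ones from the example above. It assumes Python 3.9 or later for ET.indent.

```python
import xml.etree.ElementTree as ET


def build_zookeeper_config(nodes, log_level="trace"):
    """Return zookeeper.xml content for a list of (host, port) pairs.

    Hypothetical helper; the element names follow the example above.
    """
    root = ET.Element("yandex")

    logger = ET.SubElement(root, "logger")
    ET.SubElement(logger, "level").text = log_level
    ET.SubElement(logger, "size").text = "100M"
    ET.SubElement(logger, "count").text = "3"

    zookeeper = ET.SubElement(root, "zookeeper")
    # Nodes are numbered from 1, as in <node index="1">.
    for index, (host, port) in enumerate(nodes, start=1):
        node = ET.SubElement(zookeeper, "node", index=str(index))
        ET.SubElement(node, "host").text = host
        ET.SubElement(node, "port").text = str(port)

    ET.indent(root, space="    ")  # pretty-print; requires Python 3.9+
    return ET.tostring(root, encoding="unicode")


print(build_zookeeper_config([("127.0.0.1", 2181)]))
```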

Configuration of Copying Tasks

    <yandex>
        <!-- Configuration of clusters as in an ordinary server config -->
        <remote_servers>
            <source_cluster>
                <shard>
                    <internal_replication>false</internal_replication>
                    <replica>
                        <host>127.0.0.1</host>
                        <port>9000</port>
                    </replica>
                </shard>
                ...
            </source_cluster>

            <destination_cluster>
                ...
            </destination_cluster>
        </remote_servers>

        <!-- How many simultaneously active workers are possible. If you run more workers, superfluous workers will sleep. -->
        <max_workers>2</max_workers>

        <!-- Settings used to fetch (pull) data from source cluster tables -->
        <settings_pull>
            <readonly>1</readonly>
        </settings_pull>

        <!-- Settings used to insert (push) data to destination cluster tables -->
        <settings_push>
            <readonly>0</readonly>
        </settings_push>

        <!-- Common settings for fetch (pull) and insert (push) operations. The copier process context also uses them.
             They are overlaid by <settings_pull/> and <settings_push/> respectively. -->
        <settings>
            <connect_timeout>3</connect_timeout>
            <!-- Sync insert is set forcibly, leave it here just in case. -->
            <insert_distributed_sync>1</insert_distributed_sync>
        </settings>

        <!-- Copying tasks description.
             You can specify several table tasks in the same task description (in the same ZooKeeper node); they will be performed
             sequentially.
        -->
        <tables>
            <!-- A table task, copies one table. -->
            <table_hits>
                <!-- Source cluster name (from <remote_servers/> section) and tables in it that should be copied -->
                <cluster_pull>source_cluster</cluster_pull>
                <database_pull>test</database_pull>
                <table_pull>hits</table_pull>

                <!-- Destination cluster name and tables in which the data should be inserted -->
                <cluster_push>destination_cluster</cluster_push>
                <database_push>test</database_push>
                <table_push>hits2</table_push>

                <!-- Engine of destination tables.
                     If destination tables have not been created, workers create them using the column definitions from source tables and the engine
                     definition from here.

                     NOTE: If the first worker starts inserting data and detects that the destination partition is not empty, the partition will
                     be dropped and refilled. Take this into account if you already have some data in destination tables. You can directly
                     specify partitions that should be copied in <enabled_partitions/>; they should be in quoted format like the partition column of
                     the system.parts table.
                -->
                <engine>
                ENGINE=ReplicatedMergeTree('/clickhouse/tables/{cluster}/{shard}/hits2', '{replica}')
                PARTITION BY toMonday(date)
                ORDER BY (CounterID, EventDate)
                </engine>

                <!-- Sharding key used to insert data to destination cluster -->
                <sharding_key>jumpConsistentHash(intHash64(UserID), 2)</sharding_key>

                <!-- Optional expression that filters data while pulling it from source servers -->
                <where_condition>CounterID != 0</where_condition>

                <!-- This section specifies partitions that should be copied; other partitions will be ignored.
                     Partition names should have the same format as the
                     partition column of the system.parts table (i.e. quoted text).
                     Since the partition keys of the source and destination clusters could be different,
                     these partition names specify destination partitions.

                     NOTE: Although this section is optional (if it is not specified, all partitions will be copied),
                     it is strongly recommended to specify the partitions explicitly.
                     If you already have some ready partitions on the destination cluster, they
                     will be removed at the start of the copying, since they will be interpreted
                     as unfinished data from the previous copying!
                -->
                <enabled_partitions>
                    <partition>'2018-02-26'</partition>
                    <partition>'2018-03-05'</partition>
                    ...
                </enabled_partitions>
            </table_hits>

            <!-- Next table to copy. It is not copied while the previous table is still being copied. -->
            <table_visits>
            ...
            </table_visits>
            ...
        </tables>
    </yandex>
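
The sharding_key expression in the task above maps each row to one of two destination shards. To give a feel for how such a key distributes rows, here is a sketch of the published jump consistent hash algorithm (Lamping and Veach) in Python. It is an illustration only: it has not been checked bit-for-bit against ClickHouse's jumpConsistentHash, and it does not reproduce intHash64, so raw integer IDs stand in for the hashed UserID.

```python
MASK64 = (1 << 64) - 1  # emulate unsigned 64-bit arithmetic


def jump_consistent_hash(key, num_buckets):
    """Map an integer key to a bucket in [0, num_buckets).

    Follows the published jump consistent hash algorithm; this is an
    assumption about, not a copy of, the ClickHouse implementation.
    """
    key &= MASK64
    bucket, candidate = -1, 0
    while candidate < num_buckets:
        bucket = candidate
        # Linear congruential step from the published algorithm.
        key = (key * 2862933555777941757 + 1) & MASK64
        candidate = int((bucket + 1) * (float(1 << 31) / float((key >> 33) + 1)))
    return bucket


# Count how many of 10,000 stand-in user IDs land on each of 2 shards.
counts = [0, 0]
for user_id in range(10_000):
    counts[jump_consistent_hash(user_id, 2)] += 1
print(counts)
```

A useful property of this scheme is that when the number of buckets grows from n to n + 1, a key either keeps its bucket or moves to the new bucket n, which limits data movement during resharding.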

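clickhouse-copier tracks changes to /task/path/description and applies them on the fly. For example, if you change the value of max_workers, the number of processes running tasks will also change.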
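
A change such as a new max_workers value can be prepared by rewriting the task description before it is stored in ZooKeeper again. The sketch below covers only that rewriting step, with a hypothetical helper set_max_workers and a shortened stand-in task description; writing the result back to the ZooKeeper node is left to whichever client you use and is not shown.

```python
import xml.etree.ElementTree as ET


def set_max_workers(task_xml, max_workers):
    """Return the task description with <max_workers> set to a new value.

    Hypothetical helper. Note that ElementTree drops XML comments when
    parsing, so comments in the original description are not preserved.
    """
    root = ET.fromstring(task_xml)
    element = root.find("max_workers")
    if element is None:
        # Add the setting if the description does not contain it yet.
        element = ET.SubElement(root, "max_workers")
    element.text = str(max_workers)
    return ET.tostring(root, encoding="unicode")


# Shortened stand-in for a real task description.
task = "<yandex><max_workers>2</max_workers><tables/></yandex>"
print(set_max_workers(task, 4))
```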
