示例:逻辑复制代码示例

下面示例演示如何通过JDBC接口使用逻辑复制功能的过程。

针对逻辑复制的配置选项,除了在逻辑解码中的配置选项外,还有专门给JDBC等流式解码工具增加的配置项,如下所示:

  1. 解码线程并行度

    通过配置选项parallel-decode-num,指定并行解码的Decoder线程数量。其取值范围为1~20的int型,取1表示按照原有的串行逻辑进行解码,取其余值即为开启并行解码。默认值为1。当该选项配置为1时,禁止配置以下选项:解码格式选项decode-style、批量发送选项sending-batch和并行解码队列长度选项parallel-queue-size。

  2. 解码格式

    通过配置选项decode-style,指定解码格式。其取值为char型的字符’j’、’t’或’b’,分别代表json格式、text格式及二进制格式。默认值为’b’即二进制格式解码。该选项仅允许并行解码时设置,且二进制格式解码仅在并行解码场景下支持。与二进制格式对应的json和text格式,在批量发送的解码结果中,每条解码语句的前4字节组成的uint32代表该条语句总字节数(不包含该uint32类型占用的4字节,0代表本批次解码结束),8字节uint64代表相应lsn(begin对应first_lsn,commit对应end_lsn,其他场景对应该条语句的lsn)。

示例:逻辑复制代码示例 - 图1 说明:

二进制格式编码规则如下所示:

  1. 前4字节代表接下来到语句级别分隔符字母P(不含)或者该批次结束符F(不含)的解码结果的总字节数,该值如果为0代表本批次解码结束。
  2. 接下来8字节uint64代表相应lsn(begin对应first_lsn,commit对应end_lsn,其他场景对应该条语句的lsn)。
  3. 接下来1字节的字母有5种B/C/I/U/D,分别代表begin/commit/insert/update/delete。
  4. 3.接下来1字节的字母有5种B/C/I/U/D,…步字母为B时:
    1. 接下来的8字节uint64代表CSN。
    2. 接下来的8字节uint64代表first_lsn。
    3. 【该部分为可选项】接下来的1字节字母如果为T,则代表后面4字节uint32表示该事务commit时间戳长度,再后面等同于该长度的字符为时间戳字符串。
    4. 因为之后仍可能有解码语句,接下来会有1字节字母P或F作为语句间的分隔符,P代表本批次仍有解码的语句,F代表本批次完成。
  5. 3.接下来1字节的字母有5种B/C/I/U/D,…步字母为C时:
    1. 【该部分为可选项】接下来1字节字母如果为X,则代表后面的8字节uint64表示xid。
    2. 【该部分为可选项】接下来的1字节字母如果为T,则代表后面4字节uint32表示时间戳长度,再后面等同于该长度的字符为时间戳字符串。
    3. 因为批量发送日志时,一个COMMIT日志解码之后可能仍有其他事务的解码结果,接下来的1字节字母如果为P则表示该批次仍需解码,如果为F则表示该批次解码结束。
  6. 3.接下来1字节的字母有5种B/C/I/U/D,…步字母为I/U/D时:
    1. 接下来的2字节uint16代表schema名的长度。
    2. 按照上述长度读取schema名。
    3. 接下来的2字节uint16代表table名的长度。
    4. 按照上述长度读取table名。
    5. 【该部分为可选项】接下来1字符字母如果为N代表为新元组,如果为O代表为旧元组,这里先发送新元组。
      1. 接下来的2字节uint16代表该元组需要解码的列数,记为attrnum。
      2. 以下流程重复attrnum次。
        1. 接下来2字节uint16代表列名的长度。
        2. 按照上述长度读取列名。
        3. 接下来4字节uint32代表当前列类型的Oid。
        4. 接下来4字节uint32代表当前列的值(以字符串格式存储)的长度,如果为0xFFFFFFFF则表示NULL,如果为0则表示长度为0的字符串。
        5. 按照上述长度读取列值。
    6. 因为之后仍可能有解码语句,接下来的1字节字母如果为P则表示该批次仍需解码,如果为F则表示该批次解码结束。
  1. 限制仅备机解码

    通过配置选项standby-connection,指定是否限制仅备机解码。其取值为bool型(可用0或1表示),取true(或1)代表限制仅允许连接备机解码,连接主机解码时会报错退出;取false(或0)时不做限制。默认值为false(0)。

  2. 批量发送

    通过配置选项sending-batch,指定是否批量发送。其取值范围为0或1的int型,取0表示逐条发送解码结果,取1表示解码结果累积到达1MB则批量发送解码结果。默认值为0。该选项仅允许并行解码时设置。开启批量发送的场景中,当解码格式为’j’或’t’时,在原来的每条解码语句之前会附加一个uint32类型,表示本条解码结果长度(长度不包含当前的uint32类型),以及一个uint64类型,表示当前解码结果对应的lsn。

  3. 并行解码队列长度

    通过配置选项parallel-queue-size,指定并行逻辑解码线程间进行交互的队列长度。取值范围【2,1024】,且必须为2的幂数,默认值为128。队列长度和解码过程的内存使用量正相关。

  4. 逻辑解码内存阈值

    通过配置选项max-txn-in-memory指定单个事务解码中间结果缓存的内存阈值;单位为MB,范围【0,100】,默认值为0,表示不管控内存使用。通过配置选项max-reorderbuffer-in-memory指定所有事务解码中间结果缓存的内存阈值;单位为GB,范围【0,100】,默认值为0,表示不管控内存使用。当超过内存阈值时,解码过程将出现解码中间结果写临时文件的现象,影响逻辑解码的性能。

  5. 逻辑解码发送超时阈值

    通过配置选项sender-timeout指定内核与客户端的心跳超时阈值。当该时间段内没有收到客户端任何消息,逻辑解码将主动停止,并断开和客户端的连接。单位为毫秒(ms),范围【0, 2147483647】,默认值取决于GUC参数logical_sender_timeout配置。

  6. JDBC默认设置逻辑解码连接的socketTimeout=10s,备机解码在主机压力大的时候可能会导致连接超时关闭,可以通过配置withStatusInterval(10000,TimeUnit.MILLISECONDS),调整超时时间解决。

在并行解码的标准场景下(16核CPU、内存128G、网络带宽 > 200MBps、表的列数为10~100、单行数据量0.1KB~1KB、DML操作以insert为主、不涉及落盘事务即单个事务中语句数量小于4096、parallel-decode-num为8、解码格式为’b’且开启批量发送功能),解码性能(这里以xlog消耗量为标准)不低于100MBps。为保证解码性能达标以及尽量降低对业务的影响,一台备机上应尽量仅建立一个并行解码连接,保证CPU、内存、带宽资源充足。

注意:逻辑复制类PGReplicationStream为非线程安全类,并发调用可能导致数据异常。

  1. //逻辑复制功能示例:文件名,LogicalReplicationDemo.java
  2. //前提条件:添加JDBC用户机器IP到数据库白名单里,在pg_hba.conf添加以下内容即可:
  3. //假设JDBC用户IP为10.10.10.10
  4. //host all all 10.10.10.10/32 sha256
  5. //host replication all 10.10.10.10/32 sha256
  6. import org.postgresql.PGProperty;
  7. import org.postgresql.jdbc.PgConnection;
  8. import org.postgresql.replication.LogSequenceNumber;
  9. import org.postgresql.replication.PGReplicationStream;
  10. import java.nio.ByteBuffer;
  11. import java.sql.DriverManager;
  12. import java.util.Properties;
  13. import java.util.concurrent.TimeUnit;
  14. public class LogicalReplicationDemo {
  15. private static PgConnection conn = null;
  16. public static void main(String[] args) {
  17. String driver = "org.postgresql.Driver";
  18. //此处配置数据库IP以及端口,这里的端口为haPort,通常默认是所连接DN的port+1端口
  19. String sourceURL = "jdbc:postgresql://$ip:$port/postgres";
  20. //默认逻辑复制槽的名称是:replication_slot
  21. //测试模式:创建逻辑复制槽
  22. int TEST_MODE_CREATE_SLOT = 1;
  23. //测试模式:开启逻辑复制(前提条件是逻辑复制槽已经存在)
  24. int TEST_MODE_START_REPL = 2;
  25. //测试模式:删除逻辑复制槽
  26. int TEST_MODE_DROP_SLOT = 3;
  27. //开启不同的测试模式
  28. int testMode = TEST_MODE_START_REPL;
  29. try {
  30. Class.forName(driver);
  31. } catch (Exception e) {
  32. e.printStackTrace();
  33. return;
  34. }
  35. try {
  36. Properties properties = new Properties();
  37. PGProperty.USER.set(properties, "user");
  38. PGProperty.PASSWORD.set(properties, "passwd");
  39. //对于逻辑复制,以下三个属性是必须配置项
  40. PGProperty.ASSUME_MIN_SERVER_VERSION.set(properties, "9.4");
  41. PGProperty.REPLICATION.set(properties, "database");
  42. PGProperty.PREFER_QUERY_MODE.set(properties, "simple");
  43. conn = (PgConnection) DriverManager.getConnection(sourceURL, properties);
  44. System.out.println("connection success!");
  45. if(testMode == TEST_MODE_CREATE_SLOT){
  46. conn.getReplicationAPI()
  47. .createReplicationSlot()
  48. .logical()
  49. .withSlotName("replication_slot") //这里字符串如包含大写字母则会自动转化为小写字母
  50. .withOutputPlugin("mppdb_decoding")
  51. .make();
  52. }else if(testMode == TEST_MODE_START_REPL) {
  53. //开启此模式前需要创建复制槽
  54. LogSequenceNumber waitLSN = LogSequenceNumber.valueOf("6F/E3C53568");
  55. PGReplicationStream stream = conn
  56. .getReplicationAPI()
  57. .replicationStream()
  58. .logical()
  59. .withSlotName("replication_slot")
  60. .withSlotOption("include-xids", false)
  61. .withSlotOption("skip-empty-xacts", true)
  62. .withStartPosition(waitLSN)
  63. .withSlotOption("parallel-decode-num", 10) //解码线程并行度
  64. .withSlotOption("white-table-list", "public.t1,public.t2") //白名单列表
  65. .withSlotOption("standby-connection", true) //强制备机解码
  66. .withSlotOption("decode-style", "t") //解码格式
  67. .withSlotOption("sending-batch", 1) //批量发送解码结果
  68. .withSlotOption("max-txn-in-memory", 100) //单个解码事务落盘内存阈值为100MB
  69. .withSlotOption("max-reorderbuffer-in-memory", 50) //正在处理的解码事务落盘内存阈值为50GB
  70. .start();
  71. while (true) {
  72. ByteBuffer byteBuffer = stream.readPending();
  73. if (byteBuffer == null) {
  74. TimeUnit.MILLISECONDS.sleep(10L);
  75. continue;
  76. }
  77. int offset = byteBuffer.arrayOffset();
  78. byte[] source = byteBuffer.array();
  79. int length = source.length - offset;
  80. System.out.println(new String(source, offset, length));
  81. //如果需要flush lsn,根据业务实际情况调用以下接口
  82. //LogSequenceNumber lastRecv = stream.getLastReceiveLSN();
  83. //stream.setFlushedLSN(lastRecv);
  84. //stream.forceUpdateStatus();
  85. }
  86. }else if(testMode == TEST_MODE_DROP_SLOT){
  87. conn.getReplicationAPI()
  88. .dropReplicationSlot("replication_slot");
  89. }
  90. } catch (Exception e) {
  91. e.printStackTrace();
  92. return;
  93. }
  94. }
  95. }