示例:逻辑复制代码示例

下面的示例演示如何通过JDBC接口使用逻辑复制功能。

  1. //逻辑复制功能示例:文件名,LogicalReplicationDemo.java
  2. //前提条件:添加JDBC用户机器IP到数据库白名单里,在pg_hba.conf添加以下内容即可:
  3. //假设JDBC用户IP为10.10.10.10
  4. //host all all 10.10.10.10/32 sha256
  5. //host replication all 10.10.10.10/32 sha256
  6. import org.postgresql.PGProperty;
  7. import org.postgresql.jdbc.PgConnection;
  8. import org.postgresql.replication.LogSequenceNumber;
  9. import org.postgresql.replication.PGReplicationStream;
  10. import java.nio.ByteBuffer;
  11. import java.sql.DriverManager;
  12. import java.util.Properties;
  13. import java.util.concurrent.TimeUnit;
  14. public class LogicalReplicationDemo {
  15. public static void main(String[] args) {
  16. String driver = "org.postgresql.Driver";
  17. //此处配置数据库IP以及端口,
  18. String sourceURL = "jdbc:postgresql://$ip:$port/postgres";
  19. PgConnection conn = null;
  20. //默认逻辑复制槽的名称是:replication_slot
  21. //测试模式:创建逻辑复制槽
  22. int TEST_MODE_CREATE_SLOT = 1;
  23. //测试模式:开启逻辑复制(前提条件是逻辑复制槽已经存在)
  24. int TEST_MODE_START_REPL = 2;
  25. //测试模式:删除逻辑复制槽
  26. int TEST_MODE_DROP_SLOT = 3;
  27. //开启不同的测试模式
  28. int testMode = TEST_MODE_START_REPL;
  29. try {
  30. Class.forName(driver);
  31. } catch (Exception e) {
  32. e.printStackTrace();
  33. return;
  34. }
  35. try {
  36. Properties properties = new Properties();
  37. PGProperty.USER.set(properties, "user");
  38. PGProperty.PASSWORD.set(properties, "passwd");
  39. //对于逻辑复制,以下三个属性是必须配置项
  40. PGProperty.ASSUME_MIN_SERVER_VERSION.set(properties, "9.4");
  41. PGProperty.REPLICATION.set(properties, "database");
  42. PGProperty.PREFER_QUERY_MODE.set(properties, "simple");
  43. conn = (PgConnection) DriverManager.getConnection(sourceURL, properties);
  44. System.out.println("connection success!");
  45. if(testMode == TEST_MODE_CREATE_SLOT){
  46. conn.getReplicationAPI()
  47. .createReplicationSlot()
  48. .logical()
  49. .withSlotName("replication_slot")
  50. .withOutputPlugin("test_decoding")
  51. .make();
  52. }else if(testMode == TEST_MODE_START_REPL) {
  53. //开启此模式前需要创建复制槽
  54. LogSequenceNumber waitLSN = LogSequenceNumber.valueOf("6F/E3C53568");
  55. PGReplicationStream stream = conn
  56. .getReplicationAPI()
  57. .replicationStream()
  58. .logical()
  59. .withSlotName("replication_slot")
  60. .withSlotOption("include-xids", false)
  61. .withSlotOption("skip-empty-xacts", true)
  62. .withStartPosition(waitLSN)
  63. .start();
  64. while (true) {
  65. ByteBuffer byteBuffer = stream.readPending();
  66. if (byteBuffer == null) {
  67. TimeUnit.MILLISECONDS.sleep(10L);
  68. continue;
  69. }
  70. int offset = byteBuffer.arrayOffset();
  71. byte[] source = byteBuffer.array();
  72. int length = source.length - offset;
  73. System.out.println(new String(source, offset, length));
  74. //如果需要flush lsn,根据业务实际情况调用以下接口
  75. //LogSequenceNumber lastRecv = stream.getLastReceiveLSN();
  76. //stream.setFlushedLSN(lastRecv);
  77. //stream.forceUpdateStatus();
  78. }
  79. }else if(testMode == TEST_MODE_DROP_SLOT){
  80. conn.getReplicationAPI()
  81. .dropReplicationSlot("replication_slot");
  82. }
  83. } catch (Exception e) {
  84. e.printStackTrace();
  85. return;
  86. }
  87. }
  88. }