Transactions API

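All messages in a transaction become available to consumers only after the transaction is committed. If the transaction is aborted, all writes and acknowledgements in it are rolled back.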

Prerequisites

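  1. To enable transactions in Pulsar, set the following parameter in the broker.conf file.

         transactionCoordinatorEnabled=true

  2. Initialize the transaction coordinator metadata so that the transaction coordinators can take advantage of partitioned topics, for example for load balancing.

         bin/pulsar initialize-transaction-coordinator-metadata -cs 127.0.0.1:2181 -c standalone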

After initializing transaction coordinator metadata, you can use the transactions API. The following APIs are available.

Initialize Pulsar client

Enable transactions on the client builder. This also initializes the transaction coordinator client.

    PulsarClient pulsarClient = PulsarClient.builder()
            .serviceUrl("pulsar://localhost:6650")
            .enableTransaction(true)
            .build();

Start transactions

You can start a transaction in the following way.

    Transaction txn = pulsarClient
            .newTransaction()
            .withTransactionTimeout(5, TimeUnit.MINUTES)
            .build()
            .get();

Produce transaction messages

A transaction parameter is required when producing a message that belongs to a transaction. Transaction messages in Pulsar have read-committed semantics, so consumers cannot receive the messages of an ongoing transaction until that transaction is committed.

    producer.newMessage(txn).value("Hello Pulsar Transaction".getBytes()).sendAsync();
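The read-committed behaviour described above can be pictured with a small in-memory model. This is only an illustrative sketch, not Pulsar code: the class `ReadCommittedTopic` and its methods are hypothetical names, and the real broker is not assumed to buffer messages this way.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of read-committed visibility (hypothetical, not part of Pulsar).
class ReadCommittedTopic {
    // Messages that consumers are allowed to read.
    private final List<String> visible = new ArrayList<>();
    // Messages written under a transaction that has not finished yet, keyed by transaction id.
    private final Map<String, List<String>> pending = new HashMap<>();

    // Record a message under an open transaction; consumers cannot see it yet.
    void send(String txnId, String message) {
        pending.computeIfAbsent(txnId, id -> new ArrayList<>()).add(message);
    }

    // Commit: all messages of the transaction become visible together.
    void commit(String txnId) {
        List<String> buffered = pending.remove(txnId);
        if (buffered != null) {
            visible.addAll(buffered);
        }
    }

    // Abort: the messages of the transaction are dropped and never become visible.
    void abort(String txnId) {
        pending.remove(txnId);
    }

    // Snapshot of what a consumer could receive right now.
    List<String> readable() {
        return new ArrayList<>(visible);
    }

    public static void main(String[] args) {
        ReadCommittedTopic topic = new ReadCommittedTopic();
        topic.send("txn-1", "Hello Pulsar Transaction");
        System.out.println("before commit: " + topic.readable());
        topic.commit("txn-1");
        System.out.println("after commit: " + topic.readable());
    }
}
```

In this model a consumer that polls `readable()` while `txn-1` is still open sees nothing from it, which mirrors the statement that ongoing transaction messages are not delivered before commit.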

Acknowledge messages with transactions

A transactional acknowledgement requires a transaction parameter. It moves the message into the pending-ack state. When the transaction is committed, the pending-ack state becomes the ack state. If the transaction is aborted, the pending-ack state reverts to the unack state.

    Message<byte[]> message = consumer.receive();
    consumer.acknowledgeAsync(message.getMessageId(), txn);
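The three acknowledgement states above form a small state machine. The following is a minimal sketch of those transitions, assuming one pending acknowledgement per message. `PendingAckModel` and its methods are hypothetical names used for illustration and are not part of the Pulsar client.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the unack / pending-ack / ack states (hypothetical, not part of Pulsar).
class PendingAckModel {
    enum State { UNACK, PENDING_ACK, ACK }

    // messageId -> current state; messages not present are treated as UNACK.
    private final Map<String, State> states = new HashMap<>();
    // messageId -> id of the transaction that holds its pending acknowledgement.
    private final Map<String, String> owner = new HashMap<>();

    State stateOf(String messageId) {
        return states.getOrDefault(messageId, State.UNACK);
    }

    // Acknowledge inside a transaction: UNACK -> PENDING_ACK.
    // Returns false if the message is already acked or pending in some transaction.
    boolean ack(String messageId, String txnId) {
        if (stateOf(messageId) != State.UNACK) {
            return false;
        }
        states.put(messageId, State.PENDING_ACK);
        owner.put(messageId, txnId);
        return true;
    }

    // Commit: every pending ack of the transaction becomes ACK.
    void commit(String txnId) {
        resolve(txnId, State.ACK);
    }

    // Abort: every pending ack of the transaction goes back to UNACK.
    void abort(String txnId) {
        resolve(txnId, State.UNACK);
    }

    private void resolve(String txnId, State target) {
        owner.entrySet().removeIf(entry -> {
            if (!entry.getValue().equals(txnId)) {
                return false;
            }
            states.put(entry.getKey(), target);
            return true;
        });
    }

    public static void main(String[] args) {
        PendingAckModel model = new PendingAckModel();
        model.ack("msg-1", "txn-1");
        System.out.println("after ack: " + model.stateOf("msg-1"));
        model.abort("txn-1");
        System.out.println("after abort: " + model.stateOf("msg-1"));
    }
}
```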

Commit transactions

When the transaction is committed, consumers can receive the messages produced in it, and the pending-ack state becomes the ack state.

    txn.commit().get();

Abort transactions

When the transaction is aborted, its acknowledgements are canceled and the pending-ack messages are redelivered.

    txn.abort().get();
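Commit and abort are usually paired: commit when the work succeeds, abort when it fails. The sketch below shows one possible way to wrap that pattern. To stay self-contained it uses a hypothetical stand-in interface `Txn` that mirrors only the `commit()` and `abort()` calls used in this section. `TxnRunner`, `TxnWork`, and `runInTransaction` are illustrative names, not Pulsar APIs.

```java
import java.util.concurrent.CompletableFuture;

// Commit-on-success, abort-on-failure helper (hypothetical, not part of Pulsar).
class TxnRunner {
    // Stand-in for a transaction handle; only the two calls used in this section.
    interface Txn {
        CompletableFuture<Void> commit();
        CompletableFuture<Void> abort();
    }

    // The work to perform inside the transaction, e.g. acknowledge and produce.
    interface TxnWork {
        void run(Txn txn) throws Exception;
    }

    // Runs the work, then commits. If the work throws, aborts and rethrows the original error.
    static void runInTransaction(Txn txn, TxnWork work) throws Exception {
        try {
            work.run(txn);
        } catch (Exception e) {
            try {
                txn.abort().get();
            } catch (Exception abortFailure) {
                // Keep the original failure as the primary error.
                e.addSuppressed(abortFailure);
            }
            throw e;
        }
        txn.commit().get();
    }

    public static void main(String[] args) throws Exception {
        // A fake transaction that only logs which call was made.
        Txn fake = new Txn() {
            public CompletableFuture<Void> commit() {
                System.out.println("commit called");
                return CompletableFuture.completedFuture(null);
            }
            public CompletableFuture<Void> abort() {
                System.out.println("abort called");
                return CompletableFuture.completedFuture(null);
            }
        };
        runInTransaction(fake, txn -> System.out.println("doing work"));
    }
}
```

A failure of the commit itself is deliberately not followed by an abort here. How to handle that case depends on the application and is left open in this sketch.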

Example

The following example shows how messages are processed in a transaction.

    PulsarClient pulsarClient = PulsarClient.builder()
            .serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl())
            .statsInterval(0, TimeUnit.SECONDS)
            .enableTransaction(true)
            .build();

    String sourceTopic = "public/default/source-topic";
    String sinkTopic = "public/default/sink-topic";

    Producer<String> sourceProducer = pulsarClient
            .newProducer(Schema.STRING)
            .topic(sourceTopic)
            .create();
    sourceProducer.newMessage().value("hello pulsar transaction").sendAsync();

    Consumer<String> sourceConsumer = pulsarClient
            .newConsumer(Schema.STRING)
            .topic(sourceTopic)
            .subscriptionName("test")
            .subscriptionType(SubscriptionType.Shared)
            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
            .subscribe();

    Producer<String> sinkProducer = pulsarClient
            .newProducer(Schema.STRING)
            .topic(sinkTopic)
            .create();

    Transaction txn = pulsarClient
            .newTransaction()
            .withTransactionTimeout(5, TimeUnit.MINUTES)
            .build()
            .get();

    // Acknowledging the source message and producing the sink message belong to
    // one transaction, so they are combined into an atomic operation.
    Message<String> message = sourceConsumer.receive();
    sourceConsumer.acknowledgeAsync(message.getMessageId(), txn);
    sinkProducer.newMessage(txn).value("sink data").sendAsync();
    txn.commit().get();

Enable batch messages in transactions

To enable batch messages in transactions, you need to enable the batch index acknowledgement feature. Transactional acknowledgements check whether acknowledgements at the batch index level conflict.

To enable batch index acknowledgement, you need to set acknowledgmentAtBatchIndexLevelEnabled to true in the broker.conf or standalone.conf file.

    acknowledgmentAtBatchIndexLevelEnabled=true

Then call the enableBatchIndexAcknowledgment(true) method on the consumer builder.

    Consumer<byte[]> sinkConsumer = pulsarClient
            .newConsumer()
            .topic(transferTopic)
            .subscriptionName("sink-topic")
            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
            .subscriptionType(SubscriptionType.Shared)
            .enableBatchIndexAcknowledgment(true) // enable batch index acknowledgement
            .subscribe();
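One way to picture the batch index conflict check mentioned in this section is a bit set per batch, where each bit marks an index that has already been acknowledged or is pending acknowledgement. The sketch below is an assumption made for illustration only. `BatchAckTracker` and `tryAck` are hypothetical names, and this section does not describe how the broker actually tracks batch indexes.

```java
import java.util.BitSet;

// Toy conflict check for acknowledgements inside one batch (hypothetical, not part of Pulsar).
class BatchAckTracker {
    // Bit i is set when index i of the batch is already acked or pending-ack.
    private final BitSet claimed;

    BatchAckTracker(int batchSize) {
        this.claimed = new BitSet(batchSize);
    }

    // Tries to claim the given batch indexes as one unit.
    // Returns false and changes nothing if any index is already claimed.
    boolean tryAck(int... indexes) {
        BitSet wanted = new BitSet();
        for (int index : indexes) {
            wanted.set(index);
        }
        if (claimed.intersects(wanted)) {
            return false;
        }
        claimed.or(wanted);
        return true;
    }

    public static void main(String[] args) {
        BatchAckTracker tracker = new BatchAckTracker(4);
        System.out.println("ack 0,1: " + tracker.tryAck(0, 1));
        System.out.println("ack 1,2: " + tracker.tryAck(1, 2));
    }
}
```

In this model two transactions can acknowledge different indexes of the same batch, while an overlapping request is rejected as a conflict.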