How transactions work?

本节介绍事务有哪些组件,并说明这些组件是如何一起协同运行的。 想了解完整的设计实现,请参考PIP-31: Transactional Streaming

核心概念

必须先了解以下几个事务的核心概念,才能更好的理解事务的运行原理。

事务协调器

事务协调器(TC)是运行在 Pulsar Broker 中的一个模块。

  • 它维护事务的整个生命周期,并防止事务进入错误状态。

  • 它处理事务超时,并确保事务在事务超时后中止。

事务日志

所有事务元数据都保存在事务日志中。 事务日志由 Pulsar 主题记录。 如果事务协调器崩溃,它可以从事务日志恢复事务元数据。

事务日志存储事务状态,而不是事务中的实际消息(实际消息存储在实际的主题分区中)。

事务缓存

向事务内的主题分区生成的消息存储在该主题分区的事务缓冲区(TB)中。 在提交事务之前,事务缓冲区中的消息对消费者不可见。 当事务中止时,事务缓冲区中的消息将被丢弃。

事务缓冲区将所有正在进行和中止的事务存储在内存中。 所有消息都发送到实际的分区 Pulsar 主题。 提交事务后,事务缓冲区中的消息对消费者具体化(可见)。 事务中止时,事务缓冲区中的消息将被丢弃。

事务 ID

事务ID(TxnID)标识Pulsar中的唯一事务。 事务 ID 长度是 128-bit。 最高 16 位保留给事务协调器的 ID,其余位用于每个事务协调器中单调递增的数字。 使用 TxnID 很容易定位事务崩溃。

待确认状态

挂起确认状态在事务完成之前维护事务中的消息确认。 如果消息处于挂起确认状态,则在该消息从挂起确认状态中移除之前,其他事务无法确认该消息。

挂起的确认状态被保留到挂起的确认日志中(cursor ledger)。 新启动的broker可以从挂起的确认日志中恢复状态,以确保状态确认不会丢失。

数据流

在较高级别上,数据流可分为几个步骤:

  1. 开启事务。

  2. 使用事务发布消息。

  3. 使用事务确认消息。

  4. 结束事务。

为了帮助您调试或调优事务以获得更好的性能,请查看以下图表和说明。

1. 开启事务

在Pulsar中引入事务之前,将创建一个生产者,然后将消息发送给 broker 并存储在数据日志中。

How transactions work? - 图1

让我们浏览一下开启事务的步骤。

步骤说明
1.1
新Txn
第一步是 Pulsar 客户端找到事务协调器。
1.2
分配Txn ID
事务协调器为事务分配事务 ID。 在事务日志中,使用事务ID和状态(OPEN)记录事务,这确保事务状态保持不变,而不管事务协调器崩溃。
1.3
发送结果
事务日志将持久化事务ID的结果发送给事务协调器。
1.4
带上Txn ID
记录事务状态条目(entry)后,事务协调器将事务ID带回Pulsar客户端。

2. 生产事务消息

在此阶段,Pulsar 客户端进入一个事务循环,对组成事务的所有消息重复consume-process-produce(消费-处理-生产)操作。 这是一个很长的阶段,可能由多个生成和确认请求组成。

How transactions work? - 图2

让我们了解使用事务发布消息的步骤。

步骤说明
2.1.1
将生成的分区添加到Txn
在Pulsar客户端向一个新的主题分区生成消息之前,它向事务协调器发送一个请求,将该分区添加到事务中。
2.1.2
Txn的日志分区更改
事务协调器将事务的分区更改记录到事务日志中以确保持久性,从而确保事务协调器知道事务正在处理的所有分区。 事务协调器可以在结束分区阶段提交或中止每个分区上的更改。
2.1.3
发送结果
事务日志将记录新分区(用于生成消息)的结果发送给事务协调器。
2.1.4
发送结果
事务协调器将新确认的分区添加结果发送给事务。
2.2.1
向带有w/Txn的分区生产消息
Pulsar客户端开始向分区生成消息。 此部分的流程与生成消息的正常流程相同,只是事务生成的消息批包含事务ID。
2.2.2
写入消息
Broker 将消息写入分区。

3. 确认事务消息

在此阶段,Pulsar客户端向事务协调器发送请求,并确认新订阅是事务的一部分。

How transactions work? - 图3

让我们浏览一下使用事务确认消息的步骤。

步骤说明
3.1.1
发送请求
Pulsar 客户端向事务协调器发送请求以添加已确认的订阅。
3.1.2
日志订阅
事务协调器记录订阅的添加,这确保它知道事务处理的所有订阅,并且可以在结束阶段提交或中止每个订阅的更改。
3.1.3
发送结果
事务日志将记录新分区(用于确认消息)的结果发送给事务协调器。
3.1.4
发送结果
事务协调器将新确认的分区添加结果发送给事务。
3.2
确认消息 w/ Txn
Pulsar客户端确认订阅上的消息。 此部分的流程与消息确认的正常流程相同,只是确认的请求带有事务ID。
3.3
检查确认
接收确认请求的 broker 将检查确认是否属于某个事务。
如果它属于某个事务,broker 将消息标记为处于 PENDING_ACK(挂起确认)状态,这意味着,在确认提交或中止之前,使用相同订阅的其他消费者无法确认或否定确认消息。
如果有两个事务试图在具有相同订阅的一条消息上确认,则只有一个事务成功,而另一个事务冲突。 Pulsar客户端在尝试确认但检测到冲突时中止整个事务。 可以在单个确认和累积确认中检测到冲突。

4. 结束事务

在事务结束时,Pulsar客户端决定提交或中止事务。 当在确认消息上检测到冲突时,可以中止事务。

4.1 结束事务请求

Pulsar客户端完成事务后,会发出结束事务请求。

How transactions work? - 图4

让我们浏览一下结束事务的步骤。

步骤说明
4.1.1
结束Txn请求
Pulsar客户端向事务协调器发出结束事务请求(带有一个指示事务是提交还是中止的字段)。
4.1.2
提交Txn
事务协调器将提交或中止消息写入其事务日志。
4.1.3
发送结果
事务日志发送记录提交或中止状态的结果。

4.2 完成事务

事务协调器面向该事务中涉及的所有分区启动提交或中止消息的进程。

How transactions work? - 图5

让我们浏览一下完成事务的步骤。

步骤说明
4.2.1
在订阅上提交Txn
事务协调器在订阅上提交事务,同时在分区上提交事务。
4.2.2
写入标记符(Marker)
Broker (produce)将生成的提交标记写入实际分区。 同时,broker(ack)将确认的已提交标记(acked committed mark)写入订阅挂起的ack分区。
4.2.3
发送结果
数据日志将写入生成的提交标记的结果发送给 broker。 同时,挂起确认的数据日志将写入已确认提交的标记的结果发送给 broker。 游标(cursor)移动到下一个位置。
  • 如果事务已提交,则挂起的确认状态将变为“确认”状态。
  • 如果事务中止,则挂起的确认状态变为未确认状态。 (中止确认会导致消息重新传递给其他使用者。)

4.3 标记事务为 COMMITTED 或 ABORTED

事务协调器将最终事务状态写入事务日志以完成事务。

How transactions work? - 图6

让我们浏览一下将事务标记为已提交或已中止的步骤。

步骤说明
4.3.1
提交Txn
在此事务中涉及的所有分区的所有生成的消息和确认已成功提交或中止后,事务协调器将最终提交或中止的事务状态消息写入其事务日志,指示事务已完成。 可以安全地删除事务日志中与事务关联的所有消息。
4.3.2
发送结果
事务日志将提交的事务的结果发送给事务协调器。
4.3.3
发送结果
事务协调器将提交的事务的结果发送给Pulsar客户端。