SEATA Saga 模式

概述

Saga模式是SEATA提供的长事务解决方案,在Saga模式中,业务流程中每个参与者都提交本地事务,当出现某一个参与者失败则补偿前面已经成功的参与者,一阶段正向服务和二阶段补偿服务都由业务开发实现。

Saga模式示意图

理论基础:Hector & Kenneth 发表论⽂ Sagas (1987)

适用场景:

  • 业务流程长、业务流程多
  • 参与者包含其它公司或遗留系统服务,无法提供 TCC 模式要求的三个接口

优势:

  • 一阶段提交本地事务,无锁,高性能
  • 事件驱动架构,参与者可异步执行,高吞吐
  • 补偿服务易于实现

缺点:

  • 不保证隔离性(应对方案见后面文档)

Saga的实现:

基于状态机引擎的 Saga 实现:

目前SEATA提供的Saga模式是基于状态机引擎来实现的,机制是:

  1. 通过状态图来定义服务调用的流程并生成 json 状态语言定义文件
  2. 状态图中一个节点可以是调用一个服务,节点可以配置它的补偿节点
  3. 状态图 json 由状态机引擎驱动执行,当出现异常时状态引擎反向执行已成功节点对应的补偿节点将事务回滚

注意: 异常发生时是否进行补偿也可由用户自定义决定

  1. 可以实现服务编排需求,支持单项选择、并发、子流程、参数转换、参数映射、服务执行状态判断、异常捕获等功能

示例状态图:

示例状态图

快速开始

Demo简介

基于dubbo构建的微服务下,使用Saga模式演示分布式事务的提交和回滚;

业务流程图如下图所示:

demo业务流程图

先下载seata-samples工程:https://github.com/seata/seata-samples.git

注意SEATA版本需要0.9.0以上

在dubbo-saga-sample中一个分布式事务内会有2个Saga事务参与者,分别是: InventoryActionBalanceAction ;分布式事务提交则两者均提交,分布式事务回滚则两者均回滚;

这2个Saga参与者均是 dubbo 服务,两个参与都有一个reduce方法,表示库存扣减或余额扣减,还有一个compensateReduce方法,表示补偿扣减操作。

  • InventoryAction 接口定义如下:
  1. public interface InventoryAction {
  2. /**
  3. * reduce
  4. * @param businessKey
  5. * @param amount
  6. * @param params
  7. * @return
  8. */
  9. boolean reduce(String businessKey, BigDecimal amount, Map<String, Object> params);
  10. /**
  11. * compensateReduce
  12. * @param businessKey
  13. * @param params
  14. * @return
  15. */
  16. boolean compensateReduce(String businessKey, Map<String, Object> params);
  17. }
  • 这个场景用状态语言定义就是下面的json:src/main/resources/statelang/reduce_inventory_and_balance.json
  1. {
  2. "Name": "reduceInventoryAndBalance",
  3. "Comment": "reduce inventory then reduce balance in a transaction",
  4. "StartState": "ReduceInventory",
  5. "Version": "0.0.1",
  6. "States": {
  7. "ReduceInventory": {
  8. "Type": "ServiceTask",
  9. "ServiceName": "inventoryAction",
  10. "ServiceMethod": "reduce",
  11. "CompensateState": "CompensateReduceInventory",
  12. "Next": "ChoiceState",
  13. "Input": [
  14. "$.[businessKey]",
  15. "$.[count]"
  16. ],
  17. "Output": {
  18. "reduceInventoryResult": "$.#root"
  19. },
  20. "Status": {
  21. "#root == true": "SU",
  22. "#root == false": "FA",
  23. "$Exception{java.lang.Throwable}": "UN"
  24. }
  25. },
  26. "ChoiceState":{
  27. "Type": "Choice",
  28. "Choices":[
  29. {
  30. "Expression":"[reduceInventoryResult] == true",
  31. "Next":"ReduceBalance"
  32. }
  33. ],
  34. "Default":"Fail"
  35. },
  36. "ReduceBalance": {
  37. "Type": "ServiceTask",
  38. "ServiceName": "balanceAction",
  39. "ServiceMethod": "reduce",
  40. "CompensateState": "CompensateReduceBalance",
  41. "Input": [
  42. "$.[businessKey]",
  43. "$.[amount]",
  44. {
  45. "throwException" : "$.[mockReduceBalanceFail]"
  46. }
  47. ],
  48. "Output": {
  49. "compensateReduceBalanceResult": "$.#root"
  50. },
  51. "Status": {
  52. "#root == true": "SU",
  53. "#root == false": "FA",
  54. "$Exception{java.lang.Throwable}": "UN"
  55. },
  56. "Catch": [
  57. {
  58. "Exceptions": [
  59. "java.lang.Throwable"
  60. ],
  61. "Next": "CompensationTrigger"
  62. }
  63. ],
  64. "Next": "Succeed"
  65. },
  66. "CompensateReduceInventory": {
  67. "Type": "ServiceTask",
  68. "ServiceName": "inventoryAction",
  69. "ServiceMethod": "compensateReduce",
  70. "Input": [
  71. "$.[businessKey]"
  72. ]
  73. },
  74. "CompensateReduceBalance": {
  75. "Type": "ServiceTask",
  76. "ServiceName": "balanceAction",
  77. "ServiceMethod": "compensateReduce",
  78. "Input": [
  79. "$.[businessKey]"
  80. ]
  81. },
  82. "CompensationTrigger": {
  83. "Type": "CompensationTrigger",
  84. "Next": "Fail"
  85. },
  86. "Succeed": {
  87. "Type":"Succeed"
  88. },
  89. "Fail": {
  90. "Type":"Fail",
  91. "ErrorCode": "PURCHASE_FAILED",
  92. "Message": "purchase failed"
  93. }
  94. }
  95. }

该json表示的状态图:

该json表示的状态图

状态语言在一定程度上参考了AWS Step Functions

“状态机” 属性简介:

  • Name: 表示状态机的名称,必须唯一
  • Comment: 状态机的描述
  • Version: 状态机定义版本
  • StartState: 启动时运行的第一个”状态”
  • States: 状态列表,是一个map结构,key是”状态”的名称,在状态机内必须唯一
  • IsRetryPersistModeUpdate: 向前重试时, 日志是否基于上次失败日志进行更新
  • IsCompensatePersistModeUpdate: 向后补偿重试时, 日志是否基于上次补偿日志进行更新

“状态” 属性简介:

  • Type: “状态” 的类型,比如有:
    • ServiceTask: 执行调用服务任务
    • Choice: 单条件选择路由
    • CompensationTrigger: 触发补偿流程
    • Succeed: 状态机正常结束
    • Fail: 状态机异常结束
    • SubStateMachine: 调用子状态机
    • CompensateSubMachine: 用于补偿一个子状态机
  • ServiceName: 服务名称,通常是服务的beanId
  • ServiceMethod: 服务方法名称
  • CompensateState: 该”状态”的补偿”状态”
  • Loop: 标识该事务节点是否为循环事务, 即由框架本身根据循环属性的配置, 遍历集合元素对该事务节点进行循环执行
  • Input: 调用服务的输入参数列表, 是一个数组, 对应于服务方法的参数列表, $.表示使用表达式从状态机上下文中取参数,表达使用的SpringEL, 如果是常量直接写值即可
  • Ouput: 将服务返回的参数赋值到状态机上下文中, 是一个map结构,key为放入到状态机上文时的key(状态机上下文也是一个map),value中$.是表示SpringEL表达式,表示从服务的返回参数中取值,#root表示服务的整个返回参数
  • Status: 服务执行状态映射,框架定义了三个状态,SU 成功、FA 失败、UN 未知, 我们需要把服务执行的状态映射成这三个状态,帮助框架判断整个事务的一致性,是一个map结构,key是条件表达式,一般是取服务的返回值或抛出的异常进行判断,默认是SpringEL表达式判断服务返回参数,带$Exception{开头表示判断异常类型。value是当这个条件表达式成立时则将服务执行状态映射成这个值
  • Catch: 捕获到异常后的路由
  • Next: 服务执行完成后下一个执行的”状态”
  • Choices: Choice类型的”状态”里, 可选的分支列表, 分支中的Expression为SpringEL表达式, Next为当表达式成立时执行的下一个”状态”
  • ErrorCode: Fail类型”状态”的错误码
  • Message: Fail类型”状态”的错误信息

更多详细的状态语言解释请看State language referance章节

更多详细的状态语言使用示例见https://github.com/seata/seata/tree/develop/test/src/test/java/io/seata/saga/engine

Demo 运行指南

step 1 启动 SEATA Server

运行 SeataServerStarter ,启动 Seata Server;

step 2 启动 dubbo provider Demo

运行 DubboSagaProviderStarter ,启动 dubbo provider;

step 3 启动 Saga Demo

运行 DubboSagaTransactionStarter , 启动 demo工程;

Demo中的数据库使用的是H2内存数据库, 生产上建议使用与业务相同的库, 目前支持Oracle, Mysql, DB2. 建表语句在 https://github.com/seata/seata/tree/develop/saga/seata-saga-engine-store/src/main/resources/sql

Demo中还有调用本地服务和调用SOFA RPC服务的示例

状态机设计器

Seata Saga 提供了一个可视化的状态机设计器方便用户使用,代码和运行指南请参考: https://github.com/seata/seata/tree/develop/saga/seata-saga-statemachine-designer

状态机设计器截图: 状态机设计器

状态机设计器演示地址:http://seata.io/saga_designer/index.html

状态机设计器视频教程:http://seata.io/saga_designer/vedio.html

最佳实践

Saga 服务设计的实践经验

允许空补偿

  • 空补偿:原服务未执行,补偿服务执行了
  • 出现原因:
    • 原服务 超时(丢包)
    • Saga 事务触发 回滚
    • 未收到 原服务请求,先收到 补偿请求

所以服务设计时需要允许空补偿, 即没有找到要补偿的业务主键时返回补偿成功并将原业务主键记录下来

防悬挂控制

  • 悬挂:补偿服务 比 原服务 先执行
  • 出现原因:
    • 原服务 超时(拥堵)
    • Saga 事务回滚,触发 回滚
    • 拥堵的 原服务 到达

所以要检查当前业务主键是否已经在空补偿记录下来的业务主键中存在,如果存在则要拒绝服务的执行

幂等控制

  • 原服务与补偿服务都需要保证幂等性, 由于网络可能超时, 可以设置重试策略,重试发生时要通过幂等控制避免业务数据重复更新

缺乏隔离性的应对

  • 由于 Saga 事务不保证隔离性, 在极端情况下可能由于脏写无法完成回滚操作, 比如举一个极端的例子, 分布式事务内先给用户A充值, 然后给用户B扣减余额, 如果在给A用户充值成功, 在事务提交以前, A用户把余额消费掉了, 如果事务发生回滚, 这时则没有办法进行补偿了。这就是缺乏隔离性造成的典型的问题, 实践中一般的应对方法是:
    • 业务流程设计时遵循“宁可长款, 不可短款”的原则, 长款意思是客户少了钱机构多了钱, 以机构信誉可以给客户退款, 反之则是短款, 少的钱可能追不回来了。所以在业务流程设计上一定是先扣款。
    • 有些业务场景可以允许让业务最终成功, 在回滚不了的情况下可以继续重试完成后面的流程, 所以状态机引擎除了提供“回滚”能力还需要提供“向前”恢复上下文继续执行的能力, 让业务最终执行成功, 达到最终一致性的目的。

性能优化

  • 配置客户端参数client.rm.report.success.enable=false,可以在当分支事务执行成功时不上报分支状态到server,从而提升性能。

当上一个分支事务的状态还没有上报的时候,下一个分支事务已注册,可以认为上一个实际已成功

API referance

StateMachineEngine API

  1. public interface StateMachineEngine {
  2. /**
  3. * start a state machine instance
  4. * @param stateMachineName
  5. * @param tenantId
  6. * @param startParams
  7. * @return
  8. * @throws EngineExecutionException
  9. */
  10. StateMachineInstance start(String stateMachineName, String tenantId, Map<String, Object> startParams) throws EngineExecutionException;
  11. /**
  12. * start a state machine instance with businessKey
  13. * @param stateMachineName
  14. * @param tenantId
  15. * @param businessKey
  16. * @param startParams
  17. * @return
  18. * @throws EngineExecutionException
  19. */
  20. StateMachineInstance startWithBusinessKey(String stateMachineName, String tenantId, String businessKey, Map<String, Object> startParams) throws EngineExecutionException;
  21. /**
  22. * start a state machine instance asynchronously
  23. * @param stateMachineName
  24. * @param tenantId
  25. * @param startParams
  26. * @param callback
  27. * @return
  28. * @throws EngineExecutionException
  29. */
  30. StateMachineInstance startAsync(String stateMachineName, String tenantId, Map<String, Object> startParams, AsyncCallback callback) throws EngineExecutionException;
  31. /**
  32. * start a state machine instance asynchronously with businessKey
  33. * @param stateMachineName
  34. * @param tenantId
  35. * @param businessKey
  36. * @param startParams
  37. * @param callback
  38. * @return
  39. * @throws EngineExecutionException
  40. */
  41. StateMachineInstance startWithBusinessKeyAsync(String stateMachineName, String tenantId, String businessKey, Map<String, Object> startParams, AsyncCallback callback) throws EngineExecutionException;
  42. /**
  43. * forward restart a failed state machine instance
  44. * @param stateMachineInstId
  45. * @param replaceParams
  46. * @return
  47. * @throws ForwardInvalidException
  48. */
  49. StateMachineInstance forward(String stateMachineInstId, Map<String, Object> replaceParams) throws ForwardInvalidException;
  50. /**
  51. * forward restart a failed state machine instance asynchronously
  52. * @param stateMachineInstId
  53. * @param replaceParams
  54. * @param callback
  55. * @return
  56. * @throws ForwardInvalidException
  57. */
  58. StateMachineInstance forwardAsync(String stateMachineInstId, Map<String, Object> replaceParams, AsyncCallback callback) throws ForwardInvalidException;
  59. /**
  60. * compensate a state machine instance
  61. * @param stateMachineInstId
  62. * @param replaceParams
  63. * @return
  64. * @throws EngineExecutionException
  65. */
  66. StateMachineInstance compensate(String stateMachineInstId, Map<String, Object> replaceParams) throws EngineExecutionException;
  67. /**
  68. * compensate a state machine instance asynchronously
  69. * @param stateMachineInstId
  70. * @param replaceParams
  71. * @param callback
  72. * @return
  73. * @throws EngineExecutionException
  74. */
  75. StateMachineInstance compensateAsync(String stateMachineInstId, Map<String, Object> replaceParams, AsyncCallback callback) throws EngineExecutionException;
  76. /**
  77. * skip current failed state instance and forward restart state machine instance
  78. * @param stateMachineInstId
  79. * @return
  80. * @throws EngineExecutionException
  81. */
  82. StateMachineInstance skipAndForward(String stateMachineInstId) throws EngineExecutionException;
  83. /**
  84. * skip current failed state instance and forward restart state machine instance asynchronously
  85. * @param stateMachineInstId
  86. * @param callback
  87. * @return
  88. * @throws EngineExecutionException
  89. */
  90. StateMachineInstance skipAndForwardAsync(String stateMachineInstId, AsyncCallback callback) throws EngineExecutionException;
  91. /**
  92. * get state machine configurations
  93. * @return
  94. */
  95. StateMachineConfig getStateMachineConfig();
  96. }

StateMachine Execution Instance API:

  1. StateLogRepository stateLogRepository = stateMachineEngine.getStateMachineConfig().getStateLogRepository();
  2. StateMachineInstance stateMachineInstance = stateLogRepository.getStateMachineInstanceByBusinessKey(businessKey, tenantId);
  3. /**
  4. * State Log Repository
  5. *
  6. * @author lorne.cl
  7. */
  8. public interface StateLogRepository {
  9. /**
  10. * Get state machine instance
  11. *
  12. * @param stateMachineInstanceId
  13. * @return
  14. */
  15. StateMachineInstance getStateMachineInstance(String stateMachineInstanceId);
  16. /**
  17. * Get state machine instance by businessKey
  18. *
  19. * @param businessKey
  20. * @param tenantId
  21. * @return
  22. */
  23. StateMachineInstance getStateMachineInstanceByBusinessKey(String businessKey, String tenantId);
  24. /**
  25. * Query the list of state machine instances by parent id
  26. *
  27. * @param parentId
  28. * @return
  29. */
  30. List<StateMachineInstance> queryStateMachineInstanceByParentId(String parentId);
  31. /**
  32. * Get state instance
  33. *
  34. * @param stateInstanceId
  35. * @param machineInstId
  36. * @return
  37. */
  38. StateInstance getStateInstance(String stateInstanceId, String machineInstId);
  39. /**
  40. * Get a list of state instances by state machine instance id
  41. *
  42. * @param stateMachineInstanceId
  43. * @return
  44. */
  45. List<StateInstance> queryStateInstanceListByMachineInstanceId(String stateMachineInstanceId);
  46. }

StateMachine Definition API:

  1. StateMachineRepository stateMachineRepository = stateMachineEngine.getStateMachineConfig().getStateMachineRepository();
  2. StateMachine stateMachine = stateMachineRepository.getStateMachine(stateMachineName, tenantId);
  3. /**
  4. * StateMachineRepository
  5. *
  6. * @author lorne.cl
  7. */
  8. public interface StateMachineRepository {
  9. /**
  10. * Gets get state machine by id.
  11. *
  12. * @param stateMachineId the state machine id
  13. * @return the get state machine by id
  14. */
  15. StateMachine getStateMachineById(String stateMachineId);
  16. /**
  17. * Gets get state machine.
  18. *
  19. * @param stateMachineName the state machine name
  20. * @param tenantId the tenant id
  21. * @return the get state machine
  22. */
  23. StateMachine getStateMachine(String stateMachineName, String tenantId);
  24. /**
  25. * Gets get state machine.
  26. *
  27. * @param stateMachineName the state machine name
  28. * @param tenantId the tenant id
  29. * @param version the version
  30. * @return the get state machine
  31. */
  32. StateMachine getStateMachine(String stateMachineName, String tenantId, String version);
  33. /**
  34. * Register the state machine to the repository (if the same version already exists, return the existing version)
  35. *
  36. * @param stateMachine
  37. */
  38. StateMachine registryStateMachine(StateMachine stateMachine);
  39. /**
  40. * registry by resources
  41. *
  42. * @param resources
  43. * @param tenantId
  44. */
  45. void registryByResources(Resource[] resources, String tenantId) throws IOException;
  46. }

Config referance

在Spring Bean配置文件中配置一个StateMachineEngine

  1. <bean id="dataSource" class="...">
  2. ...
  3. <bean>
  4. <bean id="stateMachineEngine" class="io.seata.saga.engine.impl.ProcessCtrlStateMachineEngine">
  5. <property name="stateMachineConfig" ref="dbStateMachineConfig"></property>
  6. </bean>
  7. <bean id="dbStateMachineConfig" class="io.seata.saga.engine.config.DbStateMachineConfig">
  8. <property name="dataSource" ref="dataSource" />
  9. <property name="resources" value="statelang/*.json" />
  10. <property name="enableAsync" value="true" />
  11. <!-- 事件驱动执行时使用的线程池, 如果所有状态机都同步执行且不存在循环任务可以不需要 -->
  12. <property name="threadPoolExecutor" ref="threadExecutor" />
  13. <property name="applicationId" value="saga_sample" />
  14. <property name="txServiceGroup" value="my_test_tx_group" />
  15. <property name="sagaBranchRegisterEnable" value="false" />
  16. <property name="sagaJsonParser" value="fastjson" />
  17. <property name="sagaRetryPersistModeUpdate" value="false" />
  18. <property name="sagaCompensatePersistModeUpdate" value="false" />
  19. </bean>
  20. <bean id="threadExecutor"
  21. class="org.springframework.scheduling.concurrent.ThreadPoolExecutorFactoryBean">
  22. <property name="threadNamePrefix" value="SAGA_ASYNC_EXE_" />
  23. <property name="corePoolSize" value="1" />
  24. <property name="maxPoolSize" value="20" />
  25. </bean>
  26. <!-- Seata Server进行事务恢复时需要通过这个Holder拿到stateMachineEngine实例 -->
  27. <bean class="io.seata.saga.rm.StateMachineEngineHolder">
  28. <property name="stateMachineEngine" ref="stateMachineEngine"/>
  29. </bean>

State language referance

“状态机”的属性列表

  1. {
  2. "Name": "reduceInventoryAndBalance",
  3. "Comment": "reduce inventory then reduce balance in a transaction",
  4. "StartState": "ReduceInventory",
  5. "Version": "0.0.1",
  6. "States": {
  7. },
  8. "IsRetryPersistModeUpdate": false,
  9. "IsCompensatePersistModeUpdate": false
  10. }
  • Name: 表示状态机的名称,必须唯一
  • Comment: 状态机的描述
  • Version: 状态机定义版本
  • StartState: 启动时运行的第一个”状态”
  • States: 状态列表,是一个map结构,key是”状态”的名称,在状态机内必须唯一, value是一个map结构表示”状态”的属性列表
  • IsRetryPersistModeUpdate: 向前重试时, 日志是否基于上次失败日志进行更新, 默认是false, 即新增一条重试日志 (优先级高于全局 stateMachineConfig 配置属性)
  • IsCompensatePersistModeUpdate: 向后补偿重试时, 日志是否基于上次补偿日志进行更新, 默认是false, 即新增一条补偿日志 (优先级高于全局 stateMachineConfig 配置属性)

各种”状态”的属性列表

ServiceTask:

  1. "States": {
  2. ...
  3. "ReduceBalance": {
  4. "Type": "ServiceTask",
  5. "ServiceName": "balanceAction",
  6. "ServiceMethod": "reduce",
  7. "CompensateState": "CompensateReduceBalance",
  8. "IsForUpdate": true,
  9. "IsPersist": true,
  10. "IsAsync": false,
  11. "IsRetryPersistModeUpdate": false,
  12. "IsCompensatePersistModeUpdate": false,
  13. "Loop": {
  14. "Parallel": 3,
  15. "Collection": "$.[collection]",
  16. "ElementVariableName": "element",
  17. "ElementIndexName": "loopCounter",
  18. "CompletionCondition": "[nrOfCompletedInstances] / [nrOfInstances] >= 0.6"
  19. },
  20. "Input": [
  21. "$.[businessKey]",
  22. "$.[amount]",
  23. {
  24. "loopCounter": "$.[loopCounter]",
  25. "element": "$.[element]",
  26. "throwException" : "$.[mockReduceBalanceFail]"
  27. }
  28. ],
  29. "Output": {
  30. "compensateReduceBalanceResult": "$.#root"
  31. },
  32. "Status": {
  33. "#root == true": "SU",
  34. "#root == false": "FA",
  35. "$Exception{java.lang.Throwable}": "UN"
  36. },
  37. "Retry": [
  38. {
  39. "Exceptions": ["io.seata.saga.engine.mock.DemoException"],
  40. "IntervalSeconds": 1.5,
  41. "MaxAttempts": 3,
  42. "BackoffRate": 1.5
  43. },
  44. {
  45. "IntervalSeconds": 1,
  46. "MaxAttempts": 3,
  47. "BackoffRate": 1.5
  48. }
  49. ],
  50. "Catch": [
  51. {
  52. "Exceptions": [
  53. "java.lang.Throwable"
  54. ],
  55. "Next": "CompensationTrigger"
  56. }
  57. ],
  58. "Next": "Succeed"
  59. }
  60. ...
  61. }
  • ServiceName: 服务名称,通常是服务的beanId
  • ServiceMethod: 服务方法名称
  • CompensateState: 该”状态”的补偿”状态”
  • IsForUpdate: 标识该服务会更新数据, 默认是false, 如果配置了CompensateState则默认是true, 有补偿服务的服务肯定是数据更新类服务
  • IsPersist: 执行日志是否进行存储, 默认是true, 有一些查询类的服务可以配置为false, 执行日志不进行存储提高性能, 因为当异常恢复时可以重复执行
  • IsAsync: 异步调用服务, 注意: 因为异步调用服务会忽略服务的返回结果, 所以用户定义的服务执行状态映射(下面的Status属性)将被忽略, 默认为服务调用成功, 如果提交异步调用就失败(比如线程池已满)则为服务执行状态为失败
  • IsRetryPersistModeUpdate: 向前重试时, 日志是否基于上次失败日志进行更新, 默认是false, 即新增一条重试日志 (优先级高于状态机属性配置)
  • IsCompensatePersistModeUpdate: 向后补偿重试时, 日志是否基于上次补偿日志进行更新, 默认是false, 即新增一条补偿日志 (优先级高于状态机属性配置)
  • Loop: 标识该事务节点是否为循环事务, 即由框架本身根据循环属性的配置, 遍历集合元素对该事务节点进行循环执行. 具体使用见: Loop 循环事务使用
  • Input: 调用服务的输入参数列表, 是一个数组, 对应于服务方法的参数列表, $.表示使用表达式从状态机上下文中取参数,表达使用的SpringEL, 如果是常量直接写值即可。复杂的参数如何传入见:复杂参数的Input定义
  • Output: 将服务返回的参数赋值到状态机上下文中, 是一个map结构,key为放入到状态机上文时的key(状态机上下文也是一个map),value中$.是表示SpringEL表达式,表示从服务的返回参数中取值,#root表示服务的整个返回参数
  • Status: 服务执行状态映射,框架定义了三个状态,SU 成功、FA 失败、UN 未知, 我们需要把服务执行的状态映射成这三个状态,帮助框架判断整个事务的一致性,是一个map结构,key是条件表达式,一般是取服务的返回值或抛出的异常进行判断,默认是SpringEL表达式判断服务返回参数,带$Exception{开头表示判断异常类型。value是当这个条件表达式成立时则将服务执行状态映射成这个值
  • Catch: 捕获到异常后的路由
  • Retry: 捕获异常后的重试策略, 是个数组可以配置多个规则, Exceptions 为匹配的的异常列表, IntervalSeconds 为重试间隔, MaxAttempts 为最大重试次数, BackoffRate 下一次重试间隔相对于上一次重试间隔的倍数,比如说上次一重试间隔是2秒, BackoffRate=1.5 则下一次重试间隔是3秒。Exceptions 属性可以不配置, 不配置时表示框架自动匹配网络超时异常。当在重试过程中发生了别的异常,框架会重新匹配规则,并按新规则进行重试,同一种规则的总重试次数不会超过该规则的MaxAttempts
  • Next: 服务执行完成后下一个执行的”状态”

当没有配置Status对服务执行状态进行映射, 系统会自动判断状态:

  • 没有异常则认为执行成功,
  • 如果有异常, 则判断异常是不是网路连接超时, 如果是则认为是FA
  • 如果是其它异常, 服务IsForUpdate=true则状态为UN, 否则为FA

整个状态机的执行状态如何判断?是由框架自己判断的, 状态机有两个状态: status(正向执行状态), compensateStatus(补偿状态):

  • 如果所有服务执行成功(事务提交成功)则status=SU, compensateStatus=null
  • 如果有服务执行失败且存在更新类服务执行成功且没有进行补偿(事务提交失败) 则status=UN, compensateStatus=null
  • 如果有服务执行失败且不存在更新类服务执行成功且没有进行补偿(事务提交失败) 则status=FA, compensateStatus=null
  • 如果补偿成功(事务回滚成功)则status=FA/UN, compensateStatus=SU
  • 发生补偿且有未补偿成功的服务(回滚失败)则status=FA/UN, compensateStatus=UN
  • 存在事务提交或回滚失败的情况Seata Sever都会不断发起重试

Choice:

  1. "ChoiceState":{
  2. "Type": "Choice",
  3. "Choices":[
  4. {
  5. "Expression":"[reduceInventoryResult] == true",
  6. "Next":"ReduceBalance"
  7. }
  8. ],
  9. "Default":"Fail"
  10. }

Choice类型的”状态”是单项选择路由 Choices: 可选的分支列表, 只会选择第一个条件成立的分支 Expression: SpringEL表达式 Next: 当Expression表达式成立时执行的下一个”状态”

Succeed:

  1. "Succeed": {
  2. "Type":"Succeed"
  3. }

运行到”Succeed状态”表示状态机正常结束, 正常结束不代表成功结束, 是否成功要看每个”状态”是否都成功

Fail:

  1. "Fail": {
  2. "Type":"Fail",
  3. "ErrorCode": "PURCHASE_FAILED",
  4. "Message": "purchase failed"
  5. }

运行到”Fail状态”状态机异常结束, 异常结束时可以配置ErrorCode和Message, 表示错误码和错误信息, 可以用于给调用方返回错误码和消息

CompensationTrigger:

  1. "CompensationTrigger": {
  2. "Type": "CompensationTrigger",
  3. "Next": "Fail"
  4. }

CompensationTrigger类型的state是用于触发补偿事件, 回滚分布式事务 Next: 补偿成功后路由到的state

SubStateMachine:

  1. "CallSubStateMachine": {
  2. "Type": "SubStateMachine",
  3. "StateMachineName": "simpleCompensationStateMachine",
  4. "CompensateState": "CompensateSubMachine",
  5. "IsRetryPersistModeUpdate": false,
  6. "IsCompensatePersistModeUpdate": false,
  7. "Input": [
  8. {
  9. "a": "$.1",
  10. "barThrowException": "$.[barThrowException]",
  11. "fooThrowException": "$.[fooThrowException]",
  12. "compensateFooThrowException": "$.[compensateFooThrowException]"
  13. }
  14. ],
  15. "Output": {
  16. "fooResult": "$.#root"
  17. },
  18. "Next": "Succeed"
  19. }

SubStateMachine类型的”状态”是调用子状态机 StateMachineName: 要调用的子状态机名称 CompensateState: 子状态机的补偿state, 可以不配置, 系统会自动创建它的补偿state, 子状态机的补偿实际就是调用子状态机的compensate方法, 所以用户并不需要自己实现一个对子状态机的补偿服务。当配置这个属性时, 可以里利用Input属性自定义传入一些变量, 见下面的CompensateSubMachine

CompensateSubMachine:

  1. "CompensateSubMachine": {
  2. "Type": "CompensateSubMachine",
  3. "Input": [
  4. {
  5. "compensateFooThrowException": "$.[compensateFooThrowException]"
  6. }
  7. ]
  8. }

CompensateSubMachine类型的state是专门用于补偿一个子状态机的state,它会调用子状态机的compensate方法,可以利用Input属性传入一些自定义的变量, Status属性自定判断补偿是否成功

复杂参数的Input定义

  1. "FirstState": {
  2. "Type": "ServiceTask",
  3. "ServiceName": "demoService",
  4. "ServiceMethod": "complexParameterMethod",
  5. "Next": "ChoiceState",
  6. "ParameterTypes" : ["java.lang.String", "int", "io.seata.saga.engine.mock.DemoService$People", "[Lio.seata.saga.engine.mock.DemoService$People;", "java.util.List", "java.util.Map"],
  7. "Input": [
  8. "$.[people].name",
  9. "$.[people].age",
  10. {
  11. "name": "$.[people].name",
  12. "age": "$.[people].age",
  13. "childrenArray": [
  14. {
  15. "name": "$.[people].name",
  16. "age": "$.[people].age"
  17. },
  18. {
  19. "name": "$.[people].name",
  20. "age": "$.[people].age"
  21. }
  22. ],
  23. "childrenList": [
  24. {
  25. "name": "$.[people].name",
  26. "age": "$.[people].age"
  27. },
  28. {
  29. "name": "$.[people].name",
  30. "age": "$.[people].age"
  31. }
  32. ],
  33. "childrenMap": {
  34. "lilei": {
  35. "name": "$.[people].name",
  36. "age": "$.[people].age"
  37. }
  38. }
  39. },
  40. [
  41. {
  42. "name": "$.[people].name",
  43. "age": "$.[people].age"
  44. },
  45. {
  46. "name": "$.[people].name",
  47. "age": "$.[people].age"
  48. }
  49. ],
  50. [
  51. {
  52. "@type": "io.seata.saga.engine.mock.DemoService$People",
  53. "name": "$.[people].name",
  54. "age": "$.[people].age"
  55. }
  56. ],
  57. {
  58. "lilei": {
  59. "@type": "io.seata.saga.engine.mock.DemoService$People",
  60. "name": "$.[people].name",
  61. "age": "$.[people].age"
  62. }
  63. }
  64. ],
  65. "Output": {
  66. "complexParameterMethodResult": "$.#root"
  67. }
  68. }

上面的complexParameterMethod方法定义如下:

  1. People complexParameterMethod(String name, int age, People people, People[] peopleArrya, List<People> peopleList, Map<String, People> peopleMap)
  2. class People {
  3. private String name;
  4. private int age;
  5. private People[] childrenArray;
  6. private List<People> childrenList;
  7. private Map<String, People> childrenMap;
  8. ...
  9. }

启动状态机时传入参数:

  1. Map<String, Object> paramMap = new HashMap<>(1);
  2. People people = new People();
  3. people.setName("lilei");
  4. people.setAge(18);
  5. paramMap.put("people", people);
  6. String stateMachineName = "simpleStateMachineWithComplexParams";
  7. StateMachineInstance inst = stateMachineEngine.start(stateMachineName, null, paramMap);

注意ParameterTypes属性是可以不用传的,调用的方法的参数列表中有Map, List这种可以带泛型的集合类型, 因为java编译会丢失泛型, 所以需要用这个属性, 同时在Input的json中对应的对这个json加”@type”来申明泛型(集合的元素类型)

Loop 循环事务使用

  1. "States": {
  2. ...
  3. "ReduceBalance": {
  4. "Type": "ServiceTask",
  5. "ServiceName": "balanceAction",
  6. "ServiceMethod": "reduce",
  7. "CompensateState": "CompensateReduceBalance",
  8. "Loop": {
  9. "Parallel": 3,
  10. "Collection": "$.[collection]",
  11. "ElementVariableName": "loopElement",
  12. "ElementIndexName": "loopCounter",
  13. "CompletionCondition": "[nrOfCompletedInstances] / [nrOfInstances] >= 0.6"
  14. },
  15. "Input": [
  16. {
  17. "loopCounter": "$.[loopCounter]",
  18. "element": "$.[element]",
  19. "throwException": "$.[fooThrowException]"
  20. }
  21. ],
  22. "Output": {
  23. "fooResult": "$.#root"
  24. },
  25. "Status": {
  26. "#root == true": "SU",
  27. "#root == false": "FA",
  28. "$Exception{java.lang.Throwable}": "UN"
  29. },
  30. "Next": "ChoiceState"
  31. },
  32. "ChoiceState": {
  33. "Type": "Choice",
  34. "Choices": [
  35. {
  36. "Expression": "[loopResult].?[#this[fooResult] == null].size() == 0",
  37. "Next": "SecondState"
  38. }
  39. ],
  40. "Default":"Fail"
  41. }
  42. ...
  43. }
  • Loop: Loop 属性配置
    • Parallel: 并发执行事务的线程数, 支持并发执行循环任务, 默认: 1
    • Collection: 集合变量名, 在状态机启动时的入参, 用于框架获取需要循环遍历的集合对象
    • ElementVariableName: 集合单个元素名称, 用于在分支事务中获取元素值, 默认: loopElement
    • CompletionCondition: 自定义循环结束条件, 不写默认全部执行完成, 即: [nrOfInstances] == [nrOfCompletedInstances]
    • ElementIndexName: 集合下标名称, 用于在分支事务中获取元素下标, 默认: loopCounter

在循环任务中, 其每次事务出参会存放于一个 List: loopResult中, 在事务上下文中可以通过 loopResult获取事务的运行结果集, 并遍历获取单次执行结果

  • Loop 上下文参数
    • nrOfInstances: 循环实例总数
    • nrOfActiveInstances: 当前活动的实例总数
    • nrOfCompletedInstances: 当前完成的实例总数
    • loopResult: 循环实例执行的结果集

示例状态图:

Saga_Loop示例状态图

FAQ


问: saga服务流程可以不配置吗,使用全局事务id串起来,这样省去配置的工作量,再加上人工配置难免会配置错误?

答: saga一般有两种实现,一种是基于状态机定义,比如apache camel saga、eventuate,一种是基于注解+拦截器实现,比如serviceComb saga,后者是不需要配置状态图的。由于 Saga 事务不保证隔离性, 在极端情况下可能由于脏写无法完成回滚操作, 比如举一个极端的例子, 分布式事务内先给用户A充值, 然后给用户B扣减余额, 如果在给A用户充值成功, 在事务提交以前, A用户把余额消费掉了, 如果事务发生回滚, 这时则没有办法进行补偿了,有些业务场景可以允许让业务最终成功, 在回滚不了的情况下可以继续重试完成后面的流程, 基于状态机引擎除可以提供“回滚”能力外, 还可以提供“向前”恢复上下文继续执行的能力, 让业务最终执行成功, 达到最终一致性的目的,所以在实际生产中基于状态机的实现应用更多。后续也会提供基于注解+拦截器实现。


问: 比如有 服务A在系统1里面,服务B在系统2里面。全局事务由A开启,流程调用B开启子事务,那系统2也需要维护Saga状态机的三个表吗,也需要在Spring Bean配置文件中配置一个StateMachineEngine吗?

答: 不需要, 只在发起方记录日志. 由于只在发起方记录日志同时对参与者服务没有接口参数的要求,使得Saga可以方便集成其它机构或遗留系统的服务


问: 如果 系统1和系统2里面的服务,可以相互调用。系统12都可以开启全局事务,可以这样使用吗。那1和2 都需要维护Saga状态机的三个表,也需要在Spring Bean配置文件中配置一个StateMachineEngine?

答: 可以这样使用,如果两个系统都开启saga事务,那就要记录那三个表配置StateMachineEngine


问: 使用 Seata 的时候,现在是 AT 模式 如果改成 saga 模式的话,改造会大吗?

答: AT 模式完全是透明的,Saga 是有侵入性的,要配置状态机 json,如果服务多改造会比较大


问: Saga 模式是不是基于 AT 来加强的长事务处理呢?

答: 没有基于 AT,客户端完全是两套,Server 端是复用的。你也可以看 Saga 的单元测试,那里有很多示例:https://github.com/seata/seata/tree/develop/test/src/test/java/io/seata/saga/engine


问: 开发者文档中状态机引擎原理图里的EventQueue只是开启分布式事务的系统来进行事件驱动,调用其它系统服务像调用本地一样。系统之间还是RPC调用是吧。而不是系统之前也是纯事件驱动的?(”系统之间也是纯事件驱动的” 指 RPC 也是非阻塞的)

答: 节点与节点间是事件驱动的, RPC 的非阻塞需要 rpc client 支持,理论上也是可以的。rpc client 如果也是非阻塞 IO,那么所有环节都是异步了。


问: 考虑一个业务流程, 它后续的子流程, 不管谁先运行都不会相互影响,可以异步调用。子流程是其它系统服务。Seata Saga 是不是实现了这点,其实我没看明白 ,Seata Saga异步调用具体是不是各个节点异步了?

答: Saga的异步启动一个状态机(stateMachineEngine.startAsync)是指状态机内所有的状态都是事件驱动来执行的, 整个流程实际是同步的, 上一个状态结束才产生下一个状态的事件. 而异步调用一个服务是配置该ServiceTask为”IsAsync”:true, 这个服务将会异步调用, 不会阻塞状态机的推进, 状态机不关心它的执行结果.


问: Saga源码中事件驱动层同步bus和异步bus是什么作用?

答: 同步BUS是线程阻塞的,等整个状态机执行完毕才返回,异步BUS是非线程阻塞的,调用后立即返回,状态机执行完毕后回调你的Callback。


问: IsPersist: 执行日志是否进行存储,默认是true,有一些查询类的服务可以配置在false,执行日志不进行存储提高性能,因为当异常恢复时可以重复执行?

答: 是的可以配置成false, 不过建议先保持默认,这样在查询执行日志比较全,真的要做性能调优再配,一般不会有性能问题


问: seata saga 开启事务的客户端或者seata server服务端宕机或者重启,未完成的状态机实例是怎么保证继续执行下去的?谁去触发这个操作?

答: 状态机实例在本地数据库有记录日志,通过日志恢复。seata server 会触触发事务恢复。


问: saga 的json文件 支持热部署吗?

答: 支持, stateMachineEngine.getStateMachineConfig().getStateMachineRepository().registryByResources()。不过java代码和服务需要自己实现支持热部署


问: 出参入参都放在saga的上下文中,如果参数内容较多较大,业务量又大的话,对内存有限制吗?

答: 没有做限制,建议无关的参数不要放到上下文。下一个服务需要用的参数、或用于分支判断的参数可以放入上下文。


问: 确认个事情:每个节点,要么自己方法内部 Catch 异常处理,使最终有返回信息。要么自己内部可以不处理,交由状态机引擎捕获异常,在 json 中定义 Catch 属性。 而不是补偿节点能够自动触发补偿,需要补偿必须手动在 json,由 Catch 或者 Choices 属性路由到 CompensationTrigger?

答: 对的,这个是为了提高灵活性。用户可以自己控制是否进行回滚,因为并不是所有异常都要回滚,可能有一些自定义处理手段。


问: 所以 Catch 和 Choices 可以随便路由到想要的 state 对吧?

答: 是的。这种自定义出发补偿的设计是参考了 bpmn2.0 的。


问: 还有关于 json 文件,我打算一条流程,就定义一个 json,虽然有的流程很像,用 Choices,可以解决。但是感觉 json 还是要尽量简单。这样考虑对吗?

答: 你可以考虑用子状态机来复用,子状态机会多生成一行 stateMachineInstance 记录,但对性能影响应该不大。