天增的博客
首页
博客
  • 分布式解决方案
  • Java并发工具包
  • redis
  • LeetCode
  • 系统设计
  • JVM体系
Github (opens new window)
Rss (opens new window)
  • zh-CN
  • en-US
首页
博客
  • 分布式解决方案
  • Java并发工具包
  • redis
  • LeetCode
  • 系统设计
  • JVM体系
Github (opens new window)
Rss (opens new window)
  • zh-CN
  • en-US
  • 分布式解决方案
  • 分布式理论
    • 三驾马车
      • The Google File System
      • BigTable中文翻译
      • MapReduce
    • 理论基础
    • 共识算法
      • Raft算法
      • Paxos算法
    • 通讯协议-Gossip
  • 流量调度
    • 流量控制
    • 服务路由
    • 负载均衡
  • 数据调度
    • 分布式缓存
    • 数据库
    • 分布式事务
      • 事务分类
        • 刚性事务的实现
        • 柔性事务的实现
        • 总体的⽅案对⽐
      • Seata框架
        • Seata解决方案
          • AT模式
          • TCC模式
          • Sega模式
            • 配置状态机引擎
            • 状态机执行
          • XA模式
    • 分布式锁
    • 分布式ID
  • 服务治理
    • 服务注册和发现
    • 链路追踪
    • 服务监控
    • 服务降级和熔断
  • 高并发架构
  • topic
  • 分布式解决方案
  • 数据调度
  • 分布式事务
  • Seata框架
  • Seata解决方案
  • Sega模式
2022-04-26
目录
配置状态机
配置状态机引擎
状态机执行

Sega模式

# Sega模式

Saga模式是SEATA提供的⻓事务解决⽅案,在Saga模式中,业务流程中 每个参与者都提交本地事务,当出现某⼀个参与者失败则补偿前⾯已经成 功的参与者,⼀阶段正向服务和⼆阶段补偿服务都由业务开发实现。

# 基于状态机引擎的 Saga 实现

⽬前SEATA提供的Saga模式是基于状态机引擎来实现的,机制是:

  1. 基于json格式定义服务调用状态图;
  2. 状态图的一个节点可以是一个服务,节点可以配置补偿节点;
  3. 状态图json由状态机执行引擎驱动执行,当出现异常状态时状态机引擎执行反向补偿任务将事物回滚;
  4. 异常状态发生时是否进行补偿由用户自定义决定;
  5. 可以实现服务编排的需求,支持单项选择、并发、异步、子状态机调用、参数转换、参数映射、服务执行状态判断、异常捕获等功能;

# springCloud seata saga接入指南

# 配置状态机

{
  "Name": "purchaseProcess",
  "Comment": "用户下单流程-saga流程",
  "StartState": "CreateOrderNo",
  "Version": "1.0.0",
  "States": {
    "CreateOrderNo": {
      "Comment": "生成订单号服务",
      "Type": "ServiceTask",
      "ServiceName": "com.fly.seata.api.OrderApi",
      "ServiceMethod": "createOrderNo",
      "CompensateState": "CompensationCanalOrder1",
      "Catch": [
        {
          "Exceptions": [
            "java.lang.Throwable"
          ],
          "Next": "CompensationTrigger"
        }],
      "Output": {
        "orderNo":"$.#root"
      },
      "Next": "CreateOrder",
      "Status": {
        "$Exception{java.lang.Throwable}": "UN",
        "#root != null": "SU",
        "#root == null": "FA"
      }
    },
    "CreateOrder": {
      "Comment": "创建订单服务",
      "Type": "ServiceTask",
      "ServiceName": "com.fly.seata.api.OrderApi",
      "ServiceMethod": "createOrder",
      "CompensateState": "CompensationCanalOrder2",
      "Next": "ReduceStorage",
      "Input": [{
          "orderNo": "$.[orderNo]",
          "userId": "$.[order].userId",
          "productId": "$.[order].productId",
          "count": "$.[order].count",
          "price": "$.[order].price"
        }],
      "Catch": [{
          "Exceptions": [
            "java.lang.Throwable"
          ],
          "Next": "CompensationTrigger"
        }],
      "Status": {
        "$Exception{java.lang.Throwable}": "UN",
        "#root != null": "SU",
        "#root == null": "FA"
      }
    },
    "ReduceStorage": {
      "Comment": "扣减库存服务",
      "Type": "ServiceTask",
      "ServiceName": "com.fly.seata.api.StorageApi",
      "ServiceMethod": "reduce",
      "CompensateState": "CompensatingReduceStorage",
      "Next":"Succeed",
      "Input": [{
        "orderNo": "$.[orderNo]",
        "productId": "$.[order].productId",
        "count": "$.[order].count"
      }],
      "Catch": [{
        "Exceptions": [
          "java.lang.Throwable"
        ],
        "Next": "CompensationTrigger"
      }]
    },
    "CompensationCanalOrder1": {
      "Comment": "取消订单补偿服务1--用于订单号生成失败",
      "Type": "ServiceTask",
      "ServiceName": "com.fly.seata.api.OrderApi",
      "ServiceMethod": "canalOrder",
      "Input": [
        "$.[orderNo]",
        1
      ]
    },
    "CompensationCanalOrder2": {
      "Comment": "取消订单补偿服务2--用于订单生成失败",
      "Type": "ServiceTask",
      "ServiceName": "com.fly.seata.api.OrderApi",
      "ServiceMethod": "canalOrder",
      "Input": [
        "$.[orderNo]",
        2
      ]
    },
    "CompensatingReduceStorage": {
      "Comment": "库存补偿服务",
      "Comment": "扣减库存服务",
      "Type": "ServiceTask",
      "ServiceName": "com.fly.seata.api.StorageApi",
      "ServiceMethod": "compensateReduce",
      "Input": [{
        "orderNo": "$.[orderNo]",
        "productId": "$.[order].productId",
        "count": "$.[order].count"
      }]
    },
    "CompensationTrigger": {
      "Type": "CompensationTrigger"
    },
    "Succeed": {
      "Type":"Succeed"
    },
    "Fail": {
      "Type":"Fail",
      "ErrorCode": "STORAGE_FAILED",
      "Message": "purchase failed"
    }
  }
}

# 配置状态机引擎

@Configuration
public class SagaConfig {

  @ConfigurationProperties("spring.datasource.saga")
  @Bean
  public DataSource dataSource(){
    return new DruidDataSource();
  }

  @Bean
  public DbStateMachineConfig dbStateMachineConfig(){
    DbStateMachineConfig dbStateMachineConfig = new DbStateMachineConfig();
    dbStateMachineConfig.setDataSource(dataSource());
    Resource[] resources = {new ClassPathResource("statelang/purchase.json")};
    dbStateMachineConfig.setResources(resources);
    dbStateMachineConfig.setEnableAsync(true);
    dbStateMachineConfig.setThreadPoolExecutor(threadPoolExecutor());
    dbStateMachineConfig.setApplicationId("sage-tm");
    dbStateMachineConfig.setTxServiceGroup("my_test_tx_group");
    return dbStateMachineConfig;
  }

  /**
   * saga状态图执行引擎
   * @return
   */
  @Bean
  public StateMachineEngine processCtrlStateMachineEngine(){
    ProcessCtrlStateMachineEngine stateMachineEngine = new ProcessCtrlStateMachineEngine();
    stateMachineEngine.setStateMachineConfig(dbStateMachineConfig());
    return stateMachineEngine;
  }

  @Bean
  public StateMachineEngineHolder stateMachineEngineHolder(){
    StateMachineEngineHolder stateMachineEngineHolder = new StateMachineEngineHolder();
    stateMachineEngineHolder.setStateMachineEngine(processCtrlStateMachineEngine());
    return stateMachineEngineHolder;
  }

  @Bean
  public ThreadPoolExecutor threadPoolExecutor(){
    ThreadPoolExecutorFactoryBean threadPoolExecutorFactoryBean = new ThreadPoolExecutorFactoryBean();
    threadPoolExecutorFactoryBean.setCorePoolSize(1);
    threadPoolExecutorFactoryBean.setMaxPoolSize(20);
    threadPoolExecutorFactoryBean.setThreadNamePrefix("saga_");
    return (ThreadPoolExecutor)threadPoolExecutorFactoryBean.getObject();
  }
}

# 状态机执行

@RequestMapping("/tm")
@RestController
public class TmController {

  /**
   * 模拟购买商品流程
   * @return
   */
  @GlobalTransactional
  @GetMapping("/purchase")
  public String purchase(){
    Map<String, Object> startParams = new HashMap<>();
    OrderDTO orderDTO = new OrderDTO();
    orderDTO.setUserId(1l);
    orderDTO.setCount(1);
    orderDTO.setPrice(new BigDecimal(19));
    orderDTO.setProductId(1l);
    startParams.put("order",orderDTO);
    StateMachineInstance stateMachineInstance = stateMachineEngine.start("purchaseProcess",null,startParams);
    return "执行状态:"+stateMachineInstance.getStatus().getStatusString();
  }

}
最近更新
01
以 root 身份启动 transmission-daemon
12-13
02
Debian系统安装qbittorrent-nox
12-09
03
LXC Debain12安装zerotier并实现局域网自动nat转发
07-29
更多文章>
Theme by Vdoing | Copyright © 2015-2025 天增 | 苏ICP备16037388号-1
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式