1 Star 1 Fork 1

机器人 / rabbitmq-tx

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
贡献代码
同步代码
取消
提示: 由于 Git 不支持空文件夾,创建文件夹后会生成空的 .keep 文件
Loading...
README

分布式事务

1 案例图

image-20210817161137448

小丁下单后,订单微服务要存储该订单信息到数据库中,同时订单微服务要把消息发送到派单微服务中,派单微服务再存储派单信息到数据库中。这交互的期间可能就有很多不稳定因素,其中某些服务总会有异常、超时、甚至奔溃的情况,就会造成数据的不一致问题;

2 分布式项目搭建

这种项目很常见了,各位小主们都熟练得很,就不做过多的演示,贴出一些核心的代码即可

2.1 订单微服务核心类

@Service
public class OrderService {
    @Autowired
    private JdbcTemplate jdbcTemplate;

    /**
     * @描述 下单service
     * @码农 丁昌江
     * @日期 2021/5/17 15:14
     */
    public boolean order(Order order) {
        //1 插入订单
        String sql = "INSERT INTO `t_order`  VALUES (?, ?,?);";
        int effectCount = 0;
        effectCount = jdbcTemplate.update(sql, order.getId(), order.getUserName(), order.getProduct());
        //2 如果订单插入成功,再讲订单信息推送到派单服务中
        if (effectCount > 0) {
            //使用hutool发送post请求,如果第二个参数是string类型,那么自动是application/json格式;指定读取超时时间2秒
            String post = HttpUtil.post("http://localhost:8082/dispatch", JSONUtil.toJsonStr(order),2000);
            //打印返回结果
            System.out.println(JSONUtil.toJsonStr(post));
            return true;
        }
        return false;
    }
}

2.2 派单微服务核心类

@RestController
public class DispatchController {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @PostMapping("dispatch")
    public String dispatch(@RequestBody Order order) throws InterruptedException {
        //为了方便演示,就不写service层了
        String sql = "insert into t_dispatch values (?,?,?,?);";
        final int effectCount = jdbcTemplate.update(sql, (int) (Math.random() * 100), order.getId(), "超人", 0);
        if (effectCount > 0) {
            return "success";
        }
        return "fail";
    }
}

2.3 下单测试类

@SpringBootTest
class OrderServerApplicationTests {

    @Autowired
    private OrderService orderService;

    @Test
    void contextLoads() {
        Order order = new Order();
        order.setId((int) (Math.random() * 100));
        order.setUserName("xxjqr");
        order.setProduct("肾宝");
        orderService.order(order);
    }

}

3 测试

3.1 订单插入、派单没插入的情况

这里我们模拟订单微服务 调用 派单微服务出现错误的情况

  1. 派单方法加入错误代码
    @PostMapping("dispatch")
    public String dispatch(@RequestBody Order order) throws InterruptedException {
        //错误演示代码
        System.out.println(1/0);
        
        String sql = "insert into t_dispatch values (?,?,?,?);";
        final int effectCount = jdbcTemplate.update(sql, (int) (Math.random() * 100), order.getId(), "超人", 0);
        if (effectCount > 0) {
            return "success";
        }
        return "fail";
    }
  1. 此时结果是:

    订单插入了,但是派单没有插入;

image-20210817163858978

  1. 如何处理呢?

在 订单微服务->下单方法 上使用@Transactional()注解可以解决该问题;因为加了事务的方法,只有等待方法执行完成才会commit到数据库;只要派单方法报错,下单方法捕捉到,那么下单方法也不会commit

@Transactional(rollbackFor = Exception.class)
public boolean order(Order order) {
    //1 插入订单
    String sql = "INSERT INTO `t_order`  VALUES (?, ?,?);";
    int effectCount = 0;
    effectCount = jdbcTemplate.update(sql, order.getId(), order.getUserName(), order.getProduct());
    //2 如果订单插入成功,再讲订单信息推送到派单服务中
    if (effectCount > 0) {
        //使用hutool发送post请求,如果第二个参数是string类型,那么自动是application/json格式;指定读取超时时间3秒
        String post = HttpUtil.post("http://localhost:8082/dispatch", JSONUtil.toJsonStr(order),2000);
        System.out.println(JSONUtil.toJsonStr(post));
        return true;
    }
    return false;
}

3.2 订单没插入、派单插入的情况

这种情况下,要么是下单服务出错了,要么就是派单服务超时之类的;这里我们演示订单微服务 调用 派单微服务超时吧!

  1. 派单方法加入3s延时

    @PostMapping("dispatch")
    public String dispatch(@RequestBody Order order) throws InterruptedException {
        //错误演示代码
        TimeUnit.SECONDS.sleep(3);
        
        String sql = "insert into t_dispatch values (?,?,?,?);";
        final int effectCount = jdbcTemplate.update(sql, (int) (Math.random() * 100), order.getId(), "超人", 0);
        if (effectCount > 0) {
            return "success";
        }
        return "fail";
    }
  2. 下单方法post请求超时时间设置为2s

    String post = HttpUtil.post("http://localhost:8082/dispatch", JSONUtil.toJsonStr(order),2000);
  3. 此时结果是:

    cn.hutool.http.HttpException: Read timed out 读取超时且

    image-20210817170041808

  4. 如何处理呢?

    使用RabbitMQ机制来处理

4 RabbitMQ处理分布式事务

4.1 使用rabbitmq中间件代替restfull请求

  1. yml加入rabbitmq配置,两个微服务都需要
spring:
  datasource:
    username: root
    url: jdbc:mysql://localhost:3306/order
    password: 221121
    driver-class-name: com.mysql.cj.jdbc.Driver
  rabbitmq:
    host: alisv
    port: 5672
    username: root
    password: 221121
    virtual-host: /
server:				
  port: 8081																								
  1. 消费者(本案例就是派单服务)负责创建队列、交换机等等
package com.xxjqr.dispatchserver.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableRabbit
public class RabbitMQConfig {
    //定义队列 订单队列
    @Bean
    public Queue orderQueue() {
        boolean durable = true;
        boolean exclusive = false;
        boolean autoDelete = false;
        return new Queue("order_queue", durable, exclusive, autoDelete);
    }

    @Bean
    public DirectExchange direcOrdertExchange() {
        boolean durable = true;
        boolean autoDelete = false;
        return new DirectExchange("direct_order_exchange", durable, autoDelete);
    }

    @Bean
    public Binding binding() {
        return BindingBuilder
                .bind(orderQueue())
                .to(direcOrdertExchange())
                .with("order");
    }
}
  1. 订单服务->下单方法使用RabbitmqTemplate发送消息
public boolean orderByRabbitMQ(Order order) {
    //1 插入订单
    String sql = "INSERT INTO `t_order`  VALUES (?, ?,?);";
    int effectCount = jdbcTemplate.update(sql, order.getId(), order.getUserName(), order.getProduct());
    //2 如果订单插入成功,再讲订单信息推送到派单服务中
    if (effectCount == 1) {
        rabbitTemplate.convertAndSend("direct_order_exchange","order", JSON.toJSONString(order));
        return true;
    }
    return false;
}
  1. 派单服务->派单方法监听订单队列
@Component
@RabbitListener(queues = "order_queue")
public class OrderConsumer {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @RabbitHandler
    public void receiveMessage(String message) {
        System.out.println("开始消费了");
        Order order = JSON.parseObject(message, Order.class);
        String sql = "insert into t_dispatch values (?,?,?,?);";
        final int effectCount = jdbcTemplate.update(sql, (int) (Math.random() * 100), order.getId(), "超人", 0);
        if (effectCount > 0) {
            System.out.println("插入成功:"+order.toString());
        }
    }
}

这样就完成了RabbitMQ来替代restfull进行微服务间通讯的功能,当然可能大家平时都是用的服务注册中心+openFeign的方式,也是一样的,替换成RabbitMQ;

那替换成RabbitMQ就不需要考虑我们前面提到的分布式事务的问题了吗?no,我们需要保证生产者消息的可靠投递和消费者的可靠消费,至于RabbitMQ服务的稳定性只能是搭建可靠的集群环境了,接下来我们从 生产者消息的可靠投递消费者的可靠消费两个方面来着手

4.2 生产者消息的可靠投递

我们把RabbitMQ的组成部分再回顾一下,我们的消息发送到交换机,由交换机根据我们的routingKey(如果是direct、topic模式的话)发送到对应的队列中;消费者再去从队列中取出消息消费 image-20210814111502888

那如何保证 生产者消息的可靠投递,其实就是保证我们发送到交换机的消息确实是被它收到了,那我们就需要打开RabbitMQ的消息确认机制

两个微服务配置,都需要开启消息确认机制

spring:
  datasource:
    username: root
    url: jdbc:mysql://localhost:3306/dispatch
    password: 221121
    driver-class-name: com.mysql.cj.jdbc.Driver
  rabbitmq:
    host: alisv
    port: 5672
    username: root
    password: 221121
    virtual-host: /
    # 发布确认类型:理解为 显示的
    publisher-confirm-type: correlated

server:
  port: 8082

在与插入订单同一个事务的方法中,插入一条记录订单消息发送到交换机状态的消息

还要设置好消息确认回调:这里我们每个2s重试3次,如果都失败就放弃

	@PostConstruct
    public void confirmCallback() {
        //如果消息投递失败,就会有cause
        rabbitTemplate.setConfirmCallback((correlationData,ack,cause)->{

            assert correlationData != null;
            final String data = correlationData.getId();
            final Order order = JSON.parseObject(data, Order.class);
            if (!ack) {
                System.out.println("失败原因:" + cause);
                //判断确认信息是否尝试了3次
                String querySql = String.format("select count(1) from t_order_confirm where order_id = %d and status = 0 and attemp_num < %d;", order.getId(), 3);
                final Integer queryCount = jdbcTemplate.queryForObject(querySql, Integer.class);
                if (queryCount != null && queryCount > 0) {
                    try {
                        TimeUnit.SECONDS.sleep(2);
                        rabbitTemplate.convertAndSend("direct_order_exchange","order",
                                JSON.toJSONString(order),
                                new CorrelationData(data));
                        String updateSQL = "update t_order_confirm set attemp_num = attemp_num+1 where order_id = ?;";
                        jdbcTemplate.update(updateSQL, order.getId());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } else {
                    System.out.println("MQ队列应答失败,orderId 是:" + order.getId());
                    //发送邮件通知,不过这里要考虑到死锁的问题,因为如果是如果broker挂掉了的话,会反复这个过程;所以是不是可以选择通过预留一个http发送邮件的接口
                }

            } else {
                try {
                    String updateSQL = "update t_order_confirm set status = 1 where order_id = ?;";
                    final int update = jdbcTemplate.update(updateSQL, order.getId());
                    if (update == 1) {
                        System.out.println("消息成功投递到交换机");
                    } else {
                        System.out.println("修改确认状态失败");
                    }
                } catch (DataAccessException e) {
                    e.printStackTrace();
                }
            }
        });
    }

    //开启事务
    @Transactional(rollbackFor = Exception.class)
    public boolean orderByRabbitMQ(Order order) {
        //1 插入订单
        String sql = "INSERT INTO `t_order`  VALUES (?, ?,?);";
        int effectCount = jdbcTemplate.update(sql, order.getId(), order.getUserName(), order.getProduct());
        //2 如果订单插入成功,再讲订单信息推送到派单服务中
        if (effectCount == 1) {
            //3 插入记录订单消息发送到交换机状态的消息
            String confirmSql = "insert into `t_order_confirm` (`id`,`order_id`,`product`,`user_name`) values (?,?,?,?);";
            final int update = jdbcTemplate.update(confirmSql, (int) (Math.random() * 100), order.getId(), order.getProduct(), order.getUserName());
            if (update == 1) {
                //4 发送消息,我们加入了最后一个参数:相关数据
                rabbitTemplate.convertAndSend("direct_order_exchange","order",
                        JSON.toJSONString(order),
                        new CorrelationData(JSON.toJSONString(order)));
            }
            return true;
        }
        return false;
    }

表结构

CREATE TABLE `t_order_confirm` (
  `id` int NOT NULL,
  `order_id` int DEFAULT NULL,
  `product` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL,
  `user_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL,
  `status` tinyint DEFAULT '0' COMMENT '0 已投递未确认 1 已投递已确认',
  `attempt_num` int DEFAULT '0' COMMENT '消息尝试发送次数',
  PRIMARY KEY (`id`),
  UNIQUE KEY `order_id_index` (`order_id`) USING BTREE COMMENT '订单id也是唯一的'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;

这样就能保证我们生产者消息的可靠投递了

image-20210818115143130

4.3 消费者的可靠消费

这里主要是打断 消费者接收到消息后,自动应答到队列的过程,因为这样队列会移除该消息,而且开了重试机制的话还会造成死循环;我们要手动去应答并且将消息移到死信队列去做后续处理

  1. 消费者端修改配置文件
spring:
  datasource:
    username: root
    url: jdbc:mysql://localhost:3306/dispatch
    password: 221121
    driver-class-name: com.mysql.cj.jdbc.Driver
  rabbitmq:
    host: alisv
    port: 5672
    username: root
    password: 221121
    virtual-host: /
    # 发布确认类型:理解为 显示的
    publisher-confirm-type: correlated
    listener:
      #我使用的模式是direct模式,所以配置direct,如果是还简单模式、fanout模式、工作模式那就填simple
      direct:
        acknowledge-mode: manual
        retry:
          enabled: false # 开启重试
          max-attempts: 3 # 最大重试次数
          initial-interval: 2000ms # 重试间隔

server:
  port: 8082
  1. 消费者端添加死信队列绑定
@Configuration
@EnableRabbit
public class RabbitMQConfig {
    //定义队列 订单队列
    @Bean
    public Queue orderQueue() {
        boolean durable = true;
        boolean exclusive = false;
        boolean autoDelete = false;
        Map<String, Object> params = new HashMap<String, Object>() {
            {
                put("x-dead-letter-exchange","dead_order_exchange");
                put("x-dead-letter-routing-key","dead_order");
            }
        };
        return new Queue("order_queue", durable, exclusive, autoDelete,params);
    }

    //定义订单死信队列
    @Bean
    public Queue deadOrderQueue() {
        boolean durable = true;
        boolean exclusive = false;
        boolean autoDelete = false;
        return new Queue("dead_order_queue", durable, exclusive, autoDelete);
    }

    @Bean
    public DirectExchange directOrdertExchange() {
        boolean durable = true;
        boolean autoDelete = false;
        return new DirectExchange("direct_order_exchange", durable, autoDelete);
    }

    //定义死信交换机
    @Bean
    public DirectExchange deadOrdertExchange() {
        boolean durable = true;
        boolean autoDelete = false;
        return new DirectExchange("dead_order_exchange", durable, autoDelete);
    }

    //绑定死信队列和死信交换机
    @Bean
    public Binding deadOrderBinding() {
        return BindingBuilder
                .bind(deadOrderQueue())
                .to(deadOrdertExchange())
                .with("dead_order");
    }

    @Bean
    public Binding binding() {
        return BindingBuilder
                .bind(orderQueue())
                .to(directOrdertExchange())
                .with("order");
    }
}
  1. 改造消费者代码
// 如果@RabbitListener注解放在类上,那么接收消息的方法就需要加上 @RabbitHandler注解
// 如果@RabbitListener放在方法上,就不需要
@RabbitListener(queues = "order_queue")
@Component
public class OrderConsumer {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    //解决消息重试的几种方案
    //1:控制重试次数+进入死信队列
    //2:try+catch+手动ack
    //3:try+catch+手动ack+死信队列
    //注意:
    //1:如果使用了手动ack,那么重试机制将自动失效
    //2:如果消费队列绑定了死信队列,那么手动ack后消息自动移到死信队列
    @RabbitHandler
    @Transactional(rollbackFor = Exception.class)
    public void receiveMessage(String message, Channel channel,
                               CorrelationData correlationData,
                               @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        System.out.println("开始消费了");
        try {
            Order order = JSON.parseObject(message, Order.class);
            String sql = "insert into t_dispatch values (?,?,?,?);";
            final int effectCount = jdbcTemplate.update(sql, (int) (Math.random() * 100), order.getId(), "超人", 0);
            if (effectCount > 0) {
                System.out.println("插入成功:" + order.toString());
            }
            System.out.println(1/0);
            //参数:消息标识,是否回复多条,是否重试(如果再重试,那就极有可能死循环)
            channel.basicNack(tag,false,false);
        } catch (Exception e) {
            System.out.println("出现错误,消息即将进入死信队列");
            channel.basicNack(tag,false,false);
            //把异常抛出,这样@Transactional就能生效,异常之前的插入、修改都能回滚
            throw e;
        }
    }
}
  1. 定义死信队列消费者
@RabbitListener(queues = "dead_order_queue")
@Component
public class DeadOrderConsumer {

    @RabbitHandler
    public void reciveMessage(String message) {
        final Order order = JSON.parseObject(message, Order.class);
        System.out.println("死信队列收到了消息:" + message);
        System.out.println("发送消息告诉订单系统,该订单无法正常完成,让其修改is_delete 标识为 1");
    }
}

结果:

image-20210818120930685

空文件

简介

从RabbitMQ的角度来思考如何实现分布式事务 展开 收起
Java
取消

发行版

暂无发行版

贡献者

全部

近期动态

加载更多
不能加载更多了
Java
1
https://gitee.com/xxjqr/rabbitmq-tx.git
git@gitee.com:xxjqr/rabbitmq-tx.git
xxjqr
rabbitmq-tx
rabbitmq-tx
master

搜索帮助