小丁下单后,订单微服务要存储该订单信息到数据库中,同时订单微服务要把消息发送到派单微服务中,派单微服务再存储派单信息到数据库中。这交互的期间可能就有很多不稳定因素,其中某些服务总会有异常、超时、甚至奔溃的情况,就会造成数据的不一致问题;
这种项目很常见了,各位小主们都熟练得很,就不做过多的演示,贴出一些核心的代码即可
@Service
public class OrderService {
@Autowired
private JdbcTemplate jdbcTemplate;
/**
* @描述 下单service
* @码农 丁昌江
* @日期 2021/5/17 15:14
*/
public boolean order(Order order) {
//1 插入订单
String sql = "INSERT INTO `t_order` VALUES (?, ?,?);";
int effectCount = 0;
effectCount = jdbcTemplate.update(sql, order.getId(), order.getUserName(), order.getProduct());
//2 如果订单插入成功,再讲订单信息推送到派单服务中
if (effectCount > 0) {
//使用hutool发送post请求,如果第二个参数是string类型,那么自动是application/json格式;指定读取超时时间2秒
String post = HttpUtil.post("http://localhost:8082/dispatch", JSONUtil.toJsonStr(order),2000);
//打印返回结果
System.out.println(JSONUtil.toJsonStr(post));
return true;
}
return false;
}
}
@RestController
public class DispatchController {
@Autowired
private JdbcTemplate jdbcTemplate;
@PostMapping("dispatch")
public String dispatch(@RequestBody Order order) throws InterruptedException {
//为了方便演示,就不写service层了
String sql = "insert into t_dispatch values (?,?,?,?);";
final int effectCount = jdbcTemplate.update(sql, (int) (Math.random() * 100), order.getId(), "超人", 0);
if (effectCount > 0) {
return "success";
}
return "fail";
}
}
@SpringBootTest
class OrderServerApplicationTests {
@Autowired
private OrderService orderService;
@Test
void contextLoads() {
Order order = new Order();
order.setId((int) (Math.random() * 100));
order.setUserName("xxjqr");
order.setProduct("肾宝");
orderService.order(order);
}
}
这里我们模拟订单微服务 调用 派单微服务出现错误的情况
@PostMapping("dispatch")
public String dispatch(@RequestBody Order order) throws InterruptedException {
//错误演示代码
System.out.println(1/0);
String sql = "insert into t_dispatch values (?,?,?,?);";
final int effectCount = jdbcTemplate.update(sql, (int) (Math.random() * 100), order.getId(), "超人", 0);
if (effectCount > 0) {
return "success";
}
return "fail";
}
此时结果是:
订单插入了,但是派单没有插入;
在 订单微服务->下单方法 上使用@Transactional()注解可以解决该问题;因为加了事务的方法,只有等待方法执行完成才会commit到数据库;只要派单方法报错,下单方法捕捉到,那么下单方法也不会commit
@Transactional(rollbackFor = Exception.class)
public boolean order(Order order) {
//1 插入订单
String sql = "INSERT INTO `t_order` VALUES (?, ?,?);";
int effectCount = 0;
effectCount = jdbcTemplate.update(sql, order.getId(), order.getUserName(), order.getProduct());
//2 如果订单插入成功,再讲订单信息推送到派单服务中
if (effectCount > 0) {
//使用hutool发送post请求,如果第二个参数是string类型,那么自动是application/json格式;指定读取超时时间3秒
String post = HttpUtil.post("http://localhost:8082/dispatch", JSONUtil.toJsonStr(order),2000);
System.out.println(JSONUtil.toJsonStr(post));
return true;
}
return false;
}
这种情况下,要么是下单服务出错了,要么就是派单服务超时之类的;这里我们演示订单微服务 调用 派单微服务超时吧!
派单方法加入3s延时
@PostMapping("dispatch")
public String dispatch(@RequestBody Order order) throws InterruptedException {
//错误演示代码
TimeUnit.SECONDS.sleep(3);
String sql = "insert into t_dispatch values (?,?,?,?);";
final int effectCount = jdbcTemplate.update(sql, (int) (Math.random() * 100), order.getId(), "超人", 0);
if (effectCount > 0) {
return "success";
}
return "fail";
}
下单方法post请求超时时间设置为2s
String post = HttpUtil.post("http://localhost:8082/dispatch", JSONUtil.toJsonStr(order),2000);
此时结果是:
cn.hutool.http.HttpException: Read timed out 读取超时且
如何处理呢?
使用RabbitMQ机制来处理
spring:
datasource:
username: root
url: jdbc:mysql://localhost:3306/order
password: 221121
driver-class-name: com.mysql.cj.jdbc.Driver
rabbitmq:
host: alisv
port: 5672
username: root
password: 221121
virtual-host: /
server:
port: 8081
package com.xxjqr.dispatchserver.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@EnableRabbit
public class RabbitMQConfig {
//定义队列 订单队列
@Bean
public Queue orderQueue() {
boolean durable = true;
boolean exclusive = false;
boolean autoDelete = false;
return new Queue("order_queue", durable, exclusive, autoDelete);
}
@Bean
public DirectExchange direcOrdertExchange() {
boolean durable = true;
boolean autoDelete = false;
return new DirectExchange("direct_order_exchange", durable, autoDelete);
}
@Bean
public Binding binding() {
return BindingBuilder
.bind(orderQueue())
.to(direcOrdertExchange())
.with("order");
}
}
public boolean orderByRabbitMQ(Order order) {
//1 插入订单
String sql = "INSERT INTO `t_order` VALUES (?, ?,?);";
int effectCount = jdbcTemplate.update(sql, order.getId(), order.getUserName(), order.getProduct());
//2 如果订单插入成功,再讲订单信息推送到派单服务中
if (effectCount == 1) {
rabbitTemplate.convertAndSend("direct_order_exchange","order", JSON.toJSONString(order));
return true;
}
return false;
}
@Component
@RabbitListener(queues = "order_queue")
public class OrderConsumer {
@Autowired
private JdbcTemplate jdbcTemplate;
@RabbitHandler
public void receiveMessage(String message) {
System.out.println("开始消费了");
Order order = JSON.parseObject(message, Order.class);
String sql = "insert into t_dispatch values (?,?,?,?);";
final int effectCount = jdbcTemplate.update(sql, (int) (Math.random() * 100), order.getId(), "超人", 0);
if (effectCount > 0) {
System.out.println("插入成功:"+order.toString());
}
}
}
这样就完成了RabbitMQ来替代restfull进行微服务间通讯的功能,当然可能大家平时都是用的服务注册中心+openFeign的方式,也是一样的,替换成RabbitMQ;
那替换成RabbitMQ就不需要考虑我们前面提到的分布式事务的问题了吗?no,我们需要保证生产者消息的可靠投递和消费者的可靠消费,至于RabbitMQ服务的稳定性只能是搭建可靠的集群环境了,接下来我们从 生产者消息的可靠投递和消费者的可靠消费两个方面来着手
我们把RabbitMQ的组成部分再回顾一下,我们的消息发送到交换机,由交换机根据我们的routingKey(如果是direct、topic模式的话)发送到对应的队列中;消费者再去从队列中取出消息消费
那如何保证 生产者消息的可靠投递,其实就是保证我们发送到交换机的消息确实是被它收到了,那我们就需要打开RabbitMQ的消息确认机制
两个微服务配置,都需要开启消息确认机制
spring:
datasource:
username: root
url: jdbc:mysql://localhost:3306/dispatch
password: 221121
driver-class-name: com.mysql.cj.jdbc.Driver
rabbitmq:
host: alisv
port: 5672
username: root
password: 221121
virtual-host: /
# 发布确认类型:理解为 显示的
publisher-confirm-type: correlated
server:
port: 8082
在与插入订单同一个事务的方法中,插入一条记录订单消息发送到交换机状态的消息;
还要设置好消息确认回调:这里我们每个2s重试3次,如果都失败就放弃
@PostConstruct
public void confirmCallback() {
//如果消息投递失败,就会有cause
rabbitTemplate.setConfirmCallback((correlationData,ack,cause)->{
assert correlationData != null;
final String data = correlationData.getId();
final Order order = JSON.parseObject(data, Order.class);
if (!ack) {
System.out.println("失败原因:" + cause);
//判断确认信息是否尝试了3次
String querySql = String.format("select count(1) from t_order_confirm where order_id = %d and status = 0 and attemp_num < %d;", order.getId(), 3);
final Integer queryCount = jdbcTemplate.queryForObject(querySql, Integer.class);
if (queryCount != null && queryCount > 0) {
try {
TimeUnit.SECONDS.sleep(2);
rabbitTemplate.convertAndSend("direct_order_exchange","order",
JSON.toJSONString(order),
new CorrelationData(data));
String updateSQL = "update t_order_confirm set attemp_num = attemp_num+1 where order_id = ?;";
jdbcTemplate.update(updateSQL, order.getId());
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
System.out.println("MQ队列应答失败,orderId 是:" + order.getId());
//发送邮件通知,不过这里要考虑到死锁的问题,因为如果是如果broker挂掉了的话,会反复这个过程;所以是不是可以选择通过预留一个http发送邮件的接口
}
} else {
try {
String updateSQL = "update t_order_confirm set status = 1 where order_id = ?;";
final int update = jdbcTemplate.update(updateSQL, order.getId());
if (update == 1) {
System.out.println("消息成功投递到交换机");
} else {
System.out.println("修改确认状态失败");
}
} catch (DataAccessException e) {
e.printStackTrace();
}
}
});
}
//开启事务
@Transactional(rollbackFor = Exception.class)
public boolean orderByRabbitMQ(Order order) {
//1 插入订单
String sql = "INSERT INTO `t_order` VALUES (?, ?,?);";
int effectCount = jdbcTemplate.update(sql, order.getId(), order.getUserName(), order.getProduct());
//2 如果订单插入成功,再讲订单信息推送到派单服务中
if (effectCount == 1) {
//3 插入记录订单消息发送到交换机状态的消息
String confirmSql = "insert into `t_order_confirm` (`id`,`order_id`,`product`,`user_name`) values (?,?,?,?);";
final int update = jdbcTemplate.update(confirmSql, (int) (Math.random() * 100), order.getId(), order.getProduct(), order.getUserName());
if (update == 1) {
//4 发送消息,我们加入了最后一个参数:相关数据
rabbitTemplate.convertAndSend("direct_order_exchange","order",
JSON.toJSONString(order),
new CorrelationData(JSON.toJSONString(order)));
}
return true;
}
return false;
}
表结构
CREATE TABLE `t_order_confirm` (
`id` int NOT NULL,
`order_id` int DEFAULT NULL,
`product` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL,
`user_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL,
`status` tinyint DEFAULT '0' COMMENT '0 已投递未确认 1 已投递已确认',
`attempt_num` int DEFAULT '0' COMMENT '消息尝试发送次数',
PRIMARY KEY (`id`),
UNIQUE KEY `order_id_index` (`order_id`) USING BTREE COMMENT '订单id也是唯一的'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
这样就能保证我们生产者消息的可靠投递了
这里主要是打断 消费者接收到消息后,自动应答到队列的过程,因为这样队列会移除该消息,而且开了重试机制的话还会造成死循环;我们要手动去应答并且将消息移到死信队列去做后续处理
spring:
datasource:
username: root
url: jdbc:mysql://localhost:3306/dispatch
password: 221121
driver-class-name: com.mysql.cj.jdbc.Driver
rabbitmq:
host: alisv
port: 5672
username: root
password: 221121
virtual-host: /
# 发布确认类型:理解为 显示的
publisher-confirm-type: correlated
listener:
#我使用的模式是direct模式,所以配置direct,如果是还简单模式、fanout模式、工作模式那就填simple
direct:
acknowledge-mode: manual
retry:
enabled: false # 开启重试
max-attempts: 3 # 最大重试次数
initial-interval: 2000ms # 重试间隔
server:
port: 8082
@Configuration
@EnableRabbit
public class RabbitMQConfig {
//定义队列 订单队列
@Bean
public Queue orderQueue() {
boolean durable = true;
boolean exclusive = false;
boolean autoDelete = false;
Map<String, Object> params = new HashMap<String, Object>() {
{
put("x-dead-letter-exchange","dead_order_exchange");
put("x-dead-letter-routing-key","dead_order");
}
};
return new Queue("order_queue", durable, exclusive, autoDelete,params);
}
//定义订单死信队列
@Bean
public Queue deadOrderQueue() {
boolean durable = true;
boolean exclusive = false;
boolean autoDelete = false;
return new Queue("dead_order_queue", durable, exclusive, autoDelete);
}
@Bean
public DirectExchange directOrdertExchange() {
boolean durable = true;
boolean autoDelete = false;
return new DirectExchange("direct_order_exchange", durable, autoDelete);
}
//定义死信交换机
@Bean
public DirectExchange deadOrdertExchange() {
boolean durable = true;
boolean autoDelete = false;
return new DirectExchange("dead_order_exchange", durable, autoDelete);
}
//绑定死信队列和死信交换机
@Bean
public Binding deadOrderBinding() {
return BindingBuilder
.bind(deadOrderQueue())
.to(deadOrdertExchange())
.with("dead_order");
}
@Bean
public Binding binding() {
return BindingBuilder
.bind(orderQueue())
.to(directOrdertExchange())
.with("order");
}
}
// 如果@RabbitListener注解放在类上,那么接收消息的方法就需要加上 @RabbitHandler注解
// 如果@RabbitListener放在方法上,就不需要
@RabbitListener(queues = "order_queue")
@Component
public class OrderConsumer {
@Autowired
private JdbcTemplate jdbcTemplate;
//解决消息重试的几种方案
//1:控制重试次数+进入死信队列
//2:try+catch+手动ack
//3:try+catch+手动ack+死信队列
//注意:
//1:如果使用了手动ack,那么重试机制将自动失效
//2:如果消费队列绑定了死信队列,那么手动ack后消息自动移到死信队列
@RabbitHandler
@Transactional(rollbackFor = Exception.class)
public void receiveMessage(String message, Channel channel,
CorrelationData correlationData,
@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
System.out.println("开始消费了");
try {
Order order = JSON.parseObject(message, Order.class);
String sql = "insert into t_dispatch values (?,?,?,?);";
final int effectCount = jdbcTemplate.update(sql, (int) (Math.random() * 100), order.getId(), "超人", 0);
if (effectCount > 0) {
System.out.println("插入成功:" + order.toString());
}
System.out.println(1/0);
//参数:消息标识,是否回复多条,是否重试(如果再重试,那就极有可能死循环)
channel.basicNack(tag,false,false);
} catch (Exception e) {
System.out.println("出现错误,消息即将进入死信队列");
channel.basicNack(tag,false,false);
//把异常抛出,这样@Transactional就能生效,异常之前的插入、修改都能回滚
throw e;
}
}
}
@RabbitListener(queues = "dead_order_queue")
@Component
public class DeadOrderConsumer {
@RabbitHandler
public void reciveMessage(String message) {
final Order order = JSON.parseObject(message, Order.class);
System.out.println("死信队列收到了消息:" + message);
System.out.println("发送消息告诉订单系统,该订单无法正常完成,让其修改is_delete 标识为 1");
}
}
结果:
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。