RabbitMQ实现订单30分钟超时自动关闭
订单30分钟未支付,系统自动超时关闭有哪些实现方案?1.基于任务调度实现,效率是非常低,耗服务器性能2.基于redis过期key实现.用户下单的时候,生成一个令牌(有效期)30分钟,存放到我们redis;redis.set(orderToken ,orderID) 下单时候存放到redis,并存储id入库,30分钟过期,redis客户端监听,过期获取到orderId,拿orderId去查订单,没有
·
目录
1. 前言
项目中有一个简单的场景,涉及的用户量非常少,其实也不存在并发,都是客服管理在操作,而客服管理就那么几个人。客服需要在系统下单投放广告到广告屏。但是如果30分钟未确认投放则超时关闭。
2. 有哪些方案?
订单30分钟未支付,系统自动超时关闭有哪些实现方案?
- 基于任务调度实现,效率是非常低,耗服务器性能
- 基于redis过期key实现.用户下单的时候,生成一个令牌(有效期)30分钟,存放到我们redis;
- redis.set(orderToken ,orderID) 下单时候存放到redis,并存储id入库,30分钟过期,redis客户端监听,过期获取到orderId,拿orderId去查订单,没有支付则,订单关闭,库存增加。
- 缺点:非常冗余 ,会在表中存放一个冗余字段
- 基于redis延迟队列,处理正常订单超时自动关闭
- 基于MQ的延迟队列实现(最佳) 死信队列(延迟队列)
下文将基于MQ的延迟队列实现来介绍:
3. 实现原理
当我们在下单的时候,往MQ投递一个消息设置有效期为30分钟,但该消息失效的时候(没有被消费的情况下),执行我们客户端一个方法告诉我们该消息已经失效,这时候查询这笔订单是否有支付。
(消息过期投递到死信队列)
- 下单投放消息到A交换机(过期时间30分钟)
- 消息到aa队列(绑定死信交换机)
- 不设置aa队列的消费者(故此消息一直未消费).
- 30分钟后,过期消息投递到死信交换机,死信队列,由死信消费者消费
- 判断订单id是否支付,执行业务逻辑
- 支付—> return
- 未支付—>关闭订单,返还库存
4. DEMO演示
首先创建一个类DeadLetterMQConfig
(类名可随意),在类上加上@Component
注解
4.1 订单交换机
@Value("${mayikt.order.exchange}")
private String orderExchange;
4.2 订单队列
@Value("${mayikt.order.queue}")
private String orderQueue;
4.3 订单路由key
@Value("${mayikt.order.routingKey}")
private String orderRoutingKey;
4.4 死信交换机
@Value("${mayikt.dlx.queue}")
private String dlxQueue;
4.5 死信队列
@Value("${mayikt.dlx.queue}")
private String dlxQueue;
4.6 死信路由
@Value("${mayikt.dlx.routingKey}")
private String dlxRoutingKey;
4.7 声明死信交换机
@Bean
public DirectExchange dlxExchange() {
return new DirectExchange(dlxExchange);
}
4.8 声明死信队列
@Bean
public Queue dlxQueue() {
return new Queue(dlxQueue);
}
4.9 声明订单业务交换机
@Bean
public DirectExchange orderExchange() {
return new DirectExchange(orderExchange);
}
4.10 声明订单队列 核心操作一
@Bean
public Queue orderQueue() {
Map<String, Object> arguments = new HashMap<>(2);
// 绑定我们的死信交换机
arguments.put("x-dead-letter-exchange", dlxExchange);
// 绑定我们的路由key
arguments.put("x-dead-letter-routing-key", dlxRoutingKey);
return new Queue(orderQueue, true, false, false, arguments);
}
4.11 绑定订单队列到订单交换机
@Bean
public Binding orderBinding() {
return BindingBuilder.bind(orderQueue()).to(orderExchange()).with(orderRoutingKey);
}
4.12 绑定死信队列到死信交换机
@Bean
public Binding binding() {
return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with(dlxRoutingKey);
}
4.13 上述完整代码
@Component
public class DeadLetterMQConfig {
/**
* 订单交换机
*/
@Value("${mayikt.order.exchange}")
private String orderExchange;
/**
* 订单队列
*/
@Value("${mayikt.order.queue}")
private String orderQueue;
/**
* 订单路由key
*/
@Value("${mayikt.order.routingKey}")
private String orderRoutingKey;
/**
* 死信交换机
*/
@Value("${mayikt.dlx.exchange}")
private String dlxExchange;
/**
* 死信队列
*/
@Value("${mayikt.dlx.queue}")
private String dlxQueue;
/**
* 死信路由
*/
@Value("${mayikt.dlx.routingKey}")
private String dlxRoutingKey;
/**
* 声明死信交换机
*/
@Bean
public DirectExchange dlxExchange() {
return new DirectExchange(dlxExchange);
}
/**
* 声明死信队列
*/
@Bean
public Queue dlxQueue() {
return new Queue(dlxQueue);
}
/**
* 声明订单业务交换机
*/
@Bean
public DirectExchange orderExchange() {
return new DirectExchange(orderExchange);
}
/**
* 声明订单队列 核心操作一
*/
@Bean
public Queue orderQueue() {
Map<String, Object> arguments = new HashMap<>(2);
// 绑定我们的死信交换机
arguments.put("x-dead-letter-exchange", dlxExchange);
// 绑定我们的路由key
arguments.put("x-dead-letter-routing-key", dlxRoutingKey);
return new Queue(orderQueue, true, false, false, arguments);
}
/**
* 绑定订单队列到订单交换机
*/
@Bean
public Binding orderBinding() {
return BindingBuilder.bind(orderQueue()).to(orderExchange()).with(orderRoutingKey);
}
/**
* 绑定死信队列到死信交换机
*/
@Bean
public Binding binding() {
return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with(dlxRoutingKey);
}
}
4.14 application.properties
server.port=8082
spring.datasource.url=jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT%2B8
spring.datasource.username=root
spring.datasource.password=root
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
#开启驼峰命名 譬如数据库create_time 自动映射pojo属性createTime
mybatis.configuration.map-underscore-to-camel-case=true
#配置virtual-host虚拟主机
spring.rabbitmq.virtual-host=/zhang_rabbit
#ip地址
spring.rabbitmq.host=127.0.0.1
#用户名 密码
spring.rabbitmq.username=zhang
spring.rabbitmq.password=zhang
#连接端口号
spring.rabbitmq.port=5672
#spring.rabbitmq.publisher-confirm-type=
#模拟演示死信队列
mayikt.dlx.exchange=mayikt_order_dlx_exchange
mayikt.dlx.queue=mayikt_order_dlx_queue
mayikt.dlx.routingKey=dlx
##备胎交换机
mayikt.order.exchange=mayikt_order_exchange
mayikt.order.queue=mayikt_order_queue
mayikt.order.routingKey=mayikt.order
-
MySQL数据库配置:
server.port=8082
: 指定内嵌Tomcat服务器的端口。spring.datasource.url
: 定义连接到MySQL数据库的JDBC URL。spring.datasource.username
和spring.datasource.password
: 提供MySQL数据库连接的用户名和密码。spring.datasource.driver-class-name
: 指定MySQL的JDBC驱动程序类。
-
MyBatis配置:
mybatis.configuration.map-underscore-to-camel-case=true
: 配置MyBatis将数据库列名中的下划线映射为Java对象属性的驼峰命名方式。
-
RabbitMQ配置:
spring.rabbitmq.virtual-host
: 设置RabbitMQ的虚拟主机。spring.rabbitmq.host=127.0.0.1
: 指定RabbitMQ服务器的IP地址。spring.rabbitmq.username
和spring.rabbitmq.password
: 设置RabbitMQ连接的用户名和密码。spring.rabbitmq.port=5672
: 指定RabbitMQ连接的端口号。
-
死信队列和备胎交换机配置:
mayikt.dlx.exchange
,mayikt.dlx.queue
,mayikt.dlx.routingKey
: 配置死信队列的交换机、队列和路由键。mayikt.order.exchange
,mayikt.order.queue
,mayikt.order.routingKey
: 配置备胎交换机的交换机、队列和路由键。
4.15 创建producer生产者
暂时设置消息10秒过期,验证消息是否加入死信队列
@RestController
public class OrderController {
@Autowired
private OrderMapper orderMapper;
@Autowired
private RabbitTemplate rabbitTemplate;
@Value("${mayikt.order.exchange}")
private String orderExchange; //订单交换机
@Value("${mayikt.order.routingKey}")
private String orderRoutingKey; //订单路由key
@GetMapping("/addOrder")
public String addOrder(){
String orderId=System.currentTimeMillis()+"";
OrderEntity orderEntity=new OrderEntity("订单30分钟过期",orderId,0);
//订单入库
int result= orderMapper.addOrder(orderEntity);
if(result<=0){
return "fail";
}
//rabbit投递消息
rabbitTemplate.convertAndSend(orderExchange,orderRoutingKey,orderId,messagePostProcessor());
return "success";
}
//处理待发送消息
private MessagePostProcessor messagePostProcessor(){
return new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//设置有效期30分钟
//message.getMessageProperties().setExpiration("1800000");
message.getMessageProperties().setExpiration("10000");
return message;
}
};
}
}
- 添加订单接口:
@GetMapping(“/addOrder”): 处理GET请求的 /addOrder 接口。
生成一个唯一的订单号 orderId,通常使用当前时间戳。
创建一个 OrderEntity 对象,表示订单信息。
调用 orderMapper.addOrder(orderEntity) 将订单信息插入数据库。
如果插入成功,使用 rabbitTemplate.convertAndSend 发送消息到RabbitMQ,将订单号作为消息发送到指定的交换机(orderExchange)和路由键(orderRoutingKey)。 - 消息处理:
messagePostProcessor(): 定义了一个消息后处理器,用于设置消息的属性,这里通过 message.getMessageProperties().setExpiration(“10000”) 设置消息的过期时间为10秒(实际项目中需要修改)。
4.16 死信消费者消费过期消息
@Component //死信队列
public class OrderDlxConsumer {
@Autowired
private OrderMapper orderMapper;
/**
* 监听我们的死信队列
*/
@RabbitListener(queues = "mayikt_order_dlx_queue")
public void orderConsumer(String orderId) {
System.out.println("死信队列获取消息:" + orderId);
if (StringUtils.isEmpty(orderId)) {
return;
}
//根据id查询
OrderEntity orderEntity = orderMapper.getOrder(orderId);
if (null == orderEntity) {
return;
}
//获取状态
Integer orderStatus=orderEntity.getOrderStatus();
//判断未支付 , 关闭订单
if(0==orderStatus){
orderMapper.updateStatus(orderId,2);
//库存返还
}
}
}
- 死信队列消费方法:
@RabbitListener(queues = “mayikt_order_dlx_queue”): 使用 @RabbitListener 注解标记了一个监听名为 “mayikt_order_dlx_queue” 的死信队列的方法。
public void orderConsumer(String orderId): 该方法接收从死信队列中获取的消息,消息内容为订单号 orderId。 - 业务逻辑:
打印消息内容,用于日志记录。
根据订单号查询数据库中的订单信息。
如果订单存在且订单状态为未支付(orderStatus为0),则更新订单状态为已关闭(orderStatus为2)。 - 在实际应用中,可能还包含其他业务逻辑,比如返还库存等操作。
4.16 下单测试
http://localhost:8082/addOrder.订单入库成功.状态0未支付
10秒后,死信消费者处理 状态0未支付 变为2 已关闭
更多推荐
已为社区贡献1条内容
所有评论(0)