一,前言
在电商、支付等领域,往往会有这样的场景,用户下单后放弃支付了,那这笔订单会在指定的时间段后进行关闭操作,细心的你一定发现了像某宝、某东都有这样的逻辑,而且时间很准确,误差在1s内;那他们是怎么实现的呢?
一般实现的方法有几种:
- 使用 rocketmq、rabbitmq、pulsar 等消息队列的延时投递功能
- 使用 redisson 提供的 DelayedQueue
有一些方案虽然广为流传但存在着致命缺陷,不要用来实现延时任务
- 使用 redis 的过期监听
- 使用 rabbitmq 的死信队列
- 使用 JDK 延迟队列或非持久化的时间轮(服务器重启后,数据全部消失;且只适用于单机场景)
二,redis 过期监听
在 Redis 官方手册的keyspace-notifications: timing-of-expired-events中明确指出:
Basically expired events are generated when the Redis server deletes the key and not when the time to live theoretically reaches the value of zero
redis 自动过期的实现方式是:定时任务离线扫描并删除部分过期键;在访问键时惰性检查是否过期并删除过期键。redis 从未保证会在设定的过期时间立即删除并发送过期通知。实际上,过期通知晚于设定的过期时间数分钟的情况也比较常见。
此外键空间通知采用的是发送即忘(fire and forget)策略,并不像消息队列一样保证送达。当订阅事件的客户端会丢失所有在断线期间所有分发给它的事件。
这是一种比定时扫描数据库更 “LOW” 的解决方案,请不要使用。
三,Rabbitmq 死信
死信(Dead Letter) 是 rabbitmq 提供的一种机制。当一条消息满足下列条件之一那么它会成为死信:
- 消息被否定确认(如channel.basicNack) 并且此时requeue 属性被设置为false。
- 消息在队列的存活时间超过设置的TTL时间
- 消息队列的消息数量已经超过最大队列长度
若配置了死信队列,死信会被 rabbitmq 投到死信队列中。
在 rabbitmq 中创建死信队列的操作流程大概是:
- 创建一个交换机作为死信交换机
- 在业务队列中配置 x-dead-letter-exchange 和 x-dead-letter-routing-key,将第一步的交换机设为业务队列的死信交换机
- 在死信交换机上创建队列,并监听此队列
死信队列的设计目的是为了存储没有被正常消费的消息,便于排查和重新投递。死信队列同样也没有对投递时间做出保证,在第一条消息成为死信之前,后面的消息即使过期也不会投递为死信。
为了解决这个问题,rabbit 官方推出了延迟投递插件 rabbitmq-delayed-message-exchange ,推荐使用官方插件来做延时消息。
这里说点题外话,使用 redis 过期监听或者 rabbitmq 死信队列做延时任务都是以设计者预想之外的方式使用中间件,这种出其不意必自毙的行为通常会存在某些隐患,比如缺乏一致性和可靠性保证,吞吐量较低、资源泄漏等。比较出名的一个事例是很多人使用 redis 的 list 作为消息队列,以致于最后作者看不下去写了 disque 并最后演变为 redis stream。工作中还是尽量不要滥用中间件,用专业的组件做专业的事
四,时间轮
时间轮是一种很优秀的定时任务的数据结构,然而绝大多数时间轮实现是纯内存没有持久化的。运行时间轮的进程崩溃之后其中所有的任务都会灰飞烟灭,所以奉劝各位勇士谨慎使用。
五,Redisson delayqueue
redisson delayqueue 是一种基于 redis zset 结构的延时队列实现。delayqueue 中有一个名为 timeoutSetName 的有序集合,其中元素的 score 为投递时间戳。delayqueue 会定时使用 zrangebyscore 扫描已到投递时间的消息,然后把它们移动到就绪消息列表中。
delayqueue 保证 redis 不崩溃的情况下不会丢失消息,在没有更好的解决方案时不妨一试。
在数据库索引设计良好的情况下,定时扫描数据库中未完成的订单产生的开销并没有想象中那么大。在使用 redisson delayqueue 等定时任务中间件时可以同时使用扫描数据库的方法作为补偿机制,避免中间件故障造成任务丢失。
delayqueue 代码实现:
实体类 DelayMsgVO
@Data
@NoArgsConstructor
@AllArgsConstructor
public class DelayMsgVO implements Serializable {
private static final long serialVersionUID = 1L;
private Long id;
private String scene;
private String putTime;
public void setPutTime() {
this.putTime = DateUtil.now();
}
public DelayMsgVO(Long id, String scene) {
this.id = id;
this.scene = scene;
this.setPutTime();
}
}
工具类 RedissonDelayMsgUtil
import cn.hutool.core.date.DateUnit;
import cn.hutool.core.date.DateUtil;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingDeque;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Date;
import java.util.concurrent.TimeUnit;
@Slf4j
@Component
public class RedissonDelayMsgUtil {
@Autowired
private RedissonClient redissonClient;
/**
* 投递延迟消息
* @param id 场景id
* @param scene 场景
* @param blockingDeque 延迟队列
*/
public void putDelayMsg(Long id, String scene, String blockingDeque) {
// 目标投放队列,消息到期后将被投放到此队列
RBlockingDeque<DelayMsgVO> blockingFairQueue = redissonClient.getBlockingDeque(blockingDeque);
RDelayedQueue<DelayMsgVO> delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue);
// 配置30s以后将消息发送到指定队列,相当于30s后取消订单
// 同时将DelayMsgVO传输到 blockingFairQueue中, 这时可以在blockingFairQueue中获取DelayMsgVO了
DelayMsgVO delayMsgVO = new DelayMsgVO(id, scene);
delayedQueue.offer(new DelayMsgVO(id, scene), 30, TimeUnit.SECONDS);
System.out.println("delayMsgVO =========> " + delayMsgVO);
// 在该对象不再需要的情况下,应该主动销毁。
// 仅在相关的 Redisson 对象也需要关闭的时候可以不用主动销毁。
delayedQueue.destroy();
}
/**
* 监听延迟消息
* @param id 场景id
* @param scene 场景
* @param blockingDeque 延迟队列
*/
public void getDelayMsg(String blockingDeque) {
RBlockingQueue<DelayMsgVO> blockingFairQueue = redissonClient.getBlockingQueue(blockingDeque);
// 死循环监听延迟消息,实际业务中可结合定时任务工具(如 xxljob)进行处理
while (true) {
DelayMsgVO delayMsgVO = null;
try {
delayMsgVO = blockingFairQueue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("订单号:" + delayMsgVO.getId() + ",场景:" + delayMsgVO.getScene() + ",订单取消时间:" + DateUtil.now() + ",订单生成时间" + delayMsgVO.getPutTime());
}
}
}
测试
@Autowired
private RedissonDelayMsgUtil delayMsgUtil;
@ApiOperation(value = "延迟消息测试")
@GetMapping("/delayMsg")
public Result delayMsg() {
Long id = null;
String scene = "order";
String blockingDeque = "delay_test";
for (int i = 0; i < 10; i++) {
id = Integer.toUnsignedLong(i);
delayMsgUtil.putDelayMsg(id, scene, blockingDeque);
try {
//模拟间隔投递消息
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
delayMsgUtil.getDelayMsg(blockingDeque);
return Result.success();
}
结果打印:
delayMsgVO =========> DelayMsgVO(id=0, scene=order, putTime=2022-06-27 15:46:00)
delayMsgVO =========> DelayMsgVO(id=1, scene=order, putTime=2022-06-27 15:46:01)
delayMsgVO =========> DelayMsgVO(id=2, scene=order, putTime=2022-06-27 15:46:02)
delayMsgVO =========> DelayMsgVO(id=3, scene=order, putTime=2022-06-27 15:46:03)
delayMsgVO =========> DelayMsgVO(id=4, scene=order, putTime=2022-06-27 15:46:04)
delayMsgVO =========> DelayMsgVO(id=5, scene=order, putTime=2022-06-27 15:46:05)
delayMsgVO =========> DelayMsgVO(id=6, scene=order, putTime=2022-06-27 15:46:06)
delayMsgVO =========> DelayMsgVO(id=7, scene=order, putTime=2022-06-27 15:46:07)
delayMsgVO =========> DelayMsgVO(id=8, scene=order, putTime=2022-06-27 15:46:08)
delayMsgVO =========> DelayMsgVO(id=9, scene=order, putTime=2022-06-27 15:46:09)
订单号:0,场景:order,订单取消时间:2022-06-27 15:46:30,订单生成时间2022-06-27 15:46:00
订单号:1,场景:order,订单取消时间:2022-06-27 15:46:31,订单生成时间2022-06-27 15:46:01
订单号:2,场景:order,订单取消时间:2022-06-27 15:46:32,订单生成时间2022-06-27 15:46:02
订单号:3,场景:order,订单取消时间:2022-06-27 15:46:33,订单生成时间2022-06-27 15:46:03
订单号:4,场景:order,订单取消时间:2022-06-27 15:46:34,订单生成时间2022-06-27 15:46:04
订单号:5,场景:order,订单取消时间:2022-06-27 15:46:35,订单生成时间2022-06-27 15:46:05
订单号:6,场景:order,订单取消时间:2022-06-27 15:46:36,订单生成时间2022-06-27 15:46:06
订单号:7,场景:order,订单取消时间:2022-06-27 15:46:37,订单生成时间2022-06-27 15:46:07
订单号:8,场景:order,订单取消时间:2022-06-27 15:46:38,订单生成时间2022-06-27 15:46:08
订单号:9,场景:order,订单取消时间:2022-06-27 15:46:40,订单生成时间2022-06-27 15:46:09
六,结论
- 首先推荐使用 rocketmq、pulsar 等拥有定时投递功能的消息队列。
- 在不方便获得专业消息队列时可以考虑使用 redisson delayqueue 等基于 redis 的延时队列方案,但要为 redis 崩溃等情况设计补偿保护机制。
- 在无法使用 redisson delayqueue 等方案时可以考虑使用时间轮。由于时间轮重启远比 redis 重启要频繁,定时扫库等保护机制更为重要。
- 永远不要使用 redis 过期监听实现定时任务。
评论区