侧边栏壁纸
博主头像
再见理想博主等级

只争朝夕,不负韶华

  • 累计撰写 112 篇文章
  • 累计创建 64 个标签
  • 累计收到 4 条评论

目 录CONTENT

文章目录

延迟消息实现方案

再见理想
2022-06-27 / 0 评论 / 0 点赞 / 454 阅读 / 2,414 字

一,前言

在电商、支付等领域,往往会有这样的场景,用户下单后放弃支付了,那这笔订单会在指定的时间段后进行关闭操作,细心的你一定发现了像某宝、某东都有这样的逻辑,而且时间很准确,误差在1s内;那他们是怎么实现的呢?

一般实现的方法有几种:

  1. 使用 rocketmq、rabbitmq、pulsar 等消息队列的延时投递功能
  2. 使用 redisson 提供的 DelayedQueue

有一些方案虽然广为流传但存在着致命缺陷,不要用来实现延时任务

  1. 使用 redis 的过期监听
  2. 使用 rabbitmq 的死信队列
  3. 使用 JDK 延迟队列或非持久化的时间轮(服务器重启后,数据全部消失;且只适用于单机场景)

二,redis 过期监听

在 Redis 官方手册的keyspace-notifications: timing-of-expired-events中明确指出:

Basically expired events are generated when the Redis server deletes the key and not when the time to live theoretically reaches the value of zero

redis 自动过期的实现方式是:定时任务离线扫描并删除部分过期键;在访问键时惰性检查是否过期并删除过期键。redis 从未保证会在设定的过期时间立即删除并发送过期通知。实际上,过期通知晚于设定的过期时间数分钟的情况也比较常见。

此外键空间通知采用的是发送即忘(fire and forget)策略,并不像消息队列一样保证送达。当订阅事件的客户端会丢失所有在断线期间所有分发给它的事件。

这是一种比定时扫描数据库更 “LOW” 的解决方案,请不要使用。

三,Rabbitmq 死信

死信(Dead Letter) 是 rabbitmq 提供的一种机制。当一条消息满足下列条件之一那么它会成为死信:

  • 消息被否定确认(如channel.basicNack) 并且此时requeue 属性被设置为false。
  • 消息在队列的存活时间超过设置的TTL时间
  • 消息队列的消息数量已经超过最大队列长度

若配置了死信队列,死信会被 rabbitmq 投到死信队列中。

在 rabbitmq 中创建死信队列的操作流程大概是:

  • 创建一个交换机作为死信交换机
  • 在业务队列中配置 x-dead-letter-exchange 和 x-dead-letter-routing-key,将第一步的交换机设为业务队列的死信交换机
  • 在死信交换机上创建队列,并监听此队列

死信队列的设计目的是为了存储没有被正常消费的消息,便于排查和重新投递。死信队列同样也没有对投递时间做出保证,在第一条消息成为死信之前,后面的消息即使过期也不会投递为死信。

为了解决这个问题,rabbit 官方推出了延迟投递插件 rabbitmq-delayed-message-exchange ,推荐使用官方插件来做延时消息。

这里说点题外话,使用 redis 过期监听或者 rabbitmq 死信队列做延时任务都是以设计者预想之外的方式使用中间件,这种出其不意必自毙的行为通常会存在某些隐患,比如缺乏一致性和可靠性保证,吞吐量较低、资源泄漏等。比较出名的一个事例是很多人使用 redis 的 list 作为消息队列,以致于最后作者看不下去写了 disque 并最后演变为 redis stream。工作中还是尽量不要滥用中间件,用专业的组件做专业的事

四,时间轮

时间轮是一种很优秀的定时任务的数据结构,然而绝大多数时间轮实现是纯内存没有持久化的。运行时间轮的进程崩溃之后其中所有的任务都会灰飞烟灭,所以奉劝各位勇士谨慎使用。

五,Redisson delayqueue

redisson delayqueue 是一种基于 redis zset 结构的延时队列实现。delayqueue 中有一个名为 timeoutSetName 的有序集合,其中元素的 score 为投递时间戳。delayqueue 会定时使用 zrangebyscore 扫描已到投递时间的消息,然后把它们移动到就绪消息列表中。

delayqueue 保证 redis 不崩溃的情况下不会丢失消息,在没有更好的解决方案时不妨一试。

在数据库索引设计良好的情况下,定时扫描数据库中未完成的订单产生的开销并没有想象中那么大。在使用 redisson delayqueue 等定时任务中间件时可以同时使用扫描数据库的方法作为补偿机制,避免中间件故障造成任务丢失。

delayqueue 代码实现:

实体类 DelayMsgVO

@Data
@NoArgsConstructor
@AllArgsConstructor
public class DelayMsgVO implements Serializable {

    private static final long serialVersionUID = 1L;

    private Long id;
    private String scene;
    private String putTime;

    public void setPutTime() {
        this.putTime = DateUtil.now();
    }

    public DelayMsgVO(Long id, String scene) {
        this.id = id;
        this.scene = scene;
        this.setPutTime();
    }
}

工具类 RedissonDelayMsgUtil

import cn.hutool.core.date.DateUnit;
import cn.hutool.core.date.DateUtil;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingDeque;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;
import java.util.concurrent.TimeUnit;

@Slf4j
@Component
public class RedissonDelayMsgUtil {

    @Autowired
    private RedissonClient redissonClient;

    /**
     * 投递延迟消息
     * @param id 场景id
     * @param scene 场景
     * @param blockingDeque 延迟队列
     */
    public void putDelayMsg(Long id, String scene, String blockingDeque) {
        // 目标投放队列,消息到期后将被投放到此队列
        RBlockingDeque<DelayMsgVO> blockingFairQueue = redissonClient.getBlockingDeque(blockingDeque);
        RDelayedQueue<DelayMsgVO> delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue);
        // 配置30s以后将消息发送到指定队列,相当于30s后取消订单
        // 同时将DelayMsgVO传输到 blockingFairQueue中, 这时可以在blockingFairQueue中获取DelayMsgVO了
        DelayMsgVO delayMsgVO = new DelayMsgVO(id, scene);
        delayedQueue.offer(new DelayMsgVO(id, scene), 30, TimeUnit.SECONDS);
        System.out.println("delayMsgVO =========> " + delayMsgVO);
        // 在该对象不再需要的情况下,应该主动销毁。
        // 仅在相关的 Redisson 对象也需要关闭的时候可以不用主动销毁。
        delayedQueue.destroy();
    }

	/**
     * 监听延迟消息
     * @param id 场景id
     * @param scene 场景
     * @param blockingDeque 延迟队列
     */
    public void getDelayMsg(String blockingDeque) {
        RBlockingQueue<DelayMsgVO> blockingFairQueue = redissonClient.getBlockingQueue(blockingDeque);
        // 死循环监听延迟消息,实际业务中可结合定时任务工具(如 xxljob)进行处理
        while (true) {
            DelayMsgVO delayMsgVO = null;
            try {
                delayMsgVO = blockingFairQueue.take();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("订单号:" + delayMsgVO.getId() + ",场景:" + delayMsgVO.getScene() + ",订单取消时间:" + DateUtil.now() + ",订单生成时间" + delayMsgVO.getPutTime());
        }
    }
}

测试

@Autowired
private RedissonDelayMsgUtil delayMsgUtil;

@ApiOperation(value = "延迟消息测试")
@GetMapping("/delayMsg")
public Result delayMsg() {
    Long id = null;
    String scene = "order";
    String blockingDeque = "delay_test";
    for (int i = 0; i < 10; i++) {
        id = Integer.toUnsignedLong(i);
        delayMsgUtil.putDelayMsg(id, scene, blockingDeque);
        try {
            //模拟间隔投递消息
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    delayMsgUtil.getDelayMsg(blockingDeque);
    return Result.success();
}

结果打印:

delayMsgVO =========> DelayMsgVO(id=0, scene=order, putTime=2022-06-27 15:46:00)
delayMsgVO =========> DelayMsgVO(id=1, scene=order, putTime=2022-06-27 15:46:01)
delayMsgVO =========> DelayMsgVO(id=2, scene=order, putTime=2022-06-27 15:46:02)
delayMsgVO =========> DelayMsgVO(id=3, scene=order, putTime=2022-06-27 15:46:03)
delayMsgVO =========> DelayMsgVO(id=4, scene=order, putTime=2022-06-27 15:46:04)
delayMsgVO =========> DelayMsgVO(id=5, scene=order, putTime=2022-06-27 15:46:05)
delayMsgVO =========> DelayMsgVO(id=6, scene=order, putTime=2022-06-27 15:46:06)
delayMsgVO =========> DelayMsgVO(id=7, scene=order, putTime=2022-06-27 15:46:07)
delayMsgVO =========> DelayMsgVO(id=8, scene=order, putTime=2022-06-27 15:46:08)
delayMsgVO =========> DelayMsgVO(id=9, scene=order, putTime=2022-06-27 15:46:09)
订单号:0,场景:order,订单取消时间:2022-06-27 15:46:30,订单生成时间2022-06-27 15:46:00
订单号:1,场景:order,订单取消时间:2022-06-27 15:46:31,订单生成时间2022-06-27 15:46:01
订单号:2,场景:order,订单取消时间:2022-06-27 15:46:32,订单生成时间2022-06-27 15:46:02
订单号:3,场景:order,订单取消时间:2022-06-27 15:46:33,订单生成时间2022-06-27 15:46:03
订单号:4,场景:order,订单取消时间:2022-06-27 15:46:34,订单生成时间2022-06-27 15:46:04
订单号:5,场景:order,订单取消时间:2022-06-27 15:46:35,订单生成时间2022-06-27 15:46:05
订单号:6,场景:order,订单取消时间:2022-06-27 15:46:36,订单生成时间2022-06-27 15:46:06
订单号:7,场景:order,订单取消时间:2022-06-27 15:46:37,订单生成时间2022-06-27 15:46:07
订单号:8,场景:order,订单取消时间:2022-06-27 15:46:38,订单生成时间2022-06-27 15:46:08
订单号:9,场景:order,订单取消时间:2022-06-27 15:46:40,订单生成时间2022-06-27 15:46:09

六,结论

  1. 首先推荐使用 rocketmq、pulsar 等拥有定时投递功能的消息队列。
  2. 在不方便获得专业消息队列时可以考虑使用 redisson delayqueue 等基于 redis 的延时队列方案,但要为 redis 崩溃等情况设计补偿保护机制。
  3. 在无法使用 redisson delayqueue 等方案时可以考虑使用时间轮。由于时间轮重启远比 redis 重启要频繁,定时扫库等保护机制更为重要。
  4. 永远不要使用 redis 过期监听实现定时任务。
0

评论区