所谓延时队列就是延时的音讯队列,上面说一下一些业务场景

实际场景

订单领取失败,每隔一段时间揭示用户

用户并发量的状况,能够延时2分钟给用户发短信

先来看看Redis实现一般的音讯队列

咱们晓得,对于业余的音讯队列中间件,如Kafka和RabbitMQ,消费者在生产音讯之前要进行一系列的繁琐过程。

如RabbitMQ发消息之前要创立 Exchange,再创立 Queue,还要将 Queue 和 Exchange 通过某种规定绑定起来,发消息的时候要指定 routingkey,还要管制头部信息

然而绝大 少数状况下,尽管咱们的音讯队列只有一组消费者,但还是须要经验下面一些过程。

有了 Redis,对于那些只有一组消费者的音讯队列,应用 Redis 就能够十分轻松的搞定。Redis 的音讯队列不是业余的音讯队列,它没有十分多的高级个性, 没有 ack 保障,如果对音讯的可靠性有着极致的谋求,那么它就不适宜应用

异步音讯队列根本实现

Redis 的 list(列表) 数据结构罕用来作为异步音讯队列应用,应用 rpush/lpush 操作入队列, 应用 lpop 和 rpop 来出队列

> rpush queue 月伴飞鱼1 月伴飞鱼2 月伴飞鱼3(integer) 3> lpop queue"月伴飞鱼1"> llen queue(integer) 2

问题1:如果队列空了

客户端是通过队列的 pop 操作来获取音讯,而后进行解决。解决完了再接着获取音讯, 再进行解决。如此周而复始,这便是作为队列消费者的客户端的生命周期。

可是如果队列空了,客户端就会陷入 pop 的死循环,不停地 pop,没有数据,接着再 pop, 又没有数据。这就是节约生命的空轮询。空轮询岂但拉高了客户端的 CPU,redis 的 QPS 也 会被拉高,如果这样空轮询的客户端有几十来个,Redis 的慢查问可能会显著增多。

通常咱们应用 sleep 来解决这个问题,让线程睡一会,睡个 1s 钟就能够了。岂但客户端 的 CPU 能降下来,Redis 的 QPS 也降下来了

问题2:队列提早

用下面睡眠的方法能够解决问题。同时如果只有 1 个消费者,那么这个提早就是 1s。如果有多个消费者,这个提早会有所降落,因 为每个消费者的睡觉时间是岔开来的。

有没有什么方法能显著升高提早呢?

那就是 blpop/brpop。

这两个指令的前缀字符 b 代表的是 blocking,也就是阻塞读。

阻塞读在队列没有数据的时候,会立刻进入休眠状态,一旦数据到来,则立即醒过来。消 息的提早简直为零。用 blpop/brpop 代替后面的 lpop/rpop,就完满解决了下面的问题。

问题3:闲暇连贯主动断开

其实他还有个问题须要解决—— 闲暇连贯的问题。

如果线程始终阻塞在哪里,Redis 的客户端连贯就成了闲置连贯,闲置过久,服务器个别 会被动断开连接,缩小闲置资源占用。这个时候 blpop/brpop 会抛出异样来。

所以编写客户端消费者的时候要小心,留神捕捉异样,还要重试。

分布式锁抵触解决

如果客户端在解决申请时加分布式锁没加胜利怎么办。

个别有 3 种策略来解决加锁失败:

1、间接抛出异样,告诉用户稍后重试;

2、sleep 一会再重试;

3、将申请转移至延时队列,过一会再试;

间接抛出特定类型的异样

这种形式比拟适宜由用户间接发动的申请,用户看到谬误对话框后,会先浏览对话框的内 容,再点击重试,这样就能够起到人工延时的成果。如果思考到用户体验,能够由前端的代码 代替用户本人来进行延时重试管制。它实质上是对以后申请的放弃,由用户决定是否从新发动 新的申请。

sleep

sleep 会阻塞以后的音讯解决线程,会导致队列的后续音讯解决呈现提早。如果碰撞的比 较频繁或者队列里音讯比拟多,sleep 可能并不适合。如果因为个别死锁的 key 导致加锁不成 功,线程会彻底堵死,导致后续音讯永远得不到及时处理。

延时队列

这种形式比拟适宜异步音讯解决,将以后抵触的申请扔到另一个队列延后解决以避开抵触。

延时队列的实现

咱们能够应用 zset这个命令,用设置好的工夫戳作为score进行排序,应用 zadd score1 value1 ....命令就能够始终往内存中生产音讯。再利用 zrangebysocre 查问符合条件的所有待处理的工作,通过循环执行队列工作即可。也能够通过 zrangebyscore key min max withscores limit 0 1 查问最早的一条工作,来进行生产

private Jedis jedis;public void redisDelayQueueTest() {    String key = "delay_queue";    // 理论开发倡议应用业务 ID 和随机生成的惟一 ID 作为 value, 随机生成的惟一 ID 能够保障音讯的唯一性, 业务 ID 能够防止 value 携带的信息过多    String orderId1 = UUID.randomUUID().toString();    jedis.zadd(queueKey, System.currentTimeMillis() + 5000, orderId1);    String orderId12 = UUID.randomUUID().toString();    jedis.zadd(queueKey, System.currentTimeMillis() + 5000, orderId2);    new Thread() {        @Override        public void run() {            while (true) {                Set<String> resultList;                // 只获取第一条数据, 只获取不会移除数据                resultList = jedis.zrangebyscore(key, System.currentTimeMillis(), 0, 1);                if (resultList.size() == 0) {                    try {                        Thread.sleep(1000);                    } catch (InterruptedException e) {                        e.printStackTrace();                        break;                    }                } else {                    // 移除数据获取到的数据                    if (jedis.zrem(key, resultList.get(0)) > 0) {                        String orderId = resultList.get(0);                        log.info("orderId = {}", resultList.get(0));                        this.handleMsg(orderId);                    }                }            }        }    }.start();}public void handleMsg(T msg) {    System.out.println(msg);}

下面的实现, 在多线程逻辑上也是没有问题的, 假如有两个线程 T1, T2和其余更多线程, 解决逻辑如下, 保障了多线程状况下只有一个线程解决了对应的音讯:

1.T1, T2 和其余更多线程调用 zrangebyscore 获取到了一条音讯 A

2.T1 筹备开始删除音讯 A, 因为是原子操作, T2 和其余更多线程期待 T1 执行 zrem 删除音讯 A 后再执行 zrem 删除音讯 A

3.T1 删除了音讯 A, 返回删除胜利标记 1, 并对音讯 A 进行解决

4.T2 其余更多线程开始 zrem 删除音讯 A, 因为音讯 A 曾经被删除, 所以所有的删除均失败, 放弃了对音讯 A 的解决

同时,咱们要留神肯定要对 handle_msg 进行异样捕捉,防止因为个别工作解决问题导致循环异样退 出

进一步优化

下面的算法中同一个工作可能会被多个过程取到之后再应用 zrem 进行争抢,那些没抢到 的过程都是白取了一次工作,这是节约。能够思考应用 lua scripting 来优化一下这个逻辑,将 zrangebyscore 和 zrem 一起挪到服务器端进行原子化操作,这样多个过程之间争抢工作时就不 会呈现这种节约了

应用调用Lua脚本进一步优化

Lua 脚本, 如果有超时的音讯, 就删除, 并返回这条音讯, 否则返回空字符串:

String luaScript = "local resultArray = redis.call('zrangebyscore', KEYS[1], 0, ARGV[1], 'limit' , 0, 1)\n" +        "if #resultArray > 0 then\n" +        "    if redis.call('zrem', KEYS[1], resultArray[1]) > 0 then\n" +        "        return resultArray[1]\n" +        "    else\n" +        "        return ''\n" +        "    end\n" +        "else\n" +        "    return ''\n" +        "end";jedis.eval(luaScript, ScriptOutputType.VALUE, new String[]{key}, String.valueOf(System.currentTimeMillis()));

Redis延时队列劣势

Redis用来进行实现延时队列是具备这些劣势的:

1.Redis zset反对高性能的 score 排序。

2.Redis是在内存上进行操作的,速度十分快。

3.Redis能够搭建集群,当音讯很多时候,咱们能够用集群来进步音讯解决的速度,进步可用性。

4.Redis具备长久化机制,当呈现故障的时候,能够通过AOF和RDB形式来对数据进行复原,保障了数据的可靠性

Redis延时队列劣势

应用 Redis 实现的延时音讯队列也存在数据长久化, 音讯可靠性的问题

没有重试机制 - 解决音讯出现异常没有重试机制, 这些须要本人去实现, 包含重试次数的实现等

没有 ACK 机制 - 例如在获取音讯并曾经删除了音讯状况下, 正在解决音讯的时候客户端解体了, 这条正在解决的这些音讯就会失落, MQ 是须要明确的返回一个值给 MQ 才会认为这个音讯是被正确的生产了

如果对音讯可靠性要求较高, 举荐应用 MQ 来实现

Redission实现延时队列

基于Redis的Redisson分布式提早队列构造的RDelayedQueue Java对象在实现了RQueue接口的根底上提供了向队列按要求提早增加我的项目的性能。该性能能够用来实现音讯传送提早按几何增长或几何衰减的发送策略

RQueue<String> distinationQueue = ...RDelayedQueue<String> delayedQueue = getDelayedQueue(distinationQueue);// 10秒钟当前将音讯发送到指定队列delayedQueue.offer("msg1", 10, TimeUnit.SECONDS);// 一分钟当前将音讯发送到指定队列delayedQueue.offer("msg2", 1, TimeUnit.MINUTES);

在该对象不再须要的状况下,应该被动销毁。仅在相干的Redisson对象也须要敞开的时候能够不必被动销毁。

RDelayedQueue<String> delayedQueue = ...delayedQueue.destroy();

是不是很不便...............

公众号,欢送关注!!!!!!

参考

Redis Lua scripts debugger

Redis 深度历险:外围原理与利用实际