关于java:延时任务基于redis-zset的完整实现

7次阅读

共计 4106 个字符,预计需要花费 11 分钟才能阅读完成。

所谓的延时工作给大家举个例子:你买了一张火车票,必须在 30 分钟之内付款,否则该订单被主动勾销。订单 30 分钟不付款主动勾销,这个工作就是一个延时工作。 我之前曾经写过 2 篇对于延时工作的文章:

  • 《残缺实现 - 通过 DelayQueue 实现延时工作》
  • 《延时工作(二)- 基于 netty 工夫轮算法实战》

这两种办法都有一个毛病:都是基于单体利用的内存的形式运行延时工作的,一旦呈现单点故障,可能呈现延时工作数据的失落。所以此篇文章给大家介绍实现延时工作的第三种形式,联合 redis zset 实现延时工作,能够解决单点故障的问题。给出实现原理、残缺实现代码,以及这种实现形式的优缺点。

一、实现原理

首先来介绍一下实现原理,咱们须要应用 redis zset 来实现延时工作的需要,所以咱们须要晓得 zset 的利用个性。zset 作为 redis 的有序汇合数据结构存在,排序的根据就是 score。

所以咱们能够利用 zset score 这个排序的这个个性,来实现延时工作

  • 在用户下单的时候,同时生成延时工作放入 redis,key 是能够自定义的,比方:delaytask:order
  • value 的值分成两个局部,一个局部是 score 用于排序,一个局部是 member,member 的值咱们设置为订单对象(如:订单编号),因为后续延时工作时效达成的时候,咱们须要有一些必要的订单信息(如:订单编号),能力实现订单主动勾销敞开的动作。
  • 延时工作实现的重点来了,score 咱们设置为:订单生成工夫 + 延时时长。这样 redis 会对 zset 依照 score 延时工夫进行排序。
  • 开启 redis 扫描工作,获取 ” 以后工夫 > score” 的延时工作并执行。即:以后工夫 > 订单生成工夫 + 延时时长的时候,执行延时工作。

二、筹备工作

应用 redis zset 这个计划来实现延时工作的需要,首先必定是须要 redis,这一点毫无疑问。redis 的搭建网上有很多的文章,我这里就不赘述了。

其次,笔者长期的 java 类利用零碎开发都是应用 SpringBoot 来实现,所以也是习惯应用 SpringBoot 的 redis 集成计划。首先通过 maven 坐标引入 spring-boot-starter-data-redis

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
</dependency>

其次须要在 Spring Boot 的 application.yml 配置文件中,配置 redis 数据库的链接信息。我这里配置的是 redis 的单例,如果大家的生产环境是哨兵模式、或者是集群模式的 redis,这里的配置形式须要进行微调。其实这部分内容在我的集体博客外面都已经零碎的介绍过,感兴趣的敌人能够关注我的集体博客。

spring:
  redis:
    database: 0 # Redis 数据库索引(默认为 0)host: 192.168.161.3 # Redis 服务器地址
    port: 6379 # Redis 服务器连贯端口
    password: 123456 # Redis 服务器连贯明码(默认为空)timeout:  5000  # 连贯超时,单位 ms
    lettuce:
      pool:
        max-active: 8 # 连接池最大连接数(应用负值示意没有限度)默认 8
        max-wait: -1 # 连接池最大阻塞等待时间(应用负值示意没有限度)默认 -1
        max-idle: 8 # 连接池中的最大闲暇连贯 默认 8
        min-idle: 0 # 连接池中的最小闲暇连贯 默认 0

三、代码实现

上面的这个类就是延时工作的外围实现了,一共蕴含三个外围办法,咱们来一一阐明一下:

  • produce 办法,用于生成订单 -order 为订单信息,能够是订单流水号,用于延时工作达到时效后敞开订单
  • afterPropertiesSet 办法是 InitializingBean 接口的办法,之所以实现这个接口,是因为咱们须要在利用启动的时候开启 redis 扫描工作。即:当 OrderDelayService bean 初始化的时候,开启 redis 扫描工作循环获取延时工作数据。
  • consuming 函数,用于从 redis 获取延时工作数据,生产延时工作,执行超时订单敞开等操作。为了防止阻塞 for 循环,影响前面延时工作的执行,所以这个 consuming 函数肯定要做成异步的,参考 Spring Boot 异步工作及 Async 注解的应用办法。我之前写过一个 SpringBoot 的 可观测、易配置 的异步工作线程池开源我的项目,源代码地址:https://gitee.com/hanxt/zimug…。我的这个 zimug-monitor-threadpool 开源我的项目,能够做到对线程池应用状况的监控,我本人平时用的成果还不错,向大家举荐一下!

    @Component
    public class OrderDelayService  implements InitializingBean {
    //redis zset key
    public static final String ORDER_DELAY_TASK_KEY = "delaytask:order";
    
    @Resource
    private StringRedisTemplate stringRedisTemplate;
    
    // 生成订单 -order 为订单信息,能够是订单流水号,用于延时工作达到时效后敞开订单
    public void produce(String orderSerialNo){stringRedisTemplate.opsForZSet().add(
              ORDER_DELAY_TASK_KEY,     // redis key
              orderSerialNo,    // zset  member
              //30 分钟延时
              System.currentTimeMillis() + (30 * 60 * 1000)    //zset score
      );
    }
    
    // 延时工作,也是异步工作,延时工作达到时效之后敞开订单,并将延时工作从 redis zset 删除
    @Async("test")
    public void consuming(){Set<ZSetOperations.TypedTuple<String>> orderSerialNos = stringRedisTemplate.opsForZSet().rangeByScoreWithScores(
                ORDER_DELAY_TASK_KEY,
                0,  // 延时工作 score 最小值
                System.currentTimeMillis() // 延时工作 score 最大值(以后工夫));
        if (!CollectionUtils.isEmpty(orderSerialNos)) {for (ZSetOperations.TypedTuple<String> orderSerialNo : orderSerialNos) {
            // 这里依据 orderSerialNo 去检查用户是否实现了订单领取
            // 如果用户没有领取订单,去执行订单敞开的操作
            System.out.println("订单" + orderSerialNo.getValue() + "超时被主动敞开");
            // 订单敞开之后,将订单延时工作从队列中删除
            stringRedisTemplate.opsForZSet().remove(ORDER_DELAY_TASK_KEY, orderSerialNo.getValue());
          }
        }
    }
    
    // 该类对象 Bean 实例化之后,就开启 while 扫描工作
    @Override
    public void afterPropertiesSet() throws Exception {new Thread(() -> {  // 开启新的线程,否则 SpringBoot 利用初始化无奈启动
        while(true){
          try {Thread.sleep(5 * 1000);   // 每 5 秒扫描一次 redis 库获取延时数据,不必太频繁没必要
          } catch (InterruptedException e) {e.printStackTrace();  // 本文只是示例,生产环境请做好相干的异样解决
          }
          consuming();}
      }).start();}
    }

    更多的内容参考代码中的正文,须要关注的点是:

  • 上文中的 rangeByScoreWithScores 办法用于从 redis 中获取延时工作,score 大于 0 小于以后工夫的所有延时工作,都将被从 redis 外面取出来。每 5 秒执行一次,所以延时工作的误差不会超过 5 秒。
  • 上文中的订单信息,我只保留了订单惟一流水号,用于敞开订单。如果你的业务须要传递更多的订单信息,请应用 RedisTemplate 操作订单类对象,而不是 StringRedisTemplate 操作订单流水号字符串。

订单下单的时候,应用如下的办法,将订单序列号放入 redis zset 中即可实现延时工作

orderDelayService.produce("这里填写订单编号");

四、优缺点

应用 redis zset 来实现延时工作的长处是:绝对于本文结尾介绍的两种办法,咱们的延时工作是保留在 redis 外面的,redis 具备数据长久化的机制,能够无效的防止延时工作数据的失落。另外,redis 还能够通过哨兵模式、集群模式无效的防止单点故障造成的服务中断。
至于毛病嘛,我感觉没什么毛病。如果非要勉强的说一个毛病的话,那就是咱们须要额定保护 redis 服务,减少了硬件资源的需要和运维老本。然而当初随着微服务的衰亡,redis 简直曾经成了利用零碎的标配,redis 复用即可,所以我感觉这也算不上什么毛病吧!

码文不易,如果您感觉有帮忙,请帮忙点击在看或者分享,没有您的反对我可能无奈坚持下去!
欢送关注我的布告号:字母哥杂谈,回复 003 赠送作者专栏《docker 修炼之道》的 PDF 版本,30 余篇精品 docker 文章。字母哥博客:zimug.com

正文完
 0