乐趣区

关于java:Spring-Boot-Redis-实现延时队列写得太好了

起源:blog.csdn.net/qq330983778/article/details/99341671

业务流程

首先咱们剖析下这个流程

  1. 用户提交工作。首先将工作推送至提早队列中。
  2. 提早队列接管到工作后,首先将工作推送至 job pool 中,而后计算其执行工夫。
  3. 而后生成提早工作(仅仅蕴含工作 id)放入某个桶中
  4. 工夫组件时刻轮询各个桶,当工夫达到的时候从 job pool 中取得工作元信息。
  5. 监测工作的合法性如果曾经删除则 pass。持续轮询。如果工作非法则再次计算工夫
  6. 如果非法则计算工夫,如果工夫非法:依据 topic 将工作放入对应的 ready queue,而后从 bucket 中移除。如果工夫不非法,则从新计算工夫再次放入 bucket,并移除之前的 bucket 中的内容
  7. 生产端轮询对应 topic 的 ready queue。获取 job 后做本人的业务逻辑。与此同时,服务端将曾经被生产端获取的 job 依照其设定的 TTR,从新计算执行工夫,并将其放入 bucket。
  8. 实现生产后,发送 finish 音讯,服务端依据 job id 删除对应信息。

对象

咱们当初能够理解到两头存在的几个组件

  • 提早队列,为 Redis 提早队列。实现消息传递
  • Job pool 工作池保留 job 元信息。依据文章形容应用 K / V 的数据结构,key 为 ID,value 为 job
  • Delay Bucket 用来保留业务的提早工作。文章中形容应用轮询形式放入某一个 Bucket 能够晓得其并没有应用 topic 来辨别,集体这里默认应用程序插入
  • Timer 工夫组件,负责扫描各个 Bucket。依据文章形容存在多个 Timer,然而同一个 Timer 同一时间只能扫描一个 Bucket
  • Ready Queue 负责寄存须要被实现的工作,然而依据形容依据 Topic 的不同存在多个 Ready Queue。

其中 Timer 负责轮询,Job pool、Delay Bucket、Ready Queue 都是不同职责的汇合。

工作状态

  • ready:可执行状态,
  • delay:不可执行状态,期待时钟周期。
  • reserved:已被消费者读取,但没有实现生产。
  • deleted:已被生产实现或者已被删除。

对外提供的接口

额定的内容

  1. 首先依据状态状态形容,finish 和 delete 操作都是将工作设置成 deleted 状态。
  2. 依据文章形容的操作,在执行 finish 或者 delete 的操作的时候工作曾经从元数据中移除,此时 deleted 状态可能只存在极短时间,所以理论实现中就间接删除了。
  3. 文章中并没有阐明响应超时后如何解决,所以集体当初将其从新投入了待处理队列。
  4. 文章中因为应用了集群,所以应用 redis 的 setnx 锁来保障多个工夫循环解决多个桶的时候不会呈现反复循环。这里因为是简略的实现,所以就很简略的每个桶设置一个工夫队列解决。也是为了不便简略解决。对于分布式锁能够看我之前的文章外面有形容。

实现

当初咱们依据设计内容实现设计。这一块设计咱们分四步实现

工作及相干对象

目前须要两个对象,一个是工作对象(job)一个负责保留工作援用的对象(delay job),Spring Boot 根底就不介绍了,举荐下这个实战教程:
https://github.com/javastacks…

工作对象

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Job implements Serializable {

    /**
     * 提早工作的惟一标识,用于检索工作
     */
    @JsonSerialize(using = ToStringSerializer.class)
    private Long id;

    /**
     * 工作类型(具体业务类型)*/
    private String topic;

    /**
     * 工作的延迟时间
     */
    private long delayTime;

    /**
     * 工作的执行超时工夫
     */
    private long ttrTime;

    /**
     * 工作具体的音讯内容,用于解决具体业务逻辑用
     */
    private String message;

    /**
     * 重试次数
     */
    private int retryCount;
    /**
     * 工作状态
     */
    private JobStatus status;
}

工作援用对象

@Data
@AllArgsConstructor
public class DelayJob implements Serializable {


    /**
     * 提早工作的惟一标识
     */
    private long jodId;
    
    /**
     * 工作的执行工夫
     */
    private long delayDate;

    /**
     * 工作类型(具体业务类型)*/
    private String topic;


    public DelayJob(Job job) {this.jodId = job.getId();
        this.delayDate = System.currentTimeMillis() + job.getDelayTime();
        this.topic = job.getTopic();}

    public DelayJob(Object value, Double score) {this.jodId = Long.parseLong(String.valueOf(value));
        this.delayDate = System.currentTimeMillis() + score.longValue();
    }
}

容器

目前咱们须要实现三个容器的创立,Job 工作池、提早工作容器、待实现工作容器

job 工作池,为一般的 K / V 构造,提供根底的操作

@Component
@Slf4j
public class JobPool {
    
    @Autowired
    private RedisTemplate redisTemplate;

    private String NAME = "job.pool";
    
    private BoundHashOperations getPool () {BoundHashOperations ops = redisTemplate.boundHashOps(NAME);
        return ops;
    }

    /**
     * 增加工作
     * @param job
     */
    public void addJob (Job job) {log.info("工作池增加工作:{}", JSON.toJSONString(job));
        getPool().put(job.getId(),job);
        return ;
    }

    /**
     * 取得工作
     * @param jobId
     * @return
     */
    public Job getJob(Long jobId) {Object o = getPool().get(jobId);
        if (o instanceof Job) {return (Job) o;
        }
        return null;
    }

    /**
     * 移除工作
     * @param jobId
     */
    public void removeDelayJob (Long jobId) {log.info("工作池移除工作:{}",jobId);
        // 移除工作
        getPool().delete(jobId);
    }
}

提早工作,应用可排序的 ZSet 保留数据,提供取出最小值等操作

@Slf4j
@Component
public class DelayBucket {

    @Autowired
    private RedisTemplate redisTemplate;

    private static AtomicInteger index = new AtomicInteger(0);

    @Value("${thread.size}")
    private int bucketsSize;

    private List <String> bucketNames = new ArrayList <>();

    @Bean
    public List <String> createBuckets() {for (int i = 0; i < bucketsSize; i++) {bucketNames.add("bucket" + i);
        }
        return bucketNames;
    }

    /**
     * 取得桶的名称
     * @return
     */
    private String getThisBucketName() {int thisIndex = index.addAndGet(1);
        int i1 = thisIndex % bucketsSize;
        return bucketNames.get(i1);
    }

    /**
     * 取得桶汇合
     * @param bucketName
     * @return
     */
    private BoundZSetOperations getBucket(String bucketName) {return redisTemplate.boundZSetOps(bucketName);
    }

    /**
     * 放入延时工作
     * @param job
     */
    public void addDelayJob(DelayJob job) {log.info("增加提早工作:{}", JSON.toJSONString(job));
        String thisBucketName = getThisBucketName();
        BoundZSetOperations bucket = getBucket(thisBucketName);
        bucket.add(job,job.getDelayDate());
    }

    /**
     * 取得最新的延期工作
     * @return
     */
    public DelayJob getFirstDelayTime(Integer index) {String name = bucketNames.get(index);
        BoundZSetOperations bucket = getBucket(name);
        Set<ZSetOperations.TypedTuple> set = bucket.rangeWithScores(0, 1);
        if (CollectionUtils.isEmpty(set)) {return null;}
        ZSetOperations.TypedTuple typedTuple = (ZSetOperations.TypedTuple) set.toArray()[0];
        Object value = typedTuple.getValue();
        if (value instanceof DelayJob) {return (DelayJob) value;
        }
        return null;
    }

    /**
     * 移除延时工作
     * @param index
     * @param delayJob
     */
    public void removeDelayTime(Integer index,DelayJob delayJob) {String name = bucketNames.get(index);
        BoundZSetOperations bucket = getBucket(name);
        bucket.remove(delayJob);
    }

}

待实现工作,外部应用 topic 进行细分,每个 topic 对应一个 list 汇合

@Component
@Slf4j
public class ReadyQueue {

    @Autowired
    private RedisTemplate redisTemplate;

    private String NAME = "process.queue";

    private String getKey(String topic) {return NAME + topic;}

    /**
     * 取得队列
     * @param topic
     * @return
     */
    private BoundListOperations getQueue (String topic) {BoundListOperations ops = redisTemplate.boundListOps(getKey(topic));
        return ops;
    }

    /**
     * 设置工作
     * @param delayJob
     */
    public void pushJob(DelayJob delayJob) {log.info("执行队列增加工作:{}",delayJob);
        BoundListOperations listOperations = getQueue(delayJob.getTopic());
        listOperations.leftPush(delayJob);
    }

    /**
     * 移除并取得工作
     * @param topic
     * @return
     */
    public DelayJob popJob(String topic) {BoundListOperations listOperations = getQueue(topic);
        Object o = listOperations.leftPop();
        if (o instanceof DelayJob) {log.info("执行队列取出工作:{}", JSON.toJSONString((DelayJob) o));
            return (DelayJob) o;
        }
        return null;
    }
    
}

轮询解决

设置了线程池为每个 bucket 设置一个轮询操作

@Component
public class DelayTimer implements ApplicationListener <ContextRefreshedEvent> {

    @Autowired
    private DelayBucket delayBucket;
    @Autowired
    private JobPool     jobPool;
    @Autowired
    private ReadyQueue  readyQueue;
    
    @Value("${thread.size}")
    private int length;
    
    @Override 
    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        ExecutorService executorService = new ThreadPoolExecutor(
                length, 
                length,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue <Runnable>());

        for (int i = 0; i < length; i++) {
            executorService.execute(
                    new DelayJobHandler(
                            delayBucket,
                            jobPool,
                            readyQueue,
                            i));
        }
        
    }
}

测试申请

/**
 * 测试用申请
 * @author daify
 **/
@RestController
@RequestMapping("delay")
public class DelayController {
    
    @Autowired
    private JobService jobService;
    /**
     * 增加
     * @param request
     * @return
     */
    @RequestMapping(value = "add",method = RequestMethod.POST)
    public String addDefJob(Job request) {DelayJob delayJob = jobService.addDefJob(request);
        return JSON.toJSONString(delayJob);
    }

    /**
     * 获取
     * @return
     */
    @RequestMapping(value = "pop",method = RequestMethod.GET)
    public String getProcessJob(String topic) {Job process = jobService.getProcessJob(topic);
        return JSON.toJSONString(process);
    }

    /**
     * 实现一个执行的工作
     * @param jobId
     * @return
     */
    @RequestMapping(value = "finish",method = RequestMethod.DELETE)
    public String finishJob(Long jobId) {jobService.finishJob(jobId);
        return "success";
    }

    @RequestMapping(value = "delete",method = RequestMethod.DELETE)
    public String deleteJob(Long jobId) {jobService.deleteJob(jobId);
        return "success";
    }
    
}

测试

增加提早工作

通过 postman 申请:localhost:8000/delay/add

此时这条延时工作被增加进了线程池中

2019-08-12 21:21:36.589  INFO 21444 --- [nio-8000-exec-6] d.samples.redis.delay.container.JobPool  : 工作池增加工作:{"delayTime":10000,"id":3,"message":"tag:testid:3","retryCount":0,"status":"DELAY","topic":"test","ttrTime":10000}
2019-08-12 21:21:36.609  INFO 21444 --- [nio-8000-exec-6] d.s.redis.delay.container.DelayBucket    : 增加提早工作:{"delayDate":1565616106609,"jodId":3,"topic":"test"}

依据设置 10 秒钟之后工作会被增加至 ReadyQueue 中

2019-08-12 21:21:46.744  INFO 21444 --- [pool-1-thread-4] d.s.redis.delay.container.ReadyQueue     : 执行队列增加工作:DelayJob(jodId=3, delayDate=1565616106609, topic=test)

取得工作

这时候咱们申请 localhost:8000/delay/pop

这个时候工作被响应,批改状态的同时设置其超时工夫,而后搁置在 DelayBucket 中

2019-08-09 19:36:02.342  INFO 58456 --- [nio-8000-exec-3] d.s.redis.delay.container.ReadyQueue     : 执行队列取出工作:{"delayDate":1565321728704,"jodId":1,"topic":"测试"}
2019-08-09 19:36:02.364  INFO 58456 --- [nio-8000-exec-3] d.samples.redis.delay.container.JobPool  : 工作池增加工作:{"delayTime":10000,"id":1,"message":"提早 10 秒,超时 30 秒","retryCount":0,"status":"RESERVED","topic":"测试","ttrTime":30000}
2019-08-09 19:36:02.384  INFO 58456 --- [nio-8000-exec-3] d.s.redis.delay.container.DelayBucket    : 增加提早工作:{"delayDate":1565321792364,"jodId":1,"topic":"测试"}

依照设计在 30 秒后,工作如果没有被生产将会从新搁置在 ReadyQueue 中

2019-08-12 21:21:48.239  INFO 21444 --- [nio-8000-exec-7] d.s.redis.delay.container.ReadyQueue     : 执行队列取出工作:{"delayDate":1565616106609,"jodId":3,"topic":"test"}
2019-08-12 21:21:48.261  INFO 21444 --- [nio-8000-exec-7] d.samples.redis.delay.container.JobPool  : 工作池增加工作:{"delayTime":10000,"id":3,"message":"tag:testid:3","retryCount":0,"status":"RESERVED","topic":"test","ttrTime":10000}

工作的删除 / 生产

当初咱们申请:localhost:8000/delay/delete

此时在 Job pool 中此工作将会被移除,此时元数据曾经不存在,但工作还在 DelayBucket 中循环,然而在循环中当检测到元数据曾经不存的话此延时工作会被移除。

2019-08-12 21:21:54.880  INFO 21444 --- [nio-8000-exec-8] d.samples.redis.delay.container.JobPool  : 工作池移除工作:3
2019-08-12 21:21:59.104  INFO 21444 --- [pool-1-thread-5] d.s.redis.delay.handler.DelayJobHandler  : 移除不存在工作:{"delayDate":1565616118261,"jodId":3,"topic":"test"}

近期热文举荐:

1.1,000+ 道 Java 面试题及答案整顿 (2021 最新版)

2. 别在再满屏的 if/ else 了,试试策略模式,真香!!

3. 卧槽!Java 中的 xx ≠ null 是什么新语法?

4.Spring Boot 2.5 重磅公布,光明模式太炸了!

5.《Java 开发手册(嵩山版)》最新公布,速速下载!

感觉不错,别忘了顺手点赞 + 转发哦!

退出移动版