Spring Boot + Redis(list)模拟消息队列

使用 Spring Boot + Redis的 list,组成生产者与消费者模型,模拟出消息队列。

项目依赖

通过 RedisTemplate 模板类操作 Redis

<!-- Spring Boot starter for Redis access; the article uses it together with RedisTemplate. -->
<dependency>    <groupId>org.springframework.boot</groupId>    <artifactId>spring-boot-starter-data-redis</artifactId></dependency>

模拟生产者消费者接口

模拟接口

//参数 message 为消费者待消费的消息对象public interface Consumer {    void consume(Object message);}//生产者参数  queue为消息队列名即 Redis的键,message为发送的消息对象 Redis的值public interface Producer {    void produce(String queue,Object message);}

具体实现需要以下几个类:

Consumer Producer Listener

/**
 * Order message carried through the queue.
 *
 * <p>Implements {@link Serializable}; presumably this is what lets the configured
 * RedisTemplate value serializer store it -- verify against the serializer in use.
 */
public class OrderMessage implements Serializable {

    private static final long serialVersionUID = 1L;

    private Integer userId;
    private Long orderId;
    private Integer itemId;

    /** @return the user id, or {@code null} if not set */
    public Integer getUserId() {
        return userId;
    }

    /** @param userId the user id */
    public void setUserId(Integer userId) {
        this.userId = userId;
    }

    /** @return the order id, or {@code null} if not set */
    public Long getOrderId() {
        return orderId;
    }

    /** @param orderId the order id */
    public void setOrderId(Long orderId) {
        this.orderId = orderId;
    }

    /** @return the item id, or {@code null} if not set */
    public Integer getItemId() {
        return itemId;
    }

    /** @param itemId the item id */
    public void setItemId(Integer itemId) {
        this.itemId = itemId;
    }

    /**
     * Readable form used by the producer/consumer log statements; without it the
     * logs would only show the default {@code Object#toString()} identity string.
     */
    @Override
    public String toString() {
        return "OrderMessage{userId=" + userId
                + ", orderId=" + orderId
                + ", itemId=" + itemId + '}';
    }
}
@Component@Slf4jpublic class OrderProducer implements Producer {    @Autowired    private RedisTemplate redisTemplate;    public OrderProducer(){    }    @Override    public void produce(String queue,Object message) {        redisTemplate.opsForList().leftPush(queue,message);//消息左入        log.info("添加消息到队列:{}",message.toString());    }}
@Slf4j@AllArgsConstructorpublic class OrderConsumer implements Consumer {    public OrderConsumer(){    }    @Override    public void consume(Object message) {        if(message instanceof OrderMessage){            OrderMessage msg =(OrderMessage) message;            log.info("开始消费消息{}",msg.toString());            //添加具体的消费逻辑,修改数据库什么的            log.info("消费消息{}完成",msg.toString())        }    }}
@Slf4jpublic class QueueListener implements Runnable {    private RedisTemplate redisTemplate;    private String queue;    private Consumer consumer;    /**     * 使用队列右出获取消息     * 没获取到消息则线程 sleep 一秒,减少资源浪费     * 实现了 Runnable 接口,可以作为线程任务执行     */    @Override    public void run() {        log.info("QueueListener start...queue:{}", queue);        while (RedisMQConsumerContainer.RUNNING){                        Object message = redisTemplate.opsForList().rightPop(queue,500,TimeUnit.MICROSECONDS);            if( message instanceof OrderMessage){                consumer.consume(message);            }else{                try {                    Thread.sleep(1000);                } catch (InterruptedException e) {                    e.printStackTrace();                }            }        }    }}

添加 QueueConfiguration 与 RedisMQConsumerContainer 类,实现消息消费线程池

//使用此类将Redis键与对应消息消费者Consumer组合在一起public class QueueConfiguration {    private String queue;    private Consumer consumer;    public static Builder builder(){        return new Builder();    }    public static class Builder{        private QueueConfiguration configuration = new QueueConfiguration();        public Builder queue(String queue) {            configuration.queue = queue;            return this;        }        public Builder consumer(Consumer consumer) {            configuration.consumer = consumer;            return this;        }        public QueueConfiguration build() {            if (configuration.queue == null || configuration.queue.length() == 0) {                if (configuration.consumer != null) {                    configuration.queue = configuration.getClass().getSimpleName();                }            }            return configuration;        }    }    //setter ···}
    /**
     * Thread-pool backed container for queue consumers.
     *
     * <p>The pool is created when the container is constructed; every call to
     * {@link #addConsumer(QueueConfiguration)} submits one {@link QueueListener} task
     * for the given queue.
     */
@Slf4j
@Component
public class RedisMQConsumerContainer {

    /** Read by listener threads in their polling loop; volatile so a stop request is visible to them. */
    public static volatile boolean RUNNING;

    @Autowired
    private RedisTemplate redisTemplate;

    private Map<String, QueueConfiguration> consumerMap = new HashMap<>();
    private ExecutorService executor;

    public RedisMQConsumerContainer() {
        init();
    }

    /**
     * Registers a configuration and starts a listener task for its queue.
     *
     * <p>NOTE(review): when the key already exists the map entry is replaced, but the
     * listener submitted earlier for that key keeps running; calling this repeatedly for
     * the same queue therefore adds one more listener thread each time.
     *
     * @param configuration queue name plus consumer; skipped when the consumer is {@code null}
     */
    public void addConsumer(QueueConfiguration configuration) {
        if (configuration.getConsumer() == null) {
            log.warn("Key:{} consumer cannot be null, this configuration will be skipped", configuration.getQueue());
            // Actually skip it: the original logged this and then still submitted a
            // listener holding a null consumer.
            return;
        }
        if (consumerMap.containsKey(configuration.getQueue())) {
            log.warn("Key:{} this key already exists, and it will be replaced", configuration.getQueue());
        }
        consumerMap.put(configuration.getQueue(), configuration);
        executor.submit(new QueueListener(redisTemplate, configuration.getQueue(), configuration.getConsumer()));
        log.info("队列 {} 提交消息任务", configuration.getQueue());
    }

    /**
     * Asks all listeners to stop and waits until the pool has terminated.
     */
    public void destroy() {
        log.info("Redis消息队列线程池关闭中");
        RUNNING = false;
        this.executor.shutdown();
        log.info("QueueListener exiting.");
        try {
            // Block in one-second slices instead of spinning on isTerminated().
            while (!this.executor.awaitTermination(1, java.util.concurrent.TimeUnit.SECONDS)) {
                // still waiting for listeners to finish
            }
            log.info("QueueListener exited.");
        } catch (InterruptedException e) {
            this.executor.shutdownNow();
            Thread.currentThread().interrupt();
            log.warn("Interrupted while waiting for QueueListener threads to exit");
        }
    }

    /**
     * Sets {@link #RUNNING} to true and creates the cached thread pool.
     */
    public void init() {
        log.info("消息队列线程池初始化");
        RUNNING = true;
        // The counter must live outside the factory lambda; created inside it, every
        // thread would be named "RedisMQListener-1".
        final AtomicInteger threadNumber = new AtomicInteger(1);
        this.executor = Executors.newCachedThreadPool(
                r -> new Thread(r, "RedisMQListener-" + threadNumber.getAndIncrement()));
    }
}

Schedule 定期产生消息

@Autowiredprivate RedisMQConsumerContainer mqContainer;@Scheduled(cron = "0/5 * * * * ?")//5秒执行一次@Transactionalpublic void startSeckill() {        //判断条件满足后方可添加        String queue = RedisKey.ITEM_MESSAGE.getValue()+item.getId();        Consumer orderConsumer = new OrderConsumer(itemOrderMapper,seckillService);        mqContainer.addConsumer(                QueueConfiguration.builder().queue(queue).consumer(orderConsumer).build()        );        log.info(item.getId() + "号秒杀商品预热完毕!");    }}

注:文中代码参考了他人的实现,但因时间久远,未能记录原作者。