Spring boot + Redis(list) 模拟消息队列
使用 Spring Boot + Redis的 list,组成生产者与消费者模型,模拟出消息队列。
项目依赖
使用RedisTemplate模板方法使用Redis
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId></dependency>
模拟生产者消费者接口
模拟接口
//参数 message 为消费者待消费的消息对象public interface Consumer { void consume(Object message);}//生产者参数 queue为消息队列名即 Redis的键,message为发送的消息对象 Redis的值public interface Producer { void produce(String queue,Object message);}
具体实现需要
Consumer Producer Listener
//实现序列化 public class OrderMessage implements Serializable { private static final long serialVersionUID = 1L; Integer userId; Long orderId; Integer itemId; //setter getter ···}
@Component@Slf4jpublic class OrderProducer implements Producer { @Autowired private RedisTemplate redisTemplate; public OrderProducer(){ } @Override public void produce(String queue,Object message) { redisTemplate.opsForList().leftPush(queue,message);//消息左入 log.info("添加消息到队列:{}",message.toString()); }}
@Slf4j@AllArgsConstructorpublic class OrderConsumer implements Consumer { public OrderConsumer(){ } @Override public void consume(Object message) { if(message instanceof OrderMessage){ OrderMessage msg =(OrderMessage) message; log.info("开始消费消息{}",msg.toString()); //添加具体的消费逻辑,修改数据库什么的 log.info("消费消息{}完成",msg.toString()) } }}
@Slf4jpublic class QueueListener implements Runnable { private RedisTemplate redisTemplate; private String queue; private Consumer consumer; /** * 使用队列右出获取消息 * 没获取到消息则线程 sleep 一秒,减少资源浪费 * 实现了 Runnable 接口,可以作为线程任务执行 */ @Override public void run() { log.info("QueueListener start...queue:{}", queue); while (RedisMQConsumerContainer.RUNNING){ Object message = redisTemplate.opsForList().rightPop(queue,500,TimeUnit.MICROSECONDS); if( message instanceof OrderMessage){ consumer.consume(message); }else{ try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } }}
添加 QueueConfiguration 与 RedisMQConsumerContainer 类,实现消息消费线程池
//使用此类将Redis键与对应消息消费者Consumer组合在一起public class QueueConfiguration { private String queue; private Consumer consumer; public static Builder builder(){ return new Builder(); } public static class Builder{ private QueueConfiguration configuration = new QueueConfiguration(); public Builder queue(String queue) { configuration.queue = queue; return this; } public Builder consumer(Consumer consumer) { configuration.consumer = consumer; return this; } public QueueConfiguration build() { if (configuration.queue == null || configuration.queue.length() == 0) { if (configuration.consumer != null) { configuration.queue = configuration.getClass().getSimpleName(); } } return configuration; } } //setter ···}
/** * 具体消费者线程池实现类 * 初始化时线程池初始化,每当添加消费者时(这里实现的是一组生产者),就添加对应的线程任务 */@Slf4j@Componentpublic class RedisMQConsumerContainer { public static boolean RUNNING; @Autowired private RedisTemplate redisTemplate; private Map<String,QueueConfiguration> consumerMap= new HashMap<>(); private ExecutorService executor; public RedisMQConsumerContainer() { init(); } public void addConsumer(QueueConfiguration configuration) { if (consumerMap.containsKey(configuration.getQueue())) { log.warn("Key:{} this key already exists, and it will be replaced", configuration.getQueue()); } if (configuration.getConsumer() == null) { log.warn("Key:{} consumer cannot be null, this configuration will be skipped", configuration.getQueue()); } consumerMap.put(configuration.getQueue(), configuration); executor.submit(new QueueListener(redisTemplate,configuration.getQueue(),configuration.getConsumer())); log.info("队列 {} 提交消息任务",configuration.getQueue()); } public void destroy() { log.info("Redis消息队列线程池关闭中"); RUNNING = false; this.executor.shutdown(); log.info("QueueListener exiting."); while (!this.executor.isTerminated()) { } log.info("QueueListener exited."); } public void init() { log.info("消息队列线程池初始化"); RUNNING = true; this.executor = Executors.newCachedThreadPool(r -> { final AtomicInteger threadNumber = new AtomicInteger(1); return new Thread(r, "RedisMQListener-" + threadNumber.getAndIncrement()); }); }}
Schedule 定期产生消息
@Autowiredprivate RedisMQConsumerContainer mqContainer;@Scheduled(cron = "0/5 * * * * ?")//5秒执行一次@Transactionalpublic void startSeckill() { //判断条件满足后方可添加 String queue = RedisKey.ITEM_MESSAGE.getValue()+item.getId(); Consumer orderConsumer = new OrderConsumer(itemOrderMapper,seckillService); mqContainer.addConsumer( QueueConfiguration.builder().queue(queue).consumer(orderConsumer).build() ); log.info(item.getId() + "号秒杀商品预热完毕!"); }}
注;文中代码参考作者因时间久了未记录