Spring Boot + Redis(list)模拟消息队列
使用 Spring Boot + Redis 的 list,组成生产者与消费者模型,模拟出消息队列。
项目依赖
通过 RedisTemplate 提供的模板方法操作 Redis
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
模拟生产者消费者接口
模拟接口
/**
 * Callback that handles a single message taken from a queue.
 */
public interface Consumer {

    /**
     * Consumes one message.
     *
     * @param message the message object waiting to be consumed
     */
    void consume(Object message);
}
/**
 * Publishes messages to a named queue.
 */
public interface Producer {

    /**
     * Sends one message to a queue.
     *
     * @param queue   name of the message queue, i.e. the Redis key
     * @param message the message object to send, i.e. the Redis value
     */
    void produce(String queue, Object message);
}
具体实现需要以下几个部分:
消息对象(OrderMessage)、Producer、Consumer、Listener
/**
 * Order message that travels through the queue.
 *
 * <p>Implements {@link Serializable}; presumably this is what allows the
 * configured Redis value serializer to store it (confirm against the
 * RedisTemplate configuration in use).
 */
public class OrderMessage implements Serializable {

    private static final long serialVersionUID = 1L;

    private Integer userId;
    private Long orderId;
    private Integer itemId;

    public Integer getUserId() {
        return userId;
    }

    public void setUserId(Integer userId) {
        this.userId = userId;
    }

    public Long getOrderId() {
        return orderId;
    }

    public void setOrderId(Long orderId) {
        this.orderId = orderId;
    }

    public Integer getItemId() {
        return itemId;
    }

    public void setItemId(Integer itemId) {
        this.itemId = itemId;
    }

    /**
     * Returns a readable form of the message so that log lines which print
     * the message show its contents rather than an identity hash.
     */
    @Override
    public String toString() {
        return "OrderMessage{userId=" + userId
                + ", orderId=" + orderId
                + ", itemId=" + itemId + "}";
    }
}
/**
 * {@link Producer} that appends messages to a Redis list through
 * {@code RedisTemplate}.
 */
@Component
@Slf4j
public class OrderProducer implements Producer {

    @Autowired
    private RedisTemplate redisTemplate;

    public OrderProducer() {
    }

    /**
     * Pushes {@code message} onto the left end of the Redis list stored at
     * {@code queue} and logs it.
     *
     * @param queue   Redis key of the list used as the queue
     * @param message the message object to enqueue
     */
    @Override
    public void produce(String queue, Object message) {
        redisTemplate.opsForList().leftPush(queue, message); // enqueue on the left
        // Hand the object itself to the logger: calling message.toString()
        // here would throw a NullPointerException for a null message after
        // the push has already been performed.
        log.info("添加消息到队列:{}", message);
    }
}
@Slf4j
@AllArgsConstructor
public class OrderConsumer implements Consumer {public OrderConsumer(){ }
@Override
public void consume(Object message) {if(message instanceof OrderMessage){OrderMessage msg =(OrderMessage) message;
log.info("开始消费消息{}",msg.toString());
// 添加具体的消费逻辑,修改数据库什么的
log.info("消费消息 {} 完成",msg.toString())
}
}
}
@Slf4j
public class QueueListener implements Runnable {
private RedisTemplate redisTemplate;
private String queue;
private Consumer consumer;
/**
* 使用队列右出获取消息
* 没获取到消息则线程 sleep 一秒,减少资源浪费
* 实现了 Runnable 接口,可以作为线程任务执行
*/
@Override
public void run() {log.info("QueueListener start...queue:{}", queue);
while (RedisMQConsumerContainer.RUNNING){Object message = redisTemplate.opsForList().rightPop(queue,500,TimeUnit.MICROSECONDS);
if(message instanceof OrderMessage){consumer.consume(message);
}else{
try {Thread.sleep(1000);
} catch (InterruptedException e) {e.printStackTrace();
}
}
}
}
}
添加 QueueConfiguration 与 RedisMQConsumerContainer 类,实现消息消费线程池
/**
 * Ties a Redis key (the queue name) to the {@link Consumer} that handles the
 * messages of that queue.
 */
public class QueueConfiguration {

    private String queue;
    private Consumer consumer;

    /** Creates a new {@link Builder}. */
    public static Builder builder() {
        return new Builder();
    }

    /** @return the queue name, i.e. the Redis key */
    public String getQueue() {
        return queue;
    }

    /** @return the consumer bound to the queue, possibly {@code null} */
    public Consumer getConsumer() {
        return consumer;
    }

    // setters omitted

    /**
     * Fluent builder for {@link QueueConfiguration}.
     */
    public static class Builder {

        private final QueueConfiguration configuration = new QueueConfiguration();

        public Builder queue(String queue) {
            configuration.queue = queue;
            return this;
        }

        public Builder consumer(Consumer consumer) {
            configuration.consumer = consumer;
            return this;
        }

        /**
         * Returns the configuration. When no queue name was given (null or
         * empty) and a consumer is present, the queue name defaults to the
         * simple class name of that consumer.
         */
        public QueueConfiguration build() {
            boolean unnamed = configuration.queue == null || configuration.queue.isEmpty();
            if (unnamed && configuration.consumer != null) {
                // Use the consumer's class, not the configuration's: the
                // latter would yield "QueueConfiguration" for every queue.
                configuration.queue = configuration.consumer.getClass().getSimpleName();
            }
            return configuration;
        }
    }
}
/**
 * Thread-pool container for queue consumers.
 *
 * <p>The pool is created when the container is constructed. Each call to
 * {@link #addConsumer(QueueConfiguration)} submits one {@code QueueListener}
 * task for the given queue/consumer pair.
 */
@Slf4j
@Component
public class RedisMQConsumerContainer {

    /**
     * Run flag polled by the listener loops. Declared volatile because it is
     * written by the thread calling {@link #init()} / {@link #destroy()} and
     * read by the pool threads.
     */
    public static volatile boolean RUNNING;

    @Autowired
    private RedisTemplate redisTemplate;

    private final Map<String, QueueConfiguration> consumerMap = new HashMap<>();

    private ExecutorService executor;

    public RedisMQConsumerContainer() {
        init();
    }

    /**
     * Registers a configuration and submits a listener task for its queue.
     * A configuration without a consumer is skipped.
     *
     * @param configuration queue name and consumer to register
     */
    public void addConsumer(QueueConfiguration configuration) {
        String queue = configuration.getQueue();
        if (configuration.getConsumer() == null) {
            log.warn("Key:{} consumer cannot be null, this configuration will be skipped", queue);
            return; // actually skip it, as the warning promises
        }
        if (consumerMap.containsKey(queue)) {
            // NOTE(review): only the map entry is replaced; the listener
            // task submitted earlier for this key keeps running alongside
            // the new one. Confirm whether callers can register a key twice.
            log.warn("Key:{} this key already exists, and it will be replaced", queue);
        }
        consumerMap.put(queue, configuration);
        executor.submit(new QueueListener(redisTemplate, queue, configuration.getConsumer()));
        log.info("队列 {} 提交消息任务", queue);
    }

    /**
     * Clears the run flag, shuts the pool down and waits until all listener
     * tasks have finished.
     *
     * <p>NOTE(review): nothing visible in this class registers this method
     * as a lifecycle callback — confirm that the application invokes it on
     * shutdown.
     */
    public void destroy() {
        log.info("Redis 消息队列线程池关闭中");
        RUNNING = false;
        this.executor.shutdown();
        log.info("QueueListener exiting.");
        try {
            // Block while waiting instead of spinning on isTerminated().
            while (!this.executor.awaitTermination(1, java.util.concurrent.TimeUnit.SECONDS)) {
                // listeners still finishing their current iteration
            }
        } catch (InterruptedException e) {
            this.executor.shutdownNow();
            Thread.currentThread().interrupt();
            log.warn("Interrupted while waiting for QueueListener tasks to exit.");
            return;
        }
        log.info("QueueListener exited.");
    }

    /**
     * Sets the run flag and creates the cached thread pool whose threads are
     * named {@code RedisMQListener-<n>}.
     */
    public void init() {
        log.info("消息队列线程池初始化");
        RUNNING = true;
        // One counter shared by every thread the factory creates, so that
        // thread names are numbered consecutively.
        final AtomicInteger threadNumber = new AtomicInteger(1);
        this.executor = Executors.newCachedThreadPool(
                r -> new Thread(r, "RedisMQListener-" + threadNumber.getAndIncrement()));
    }
}
Schedule 定期产生消息
// Consumer container that listener tasks are submitted to; injected by Spring.
@Autowired
private RedisMQConsumerContainer mqContainer;
/**
 * Scheduled job that registers an order consumer for the item's message
 * queue. The cron expression fires every 5 seconds.
 */
@Scheduled(cron = "0/5 * * * * ?")
@Transactional
public void startSeckill() {
    // The consumer may only be added once the (omitted) conditions are met.
    final String queueName = RedisKey.ITEM_MESSAGE.getValue() + item.getId();
    final Consumer handler = new OrderConsumer(itemOrderMapper, seckillService);
    final QueueConfiguration configuration = QueueConfiguration.builder()
            .queue(queueName)
            .consumer(handler)
            .build();
    mqContainer.addConsumer(configuration);
    log.info(item.getId() + "号秒杀商品预热完毕!");
}
}
注:文中代码参考了其他作者的实现,但因时间久远,未能记录原作者及出处。