乐趣区

Spring-boot-Redislist-模拟消息队列

Spring boot + Redis(list)模拟消息队列

使用 Spring Boot + Redis 的 list,组成生产者与消费者模型,模拟出消息队列。

项目依赖

通过 RedisTemplate 模板类操作 Redis,需要引入以下依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

模拟生产者消费者接口

模拟接口

/**
 * Contract for a message consumer.
 */
public interface Consumer {

    /**
     * Consumes one message.
     *
     * @param message the message object waiting to be consumed
     */
    void consume(Object message);
}

/**
 * Contract for a message producer.
 */
public interface Producer {

    /**
     * Sends one message to a queue.
     *
     * @param queue   the message queue name, i.e. the Redis key
     * @param message the message object to send, i.e. the Redis value
     */
    void produce(String queue, Object message);
}

具体实现需要

Consumer Producer Listener

/**
 * Serializable message payload carrying a user id, an order id and an item id.
 */
public class OrderMessage implements Serializable {
    private static final long serialVersionUID = 1L;
    Integer userId;
    Long orderId;
    Integer itemId;

    // setters and getters omitted ···

    /**
     * Renders the three identifiers so that log statements printing the message
     * show its contents instead of the default {@code Object} identity string.
     *
     * @return a string of the form {@code OrderMessage{userId=…, orderId=…, itemId=…}}
     */
    @Override
    public String toString() {
        return "OrderMessage{userId=" + userId
                + ", orderId=" + orderId
                + ", itemId=" + itemId + '}';
    }
}
/**
 * {@link Producer} implementation that pushes messages onto a Redis list.
 */
@Component
@Slf4j
public class OrderProducer implements Producer {

    @Autowired
    private RedisTemplate redisTemplate;

    public OrderProducer() {
    }

    /**
     * Left-pushes {@code message} onto the list stored under {@code queue}, then logs it.
     *
     * @param queue   the Redis key of the list
     * @param message the value to push
     */
    @Override
    public void produce(String queue, Object message) {
        // Messages enter at the left end of the list.
        redisTemplate.opsForList().leftPush(queue, message);

        final String rendered = message.toString();
        log.info("添加消息到队列:{}", rendered);
    }
}
@Slf4j
@AllArgsConstructor
public class OrderConsumer implements Consumer {public OrderConsumer(){ }

    @Override
    public void consume(Object message) {if(message instanceof OrderMessage){OrderMessage msg =(OrderMessage) message;
            log.info("开始消费消息{}",msg.toString());
            // 添加具体的消费逻辑,修改数据库什么的
            log.info("消费消息 {} 完成",msg.toString())
        }
    }
}
@Slf4j
public class QueueListener implements Runnable {

    private RedisTemplate redisTemplate;

    private String queue;

    private Consumer consumer;

    /**
     * 使用队列右出获取消息
     * 没获取到消息则线程 sleep 一秒,减少资源浪费
     * 实现了 Runnable 接口,可以作为线程任务执行
     */
    @Override
    public void run() {log.info("QueueListener start...queue:{}", queue);

        while (RedisMQConsumerContainer.RUNNING){Object message = redisTemplate.opsForList().rightPop(queue,500,TimeUnit.MICROSECONDS);
            if(message instanceof OrderMessage){consumer.consume(message);
            }else{
                try {Thread.sleep(1000);
                } catch (InterruptedException e) {e.printStackTrace();
                }
            }
        }


    }
}

添加 QueueConfiguration 与 RedisMQConsumerContainer 类,实现消息消费线程池

/**
 * Pairs a Redis key (the queue name) with the {@link Consumer} that handles its messages.
 */
public class QueueConfiguration {

    private String queue;

    private Consumer consumer;

    /**
     * @return a new, empty builder
     */
    public static Builder builder() {
        return new Builder();
    }

    /**
     * Fluent builder for {@link QueueConfiguration}.
     */
    public static class Builder {
        private QueueConfiguration configuration = new QueueConfiguration();

        /**
         * @param queue the Redis key of the queue
         * @return this builder
         */
        public Builder queue(String queue) {
            configuration.queue = queue;
            return this;
        }

        /**
         * @param consumer the consumer bound to the queue
         * @return this builder
         */
        public Builder consumer(Consumer consumer) {
            configuration.consumer = consumer;
            return this;
        }

        /**
         * Finishes the configuration. When no queue name was given but a consumer
         * was, the queue name defaults to the simple class name of that consumer.
         *
         * @return the built configuration
         */
        public QueueConfiguration build() {
            boolean queueMissing = configuration.queue == null || configuration.queue.length() == 0;
            if (queueMissing && configuration.consumer != null) {
                // Derive the name from the consumer, not from the configuration object
                // itself, which would always yield "QueueConfiguration".
                configuration.queue = configuration.consumer.getClass().getSimpleName();
            }
            return configuration;
        }
    }
    // setters omitted ···
}
    /**
     * Concrete consumer thread-pool container.
     * The pool is created on initialization; each time a consumer is added,
     * a matching listener task is submitted to the pool.
     */
@Slf4j
@Component
public class RedisMQConsumerContainer {
    /** Run flag polled by listener tasks; volatile so a change made by one thread is seen by the others. */
    public static volatile boolean RUNNING;

    @Autowired
    private RedisTemplate redisTemplate;

    private Map<String,QueueConfiguration> consumerMap= new HashMap<>();

    private ExecutorService executor;

    public RedisMQConsumerContainer() {
        init();
    }

    /**
     * Registers a configuration and submits a {@code QueueListener} for its queue.
     * A configuration without a consumer is logged and skipped.
     *
     * @param configuration the queue/consumer pair to register
     */
    public void addConsumer(QueueConfiguration configuration) {
        if (configuration.getConsumer() == null) {
            log.warn("Key:{} consumer cannot be null, this configuration will be skipped", configuration.getQueue());
            // Actually skip: a listener with a null consumer would fail on its first message.
            return;
        }
        if (consumerMap.containsKey(configuration.getQueue())) {
            log.warn("Key:{} this key already exists, and it will be replaced", configuration.getQueue());
        }
        consumerMap.put(configuration.getQueue(), configuration);
        executor.submit(new QueueListener(redisTemplate, configuration.getQueue(), configuration.getConsumer()));
        log.info("队列 {} 提交消息任务", configuration.getQueue());
    }

    /**
     * Clears the run flag, shuts the pool down and waits for the listener tasks to finish.
     * If the waiting thread is interrupted, the remaining tasks are cancelled and the
     * interrupt flag is restored.
     */
    public void destroy() {
        log.info("Redis 消息队列线程池关闭中");
        RUNNING = false;
        this.executor.shutdown();
        log.info("QueueListener exiting.");
        try {
            // Block in one-second slices instead of spinning on isTerminated().
            while (!this.executor.awaitTermination(1, java.util.concurrent.TimeUnit.SECONDS)) {
                // still waiting for listener tasks to observe RUNNING == false
            }
        } catch (InterruptedException e) {
            this.executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
        log.info("QueueListener exited.");
    }

    /**
     * Sets the run flag and creates the cached thread pool whose threads are
     * named {@code RedisMQListener-1}, {@code RedisMQListener-2}, and so on.
     */
    public void init() {
        log.info("消息队列线程池初始化");
        RUNNING = true;
        // One counter shared by all threads of this pool, so names are unique.
        final AtomicInteger threadNumber = new AtomicInteger(1);
        this.executor = Executors.newCachedThreadPool(
                r -> new Thread(r, "RedisMQListener-" + threadNumber.getAndIncrement()));
    }
}

使用 @Scheduled 定时任务注册消费者(提交队列监听任务)

// Injected container through which queue consumers are registered.
@Autowired
private RedisMQConsumerContainer mqContainer;

/*
 * Scheduled job: builds a queue name from RedisKey.ITEM_MESSAGE and the item id,
 * creates an OrderConsumer and registers the pair with mqContainer.
 * NOTE(review): `item` is not declared in the visible snippet — presumably it is
 * loaded by the omitted precondition logic; confirm against the full method.
 */
@Scheduled(cron = "0/5 * * * * ?")
// Runs once every 5 seconds.
@Transactional
public void startSeckill() {
        // The consumer may only be added after the (omitted) precondition check passes.
        String queue = RedisKey.ITEM_MESSAGE.getValue()+item.getId();
        Consumer orderConsumer = new OrderConsumer(itemOrderMapper,seckillService);
        mqContainer.addConsumer(QueueConfiguration.builder().queue(queue).consumer(orderConsumer).build());
        log.info(item.getId() + "号秒杀商品预热完毕!");
    }
}

注:文中代码参考自其他作者,因时间久远,原始出处未能记录。

退出移动版