关于redis:Redis-使用-List-实现消息队列能保证消息可靠么

47次阅读

共计 4078 个字符,预计需要花费 11 分钟才能阅读完成。

分布式系统中必备的一个中间件就是音讯队列,通过音讯队列咱们能对服务间进行异步解耦、流量消峰、实现最终一致性。

目前市面上曾经有 RabbitMQ、RochetMQ、ActiveMQ、Kafka等,有人会问:“Redis 适宜做音讯队列么?”

在答复这个问题之前,咱们先从实质思考:

  • 音讯队列提供了什么个性?
  • Redis 如何实现音讯队列?是否满足存取需要?

明天,码哥联合音讯队列的特点一步步带大家剖析应用 Redis 的 List 作为音讯队列的实现原理,并分享如何把 SpringBoot 与 Redission 整合使用到我的项目中。

什么是音讯队列

音讯队列是一种异步的服务间通信形式,实用于分布式和微服务架构。音讯在被解决和删除之前始终存储在队列上。

每条音讯仅可被一位用户解决一次。音讯队列可被用于拆散重量级解决、缓冲或批处理工作以及缓解高峰期工作负载。

  • Producer:音讯生产者,负责产生和发送音讯到 Broker;
  • Broker:音讯解决核心。负责音讯存储、确认、重试等,个别其中会蕴含多个 queue;
  • Consumer:音讯消费者,负责从 Broker 中获取音讯,并进行相应解决;

音讯队列的应用场景有哪些呢?

音讯队列在理论利用中包含如下四个场景:

  • 利用耦合:发送方、接管方零碎之间不须要理解单方,只须要意识音讯。多利用间通过音讯队列对同一音讯进行解决,防止调用接口失败导致整个过程失败;
  • 异步解决:多利用对音讯队列中同一音讯进行解决,利用间并发解决音讯,相比串行解决,缩小解决工夫;
  • 限流削峰:广泛应用于秒杀或抢购流动中,防止流量过大导致利用零碎挂掉的状况;
  • 音讯驱动的零碎:零碎分为音讯队列、音讯生产者、音讯消费者,生产者负责产生音讯,消费者 (可能有多个) 负责对音讯进行解决;

音讯队列满足哪些个性

音讯有序性

音讯是异步解决的,然而消费者须要依照生产者发送音讯的程序来生产,避免出现后发送的音讯被先解决的状况。

反复音讯解决

生产者可能因为网络问题呈现音讯重传导致消费者可能会收到多条反复音讯。

同样的音讯反复屡次的话可能会造成一业务逻辑屡次执行,须要确保如何防止反复生产问题。

可靠性

一次保障音讯的传递。如果发送音讯时接收者不可用,音讯队列会保留音讯,直到胜利地传递它。

当消费者重启后,能够持续读取音讯进行解决,避免音讯脱漏。

List 实现音讯队列

Redis 的列表(List)是一种线性的有序构造,能够依照元素被推入列表中的程序来存储元素,能满足「先进先出」的需要,这些元素既能够是文字数据,又能够是二进制数据。

LPUSH

生产者应用 LPUSH key element[element...] 将音讯插入到队列的头部,如果 key 不存在则会创立一个空的队列再插入音讯。

如下,生产者向队列 queue 先后插入了「Java」「码哥字节」「Go」,返回值示意音讯插入队列后的个数。

> LPUSH queue Java 码哥字节 Go
(integer) 3

RPOP

消费者应用 RPOP key 顺次读取队列的音讯,先进先出,所以「Java」会先读取生产:

> RPOP queue
"Java"
> RPOP queue
"码哥字节"
> RPOP queue
"Go"

实时生产问题

65 哥:这么简略就实现了么?

别快乐的太早,LPUSH、RPOP 存在一个性能危险,生产者向队列插入数据的时候,List 并不会被动告诉消费者及时生产。

咱们须要写一个 while(true) 不停地调用 RPOP 指令,当有新音讯就会返回音讯,否则返回空。

程序须要一直轮询并判断是否为空再执行生产逻辑,这就会导致即便没有新音讯写入到队列,消费者也要不停地调用 RPOP 命令占用 CPU 资源。

65 哥:要如何防止循环调用导致的 CPU 性能损耗呢?

Redis 提供了 BLPOP、BRPOP 阻塞读取的命令,消费者在在读取队列没有数据的时候主动阻塞,直到有新的音讯写入队列,才会持续读取新音讯执行业务逻辑。

BRPOP queue 0

参数 0 示意阻塞等待时间无无限度

反复生产

  • 音讯队列为每一条音讯生成一个「全局 ID」;
  • 生产者为每一条音讯创立一条「全局 ID」,消费者把一件解决过的音讯 ID 记录下来判断是否反复。

其实这就是幂等,对于同一条音讯,消费者收到后处理一次的后果和屡次的后果是统一的。

音讯可靠性

65 哥:消费者从 List 中读取一条在音讯处理过程中宕机了就会导致音讯没有解决实现,可是数据曾经没有保留在 List 中了咋办?

实质就是消费者在解决音讯的时候解体了,就无奈再还原音讯,不足一个音讯确认机制。

Redis 提供了 RPOPLPUSH、BRPOPLPUSH(阻塞)两个指令,含意是从 List 从读取音讯的同时把这条音讯复制到另一个 List 中(备份),并且是原子操作。

咱们就能够在业务流程正确处理实现后再删除队列音讯实现音讯确认机制。如果在解决音讯的时候宕机了,重启后再从备份 List 中读取音讯解决。

LPUSH redisMQ 公众号 码哥字节
BRPOPLPUSH redisMQ redisMQBack

生产者用 LPUSH 把音讯插入到 redisMQ 队列中,消费者应用 BRPOPLPUSH 读取音讯「公众号」,同时该音讯会被插入到「redisMQBack」队列中。

如果生产胜利则把「redisMQBack」的音讯删除即可,异样的话能够持续从「redisMQBack」再次读取音讯解决。

须要留神的是,如果生产者音讯发送的很快,而消费者处理速度慢就会导致音讯沉积,给 Redis 的内存带来过大压力。

Redission 实战

在 Java 中,咱们能够利用 Redission 封装的 API 来疾速实现队列,接下来码哥基于 SpringBoot 2.1.4 版本来交大家如何整合并实战。

具体 API 文档大家可查阅:https://github.com/redisson/r…

增加依赖

<dependency>
  <groupId>org.redisson</groupId>
  <artifactId>redisson-spring-boot-starter</artifactId>
  <version>3.16.7</version>
</dependency>

增加 Redis 配置,码哥的 Redis 没有配置明码,大家依据理论状况配置即可。

spring:
  application:
    name: redission
  redis:
    host: 127.0.0.1
    port: 6379
    ssl: false

Java 代码实战

RBlockingDeque 继承 java.util.concurrent.BlockingDeque,在应用过程中咱们齐全能够依据接口文档来抉择适合的 API 去实现业务逻辑。

次要办法如下

码哥采纳了双端队列来举例

@Slf4j
@Service
public class QueueService {

    @Autowired
    private RedissonClient redissonClient;

    private static final String REDIS_MQ = "redisMQ";

    /**
     * 发送音讯到队列头部
     *
     * @param message
     */
    public void sendMessage(String message) {RBlockingDeque<String> blockingDeque = redissonClient.getBlockingDeque(REDIS_MQ);

        try {blockingDeque.putFirst(message);
            log.info("将音讯: {} 插入到队列。", message);
        } catch (InterruptedException e) {e.printStackTrace();
        }
    }

    /**
     * 从队列尾部阻塞读取音讯,若没有音讯,线程就会阻塞期待新音讯插入,避免 CPU 空转
     */
    public void onMessage() {RBlockingDeque<String> blockingDeque = redissonClient.getBlockingDeque(REDIS_MQ);
        while (true) {
            try {String message = blockingDeque.takeLast();
                log.info("从队列 {} 中读取到音讯:{}.", REDIS_MQ, message);
            } catch (InterruptedException e) {e.printStackTrace();
            }

        }
    }

单元测试

@RunWith(SpringRunner.class)
@SpringBootTest(classes = RedissionApplication.class)
public class RedissionApplicationTests {

    @Autowired
    private QueueService queueService;

    @Test
    public void testQueue() throws InterruptedException {new Thread(() -> {for (int i = 0; i < 1000; i++) {queueService.sendMessage("音讯" + i);
            }
        }).start();

        new Thread(() -> queueService.onMessage()).start();

        Thread.currentThread().join();
    }


}

总结

能够应用 List 数据结构来实现音讯队列,满足先进先出。为了实现音讯可靠性,Redis 提供了 BRPOPLPUSH 命令是解决。

Redis 是一个十分轻量级的键值数据库,部署一个 Redis 实例就是启动一个过程,部署 Redis 集群,也就是部署多个 Redis 实例。

而 Kafka、RabbitMQ 部署时,波及额定的组件,例如 Kafka 的运行就须要再部署 ZooKeeper。相比 Redis 来说,Kafka 和 RabbitMQ 个别被认为是重量级的音讯队列。

须要留神的是,咱们要防止生产者过快,消费者过慢导致的音讯沉积占用 Redis 的内存。

在音讯量不大的状况下应用 Redis 作为音讯队列,他能给咱们带来高性能的音讯读写,这仿佛也是一个很好消息队列解决方案。

正文完
 0