问题来由

最近在开发一个小型前端监控我的项目,因为技术栈中应用到了 Node + Redis 作为音讯队列实现,因而这里记录下在 Node 中通过 Redis 来实现音讯队列时的 应用办法注意事项

什么是音讯队列

音讯队列,是一种寄存 音讯 是队列构造,能够用来解决 分布式系统通信 从而解耦零碎模块异步工作解决申请消峰限流的问题。

既然叫做队列,那它个别是从一侧推入音讯,从另一侧生产音讯;大略是如下的流程。

在我的需要当中,我用音讯队列来做异步的入库解决。

我通过 Node 做了一个对外的日志接管层(即图中 Koa Server)用于接管上报日志,当Koa Server接管实现会立刻给用户响应 OK,因为用户是没必要去感知后端日志的入库后果的。

因而 Koa Server 收到日志后,将音讯放入 Redis 音讯队列即可。另外一端,我启动了一个 生产 程序(即上图中的日志入库模块,它也是一个 Node 脚本)来对MQ音讯进行读取并进行入库操作。

Redis 如何做音讯队列

音讯队列,其实有 2种类型。一种是基于 队列模型 的,一种是基于 订阅公布模式的。

对于 订阅公布模式 来说,是指的多个消费者都能够订阅某一个 channel 的音讯,当channel中来了音讯,所有的订阅者都会收到告诉,并且所有的订阅者都能够对同一个音讯进行解决(生产)。

对于 队列模型 来说,当音讯入队后,在另一端只出队一次,如果有多个消费者在期待这个队列,那么只有一个消费者能拿到这个音讯进行解决。

在 Redis 中,以上 2 种模型,别离通过 pub/sub 性能和 list 构造能够来实现。

对于我的日志接管场景来说,我冀望的是无论我后端有多少个 入库消费者,我心愿同一条上报只能入库一次。因而对我来说,我须要应用 队列模型 来实现音讯队列,即应用 Redis 的 List 构造。

CLI 简略试验

咱们通过 redis-cli 来简略试验下 list 构造是如何当做音讯队列的。

首先,通过 lpush 命令往 redis 中某个队列的左侧推入一条音讯:

lpush my_mq abc

这样,咱们就往 my_mq 这个队列推入了一条内容为 abc 的音讯。因为此时并没有消费者,所以这条音讯仍然存在于队列当中。咱们甚至能够再次往里推入第2条 def 音讯,并通过 llen 命令来查看以后队列的长度。

接下来,咱们在另外一个命令行窗口输出:

rpop my_mq

意思是从 my_mq 队列的右侧拿出一条音讯。后果:

阻塞模式试验

Redis 的 List 构造,为了不便大家当做音讯队列。提供了一种阻塞模式。 阻塞和非阻塞有什么区别呢?

咱们用一个新命令行窗口,去执行 阻塞期待音讯:

brpop my_mq 0

留神前面要加一个 超时工夫,0就示意始终阻塞期待。而后,咱们看到 redis 命令行就阻塞在这里了,处于期待音讯的状态:

而如果应用 rpop 非阻塞命令的话,则会返回空并间接退出期待:

因而,能够发现,阻塞非阻塞模式,最大的区别:是在于当音讯队列为空的时候,阻塞模式不会退出期待,而非阻塞模式则会间接返回空并退出期待。

brpop 正在期待的时候,咱们往队列中 push 一个音讯:

lpush my_mq 123

能够看到,阻塞模式的生产端,收到了 123 这个音讯,同时本人也退出了期待:

这阐明:

  • 阻塞模式: 当队列为空时,(即没有等到音讯时),则始终阻塞着;等到一条音讯就退出
  • 非阻塞模式:当队列为空(即没有等到音讯),也不阻塞,而是间接返回null退出

因而 redis 所谓的阻塞,是 当还未等到1条音讯时,则阻塞期待;当等到1条音讯,即立即退出;它并不会循环阻塞---即等到音讯后它就不再阻塞监听这个队列了。 这将给咱们编写 Node 代码提供一些启发。

Node 如何应用

到了重点了。咱们在 Node 中编码来应用 redis 音讯队列,跟在 cli 界面应用的形式是一样的。然而须要咱们思考如何编写 消费者 端的代码,能力实现所谓的 继续监听队列。毕竟,咱们的 消费者 是须要常驻过程,继续监听队列音讯的。并不是说 收到一个音讯就退出过程

因而,咱们须要编写一个

  • 能常驻的Node过程,可能继续的期待 redis 队列音讯
  • 当收到1条音讯,便由 Node 脚本解决;解决完要持续期待队列中下一条音讯。如此周而复始。

首先,咱们能够这样编写代码来在 Node 中创立 redis 客户端:

const redis = require('promise-redis-client')let client = redis.createClient(...options)client.on('error', err => {    console.log('redis链接出错')})client.on('ready', () => {    console.log('redis ready')})

为了实现 当redis客户端创立结束,再开启音讯队列监听,咱们把下面的代码,封装成一个模块,用 promise 形式导出:

// redis.jsconst redis = require('promise-redis-client')exports.createClient = function() {    return new Promise((resolve, reject) => {        let client = redis.createClient(...options)        client.on('error', err => {            console.log('redis 连贯出错')            reject(err)        })        client.on('ready', () => {            console.log('redis ready')            resolve(client)        })    })}

OK,接下来,咱们能够去 app.js 中编写队列的消费者代码。为了更优雅的应用 async/await,咱们能够这样来编写一个 startWait 函数:

async function startWaitMsg(client) {    ...}

而后,在 client ready 的时候,去启动它:

const { createClient } = require('./redis.js')const c = createClient()client.then(async c => {    await startWaitMsg(c)})

最难的中央在于,startWaitMsg 函数该如何编写。因为咱们应用了 promise 版本的 redis库。因而,咱们能够像这样去读取一个音讯:

async function startWaitMsg(client) {    await client.rpop('my_mq')}

但这样写的话,redis返回音讯后,node持续往后执行,最终 startWaitMsg 函数就执行完结了。只管整个 Node 过程会因为 redis 连贯未断开而不会退出,但 node 此时曾经无奈再次去执行 client.rpop 这句代码了,也因而无奈再次从音讯队列中获取新来的音讯。

循环实现继续期待

咱们想到,能够应用循环来实现 继续监听队列。于是,把代码改成:

async function startWaitMsg(client) {    while(true) {      await client.rpop('my_mq')    }}

如此便实现了 继续执行 rpop 指令。然而,如果你在 rpop 代码前面加一行日志打印的话,会察看到 client.rpop 在继续打印 null。

这是因为,rpop 指令是 非阻塞的,因而当队列没有音讯,他便返回一个 null,由此触发你的 while 循环在一直执行。这会导致咱们程序占用过多的 cpu工夫片,且对 redis 网络IO有过多的没必要的耗费。

整个while循环不停的执行,只有执行rpop这一行的时候会短暂开释一下EventLoop给其余代码,这对脚本性能影响也会较大。国家提倡节能减排,这显然不是最优雅的。

应用阻塞模式

让咱们来用上 redis 队列的阻塞模式试试。

async function startWaitMsg(c) {    while(true) {        const res = await c.brpop('my_mq', 0)        console.log('收到音讯', res)    }}

通过 brpop 指令,能够让 brpop 代码阻塞在这里。这里所谓的 阻塞 并不是对 Node 程序的阻塞,而是 redis 客户端本身的阻塞。实际上对 Node 过程来说,无论是 rpop 还是 brpop 都是 非阻塞 的异步 IO操作,只是在音讯队列为空时 rpop 底层会立即返回null,从而node过程会 resolve一个空,而 brpop 会在底层redis阻塞期待音讯,音讯达到后再给 Node 过程告诉 resolve。

因而,brpop 对 Node 来说,能够防止本人实现队列的内容轮询,能够在期待IO回调期间将cpu留给其余工作。从而大大减少 Node 过程的 CPU 耗费。

redis断开无奈持续生产的问题

在代码运行过程中,呈现了一个新的问题: redis 客户端会在某些状况下断开连接(可能因为网络等起因)。而通过剖析日志发现:一旦产生连贯异样,咱们的消费者脚本就无奈持续接管新的音讯了(我的日志入库性能生效)。

通过剖析,发现问题起因仍然在于咱们的 while 语句 和 brpop 的配合问题。

当 redis client 对象产生连贯异样时,会向以后正在期待的 brpop 代码抛出一个 reject 异样。咱们回看上述代码的 startWait 函数:

async function startWaitMsg(c) {    while(true) {        const res = await c.brpop('my_mq', 0)        console.log('收到音讯', res)    }}

如果 await brpop 这一行抛出 reject 异样,因为咱们未捕捉该异样,则异样会抛出 startWaitMsg 函数,后果就是 while 循环被退出了。

思考如何解决

事实上,当连贯呈现问题,咱们须要对 client 进行重连。不过,这个重连机制,redisclient 会主动进行,因而咱们的代码要做的仅仅只须要保障while循环能在异样时复原。于是,咱们在产生异样时,continue 一下:

async function startWaitMsg(c) {    while(true) {        let res = null        try {            res = await c.brpop('my_mq', 0)            console.log('收到音讯', res)        }        catch(err) {            console.log('brpop 出错,从新brpop')            continue        }        // ... 音讯解决工作    }}

因为 redis 客户端外部的重连过程不会再触发 reject (只是断开连接的时候触发一次),因而 continue 之后的 brpop 又会从新 "阻塞" 期待,由此,咱们的 消费者 便能够失常活着了。

最终代码

  • 客户端连贯代码文件:redis.js
const redis = require('promise-redis-client')exports.createClient = function() {    return new Promise((resolve, reject) => {        let client = redis.createClient(...options)        client.on('error', err => {            console.log('redis 连贯出错')            reject(err)        })        client.on('ready', () => {            console.log('redis ready')            resolve(client)        })    })}

app.js

const { createClient } = require('./redis.js')const c = createClient()client.then(async c => {    await startWaitMsg(c) // 启动音讯监听})async function startWaitMsg(c) {    while(true) {        let res = null        try {            res = await c.brpop('my_mq', 0)            console.log('收到音讯', res)        }        catch(err) {            console.log('brpop 出错,从新brpop')            continue        }        // ... 音讯解决工作    }}

总结

  • redis 的 list 数据结构,能够用作实现队列模式音讯队列
  • Node 中能够通过 while(true) 实现队列的继续循环监听
  • 通过 brpop 阻塞指令的应用,能够防止 cpu 空转来监听队列
  • Node 中要留神 redis 连贯断开时的错误处理,以防止因出错导致无奈从新监听队列