关于node.js:NodejsRedis实现消息队列的最佳实践

6次阅读

共计 4991 个字符,预计需要花费 13 分钟才能阅读完成。

问题来由

最近在开发一个小型前端监控我的项目,因为技术栈中应用到了 Node + Redis 作为音讯队列实现,因而这里记录下在 Node 中通过 Redis 来实现音讯队列时的 应用办法 注意事项

什么是音讯队列

音讯队列,是一种寄存 音讯 是队列构造,能够用来解决 分布式系统通信 从而解耦零碎模块 异步工作解决 申请消峰限流 的问题。

既然叫做队列,那它个别是从一侧推入音讯,从另一侧生产音讯;大略是如下的流程。

在我的需要当中,我用音讯队列来做异步的入库解决。

我通过 Node 做了一个对外的日志接管层 (即图中 Koa Server) 用于接管上报日志,当 Koa Server 接管实现会立刻给用户响应 OK,因为用户是没必要去感知后端日志的入库后果的。

因而 Koa Server 收到日志后,将音讯放入 Redis 音讯队列即可。另外一端,我启动了一个 生产 程序(即上图中的日志入库模块,它也是一个 Node 脚本) 来对 MQ 音讯进行读取并进行入库操作。

Redis 如何做音讯队列

音讯队列,其实有 2 种类型。一种是基于 队列模型 的,一种是基于 订阅公布模式 的。

对于 订阅公布模式 来说,是指的多个消费者都能够订阅某一个 channel 的音讯,当 channel 中来了音讯,所有的订阅者都会收到告诉,并且所有的订阅者都能够对同一个音讯进行解决(生产)。

对于 队列模型 来说,当音讯入队后,在另一端只出队一次,如果有多个消费者在期待这个队列,那么只有一个消费者能拿到这个音讯进行解决。

在 Redis 中,以上 2 种模型,别离通过 pub/sub 性能和 list 构造能够来实现。

对于我的日志接管场景来说,我冀望的是无论我后端有多少个 入库消费者 ,我心愿同一条上报只能入库一次。因而对我来说,我须要应用 队列模型 来实现音讯队列,即应用 Redis 的 List 构造。

CLI 简略试验

咱们通过 redis-cli 来简略试验下 list 构造是如何当做音讯队列的。

首先,通过 lpush 命令往 redis 中某个队列的左侧推入一条音讯:

lpush my_mq abc

这样,咱们就往 my_mq 这个队列推入了一条内容为 abc 的音讯。因为此时并没有消费者,所以这条音讯仍然存在于队列当中。咱们甚至能够再次往里推入第 2 条 def 音讯,并通过 llen 命令来查看以后队列的长度。

接下来,咱们在另外一个命令行窗口输出:

rpop my_mq

意思是从 my_mq 队列的右侧拿出一条音讯。后果:

阻塞模式试验

Redis 的 List 构造,为了不便大家当做音讯队列。提供了一种阻塞模式。阻塞和非阻塞有什么区别呢?

咱们用一个新命令行窗口,去执行 阻塞期待音讯:

brpop my_mq 0

留神前面要加一个 超时工夫 ,0 就示意始终阻塞期待。而后,咱们看到 redis 命令行就阻塞在这里了,处于期待音讯的状态:

而如果应用 rpop 非阻塞命令的话,则会返回空并间接退出期待:

因而,能够发现,阻塞非阻塞模式,最大的区别:是在于当音讯队列为空的时候,阻塞模式不会退出期待,而非阻塞模式则会间接返回空并退出期待。

brpop 正在期待的时候,咱们往队列中 push 一个音讯:

lpush my_mq 123

能够看到,阻塞模式的生产端,收到了 123 这个音讯,同时本人也退出了期待:

这阐明:

  • 阻塞模式: 当队列为空时,(即没有等到音讯时),则始终阻塞着;等到一条音讯就退出
  • 非阻塞模式:当队列为空(即没有等到音讯),也不阻塞,而是间接返回 null 退出

因而 redis 所谓的阻塞,是 当还未等到 1 条音讯时,则阻塞期待;当等到 1 条音讯,即立即退出;它并不会循环阻塞 — 即等到音讯后它就不再阻塞监听这个队列了。这将给咱们编写 Node 代码提供一些启发。

Node 如何应用

到了重点了。咱们在 Node 中编码来应用 redis 音讯队列,跟在 cli 界面应用的形式是一样的。然而须要咱们思考如何编写 消费者 端的代码,能力实现所谓的 继续监听队列 。毕竟,咱们的 消费者 是须要常驻过程,继续监听队列音讯的。并不是说 收到一个音讯就退出过程

因而,咱们须要编写一个

  • 能常驻的 Node 过程,可能继续的期待 redis 队列音讯
  • 当收到 1 条音讯,便由 Node 脚本解决;解决完要持续期待队列中下一条音讯。如此周而复始。

首先,咱们能够这样编写代码来在 Node 中创立 redis 客户端:

const redis = require('promise-redis-client')
let client = redis.createClient(...options)
client.on('error', err => {console.log('redis 链接出错')
})
client.on('ready', () => {console.log('redis ready')
})

为了实现 当 redis 客户端创立结束,再开启音讯队列监听,咱们把下面的代码,封装成一个模块,用 promise 形式导出:

// redis.js
const redis = require('promise-redis-client')
exports.createClient = function() {return new Promise((resolve, reject) => {let client = redis.createClient(...options)
        client.on('error', err => {console.log('redis 连贯出错')
            reject(err)
        })
        client.on('ready', () => {console.log('redis ready')
            resolve(client)
        })
    })
}

OK,接下来,咱们能够去 app.js 中编写队列的消费者代码。为了更优雅的应用 async/await,咱们能够这样来编写一个 startWait 函数:

async function startWaitMsg(client) {...}

而后,在 client ready 的时候,去启动它:

const {createClient} = require('./redis.js')
const c = createClient()
client.then(async c => {await startWaitMsg(c)
})

最难的中央在于,startWaitMsg 函数该如何编写。因为咱们应用了 promise 版本的 redis 库。因而,咱们能够像这样去读取一个音讯:

async function startWaitMsg(client) {await client.rpop('my_mq')
}

但这样写的话,redis 返回音讯后,node 持续往后执行,最终 startWaitMsg 函数就执行完结了。只管整个 Node 过程会因为 redis 连贯未断开而不会退出,但 node 此时曾经无奈再次去执行 client.rpop 这句代码了,也因而无奈再次从音讯队列中获取新来的音讯。

循环实现继续期待

咱们想到,能够应用循环来实现 继续监听队列。于是,把代码改成:

async function startWaitMsg(client) {while(true) {await client.rpop('my_mq')
    }
}

如此便实现了 继续执行 rpop 指令。然而,如果你在 rpop 代码前面加一行日志打印的话,会察看到 client.rpop 在继续打印 null。

这是因为,rpop 指令是 非阻塞的,因而当队列没有音讯,他便返回一个 null,由此触发你的 while 循环在一直执行。这会导致咱们程序占用过多的 cpu 工夫片,且对 redis 网络 IO 有过多的没必要的耗费。

整个 while 循环不停的执行,只有执行 rpop 这一行的时候会短暂开释一下 EventLoop 给其余代码,这对脚本性能影响也会较大。国家提倡节能减排,这显然不是最优雅的。

应用阻塞模式

让咱们来用上 redis 队列的阻塞模式试试。

async function startWaitMsg(c) {while(true) {const res = await c.brpop('my_mq', 0)
        console.log('收到音讯', res)
    }
}

通过 brpop 指令,能够让 brpop 代码阻塞在这里。这里所谓的 阻塞 并不是对 Node 程序的阻塞,而是 redis 客户端本身的阻塞。实际上对 Node 过程来说,无论是 rpop 还是 brpop 都是 非阻塞 的异步 IO 操作,只是在音讯队列为空时 rpop 底层会立即返回 null,从而 node 过程会 resolve 一个空,而 brpop 会在底层 redis 阻塞期待音讯,音讯达到后再给 Node 过程告诉 resolve。

因而,brpop 对 Node 来说,能够防止本人实现队列的内容轮询,能够在期待 IO 回调期间将 cpu 留给其余工作。从而大大减少 Node 过程的 CPU 耗费。

redis 断开无奈持续生产的问题

在代码运行过程中,呈现了一个新的问题:redis 客户端会在某些状况下断开连接(可能因为网络等起因)。而通过剖析日志发现:一旦产生连贯异样,咱们的消费者脚本就无奈持续接管新的音讯了(我的日志入库性能生效)。

通过剖析,发现问题起因仍然在于咱们的 while 语句 和 brpop 的配合问题。

当 redis client 对象产生连贯异样时,会向以后正在期待的 brpop 代码抛出一个 reject 异样。咱们回看上述代码的 startWait 函数:

async function startWaitMsg(c) {while(true) {const res = await c.brpop('my_mq', 0)
        console.log('收到音讯', res)
    }
}

如果 await brpop 这一行抛出 reject 异样,因为咱们未捕捉该异样,则异样会抛出 startWaitMsg 函数,后果就是 while 循环被退出了。

思考如何解决

事实上,当连贯呈现问题,咱们须要对 client 进行重连。不过,这个重连机制,redisclient 会主动进行,因而咱们的代码要做的仅仅只须要 保障 while 循环能在异样时复原。于是,咱们在产生异样时,continue 一下:

async function startWaitMsg(c) {while(true) {
        let res = null
        try {res = await c.brpop('my_mq', 0)
            console.log('收到音讯', res)
        }
        catch(err) {console.log('brpop 出错,从新 brpop')
            continue
        }
        // ... 音讯解决工作
    }
}

因为 redis 客户端外部的重连过程不会再触发 reject (只是断开连接的时候触发一次),因而 continue 之后的 brpop 又会从新 “ 阻塞 ” 期待,由此,咱们的 消费者 便能够失常活着了。

最终代码

  • 客户端连贯代码文件:redis.js
const redis = require('promise-redis-client')
exports.createClient = function() {return new Promise((resolve, reject) => {let client = redis.createClient(...options)
        client.on('error', err => {console.log('redis 连贯出错')
            reject(err)
        })
        client.on('ready', () => {console.log('redis ready')
            resolve(client)
        })
    })
}

app.js

const {createClient} = require('./redis.js')

const c = createClient()
client.then(async c => {await startWaitMsg(c) // 启动音讯监听
})

async function startWaitMsg(c) {while(true) {
        let res = null
        try {res = await c.brpop('my_mq', 0)
            console.log('收到音讯', res)
        }
        catch(err) {console.log('brpop 出错,从新 brpop')
            continue
        }
        // ... 音讯解决工作
    }
}

总结

  • redis 的 list 数据结构,能够用作实现 队列模式 音讯队列
  • Node 中能够通过 while(true) 实现队列的继续循环监听
  • 通过 brpop 阻塞指令的应用,能够防止 cpu 空转来监听队列
  • Node 中要留神 redis 连贯断开时的错误处理,以防止因出错导致无奈从新监听队列
正文完
 0