本篇文章主要介绍如何在nodeJs中使用nsq,其他实现将在后续文章输出。
起因
前段时间做了一个网页生成pdf的node服务。由于puppteer和canvas生成过程中对内存的消耗比较大,内容量大的网页生成时间过长,对于第三方组件有时候会生成出问题等原因。
引入了nsq,使项目实现负载均衡,消除单点故障。
但是网上查找之后发现介绍node中加入nsq的方案很少,经过软膜硬泡终于把nsq引入了node中,希望能把自己的收获和大家聊一下吧。
初识nsq
NSQ是一个基于Go语言的分布式实时消息平台, 它具有分布式、去中心化的拓扑结构,支持无限水平扩展。无单点故障、故障容错、高可用性以及能够保证消息的可靠传递的特征。另外,NSQ非常容易配置和部署, 且支持众多的消息协议。支持多种客户端,协议简单。
nsq设计很简单,需要了解以下几个核心概念。
1、nsqd:一个负责接收、排队、转发消息到客户端的守护进程
2、nsqlookupd:管理拓扑信息, 用于收集nsqd上报的topic和channel,并提供最终一致性的发现服务的守护进程。
3、Topic:一个topic就是程序发布消息的一个逻辑键,当程序第一次发布消息时就会创建topic。
4、Channels:channel组与消费者相关,是消费者之间的负载均衡,channel在某种意义上来说是一个“队列”。每当一个发布者发送一条消息到一个topic,消息会被复制到所有消费者连接的channel上,消费者通过这个特殊的channel读取消息,实际上,在消费者第一次订阅时就会创建channel。
Topic只能有一个channel可以有多个,不同的channel用于分发不同的任务
下面附上一个金典nsq示意图:
安装中遇到的问题
在使用过程中发现gcc版本过低导致报错。
因为node.js 4升级了v8引擎,需要gcc版本在4.8以上。
实战开始
本项目是在eggjs基础上建立的。首先需要在项目中安装nsqjs
$ npm install nsqjs --save
在根文件app.js对nsq进行控制,nsq的配置非常简洁,nsq分为写和读两个独立的过程。
const nsq = require('nsqjs')module.exports = app => { app.beforeStart(async () => { // 实例化nsq的写操作 // 在config中配置nsq的host和port,这里是你配置的nsq地址 const writerNsq = new nsq.Writer(app.config.nsq.nsqHostWriter, app.config.nsq.writePort) // 连接nsq的写功能 writerNsq.connect() // 当写操作连接成功后,把其赋值到全局的app上以便写入信息 writerNsq.on('ready', () => { app.writerNsq = writerNsq })
当nsq写功能实现后,我们可以通过publish方法向nsq队列写入信息
ctx.app.writerNsq.publish(config.nsq.topic, { // 你所需要传递的参数 })
当服务器有空闲的时候nsq会随机分配到空闲线程上实现读操作,我们的核心业务是在读功能启用之后实现的。
读和写的初始化过程是在项目启动时候就要执行的。
项目运行过程中,我们只是不停的在进行读写操作而已。
// 实例化nsq的读操作 // 参数是对应需要读的topic和channel和应的nsq读的地址 const client = new nsq.Reader(app.config.nsq.topic, app.config.nsq.channel, { lookupdHTTPAddresses: app.config.nsq.nsqHostReader, maxInFlight: 1 }) // 连接nsq的读功能 client.connect() // 每当有消息队列进来的时候就会调用client.on方法 // message是我门在写的过程中传入的信息 client.on方法('message', async msg => { // 对写入的信息格式化 let data = JSON.parse(msg.body.toString()) try { // 为了保持连接状态,处理超时情况 const touch = () => { if (!msg.hasResponded) { msg.touch() // Touch the message again a second before the next timeout. setTimeout(touch, msg.timeUntilTimeout() - 1000) } } let timeTouch = setTimeout(touch, msg.timeUntilTimeout() - 1000) let timeFinish = setTimeout(msg.finish.bind(msg), msg.timeUntilTimeout() * 3 + 1000) // 这里是项目的核心处理部分,具体内容后面文章在说明,这里返回的url便是生成pdf的网络地址 let url = await ctx.service.pdf.index.generate(data) clearTimeout(timeTouch) clearTimeout(timeFinish) // 这里表示这个队列结束告诉nsq可以放下个兄弟进来了 msg.finish() } catch (error) { // 万一出现错误也不要阻塞,nsq在失败后会重新入队 msg.finish() // 这里可以加入网络日志 console.log(error) } }); client.on('error', function(err) { // 这里监听读操作时候发生错误情况处理 // 这里可以做一些错误处理,加错误日志 console.log(err) }); });};
刚开始说到用nsq还是有点慌的,毕竟作为一个前端工程师一脸懵逼,但是经过仔细学习,和后端大佬的请教,发现nsq其实就是一个高效队列,简易好用,对于上手来说还是比较简单的。
后续文章还会对puppteer生成pdf服务的核心业务做详细介绍,也就是上文ctx.service.pdf.index.generate(data)具体实现过程。
以上只是本人的学习总结,如有问题,请大神不吝赐教。