前言

音讯队列是存储数据的一个中间件,能够了解为一个容器。生产者生产音讯投递 到队列中,消费者能够拉取音讯进行生产,如果消费者目前没有生产的打算,则音讯队列会保留音讯,直到消费者有生产的打算。

设计思路

生产者

  • 连贯 redis
  • 向指定通道 通过 lpush 音讯

消费者

  • 连贯 redis
  • 死循环通过 brpop 阻塞式获取音讯
  • 拿到音讯进行生产
  • 循环拿去下一个音讯

Redis

装置及启动

此步骤各位道友随便就好,不肯定要用docker 。只有保障本人能连贯到redis 服务即可。
# 应用docker 拉取redis 镜像docker pull redis:latest# 启动redis服务 # --name 前面是容器名字不便后续保护和治理 # -p 前面是指映射容器服务的 6379 端口到宿主机的 6379 端口docker run -itd --name redis-mq -p 6379:6379 redis# ============ docker 罕用基本操作(题外话) =================# 拉取镜像docker pull 镜像名称 # 查看镜像docker images# 删除镜像docker rmi 镜像名称# 查看运行容器(仅为启动中的)docker ps # 查看运行容器(蕴含未启动)docker ps -a# 启动容器docker start 容器名称/容器id# 进行容器docker stop 容器名称/容器id

Nodejs连贯

初始化工程

# 创立文件夹并进入mkdir queue-node-redis && cd queue-node-redis# yarn 初始化yarn init -y# 下载redis包,# 指定版本的起因是尽量减少道友们的失败几率 毕竟前端的工具迭代太快了yarn add [email protected]   

创立 lib 与 utils 目录

├── .gitignore├── lib├── package.json├── utils│   └── redis.js└── yarn.lock

utils/redis.js

const redis = require("redis");const redisCreateClient = async (config) => {  try {    const client = redis.createClient({      url: `redis://${config.host}:${config.port}`,    });    await client.connect();    await client.select(config.db);    console.log("redis connect success");    return client;  } catch (err) {    console.log("redis connect error");    throw err;  }};module.exports = {  redisCreateClient,};

index.js

在我的项目根目录下创立此文件,测试redis连贯是否胜利
const { redisCreateClient } = require("./utils/redis");const test = async () => {  const client = await redisCreateClient({    host: "127.0.0.1",    port: 6379,    db: 0,  });};test();

呈现如下图所示即可

minimist

轻量级的命令行参数解析引擎。

# 装置 minimistyarn add [email protected]

应用办法

const systemArg = require("minimist")(process.argv.slice(2));console.log(systemArg);
# 运行 node index.js --name test# 输入{ _: [], name: 'test' }

注释开始

从目录构造及文件创建,手把手教程

目录构造变更

├── config.js       # 配置文件├── lib│   └── index.js # 主目录入口文件├── package.json ├── utils                 # 工具函数库│   └── redis.js└── yarn.lock

config.js

配置文件思路的重要性大于代码的实现

参考nodejs进阶视频解说:进入学习

module.exports = {  // redis 配置  redis: {    default: {      host: "127.0.0.1",      port: 6379,      password: "",      db: 0,    },  },  // 音讯队列频道设置  mqList: [    {      // 音讯频道名称      name: "QUEUE_MY_MQ",      // 阻塞式取值超时配置      brPopTimeout: 100,      // 开启工作数 此配置须要 PM 启动失效      instances: 1,      // redis 配置key      redis: "default",    },  ],};

lib/index.js

针对配置做程序异样解决
const systemArg = require("minimist")(process.argv.slice(2));const config = require("../config");const { bootstrap } = require("./core");// 程序自检// 判断是否输出了 频道名称if (!systemArg.name) {  console.error("ERROR: channel name cannot be empty");  process.exit(99);}// 频道队列配置const mqConfig =  config.mqList.find((item) => item.name === systemArg.name) ?? false;// 如果config不存在if (!mqConfig) {  console.error("ERROR:  configuration not obtained");  process.exit(99);}// redis 配置const redisConfig = config.redis[mqConfig.redis];if (!redisConfig) {  console.error("ERROR: redis configuration not obtained");  process.exit(99);}// node index.js --name QUEUE_MY_MQbootstrap(mqConfig, redisConfig);

lib/core.js

前面的外围逻辑写在此处
async function bootstrap(config) {  console.log(config);}module.exports = {  bootstrap,};

外围逻辑

lib/core.js

const { redisCreateClient } = require("../utils/redis");async function bootstrap(mqConfig, redisConfig) {  try {    // 创立redis连贯    const client = await redisCreateClient(redisConfig);    // 通过死循环阻塞程序    while (true) {      let res = null;      console.log("队列执行");      try {        // 从队列中获取工作, 采纳阻塞式获取工作 最大阻塞工夫为config.queue.timeout        res = await client.brPop(mqConfig.name, mqConfig.brPopTimeout);        if (res === null) {          continue;        }        console.log("TODO:: Task processing", res);      } catch (error) {        console.log("ERROR: redis brPop error", error);        continue;      }    }  } catch (err) {    // 处理程序异样    console.log("ERROR: ", err);    process.exit(1);  }}module.exports = {  bootstrap,};

生成测试数据

为了接下来的测试,咱们学生成一些测试数据

test/mockMq.js

const { redisCreateClient } = require("../utils/redis");const config = require("../config");/** 生成 1000 条测试音讯 */const mockMq = async (key) => {  const client = await redisCreateClient(config.redis.default);  for (let i = 0; i < 1000; i++) {    // 向队列中 push 音讯    await client.lPush(key, "test" + i);  }  // 获取队列长度  const count = await client.lLen(key);  console.log(`生成1000条测试音讯实现,目前共有${count}条音讯`);  // 敞开redis连贯  client.quit();};mockMq("QUEUE_MY_MQ");

验证脚本有效性

# 执行音讯生成命令node ./test/mockMq.js# 程序输入# redis connect success# 生成 1000 条测试音讯 实现,目前共有 1000 条音讯# 执行开启消费者node ./lib/index.js --name QUEUE_MY_MQ # TODO:: Task processing { key: 'QUEUE_MY_MQ', element: 'test0' }# TODO:: Task processing .......# TODO:: Task processing { key: 'QUEUE_MY_MQ', element: 'test999' }

定义Job

后记

到此为止倡议队列就实现实现了,当然前面还有一些优化。例如通过配置文件 动静引入 Job 和如何应用 Pm2 启动生产队列并且可配置启动个数、启停管制。(ps:此处的坑会很快补上)

当然除了这些,目前这个繁难队列还有很多有余。例如工作执行失败如何解决,生产后如何ack , 没有用成熟的topic 协定,没有实现延时队列。这些坑因为集体程度以及redis自身的个性 可能很长一段时间都不会填了。倡议生产用成熟的套件 例如 Kafka RabbitMq 以及一些其余更适宜以后语言的套件。