关于javascript:Redis-NodeJS-实现一个能处理海量数据的异步任务队列系统-25

6次阅读

共计 6885 个字符,预计需要花费 18 分钟才能阅读完成。

我的项目仓库:https://github.com/jrainlau/n…

前言

在最近的业务中,接到了一个须要解决约十万条数据的需要。这些数据都以字符串的模式给到,并且解决它们的步骤是异步且耗时的(均匀解决一条数据须要 25s 的工夫)。如果以串行的形式实现,其耗时是相当长的:

总耗时工夫 = 数据量 × 单条数据处理工夫

T = N * t (N = 100,000; t = 25s)

总耗时工夫 = 2,500,000 秒 ≈ 695 小时 ≈ 29 天

显然,咱们不能简略地把数据一条一条地解决。那么有没有方法可能缩小解决的工夫呢?通过调研后发现,应用异步工作队列是个不错的方法。

一、异步工作队列原理

咱们能够把“解决单条数据”了解为一个异步工作,因而对这十万条数据的解决,就能够转化成有十万个异步工作期待进行。咱们能够把这十万条数据塞到一个队列外面,让工作处理器自发地从队列外面去获得并实现。

工作处理器能够有多个,它们同时从队列外面把工作取走并解决。当工作队列为空,示意所有工作曾经被认领完;当所有工作处理器实现工作,则示意所有工作曾经被解决完。

其基本原理如下图所示:

首先来解决工作队列的问题。在这个需要中,工作队列外面的每一个工作,都蕴含了待处理的数据,数据以字符串的模式存在。为了不便起见,咱们能够应用 Redis 的 List 数据格式来寄存这些工作。

因为我的项目是基于 NodeJS 的,咱们能够利用 PM2 的 Cluster 模式来启动多个工作处理器,并行地解决工作。以一个 8 核的 CPU 为例,如果齐全开启了多过程,其实践解决工夫将晋升 8 倍,从 29 天缩短到 3.6 天。

接下来,咱们会从理论编码的角度来解说上述内容的实现过程。

二、应用 NodeJS 操作 Redis

异步工作队列应用 Redis 来实现,因而咱们须要部署一个独自的 Redis 服务。在本地开发中为了疾速实现 Redis 的装置,我应用了 Docker 的方法(默认机器曾经装置了 Docker)。

  1. Docker 拉取 Redis 镜像
docker pull redis:latest
  1. Docker 启动 Redis
docker run -itd --name redis-local -p 6379:6379 redis

此时咱们曾经应用 Docker 启动了一个 Redis 服务,其对外的 IP 及端口为 127.0.0.1:6379。此外,咱们还能够在本地装置一个名为 Another Redis DeskTop Manager
的 Redis 可视化工具,来实时查看、批改 Redis 的内容。

在 NodeJS 中,咱们能够应用 node-redis 来操作 Redis。新建一个 mqclient.ts 文件并写入如下内容:

import * as Redis from 'redis'

const client = Redis.createClient({
  host: '127.0.0.1',
  port: 6379
})

export default client

Redis 实质上是一个数据库,而咱们对数据库的操作无非就是增删改查。node-redis 反对 Redis 的所有交互操作形式,然而操作后果默认是以回调函数的模式返回。为了可能应用 async/await,咱们能够新建一个 utils.ts 文件,把 node-redis 操作 Redis 的各种操作都封装成 Promise 的模式,不便咱们后续应用。

import client from './mqClient'

// 获取 Redis 中某个 key 的内容
export const getRedisValue = (key: string): Promise<string | null> => new Promise(resolve => client.get(key, (err, reply) => resolve(reply)))
// 设置 Redis 中某个 key 的内容
export const setRedisValue = (key: string, value: string) => new Promise(resolve => client.set(key, value, resolve))
// 删除 Redis 中某个 key 及其内容
export const delRedisKey = (key: string) => new Promise(resolve => client.del(key, resolve))

除此之外,还能在 utils.ts 中搁置其余罕用的工具办法,以实现代码的复用、保障代码的整洁。

为了在 Redis 中创立工作队列,咱们能够独自写一个 createTasks.ts 的脚本,用于往队列中塞入自定义的工作。

import {TASK_NAME, TASK_AMOUNT, setRedisValue, delRedisKey} from './utils'
import client from './mqClient'

client.on('ready', async () => {await delRedisKey(TASK_NAME)
  for (let i = TASK_AMOUNT; i > 0 ; i--) {client.lpush(TASK_NAME, `task-${i}`)
  }
  
  client.lrange(TASK_NAME, 0, TASK_AMOUNT, async (err, reply) => {if (err) {console.error(err)
      return
    }
    console.log(reply)
    process.exit()})
})

在这段脚本中,咱们从 utils.ts 中获取了各个 Redis 操作的办法,以及工作的名称 TASK_NAME(此处为 local_tasks)和工作的总数 TASK_AMOUNT(此处为 20 个)。通过 LPUSH 办法往 TASK_NAME 的 List 当中塞入内容为 task-1task-20 的工作,如图所示:

三、异步工作解决

首先新建一个 index.ts 文件,作为整个异步工作队列解决零碎的入口文件。

import taskHandler from './tasksHandler'
import client from './mqClient'

client.on('connect', () => {console.log('Redis is connected!')
})
client.on('ready', async () => {console.log('Redis is ready!')
  await taskHandler()})
client.on('error', (e) => {console.log('Redis error!' + e)
})

在运行该文件时,会主动连贯 Redis,并且在 ready 状态时执行工作处理器 taskHandler()

在上一节的操作中,咱们往工作队列外面增加了 20 个工作,每个工作都是形如 task-n 的字符串。为了验证异步工作的实现,咱们能够在工作处理器 taskHandler.ts 中写一段 demo 函数,来模仿真正的异步工作:

  function handleTask(task: string) {return new Promise((resolve) => {setTimeout(async () => {console.log(`Handling task: ${task}...`)
        resolve()}, 2000)
    })
  }

下面这个 handleTask() 函数,将会在执行的 2 秒后打印出当前任务的内容,并返回一个 Promise,很好地模仿了异步函数的实现形式。接下来咱们将会围绕这个函数,来解决队列中的工作。

其实到了这一步为止,整个异步工作队列解决零碎曾经根本实现了,只须要在 taskHandler.ts 中补充一点点代码即可:

import {popTask} from './utils'
import client from './mqClient'

function handleTask(task: string) {/* ... */}

export default async function tasksHandler() {
  // 从队列中取出一个工作
  const task = await popTask()
  // 解决工作
  await handleTask(task)
  // 递归运行
  await tasksHandler()}

最初,咱们应用 PM2 启动 4 个过程,来试着跑一下整个我的项目:

pm2 start ./dist/index.js -i 4 && pm2 logs

能够看到,4 个工作处理器别离解决完了队列中的所有工作,互相之前互不影响。

事到如今曾经功败垂成了吗?未必。为了测试咱们的这套零碎到底晋升了多少的效率,还须要统计实现队列外面所有工作的总耗时。

四、统计工作实现耗时

要统计工作实现的耗时,只须要实现下列的公式即可:

 总耗时 = 最初一个工作的实现工夫 - 首个工作被获得的工夫 

首先来解决“ 获取首个工作被获得的工夫 ”这个问题。

因为咱们是通过 PM2 的 Cluster 模式来启动利用的,且从 Redis 队列中读取工作是个异步操作,因而在多过程运行的状况下无奈间接保障从队列中读取工作的先后顺序,必须通过一个额定的标记来判断。其原理如下图:

如图所示,绿色的 worker 因为无奈保障运行的先后顺序,所以编号用问号来示意。当第一个工作被获得时,把黄色的标记值从 false 设置成 true。当且仅当黄色的标记值为 false 时才会设置工夫。这样一来,当其余工作被获得时,因为黄色的标记值曾经是 true 了,因而无奈设置工夫,所以咱们便能失去首个工作被获得的工夫。

在本文的例子中,黄色的标记值和首个工作被获得的工夫也被寄存在 Redis 中,别离被命名为 local_tasks_SET_FIRSTlocal_tasks_BEGIN_TIME

原理曾经弄懂,然而在实践中还有一个中央值得注意。咱们晓得,从 Redis 中读写数据也是一个异步操作。因为咱们有多个 worker 但只有一个 Redis,那么在读取黄色标记值的时候很可能会呈现“抵触”的问题。举个例子,当 worker-1 批改标记值为 true 的同时,worker-2 正好在读取标记值。因为工夫的关系,可能 worker-2 读到的标记值仍然是 false,那么这就抵触了。为了解决这个问题,咱们能够应用 node-redlock 这个工具来实现“锁”的操作。

顾名思义,“锁”的操作能够了解为当 worker-1 读取并批改标记值的时候,不容许其余 worker 读取该值,也就是把标记值给锁住了。当 worker-1 实现标记值的批改时会开释锁,此时才容许其余的 worker 去读取该标记值。

node-redlock 是 Redis 分布式锁 Redlock 算法的 JavaScript 实现,对于该算法的解说可参考 https://redis.io/topics/distlock。

值得注意的是,在 node-redlock 在应用的过程中,如果要锁一个已存在的 key,就必须为该 key 增加一个前缀 locks:,否则会报错。

回到 utils.ts,编写一个 setBeginTime() 的工具函数:

export const setBeginTime = async (redlock: Redlock) => {
  // 读取标记值前先把它锁住
  const lock = await redlock.lock(`lock:${TASK_NAME}_SET_FIRST`, 1000)
  const setFirst = await getRedisValue(`${TASK_NAME}_SET_FIRST`)
   // 当且仅当标记值不等于 true 时,才设置起始工夫
  if (setFirst !== 'true') {console.log(`${pm2tips} Get the first task!`)
    await setRedisValue(`${TASK_NAME}_SET_FIRST`, 'true')
    await setRedisValue(`${TASK_NAME}_BEGIN_TIME`, `${new Date().getTime()}`)
  }
  // 实现标记值的读写操作后,开释锁
  await lock.unlock().catch(e => e)
}

而后把它增加到 taskHandler() 函数外面即可:

export default async function tasksHandler() {
+  // 获取第一个工作被获得的工夫
+  await setBeginTime(redlock)
  // 从队列中取出一个工作
  const task = await popTask()
  // 解决工作
  await handleTask(task)
  // 递归运行
  await tasksHandler()}

接下来解决“ 最初一个工作的实现工夫 ”这个问题。

相似上一个问题,因为工作执行的先后顺序无奈保障,异步操作的实现工夫也无奈保障,因而咱们也须要一个额定的标识来记录工作的实现状况。在 Redis 中创立一个初始值为 0 的标识 local_tasks_CUR_INDEX,当 worker 实现一个工作就让标识加。因为工作队列的初始长度是已知的(为 TASK_AMOUNT 常量,也写入了 Redis 的 local_tasks_TOTAL 中),因而当标识的值等于队列初始长度的值时,即可表明所有工作都曾经实现。

如图所示,被实现的工作都会让黄色的标识加一,任何时候只有判断到标识的值等于队列的初始长度值,即可表明工作曾经全副实现。

回到 taskHandler() 函数,退出下列内容:

export default async function tasksHandler() {
+  // 获取标识值和队列初始长度
+  let curIndex = Number(await getRedisValue(`${TASK_NAME}_CUR_INDEX`))
+  const taskAmount = Number(await getRedisValue(`${TASK_NAME}_TOTAL`))
+  // 期待新工作
+  if (taskAmount === 0) {+    console.log(`${pm2tips} Wating new tasks...`)
+    await sleep(2000)
+    await tasksHandler()
+    return
+  }
+  // 判断所有工作曾经实现
+  if (curIndex === taskAmount) {+    const beginTime = await getRedisValue(`${TASK_NAME}_BEGIN_TIME`)
+    // 获取总耗时
+    const cost = new Date().getTime() - Number(beginTime)
+    console.log(`${pm2tips} All tasks were completed! Time cost: ${cost}ms. ${beginTime}`)
+    // 初始化 Redis 的一些标识值
+    await setRedisValue(`${TASK_NAME}_TOTAL`, '0') 
+    await setRedisValue(`${TASK_NAME}_CUR_INDEX`, '0')
+    await setRedisValue(`${TASK_NAME}_SET_FIRST`, 'false')
+    await delRedisKey(`${TASK_NAME}_BEGIN_TIME`)
+    await sleep(2000)
+    await tasksHandler()}
  // 获取第一个工作被获得的工夫
  await setBeginTime(redlock)
  // 从队列中取出一个工作
  const task = await popTask()
  // 解决工作
  await handleTask(task)
+ // 工作实现后须要为标识位加一
+  try {+    const lock = await redlock.lock(`lock:${TASK_NAME}_CUR_INDEX`, 1000)
+    curIndex = await getCurIndex()
+    await setCurIndex(curIndex + 1)
+    await lock.unlock().catch((e) => e)
+  } catch (e) {+    console.log(e)
+  }
+  // recursion
+  await tasksHandler()
+}
  // 递归运行
  await tasksHandler()}

到这一步为止,咱们曾经解决了获取“ 最初一个工作的实现工夫 ”的问题,再联合后面的首个工作被获得的工夫,便能得出运行的总耗时。


最初来看一下理论的运行成果。咱们循例往队列外面增加了 task-1task-20 这 20 个工作,而后启动 4 个过程来跑:

运行状况良好。从运行后果来看,4 个过程解决 20 个均匀耗时 2 秒的工作,只须要 10 秒的工夫,完全符合构想。

五、小结

当面对海量的异步工作须要解决的时候,多过程 + 工作队列的形式是一个不错的解决形式。本文通过摸索 Redis + NodeJS 联合的形式,结构出了一个异步工作队列解决零碎,能较好地实现最后计划的构想,但仍然有很多问题须要改良。比如说当工作出错了应该怎么办,零碎是否反对不同类型的工作,是否运行多个队列等等,都是值得思考的问题。如果读者敌人有更好的想法,欢送留言和我交换!

(完)

正文完
 0