乐趣区

通过Iterator控制Promiseall的并发数

背景

异步是 js 一个非常重要的特性,但很多时候,我们不仅仅想让一系列任务并行执行,还想要控制同时执行的并发数,尤其是在针对操作有限资源的异步任务,比如文件句柄,网络端口等等。

看一个例子。

function sleep(ms) {return new Promise(resolve => setTimeout(resolve, ms));
}

// simulate an async work that takes 1s to finish
async function execute(id) {console.log(`start work ${id}`);
  await sleep(1000);
  console.log(`work ${id} done`);
}

Promise.all([1, 2, 3, 4, 5, 6, 7, 8, 9].map(execute));

输出结果:

"start work 1"
"start work 2"
"start work 3"
"start work 4"
"start work 5"
"start work 6"
"start work 7"
"start work 8"
"start work 9"
"work 1 done"
"work 2 done"
"work 3 done"
"work 4 done"
"work 5 done"
"work 6 done"
"work 7 done"
"work 8 done"
"work 9 done"

可以看到,所有的 work 都同时开始执行了。

现在,如果我们想让这些 work 每次只执行 2 个,2 个完成之后再继续后面的 2 个,即并发数为 2 应该怎么做呢?

解决方案

控制 Promise 的生成是关键

我们知道,Promise.all并不会触发 Promise 的执行,真正触发执行的是创建 Promise 本身,换句话说,Promise在生成的一瞬间就已经开始执行了!因此,如果要控制 Promise 的并发,我们就要控制 Promise 的生成。

通过 Iterator 控制并发数

常见的解决方案是通过一个函数接收 并发任务数组 并发函数 并发数 3 个参数,根据并发数,监控Promise 的完成状态,批量创建新的 Promise,从而达到控制Promise 生成的目的。

现在,我们来尝试另外一个思路,通过 Iterator 来控制并发数。

同时遍历同一个 Iterator 会发生什么?

让我们先来看一个简化的例子。

// Array.values returns an Array Iterator
const iterator = [1, 2, 3].values();

for (const x of iterator) {console.log(`loop x: ${x}`);

  for (const y of iterator) {console.log(`loop y: ${y}`);
  }
}

输出结果:

"loop x: 1"
"loop y: 2"
"loop y: 3"

注意到没有?y 循环接着 x 循环继续,并且 2 个循环都在所有元素遍历完之后结束了!这正是我们要利用的特性。
对 Iterator 不熟悉的同学可以参考 MDN 文章:https://developer.mozilla.org…

Iterator 改造 work 的例子

让我们用 Iterator 的这个特性来改造最开始的 work 例子。

// generate workers according to concurrency number
// each worker takes the same iterator
const limit = concurrency => iterator => {const workers = new Array(concurrency);
  return workers.fill(iterator);
};

// run tasks in an iterator one by one
const run = func => async iterator => {for (const item of iterator) {await func(item);
  }
};

// wrap limit and run together
function asyncTasks(array, func, concurrency = 1) {return limit(concurrency)(array.values()).map(run(func));
}

Promise.all(asyncTasks(tasks, execute, 2));

输出结果:

"start work 1"
"start work 2"
"work 1 done"
"start work 3"
"work 2 done"
"start work 4"
"work 3 done"
"start work 5"
"work 4 done"
"start work 6"
"work 5 done"
"start work 7"
"work 6 done"
"start work 8"
"work 7 done"
"start work 9"
"work 8 done"
"work 9 done"

结果和我们预想的一样,每次只同时执行两个异步任务直到所有任务都执行完毕。

不过,这个方案也不是完美无缺。主要问题在于,如果某个 worker 在执行过程中出错了,其余的 worker 并不会因此停止工作。也就是说,上面的例子中,如果 worker 1 出现异常停止了,worker 2 会独自执行剩下所有任务,直到全部完毕。因此,如果想要时刻保持 2 个并发,最简单的方法是给每个 execute 方法添加catch

尽管不够完美,将 Iterator 作为控制 Promise 创建,也不失为一种简单有效的控制异步并发数的简单方法。

当然,实际项目中,还是尽量避免重复造轮子,p-limit,async-pool 甚至 bluebird 都是简单易用的解决方案。

退出移动版