共计 2266 个字符,预计需要花费 6 分钟才能阅读完成。
背景
异步是 js 一个非常重要的特性,但很多时候,我们不仅仅想让一系列任务并行执行,还想要控制同时执行的并发数,尤其是在针对操作有限资源的异步任务,比如文件句柄,网络端口等等。
看一个例子。
function sleep(ms) {return new Promise(resolve => setTimeout(resolve, ms));
}
// simulate an async work that takes 1s to finish
async function execute(id) {console.log(`start work ${id}`);
await sleep(1000);
console.log(`work ${id} done`);
}
Promise.all([1, 2, 3, 4, 5, 6, 7, 8, 9].map(execute));
输出结果:
"start work 1"
"start work 2"
"start work 3"
"start work 4"
"start work 5"
"start work 6"
"start work 7"
"start work 8"
"start work 9"
"work 1 done"
"work 2 done"
"work 3 done"
"work 4 done"
"work 5 done"
"work 6 done"
"work 7 done"
"work 8 done"
"work 9 done"
可以看到,所有的 work 都同时开始执行了。
现在,如果我们想让这些 work 每次只执行 2 个,2 个完成之后再继续后面的 2 个,即并发数为 2 应该怎么做呢?
解决方案
控制 Promise
的生成是关键
我们知道,Promise.all
并不会触发 Promise
的执行,真正触发执行的是创建 Promise
本身,换句话说,Promise
在生成的一瞬间就已经开始执行了!因此,如果要控制 Promise
的并发,我们就要控制 Promise
的生成。
通过 Iterator
控制并发数
常见的解决方案是通过一个函数接收 并发任务数组
, 并发函数
, 并发数
3 个参数,根据并发数,监控Promise
的完成状态,批量创建新的 Promise
,从而达到控制Promise
生成的目的。
现在,我们来尝试另外一个思路,通过 Iterator
来控制并发数。
同时遍历同一个 Iterator
会发生什么?
让我们先来看一个简化的例子。
// Array.values returns an Array Iterator
const iterator = [1, 2, 3].values();
for (const x of iterator) {console.log(`loop x: ${x}`);
for (const y of iterator) {console.log(`loop y: ${y}`);
}
}
输出结果:
"loop x: 1"
"loop y: 2"
"loop y: 3"
注意到没有?y 循环接着 x 循环继续,并且 2 个循环都在所有元素遍历完之后结束了!这正是我们要利用的特性。
对 Iterator 不熟悉的同学可以参考 MDN 文章:https://developer.mozilla.org…
用 Iterator
改造 work 的例子
让我们用 Iterator
的这个特性来改造最开始的 work 例子。
// generate workers according to concurrency number
// each worker takes the same iterator
const limit = concurrency => iterator => {const workers = new Array(concurrency);
return workers.fill(iterator);
};
// run tasks in an iterator one by one
const run = func => async iterator => {for (const item of iterator) {await func(item);
}
};
// wrap limit and run together
function asyncTasks(array, func, concurrency = 1) {return limit(concurrency)(array.values()).map(run(func));
}
Promise.all(asyncTasks(tasks, execute, 2));
输出结果:
"start work 1"
"start work 2"
"work 1 done"
"start work 3"
"work 2 done"
"start work 4"
"work 3 done"
"start work 5"
"work 4 done"
"start work 6"
"work 5 done"
"start work 7"
"work 6 done"
"start work 8"
"work 7 done"
"start work 9"
"work 8 done"
"work 9 done"
结果和我们预想的一样,每次只同时执行两个异步任务直到所有任务都执行完毕。
不过,这个方案也不是完美无缺。主要问题在于,如果某个 worker 在执行过程中出错了,其余的 worker 并不会因此停止工作。也就是说,上面的例子中,如果 worker 1 出现异常停止了,worker 2 会独自执行剩下所有任务,直到全部完毕。因此,如果想要时刻保持 2 个并发,最简单的方法是给每个 execute
方法添加catch
。
尽管不够完美,将 Iterator
作为控制 Promise
创建,也不失为一种简单有效的控制异步并发数的简单方法。
当然,实际项目中,还是尽量避免重复造轮子,p-limit,async-pool 甚至 bluebird 都是简单易用的解决方案。