并发管制的概念置信大家都十分相熟,比方浏览器申请的并发管制等。明天,咱们联合 async-pool 这个开源工具来看看如何实现一个简略的并发管制。
async-pool 的代码分为 es6 和 es7 两个版本,都非常简单,咱们次要基于 es6 版本进行阐明。
在去除参数校验等逻辑当前,外围代码如下,十分短小精悍:
function asyncPool(poolLimit, array, iteratorFn) { let i = 0; const ret = []; const executing = []; const enqueue = function() { if (i === array.length) { return Promise.resolve(); } const item = array[i++]; const p = Promise.resolve().then(() => iteratorFn(item, array)); ret.push(p); let r = Promise.resolve(); if (poolLimit <= array.length) { const e = p.then(() => executing.splice(executing.indexOf(e), 1)); executing.push(e); if (executing.length >= poolLimit) { r = Promise.race(executing); } } return r.then(() => enqueue()); }; return enqueue().then(() => Promise.all(ret));}
asyncPool
反对三个参数,第一个是并发数量,第二个是一组申请输出,第三个是返回 promise 的迭代函数。咱们举一个例子来进行阐明。
假如咱们当初有 500 个申请须要发送,并发数量管制是 50。那么咱们能够这样应用 asyncPool
:
asyncPool(50, [/* 500 个申请的参数数据 */], () => {/* 发动申请的函数 */})
咱们当初来具体阐明 asyncPool
的工作原理。
首先,asyncPool
中初始化了两个数组,ret
保留返回后果,其程序要与输出程序统一,executing
用于记录以后正在执行的申请。
asyncPool
中创立了一个 enqueue
函数,负责具体的并发管制逻辑。
在 enqueue
函数中,通过变量 i
来一一获取申请输出参数,调用迭代函数发动申请,而后将返回的 promise 保留在 ret
中。
const item = array[i++];const p = Promise.resolve().then(() => iteratorFn(item, array));ret.push(p);
之后就是并发数量管制的外围逻辑:
let r = Promise.resolve();if (poolLimit <= array.length) { const e = p.then(() => executing.splice(executing.indexOf(e), 1)); executing.push(e); if (executing.length >= poolLimit) { r = Promise.race(executing); }}return r.then(() => enqueue());
如果并发数量限度大于要发动的申请数量,则无需通过 executing
数组来记录正在执行的申请,间接循环发动申请即可。
如果并发数量限度小于要发动的申请数量,则首先通过之前调用迭代函数返回的 promise 生成一个新的 promise,放入 executing
中。在这个新的 promise 实现时,将其从 executing
中删除。
如果 executing
数组长度大于并发数量管制,则应用 Promise.race(executing)
获取最先返回的 promsie,并通过它进行下一次迭代。
通过变量 r
咱们能够看到,在整个循环过程中,enqueue
函数会造成一个 promise 链,在最初一个 promise 返回之后,asyncPool
通过 Promise.all
将所有的后果返回。
return enqueue().then(() => Promise.all(ret));
至此,async-pool
的外围逻辑咱们就剖析完了。下面的剖析过程是基于 es6 版本的代码,es7 版本更加简洁,如下,看官们能够自行剖析:
async function asyncPool(poolLimit, array, iteratorFn) { const ret = []; const executing = []; for (const item of array) { const p = Promise.resolve().then(() => iteratorFn(item, array)); ret.push(p); if (poolLimit <= array.length) { const e = p.then(() => executing.splice(executing.indexOf(e), 1)); executing.push(e); if (executing.length >= poolLimit) { await Promise.race(executing); } } } return Promise.all(ret);}
咱们晓得,不论是 Promise.race
还是 Promise.all
,只有有一个 promise 达到 Fufilled
或者 Rejected
状态,整个就会返回。这在接口申请的的场景中是不适合的。咱们应该如何革新呢?
其实也非常简单,只有在迭代函数的调用处做一些非凡解决即可。
iteratorFn(item, array).then(resp => resp).catch(error => error);
常见面试知识点、技术解决方案、教程,都能够扫码关注公众号“众里千寻”获取,或者来这里 https://everfind.github.io 。