一、场景
构想上面几个场景:
- 商品列表页中,点击商品将会进入详情页。为了实现详情页秒开,在用户点击前要预申请商品信息,假如有100个商品条目。如果同时发动100个申请,可能会将带宽打满。局部设施可能还会有申请的限度,这样会阻塞本来页面的其余失常申请。
- 一台带宽无限的机器须要对网页进行关上时长的统计,假如每天要执行几千个这样的工作,这时候如果能通过申明一个类就能够进行并发管制,那将将是比拟不便的。
基本上本次要实现的就是一个类,用于满足同一时间内,仅有制订异步工作运行的一个工作的队列。
二、实现思路
- 工作如何排队执行?只有在工作完结时执行回调就能够了
function genTask(i) { return new Promise(resolve => { setTimeout(() => { resolve(i); }, i * 1000); });}const tasks = [ genTask.bind(null, 1), genTask.bind(null, 2), genTask.bind(null, 3),];function processTask() { const task = tasks.shift(); const prom = typeof task === 'function' && task(); prom instanceof Promise && prom.then(data => { console.log(data); }).finally(() => { processTask(); });}processTask();
- 多个工作排队如何并行。下面实现了单个工作的排队执行,如果是多个工作呢?其实也很简略,只有循环执行屡次 processTask就能够简略满足了,这里异步并发量为3为例
// ...// processTask(); 删除let i = 3;while(i--) processTask();
- 应用class革新。增加办法:工作通过start办法开始执行,并且能够通过addTask增加工作
// 应用类改写class FixedThreadPool { constructor({size, tasks}) { this.size = size; this.tasks = tasks; } start() { let i = this.size; while(i--) this.processTask(); } addTask(...tasks) { tasks.forEach(task => { if (typeof task === 'function') { this.tasks.push(task); } else { console.error('task can only be function'); } }) } processTask() { const task = this.tasks.shift(); const prom = typeof task === 'function' && task(); prom instanceof Promise && prom.then(data => { console.log(data); }).finally(() => { this.processTask(); }); }}
// 测试一下const tasks = [ genTask.bind(null, 1), genTask.bind(null, 2), genTask.bind(null, 3),];// 并发数量为3const pool = new FixedThreadPool({ size: 3, // 并发数量为3 tasks: [...tasks]});pool.start(); // (共计3s)1s输入1,2s输入2,3s输入3// 并发数量为1const pool2 = new FixedThreadPool({ size: 1, // 并发数量为1 tasks: [...tasks]});pool2.start(); // (共计6s)1s输入1,3s输入2,6s输入3
增加工作队列的监听。剖析下面的代码,当所有工作执行完时候,再增加工作,将不会持续被解决。因为当前情况下,如果不满足
prom instanceof Promise
,processTask也就进行执行了,将不会有新的回调,也不会再次触发 processTask。这里有两种思路解决:- addTasks时从新执行 start办法
- 如果无工作,processTask中生成一个 setTimeout,检测是不是有新工作,检测是不是有新的工作。
a计划:当执行start会从新生成 size个processTask,假如pool 仍在运行或者局部processTask 依然在运行,这会导致运行数量超过size个。当然,能够加若干条件判断。
b 计划:如果没有工作则执行 setTimeout,过一段时间后再执行processTask
这里抉择了b计划,因为实现起来简略,也不容易出错 :
class FixedThreadPool { // ... processTask() { const task = this.tasks.shift(); const prom = typeof task === 'function' && task(); if (prom instanceof Promise) { prom.then(data => { console.log(data); }).finally(() => { this.processTask(); }); } else { // 如果没有适合的工作,将会在500ms后再次执行 setTimeout(() => { this.processTask(); }, 500); } }}
// 测试一下const pool3 = new FixedThreadPool({ size: 3, // 并发数量为3 tasks: [...tasks]});pool3.start();setTimeout(() => { pool3.addTask(...tasks);}, 5000); // 5s后工作都执行完了,再增加工作
- 增加回调函数。下面收到完结的数据后只是 console.log(data),理论状况下接到返回数据,须要做一些自定义的解决。这里在类外面增加了一个回调函数defaultCb,用于解决返回的后果
// class FixedThreadPool { constructor({ // ... defaultCb = (data) => {console.log(data)} // 增加回调函数 }) { // ... if (typeof defaultCb === 'function') { this.defaultCb = defaultCb; } else { throw new Error('defaultCb can only be function'); } } // ... processTask() { // ... if (prom instanceof Promise) { prom.then(data => { // console.log(data); this.defaultCb(data); // 替换为自定义的callback }).finally(() => { this.processTask(); }); } // ... }}
// 自定义回调函数const pool4 = new FixedThreadPool({ size: 3, // 并发数量为3 tasks: [...tasks], defaultCb: (data) => { console.log('custom callback', data); }});pool4.start();
- 创立 Task 类,能够在池子里执行各种各样的工作。工作可能会有不同的生成函数、不同的参数、不同的回调函数,这时候将每个工作降级一下,生成一个Task类,它将领有一个processor用于执行异步工作,params作为入参,callback 作为回调函数。
class Task { constructor({params, processor = () => {}, callback = () => {}}) { this.params = params; if (typeof processor === 'function') { this.processor = processor; } else { throw new Error('processor must be a funtion'); } if (typeof callback === 'function') { this.callback = callback || (() => {}); } else { throw new Error('callback must be a funtion'); } }}
// 对于Task的降级class FixedThreadPool { constructor({ size, tasks }) { this.size = size; this.tasks = []; this.addTask(...tasks); // 通过addTask增加,addTask增加了工作类型的判断 } // ... addTask(...tasks) { tasks.forEach(task => { if (task instanceof Task) { // 增加对于是否为 Task 实例的判断 this.tasks.push(task); } else { console.error('expected to be instanceof Task'); // 文案也改一下 } }) } // ... processTask() { const task = this.tasks.shift(); if (task) { const prom = task.processor(task.params); // promise将由自定义processor生成 if (prom instanceof Promise) { prom.then(data => { task.callback(data); // 自定义的callback }).finally(() => { this.processTask(); }); } } else { setTimeout(() => { this.processTask(); }, 500); } }}
// 测试一下// 工作类型降级function genTaskObj(i) { return new Task({ params: i, processor: (params) => { return new Promise(resolve => { setTimeout(() => { resolve(params); }, params * 1000); }); }, callback: (data) => { console.log(`callback for ${i}, rst is`, data); } });}const tasks = [ genTaskObj(1), genTaskObj(2), genTaskObj(3),];// let pool5 = new FixedThreadPool({ size: 3, tasks: [...tasks]});pool5.start();
- 假如callback也是个异步工作呢?理论状况中,callback可能须要进行入库或者接口调用,这也是异步操作,咱们心愿在callback执行完结后后再进行相应的操作。如果 callback 执行后也是个异步工作(这里拿 promise 举例)这时须要将执行下一次processTask的机会,改为callback的 then 中,否则就是间接调用:
class FixedThreadPool { // ... processTask() { const task = this.tasks.shift(); if (task) { const prom = task.processor(task.params); if (prom instanceof Promise) { let cb; prom.then(data => { cb = task.callback(data); // 失去callback的回调 }).finally(() => { if (cb instanceof Promise) { // 进行是否为异步工作的判断 cb.finally(() => { this.processTask(); }); } else { this.processTask(); } }); } } else { setTimeout(() => { this.processTask(); }, 500); } }}
// 测试一下// callback也是异步的状况function genTaskObjWithPromCb(i) { return new Task({ params: i, processor: (params) => { return new Promise(resolve => { setTimeout(() => { resolve(params); }, params * 1000); }); }, callback: (data) => { return new Promise(resolve => { console.log('2s later, cb will finish', data); setTimeout(() => { console.log(data, 'finish'); resolve(); }, 2000); }) } });}const tasksWithPromCb = [ genTaskObjWithPromCb(1), genTaskObjWithPromCb(2), genTaskObjWithPromCb(3),];let pool6 = new FixedThreadPool({ size: 3, tasks: [...tasksWithPromCb]});pool6.start();
是时候给工作加个刹车了。假如当结尾提到的a场景,用户点击了详情页,咱们的工作池就应该进行了,此处填加stop办法。
这里以后正在执行的工作不会被进行,然而剩下的工作将不会被执行了。
// 异步池class FixedThreadPool { constructor({ size, tasks }) { // ... // 增加标识 this.runningFlag = false; this.runningProcessorCount = 0; // ... } // 是否运行中判断 isRunning() { return this.runningFlag || this.runningProcessorCount > 0; } start() { if (this.isRunning()) { return; } this.runningFlag = true; let i = this.size; while(i--) { this.processTask(); this.runningProcessorCount++; } } // 通知processTask,工作不须要继续执行了 stop() { this.runningFlag = false; } // ... processTask() { // 如果flag被置为false,则进行执行 if (!this.runningFlag) { this.runningProcessorCount--; console.log('stop'); return; } // ... }}
// 测试工作进行function genTaskObj(i) { return new Task({ params: i, processor: (params) => { return new Promise(resolve => { setTimeout(() => { resolve(params); }, params * 1000); }); }, callback: (data) => { console.log(`callback for ${i}, rst is`, data); } });}const tasks7 = [ genTaskObj(1), genTaskObj(2), genTaskObj(3),];let pool7 = new FixedThreadPool({ size: 3, tasks: [...tasks7, ...tasks7, ...tasks7]})pool7.start();setTimeout(() => { pool7.stop();}, 3000); // 3s后进行
三、总结
整体代码如下:
// 工作实体class Task { constructor({params, processor = () => {}, callback = () => {}}) { this.params = params; if (typeof processor === 'function') { this.processor = processor; } else { throw new Error('processor must be a funtion'); } if (typeof callback === 'function') { this.callback = callback || (() => {}); } else { throw new Error('callback must be a funtion'); } }}// 异步池class FixedThreadPool { constructor({ size, tasks }) { this.size = size; this.tasks = []; this.addTask(...tasks); this.runningFlag = false; this.runningProcessorCount = 0; } isRunning() { return this.runningFlag || this.runningProcessorCount > 0; } start() { if (this.isRunning()) { return; } this.runningFlag = true; let i = this.size; while(i--) { this.processTask(); this.runningProcessorCount++; } } stop() { this.runningFlag = false; } addTask(...tasks) { tasks.forEach(task => { if (task instanceof Task) { this.tasks.push(task); } else { console.error('expected to be instanceof Task'); } }) } processTask() { if (!this.runningFlag) { this.runningProcessorCount--; console.log('stop'); return; } const task = this.tasks.shift(); if (task) { const prom = task.processor(task.params); if (prom instanceof Promise) { let cb; prom.then(data => { cb = task.callback(data); }).finally(() => { if (cb instanceof Promise) { cb.finally(() => { this.processTask(); }); } else { this.processTask(); } }); } } else { setTimeout(() => { this.processTask(); }, 500); } }}