共计 7695 个字符,预计需要花费 20 分钟才能阅读完成。
一、场景
构想上面几个场景:
- 商品列表页中,点击商品将会进入详情页。为了实现详情页秒开,在用户点击前要预申请商品信息,假如有 100 个商品条目。如果同时发动 100 个申请,可能会将带宽打满。局部设施可能还会有申请的限度,这样会阻塞本来页面的其余失常申请。
- 一台带宽无限的机器须要对网页进行关上时长的统计,假如每天要执行几千个这样的工作,这时候如果能通过申明一个类就能够进行并发管制,那将将是比拟不便的。
基本上本次要实现的就是一个类,用于满足同一时间内,仅有制订异步工作运行的一个工作的队列。
二、实现思路
- 工作如何排队执行?只有在工作完结时执行回调就能够了
function genTask(i) {
return new Promise(resolve => {setTimeout(() => {resolve(i);
}, i * 1000);
});
}
const tasks = [genTask.bind(null, 1),
genTask.bind(null, 2),
genTask.bind(null, 3),
];
function processTask() {const task = tasks.shift();
const prom = typeof task === 'function' && task();
prom instanceof Promise && prom.then(data => {console.log(data);
}).finally(() => {processTask();
});
}
processTask();
- 多个工作排队如何并行。下面实现了单个工作的排队执行,如果是多个工作呢?其实也很简略,只有循环执行屡次 processTask 就能够简略满足了,这里异步并发量为 3 为例
// ...
// processTask(); 删除
let i = 3;
while(i--) processTask();
- 应用 class 革新。增加办法:工作通过 start 办法开始执行,并且能够通过 addTask 增加工作
// 应用类改写
class FixedThreadPool {constructor({size, tasks}) {
this.size = size;
this.tasks = tasks;
}
start() {
let i = this.size;
while(i--) this.processTask();}
addTask(...tasks) {
tasks.forEach(task => {if (typeof task === 'function') {this.tasks.push(task);
} else {console.error('task can only be function');
}
})
}
processTask() {const task = this.tasks.shift();
const prom = typeof task === 'function' && task();
prom instanceof Promise && prom.then(data => {console.log(data);
}).finally(() => {this.processTask();
});
}
}
// 测试一下
const tasks = [genTask.bind(null, 1),
genTask.bind(null, 2),
genTask.bind(null, 3),
];
// 并发数量为 3
const pool = new FixedThreadPool({
size: 3, // 并发数量为 3
tasks: [...tasks]
});
pool.start(); //(共计 3s)1s 输入 1,2s 输入 2,3s 输入 3
// 并发数量为 1
const pool2 = new FixedThreadPool({
size: 1, // 并发数量为 1
tasks: [...tasks]
});
pool2.start(); //(共计 6s)1s 输入 1,3s 输入 2,6s 输入 3
-
增加工作队列的监听。剖析下面的代码,当所有工作执行完时候,再增加工作,将不会持续被解决。因为当前情况下,如果不满足
prom instanceof Promise
,processTask 也就进行执行了,将不会有新的回调,也不会再次触发 processTask。这里有两种思路解决:- addTasks 时从新执行 start 办法
- 如果无工作,processTask 中生成一个 setTimeout,检测是不是有新工作,检测是不是有新的工作。
a 计划:当执行 start 会从新生成 size 个 processTask,假如 pool 仍在运行或者局部 processTask 依然在运行,这会导致运行数量超过 size 个。当然,能够加若干条件判断。
b 计划:如果没有工作则执行 setTimeout,过一段时间后再执行 processTask
这里抉择了 b 计划,因为实现起来简略,也不容易出错:
class FixedThreadPool {
// ...
processTask() {const task = this.tasks.shift();
const prom = typeof task === 'function' && task();
if (prom instanceof Promise) {
prom.then(data => {console.log(data);
}).finally(() => {this.processTask();
});
}
else { // 如果没有适合的工作,将会在 500ms 后再次执行
setTimeout(() => {this.processTask();
}, 500);
}
}
}
// 测试一下
const pool3 = new FixedThreadPool({
size: 3, // 并发数量为 3
tasks: [...tasks]
});
pool3.start();
setTimeout(() => {pool3.addTask(...tasks);
}, 5000); // 5s 后工作都执行完了,再增加工作
- 增加回调函数。下面收到完结的数据后只是 console.log(data),理论状况下接到返回数据,须要做一些自定义的解决。这里在类外面增加了一个回调函数 defaultCb,用于解决返回的后果
//
class FixedThreadPool {
constructor({
// ...
defaultCb = (data) => {console.log(data)} // 增加回调函数
}) {
// ...
if (typeof defaultCb === 'function') {this.defaultCb = defaultCb;} else {throw new Error('defaultCb can only be function');
}
}
// ...
processTask() {
// ...
if (prom instanceof Promise) {
prom.then(data => {// console.log(data);
this.defaultCb(data); // 替换为自定义的 callback
}).finally(() => {this.processTask();
});
}
// ...
}
}
// 自定义回调函数
const pool4 = new FixedThreadPool({
size: 3, // 并发数量为 3
tasks: [...tasks],
defaultCb: (data) => {console.log('custom callback', data);
}
});
pool4.start();
- 创立 Task 类,能够在池子里执行各种各样的工作。工作可能会有不同的生成函数、不同的参数、不同的回调函数,这时候将每个工作降级一下,生成一个 Task 类,它将领有一个 processor 用于执行异步工作,params 作为入参,callback 作为回调函数。
class Task {constructor({params, processor = () => {}, callback = () => {}}) {
this.params = params;
if (typeof processor === 'function') {this.processor = processor;}
else {throw new Error('processor must be a funtion');
}
if (typeof callback === 'function') {this.callback = callback || (() => {});
}
else {throw new Error('callback must be a funtion');
}
}
}
// 对于 Task 的降级
class FixedThreadPool {
constructor({
size,
tasks
}) {
this.size = size;
this.tasks = [];
this.addTask(...tasks); // 通过 addTask 增加,addTask 增加了工作类型的判断
}
// ...
addTask(...tasks) {
tasks.forEach(task => {if (task instanceof Task) { // 增加对于是否为 Task 实例的判断
this.tasks.push(task);
}
else {console.error('expected to be instanceof Task'); // 文案也改一下
}
})
}
// ...
processTask() {const task = this.tasks.shift();
if (task) {const prom = task.processor(task.params); // promise 将由自定义 processor 生成
if (prom instanceof Promise) {
prom.then(data => {task.callback(data); // 自定义的 callback
}).finally(() => {this.processTask();
});
}
}
else {setTimeout(() => {this.processTask();
}, 500);
}
}
}
// 测试一下
// 工作类型降级
function genTaskObj(i) {
return new Task({
params: i,
processor: (params) => {
return new Promise(resolve => {setTimeout(() => {resolve(params);
}, params * 1000);
});
},
callback: (data) => {console.log(`callback for ${i}, rst is`, data);
}
});
}
const tasks = [genTaskObj(1),
genTaskObj(2),
genTaskObj(3),
];
//
let pool5 = new FixedThreadPool({
size: 3,
tasks: [...tasks]
});
pool5.start();
- 假如 callback 也是个异步工作呢?理论状况中,callback 可能须要进行入库或者接口调用,这也是异步操作,咱们心愿在 callback 执行完结后后再进行相应的操作。如果 callback 执行后也是个异步工作(这里拿 promise 举例)这时须要将执行下一次 processTask 的机会,改为 callback 的 then 中,否则就是间接调用:
class FixedThreadPool {
// ...
processTask() {const task = this.tasks.shift();
if (task) {const prom = task.processor(task.params);
if (prom instanceof Promise) {
let cb;
prom.then(data => {cb = task.callback(data); // 失去 callback 的回调
}).finally(() => {if (cb instanceof Promise) { // 进行是否为异步工作的判断
cb.finally(() => {this.processTask();
});
}
else {this.processTask();
}
});
}
}
else {setTimeout(() => {this.processTask();
}, 500);
}
}
}
// 测试一下
// callback 也是异步的状况
function genTaskObjWithPromCb(i) {
return new Task({
params: i,
processor: (params) => {
return new Promise(resolve => {setTimeout(() => {resolve(params);
}, params * 1000);
});
},
callback: (data) => {
return new Promise(resolve => {console.log('2s later, cb will finish', data);
setTimeout(() => {console.log(data, 'finish');
resolve();}, 2000);
})
}
});
}
const tasksWithPromCb = [genTaskObjWithPromCb(1),
genTaskObjWithPromCb(2),
genTaskObjWithPromCb(3),
];
let pool6 = new FixedThreadPool({
size: 3,
tasks: [...tasksWithPromCb]
});
pool6.start();
-
是时候给工作加个刹车了。假如当结尾提到的 a 场景,用户点击了详情页,咱们的工作池就应该进行了,此处填加 stop 办法。
这里以后正在执行的工作不会被进行,然而剩下的工作将不会被执行了。
// 异步池
class FixedThreadPool {
constructor({
size,
tasks
}) {
// ...
// 增加标识
this.runningFlag = false;
this.runningProcessorCount = 0;
// ...
}
// 是否运行中判断
isRunning() {return this.runningFlag || this.runningProcessorCount > 0;}
start() {if (this.isRunning()) {return;}
this.runningFlag = true;
let i = this.size;
while(i--) {this.processTask();
this.runningProcessorCount++;
}
}
// 通知 processTask,工作不须要继续执行了
stop() {this.runningFlag = false;}
// ...
processTask() {
// 如果 flag 被置为 false,则进行执行
if (!this.runningFlag) {
this.runningProcessorCount--;
console.log('stop');
return;
}
// ...
}
}
// 测试工作进行
function genTaskObj(i) {
return new Task({
params: i,
processor: (params) => {
return new Promise(resolve => {setTimeout(() => {resolve(params);
}, params * 1000);
});
},
callback: (data) => {console.log(`callback for ${i}, rst is`, data);
}
});
}
const tasks7 = [genTaskObj(1),
genTaskObj(2),
genTaskObj(3),
];
let pool7 = new FixedThreadPool({
size: 3,
tasks: [...tasks7, ...tasks7, ...tasks7]
})
pool7.start();
setTimeout(() => {pool7.stop();
}, 3000); // 3s 后进行
三、总结
整体代码如下:
// 工作实体
class Task {constructor({params, processor = () => {}, callback = () => {}}) {
this.params = params;
if (typeof processor === 'function') {this.processor = processor;}
else {throw new Error('processor must be a funtion');
}
if (typeof callback === 'function') {this.callback = callback || (() => {});
}
else {throw new Error('callback must be a funtion');
}
}
}
// 异步池
class FixedThreadPool {
constructor({
size,
tasks
}) {
this.size = size;
this.tasks = [];
this.addTask(...tasks);
this.runningFlag = false;
this.runningProcessorCount = 0;
}
isRunning() {return this.runningFlag || this.runningProcessorCount > 0;}
start() {if (this.isRunning()) {return;}
this.runningFlag = true;
let i = this.size;
while(i--) {this.processTask();
this.runningProcessorCount++;
}
}
stop() {this.runningFlag = false;}
addTask(...tasks) {
tasks.forEach(task => {if (task instanceof Task) {this.tasks.push(task);
}
else {console.error('expected to be instanceof Task');
}
})
}
processTask() {if (!this.runningFlag) {
this.runningProcessorCount--;
console.log('stop');
return;
}
const task = this.tasks.shift();
if (task) {const prom = task.processor(task.params);
if (prom instanceof Promise) {
let cb;
prom.then(data => {cb = task.callback(data);
}).finally(() => {if (cb instanceof Promise) {cb.finally(() => {this.processTask();
});
}
else {this.processTask();
}
});
}
}
else {setTimeout(() => {this.processTask();
}, 500);
}
}
}
正文完
发表至: javascript
2021-06-21