一、场景

构想上面几个场景:

  1. 商品列表页中,点击商品将会进入详情页。为了实现详情页秒开,在用户点击前要预申请商品信息,假如有100个商品条目。如果同时发动100个申请,可能会将带宽打满。局部设施可能还会有申请的限度,这样会阻塞本来页面的其余失常申请。
  2. 一台带宽无限的机器须要对网页进行关上时长的统计,假如每天要执行几千个这样的工作,这时候如果能通过申明一个类就能够进行并发管制,那将将是比拟不便的。

基本上本次要实现的就是一个类,用于满足同一时间内,仅有制订异步工作运行的一个工作的队列。

二、实现思路

  1. 工作如何排队执行?只有在工作完结时执行回调就能够了
function genTask(i) {    return new Promise(resolve => {        setTimeout(() => {            resolve(i);        }, i * 1000);    });}const tasks = [    genTask.bind(null, 1),    genTask.bind(null, 2),    genTask.bind(null, 3),];function processTask() {    const task = tasks.shift();    const prom = typeof task === 'function' && task();    prom instanceof Promise && prom.then(data => {        console.log(data);    }).finally(() => {        processTask();    });}processTask();
  1. 多个工作排队如何并行。下面实现了单个工作的排队执行,如果是多个工作呢?其实也很简略,只有循环执行屡次 processTask就能够简略满足了,这里异步并发量为3为例
// ...// processTask(); 删除let i = 3;while(i--) processTask();
  1. 应用class革新。增加办法:工作通过start办法开始执行,并且能够通过addTask增加工作
// 应用类改写class FixedThreadPool {    constructor({size, tasks}) {        this.size = size;        this.tasks = tasks;    }    start() {        let i = this.size;        while(i--) this.processTask();    }    addTask(...tasks) {        tasks.forEach(task => {            if (typeof task === 'function') {                this.tasks.push(task);            } else {                console.error('task can only be function');            }        })    }    processTask() {        const task = this.tasks.shift();        const prom = typeof task === 'function' && task();        prom instanceof Promise && prom.then(data => {            console.log(data);        }).finally(() => {            this.processTask();        });    }}
// 测试一下const tasks = [    genTask.bind(null, 1),    genTask.bind(null, 2),    genTask.bind(null, 3),];// 并发数量为3const pool = new FixedThreadPool({    size: 3, // 并发数量为3    tasks: [...tasks]});pool.start(); // (共计3s)1s输入1,2s输入2,3s输入3// 并发数量为1const pool2 = new FixedThreadPool({    size: 1, // 并发数量为1    tasks: [...tasks]});pool2.start(); // (共计6s)1s输入1,3s输入2,6s输入3
  1. 增加工作队列的监听。剖析下面的代码,当所有工作执行完时候,再增加工作,将不会持续被解决。因为当前情况下,如果不满足prom instanceof Promise,processTask也就进行执行了,将不会有新的回调,也不会再次触发 processTask。这里有两种思路解决:

    1. addTasks时从新执行 start办法
    2. 如果无工作,processTask中生成一个 setTimeout,检测是不是有新工作,检测是不是有新的工作。

    a计划:当执行start会从新生成 size个processTask,假如pool 仍在运行或者局部processTask 依然在运行,这会导致运行数量超过size个。当然,能够加若干条件判断。

    b 计划:如果没有工作则执行 setTimeout,过一段时间后再执行processTask

    这里抉择了b计划,因为实现起来简略,也不容易出错 :

class FixedThreadPool {    // ...    processTask() {        const task = this.tasks.shift();        const prom = typeof task === 'function' && task();        if (prom instanceof Promise) {            prom.then(data => {                console.log(data);            }).finally(() => {                this.processTask();            });        }        else { // 如果没有适合的工作,将会在500ms后再次执行            setTimeout(() => {                this.processTask();            }, 500);        }    }}
// 测试一下const pool3 = new FixedThreadPool({    size: 3, // 并发数量为3    tasks: [...tasks]});pool3.start();setTimeout(() => {    pool3.addTask(...tasks);}, 5000); // 5s后工作都执行完了,再增加工作
  1. 增加回调函数。下面收到完结的数据后只是 console.log(data),理论状况下接到返回数据,须要做一些自定义的解决。这里在类外面增加了一个回调函数defaultCb,用于解决返回的后果
// class FixedThreadPool {    constructor({        // ...        defaultCb = (data) => {console.log(data)} // 增加回调函数    }) {        // ...        if (typeof defaultCb === 'function') {            this.defaultCb = defaultCb;        } else {            throw new Error('defaultCb can only be function');        }    }       // ...    processTask() {        // ...        if (prom instanceof Promise) {            prom.then(data => {                // console.log(data);                 this.defaultCb(data); // 替换为自定义的callback            }).finally(() => {                this.processTask();            });        }        // ...    }}
// 自定义回调函数const pool4 = new FixedThreadPool({    size: 3, // 并发数量为3    tasks: [...tasks],    defaultCb: (data) => {        console.log('custom callback', data);    }});pool4.start();
  1. 创立 Task 类,能够在池子里执行各种各样的工作。工作可能会有不同的生成函数、不同的参数、不同的回调函数,这时候将每个工作降级一下,生成一个Task类,它将领有一个processor用于执行异步工作,params作为入参,callback 作为回调函数。
class Task {    constructor({params, processor = () => {}, callback = () => {}}) {        this.params = params;        if (typeof processor === 'function') {            this.processor = processor;        }        else {            throw new Error('processor must be a funtion');        }        if (typeof callback === 'function') {            this.callback = callback || (() => {});        }        else {            throw new Error('callback must be a funtion');        }    }}
// 对于Task的降级class FixedThreadPool {    constructor({        size,        tasks    }) {        this.size = size;        this.tasks = [];        this.addTask(...tasks); // 通过addTask增加,addTask增加了工作类型的判断    }    // ...    addTask(...tasks) {        tasks.forEach(task => {            if (task instanceof Task) { // 增加对于是否为 Task 实例的判断                this.tasks.push(task);            }            else {                console.error('expected to be instanceof Task'); // 文案也改一下            }        })    }        // ...    processTask() {        const task = this.tasks.shift();        if (task) {            const prom = task.processor(task.params); // promise将由自定义processor生成            if (prom instanceof Promise) {                prom.then(data => {                    task.callback(data); // 自定义的callback                }).finally(() => {                    this.processTask();                });            }        }        else {            setTimeout(() => {                this.processTask();            }, 500);        }    }}
// 测试一下// 工作类型降级function genTaskObj(i) {    return new Task({        params: i,        processor: (params) => {            return new Promise(resolve => {                setTimeout(() => {                    resolve(params);                }, params * 1000);            });        },        callback: (data) => {            console.log(`callback for ${i}, rst is`, data);        }    });}const tasks = [    genTaskObj(1),    genTaskObj(2),    genTaskObj(3),];// let pool5 = new FixedThreadPool({    size: 3,    tasks: [...tasks]});pool5.start();
  1. 假如callback也是个异步工作呢?理论状况中,callback可能须要进行入库或者接口调用,这也是异步操作,咱们心愿在callback执行完结后后再进行相应的操作。如果 callback 执行后也是个异步工作(这里拿 promise 举例)这时须要将执行下一次processTask的机会,改为callback的 then 中,否则就是间接调用:
class FixedThreadPool {    // ...    processTask() {        const task = this.tasks.shift();        if (task) {            const prom = task.processor(task.params);            if (prom instanceof Promise) {                let cb;                prom.then(data => {                    cb = task.callback(data); // 失去callback的回调                }).finally(() => {                    if (cb instanceof Promise) { // 进行是否为异步工作的判断                        cb.finally(() => {                            this.processTask();                        });                    }                    else {                        this.processTask();                    }                });            }        }        else {            setTimeout(() => {                this.processTask();            }, 500);        }    }}
// 测试一下// callback也是异步的状况function genTaskObjWithPromCb(i) {    return new Task({        params: i,        processor: (params) => {            return new Promise(resolve => {                setTimeout(() => {                    resolve(params);                }, params * 1000);            });        },        callback: (data) => {            return new Promise(resolve => {                console.log('2s later, cb will finish', data);                setTimeout(() => {                    console.log(data, 'finish');                    resolve();                }, 2000);            })        }    });}const tasksWithPromCb = [    genTaskObjWithPromCb(1),    genTaskObjWithPromCb(2),    genTaskObjWithPromCb(3),];let pool6 = new FixedThreadPool({    size: 3,    tasks: [...tasksWithPromCb]});pool6.start();
  1. 是时候给工作加个刹车了。假如当结尾提到的a场景,用户点击了详情页,咱们的工作池就应该进行了,此处填加stop办法。

    这里以后正在执行的工作不会被进行,然而剩下的工作将不会被执行了。

// 异步池class FixedThreadPool {    constructor({        size,        tasks    }) {        // ...                // 增加标识        this.runningFlag = false;        this.runningProcessorCount = 0;        // ...    }    // 是否运行中判断    isRunning() {        return this.runningFlag || this.runningProcessorCount > 0;    }    start() {        if (this.isRunning()) {            return;        }                this.runningFlag = true;        let i = this.size;        while(i--) {            this.processTask();            this.runningProcessorCount++;        }    }    // 通知processTask,工作不须要继续执行了    stop() {        this.runningFlag = false;    }    // ...    processTask() {        // 如果flag被置为false,则进行执行        if (!this.runningFlag) {            this.runningProcessorCount--;            console.log('stop');            return;        }        // ...    }}
// 测试工作进行function genTaskObj(i) {    return new Task({        params: i,        processor: (params) => {            return new Promise(resolve => {                setTimeout(() => {                    resolve(params);                }, params * 1000);            });        },        callback: (data) => {            console.log(`callback for ${i}, rst is`, data);        }    });}const tasks7 = [    genTaskObj(1),    genTaskObj(2),    genTaskObj(3),];let pool7 = new FixedThreadPool({    size: 3,    tasks: [...tasks7, ...tasks7, ...tasks7]})pool7.start();setTimeout(() => {    pool7.stop();}, 3000); // 3s后进行

三、总结

整体代码如下:

// 工作实体class Task {    constructor({params, processor = () => {}, callback = () => {}}) {        this.params = params;        if (typeof processor === 'function') {            this.processor = processor;        }        else {            throw new Error('processor must be a funtion');        }        if (typeof callback === 'function') {            this.callback = callback || (() => {});        }        else {            throw new Error('callback must be a funtion');        }    }}// 异步池class FixedThreadPool {    constructor({        size,        tasks    }) {        this.size = size;        this.tasks = [];        this.addTask(...tasks);        this.runningFlag = false;        this.runningProcessorCount = 0;    }    isRunning() {        return this.runningFlag || this.runningProcessorCount > 0;    }    start() {        if (this.isRunning()) {            return;        }        this.runningFlag = true;        let i = this.size;        while(i--) {            this.processTask();            this.runningProcessorCount++;        }    }    stop() {        this.runningFlag = false;    }    addTask(...tasks) {        tasks.forEach(task => {            if (task instanceof Task) {                this.tasks.push(task);            }            else {                console.error('expected to be instanceof Task');            }        })    }    processTask() {        if (!this.runningFlag) {            this.runningProcessorCount--;            console.log('stop');            return;        }        const task = this.tasks.shift();        if (task) {            const prom = task.processor(task.params);            if (prom instanceof Promise) {                let cb;                prom.then(data => {                    cb = task.callback(data);                }).finally(() => {                    if (cb instanceof Promise) {                        cb.finally(() => {                            this.processTask();                        });                    }                    else {                        this.processTask();                    }                });            }        }        else {            setTimeout(() => {                this.processTask();            }, 500);        }    }}