关于javascript:js实现线程池限制异步任务数量

41次阅读

共计 7695 个字符,预计需要花费 20 分钟才能阅读完成。

一、场景

构想上面几个场景:

  1. 商品列表页中,点击商品将会进入详情页。为了实现详情页秒开,在用户点击前要预申请商品信息,假如有 100 个商品条目。如果同时发动 100 个申请,可能会将带宽打满。局部设施可能还会有申请的限度,这样会阻塞本来页面的其余失常申请。
  2. 一台带宽无限的机器须要对网页进行关上时长的统计,假如每天要执行几千个这样的工作,这时候如果能通过申明一个类就能够进行并发管制,那将将是比拟不便的。

基本上本次要实现的就是一个类,用于满足同一时间内,仅有制订异步工作运行的一个工作的队列。

二、实现思路

  1. 工作如何排队执行?只有在工作完结时执行回调就能够了
function genTask(i) {
    return new Promise(resolve => {setTimeout(() => {resolve(i);
        }, i * 1000);
    });
}
const tasks = [genTask.bind(null, 1),
    genTask.bind(null, 2),
    genTask.bind(null, 3),
];

function processTask() {const task = tasks.shift();
    const prom = typeof task === 'function' && task();
    prom instanceof Promise && prom.then(data => {console.log(data);
    }).finally(() => {processTask();
    });
}

processTask();
  1. 多个工作排队如何并行。下面实现了单个工作的排队执行,如果是多个工作呢?其实也很简略,只有循环执行屡次 processTask 就能够简略满足了,这里异步并发量为 3 为例
// ...

// processTask(); 删除
let i = 3;
while(i--) processTask();
  1. 应用 class 革新。增加办法:工作通过 start 办法开始执行,并且能够通过 addTask 增加工作
// 应用类改写
class FixedThreadPool {constructor({size, tasks}) {
        this.size = size;
        this.tasks = tasks;
    }

    start() {
        let i = this.size;
        while(i--) this.processTask();}

    addTask(...tasks) {
        tasks.forEach(task => {if (typeof task === 'function') {this.tasks.push(task);
            } else {console.error('task can only be function');
            }
        })
    }

    processTask() {const task = this.tasks.shift();
        const prom = typeof task === 'function' && task();
        prom instanceof Promise && prom.then(data => {console.log(data);
        }).finally(() => {this.processTask();
        });
    }
}
// 测试一下
const tasks = [genTask.bind(null, 1),
    genTask.bind(null, 2),
    genTask.bind(null, 3),
];
// 并发数量为 3
const pool = new FixedThreadPool({
    size: 3, // 并发数量为 3
    tasks: [...tasks]
});
pool.start(); //(共计 3s)1s 输入 1,2s 输入 2,3s 输入 3

// 并发数量为 1
const pool2 = new FixedThreadPool({
    size: 1, // 并发数量为 1
    tasks: [...tasks]
});
pool2.start(); //(共计 6s)1s 输入 1,3s 输入 2,6s 输入 3 
  1. 增加工作队列的监听。剖析下面的代码,当所有工作执行完时候,再增加工作,将不会持续被解决。因为当前情况下,如果不满足 prom instanceof Promise,processTask 也就进行执行了,将不会有新的回调,也不会再次触发 processTask。这里有两种思路解决:

    1. addTasks 时从新执行 start 办法
    2. 如果无工作,processTask 中生成一个 setTimeout,检测是不是有新工作,检测是不是有新的工作。

    a 计划:当执行 start 会从新生成 size 个 processTask,假如 pool 仍在运行或者局部 processTask 依然在运行,这会导致运行数量超过 size 个。当然,能够加若干条件判断。

    b 计划:如果没有工作则执行 setTimeout,过一段时间后再执行 processTask

    这里抉择了 b 计划,因为实现起来简略,也不容易出错:

class FixedThreadPool {
    // ...
    processTask() {const task = this.tasks.shift();
        const prom = typeof task === 'function' && task();

        if (prom instanceof Promise) {
            prom.then(data => {console.log(data);
            }).finally(() => {this.processTask();
            });
        }
        else { // 如果没有适合的工作,将会在 500ms 后再次执行
            setTimeout(() => {this.processTask();
            }, 500);
        }
    }
}
// 测试一下
const pool3 = new FixedThreadPool({
    size: 3, // 并发数量为 3
    tasks: [...tasks]
});
pool3.start();
setTimeout(() => {pool3.addTask(...tasks);
}, 5000); // 5s 后工作都执行完了,再增加工作 
  1. 增加回调函数。下面收到完结的数据后只是 console.log(data),理论状况下接到返回数据,须要做一些自定义的解决。这里在类外面增加了一个回调函数 defaultCb,用于解决返回的后果
// 
class FixedThreadPool {
    constructor({
        // ...
        defaultCb = (data) => {console.log(data)} // 增加回调函数
    }) {
        // ...
        if (typeof defaultCb === 'function') {this.defaultCb = defaultCb;} else {throw new Error('defaultCb can only be function');
        }
    }

       // ...

    processTask() {
        // ...
        if (prom instanceof Promise) {
            prom.then(data => {// console.log(data); 
                this.defaultCb(data); // 替换为自定义的 callback
            }).finally(() => {this.processTask();
            });
        }

        // ...
    }
}
// 自定义回调函数
const pool4 = new FixedThreadPool({
    size: 3, // 并发数量为 3
    tasks: [...tasks],
    defaultCb: (data) => {console.log('custom callback', data);
    }
});
pool4.start();
  1. 创立 Task 类,能够在池子里执行各种各样的工作。工作可能会有不同的生成函数、不同的参数、不同的回调函数,这时候将每个工作降级一下,生成一个 Task 类,它将领有一个 processor 用于执行异步工作,params 作为入参,callback 作为回调函数。
class Task {constructor({params, processor = () => {}, callback = () => {}}) {
        this.params = params;
        if (typeof processor === 'function') {this.processor = processor;}
        else {throw new Error('processor must be a funtion');
        }

        if (typeof callback === 'function') {this.callback = callback || (() => {});
        }
        else {throw new Error('callback must be a funtion');
        }
    }
}
// 对于 Task 的降级
class FixedThreadPool {
    constructor({
        size,
        tasks
    }) {
        this.size = size;
        this.tasks = [];
        this.addTask(...tasks); // 通过 addTask 增加,addTask 增加了工作类型的判断
    }

    // ...

    addTask(...tasks) {
        tasks.forEach(task => {if (task instanceof Task) { // 增加对于是否为 Task 实例的判断
                this.tasks.push(task);
            }
            else {console.error('expected to be instanceof Task'); // 文案也改一下
            }
        })
    }
    
    // ...
    processTask() {const task = this.tasks.shift();

        if (task) {const prom = task.processor(task.params); // promise 将由自定义 processor 生成
            if (prom instanceof Promise) {
                prom.then(data => {task.callback(data); // 自定义的 callback
                }).finally(() => {this.processTask();
                });
            }
        }
        else {setTimeout(() => {this.processTask();
            }, 500);
        }
    }
}
// 测试一下
// 工作类型降级
function genTaskObj(i) {
    return new Task({
        params: i,
        processor: (params) => {
            return new Promise(resolve => {setTimeout(() => {resolve(params);
                }, params * 1000);
            });
        },
        callback: (data) => {console.log(`callback for ${i}, rst is`, data);
        }
    });
}
const tasks = [genTaskObj(1),
    genTaskObj(2),
    genTaskObj(3),
];

// 
let pool5 = new FixedThreadPool({
    size: 3,
    tasks: [...tasks]
});

pool5.start();
  1. 假如 callback 也是个异步工作呢?理论状况中,callback 可能须要进行入库或者接口调用,这也是异步操作,咱们心愿在 callback 执行完结后后再进行相应的操作。如果 callback 执行后也是个异步工作(这里拿 promise 举例)这时须要将执行下一次 processTask 的机会,改为 callback 的 then 中,否则就是间接调用:
class FixedThreadPool {
    // ...
    processTask() {const task = this.tasks.shift();

        if (task) {const prom = task.processor(task.params);
            if (prom instanceof Promise) {
                let cb;
                prom.then(data => {cb = task.callback(data); // 失去 callback 的回调
                }).finally(() => {if (cb instanceof Promise) { // 进行是否为异步工作的判断
                        cb.finally(() => {this.processTask();
                        });
                    }
                    else {this.processTask();
                    }
                });
            }
        }
        else {setTimeout(() => {this.processTask();
            }, 500);
        }
    }
}
// 测试一下
// callback 也是异步的状况
function genTaskObjWithPromCb(i) {
    return new Task({
        params: i,
        processor: (params) => {
            return new Promise(resolve => {setTimeout(() => {resolve(params);
                }, params * 1000);
            });
        },
        callback: (data) => {
            return new Promise(resolve => {console.log('2s later, cb will finish', data);
                setTimeout(() => {console.log(data, 'finish');
                    resolve();}, 2000);
            })
        }
    });
}

const tasksWithPromCb = [genTaskObjWithPromCb(1),
    genTaskObjWithPromCb(2),
    genTaskObjWithPromCb(3),
];

let pool6 = new FixedThreadPool({
    size: 3,
    tasks: [...tasksWithPromCb]
});

pool6.start();
  1. 是时候给工作加个刹车了。假如当结尾提到的 a 场景,用户点击了详情页,咱们的工作池就应该进行了,此处填加 stop 办法。

    这里以后正在执行的工作不会被进行,然而剩下的工作将不会被执行了。

// 异步池
class FixedThreadPool {
    constructor({
        size,
        tasks
    }) {
        // ...
        
        // 增加标识
        this.runningFlag = false;
        this.runningProcessorCount = 0;

        // ...
    }

    // 是否运行中判断
    isRunning() {return this.runningFlag || this.runningProcessorCount > 0;}

    start() {if (this.isRunning()) {return;}
        
        this.runningFlag = true;

        let i = this.size;
        while(i--) {this.processTask();
            this.runningProcessorCount++;
        }
    }

    // 通知 processTask,工作不须要继续执行了
    stop() {this.runningFlag = false;}

    // ...

    processTask() {
        // 如果 flag 被置为 false,则进行执行
        if (!this.runningFlag) {
            this.runningProcessorCount--;
            console.log('stop');
            return;
        }

        // ...
    }
}
// 测试工作进行
function genTaskObj(i) {
    return new Task({
        params: i,
        processor: (params) => {
            return new Promise(resolve => {setTimeout(() => {resolve(params);
                }, params * 1000);
            });
        },
        callback: (data) => {console.log(`callback for ${i}, rst is`, data);
        }
    });
}
const tasks7 = [genTaskObj(1),
    genTaskObj(2),
    genTaskObj(3),
];

let pool7 = new FixedThreadPool({
    size: 3,
    tasks: [...tasks7, ...tasks7, ...tasks7]
})

pool7.start();

setTimeout(() => {pool7.stop();
}, 3000); // 3s 后进行 

三、总结

整体代码如下:

// 工作实体
class Task {constructor({params, processor = () => {}, callback = () => {}}) {
        this.params = params;
        if (typeof processor === 'function') {this.processor = processor;}
        else {throw new Error('processor must be a funtion');
        }

        if (typeof callback === 'function') {this.callback = callback || (() => {});
        }
        else {throw new Error('callback must be a funtion');
        }
    }
}

// 异步池
class FixedThreadPool {
    constructor({
        size,
        tasks
    }) {
        this.size = size;

        this.tasks = [];
        this.addTask(...tasks);

        this.runningFlag = false;
        this.runningProcessorCount = 0;
    }

    isRunning() {return this.runningFlag || this.runningProcessorCount > 0;}

    start() {if (this.isRunning()) {return;}

        this.runningFlag = true;

        let i = this.size;
        while(i--) {this.processTask();
            this.runningProcessorCount++;
        }
    }

    stop() {this.runningFlag = false;}

    addTask(...tasks) {
        tasks.forEach(task => {if (task instanceof Task) {this.tasks.push(task);
            }
            else {console.error('expected to be instanceof Task');
            }
        })
    }

    processTask() {if (!this.runningFlag) {
            this.runningProcessorCount--;
            console.log('stop');
            return;
        }

        const task = this.tasks.shift();

        if (task) {const prom = task.processor(task.params);
            if (prom instanceof Promise) {
                let cb;
                prom.then(data => {cb = task.callback(data);
                }).finally(() => {if (cb instanceof Promise) {cb.finally(() => {this.processTask();
                        });
                    }
                    else {this.processTask();
                    }
                });
            }
        }
        else {setTimeout(() => {this.processTask();
            }, 500);
        }
    }
}

正文完
 0