共计 6495 个字符,预计需要花费 17 分钟才能阅读完成。
在 es6 中的 async
的语法中,能够参照 java 并发包实现一些有意思的异步工具,辅助在异步场景 (个别指申请) 下的开发。
因为 js 是单线程,上面的实现都比 java 中实现简略 (抛除线程概念)。同时波及到 js 的执行机制,宏工作,微工作,async
,promise
相干内容,须要提前具备这些常识。
wait(期待)
异步函数中,期待 (相当于 java 线程中的阻塞) 一段时间。
实现代码:
async function wait(time = 0) {await new Promise(resolve => setTimeout(resolve, time));
// 防止转译成 return await, 会在一些 safari 版本外面报错
return undefined;
}
模仿应用代码:
(async () => {console.time();
await wait(1000);
console.timeEnd(); // 输入: default: 1.002s})();
Lock(锁)
模仿 java 并发包中的 Lock
类实现锁操作。保障同一个锁突围的异步代码执行过程中,同一时刻只有一段代码在执行。
该锁不合乎 html5 的的异步锁接口, 而是提供一个 java 异步包中
Lock
接口的简略实现
举荐应用场景:多个申请执行过程前,保障同一时刻只有一个 token
生效验证转换操作。
实现代码:
export type Resolve<T = any> = (value: T | PromiseLike<T>) => void;
export type Reject = (reason?: any) => void;
export interface FlatPromise<T = any> {
promise: Promise<T>;
resolve: Resolve<T>;
reject: Reject;
};
interface Key {
key: number,
resolve: Resolve,
};
/**
* 创立一个扁平的 promise
* @returns Prmise
*/
function flatPromise<T = any>(): FlatPromise<T> {const result: any = {};
const promise = new Promise<T>((resolve, reject) => {
result.resolve = resolve;
result.reject = reject;
});
result.promise = promise;
return result as FlatPromise<T>;
}
class Lock {keys: Key[] = [];
hasLock: boolean = false;
idCount: number = 0;
constructor() {this.keys = [];
this.hasLock = false;
this.idCount = 0;
}
_pushKey(resolve: Resolve) {
this.idCount += 1;
const key: Key = {
key: this.idCount,
resolve,
};
this.keys.push(key);
return key;
}
_removeKey(key: Key) {const index = this.keys.findIndex(item => item.key === key.key);
if (index >= 0) {this.keys.splice(index, 1);
}
}
/**
* 获取锁.
* 如果以后锁曾经锁定, 那么就阻塞以后操作
*/
async lock() {if (this.keys.length || this.hasLock) {const { promise, resolve} = flatPromise();
this._pushKey(resolve);
await promise;
return null;
}
this.hasLock = true;
return null;
}
/**
* 尝试获取锁.
* 该函数如果没有指定一个无效的 time, 则立马返回一个后果: 如果获取到锁则为 true, 反之为 false.
* 如果指定一个无效的 time(time= 0 无效), 则返回一个 promise 对象, 改对象返回的后果为是否获取到锁
* @param time 最长等待时间
*/
tryLock(time?: number) {
if (time === undefined ||
Number.isNaN(Math.floor(time)) || time < 0) {if (this.hasLock) {return false;}
this.lock();
return Promise.resolve(true);
}
if (!this.hasLock && !this.keys.length) {
this.hasLock = true;
return Promise.resolve(true);
}
const asyncFn = async () => {const { promise, resolve: res} = flatPromise();
const key = this._pushKey(res);
setTimeout(() => {this._removeKey(key);
key.resolve(false);
}, time);
const isTimeout = await promise;
return isTimeout !== false;
};
return asyncFn();}
async lockFn(asyncFn: () => Promise<void>) {await this.lock();
try {await asyncFn();
} finally {this.unlock();
}
}
/**
* 开释锁
*/
unlock() {if (this.keys.length === 0 && this.hasLock === true) {
this.hasLock = false;
return;
}
if (this.keys.length === 0) {return;}
const index = Math.floor(Math.random() * this.keys.length);
const key = this.keys[index];
this._removeKey(key);
key.resolve(undefined);
}
toString() {return `${this.keys.length}-${this.hasLock}`;
}
}
模仿应用代码:
function delay(callback: () => void, time: number) {return new Promise<void>((resolve) => setTimeout(() => {callback();
resolve(undefined);
}, time));
}
(async () => {const lock = new Lock();
const syncResult: string[] = [];
const unSyncResult: string[] = [];
const withLockAsync = async () => {await lock.lock();
await delay(() => {syncResult.push('1');
}, Math.random() * 10);
await delay(() => {syncResult.push('2');
}, Math.random() * 10);
await delay(() => {syncResult.push('3');
}, Math.random() * 10);
lock.unlock();};
const withoutLockAsync = async () => {await delay(() => {unSyncResult.push('1');
}, Math.random() * 3);
await delay(() => {unSyncResult.push('2');
}, Math.random() * 3);
await delay(() => {unSyncResult.push('3');
}, Math.random() * 3);
};
const taskList = [];
for (let i = 0; i < 10; i += 1) {taskList.push(withLockAsync(), withoutLockAsync());
}
await Promise.all(taskList);
// 输入 1,2,3,1,2,3...
// 证实 withLockAsync 函数中的代码同一时刻只有一个执行,不会被打算
console.log(syncResult);
// 输入的值不肯定依照 1,2,3,1,2,3...
// 证实在一般的 async 函数中,await 后的代码会被打乱
console.log(unSyncResult);
})();
Semaphore(信号量)
Semaphore
(信号量)是用来管制同时拜访特定资源的线程数量,它通过协调各个线程,以保障正当的应用公共资源。
举荐应用场景:个别用于流量的管制,特地是公共资源无限的利用场景。例如管制同一时刻申请的连贯数量,假如浏览器的申请连接数下限为 10 个,多个异步并发申请能够应用 Semaphore
来管制申请的异步执行个数最多为 10 个。
简略实现代码:
class Semaphore {constructor(permits) {
this.permits = permits;
this.execCount = 0;
this.waitTaskList = [];}
async acquire() {
const that = this;
this.execCount += 1;
if (this.execCount <= this.permits) {
// 为了保障依照调用程序执行
// 如果有期待的,那么先执行期待的,以后的挂起
// 没有则疾速通过
if (that.waitTaskList.length !== 0) {const waitTask = this.waitTaskList.pop();
waitTask();
await new Promise((resolve) => {that.waitTaskList.push(resolve);
});
}
return;
}
await new Promise((resolve) => {that.waitTaskList.push(resolve);
});
}
release() {
this.execCount -= 1;
if (this.execCount < 0) {this.execCount = 0;}
if (this.waitTaskList.length === 0) {return;}
const waitTask = this.waitTaskList.pop();
waitTask();}
}
模仿一个简单的页面中简单的申请场景:
(async () => {const semaphore = new Semaphore(5);
let doCount = 0;
let currntCount = 0;
const request = async (id) => {await semaphore.acquire();
currntCount++;
console.log(` 执行申请 ${id}, 正在执行的个数 ${currntCount}`);
await new Promise(resolve => setTimeout(resolve, Math.random() * 1000 + 500));
semaphore.release();
currntCount--;
doCount++;
console.log(` 执行申请 ${id}完结,曾经执行 ${doCount}次 `);
};
const arr = new Array(10).fill(1);
setTimeout(() => {
// 顺次执行 10 个申请
arr.forEach((_, index) => {request(`1-${index}`);
});
}, 300)
// 随机触发 10 个申请
let index = 0;
const timerId = setInterval(() => {if (index > 9) {clearInterval(timerId);
return;
}
request(`2-${index}`);
index++;
}, Math.random() * 300 + 300);
// 同时执行 10 个申请
await Promise.all(arr.map(async (_, index) => {await request(`3-${index}`);
}));
// 期待下面的内容全副执行, 资源开释后,在执行 4 个申请
setTimeout(() => {const lastArr = new Array(4).fill(1);
lastArr.forEach((_, index) => {request(`4-${index}`);
});
}, 5000);
})();
执行后果:
执行申请 3 -0, 正在执行的个数 1
执行申请 3 -1, 正在执行的个数 2
执行申请 3 -2, 正在执行的个数 3
执行申请 3 -3, 正在执行的个数 4
执行申请 3 -4, 正在执行的个数 5
执行申请 3 - 2 完结,曾经执行 1 次
执行申请 2 -1, 正在执行的个数 5
执行申请 3 - 3 完结,曾经执行 2 次
执行申请 2 -0, 正在执行的个数 5
...
执行申请 3 - 8 完结,曾经执行 29 次
执行申请 3 - 6 完结,曾经执行 30 次
执行申请 4 -0, 正在执行的个数 1
执行申请 4 -1, 正在执行的个数 2
执行申请 4 -2, 正在执行的个数 3
执行申请 4 -3, 正在执行的个数 4
执行申请 4 - 3 完结,曾经执行 31 次
执行申请 4 - 0 完结,曾经执行 32 次
执行申请 4 - 2 完结,曾经执行 33 次
执行申请 4 - 1 完结,曾经执行 34 次
CountDownLatch(倒计时闭锁)和 CyclicBarrier(循环栅栏)
在 es 的 async
中,个别情景的 CountDownLatch
能够间接用 Promise.all
代替。
应用场景:简单的 Promise.all
需要场景,反对在离散的多个异步函数中灵便应用 (自己还没有碰到过),从而解脱Promise.all
在应用时须要一个 promise
的iterable
类型的输出。
代码实现:
class CountDownLatch {constructor(count) {
this.count = count;
this.waitTaskList = [];}
countDown() {
this.count--;
if (this.count <= 0) {this.waitTaskList.forEach(task => task());
}
}
// 防止应用关键字,所以命名跟 java 中不一样
async awaitExec() {if (this.count <= 0) {return;}
const that = this;
await new Promise((resolve) => {that.waitTaskList.push(resolve);
});
}
}
模仿应用代码:
(async () => {const countDownLatch = new CountDownLatch(10);
const request = async (id) => {console.log(` 执行申请 ${id}`);
await new Promise(resolve => setTimeout(resolve, Math.random() * 1000 + 500));
console.log(` 执行申请 ${id}完结 `);
countDownLatch.countDown();};
const task = new Array(10).fill(1);
// 后续的代码等同于
// await Promise.all(task.map(async (_, index) => {// await request(index);
// }));
task.forEach((_1, index) => {request(index);
});
await countDownLatch.awaitExec();
console.log('执行结束');
})();
而 CyclicBarrier
抛除线程相干概念后,外围性能就是一个能够重复使用的CountDownLatch
,这里就不实现了。