在es6中的async
的语法中,能够参照java并发包实现一些有意思的异步工具,辅助在异步场景(个别指申请)下的开发。
因为js是单线程,上面的实现都比java中实现简略 (抛除线程概念)。同时波及到js的执行机制,宏工作,微工作,async
,promise
相干内容,须要提前具备这些常识。
wait(期待)
异步函数中,期待(相当于java线程中的阻塞)一段时间。
实现代码:
async function wait(time = 0) {
await new Promise(resolve => setTimeout(resolve, time));
// 防止转译成return await, 会在一些safari版本外面报错
return undefined;
}
模仿应用代码:
(async () => {
console.time();
await wait(1000);
console.timeEnd(); // 输入: default: 1.002s
})();
Lock(锁)
模仿java并发包中的Lock
类实现锁操作。 保障同一个锁突围的异步代码执行过程中,同一时刻只有一段代码在执行。
该锁不合乎html5的的异步锁接口,而是提供一个java异步包中
Lock
接口的简略实现
举荐应用场景:多个申请执行过程前,保障同一时刻只有一个token
生效验证转换操作。
实现代码:
export type Resolve<T = any> = (value: T | PromiseLike<T>) => void;
export type Reject = (reason?: any) => void;
export interface FlatPromise<T = any> {
promise: Promise<T>;
resolve: Resolve<T>;
reject: Reject;
};
interface Key {
key: number,
resolve: Resolve,
};
/**
* 创立一个扁平的promise
* @returns Prmise
*/
function flatPromise<T = any>(): FlatPromise<T> {
const result: any = {};
const promise = new Promise<T>((resolve, reject) => {
result.resolve = resolve;
result.reject = reject;
});
result.promise = promise;
return result as FlatPromise<T>;
}
class Lock {
keys: Key[] = [];
hasLock: boolean = false;
idCount: number = 0;
constructor() {
this.keys = [];
this.hasLock = false;
this.idCount = 0;
}
_pushKey(resolve: Resolve) {
this.idCount += 1;
const key: Key = {
key: this.idCount,
resolve,
};
this.keys.push(key);
return key;
}
_removeKey(key: Key) {
const index = this.keys.findIndex(item => item.key === key.key);
if (index >= 0) {
this.keys.splice(index, 1);
}
}
/**
* 获取锁.
* 如果以后锁曾经锁定,那么就阻塞以后操作
*/
async lock() {
if (this.keys.length || this.hasLock) {
const { promise, resolve } = flatPromise();
this._pushKey(resolve);
await promise;
return null;
}
this.hasLock = true;
return null;
}
/**
* 尝试获取锁.
* 该函数如果没有指定一个无效的time,则立马返回一个后果:如果获取到锁则为true,反之为false.
* 如果指定一个无效的time(time=0无效),则返回一个promise对象,改对象返回的后果为是否获取到锁
* @param time 最长等待时间
*/
tryLock(time?: number) {
if (time === undefined ||
Number.isNaN(Math.floor(time)) || time < 0) {
if (this.hasLock) {
return false;
}
this.lock();
return Promise.resolve(true);
}
if (!this.hasLock && !this.keys.length) {
this.hasLock = true;
return Promise.resolve(true);
}
const asyncFn = async () => {
const { promise, resolve: res } = flatPromise();
const key = this._pushKey(res);
setTimeout(() => {
this._removeKey(key);
key.resolve(false);
}, time);
const isTimeout = await promise;
return isTimeout !== false;
};
return asyncFn();
}
async lockFn(asyncFn: () => Promise<void>) {
await this.lock();
try {
await asyncFn();
} finally {
this.unlock();
}
}
/**
* 开释锁
*/
unlock() {
if (this.keys.length === 0 && this.hasLock === true) {
this.hasLock = false;
return;
}
if (this.keys.length === 0) {
return;
}
const index = Math.floor(Math.random() * this.keys.length);
const key = this.keys[index];
this._removeKey(key);
key.resolve(undefined);
}
toString() {
return `${this.keys.length}-${this.hasLock}`;
}
}
模仿应用代码:
function delay(callback: () => void, time: number) {
return new Promise<void>((resolve) => setTimeout(() => {
callback();
resolve(undefined);
}, time));
}
(async () => {
const lock = new Lock();
const syncResult: string[] = [];
const unSyncResult: string[] = [];
const withLockAsync = async () => {
await lock.lock();
await delay(() => {
syncResult.push('1');
}, Math.random() * 10);
await delay(() => {
syncResult.push('2');
}, Math.random() * 10);
await delay(() => {
syncResult.push('3');
}, Math.random() * 10);
lock.unlock();
};
const withoutLockAsync = async () => {
await delay(() => {
unSyncResult.push('1');
}, Math.random() * 3);
await delay(() => {
unSyncResult.push('2');
}, Math.random() * 3);
await delay(() => {
unSyncResult.push('3');
}, Math.random() * 3);
};
const taskList = [];
for (let i = 0; i < 10; i += 1) {
taskList.push(withLockAsync(), withoutLockAsync());
}
await Promise.all(taskList);
// 输入1,2,3,1,2,3...
// 证实withLockAsync函数中的代码同一时刻只有一个执行,不会被打算
console.log(syncResult);
// 输入的值不肯定依照1,2,3,1,2,3...
// 证实在一般的async函数中,await后的代码会被打乱
console.log(unSyncResult);
})();
Semaphore(信号量)
Semaphore
(信号量)是用来管制同时拜访特定资源的线程数量,它通过协调各个线程,以保障正当的应用公共资源。
举荐应用场景:个别用于流量的管制,特地是公共资源无限的利用场景。例如管制同一时刻申请的连贯数量,假如浏览器的申请连接数下限为10个,多个异步并发申请能够应用Semaphore
来管制申请的异步执行个数最多为10个。
简略实现代码:
class Semaphore {
constructor(permits) {
this.permits = permits;
this.execCount = 0;
this.waitTaskList = [];
}
async acquire() {
const that = this;
this.execCount += 1;
if (this.execCount <= this.permits) {
// 为了保障依照调用程序执行
// 如果有期待的,那么先执行期待的,以后的挂起
// 没有则疾速通过
if (that.waitTaskList.length !== 0) {
const waitTask = this.waitTaskList.pop();
waitTask();
await new Promise((resolve) => {
that.waitTaskList.push(resolve);
});
}
return;
}
await new Promise((resolve) => {
that.waitTaskList.push(resolve);
});
}
release() {
this.execCount -= 1;
if (this.execCount < 0) {
this.execCount = 0;
}
if (this.waitTaskList.length === 0) {
return;
}
const waitTask = this.waitTaskList.pop();
waitTask();
}
}
模仿一个简单的页面中简单的申请场景:
(async () => {
const semaphore = new Semaphore(5);
let doCount = 0;
let currntCount = 0;
const request = async (id) => {
await semaphore.acquire();
currntCount++;
console.log(`执行申请${id}, 正在执行的个数${currntCount}`);
await new Promise(resolve => setTimeout(resolve, Math.random() * 1000 + 500));
semaphore.release();
currntCount--;
doCount++;
console.log(`执行申请${id}完结,曾经执行${doCount}次`);
};
const arr = new Array(10).fill(1);
setTimeout(() => {
// 顺次执行10个申请
arr.forEach((_, index) => {
request(`1-${index}`);
});
}, 300)
// 随机触发10个申请
let index = 0;
const timerId = setInterval(() => {
if (index > 9) {
clearInterval(timerId);
return;
}
request(`2-${index}`);
index++;
}, Math.random() * 300 + 300);
// 同时执行10个申请
await Promise.all(arr.map(async (_, index) => {
await request(`3-${index}`);
}));
// 期待下面的内容全副执行, 资源开释后,在执行4个申请
setTimeout(() => {
const lastArr = new Array(4).fill(1);
lastArr.forEach((_, index) => {
request(`4-${index}`);
});
}, 5000);
})();
执行后果:
执行申请3-0, 正在执行的个数1
执行申请3-1, 正在执行的个数2
执行申请3-2, 正在执行的个数3
执行申请3-3, 正在执行的个数4
执行申请3-4, 正在执行的个数5
执行申请3-2完结,曾经执行1次
执行申请2-1, 正在执行的个数5
执行申请3-3完结,曾经执行2次
执行申请2-0, 正在执行的个数5
...
执行申请3-8完结,曾经执行29次
执行申请3-6完结,曾经执行30次
执行申请4-0, 正在执行的个数1
执行申请4-1, 正在执行的个数2
执行申请4-2, 正在执行的个数3
执行申请4-3, 正在执行的个数4
执行申请4-3完结,曾经执行31次
执行申请4-0完结,曾经执行32次
执行申请4-2完结,曾经执行33次
执行申请4-1完结,曾经执行34次
CountDownLatch(倒计时闭锁)和CyclicBarrier(循环栅栏)
在es的async
中,个别情景的CountDownLatch
能够间接用Promise.all
代替。
应用场景:简单的Promise.all
需要场景,反对在离散的多个异步函数中灵便应用(自己还没有碰到过),从而解脱Promise.all
在应用时须要一个promise
的iterable
类型的输出。
代码实现:
class CountDownLatch {
constructor(count) {
this.count = count;
this.waitTaskList = [];
}
countDown() {
this.count--;
if (this.count <= 0) {
this.waitTaskList.forEach(task => task());
}
}
// 防止应用关键字,所以命名跟java中不一样
async awaitExec() {
if (this.count <= 0) {
return;
}
const that = this;
await new Promise((resolve) => {
that.waitTaskList.push(resolve);
});
}
}
模仿应用代码:
(async () => {
const countDownLatch = new CountDownLatch(10);
const request = async (id) => {
console.log(`执行申请${id}`);
await new Promise(resolve => setTimeout(resolve, Math.random() * 1000 + 500));
console.log(`执行申请${id}完结`);
countDownLatch.countDown();
};
const task = new Array(10).fill(1);
// 后续的代码等同于
// await Promise.all(task.map(async (_, index) => {
// await request(index);
// }));
task.forEach((_1, index) => {
request(index);
});
await countDownLatch.awaitExec();
console.log('执行结束');
})();
而CyclicBarrier
抛除线程相干概念后,外围性能就是一个能够重复使用的CountDownLatch
,这里就不实现了。
发表回复