在es6中的async
的语法中,能够参照java并发包实现一些有意思的异步工具,辅助在异步场景(个别指申请)下的开发。
因为js是单线程,上面的实现都比java中实现简略 (抛除线程概念)。同时波及到js的执行机制,宏工作,微工作,async
,promise
相干内容,须要提前具备这些常识。
wait(期待)
异步函数中,期待(相当于java线程中的阻塞)一段时间。
实现代码:
async function wait(time = 0) { await new Promise(resolve => setTimeout(resolve, time)); // 防止转译成return await, 会在一些safari版本外面报错 return undefined;}
模仿应用代码:
(async () => { console.time(); await wait(1000); console.timeEnd(); // 输入: default: 1.002s})();
Lock(锁)
模仿java并发包中的Lock
类实现锁操作。 保障同一个锁突围的异步代码执行过程中,同一时刻只有一段代码在执行。
该锁不合乎html5的的异步锁接口,而是提供一个java异步包中Lock
接口的简略实现
举荐应用场景:多个申请执行过程前,保障同一时刻只有一个token
生效验证转换操作。
实现代码:
export type Resolve<T = any> = (value: T | PromiseLike<T>) => void;export type Reject = (reason?: any) => void;export interface FlatPromise<T = any> { promise: Promise<T>; resolve: Resolve<T>; reject: Reject;};interface Key { key: number, resolve: Resolve,};/*** 创立一个扁平的promise* @returns Prmise*/function flatPromise<T = any>(): FlatPromise<T> { const result: any = {}; const promise = new Promise<T>((resolve, reject) => { result.resolve = resolve; result.reject = reject; }); result.promise = promise; return result as FlatPromise<T>;}class Lock { keys: Key[] = []; hasLock: boolean = false; idCount: number = 0; constructor() { this.keys = []; this.hasLock = false; this.idCount = 0; } _pushKey(resolve: Resolve) { this.idCount += 1; const key: Key = { key: this.idCount, resolve, }; this.keys.push(key); return key; } _removeKey(key: Key) { const index = this.keys.findIndex(item => item.key === key.key); if (index >= 0) { this.keys.splice(index, 1); } } /** * 获取锁. * 如果以后锁曾经锁定,那么就阻塞以后操作 */ async lock() { if (this.keys.length || this.hasLock) { const { promise, resolve } = flatPromise(); this._pushKey(resolve); await promise; return null; } this.hasLock = true; return null; } /** * 尝试获取锁. * 该函数如果没有指定一个无效的time,则立马返回一个后果:如果获取到锁则为true,反之为false. * 如果指定一个无效的time(time=0无效),则返回一个promise对象,改对象返回的后果为是否获取到锁 * @param time 最长等待时间 */ tryLock(time?: number) { if (time === undefined || Number.isNaN(Math.floor(time)) || time < 0) { if (this.hasLock) { return false; } this.lock(); return Promise.resolve(true); } if (!this.hasLock && !this.keys.length) { this.hasLock = true; return Promise.resolve(true); } const asyncFn = async () => { const { promise, resolve: res } = flatPromise(); const key = this._pushKey(res); setTimeout(() => { this._removeKey(key); key.resolve(false); }, time); const isTimeout = await promise; return isTimeout !== false; }; return asyncFn(); } async lockFn(asyncFn: () => Promise<void>) { await this.lock(); try { await asyncFn(); } finally { this.unlock(); } } /** * 开释锁 */ unlock() { if (this.keys.length === 0 && this.hasLock === true) { this.hasLock = false; return; } if (this.keys.length === 0) { return; } const index = Math.floor(Math.random() * this.keys.length); const key = this.keys[index]; this._removeKey(key); key.resolve(undefined); } toString() { return `${this.keys.length}-${this.hasLock}`; }}
模仿应用代码:
function delay(callback: () => void, time: number) { return new Promise<void>((resolve) => setTimeout(() => { callback(); resolve(undefined); }, time));}(async () => { const lock = new Lock(); const syncResult: string[] = []; const unSyncResult: string[] = []; const withLockAsync = async () => { await lock.lock(); await delay(() => { syncResult.push('1'); }, Math.random() * 10); await delay(() => { syncResult.push('2'); }, Math.random() * 10); await delay(() => { syncResult.push('3'); }, Math.random() * 10); lock.unlock(); }; const withoutLockAsync = async () => { await delay(() => { unSyncResult.push('1'); }, Math.random() * 3); await delay(() => { unSyncResult.push('2'); }, Math.random() * 3); await delay(() => { unSyncResult.push('3'); }, Math.random() * 3); }; const taskList = []; for (let i = 0; i < 10; i += 1) { taskList.push(withLockAsync(), withoutLockAsync()); } await Promise.all(taskList); // 输入1,2,3,1,2,3... // 证实withLockAsync函数中的代码同一时刻只有一个执行,不会被打算 console.log(syncResult); // 输入的值不肯定依照1,2,3,1,2,3... // 证实在一般的async函数中,await后的代码会被打乱 console.log(unSyncResult);})();
Semaphore(信号量)
Semaphore
(信号量)是用来管制同时拜访特定资源的线程数量,它通过协调各个线程,以保障正当的应用公共资源。
举荐应用场景:个别用于流量的管制,特地是公共资源无限的利用场景。例如管制同一时刻申请的连贯数量,假如浏览器的申请连接数下限为10个,多个异步并发申请能够应用Semaphore
来管制申请的异步执行个数最多为10个。
简略实现代码:
class Semaphore { constructor(permits) { this.permits = permits; this.execCount = 0; this.waitTaskList = []; } async acquire() { const that = this; this.execCount += 1; if (this.execCount <= this.permits) { // 为了保障依照调用程序执行 // 如果有期待的,那么先执行期待的,以后的挂起 // 没有则疾速通过 if (that.waitTaskList.length !== 0) { const waitTask = this.waitTaskList.pop(); waitTask(); await new Promise((resolve) => { that.waitTaskList.push(resolve); }); } return; } await new Promise((resolve) => { that.waitTaskList.push(resolve); }); } release() { this.execCount -= 1; if (this.execCount < 0) { this.execCount = 0; } if (this.waitTaskList.length === 0) { return; } const waitTask = this.waitTaskList.pop(); waitTask(); }}
模仿一个简单的页面中简单的申请场景:
(async () => { const semaphore = new Semaphore(5); let doCount = 0; let currntCount = 0; const request = async (id) => { await semaphore.acquire(); currntCount++; console.log(`执行申请${id}, 正在执行的个数${currntCount}`); await new Promise(resolve => setTimeout(resolve, Math.random() * 1000 + 500)); semaphore.release(); currntCount--; doCount++; console.log(`执行申请${id}完结,曾经执行${doCount}次`); }; const arr = new Array(10).fill(1); setTimeout(() => { // 顺次执行10个申请 arr.forEach((_, index) => { request(`1-${index}`); }); }, 300) // 随机触发10个申请 let index = 0; const timerId = setInterval(() => { if (index > 9) { clearInterval(timerId); return; } request(`2-${index}`); index++; }, Math.random() * 300 + 300); // 同时执行10个申请 await Promise.all(arr.map(async (_, index) => { await request(`3-${index}`); })); // 期待下面的内容全副执行, 资源开释后,在执行4个申请 setTimeout(() => { const lastArr = new Array(4).fill(1); lastArr.forEach((_, index) => { request(`4-${index}`); }); }, 5000);})();
执行后果:
执行申请3-0, 正在执行的个数1执行申请3-1, 正在执行的个数2执行申请3-2, 正在执行的个数3执行申请3-3, 正在执行的个数4执行申请3-4, 正在执行的个数5执行申请3-2完结,曾经执行1次执行申请2-1, 正在执行的个数5执行申请3-3完结,曾经执行2次执行申请2-0, 正在执行的个数5...执行申请3-8完结,曾经执行29次执行申请3-6完结,曾经执行30次执行申请4-0, 正在执行的个数1执行申请4-1, 正在执行的个数2执行申请4-2, 正在执行的个数3执行申请4-3, 正在执行的个数4执行申请4-3完结,曾经执行31次执行申请4-0完结,曾经执行32次执行申请4-2完结,曾经执行33次执行申请4-1完结,曾经执行34次
CountDownLatch(倒计时闭锁)和CyclicBarrier(循环栅栏)
在es的async
中,个别情景的CountDownLatch
能够间接用Promise.all
代替。
应用场景:简单的Promise.all
需要场景,反对在离散的多个异步函数中灵便应用(自己还没有碰到过),从而解脱Promise.all
在应用时须要一个promise
的iterable
类型的输出。
代码实现:
class CountDownLatch { constructor(count) { this.count = count; this.waitTaskList = []; } countDown() { this.count--; if (this.count <= 0) { this.waitTaskList.forEach(task => task()); } } // 防止应用关键字,所以命名跟java中不一样 async awaitExec() { if (this.count <= 0) { return; } const that = this; await new Promise((resolve) => { that.waitTaskList.push(resolve); }); }}
模仿应用代码:
(async () => { const countDownLatch = new CountDownLatch(10); const request = async (id) => { console.log(`执行申请${id}`); await new Promise(resolve => setTimeout(resolve, Math.random() * 1000 + 500)); console.log(`执行申请${id}完结`); countDownLatch.countDown(); }; const task = new Array(10).fill(1); // 后续的代码等同于 // await Promise.all(task.map(async (_, index) => { // await request(index); // })); task.forEach((_1, index) => { request(index); }); await countDownLatch.awaitExec(); console.log('执行结束');})();
而CyclicBarrier
抛除线程相干概念后,外围性能就是一个能够重复使用的CountDownLatch
,这里就不实现了。