关于async-await:async异步工具

5次阅读

共计 6495 个字符,预计需要花费 17 分钟才能阅读完成。

在 es6 中的 async 的语法中,能够参照 java 并发包实现一些有意思的异步工具,辅助在异步场景 (个别指申请) 下的开发。
因为 js 是单线程,上面的实现都比 java 中实现简略 (抛除线程概念)。同时波及到 js 的执行机制,宏工作,微工作,asyncpromise相干内容,须要提前具备这些常识。

wait(期待)

异步函数中,期待 (相当于 java 线程中的阻塞) 一段时间。

实现代码:

async function wait(time = 0) {await new Promise(resolve => setTimeout(resolve, time));
  // 防止转译成 return await, 会在一些 safari 版本外面报错
  return undefined;
}

模仿应用代码:

(async () => {console.time();
  await wait(1000);
  console.timeEnd(); // 输入: default: 1.002s})();

Lock(锁)

模仿 java 并发包中的 Lock 类实现锁操作。保障同一个锁突围的异步代码执行过程中,同一时刻只有一段代码在执行。

该锁不合乎 html5 的的异步锁接口, 而是提供一个 java 异步包中 Lock 接口的简略实现

举荐应用场景:多个申请执行过程前,保障同一时刻只有一个 token 生效验证转换操作。

实现代码:

export type Resolve<T = any> = (value: T | PromiseLike<T>) => void;

export type Reject = (reason?: any) => void;

export interface FlatPromise<T = any> {
  promise: Promise<T>;
  resolve: Resolve<T>;
  reject: Reject;
};

interface Key {
  key: number,
  resolve: Resolve,
};

/**
* 创立一个扁平的 promise
* @returns Prmise
*/
function flatPromise<T = any>(): FlatPromise<T> {const result: any = {};
  const promise = new Promise<T>((resolve, reject) => {
    result.resolve = resolve;
    result.reject = reject;
  });
  result.promise = promise;
  return result as FlatPromise<T>;
}

class Lock {keys: Key[] = [];
  hasLock: boolean = false;
  idCount: number = 0;
  
  constructor() {this.keys = [];
    this.hasLock = false;
    this.idCount = 0;
  }
  
  _pushKey(resolve: Resolve) {
    this.idCount += 1;
    const key: Key = {
      key: this.idCount,
      resolve,
    };
    this.keys.push(key);
    return key;
  }
  
  _removeKey(key: Key) {const index = this.keys.findIndex(item => item.key === key.key);
    if (index >= 0) {this.keys.splice(index, 1);
    }
  }
  
  /**
  * 获取锁.
  * 如果以后锁曾经锁定, 那么就阻塞以后操作
  */
  async lock() {if (this.keys.length || this.hasLock) {const { promise, resolve} = flatPromise();
      this._pushKey(resolve);
      await promise;
      return null;
    }
    
    this.hasLock = true;
    return null;
  }
  
  /**
  * 尝试获取锁.
  * 该函数如果没有指定一个无效的 time, 则立马返回一个后果: 如果获取到锁则为 true, 反之为 false.
  * 如果指定一个无效的 time(time= 0 无效), 则返回一个 promise 对象, 改对象返回的后果为是否获取到锁
  * @param time 最长等待时间
  */
  tryLock(time?: number) {
    if (time === undefined ||
        Number.isNaN(Math.floor(time)) || time < 0) {if (this.hasLock) {return false;}
      this.lock();
      return Promise.resolve(true);
    }
    
    if (!this.hasLock && !this.keys.length) {
      this.hasLock = true;
      return Promise.resolve(true);
    }
    
    const asyncFn = async () => {const { promise, resolve: res} = flatPromise();
      const key = this._pushKey(res);
      
      setTimeout(() => {this._removeKey(key);
        key.resolve(false);
      }, time);
      
      const isTimeout = await promise;
      
      return isTimeout !== false;
    };
    
    return asyncFn();}
  
  async lockFn(asyncFn: () => Promise<void>) {await this.lock();
    try {await asyncFn();
    } finally {this.unlock();
    }
  }
  
  /**
  * 开释锁
  */
  unlock() {if (this.keys.length === 0 && this.hasLock === true) {
      this.hasLock = false;
      return;
    }
    
    if (this.keys.length === 0) {return;}
    
    const index = Math.floor(Math.random() * this.keys.length);
    const key = this.keys[index];
    this._removeKey(key);
    key.resolve(undefined);
  }
  
  toString() {return `${this.keys.length}-${this.hasLock}`;
  }
}

模仿应用代码:

function delay(callback: () => void, time: number) {return new Promise<void>((resolve) => setTimeout(() => {callback();
    resolve(undefined);
  }, time));
}

(async () => {const lock = new Lock();
  const syncResult: string[] = [];
  const unSyncResult: string[] = [];
  const withLockAsync = async () => {await lock.lock();
    
    await delay(() => {syncResult.push('1');
    }, Math.random() * 10);
    
    await delay(() => {syncResult.push('2');
    }, Math.random() * 10);
    
    await delay(() => {syncResult.push('3');
    }, Math.random() * 10);
    
    lock.unlock();};
  
  const withoutLockAsync = async () => {await delay(() => {unSyncResult.push('1');
    }, Math.random() * 3);
    
    await delay(() => {unSyncResult.push('2');
    }, Math.random() * 3);
    
    await delay(() => {unSyncResult.push('3');
    }, Math.random() * 3);
  };
  
  const taskList = [];
  for (let i = 0; i < 10; i += 1) {taskList.push(withLockAsync(), withoutLockAsync());
  }
  await Promise.all(taskList);
  
  // 输入 1,2,3,1,2,3...
  // 证实 withLockAsync 函数中的代码同一时刻只有一个执行,不会被打算
  console.log(syncResult);
  // 输入的值不肯定依照 1,2,3,1,2,3...
  // 证实在一般的 async 函数中,await 后的代码会被打乱
  console.log(unSyncResult);
})();

Semaphore(信号量)

Semaphore(信号量)是用来管制同时拜访特定资源的线程数量,它通过协调各个线程,以保障正当的应用公共资源。

举荐应用场景:个别用于流量的管制,特地是公共资源无限的利用场景。例如管制同一时刻申请的连贯数量,假如浏览器的申请连接数下限为 10 个,多个异步并发申请能够应用 Semaphore 来管制申请的异步执行个数最多为 10 个。

简略实现代码:

class Semaphore {constructor(permits) {
    this.permits = permits;
    this.execCount = 0;
    this.waitTaskList = [];}
  
  async acquire() {
    const that = this;
    this.execCount += 1;
    if (this.execCount <= this.permits) {
      // 为了保障依照调用程序执行
      // 如果有期待的,那么先执行期待的,以后的挂起
      // 没有则疾速通过
      if (that.waitTaskList.length !== 0) {const waitTask = this.waitTaskList.pop();
        waitTask();
        await new Promise((resolve) => {that.waitTaskList.push(resolve);
        });
      }
      return;
    }
    await new Promise((resolve) => {that.waitTaskList.push(resolve);
    });
  }
  
  release() {
    this.execCount -= 1;
    if (this.execCount < 0) {this.execCount = 0;}
    if (this.waitTaskList.length === 0) {return;}
    const waitTask = this.waitTaskList.pop();
    waitTask();}
}

模仿一个简单的页面中简单的申请场景:

(async () => {const semaphore = new Semaphore(5);
  
  let doCount = 0;
  let currntCount = 0;
  const request = async (id) => {await semaphore.acquire();
    currntCount++;
    console.log(` 执行申请 ${id}, 正在执行的个数 ${currntCount}`);
    await new Promise(resolve => setTimeout(resolve, Math.random() * 1000 + 500));
    semaphore.release();
    currntCount--;
    doCount++;
    console.log(` 执行申请 ${id}完结,曾经执行 ${doCount}次 `);
  };
  const arr = new Array(10).fill(1);
  setTimeout(() => {
    // 顺次执行 10 个申请
    arr.forEach((_, index) => {request(`1-${index}`);
    });
  }, 300)
  // 随机触发 10 个申请
  let index = 0;
  const timerId = setInterval(() => {if (index > 9) {clearInterval(timerId);
      return;
    }
    request(`2-${index}`);
    index++;
  }, Math.random() * 300 + 300);
  // 同时执行 10 个申请
  await Promise.all(arr.map(async (_, index) => {await request(`3-${index}`);
  }));
  // 期待下面的内容全副执行, 资源开释后,在执行 4 个申请
  setTimeout(() => {const lastArr = new Array(4).fill(1);
    lastArr.forEach((_, index) => {request(`4-${index}`);
    });
  }, 5000);
})();

执行后果:

执行申请 3 -0, 正在执行的个数 1
执行申请 3 -1, 正在执行的个数 2
执行申请 3 -2, 正在执行的个数 3
执行申请 3 -3, 正在执行的个数 4
执行申请 3 -4, 正在执行的个数 5
执行申请 3 - 2 完结,曾经执行 1 次
执行申请 2 -1, 正在执行的个数 5
执行申请 3 - 3 完结,曾经执行 2 次
执行申请 2 -0, 正在执行的个数 5
...
执行申请 3 - 8 完结,曾经执行 29 次
执行申请 3 - 6 完结,曾经执行 30 次
执行申请 4 -0, 正在执行的个数 1
执行申请 4 -1, 正在执行的个数 2
执行申请 4 -2, 正在执行的个数 3
执行申请 4 -3, 正在执行的个数 4
执行申请 4 - 3 完结,曾经执行 31 次
执行申请 4 - 0 完结,曾经执行 32 次
执行申请 4 - 2 完结,曾经执行 33 次
执行申请 4 - 1 完结,曾经执行 34 次

CountDownLatch(倒计时闭锁)和 CyclicBarrier(循环栅栏)

在 es 的 async 中,个别情景的 CountDownLatch 能够间接用 Promise.all 代替。

应用场景:简单的 Promise.all 需要场景,反对在离散的多个异步函数中灵便应用 (自己还没有碰到过),从而解脱Promise.all 在应用时须要一个 promiseiterable类型的输出。

代码实现:

class CountDownLatch {constructor(count) {
    this.count = count;
    this.waitTaskList = [];}
  
  countDown() {
    this.count--;
    if (this.count <= 0) {this.waitTaskList.forEach(task => task());
    }
  }
  
  // 防止应用关键字,所以命名跟 java 中不一样
  async awaitExec() {if (this.count <= 0) {return;}
    const that = this;
    await new Promise((resolve) => {that.waitTaskList.push(resolve);
    });
  }
}

模仿应用代码:

(async () => {const countDownLatch = new CountDownLatch(10);
  
  const request = async (id) => {console.log(` 执行申请 ${id}`);
    await new Promise(resolve => setTimeout(resolve, Math.random() * 1000 + 500));
    console.log(` 执行申请 ${id}完结 `);
    countDownLatch.countDown();};
  
  const task = new Array(10).fill(1);
  // 后续的代码等同于
  // await Promise.all(task.map(async (_, index) => {//   await request(index);
  // }));
  task.forEach((_1, index) => {request(index);
  });
  await countDownLatch.awaitExec();
  console.log('执行结束');
})();

CyclicBarrier 抛除线程相干概念后,外围性能就是一个能够重复使用的CountDownLatch,这里就不实现了。

正文完
 0