乐趣区

关于javascript:JAVA语言异步非阻塞设计模式应用篇

1. 概述

本系列文章共 2 篇。在上一篇《原理篇》中,咱们看到了异步非阻塞模型,它可能无效升高线程 IO 状态的耗时,晋升资源利用率和零碎吞吐量。异步 API 能够体现为 listener 或 Promise 模式;其中 Promise API 提供了更强的灵活性,反对同步返回和异步回调,也容许注册任意数目的回调。

在本文《利用篇》中,咱们将进一步摸索异步模式和 Promise 的利用:

第 2 章:Promise 与线程池。 在异步执行耗时申请时,ExecutorService+Future 是一个备选计划;然而相比于 Future,Promise 反对纯异步获取响应数据,可能打消更多阻塞。

第 3 章:异样解决。 Java 程序并不总能胜利执行申请,有时会遇到网络问题等不可抗力。对于无奈防止的异常情况,异步 API 必须提供异样解决机制,以晋升程序的容错性。

第 4 章:申请调度。 Java 程序有时须要提交多条申请,这些申请之间可能存在肯定的关联关系,包含程序执行、并行执行、批量执行。异步 API 须要对这些束缚提供反对。

本文不限定 Promise 的具体实现,读者在生产环境能够抉择一个 Promise 工具类(如 netty DefaultPromise[A]、jdk CompletableFuture[B]等);此外,因为 Promise 的原理并不简单,读者也能够自行实现所需性能。

2.Promise 与线程池

Java 程序有时须要执行耗时的 IO 操作,如数据库拜访;在此期间,相比于纯内存计算,IO 操作的持续时间显著更长。为了缩小 IO 阻塞、进步资源利用率,咱们应该应用异步模型,将申请提交到其余线程中执行,从而间断提交多条申请,而不用期待之前的申请返回。

本章对几种 IO 模型进行比照(见 2.1 节),考查调用者线程的阻塞状况。其中,Promise 反对纯异步的申请提交及响应数据处理,可能最大水平地打消不必要的阻塞。在理论我的项目中,如果底层 API 不反对纯异步,那么咱们也能够进行适当重构,使其和 Promise 兼容(见 2.2 节)。

2.1 比照:同步、Future、Promise

本节对几种 IO 模型进行比照,包含同步 IO、基于线程池(ExecutorService)的异步 IO、基于 Promise 的异步 IO,考查调用者线程的阻塞状况。假如咱们要执行数据库拜访申请。因为须要逾越网络,单条申请须要进行耗时的 IO 操作,能力最终收到响应数据;然而申请之间没有束缚,容许随时提交新的申请,而不须要收到之前的响应数据。

首先咱们来看看 几种模型的样例代码

1. 同步 IO。db.writeSync()办法是同步阻塞的。函数阻塞,直至收到响应数据。因而,调用者一次只能提交一个申请,必须期待该申请返回,能力再提交下一个申请。

/* 提交申请并阻塞,直至收到响应数据 */
String result = db.writeSync("data");
process(result);

2. 基于线程池(ExecutorService)的异步 IO。db.writeSync()办法不变;然而将其提交到线程池中来执行,使得调用者线程不会阻塞,从而能够间断提交多条申请 data1-3。

提交申请后,线程池返回 Future 对象,调用者调用 Future.get() 以获取响应数据。Future.get() 办法却是阻塞的,因而调用者在取得响应数据之前无奈再提交后续申请。

/* 提交申请 */
// executor: ExecutorService
Future<String> resultFuture1 = executor.submit(() -> db.writeSync("data1"));
Future<String> resultFuture2 = executor.submit(() -> db.writeSync("data2"));
Future<String> resultFuture3 = executor.submit(() -> db.writeSync("data3"));

/* 获取响应:同步 */
String result1 = resultFuture1.get();
String result2 = resultFuture2.get();
String result3 = resultFuture3.get();
process(result1);
process(result2);
process(result3);

3. 基于 Promise 的异步 IO。db.writeAsync()办法是纯异步的,提交申请后返回 Promise 对象;调用者调用 Promise.await() 注册回调,当收到响应数据后触发回调。

《原理篇》 中,咱们看到了 Promise API 能够基于线程池或响应式模型实现;不管哪种形式,回调函数能够在接管响应的线程中执行,而不须要调用者线程阻塞地期待响应数据。

/* 提交申请 */
Promise<String> resultPromise1 = db.writeAsync("data1");
Promise<String> resultPromise2 = db.writeAsync("data2");
Promise<String> resultPromise3 = db.writeAsync("data3");

/* 获取响应:异步 */
resultPromise1.await(result1 -> process(result1));
resultPromise2.await(result2 -> process(result2));
resultPromise3.await(result3 -> process(result3));

接下来咱们看看以上几种模型中,调用者线程状态随工夫变动的过程,如图 2 - 1 所示。

a.同步 IO。调用者一次只能提交一个申请,在收到响应之前不能提交下一个申请。

b.基于线程池的异步 IO。同一组申请(申请 1 -3,以及申请 4 -6)能够间断提交,而不须要期待前一条申请返回。然而,一旦调用者应用 Future.get() 获取响应数据(result1-3),就会阻塞而无奈再提交下一组申请(申请 4 -6),直至理论收到响应数据。

c.基于 Promise 的异步 IO。 调用者随时能够提交申请,并向 Promise 注册对响应数据的回调函数;稍后接管线程向 Promise 告诉响应数据,以触发回调函数。上述过程中,调用者线程不须要期待响应数据,始终不会阻塞。

图 2 -1a 线程工夫线:同步 IO

图 2 -1b 线程工夫线:基于线程池的异步 IO

图 2 -1c 线程工夫线:基于 Promise 的异步 IO

2.2 Promise 联合线程池

和 ExecutorService+Future 相比,Promise 具备纯异步的长处;然而在某些场景下也须要把 Promise 和线程池联合应用。例如:1. 底层 API 只反对同步阻塞模型,不反对纯异步;此时只能在线程池中调用 API,能力做到非阻塞。2. 须要重构一段遗留代码,将其线程模型从线程池模型改为响应式模型;能够先将对外接口改为 Promise API,而底层实现临时应用线程池。

上面的代码片段展现了 Promise 和线程池联合的用法:

  1. 创立 Promise 对象作为返回值。留神这里应用了 PromiseOrException,以防期间遇到异样;其能够告诉响应数据,也能够在失败时告诉抛出的 Exception。详见 3.1 大节。
  2. 在线程池中执行申请(2a),并在收到响应数据后向 Promise 告诉(2b)
  3. 解决线程池满异样。线程池底层关联一个 BlockingQueue 来存储待执行的工作,个别设置为有界队列以防有限占用内存,当队列满时会抛弃某个工作。为了向调用者告诉该异样,线程池的回绝策略须设置为 AbortPolicy,当队列满时抛弃所提交的工作,并抛出 RejectedExecutionException;一旦捕捉该异样,就要向 Promise 告诉申请失败。
public PromiseOrException<String, Exception> writeAsync() {
// 1. 创立 Promise 对象
    PromiseOrException<String, Exception> resultPromise = new PromiseOrException<>();
    try {executor.execute(() -> {String result = db.writeSync("data"); // 2a. 执行申请。只反对同步阻塞
            resultPromise.signalAllWithResult(result); // 2b. 告诉 Promise
        });

    }catch (RejectedExecutionException e){ // 3. 异样:线程池满
        resultPromise.signalAllWithException(e); 
    }

    return resultPromise;
}

3. 异样解决:PromiseOrException

Java 程序有时会遇到不可避免的异常情况,如网络连接断开;因而,程序员须要设计适当的异样解决机制,以晋升程序的容错性。本章介绍异步 API 的异样解决,首先介绍 Java 语言异样解决标准;而后介绍 Promise 的变体 PromiseOrException,使得 Promise API 反对标准的异样解决。

3.1 异样解决标准

集体认为,Java 代码的异样解决 该当合乎下列标准:

  1. 显式辨别失常进口和异样进口。
  2. 反对编译时刻查看,强制调用者解决不可避免的异样。

 

辨别失常进口和异样进口

异样是 Java 语言的重要个性,是一种根本的控制流。Java 语言中,一个函数容许有一个返回值,以及抛出多个不同类型的异样。函数的返回值是失常进口,函数返回阐明函数可能失常工作,并计算出正确的后果;相同,一旦函数遇到异常情况无奈持续工作,如网络连接断开、申请非法等,就要抛出相应的异样。

尽管 if-else 和异样都是控制流,然而程序员必须 辨析二者的应用场景。if-else 的各个分支个别是对等的,都用于解决失常状况;而函数的返回值和异样是不对等的,抛出异样示意函数遇到无奈解决的故障,曾经无奈失常计算结果,其与函数失常工作所产生的返回值有本质区别。在 API 设计中,混同失常进口(返回值)与异样进口(抛出异样),或者在无奈持续工作时不抛异样,都是重大的设计缺点。

以数据库拜访为例,上面的代码比照了 API 进行异样解决的两种模式。数据库拜访过程中,如果网络连接顺畅,并且服务端可能正确处理申请,那么 db.write() 应该返回服务端的响应数据,如服务端为所写数据生成的自增 id、条件更新理论影响的数据条数等;如果网络连接断开,或者客户端和服务端版本不匹配导致申请无奈解析,从而无奈失常工作,那么 db.write() 应该抛出异样以阐明具体起因。从“是否失常工作”的角度看,上述两种状况的性质是截然不同的,显然应该选用异样作为控制流,而不是 if-else。

/* 正确 */
try {String result = db.write("data");
    process(result); // 失常进口
} catch (Exception e) {log.error("write fails", e); // 异样进口
}

/* 谬误 */
String resultOrError = db.write("data");
if (resultOrError.equals("OK")) {process(resultOrError); // 失常进口
} else {log.error("write fails, error:" + resultOrError); // 异样进口
} 

强制解决不可避免的异样

Java 语言的异样解决体系中,异样次要分为以下几类:Exception、RuntimeException、Error;三者都是 Throwable 的子类,即能够被函数抛出。留神,因为 RuntimeException 是 Exception 的子类,本文为防止混同,“Exception”特指“是 Exception 但不是 RuntimeException”的那些异样。

集体认为,几种异样类型别离用于下列场景

1. Exception:程序内部的不可抗力造成的异常情况,如网络连接断开。即便 Java 代码完美无瑕,也相对不可能防止这类异样(拔掉网线试试!)。既然无奈防止,这种异样就该当强制解决,以晋升零碎的容错能力。
2. RuntimeException:编程谬误造成的异常情况,如数组下标越界 ArrayOutOfBoundException、参数不合乎取值范畴 IllegalArgumentException 等。如果程序员对 API 的输出束缚一目了然,并在调用 API 之前对函数参数进行适当校验,那么 RuntimeException 是能够相对防止的(除非被调 API 在该当抛 Exception 处,理论抛出了 RuntimeException)。既然能够防止,这种异样就没有必要强制解决。

当然,人无完人。假如程序员真的违反了某些束缚,函数抛出 RuntimeException 且未被解决,那么作为惩办,线程或过程会退出,从而揭示程序员改正错误代码。如果线程或过程必须常驻,就要对 RuntimeException 进行兜底,如上面的代码所示。这里将代码缺点视为无奈防止的异常情况,捕捉异样后能够记录日志、触发告警,揭示稍起初修改缺点。

new Thread(()->{while (true){
       try{doSomething();
       }catch (RuntimeException e){  // 对 RuntimeException 进行兜底,以防线程中断
 log.error("error occurs", e);
       }
   }
});

3. Error:jvm 外部定义的异样,如 OutOfMemoryError。业务逻辑个别不抛出 Error,而是抛出某种 Exception 或 RuntimeException。

上述几类异样中,只有 Exception 是强制解决的,称为 checked exception[C]。如下所示是一个 checked exception 的例子。数据库拜访 DB.write()抛出 Exception,示意遇到网络断开、音讯解析失败等不可抗状况。异样类型为 Exception 而不是 RuntimeException,以强制调用者增加 catch 子句解决上述情况;如调用者脱漏了 catch 子句,则编译器会报错,从而提醒调用者“这里肯定会遇到异常情况,必须进行解决”,以欠缺程序容错能力。

 /**
 * 抛出异样,如果:* 1. 网络连接断开
 * 2. 音讯无奈解析
 * 3. 业务逻辑相干,如服务端扣款时发现余额有余
 * 4. …… // 任何无奈防止的状况,都应该抛出 Exception!*/
 public String write(Object data) throws Exception {return "";}

 /**
 * 解决异样
 */
try {String result = db.write("data");
    process(result); 
 } catch (Exception e) {  // 如脱漏 catch 子句,则编译不通过
    log.error("write fails, db: ..., data: ...", e);
 }

3.2 Promise API 的异样解决

上一大节探讨了异样解决的标准:

  • 显式辨别失常进口和异样进口;
  • 不可抗的异样,要在编译时刻强制解决。上面的代码展现了 Promise API 要如何设计异样解决机制,以合乎上述标准。
  1. 应用 PromiseOrException 来告诉响应数据和异样。PromiseOrException<T, E> 是 Promise<X> 的子类,泛型模版 X 为数据对象 ResultOrException<T, E extends Exception>,其含有 2 个字段 result 和 e:e==null 示意失常,此时字段 result 无效;e!=null 示意异样,此时不要应用字段 result。
  2. 在“重载 1”中,调用者从回调函数中取得 ResultOrException 对象 。调用 ResultOrException.get() 获取响应数据 result,或者 get()办法抛出异样 e。这种形式的代码构造和传统的异样解决统一,能够应用多个 catch 子句别离解决不同类型的异样。
  3. 在“重载 2”中,调用者从回调函数中间接取得 result 和 e 。含意同上。这种形式省去了 ResultOrException.get();然而如果须要处不同类型的异样,则须要用 e instanceof MyException 来判断异样类型。
// extends Promise<ResultOrException<String, Exception>>
 PromiseOrException<String, Exception> resultPromsie = db.writeAsync("data");

 /* 重载 1 */ 
resultPromsie.await(resultOrException -> { 
    try {String result = resultOrException.get();
        process(result);  // 失常进口
     } catch (Exception e) {log.error("write fails", e);  // 异样进口
 }
});
 
 /* 重载 2 */
 resultPromsie.await((result, e) -> {if (e == null) {process(result);  // 失常进口
 } else {log.error("write fails", e);  // 异样进口
 }
});

PromiseOrException 合乎上一大节提出的异样解决标准,具备如下长处

  1. 辨别失常进口和异样进口。响应数据和异样别离应用 result 和 e 两个变量来传递,能够靠 e ==null 来判断是否失常。留神 result==null 不能作为判断条件,因为 null 有可能是响应数据的非法值。
  2. 强制解决异样。不管应用哪一种回调,不存在一种代码构造可能只取得 result 而不取得 e,因而语法上不会脱漏 e 的异样解决。
  3. 容许定义异样类型。PromiseOrException 的泛型模版 E 填为 Excetion 不是必须的,也能够填为任意其余类型。留神,受限于 Java 语法,泛型模版处只容许填写一种异样类型,而不像函数抛异样那样容许抛出多种异样。为应答这种限度,咱们只能为 API 定义一个异样父类,调用者用 catch 子句或 instanceof 进行向下转型。当然,这种“定义异样父类”的做法也是能够承受的,并在现有工具中广泛应用,因为能够将工具所抛异样区别于 Java 语言内置的异样类型。

最初,在异样解决构造方面集体提出 一个倡议 :全副异样通过 PromiseOrException 来告诉,而 API 自身不要抛出异样。以数据库拜访 API writeAsync() 为例,面的代码比照了两种抛异样的形式。正确的做法 是 PromiseOrException 作为惟一进口,如果 API 底层实现抛出异样(submit() throws Exception),则应该将异样封装于 PromiseOrException 对象,而不应该间接从 API 函数抛出(writeAsync() throws Exception)。

/* 正确:惟一进口 PromiseOrException*/
public PromiseOrException<String, Exception> writeAsync(Object data)  {
    try {submit(data);  // throws exception
    } catch (Exception e) {return PromiseOrException.immediatelyException(e);
    }
    PromiseOrException<String, Exception> resultPromise = new PromiseOrException<>();
    return resultPromise;
}

/* 谬误:两个进口 throws Exception 和 PromiseOrException*/
public PromiseOrException<String, Exception> writeAsync(Object data) throws Exception {submit(data);  // throws exception

    PromiseOrException<String, Exception> resultPromise = new PromiseOrException<>();
    return resultPromise;
}

如果谬误地设计了含有两个异样进口的 API,调用者就不得不反复书写异样解决逻辑,如上面的代码所示。

try {PromiseOrException<String, Exception> resultPromise = db.writeAsync("data");
    resultPromise.await((result, e) -> {if (e == null) {process(result);  // 失常进口
 } else {log.error("write fails", e);  // 异样进口 2
 }
    });
} catch (Exception e) {log.error("write fails", e);  // 异样进口 1
}

4. 申请调度

Java 程序中有时须要提交多条异步申请,且这些申请之间存在肯定的关联关系。在异步非阻塞场景下,这些关联关系都能够借助 Promise 来实现。

1. 程序申请,如图 4 - 1 所示。后一条申请依赖前一条申请的响应数据;因而,必须期待前一条申请返回,能力结构并提交下一条申请。

图 4 -1 程序申请

2. 并行申请 ,如图 4 - 2 所示。一次提交多条申请,而后期待全副申请返回。所提交的申请之间没有依赖关系,因而能够同时执行;然而必须收到每条申请的响应数据(产生 channelRead() 事件,事件参数为响应数据),能力执行理论的解决 process(result1,2,3)。


图 4 -2 并行申请

3. 批量申请,如图 4 - 3 所示。调用者间断提交多条申请,然而暂存在队列中(offer()),而不是立即执行。一段时间后,从队列中取出若干条申请,组装为批量申请来提交(writeBulk());当收到批量申请的响应音讯时,能够从中取出每条申请的响应数据。因为每次网络 IO 都带来额定开销,故理论利用中常常应用批量申请来缩小网络 IO 频率,以晋升总体吞吐量。


图 4 -3 批量申请

4.1. 程序申请:Promise.then()

假如一系列操作须要顺次实现,即前一操作实现后,能力开始执行下一操作;如果这些操作均体现为 Promise API,咱们能够对 Promise.await(listener)进行封装,使代码构造更加简洁。

如下所示是一个异步 Promise API。submit 办法提交申请 request 并返回 Promise 对象;当收到响应数据时,该 Promise 对象被告诉。

/**
 * 异步 Promise API
 */
 public static Promise<String> submit(Object request) {Promise<String> resultPromise = new Promise<>();
     // ……
 return resultPromise;
}

现假如有 5 个申请称为“A”-“E”,这些申请必须顺次提交。例如,因为申请 B 的参数依赖申请 A 的响应数据,故提交申请 A 后必须先解决其响应数据 resultA,而后能力再提交申请 B。这种场景能够用如下所示的代码来实现。某次调用 submit(“X”)函数后,咱们在其返回的 Promise 对象上注册回调;回调函数内解决响应数据 resultX,并调用 submit(“X+1”)来提交下一申请。

这种形式尽管能实现性能需要,然而嵌套式的代码构造可读性十分差——每减少一个申请,代码就要多嵌套、缩进一个层级。当调用逻辑简单、申请数较多时,代码会十分难以保护。

这种状况也称为 “回调天堂”[D],在 JavaScript 语言中相干探讨颇多,能够作为参考。

submit("A").await(resultA -> {submit("B").await(resultB -> {submit("C").await(resultC -> {submit("D").await(resultD -> {submit("E").await(resultE -> {process(resultE);
                });
            });
        });
    });
});

为改良代码构造,咱们对 Promise<T>.await(Consumer<T>) 办法进行封装,提供 Promise<T>.then(Function<T, Promise<Next>>)办法,如下所示。相似于 await(),then()也能够注册一个回调函数 resultX->submit(“X+1”),回调函数解决响应数据 resultX,并提交下一申请 submit(“X+1”);then()的返回值即 submit(“X+1”)的返回值,用于告诉下一申请的响应数据 resultX+1。

Promise<String> resultPromiseA = submit("A");
Promise<String> resultPromiseB = resultPromiseA.then(resultA -> submit("B"));
Promise<String> resultPromiseC = resultPromiseB.then(resultB -> submit("C"));
Promise<String> resultPromiseD = resultPromiseC.then(resultC -> submit("D"));
Promise<String> resultPromiseE = resultPromiseD.then(resultD -> submit("E"));
resultPromiseE.await(resultE -> process(resultE));

接下来,咱们将两头变量 resultPromiseA-E 内联,即失去基于 then()的链式调用构造。相比于 await(),then()打消了套娃般的嵌套回调。

submit("A")
        .then(resultA -> submit("B")) // 返回 resultPromiseB
        .then(resultB -> submit("C")) // 返回 resultPromiseC
        .then(resultC -> submit("D")) // 返回 resultPromiseD
        .then(resultD -> submit("E")) // 返回 resultPromiseE
        .await(resultE -> process(resultE));

最初,咱们来看一下 Promise<T>.then() 的一种简略实现,如下所示:

  1. then()办法提供一个泛型模版 Next,以阐明下一申请的响应数据类型。
  2. 依据泛型模版 Next,then()外部创立 Promise<Next> 作为返回值,用于告诉下一申请的响应数据。
  3. 对于以后申请,调用 await()注册响应数据的回调 result;当收到响应数据后,执行函数 func,以提交下一申请:func.apply(result)。
  4. 当收到下一申请的响应数据后,Promise<Next> 被告诉:nextPromise::signalAll。
public <Next> Promise<Next> then(Function<T, Promise<Next>> func) {Promise<Next> nextPromise = new Promise<>();
    await(result -> {Promise<Next> resultPromiseNext = func.apply(result);
        resultPromiseNext.await(nextPromise::signalAll);
    });
    return nextPromise;
}

留神,这里只展现了纯异步重载 Promise<T>.then(Function<T, Promise<Next>>)。依据回调函数是否有返回值、同步执行还是异步执行,Promise 能够提供 then() 的更多种重载;受限于 Java 语法,如编译器无奈辨析各个重载,则能够应用函数名称进行显式区别,如:

thenRun(Runnable)

thenAccept(Consumer<T>)

thenApply(Function<T, Next>)

thenApplyAsync(Function<T, Promise<Next>>)

4.2. 并行申请:LatchPromise

上一大节介绍了“程序申请”的场景,即多条申请须要顺次执行;而“并行申请”场景下,多条申请之间没有程序束缚,然而咱们依然须要期待全副申请返回,能力执行后续操作。例如,咱们须要查问多张数据库表,这些查问语句能够同时执行;然而必须期待每条查问都返回,咱们能力取得残缺信息。jdk 提供 CountDownLatch 来实现这一场景,然而其只反对同步期待;作为改良,咱们采纳 LatchPromise 实现雷同的性能,并且反对纯异步 API。

以数据库拜访为例,如下所示的代码 展现了 LatchPromise 的应用

  1. 提交 3 条申请,并获取每个申请所对应的 Promise 对象 resultPromise1-3,以获取响应数据。
  2. 创立 LatchPromise 对象,并向其注册须要期待的 Promise 对象 resultPromise1-3。
  3. LatchPromise.untilAllSignaled()返回一个 Promise 对象 allSignaled。当所注册的 resultPromise1- 3 均被告诉后,allSignaled 会被告诉。allSignaled 的类型为 VoidPromise,示意 allSignaled 被告诉时没有须要解决的响应数据。
  4. 在 allSignaled 上注册回调,在回调函数中调用 resultPromiseX.await()获取理论的响应数据;此时因为申请已执行结束,故 await()立即返回而不阻塞。
/* 创立 Promise 对象 */
Promise<String> resultPromise1 = db.writeAsync("a");
Promise<String> resultPromise2 = db.writeAsync("b");
Promise<String> resultPromise3 = db.writeAsync("c");

/* 向 LatchPromise 注册要期待的 Promise*/
LatchPromise latch = new LatchPromise();
latch.add(resultPromise1);
latch.add(resultPromise2);
latch.add(resultPromise3);

/* 期待全副 Promise 被告诉 */
VoidPromise allSignaled = latch.untilAllSignaled();
allSignaled.await(() -> {String result1 = resultPromise1.await();
    String result2 = resultPromise2.await();
    String result3 = resultPromise3.await();
    process(result1, result2, result3);
});

作为比照,上面的代码应用 CountDownLatch 实现雷同性能,然而 存在以下缺点

  1. CountDownLatch.await() 只反对同步期待。在纯异步场景下是无奈承受的。
  2. CountDownLatch 对业务逻辑有侵入性。程序员须要在业务逻辑中增加对 CountDownLatch.countDown()的调用,以管制 CountDownLatch 的时序;相同,LatchPromise 依赖原本就曾经存在的 resultPromise 对象,而不须要编写额定的时序控制代码。
  3. CountDownLatch 引入了冗余逻辑。创立 CountDownLatch 时,必须在结构参数中填写要期待的申请数;因而,一旦所提交的申请的数目扭转,就必须相应地更新创立 CountDownLatch 的代码,批改结构参数。
CountDownLatch latch = new CountDownLatch(3);
resultPromise1.await(result1 -> latch.countDown());
resultPromise2.await(result2 -> latch.countDown());
resultPromise3.await(result3 -> latch.countDown());

latch.await();
String result1 = resultPromise1.await();
String result2 = resultPromise2.await();
String result3 = resultPromise3.await();
process(result1, result2, result3); 

最初,咱们来看一下 LatchPromise 的参考实现。代码原理如下所示:

  1. 设立 countUnfinished 变量,记录还没有被告诉的 Promise 对象的数目。每当注册一个 Promise 对象,countUnfinished 递增;每当一个 Promise 被告诉,countUnfinished 递加。当 countUnfinished 减到 0 时,阐明所注册全副 Promise 对象都被告诉了,故告诉 allSignaled。
  2. 设立 noMore 变量 ,记录是否还须要持续注册新的 Promise 对象,仅当调用了 untilAllSignaled() 才认为实现注册;在此之前,即便 countUnfinished 减至 0,也不应该告诉 allSignaled。思考这样一种状况:须要注册并期待 resultPromise1-3,其中 resultPromise1、2 在注册期间即已被告诉,而 resultPromise3 未被告诉。如果不判断 noMore,那么注册完 resultPromise1、2 后,countUnfinished 即已减至 0,导致提前告诉 allSignaled;这是一个时序谬误,因为实际上 resultPromise3 还没有实现。
  3. 为保障线程平安,拜访变量时须上锁,此处应用 synchronized 来实现。
  4. 留神 ,调用 untilAllSignaled() 时,如果 countUnfinished 的初值曾经为 0,则应立即告诉 allSignaled;因为 countUnfinished 曾经不可能再递加,之后没有机会再告诉 allSignaled 了。
// private static class Lock。无成员,仅用于 synchronized(lock)
private final Lock lock = new Lock();
private int countUnfinished = 0;
private final VoidPromise allSignaled = new VoidPromise();

public void add(Promise<?> promise) {if (promise.isSignaled()) {return;}

    synchronized (lock) {countUnfinished++;}

    promise.await(unused -> {synchronized (lock) {
            countUnfinished--;
            if (countUnfinished == 0 && noMore) {allSignaled.signalAll();
            }
        }
    });
}

public VoidPromise untilAllSignaled() {synchronized (lock) {if (countUnfinished == 0) {allSignaled.signalAll();
        } else {noMore = true;}
    }

    return allSignaled;
}

4.3. 批量申请:ExecutorAsync

批量申请的个性

“批量申请”(也称“bulk”、“batch”)是指发送一条音讯即可携带多条申请,次要用于数据库拜访和近程调用等场景。因为缩小了网络 IO 次数、节约了结构和传输音讯的开销,批量申请能无效晋升吞吐量。

很多数据库 API 都反对批量读写,如 JDBC PreparedStatement[E]、elasticsearch bulk API[F]、mongo DB insertMany()[G]、influx DB BatchPoints[H],读者能够查阅参考文献进一步理解。为了晋升性能,局部 API 会就义易用性。其中,elasticsearch bulk API 对调用者的限度起码,容许混淆增删改等不同类型的申请,容许写入不同的数据库表(index);mongo DB、influx DB 次之,一个批量申请只能写入同一个数据库表,然而能够自定义每条数据的字段;PreparedStatement 的灵活性最低,其定义了 SQL 语句的模版,调用者只能填写模版参数,而不能批改语句构造。

尽管数据库 API 曾经反对批量拜访,然而很多原生 API 依然须要调用者本人结构批量申请,须要调用者解决申请组装、批量大小、并发申请数等简单的细节。

在此,咱们设计出通用组件 ExecutorAsync,封装申请调度策略以提供更简洁的 API。ExecutorAsync 的应用流程 如上面的代码片段所示:

  1. 相似于线程池 ExecutorService.submit(),调用者能够调用 ExecutorAsync.submit()来提交一个申请。其中,申请以数据对象 Request 示意,用于存储申请类型和申请参数。
  2. 提交申请后,调用者取得 Promise 对象,以获取响应数据。因为应用了 Promise,ExecutorAsync 反对纯异步操作,提交申请和获取响应数据都不须要阻塞。
  3. ExecutorAsync 外部对申请进行调度,并非提交一条申请就立即执行,而是每隔固定工夫收集一批申请,将其组装为一个批量申请,再调用理论的数据库拜访 API。如果数据库拜访 API 容许,那么一批申请能够混淆不同的申请类型,或者操作不同的数据库表。
ExecutorAsync executor = new ExecutorAsync();
Promise<...> resultPromise1 = executor.submit(new Request("data1"));
Promise<...> resultPromise2 = executor.submit(new Request("data2"));
Promise<...> resultPromise3 = executor.submit(new Request("data3"));

具体而言,ExecutorAsync 反对如下调度策略:

1. 排队,如图 4 -4a 所示。调用者提交申请 Request 后不要立即执行,而是将其缓存在队列 queue 中。


图 4 -4a ExecutorAsync 个性:排队

2. 批量,如图 4 -4b 所示。每隔固定工夫距离,ExecutorAsync 从队列中取出若干条申请,将其组装为批量申请 bulk,并调用底层数据库 API 提交给服务端。如果队列长度增长得很快,咱们也能够定义一个批量大小 bulk size,当队列长度达到该值时立即组装一个批量申请并提交。


图 4 -4b ExecutorAsync 个性:批量

3. 并发,如图 4 -4c 所示。如果底层数据库 API 反对异步提交申请,那么 ExecutorAsync 就能够充分利用这种个性,间断提交多个批量申请,而不须要期待之前的批量申请返回。为防止数据库服务器超载,咱们能够定义并发度 parallelism,限度正在执行(in flight)的批量申请的数目;当达到限度时,如果调用者再提交新的申请,就暂存在队列 queue 中期待执行,而不会组装新的批量申请。


图 4 -4c ExecutorAsync 个性:并发

4. 抛弃。如图 4 -4d 所示。在上文提到的 bulk size 和 parallelism 的限度下,如果提交申请的速率远高于服务端响应的速率,那么大量申请就会沉积在队列中期待解决,最终导致超时失败。在这种状况下,将申请发送给服务端曾经没有意义,因为调用者曾经认定申请失败,而不再关怀响应数据。

图 4 -4d 申请超时

因而,ExecutorAsync 应该及时从队列中移除有效申请,而残余申请依然“陈腐”。这种策略可能强制缩短队列长度,以升高后续申请在队列中的沉积时长、预防申请超时;同时,因为防止存储和发送有效申请,这种策略也能节约内存和 IO 开销。

图 4 -4e ExecutorAsync 个性:抛弃

批量申请的实现

上一大节咱们看到了 ExecutorAsync 的调度策略,包含排队、批量、并发、抛弃。如上面的代码所示,ExecutorAsync 只需对外提供 submit(Request) 办法,用于提交单条申请。申请以数据对象 Request 示意,其字段 Request.resultPromise 是 Promise 对象,用于告诉响应数据;在须要进行异样解决的场景下,咱们应用 PromiseOrException<T, Exception> 作为 Promise 的实现,其中泛型模版 T 改为响应数据的理论类型。

public class ExecutorAsync {public PromiseOrException<T, Exception> submit(Request<T> request) {return request.resultPromise;}
}

接下来咱们来看看 ExecutorAsync 的实现原理。因为源码细节较多、篇幅较长,故本节用流程图的模式,来解说更高层的设计,如图 4 - 5 所示。

图 4 -5 ExecutorAsync 原理

1. 提交申请。调用者调用 ExecutorAsync.submit(Request),每次调用提交一条申请。该条申请存入队列 queue,期待后续调度执行。参数 Request 的构造如上面的代码所示,包含下列字段:

predicate:函数,判断申请是否无效,有效申请(如超时的申请)将被抛弃。详见步骤 2。

resultPromise:告诉响应数据。

public class Request<Response> {
    public final PredicateE predicate; 
    public final PromiseOrException<Response, Exception> resultPromise;
}

2. 每隔固定距离,或者 queue.size()达到 bulk size,尝试组装批量申请 。从队列 queue 中顺次取出申请,每条申请执行函数 Request.predicate,以判断是否依然要提交该申请;取出的无效申请的条数,不超过 bulk size。
predicate 是一个函数,相似于 jdk Predicate 接口,模式如上面的代码所示。接口函数 test() 能够失常返回,示意申请依然无效;也能够抛出异样,阐明申请有效的起因,如期待超时。如果抛出异样,则该条申请间接抛弃,并将产生的异样将告诉给 Request.resultPromise,使得调用者执行异样解决逻辑。

public interface PredicateE {void test() throws Exception;
}

3. 提交批量申请 。第 2 步从队列 queue 中取出了至少 bulk size 条申请,将其作为参数调用 RequestFunc.execute(requests),以提交批量申请。接口 RequestFunc 的模式如上面的代码所示。接口办法 execute(requests) 以若干条申请为参数,将其组装为批量申请,调用底层的数据库 API 来提交。

public interface RequestFunc<T>{void execute(List<Request<T>> requests);
}

4. 当收到响应后,对于每条申请,顺次向 Request.resultPromise 告诉响应数据

5. 为避免服务端超载,ExecutorAsync 可限度并发申请数不超过 parallelism。咱们设置计数变量 inFlight=0,以统计正在执行的批量申请数:

a. 当尝试组装批量申请(步骤 2)时,首先判断 inFlight<parallelism,满足条件能力从队列 queue 中取出待执行的申请。

b. 当提交批量申请(步骤 3,RequestFunc.execute())后,inFlight++。

c. 当一批申请均收到响应数据(步骤 4,Request.resultPromise 被告诉)后,inFlight–。此时如 queue 中的申请数仍超过 bulk size,则回到步骤 2,再取出一批申请来执行。

综上,ExecutorAsync 应用队列 queue 来暂存待执行的申请;当须要提交批量申请时,以 PredicateE 筛选无效申请、抛弃有效申请;对于一批申请,调用 RequestFunc.execute()来批量提交,收到响应数据后 Request.resultPromise 告诉。上述过程满足束缚,以防服务端超载:一批申请的数目至少为 bulk size;同时正在执行的批量申请数不超过 parallelism。以上即为 ExecutorAsync 的基本原理;理论利用中还须要解决配置参数、泛型模版等细节,限于篇幅起因本文不再具体解说。

5. 总结

本文介绍了 异步模型 Promise 设计模式 的理论利用场景,探讨了异步 API 的设计准则,并介绍了相应的解决方案。异步模式不仅仅是“提交申请 - 解决响应”的简略过程,有时也须要设计异样解决机制,以及依据申请之间的关联关系进行调度。在解决这些简单场景的时候,API 须要放弃纯异步的个性,在提交申请、解决响应的过程中都不能阻塞;须要充分利用编译时刻查看,避免调用者脱漏分支,尤其是不可避免的异样分支;API 须要封装简单的、反复的实现细节,尽量放弃调用者的代码构造简洁易懂。

本系列文章旨在对异步模式进行科普,心愿能起到抛砖引玉的作用,帮忙读者了解异步模式的基本原理,对有可能遇到的理论问题有所理解,并初步摸索异步模式的实现机制。然而,事实中的我的项目和工具远比本文介绍的简单,还请读者做好调研、选型工作。考查现有的各种异步 API,读者会发现异步模式目前仍无统一标准。以数据库客户端为例,各种异步 API 的函数模式都不尽相同,listener API 和 Promise API 都有采纳,也有一些 API 模式上是异步的,然而在某些状况下会产生阻塞;异步 API 底层有基于线程池 / 连接池实现的,也有基于响应式模型(如 netty)实现的。因而,请读者务必充沛理解异步工具的 API 模式、阻塞个性、线程模型,而后能力在我的项目中利用;如果现有工具不合乎开发标准,亦可大胆地进行封装,或者自行实现所需个性。对于异步工具的封装和实现,也十分欢送读者交换与斧正。

参考文献

[A] jdk CompletableFuture

https://www.baeldung.com/java…

[B] netty DefaultPromise

https://www.tabnine.com/code/… 

[C] checked exception

https://www.geeksforgeeks.org… 

[D] 回调天堂(JavaScript)

https://blog.avenuecode.com/c… 

[E] 批量申请:JDBC PreparedStatement

https://www.tutorialspoint.co… 

[F] 批量申请:elasticsearch bulk API

https://www.elastic.co/guide/… 

[G] 批量申请:mongo DB insertMany()

https://mongodb.github.io/mon… 

[H] 批量申请:influx DB BatchPoints

https://github.com/influxdata…

退出移动版