关于javascript:JAVA语言异步非阻塞设计模式原理篇

55次阅读

共计 13963 个字符,预计需要花费 35 分钟才能阅读完成。

本系列文章共 2 篇,对 Java 语言的异步非阻塞模式进行科普。《原理篇》解说异步非阻塞模型的原理,以及外围设计模式“Promise”的根本个性。《利用篇》会展现更加丰盛的利用场景,介绍 Promise 的变体,如异样解决、调度策略等,并将 Promise 和现有工具进行比照。

限于集体程度和篇幅,本系列以科普为主,内容更偏重于原理、API 设计、利用实际,然而不会深刻解说并发优化的具体细节。

1. 概述

异步非阻塞 [A] 是一种高性能的线程模型,在 IO 密集型零碎中失去广泛应用。在该模型下,零碎发动耗时申请后不须要期待响应,期间能够执行其余操作;当收到响应后,零碎收到告诉并执行后续解决。因为打消了不必要的期待,这种模型可能充分利用 cpu、线程等资源,进步资源利用率。

然而,异步非阻塞模式在晋升性能的同时,也带来了编码实现上的复杂性。申请和响应可能拆散到不同线程中,须要编写额定代码实现响应后果的传递。Promise 设计模式能够升高这种复杂性,封装数据传递、时序管制、线程平安等实现细节,从而提供简洁的 API 模式。

本文首先介绍异步非阻塞模式,从线程模型的角度剖析阻塞和非阻塞模式的区别。之后介绍 Promise 设计模式的利用场景及工作流程。最初,提供一种繁难的 Java 实现,可能实现根本的性能需要,并做到线程平安。

在正式摸索技术问题之前,咱们先来看看什么是 异步非阻塞模型。如图 1 - 1 所示,展现了两个君子通信的场景:

  1. 两个君子代表相互通信的两个 线程,如数据库的客户端和服务端;他们能够部署在不同的机器上。
  2. 君子之间相互投递苹果,代表要 传递的音讯。依据具体业务场景,这些音讯可能会称为 request、response、packet、document、record 等。
  3. 君子之间须要 建设信道,音讯才得以传递。依据场景,信道称为 channel、connection 等。

假如左侧君子发动申请,而右侧君子解决申请并发送响应:左侧君子先投出一个苹果 request,被右侧君子接管到;右侧君子进行解决后,再投出苹果 response,被左侧君子接管到。咱们考查左侧君子在期待响应期间的行为,依据他在期待 response 期间是否能解决其余工作,将其演绎为“同步阻塞”和“异步非阻塞”两种模式。


图 1 -1 两个君子通信

首先咱们看看同步阻塞式通信的流程,如图 1 -2a 所示。

  1. 投递。左侧君子投递 request,并期待接管 response。
  2. 期待。在期待接管 response 期间,左侧君子劳动。不管是否还有其余 request 须要投递、是否还有其余工作须要解决,他都视若无睹,相对不会因而打断劳动。
  3. 响应。在收到 response 后,君子从劳动中唤醒并解决 response。


图 1 -2a 同步阻塞式通信

接下来咱们看看异步非阻塞式通信的流程,如图 1 -2b 所示。

  1. 缓存。左侧君子投递 request,并期待接管 response。和同步阻塞模式不同,君子并不需要亲手接住苹果 response,而是在地上搁置一个盘子称为“buffer”;如果君子临时不在场,那么所收到的苹果能够先存在盘子里,稍后再解决。
  2. 暂离。因为有盘子 buffer 的存在,君子投递 request 后就能够临时来到,去解决其余工作,当然也能够去投递下一个 request;如果须要向不同的 channel 投递 request,那么君子能够多摆放几个盘子,和 channel 一一对应。
  3. 响应。君子来到后,一旦某个盘子收到了 response,一只“大喇叭”就会响起,收回“channelRead”告诉,召唤君子回来解决 response。如果要解决多个 response 或多个 channel,那么 channelRead 告诉还须要携带参数,以阐明从哪个 channel 上收到了哪个 response。

这里的大喇叭能够用 NIO 或 AIO 来实现。简略来说,NIO 是指不停地轮询每个盘子,一旦看到苹果就发出通知;AIO 是指在收到苹果时间接触发告诉,而没有轮询的过程。当然,本系列文章的读者并不需要理解更多实现细节,只需晓得异步非阻塞模式依赖于“大喇叭”来实现,它代替君子期待接管 response,从而解放君子去解决其余工作。


图 1 -2b 异步非阻塞式通信

依据下面的剖析,同步模式具备下列重大 毛病

  1. 同步阻塞模式的工作效率非常低下。君子大部分工夫都在劳动,仅当投递申请、解决响应时,才偶然醒来工作一小会;而在异步非阻塞模式下,君子从不劳动,快马加鞭地投递申请、解决响应,或解决其余工作。
  2. 同步阻塞模式会带来提早

咱们思考上面两种状况,如图 1 - 3 所示。

  • channel 复用,即左侧君子在一个 channel 上间断发送多条音讯。在同步阻塞模式下,一轮(即申请 + 响应)只能投递一个申请(苹果 1),而后续申请(苹果 2 -4)都只能排队期待,右侧君子须要期待很多轮能力收到所冀望的全副音讯。此外,左侧君子在期待接管某个 response 期间,没有机会解决收到的其余音讯,造成了数据处理的提早。不得不感叹,左侧君子太懈怠了!
  • 线程复用,即一个线程(君子)向多个 channel 发送音讯(苹果 1 -3,别离发向不同 channel)。左侧君子同一时刻只能做一件事,要么在工作,要么在劳动;他投递了苹果 1 后就躺下劳动,期待响应,全然不顾右侧君子 2、3 还在期待他们想要的苹果 2、3。


图 1 -3a channel 复用


图 1 -3b 线程复用

在这一章里咱们用漫画的模式,初步体验了同步阻塞模式与异步非阻塞模式,并剖析了两种模式的区别。接下来咱们从 Java 线程动手,对两种模式进行更加正式、更加贴近理论的剖析。

2. 异步非阻塞模型

2.1 Java 线程状态

在 Java 程序中,线程是调度执行的单元。线程能够取得 CPU 使用权来执行代码,从而实现有意义的工作。工作进行期间,有时会因为期待获取锁、期待网络 IO 等起因而暂停,通称“同步”或“阻塞”;如果多项工作可能同时进行,之间不存在束缚、不须要相互期待,这种状况就称为“异步”或“非阻塞”。
受限于内存、零碎线程数、上下文切换开销,Java 程序并不能有限创立线程;因而,咱们只能创立无限个线程,并尽量进步线程的利用率,即减少其工作时长、升高阻塞时长。异步非阻塞模型是缩小阻塞、进步线程利用率的无效伎俩。当然,这种模型并不能打消所有的阻塞。咱们首先来看看 Java 线程有哪些状态,其中哪些阻塞是必要的,哪些阻塞能够防止。

Java 线程状态包含:

  • RUNNABLE:线程在执行有意义的工作
    如图 2 -1a,线程如果在执行纯内存运算,那么处于 RUNNABLE 状态
    依据是否取得 cpu 使用权,又分为两个子状态:READY、RUNNING
  • BLOCKED/WAITING/TIMED_WAITING:线程正在阻塞
    如图 2 -1b、2-1c、2-1d,依据阻塞起因,线程处于下列状态之一
    BLOCKED:synchronized 期待获取锁
    WAITING/TIMED_WAITING:Lock 期待获取锁。两种状态的区别为是否设置超时时长


图 2 -1 Java 线程状态

此外,如果 Java 线程正在进行网络 IO,则线程状态为 RUNNABLE,然而实际上也产生了阻塞。以 socket 编程为例,如图 2 - 2 所示,在收到数据之前 InputStream.read() 会阻塞,此时线程状态为 RUNNABLE。


图 2 -2 网络 IO

综上,Java 线程状态包含:RUNNABLE、BLOCKED、WAITING、TIMED_WAITING。其中,RUNNABLE 状态又分为内存计算(非阻塞)、网络 IO(阻塞)两种状况,而其余状态都是阻塞的。
依据阻塞起因,本文将 Java 线程状态演绎为以下 3 类:RUNNABLE、IO、BLOCKED

  1. RUNNABLE:Java 线程状态为 RUNNABLE,并且在执行有用的内存计算,无阻塞
  2. IO:Java 线程状态为 RUNNABLE,然而正在进行网络 IO,产生阻塞
  3. BLOCKED:Java 线程状态为 BLOCKED/WAITING/TIMED_WAITING,在并发工具的管制下,线程期待获取某一种锁,产生阻塞

要进步线程利用率,就要减少线程处于 RUNNABLE 状态的时长,升高处于 IO 和 BLOCKED 状态的时长。BLOCKED 状态个别是不可避免的,因为线程间须要通信,须要对临界区进行并发管制;然而,如果采纳适当的线程模型,那么 IO 状态的时长就能够失去升高,而这就是异步非阻塞模型。

2.2 线程模型:阻塞 vs 非阻塞

异步非阻塞模型可能升高 IO 阻塞时长,进步线程利用率。上面以数据库拜访为例,剖析同步和异步 API 的线程模型。如图 3 所示,过程中波及 3 个函数:

  1. writeSync()或 writeAsync():数据库拜访,发送申请
  2. process(result):解决服务器响应(以 result 示意)
  3. doOtherThings():任意其余操作,逻辑上不依赖服务器响应

同步 API 如图 3 -a 所示:调用者首先发送申请,而后在网络连接上期待来自服务器的响应数据。API 会始终阻塞,直至收到响应才返回;期间调用者线程无奈执行其余操作,即便该操作并不依赖服务器响应。理论的 执行程序 为:

  1. writeSync()
  2. process(result)
  3. doOtherThings() // 直至收到后果,以后线程能力执行其余操作

异步 API 如图 2 -3b 所示:调用者发送申请并注册回调,而后 API 立即返回,接下来调用者能够执行任意操作。稍后底层网络连接收到响应数据,触发调用者所注册的回调。理论的 执行程序 为:

  1. writeAsync()
  2. doOtherThings() // 曾经能够执行其余操作,并不需要期待响应
  3. process(result)


图 2 -3 同步 API & 异步 API

在上述过程中,函数 doOtherThings() 并不依赖服务器响应,原则上能够和数据库拜访同时执行。然而对于同步 API,调用者被迫期待服务器响应,而后才能够执行 doOtherThings();即数据库访问期间线程阻塞于 IO 状态,无奈执行其余有用的操作,利用率非常低下。而异步 API 就没有这个限度,显得更加紧凑、高效。

在 IO 密集型零碎中,适当应用异步非阻塞模型,能够晋升数据库拜访吞吐量。思考这样一个场景:须要执行多条数据库拜访申请,且申请之间相互独立,无依赖关系。应用同步 API 和异步 API,线程状态随工夫变动的过程如图 2 - 4 所示。
线程交替处于 RUNNABLE 和 IO 状态。在 RUNNABLE 状态下,线程执行内存计算,如提交申请、解决响应。在 IO 状态下,线程在网络连接上期待响应数据。在理论零碎中,内存计算的速度十分快,RUNNABLE 状态的时长根本可疏忽;而网络传输的耗时会绝对更长(几十到几百毫秒),IO 状态的时长更加可观。

a.同步 API:调用者线程一次只能提交一个申请;直到申请返回后,能力再提交下一个申请。线程利用率很低,大部分工夫耗费在 IO 状态上。

b.异步 API:调用者线程能够间断提交多个申请,而之前提交的申请都还没有收到响应。调用者线程会注册一些回调,这些回调存储在内存中;稍后网络连接上收到响应数据,某个接管线程被告诉解决响应数据,从内存中取出所注册的回调,并触发回调。这种模型下,申请能够间断地提交、间断的响应,从而节约 IO 状态的耗时。


图 2 -4 线程工夫线:数据库拜访

异步非阻塞模式在 IO 密集型零碎中利用十分宽泛。罕用的中间件,如 http 申请[D]、redis[E]、mongo DB[F]、elasticsearch[G]、influx DB[H],都反对异步 API。各位读者能够在参考文献中,查阅这些异步 API 的样例代码。对于中间件的异步 API,上面有几个注意事项:

  1. redis 的常见客户端有 jedis 和 lettuce [E]。其中 lettuce 提供了异步 API,而 jedis 只能提供同步 API;二者比照参见文章[I]。
  2. kafka producer[J]的 send()办法也反对异步 API,然而该 API 实际上不是纯异步的 [K]:当底层缓存满,或者无奈获取服务器(broker)信息时,send() 办法会产生阻塞。集体认为这是一个十分重大的设计缺点。kafka 罕用于低提早日志采集场景,零碎会将日志通过网络写入到 kafka 服务器,以缩小线程内的阻塞,晋升线程吞吐量;稍后其余过程会从 kafka 生产所写入的日志,进行长久存储。构想一个实时通信零碎,单条线程每秒须要解决几万到几十万条音讯,响应工夫个别为几毫秒到几十毫秒。零碎在解决期间须要常常调用 send() 来上报日志,如果每次调用都产生哪怕 1 秒的提早(理论有可能达几十秒),提早积攒起来也会重大劣化吞吐量和提早。

最初,异步 API 有多种实现,包含线程池、select(如 netty 4.x[L])、epoll 等。其共同点是调用者不须要在某一条网络连接上阻塞,以期待接收数据;相同,API 底层常驻无限数目的线程,当收到数据后,某一线程失去告诉并触发回调。这种模型也称为“响应式”模型,十分贴切。限于篇幅起因,本文次要关注 异步 API 设计,而不深刻解说异步 API 的实现原理。

3.Promise 设计模式

3.1 API 模式:同步、异步 listener、异步 Promise

上一章介绍了异步非阻塞模式和异步 API 的函数模式。异步 API 具备以下特色:

  1. 在提交申请时注册回调;
  2. 提交申请后,函数立即返回,不须要期待收到响应;
  3. 收到响应后,触发所注册的回调;依据底层实现,能够利用无限数目的线程来接管响应数据,并在这些线程中执行回调。

在保留异步个性的根底上,异步 API 的模式能够进一步优化。上一章图 2 -3b 展现了异步 API 的 listener 版本,特点是在提交申请时必须注册恰好一个回调;因此在下列场景下,listener API 会难以满足性能需要,须要调用者做进一步解决:

  1. 多个对象都关注响应数据,即须要注册多个回调;然而 listener 只反对注册一个回调。
  2. 须要将异步调用转为同步调用。例如某些框架(如 spring)须要同步返回,或者咱们心愿主线程阻塞直至操作实现,而后主线程完结、过程退出;然而 listener 只反对纯异步,调用者须要反复编写异步转同步的代码。

为了应答上述场景,咱们能够应用 Promise 设计模式来重构异步 API,以反对多个回调和同步调用。上面对同步 API、异步 listener API、异步 Promise API 的函数模式进行比照,如图 3 - 1 所示:

  • a. 同步:调用 writeSync() 办法并阻塞;收到响应后函数进行阻塞,并返回响应数据
  • b. 异步 listener:调用 writeAsync() 办法并注册 listener,函数立即返回;收到响应后,在其余线程触发所注册的 listener;
  • c. 异步 Promise:调用 writeAsync(),但不须要在函数中注册 listener,函数立即返回 Promise 对象。调用者能够调用异步的 Promise.await(listener),注册任意数目的 listener,收到响应后会按程序触发;此外,也能够调用同步的 Promise.await(),阻塞直至收到响应。


图 3 -1 API 模式:同步、异步 listener、异步 Promise

综上,Promise API 在放弃异步个性的前提下,提供了更高的灵活性。调用者能够自在选择函数是否阻塞,以及注册任意数目的回调。

3.2 Promise 的个性与实现

上一节介绍了 Promise API 的应用样例,其外围是一个 Promise 对象,反对注册 listener,以及同步获取响应 result;而本节将对 Promise 的性能进行更加具体的定义。留神,本节并不限定 Promise 的某一具体实现(例:jdk CompletableFuture、netty DefaultPromise),只展现共有的、必须具备的个性;短少这些个性,Promise 将无奈实现异步传递响应数据的工作。

3.2.1 性能个性

  • Promise 的根本办法

Promise 的基本功能是传递响应数据,须要反对下列办法,如表 3 - 1 所示:

上面以上一大节的数据库拜访 API 为例,演示 Promise 的工作流程,如图 3 - 2 所示:

  • a. 调用者调用 writeAsync() API,提交数据库拜访申请并获取 Promise 对象;而后调用 Promise.await(listener),注册对响应数据的 listener。Promise 对象也能够传递给程序中其余中央,使得关怀响应数据的其余代码,各自注册更多 listener。
  • b.writeAsync()外部,创立 Promise 对象并和这次申请关联起来,假如以 requestId 标识。
  • c.writeAsync()底层常驻无限数目的线程,用于发送申请和接管响应。以 netty 为例,当从网络上收到响应据后,其中一个线程失去告诉,执行 channelRead() 函数进行解决;函数取出响应数据和对应的 Promise 对象,并调用 Promise.signalAll() 进行告诉。留神这里是伪代码,和 netty 中回调函数的理论签名略有区别。


图 3 -2a 提交数据库拜访申请


图 3 -2b 创立 Promise 对象


图 3 -2c 告诉 Promise 对象

– Promise 的时序

Promise 的办法须要保障以下时序。此处以“A 对 B 可见”来形容时序,即:如果先执行操作 A(注册 listener)就会产生某种永恒效应(永恒记录这个 listener),之后再执行操作 B(告诉 result)就必须思考到这种效应,执行相应的解决(触发之前记录的 listener)。

  1. await(listener) 对 signalAll(result) 可见:注册若干 listener 后,告诉 result 时必须触发每一个 listener,不容许脱漏。
  2. signalAll(result) 对 await(listener) 可见:告诉 result 后,再注册 listener 就会立即触发。
  3. 首次 signalAll(result) 对后续 signalAll(result) 可见。首次告诉 result 后,result 即惟一确定,永不扭转。之后再告诉 result 就会疏忽,不产生任何副作用。申请超时是该个性一种典型利用:在提交申请的同时创立一个定时工作;如果能在超时时长内正确收到响应数据,则告诉 Promise 失常完结;否则定时工作超时,告诉 Promise 异样完结。不管上述事件哪个先产生,都保障只驳回首次告诉,使得申请后果惟一确定。

此外,某次 await(listener) 最好对后续 await(listener) 可见,以保障 listener 严格依照注册程序来触发。

– Promise 的非线程平安实现

如不思考线程平安,那么下列代码清单能够实现 Promise 的根本个性;线程平安的实现见下一大节。代码清单顺次展现了 await(listener): void、signalAll(result)、await(): result 的实现。这里有几个 注意事项

  1. 字段 listeners 存储 await(listener) 所注册的 listener。字段类型为 LinkedList,以存储任意数目的 listener,同时保护 listener 的触发程序。
  2. 字段 isSignaled 记录是否告诉过 result。如果 isSignaled=true,则后续调用 await(listener) 时立即触发 listener,且后续调用 signalAll(result) 时间接疏忽。此外,咱们以 isSignaled=true 而不是 result=null 来判断是否告诉过 result,因为某些状况下 null 自身也能够作为响应数据。例如,咱们以 Promise<Exception> 示意数据库写入的后果,告诉 null 示意写入胜利,告诉 Exception 对象(或某一子类)示意失败起因。
  3. signalAll(T result)在开端处调用 listeners.clear() 以开释内存,因为 listeners 曾经触发过,不再须要在内存中存储。
public class Promise<T> {

    private boolean isSignaled = false;
    private T result;

    private final List<Consumer<T>> listeners = new LinkedList<>();

    public void await(Consumer<T> listener) {if (isSignaled) {listener.accept(result);
            return;
        }

        listeners.add(listener);
    }

    public void signalAll(T result) {if (isSignaled) {return;}

        this.result = result;
        isSignaled = true;
        for (Consumer<T> listener : listeners) {listener.accept(result);
        }
        listeners.clear();}

    public T await() {// 适当阻塞,直至 signalAll()被调用;理论实现见 3.3 节
        return result;
    }
}

3.2.2 线程平安个性

上一章 3.2.1 节解说了 Promise 的性能,并提供了非线程平安的实现。本节展现如何应用并发工具,实现线程平安的 Promise,如下所示。有下列几个注意事项:

  1. 线程平安。各个字段均被多个线程拜访,因而都属于临界区,须要应用适当的线程平安工具进行上锁,如 synchronized、Lock。一种最简略的实现,是将全副代码纳入临界区内,进入办法时上锁,来到办法时放锁。留神在应用 return 进行提前返回时,不要遗记放锁。
  2. 在临界区外触发 listener,以缩小在临界区内停留的时长,并缩小潜在的死锁危险。
  3. 同步 await()。能够应用任何一种同步期待的工具来实现,如 CountDownLatch、Condition。此处应用 Condition 实现,留神依据 java 语法,操作 Condition 时必须先获取 Condition 所关联的锁。
public class Promise<T> {private final ReentrantLock lock = new ReentrantLock();
    private final Condition resultCondition = lock.newCondition();

    private boolean isSignaled = false;
    private T result;

    private final List<Consumer<T>> listeners = new LinkedList<>();

    public void await(Consumer<T> listener) {lock.lock();
        if (isSignaled) {lock.unlock(); // 不要遗记放锁
            listener.accept(result); // 在临界区外触发 listener
            return;
        }

        listeners.add(listener);
        lock.unlock();}

    public void signalAll(T result) {lock.lock();
        if (isSignaled) {lock.unlock(); // 不要遗记放锁
            return;
        }

        this.result = result;
        isSignaled = true;

        // this.listeners 的正本
        List<Consumer<T>> listeners = new ArrayList<>(this.listeners);
        this.listeners.clear();
        lock.unlock();

        for (Consumer<T> listener : listeners) {listener.accept(result); // 在临界区外触发 listener
        }

/* 操作 Condition 时须上锁 */
        lock.lock();
        resultCondition.signalAll();
        lock.unlock();}

    public T await() {lock.lock();
        if (isSignaled) {lock.unlock(); // 不要遗记放锁
            return result;
        }

        while (!isSignaled) {resultCondition.awaitUninterruptibly();
        }
        lock.unlock();

        return result;
    }
}

上述实现仅做演示应用,仍有较大的改良空间。生产环境的实现原理,读者能够参考 jdk CompletableFutre、netty DefaultPromise。能够改良的中央包含:

  1. 应用 CAS 设置响应数据。字段 isSignaled、result 能够合并为一个数据对象,而后应用 CAS 进行设值,从而进一步升高阻塞时长。
  2. 触发 listener 的时序。在上述代码中,Promise.signalAll() 会顺次触发 listener;在此期间,如果其余线程调用了异步 await(listener),因为 Promise 的响应数据已告诉,该线程也会触发 listener。上述过程中,两个线程同时触发 listener,因而没有严格保障触发程序。作为改良,相似于 netty DefaultPromise,Promise.signalAll() 外部能够设置一个循环,一直触发 listener 直至 listeners 排空,以防期间注册了新的 listener;在此期间,新注册的 listener 能够间接退出到 listeners 中,而不是立即触发。
  3. listener 的移除。在告诉响应数据之前,Promise 长期持有 listener 的援用,导致 listener 对象无奈被 gc。能够增加 remove(listener) 办法,或者容许仅持有 listener 的弱援用。

3.2.3 须防止的个性

后面的大节展现了 Promise 的个性与实现原理。纯正的 Promise 是异步传递响应数据的工具,其该当只实现必要的数据传递个性,而不该当夹杂申请提交、数据处理等逻辑。接下来咱们来看一看,Promise 在实现时应防止哪些个性,以防限度调用者所能做出的决策。

1. 异步 await() 产生阻塞;该规定不仅实用于 Promise,也实用于任何异步 API。异步 API 罕用于实时通信等延时敏感的场景,作用是缩小线程阻塞,防止推延后续其余操作。一旦产生阻塞,零碎的响应速度和吞吐量就会受到重大冲击。

以间断提交数据库申请为例。如图 3 -3a 所示,调用者调用了一个异步 API,间断提交 3 次写入申请,并在所返回的 Promise 上注册回调。

咱们考查 writeAsync()与 await() 如产生阻塞阻塞,将会对调用者造成什么影响,如图 3 -3b 所示。提交申请是纯内存操作,线程处于 RUNNABLE 状态;writeAsync() 或 await() 如果产生阻塞,则线程处于 BLOCKED 状态,暂停工作而无奈执行后续操作。当产生阻塞时,调用者每提交一个申请就不得不期待一段时间,从而升高了提交申请的频率,进而推延了服务器对这些申请的响应,使得零碎的吞吐量升高、提早回升。特地地,如果零碎采纳了多路复用机制,即一个线程能够解决多个网络连接或多个申请,那么线程阻塞将会重大拖慢后续申请的解决,造成比拟难排查的故障。

常见的 阻塞起因 包含:

  • Thread.sleep()
  • 向队列提交工作,调用了 BlockingQueue.put()和 take();应改为非阻塞的 offer()和 poll()
  • 向线程池提交工作,ExecutorService.submit(),如果线程池回绝策略为 CallerRunsPolicy,而工作自身又是耗时的。
  • 调用了阻塞的函数,包含:InputStream.read()、同步的 Promise.await()、KafkaProducer.send()。留神 KafkaProducer.send() 尽管模式上是异步 API,然而在底层缓存满或者无奈获取服务器(broker)信息时,send()办法仍会产生阻塞。


图 3 -3a 间断提交申请


图 3 -3b 申请解决工夫线

2. 绑定线程池(ExecutorService),用于执行申请。如图 3 - 4 所示,线程池是异步 API 的一种可选模型,但并不是惟一实现。

  • 线程池模型。为了不阻塞调用者,API 内置了线程池来提交申请、解决响应;调用者能够向线程池间断提交多个申请,然而不须要期待响应。调用者提交一条申请后,线程池中的某条线程就会被独占,期待接管响应并进行解决,但在此之前无奈再解决其余申请;实现解决后,该条线程从新变为闲暇,能够持续解决后续申请。
  • 响应式模型。相似地,API 内置了发送和接管线程来提交申请、解决响应,调用者也不须要同步期待。调用者提交一条申请后,发送线程向网络发送申请;实现发送后,线程立即变为闲暇,能够发送后续申请。当收到响应数据时,接管线程失去告诉以解决响应;实现解决后,线程立即变为闲暇,能够解决后续响应数据。上述过程中,任何一条线程都不会被某一申请独占,即线程随时都能够解决申请,而不须要期待之前的申请被响应。

综上,如果绑定了线程池,Promise 就实现了对其余模型(如响应式模型)的兼容性。

图 3 -4 线程工夫线:线程池 vs select

3. 在构造方法创立 Promise 对象时,定义如何提交申请。这种形式只能定义如何解决单条申请,而无奈实现申请的批量解决。

以数据库拜访为例,古代数据库个别反对批量读写,以稍微晋升单次访问的提早为代价,换来吞吐量显著晋升;如果吞吐量失去晋升,那么均匀提早反而会降落。上面的代码片段展现了一个批量申请 API:数据对象 BulkRequest 能够携带多条一般申请 Request,从而实现批量提交。

/* 提交单条申请 */
client.submit(new Request(1));
client.submit(new Request(2));
client.submit(new Request(3));

/* 提交批量申请 */
client.submit(new BulkRequest(new Request(1),
        new Request(2),
        new Request(3)
));

为了充分利用“批量申请”的个性,调用者须要进行逾越多条申请的“宏观调控”。申请产生后能够先缓存起来;期待一段时间后,取出所缓存的多条申请,组装一个批量申请来一并提交。因而,如上面的代码片段所示,在结构 Promise 时指定如何提交单条申请是没有意义的,这部分代码(client.submit(new Request(…)))并不会被执行;而理论心愿执行的代码,其实是提交批量申请(client.submit(new BulkRequest(…)))。

/* Promise:提交单条申请 */
new Promise<>(() -> client.submit(new Request(1)));
new Promise<>(() -> client.submit(new Request(2)));
new Promise<>(() -> client.submit(new Request(3)));

4. 在构造方法创立 Promise 对象时,定义如何解决响应数据,而不容许后续对响应数据注册回调。如上面的代码片段所示,在结构 Promise 对象时,注册了对响应数据的解决 process(result);然而除此以外,其余代码也有可能关怀响应数据,须要注册回调 process1(result)、process2(result)。如果 Promise 只能在结构时注册惟一回调,那么其余关注者就无奈注册所需回调函数,即 Promise API 进化回 listener API。

/* 定义如何解决响应数据 */
Promise<String> promise = new Promise<>(result -> process(result));

/* 其余代码也关怀响应数据 */
promise.await(result -> process1(result));
promise.await(result -> process2(result));

综上,Promise 应该是一个纯正的数据对象,其职责是存储回调函数、存储响应数据;同时做好时序管制,保障触发回调函数无脱漏、保障触发程序。除此以外,Promise 不应该和任何实现策略相耦合,不应该杂糅提交申请、解决响应的逻辑。

4. 总结

本文解说了异步非阻塞设计模式,并对同步 API、异步 listener API、异步 Promise API 进行了比照。相比于其余两种 API,Promise API 具备无可比拟的灵活性,调用者能够自在决定同步返回还是异步返回,并容许对响应数据注册多个回调函数。最初,本文解说了 Promise 基本功能的实现,并初步实现了线程平安个性。

本系列共 2 篇文章,本文为第 1 篇《原理篇》。在下一篇《利用篇》中,咱们将看到 Promise 设计模式丰盛的利用场景,将其和现有工具进行联合或比照,以及对 Promise API 进行进一步变形和封装,提供异样解决、调度策略等个性。

参考文献

[A] 异步非阻塞 IO
https://en.wikipedia.org/wiki…

[B] Promise
https://en.wikipedia.org/wiki…

[C] java 线程状态
https://segmentfault.com/a/11…

[D] http 异步 API 样例:apache HttpAsyncClient
https://hc.apache.org/httpcom…

[E] redis 异步 API 样例:lettuce
https://github.com/lettuce-io…

[F] mongo DB 异步 API 样例:AsyncMongoClient
https://mongodb.github.io/mon…

[G] elasticsearch 异步 API 样例:RestHighLevelClient
https://www.elastic.co/guide/…

[H] influx DB 异步 API 样例:influxdb-java
https://github.com/influxdata…

[I] jedis vs lettuce
https://redislabs.com/blog/je…

[J] kafka
http://cloudurable.com/blog/k…

[K] KafkaProducer.send()阻塞
https://stackoverflow.com/que…

[L] netty
https://netty.io/wiki/user-gu…

正文完
 0