前言
为什么要老药换新汤
作为Android中 至关重要 的机制之一,十多年来,剖析它的文章一直,大量的内容曾经被开掘过了。所以:
- 曾经对这一机制熟稔于心的读者,在这篇文章中,看不到新货色了。
- 但对于还不太熟悉音讯机制的读者,能够在文章的根底上,持续挖一挖。
个别,诸如此类无关Android的音讯机制的文章,通过简略的检索和剖析,大部分是围绕:
- Handler,Looper,MQ的关系
- 下层的Handler,Looper、MQ 源码剖析
开展的。单纯的从这些角度学习的话,并不能 齐全了解 音讯机制。
这篇文章实质还是一次脑暴 ,一来:防止脑暴跑偏 ,二来:帮忙读者 捋清内容脉络 。先放出脑图:
脑暴:OS解决过程间通信问题
程序世界中,存在着大量的 通信 场景。搜寻咱们的常识,解决 过程间通信 问题有以下几种形式:
这段内容能够泛读,理解就行,不影响往下浏览
管道
- 一般管道pipe:一种 半双工 的通信形式,数据只能 单向流动 ,而且只能在具备 亲缘关系 的过程间应用。
- 命令流管道s\_pipe: 全双工,能够同时双向传输
- 命名管道FIFO:半双工 的通信形式,容许 在 无亲缘关系 的过程间通信。
音讯队列 MessageQueue:
音讯的链表,寄存在内核 中 并由 音讯队列标识符 标识。音讯队列克服了 信号传递信息少、管道 只能承载 无格局字节流 以及 缓冲区大小受限 等毛病。
共享存储 SharedMemory:
映射一段 能被其余过程所拜访 的内存,这段共享内存由 一个过程创立,但 多个过程都能够拜访。共享内存是 最快的 IPC 形式,它是针对 其余 过程间通信形式 运行效率低 而专门设计的。往往与其余通信机制一起应用,如 信号量 配合应用,来实现过程间的同步和通信。
信号量 Semaphore:
是一个 计数器 ,能够用来管制多个过程对共享资源的拜访。它常作为一种 锁机制,避免某过程正在访问共享资源时, 其余过程也拜访该资源,实现 资源的过程独占。因而,次要作为 过程间 以及 同一过程内线程间 的同步伎俩。
套接字Socket:
与其余通信机制不同的是,它能够 通过网络 ,在 不同机器之间 进行过程通信。
信号 signal:
用于告诉接管过程 某事件已产生。机制比较复杂。
咱们能够设想,Android之间也有大量的 过程间通信场景,OS必须采纳 至多一种 机制,以实现过程间通信。
认真钻研上来,咱们发现,Android OS用了不止一种形式。而且,Android 还基于 OpenBinder 开发了 Binder 用于 用户空间 内的过程间通信。
这里咱们留一个问题当前探索:
Android 有没有应用 Linux内核中的MessageQueue机制 干事件
基于音讯队列的音讯机制设计有很多劣势,Android 在很多通信场景内,采纳了这一设计思路。
音讯机制的三要素
不论在哪,咱们谈到音讯机制,都会有这三个因素:
- 音讯队列
- 音讯循环(散发)
- 音讯解决
音讯队列 ,是 音讯对象 的队列,根本规定是 FIFO。
音讯循环(散发), 根本是通用的机制,利用 死循环 一直的取出音讯队列头部的音讯,派发执行
音讯解决,这里不得不提到 音讯 有两种模式:
- Enrichment 本身信息齐备
- Query-Back 本身信息不齐备,须要回查
这两者的取舍,次要看零碎中 生成音讯的开销 和 回查信息的开销 两者的博弈。
在信息齐备后,接收者即可解决音讯。
Android Framework
Android 的Framework中的音讯队列有两个:
Java层 frameworks/base/core/java/android/os/MessageQueue.java
Native层 frameworks/base/core/jni/android\_os\_MessageQueue.cpp
Java层的MQ并不是 List 或者 Queue 之类的 Jdk内的数据结构实现。
Native层的源码我下载了一份 Android 10 的 源码(https://github.com/leobert-la...\_os\_MessageQueue.cpp) ,并不长,大家能够残缺的读一读。
并不难理解:用户空间 会接管到来自 内核空间 的 音讯 , 从 下图 咱们可知,这部分音讯先被 Native层 获知,所以:
- 通过 Native层 建设音讯队列,它领有音讯队列的各种根本能力
- 利用JNI 买通 Java层 和 Native层 的 Runtime屏障,在Java层 映射 出音讯队列
- 利用建设在Java层之上,在Java层中实现音讯的 散发 和 解决
PS:在Android 2.3那个时代,音讯队列的实现是在Java层的,至于10年前为何改成了 native实现, 揣测和CPU空转无关,笔者没有持续探索上来,如果有读者理解,心愿能够留言帮我解惑。
PS:还有一张经典的 系统启动架构图 没有找到,这张图更加直观
代码解析
咱们简略的 浏览、剖析 下Native中的MQ源码
Native层音讯队列的创立:
static jlong android\_os\_MessageQueue\_nativeInit(JNIEnv\* env, jclass clazz) { NativeMessageQueue\* nativeMessageQueue = new NativeMessageQueue(); if (!nativeMessageQueue) { jniThrowRuntimeException(env, "Unable to allocate native queue"); return 0; } nativeMessageQueue->incStrong(env); return reinterpret\_cast<jlong>(nativeMessageQueue); }
很简略,创立一个Native层的音讯队列,如果创立失败,抛异样信息,返回0,否则将指针转换为Java的long型值返回。当然,会被Java层的MQ所持有。
NativeMessageQueue 类的构造函数
NativeMessageQueue::NativeMessageQueue() : mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) { mLooper = Looper::getForThread(); if (mLooper == NULL) { mLooper = new Looper(false); Looper::setForThread(mLooper); } }
这里的Looper是native层Looper,通过静态方法 Looper::getForThread() 获取对象实例,如果未获取到,则创立实例,并通过静态方法设置。
看一下Java层MQ中会应用到的native办法
class MessageQueue { private long mPtr; // used by native code private native static long nativeInit(); private native static void nativeDestroy(long ptr); private native void nativePollOnce(long ptr, int timeoutMillis); /\*non-static for callbacks\*/ private native static void nativeWake(long ptr); private native static boolean nativeIsPolling(long ptr); private native static void nativeSetFileDescriptorEvents(long ptr, int fd, int events); }
对应签名:
static const JNINativeMethod gMessageQueueMethods\[\] = { /\* name, signature, funcPtr \*/ { "nativeInit", "()J", (void\*)android\_os\_MessageQueue\_nativeInit }, { "nativeDestroy", "(J)V", (void\*)android\_os\_MessageQueue\_nativeDestroy }, { "nativePollOnce", "(JI)V", (void\*)android\_os\_MessageQueue\_nativePollOnce }, { "nativeWake", "(J)V", (void\*)android\_os\_MessageQueue\_nativeWake }, { "nativeIsPolling", "(J)Z", (void\*)android\_os\_MessageQueue\_nativeIsPolling }, { "nativeSetFileDescriptorEvents", "(JII)V", (void\*)android\_os\_MessageQueue\_nativeSetFileDescriptorEvents }, };
mPtr 是Native层MQ的内存地址在Java层的映射。
Java层判断MQ是否还在工作:
private boolean isPollingLocked() { // If the loop is quitting then it must not be idling. // We can assume mPtr != 0 when mQuitting is false. return !mQuitting && nativeIsPolling(mPtr); }
static jboolean android\_os\_MessageQueue\_nativeIsPolling(JNIEnv\* env, jclass clazz, jlong ptr) { NativeMessageQueue\* nativeMessageQueue = reinterpret\_cast<NativeMessageQueue\*>(ptr); return nativeMessageQueue->getLooper()->isPolling(); }
/\*\* \* Returns whether this looper's thread is currently polling for more work to do. \* This is a good signal that the loop is still alive rather than being stuck \* handling a callback. Note that this method is intrinsically racy, since the \* state of the loop can change before you get the result back. \*/ bool isPolling() const;
唤醒 Native层MQ:
static void android\_os\_MessageQueue\_nativeWake(JNIEnv\* env, jclass clazz, jlong ptr) { NativeMessageQueue\* nativeMessageQueue = reinterpret\_cast<NativeMessageQueue\*>(ptr); nativeMessageQueue->wake(); } void NativeMessageQueue::wake() { mLooper->wake(); }
Native层Poll:
static void android\_os\_MessageQueue\_nativePollOnce(JNIEnv\* env, jobject obj, jlong ptr, jint timeoutMillis) { NativeMessageQueue\* nativeMessageQueue = reinterpret\_cast<NativeMessageQueue\*>(ptr); nativeMessageQueue->pollOnce(env, obj, timeoutMillis); } void NativeMessageQueue::pollOnce(JNIEnv\* env, jobject pollObj, int timeoutMillis) { mPollEnv = env; mPollObj = pollObj; mLooper->pollOnce(timeoutMillis); mPollObj = NULL; mPollEnv = NULL; if (mExceptionObj) { env->Throw(mExceptionObj); env->DeleteLocalRef(mExceptionObj); mExceptionObj = NULL; } }
这里比拟重要,咱们先大略看下 Native层的Looper是 如何散发音讯 的
//Looper.h int pollOnce(int timeoutMillis, int\* outFd, int\* outEvents, void\*\* outData); inline int pollOnce(int timeoutMillis) { return pollOnce(timeoutMillis, NULL, NULL, NULL); } //实现 int Looper::pollOnce(int timeoutMillis, int\* outFd, int\* outEvents, void\*\* outData) { int result = 0; for (;;) { while (mResponseIndex < mResponses.size()) { const Response& response = mResponses.itemAt(mResponseIndex++); int ident = response.request.ident; if (ident >= 0) { int fd = response.request.fd; int events = response.events; void\* data = response.request.data; #if DEBUG\_POLL\_AND\_WAKE ALOGD("%p ~ pollOnce - returning signalled identifier %d: " "fd=%d, events=0x%x, data=%p", this, ident, fd, events, data); #endif if (outFd != NULL) \*outFd = fd; if (outEvents != NULL) \*outEvents = events; if (outData != NULL) \*outData = data; return ident; } } if (result != 0) { #if DEBUG\_POLL\_AND\_WAKE ALOGD("%p ~ pollOnce - returning result %d", this, result); #endif if (outFd != NULL) \*outFd = 0; if (outEvents != NULL) \*outEvents = 0; if (outData != NULL) \*outData = NULL; return result; } result = pollInner(timeoutMillis); } }
先解决Native层滞留的Response,而后调用pollInner。这里的细节比较复杂,稍后咱们在 Native Looper解析 中进行脑暴。
先于此处细节剖析,咱们晓得,调用一个办法,这是阻塞的 ,用大白话形容即在办法返回前,调用者在 期待。
Java层调动 native void nativePollOnce(long ptr, int timeoutMillis); 过程中是阻塞的。
此时咱们再浏览下Java层MQ的音讯获取:代码比拟长,间接在代码中进行要点正文。
在看之前,咱们先单纯从 TDD的角度 思考下,有哪些 次要场景 :当然,这些场景不肯定都合乎Android现有的设计
音讯队列是否在工作中
- 工作中,冀望返回音讯
- 不工作,冀望返回null
工作中的音讯队列 以后 是否有音讯
- 非凡的 外部功能性音讯,冀望MQ外部自行处理
- 曾经到解决工夫的音讯, 返回音讯
- 未到解决工夫,如果都是排过序的,冀望 空转放弃阻塞 or 返回静默并设置唤醒?依照后面的探讨,是冀望 放弃空转
- 不存在音讯,阻塞 or 返回null?-- 如果返回null,则在内部须要须要 放弃空转 或者 唤醒机制,以反对失常运作。从封装角度登程,该当 放弃空转,本人解决问题
- 存在音讯
class MessageQueue { Message next() { // Return here if the message loop has already quit and been disposed. // This can happen if the application tries to restart a looper after quit // which is not supported. // 1. 如果 native音讯队列指针映射曾经为0,即虚援用,阐明音讯队列曾经退出,没有音讯了。 // 则返回 null final long ptr = mPtr; if (ptr == 0) { return null; } int pendingIdleHandlerCount = -1; // -1 only during first iteration int nextPollTimeoutMillis = 0; // 2. 死循环,当为获取到须要 \`散发解决\` 的音讯时,放弃空转 for (;;) { if (nextPollTimeoutMillis != 0) { Binder.flushPendingCommands(); } // 3. 调用native层办法,poll message,留神,音讯还存在于native层 nativePollOnce(ptr, nextPollTimeoutMillis); synchronized (this) { // Try to retrieve the next message. Return if found. final long now = SystemClock.uptimeMillis(); Message prevMsg = null; Message msg = mMessages; //4. 如果发现 barrier ,即同步屏障,则寻找队列中的下一个可能存在的异步音讯 if (msg != null && msg.target == null) { // Stalled by a barrier. Find the next asynchronous message in the queue. do { prevMsg = msg; msg = msg.next; } while (msg != null && !msg.isAsynchronous()); } if (msg != null) { // 5. 发现了音讯, // 如果是还没有到约定工夫的音讯,则设置一个 \`下次唤醒\` 的最大时间差 // 否则 \`保护单链表信息\` 并返回音讯 if (now < msg.when) { // Next message is not ready. Set a timeout to wake up when it is ready. nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX\_VALUE); } else { // 寻找到了 \`到解决工夫\` 的音讯。 \`保护单链表信息\` 并返回音讯 // Got a message. mBlocked = false; if (prevMsg != null) { prevMsg.next = msg.next; } else { mMessages = msg.next; } msg.next = null; if (DEBUG) Log.v(TAG, "Returning message: " + msg); msg.markInUse(); return msg; } } else { // No more messages. nextPollTimeoutMillis = -1; } // 解决 是否须要 进行音讯队列 // Process the quit message now that all pending messages have been handled. if (mQuitting) { dispose(); return null; } // 保护 接下来须要解决的 IDLEHandler 信息, // 如果没有 IDLEHandler,则间接进入下一轮音讯获取环节 // 否则解决 IDLEHandler // If first time idle, then get the number of idlers to run. // Idle handles only run if the queue is empty or if the first message // in the queue (possibly a barrier) is due to be handled in the future. if (pendingIdleHandlerCount < 0 && (mMessages == null || now < mMessages.when)) { pendingIdleHandlerCount = mIdleHandlers.size(); } if (pendingIdleHandlerCount <= 0) { // No idle handlers to run. Loop and wait some more. mBlocked = true; continue; } if (mPendingIdleHandlers == null) { mPendingIdleHandlers = new IdleHandler\[Math.max(pendingIdleHandlerCount, 4)\]; } mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers); } // 解决 IDLEHandler // Run the idle handlers. // We only ever reach this code block during the first iteration. for (int i = 0; i < pendingIdleHandlerCount; i++) { final IdleHandler idler = mPendingIdleHandlers\[i\]; mPendingIdleHandlers\[i\] = null; // release the reference to the handler boolean keep = false; try { keep = idler.queueIdle(); } catch (Throwable t) { Log.wtf(TAG, "IdleHandler threw exception", t); } if (!keep) { synchronized (this) { mIdleHandlers.remove(idler); } } } // Reset the idle handler count to 0 so we do not run them again. pendingIdleHandlerCount = 0; // While calling an idle handler, a new message could have been delivered // so go back and look again for a pending message without waiting. nextPollTimeoutMillis = 0; } } }
Java层压入音讯
这就比较简单了,当音讯自身非法,且音讯队列还在工作中时。仍旧从 TDD角度 登程:
如果音讯队列没有头,冀望间接作为头
如果有头
- 音讯解决工夫 先于 头音讯 或者是须要立刻解决的音讯,则作为新的头
- 否则依照 解决工夫 插入到适合地位
boolean enqueueMessage(Message msg, long when) { if (msg.target == null) { throw new IllegalArgumentException("Message must have a target."); } synchronized (this) { if (msg.isInUse()) { throw new IllegalStateException(msg + " This message is already in use."); } if (mQuitting) { IllegalStateException e = new IllegalStateException( msg.target + " sending message to a Handler on a dead thread"); Log.w(TAG, e.getMessage(), e); msg.recycle(); return false; } msg.markInUse(); msg.when = when; Message p = mMessages; boolean needWake; if (p == null || when == 0 || when < p.when) { // New head, wake up the event queue if blocked. msg.next = p; mMessages = msg; needWake = mBlocked; } else { // Inserted within the middle of the queue. Usually we don't have to wake // up the event queue unless there is a barrier at the head of the queue // and the message is the earliest asynchronous message in the queue. needWake = mBlocked && p.target == null && msg.isAsynchronous(); Message prev; for (;;) { prev = p; p = p.next; if (p == null || when < p.when) { break; } if (needWake && p.isAsynchronous()) { needWake = false; } } msg.next = p; // invariant: p == prev.next prev.next = msg; } // We can assume mPtr != 0 because mQuitting is false. if (needWake) { nativeWake(mPtr); } } return true; }
同步屏障 barrier前面独自脑暴, 其余局部就先不看了
Java层音讯散发
这一节开始,咱们脑暴音讯散发,后面咱们曾经看过了 MessageQueue ,音讯散发就是 不停地 从 MessageQueue 中取出音讯,并指派给解决者。 实现这一工作的,是Looper。
在后面,咱们曾经晓得了,Native层也有Looper,然而不难理解:
- 音讯队列须要 桥梁 连通 Java层和Native层
- Looper只须要 在本人这一端,解决本人的音讯队列散发即可
所以,咱们看Java层的音讯散发时,看Java层的Looper即可。关注三个次要办法:
- 出门下班
- 工作
- 上班回家
出门下班 prepare
class Looper { public static void prepare() { prepare(true); } private static void prepare(boolean quitAllowed) { if (sThreadLocal.get() != null) { throw new RuntimeException("Only one Looper may be created per thread"); } sThreadLocal.set(new Looper(quitAllowed)); } }
这里有两个留神点:
- 曾经出了门,除非再进门,否则没法再出门了。同样,一个线程有一个Looper就够了,只有它还活着,就没必要再建一个。
- 责任到人,一个Looper服务于一个Thread,这须要 注册 ,代表着 某个Thread 曾经由本人服务了。利用了ThreadLocal,因为多线程拜访汇合,总须要思考
竞争,这很不人道主义,罗唆分家,每个Thread操作本人的内容互不烦扰,也就没有了竞争,于是封装了 ThreadLocal
下班 loop
留神工作性质是 散发,并不需要本人解决
- 没有 注册 天然就找不到负责这份工作的人。
- 曾经在工作了就不要催,催了会导致工作出错,程序呈现问题。
- 工作就是一直的取出 老板-- MQ 的 指令 -- Message,并交给 相干负责人 -- Handler 去解决,并记录信息
- 007,不眠不休,当MQ再也不收回音讯了,没活干了,大家都散了吧,上班回家
class Looper { public static void loop() { final Looper me = myLooper(); if (me == null) { throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread."); } if (me.mInLoop) { Slog.w(TAG, "Loop again would have the queued messages be executed" + " before this one completed."); } me.mInLoop = true; final MessageQueue queue = me.mQueue; // Make sure the identity of this thread is that of the local process, // and keep track of what that identity token actually is. Binder.clearCallingIdentity(); final long ident = Binder.clearCallingIdentity(); // Allow overriding a threshold with a system prop. e.g. // adb shell 'setprop log.looper.1000.main.slow 1 && stop && start' final int thresholdOverride = SystemProperties.getInt("log.looper." + Process.myUid() + "." + Thread.currentThread().getName() + ".slow", 0); boolean slowDeliveryDetected = false; for (;;) { Message msg = queue.next(); // might block if (msg == null) { // No message indicates that the message queue is quitting. return; } // This must be in a local variable, in case a UI event sets the logger final Printer logging = me.mLogging; if (logging != null) { logging.println(">>>>> Dispatching to " + msg.target + " " + msg.callback + ": " + msg.what); } // Make sure the observer won't change while processing a transaction. final Observer observer = sObserver; final long traceTag = me.mTraceTag; long slowDispatchThresholdMs = me.mSlowDispatchThresholdMs; long slowDeliveryThresholdMs = me.mSlowDeliveryThresholdMs; if (thresholdOverride > 0) { slowDispatchThresholdMs = thresholdOverride; slowDeliveryThresholdMs = thresholdOverride; } final boolean logSlowDelivery = (slowDeliveryThresholdMs > 0) && (msg.when > 0); final boolean logSlowDispatch = (slowDispatchThresholdMs > 0); final boolean needStartTime = logSlowDelivery || logSlowDispatch; final boolean needEndTime = logSlowDispatch; if (traceTag != 0 && Trace.isTagEnabled(traceTag)) { Trace.traceBegin(traceTag, msg.target.getTraceName(msg)); } final long dispatchStart = needStartTime ? SystemClock.uptimeMillis() : 0; final long dispatchEnd; Object token = null; if (observer != null) { token = observer.messageDispatchStarting(); } long origWorkSource = ThreadLocalWorkSource.setUid(msg.workSourceUid); try { //留神这里 msg.target.dispatchMessage(msg); if (observer != null) { observer.messageDispatched(token, msg); } dispatchEnd = needEndTime ? SystemClock.uptimeMillis() : 0; } catch (Exception exception) { if (observer != null) { observer.dispatchingThrewException(token, msg, exception); } throw exception; } finally { ThreadLocalWorkSource.restore(origWorkSource); if (traceTag != 0) { Trace.traceEnd(traceTag); } } if (logSlowDelivery) { if (slowDeliveryDetected) { if ((dispatchStart - msg.when) <= 10) { Slog.w(TAG, "Drained"); slowDeliveryDetected = false; } } else { if (showSlowLog(slowDeliveryThresholdMs, msg.when, dispatchStart, "delivery", msg)) { // Once we write a slow delivery log, suppress until the queue drains. slowDeliveryDetected = true; } } } if (logSlowDispatch) { showSlowLog(slowDispatchThresholdMs, dispatchStart, dispatchEnd, "dispatch", msg); } if (logging != null) { logging.println("<<<<< Finished to " + msg.target + " " + msg.callback); } // Make sure that during the course of dispatching the // identity of the thread wasn't corrupted. final long newIdent = Binder.clearCallingIdentity(); if (ident != newIdent) { Log.wtf(TAG, "Thread identity changed from 0x" + Long.toHexString(ident) + " to 0x" + Long.toHexString(newIdent) + " while dispatching to " + msg.target.getClass().getName() + " " + msg.callback + " what=" + msg.what); } msg.recycleUnchecked(); } } }
上班 quit/quitSafely
这是比拟粗犷的行为,MQ来到了Looper就没法失常工作了,即上班即意味着辞职
class Looper { public void quit() { mQueue.quit(false); } public void quitSafely() { mQueue.quit(true); } }
/ Handler /
这里就比拟清晰了。API根本分为以下几类:
- 面向使用者:
- 创立Message,通过Message的 享元模式
- 发送音讯,留神postRunnable也是一个音讯
- 移除音讯,
- 退出等
面向音讯解决:
class Handler { /\*\* \* Subclasses must implement this to receive messages. \*/ public void handleMessage(@NonNull Message msg) { } /\*\* \* Handle system messages here. \* Looper散发时调用的API \*/ public void dispatchMessage(@NonNull Message msg) { if (msg.callback != null) { handleCallback(msg); } else { if (mCallback != null) { if (mCallback.handleMessage(msg)) { return; } } handleMessage(msg); } } }
如果有 Handler callback,则交给callback解决,否则本人解决,如果没覆写 handleMessage ,音讯相当于被 drop 了。
音讯发送局部能够联合下图梳理:
阶段性小结,至此,咱们曾经对 Framework层的音讯机制 有一个残缺的理解了。 后面咱们梳理了:
- Native层 和 Java层均有音讯队列,并且通过JNI和指针映射,存在对应关系
- Native层 和 Java层MQ 音讯获取时的大抵过程
- Java层 Looper 如何工作
- Java层 Handler 大抵概览
依据后面梳理的内容,能够总结:从 Java Runtime 看:
- 音讯队列机制服务于 线程级别,即一个线程有一个工作中的音讯队列即可,当然,也能够没有。
- 即,一个Thread 至少有 一个工作中的Looper。
- Looper 和 Java层MQ 一一对应
- Handler 是MQ的入口,也是 音讯 的解决者
- 音讯-- Message 利用了 享元模式,本身信息足够,满足 自洽,创立音讯的开销性对较大,所以利用享元模式对音讯对象进行复用。
上面咱们再持续探索细节,解决后面语焉不详处留下的纳闷:
- 音讯的类型和实质
- Native层Looper 的pollInner
类型和实质
message中的几个重要成员变量:
class Message { public int what; public int arg1; public int arg2; public Object obj; public Messenger replyTo; /\*package\*/ int flags; public long when; /\*package\*/ Bundle data; /\*package\*/ Handler target; /\*package\*/ Runnable callback; }
其中 target是 指标,如果没有指标,那就是一个非凡的音讯: 同步屏障 即 barrier;
what 是音讯标识 arg1 和 arg2 是开销较小的 数据,如果 不足以表白信息 则能够放入 Bundle data 中。
replyTo 和 obj 是跨过程传递音讯时应用的,暂且不看。
flags 是 message 的状态标识,例如 是否在应用中,是否是同步音讯
下面提到的同步屏障,即 barrier,其作用是拦挡前面的 同步音讯 不被获取,在后面浏览Java层MQ的next办法时读到过。
咱们还记得,next办法中,应用死循环,尝试读出一个满足解决条件的音讯,如果取不到,因为死循环的存在,调用者(Looper)会被始终阻塞。
此时能够印证一个论断,音讯依照 性能分类 能够分为 三种:
- 一般音讯
- 同步屏障音讯
- 异步音讯
其中同步音讯是一种外部机制。设置屏障之后须要在适合工夫勾销屏障,否则会导致 一般音讯永远无奈被解决,而勾销时,须要用到设置屏障时返回的token。
Native层Looper
置信大家都对 Native层 的Looper产生趣味了,想看看它在Native层都干些什么。
对残缺源码感兴趣的能够看 这里(https://github.com/leobert-la...) ,上面咱们节选局部进行浏览。
后面提到了Looper的pollOnce,解决完搁置的Response之后,会调用pollInner获取音讯
int Looper::pollInner(int timeoutMillis) { #if DEBUG\_POLL\_AND\_WAKE ALOGD("%p ~ pollOnce - waiting: timeoutMillis=%d", this, timeoutMillis); #endif // Adjust the timeout based on when the next message is due. if (timeoutMillis != 0 && mNextMessageUptime != LLONG\_MAX) { nsecs\_t now = systemTime(SYSTEM\_TIME\_MONOTONIC); int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime); if (messageTimeoutMillis >= 0 && (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) { timeoutMillis = messageTimeoutMillis; } #if DEBUG\_POLL\_AND\_WAKE ALOGD("%p ~ pollOnce - next message in %lldns, adjusted timeout: timeoutMillis=%d", this, mNextMessageUptime - now, timeoutMillis); #endif } // Poll. int result = ALOOPER\_POLL\_WAKE; mResponses.clear(); mResponseIndex = 0; struct epoll\_event eventItems\[EPOLL\_MAX\_EVENTS\]; //留神 1 int eventCount = epoll\_wait(mEpollFd, eventItems, EPOLL\_MAX\_EVENTS, timeoutMillis); // Acquire lock. mLock.lock(); // 留神 2 // Check for poll error. if (eventCount < 0) { if (errno == EINTR) { goto Done; } ALOGW("Poll failed with an unexpected error, errno=%d", errno); result = ALOOPER\_POLL\_ERROR; goto Done; } // 留神 3 // Check for poll timeout. if (eventCount == 0) { #if DEBUG\_POLL\_AND\_WAKE ALOGD("%p ~ pollOnce - timeout", this); #endif result = ALOOPER\_POLL\_TIMEOUT; goto Done; } //留神 4 // Handle all events. #if DEBUG\_POLL\_AND\_WAKE ALOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount); #endif for (int i = 0; i < eventCount; i++) { int fd = eventItems\[i\].data.fd; uint32\_t epollEvents = eventItems\[i\].events; if (fd == mWakeReadPipeFd) { if (epollEvents & EPOLLIN) { awoken(); } else { ALOGW("Ignoring unexpected epoll events 0x%x on wake read pipe.", epollEvents); } } else { ssize\_t requestIndex = mRequests.indexOfKey(fd); if (requestIndex >= 0) { int events = 0; if (epollEvents & EPOLLIN) events |= ALOOPER\_EVENT\_INPUT; if (epollEvents & EPOLLOUT) events |= ALOOPER\_EVENT\_OUTPUT; if (epollEvents & EPOLLERR) events |= ALOOPER\_EVENT\_ERROR; if (epollEvents & EPOLLHUP) events |= ALOOPER\_EVENT\_HANGUP; pushResponse(events, mRequests.valueAt(requestIndex)); } else { ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is " "no longer registered.", epollEvents, fd); } } } Done: ; // 留神 5 // Invoke pending message callbacks. mNextMessageUptime = LLONG\_MAX; while (mMessageEnvelopes.size() != 0) { nsecs\_t now = systemTime(SYSTEM\_TIME\_MONOTONIC); const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0); if (messageEnvelope.uptime <= now) { // Remove the envelope from the list. // We keep a strong reference to the handler until the call to handleMessage // finishes. Then we drop it so that the handler can be deleted \*before\* // we reacquire our lock. { // obtain handler sp<MessageHandler> handler = messageEnvelope.handler; Message message = messageEnvelope.message; mMessageEnvelopes.removeAt(0); mSendingMessage = true; mLock.unlock(); #if DEBUG\_POLL\_AND\_WAKE || DEBUG\_CALLBACKS ALOGD("%p ~ pollOnce - sending message: handler=%p, what=%d", this, handler.get(), message.what); #endif handler->handleMessage(message); } // release handler mLock.lock(); mSendingMessage = false; result = ALOOPER\_POLL\_CALLBACK; } else { // The last message left at the head of the queue determines the next wakeup time. mNextMessageUptime = messageEnvelope.uptime; break; } } // Release lock. mLock.unlock(); //留神 6 // Invoke all response callbacks. for (size\_t i = 0; i < mResponses.size(); i++) { Response& response = mResponses.editItemAt(i); if (response.request.ident == ALOOPER\_POLL\_CALLBACK) { int fd = response.request.fd; int events = response.events; void\* data = response.request.data; #if DEBUG\_POLL\_AND\_WAKE || DEBUG\_CALLBACKS ALOGD("%p ~ pollOnce - invoking fd event callback %p: fd=%d, events=0x%x, data=%p", this, response.request.callback.get(), fd, events, data); #endif int callbackResult = response.request.callback->handleEvent(fd, events, data); if (callbackResult == 0) { removeFd(fd); } // Clear the callback reference in the response structure promptly because we // will not clear the response vector itself until the next poll. response.request.callback.clear(); result = ALOOPER\_POLL\_CALLBACK; } } return result; }
下面标记了留神点
- 1 epoll机制,期待 mEpollFd 产生事件, 这个期待具备超时工夫。
- 2,3,4 是期待的三种后果,goto 语句能够间接跳转到 标记 处
- 2 检测poll 是否出错,如果有,跳转到 Done
- 3 检测pool 是否超时,如果有,跳转到 Done
- 4 解决epoll后所有的事件
- 5 解决 pending 音讯的回调
- 6 解决 所有 Response的回调
并且咱们能够发现返回的后果有以下几种:
- ALOOPER\_POLL\_CALLBACK
有 pending message 或者 request.ident 值为 ALOOPER\_POLL\_CALLBACK 的 Response被解决了。 如果没有:
- ALOOPER\_POLL\_WAKE 失常唤醒
- ALOOPER\_POLL\_ERROR epoll谬误
- ALOOPER\_POLL\_TIMEOUT epoll超时
查找了一下枚举值:
ALOOPER\_POLL\_WAKE = -1, ALOOPER\_POLL\_CALLBACK = -2, ALOOPER\_POLL\_TIMEOUT = -3, ALOOPER\_POLL\_ERROR = -4
阶段性小结, 咱们对 音讯 和 Native层的pollInner 进行了一次脑暴,引出了epoll机制。
其实Native层的 Looper散发还有不少值得脑暴的点,但咱们先缓缓,曾经急不可待的要对 epoll机制进行脑暴了。
脑暴:Linux中的I/O模型
PS:本段中,存在局部图片间接援用自该文,我偷了个懒,没有去找原版内容并标记出处
阻塞I/O模型图:在调用recv()函数时,产生在内核中期待数据和复制数据的过程
实现十分的 简略,然而存在一个问题,阻塞导致线程无奈执行其余任何计算,如果是在网络编程背景下,须要应用多线程进步解决并发的能力。
留神,不要用 Android中的 点击屏幕等硬件被触发事件 去对应这里的 网络并发,这是两码事。
如果采纳了 多过程 或者 多线程 实现 并发应答,模型如下:
到这里,咱们看的都是 I/O 阻塞 模型。
脑暴,阻塞为调用办法后始终在期待返回值,线程内执行的内容就像 卡顿 在这里。
如果要打消这种卡顿,那就不能调用办法期待I/O后果,而是要 立刻返回 !举个例子:
- 去西装店定制西装,确定好样式和尺寸后,你坐在店里始终等着,等到做好了拿给你,这就是阻塞型的,这能等死你;
- 去西装店定制西装,确定好样式和尺寸后,店员通知你别干等着,好多天呢,等你有空了来看看,这就是非阻塞型的。
扭转为非阻塞模型后,应答模型如下:
不难理解,这种形式须要顾客去 轮询 。对客户不敌对,然而对店家可是一点损失都没有,还让等待区没那么挤了。
有些西装店进行了改革,对客户更加敌对了:
去西装店定制西装,确定好样式和尺寸后,留下联系方式,等西服做好了分割客户,让他来取。
这就变成了 select or poll 模型:
留神:进行改革的西装店须要减少一个员工,图中标识的用户线程,他的工作是:
- 在前台记录客户订单和联系方式
- 拿记录着 订单 的小本子去找制作间,一直查看 订单是否竣工,竣工的就能够提走并分割客户了。
而且,他去看订单竣工时,无奈在前台记录客户信息,这象征他 阻塞 了,其余工作只能先搁置着。
这个做法,对于制作间而言,和 非阻塞模型 并没有多大区别。还减少了一个店员,然而,用 一个店员 就解决了之前 很多店员 都会跑去 制作间 帮客户问"订单好了没有?" 的问题。
值得一提的是,为了进步服务质量,这个员工每次去制作间询问一个订单时,都须要记录一些信息:
- 订单完成度询问时,是否被应答;
- 应答有没有说谎;等
有些店对每种不同的考核项均筹备了记录册,这和 select模型相似
有些店只用一本记录册,然而册子上能够利用表格记录各种考核项,这和 poll 模型相似
select 模型 和 poll 模型的近似度比拟高。
没多久,老板就发现了,这个店员的工作效率有点低下,他每次都要拿着一本订单簿,去把订单都问一遍,倒不是员工不勤快,是这个模式有点问题。
于是老板又进行了改革:
- 在 前台 和 制作间 之间加一个送信管道。
- 制作间有进度须要汇报了,就送一份信到前台,信上写着订单号。
- 前台员工间接去问对应的订单。
这就变成了 epoll模型解决了 select/poll 模型的遍历效率问题。
这样改革后,前台员工就不再须要按着订单簿从上到下挨个问了。进步了效率,前台员工只有无事产生,就能够优雅的划水了。
咱们看一下NativeLooper的构造函数:
Looper::Looper(bool allowNonCallbacks) : mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false), mResponseIndex(0), mNextMessageUptime(LLONG\_MAX) { int wakeFds\[2\]; int result = pipe(wakeFds); LOG\_ALWAYS\_FATAL\_IF(result != 0, "Could not create wake pipe. errno=%d", errno); mWakeReadPipeFd = wakeFds\[0\]; mWakeWritePipeFd = wakeFds\[1\]; result = fcntl(mWakeReadPipeFd, F\_SETFL, O\_NONBLOCK); LOG\_ALWAYS\_FATAL\_IF(result != 0, "Could not make wake read pipe non-blocking. errno=%d", errno); result = fcntl(mWakeWritePipeFd, F\_SETFL, O\_NONBLOCK); LOG\_ALWAYS\_FATAL\_IF(result != 0, "Could not make wake write pipe non-blocking. errno=%d", errno); // Allocate the epoll instance and register the wake pipe. mEpollFd = epoll\_create(EPOLL\_SIZE\_HINT); LOG\_ALWAYS\_FATAL\_IF(mEpollFd < 0, "Could not create epoll instance. errno=%d", errno); struct epoll\_event eventItem; memset(& eventItem, 0, sizeof(epoll\_event)); // zero out unused members of data field union eventItem.events = EPOLLIN; eventItem.data.fd = mWakeReadPipeFd; result = epoll\_ctl(mEpollFd, EPOLL\_CTL\_ADD, mWakeReadPipeFd, & eventItem); LOG\_ALWAYS\_FATAL\_IF(result != 0, "Could not add wake read pipe to epoll instance. errno=%d", errno); }
总结
置信看到这里,大家曾经本人悟透了各种问题。依照常规,还是要总结下,因为 这篇是脑暴,所以 思路 是比拟 跳跃 的,内容前后关系不太显著。
咱们联合一个问题来点明内容前后关系。
Java层 Looper和MQ 会什么应用了死循环然而 不会"阻塞"UI线程 / 没造成ANR / 仍旧能够响应点击事件
- Android是基于 事件驱动 的,并建设了 欠缺的 音讯机制
- Java层的音讯机制只是一个部分,其负责的就是面向音讯队列,解决 音讯队列治理,音讯散发,音讯解决
- Looper的死循环保障了 音讯队列 的 音讯散发 始终处于无效运行中,不循环就进行了散发。
- MessageQueue的 死循环 保障了 Looper能够获取无效的音讯,保障了Looper 只有有音讯,就始终运行,发现无效音讯,就跳出了死循环。
- 而且Java层MessageQueue在 next() 办法中的死循环中,通过JNI调用了 Native层MQ的 pollOnce,驱动了Native层去解决Native层音讯
- 值得一提的是,UI线程解决的事件也都是基于音讯的,无论是更新UI还是响应点击事件等。
所以,正是Looper 进行loop()之后的死循环,保障了UI线程的各项工作失常执行。
再说的ANR,这是Android 确认主线程 音讯机制 失常 且 衰弱 运行的一种检测机制。
因为主线程Looper须要利用 音讯机制 驱动UI渲染和交互事件处理, 如果某个音讯的执行,或者其衍生出的业务,在主线程占用了大量的工夫,导致主线程长期阻塞,会影响用户体验。
所以ANR检测采纳了一种 埋定时炸弹 的机制,必须依附Looper的高效运行来打消之前装的定时炸弹。而这种定时炸弹比拟有意思,被发现了才会炸。
在说到 响应点击事件,相似的事件总是从硬件登程的,在到内核,再过程间通信到用户空间,这些事件以音讯的模式存在于Native层,通过解决后,体现出:
ViewRootImpl收到了InputManager的输出,并进行了事件处理
这里咱们借用一张图总结整个音讯机制流程:
最初,喜爱的敌人能够点个关注,感觉本文不错的敌人能够点个赞,你的反对就是我更新最大的能源。