前一篇文章我們分析了訂閱事件的方法register
EventBus.getDefault().register(this);
Android 輕量級執行緒間通信EventBus
Android EventBus保姆級原始碼決議(一)注冊方法register
這篇就繼續從發送時間的post方法剖析原始碼,
EventBus.getDefault().post(new MessageEvent())
EventBus.post
- 其中 PostingThreadState類保存了事件佇列、post狀態、當前執行緒等資訊,使用ThreadLocal包裝保證了執行緒一致性,
/** Posts the given event to the event bus. */
public void post(Object event) {
// 獲取當前執行緒的PostingThreadState物件,該物件包含事件佇列,保存在ThreadLocal中,
PostingThreadState postingState = currentPostingThreadState.get();
//獲取當前執行緒的事件佇列,將post的事件添加到佇列中
List<Object> eventQueue = postingState.eventQueue;
eventQueue.add(event);
// 判斷事件是否在分發中,如果沒有則遍歷事件佇列進行實際分發,
if (!postingState.isPosting) {
//判斷當前執行緒是否是主執行緒
postingState.isMainThread = Looper.getMainLooper() == Looper.myLooper();
postingState.isPosting = true;
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
//遍歷佇列,按順序出隊事件并進行分發
while (!eventQueue.isEmpty()) {
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}
post方法中主要是對分發狀態的控制,將當前Event事件入隊,然后遍歷事件佇列呼叫postSingleEvent方法對Event事件進行進一步的處理
EventBus.postSingleEvent
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
Class<?> eventClass = event.getClass();
boolean subscriptionFound = false;
//這個flag上一篇說過,表示Event是否可以被繼承,如果是就查找所有的父類和介面,
if (eventInheritance) {
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
//查找所有的父類和介面
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
//查找并分發事件到該Event事件的所有訂閱者,并且保存查找是否找到的flag
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
//查找并分發事件到該Event事件的所有訂閱者
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
//如果沒有查找訂閱者,則事件由Event包裝為NoSubscriberEvent重新發送
if (!subscriptionFound) {
if (logNoSubscriberMessages) {
Log.d(TAG, "No subscribers registered for event " + eventClass);
}
if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
eventClass != SubscriberExceptionEvent.class) {
post(new NoSubscriberEvent(this, event));
}
}
}
顯然,分發事件的具體邏輯又還在postSingleEventForEventType方法中,我們繼續看原始碼
EventBus.postSingleEventForEventType
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList<Subscription> subscriptions;
synchronized (this) {
// 上一篇說過subscriptionsByEventType,它是<事件型別(EventClass),訂閱(訂閱者,訂閱者方法)list>的鍵值對
//獲取事件型別(EventClass)對應的訂閱(訂閱者,訂閱者方法)list
subscriptions = subscriptionsByEventType.get(eventClass);
}
//如果存在訂閱者&訂閱者方法則繼續處理,否則回傳false表示沒有找到
if (subscriptions != null && !subscriptions.isEmpty()) {
for (Subscription subscription : subscriptions) {
//遍歷到訂閱者后設定postThreadState
postingState.event = event;
postingState.subscription = subscription;
boolean aborted = false;
try {
//重點來了,正是這個方法真正去處理了Event事件,進行了訂閱方法的回呼
postToSubscription(subscription, event, postingState.isMainThread);
aborted = postingState.canceled;
} finally {
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
if (aborted) {
break;
}
}
return true;
}
return false;
}
postToSubscription這個方法是不是很熟悉,正是我們在上一篇的結尾中沒有講到的黏性事件的額外處理,關于為什么黏性事件先去執行我們后面再分析,來繼續跟進postToSubscription的代碼:
EventBus.postToSubscription
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
//判斷訂閱方法的執行緒模式,這正是我們寫訂閱方法時在Subscribe注解中指定的執行緒模式ThreadMode
switch (subscription.subscriberMethod.threadMode) {
//當前執行緒
case POSTING:
invokeSubscriber(subscription, event);
break;
//主執行緒
case MAIN:
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
mainThreadPoster.enqueue(subscription, event);
}
break;
case BACKGROUND:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}
由此我們可以看到真正的處理Event事件的真正操作,通過判斷當前訂閱方法的threadMode的值,按照訂閱方法的執行緒值分為了四個主要部分處理,POSTING代表著當前執行緒,在哪個執行緒發送事件,就在哪個執行緒處理事件;MAIN代表只在主執行緒處理事件;BACKGROUND代表只在非主執行緒處理事件;ASYNC也是代表在非主執行緒處理事件,
POSTINNG當然是最簡單的,不需要欄位外的處理,當前執行緒直接用invokeSubscriber通過反射呼叫訂閱方法:
void invokeSubscriber(Subscription subscription, Object event) {
try {
//反射呼叫包裝里的訂閱方法
subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
} catch (InvocationTargetException e) {
handleSubscriberException(subscription, event, e.getCause());
} catch (IllegalAccessException e) {
throw new IllegalStateException("Unexpected exception", e);
}
}
MAIN判斷當前執行緒是不是主執行緒,如果是就直接呼叫訂閱方法,不是就通過mainThreadPoster.enqueue來執行事件的處理,這個時候又出線了陌生的家伙mainThreadPoster,找一下它是怎么來的
// 在EventBus里面是這么定義的
private final HandlerPoster mainThreadPoster;
//在建構式里實體化
EventBus(EventBusBuilder builder) {
...
mainThreadPoster = new HandlerPoster(this, Looper.getMainLooper(), 10);
...
}
HandlerPoster
看看HandlerPoster這個類
final class HandlerPoster extends Handler {
private final PendingPostQueue queue;
private final int maxMillisInsideHandleMessage;
private final EventBus eventBus;
private boolean handlerActive;
//構造時傳入的引數是Looper.getMainLooper(),主執行緒的looper,又是繼承自Handler,明顯是主執行緒處理事物的handler
HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
super(looper);
this.eventBus = eventBus;
this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
//這里又使用了一個PendingPostQueue的類
queue = new PendingPostQueue();
}
//剛才呼叫的就是這個方法了,obtainPendingPost這個方法好像又對subscription&event做了一個包裝
void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
//queue入隊?看起來像是一個處理事件的訊息佇列
queue.enqueue(pendingPost);
if (!handlerActive) {
handlerActive = true;
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
}
}
}
}
看到HanlderPoster.enqueue方法,引入了一個PendingPost和它的佇列PendingPostQueue,直覺告訴我應該是類似handler的messageQueue訊息佇列的東西,接著往里看
//平平無奇的節點類,包含了一個構造物件和清除物件的方法,pendingPostPool用來作為快取池和鎖物件
final class PendingPost {
private final static List<PendingPost> pendingPostPool = new ArrayList<PendingPost>();
Object event;
Subscription subscription;
PendingPost next;
private PendingPost(Object event, Subscription subscription) {
this.event = event;
this.subscription = subscription;
}
static PendingPost obtainPendingPost(Subscription subscription, Object event) {
...
}
static void releasePendingPost(PendingPost pendingPost) {
...
}
}
//平平無奇的佇列,包含了出隊和入隊的方法
final class PendingPostQueue {
private PendingPost head;
private PendingPost tail;
synchronized void enqueue(PendingPost pendingPost) {
...
}
synchronized PendingPost poll() {
...
}
}
HandlerPoster.enqueue
平平無奇的事件佇列,繼續回到HandlerPoster的enqueue
void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
//pendingPost入隊
queue.enqueue(pendingPost);
//handlerActive 默認為false
if (!handlerActive) {
handlerActive = true;
//在主執行緒呼叫HandlerMessage
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
}
}
}
@Override
public void handleMessage(Message msg) {
boolean rescheduled = false;
try {
long started = SystemClock.uptimeMillis();
while (true) {
//死回圈遍歷事件佇列
PendingPost pendingPost = queue.poll();
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
//當取到的時間為空時,同步再取一次避免執行緒不安全(有另一個執行緒此時入隊事件就可以同時處理)的情況
pendingPost = queue.poll();
//事件仍為空退出回圈
if (pendingPost == null) {
handlerActive = false;
return;
}
}
}
//執行訂閱方法
eventBus.invokeSubscriber(pendingPost);
long timeInMethod = SystemClock.uptimeMillis() - started;
//如果處理回圈耗時超過maxMillisInsideHandleMessage,則會拋出例外
if (timeInMethod >= maxMillisInsideHandleMessage) {
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
rescheduled = true;
return;
}
}
} finally {
handlerActive = rescheduled;
}
}
所以這就通過這個HanlderPoster的handleMessage方法就可以在主執行緒遍歷并且處理事件佇列了,再看看處理這個時間佇列節點的invokeSubscriber
void invokeSubscriber(PendingPost pendingPost) {
Object event = pendingPost.event;
Subscription subscription = pendingPost.subscription;
//清空pendingPost
PendingPost.releasePendingPost(pendingPost);
if (subscription.active) {
//最后還是用invokeSubscriber反射呼叫了訂閱方法
invokeSubscriber(subscription, event);
}
}
所以 MAIN這個case的流程就走完了,BackgroundPoster、ASYNC后面的兩個case和MAIN同理,BACKGROUND的邏輯是如果在主執行緒,那么會開啟一條新的執行緒處理事件,如果不在主執行緒,那么直接處理事件;而ASYNC的邏輯則是不管你處于什么執行緒,我都新開一條執行緒處理事件,
case BACKGROUND:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
asyncPoster.enqueue(subscription, event);
break;
自此post的流程也講完了,那黏性事件是怎么實作的呢,相信大家心里也有頭緒了,下篇見~
轉載請註明出處,本文鏈接:https://www.uj5u.com/yidong/384312.html
標籤:其他
