Eventbus使用及源码分析

EventBus基础

何为EventBus

EventBus是Android和Java的publish/subscribe事件总线。

EventBus框架

对于EventBus除了greenrobot的EventBus,实际上还有Google出品的Guava以及square的otto(修改自Guava),
Guava是一个庞大的库,EventBus只是它附带的一个小功能,实际项目使用较少。用的最多的是greenrobot
的EventBus,该库优点是接口简洁、集成方便。

EventBus基本使用

本文主要讨论greenrobot的EventBus(3.1.1)库。

EventBus中的三个主要角色

  1. Event:事件,它可以是任意类型,EventBus会根据事件类型进行全局的通知;
  2. Subscribe:事件订阅者,而在3.0之后事件处理的方法名可以随意取,不过需要加上注解@subscribe,
    并且指定线程模型,默认是POSTING。
  3. Publisher:事件的发布者,可以在任意线程里发布事件。一般情况下,使用 EventBus.getDefault()
    就可以得到一个EventBus对象,然后再调用 post(Object) 方法发布事件即可。
    各角色协作流程如下:
    角色协作

    EventBus的5种线程模型,分别是:

  4. POSTING :订阅者将直接在同一个线程中调用,该线程将发布该事件,这是默认值。事件传递意
    味着开销最小,因为它完全避免了线程切换。 因此,这是已知在很短的时间内完成而不需要主线程的简单任
    务的推荐模式。 使用此模式的事件处理程序必须快速返回以避免阻止发布线程,这可能是主线程;
  5. MAIN:在Android上,将在Android的主线程(UI线程)中调用订阅者。 如果发布线程是主线程,
    订阅者方法将直接调用,阻止发布线程。 否则事件排队等待交付(非阻塞)。 使用此模式的订户必须快速返
    回以避免阻止主线程。如果不在Android上,则行为与{@link #POSTING}相同;
  6. MAIN_ORDERED:在Android上,将在Android的主线程(UI线程)中调用订阅者。 与{@link
    #MAIN}不同,该事件将始终排队等待传递。 这可确保后置调用是非阻塞的;
  7. BACKGROUND:在Android上,将在后台线程中调用订阅者。 如果发布线程不是主线程,则将在发
    布线程中直接调用订阅者方法。 如果发布线程是主线程,则EventBus使用单个后台线程,该线程将按顺序传
    递其所有事件。 使用此模式的订阅者应尝试快速返回以避免阻止后台线程。 如果不在Android上,则始终使
    用后台线程。
  8. ASYNC: 订阅者将在单独的线程中调用。 这始终独立于发布线程和主线程。 发布事件永远不会等
    待使用此模式的订阅者方法。 订阅者方法如果执行可能需要一些时间,则应使用此模式,例如 用于网络访问。
    避免同时触发大量长时间运行的异步订阅者方法以限制并发线程数。 EventBus使用线程池从已完成的异步订
    阅者通知中有效地重用线程;

具体使用

  1. 在项目中引入依赖

    1
    implementation 'org.greenrobot:eventbus:3.1.1'
  2. 订阅/解除订阅

    • 订阅

      1
      EventBus.getDefault().register(this);//订阅
    • 解除订阅

      1
      EventBus.getDefault().unregister(this);//解除订阅
  3. 发布事件

1
2
//事件是POJO(普通旧Java对象),没有任何特定要求。如此处的BeanObject。
EventBus.getDefault().post(new BeanObject());
  1. 订阅事件处理

    1
    2
    3
    4
    5
    //BeanObject为发布的对象,这个要对应发布的对象(或称事件)
    @Subscribe(threadMode = ThreadMode.MAIN) //在ui线程执行
    public void onDataSynEvent(BeanObject event) {
    Log.e(TAG, "event---->");
    }
  2. 订阅事件的优先级
    事件的优先级类似广播的优先级,优先级越高优先获得消息

    1
    2
    3
    4
    @Subscribe(threadMode = ThreadMode.MAIN,priority = 100) //在ui线程执行 优先级100
    public void onDataSynEvent(BeanObject event) {
    Log.e(TAG, "event---->");
    }
  3. 中止事件往下传递
    发送有序广播可以终止广播的继续往下传递,EventBus也实现了此功能

    1
    EventBus.getDefault().cancelEventDelivery(event) ;//优先级高的订阅者可以终止事件往下传递
  4. 代码混淆

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    -keepattributes *Annotation*
    -keepclassmembers class ** {
    @org.greenrobot.eventbus.Subscribe <methods>;
    }
    -keep enum org.greenrobot.eventbus.ThreadMode { *; }

    # Only required if you use AsyncExecutor
    -keepclassmembers class * extends org.greenrobot.eventbus.util.ThrowableFailureEvent {
    <init>(java.lang.Throwable);
    }

EventBus黏性事件

EventBus除了普通事件也支持粘性事件,这个有点类似广播分类中的粘性广播。本身粘性广播用的就比较少,为了方便理解成订阅在发布事件之后,但同样可以收到事件。订阅/解除订阅和普通事件一样,但是处理订阅函数有所不同,需要注解中添加sticky = true

1
2
3
4
@Subscribe(threadMode = ThreadMode.MAIN,sticky = true) //在ui线程执行
public void onDataSynEvent(BeanObject event) {
Log.e(TAG, "event---->");
}

  • 发送粘性事件

    1
    EventBus.getDefault().postSticky(new BeanObject());
  • 取消粘性事件
    对于 粘性广播其属于常驻广播(事件一直会保留,除非用户主动移除,且只要有新的订阅者订阅就会把这个事件发送给新的订阅者) ,对于EventBus粘性事件也类似,因此如果不再需要该粘性事件时可以移除

    1
    2
    //⚠️一般在订阅者消费过该事件后调用,其还有一个参数为class的方法,取消class这类事件
    EventBus.getDefault().removeStickyEvent(new DataSynEvent());

或者移除所有粘性事件

1
EventBus.getDefault().removeAllStickyEvents();

EventBus processor使用

EventBus提供了一个EventBusAnnotationProcessor注解处理器来在编译期通过读取@Subscribe()注解并解析,
处理其中所包含的信息,然后生成java类来保存所有订阅者关于订阅的信息,这样就 比在运行时使用反射来获得这些订阅者的
信息速度要快
.

  1. 具体使用:在模块下的build.gradle文件中加入(以下针对Android Gradle Plugin version 2.2.0 及更高版本):

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    android {
    defaultConfig {
    javaCompileOptions {
    annotationProcessorOptions {
    arguments = [ eventBusIndex : '项目包名.MyEventBusIndex' ] //参数设置
    }
    }
    }
    }

    dependencies {
    implementation 'org.greenrobot:eventbus:3.1.1'
    annotationProcessor 'org.greenrobot:eventbus-annotation-processor:3.1.1' //注解
    }
  2. 使用索引

此时编译一次,自动生成生成索引类。在\build\generated\source\apt\debug(或release)\PakageName\下看到通过注解分析生成的索引类,这样便可以在初始化EventBus时应用已经生成的索引了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/** This class is generated by EventBus, do not edit. */
public class MyEventBusIndex implements SubscriberInfoIndex {
private static final Map<Class<?>, SubscriberInfo> SUBSCRIBER_INDEX;

static {
SUBSCRIBER_INDEX = new HashMap<Class<?>, SubscriberInfo>();

putIndex(new SimpleSubscriberInfo(com.mugwort.lockscreen.ui.BaseActivity.class, true,
new SubscriberMethodInfo[] {
new SubscriberMethodInfo("doEventMessage", com.mugwort.lockscreen.entities.EventMessage.class,
ThreadMode.MAIN),
}));

}

private static void putIndex(SubscriberInfo info) {
SUBSCRIBER_INDEX.put(info.getSubscriberClass(), info);
}

@Override
public SubscriberInfo getSubscriberInfo(Class<?> subscriberClass) {
SubscriberInfo info = SUBSCRIBER_INDEX.get(subscriberClass);
if (info != null) {
return info;
} else {
return null;
}
}
}

添加索引到EventBus默认的单例中

1
EventBus.builder().addIndex(new MyEventBusIndex()).installDefaultEventBus();

  1. 添加后注册效率会得到很大提升。

EventBus优缺点

  • 优点:简化组件之间的通信方式,实现解耦让业务代码更加简洁,可以动态设置事件处理线程以及优先级
  • 缺点:目前发现唯一的缺点就是类似之前策略模式一样的诟病,每个事件都必须自定义一个事件类,造成事件类太多,无形中加大了维护成本

参考:
[0]. 官网文档 http://greenrobot.org/eventbus/documentation/
[1]. https://www.cnblogs.com/whoislcj/p/5595714.html
[2]. https://juejin.im/post/5b6859706fb9a04fbc2218d2

EventBus源码解析

初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
private static final EventBusBuilder DEFAULT_BUILDER = new EventBusBuilder();

/** Convenience singleton for apps using a process-wide EventBus instance. */
/** 通过使用一个进程范围的实例生成一个便捷单例*/
public static EventBus getDefault() {
if (defaultInstance == null) {
synchronized (EventBus.class) {
if (defaultInstance == null) {
defaultInstance = new EventBus();
}
}
}
return defaultInstance;
}
/**
* Creates a new EventBus instance; each instance is a separate scope in which events are delivered. To use a
* central bus, consider {@link #getDefault()}.
*/
public EventBus() {
this(DEFAULT_BUILDER);
}

EventBus(EventBusBuilder builder) {
logger = builder.getLogger();
subscriptionsByEventType = new HashMap<>();
typesBySubscriber = new HashMap<>();
stickyEvents = new ConcurrentHashMap<>();
mainThreadSupport = builder.getMainThreadSupport();
mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null;
backgroundPoster = new BackgroundPoster(this);
asyncPoster = new AsyncPoster(this);
indexCount = builder.subscriberInfoIndexes != null ? builder.subscriberInfoIndexes.size() : 0;
subscriberMethodFinder = new SubscriberMethodFinder(builder.subscriberInfoIndexes,
builder.strictMethodVerification, builder.ignoreGeneratedIndex);
logSubscriberExceptions = builder.logSubscriberExceptions;
logNoSubscriberMessages = builder.logNoSubscriberMessages;
sendSubscriberExceptionEvent = builder.sendSubscriberExceptionEvent;
sendNoSubscriberEvent = builder.sendNoSubscriberEvent;
throwSubscriberException = builder.throwSubscriberException;
eventInheritance = builder.eventInheritance;
executorService = builder.executorService;
}

EventBus 初始化的三个步骤,直观上看用到 单例模式和Builder模式 ,将构造参数给分离了出来,实际上还用到了 策略模式 ,其中 Builder 中有些参数用于代码执行的策略,即传的参数不一样,执行的方式也就不一样,像 ignoreGeneratedIndex 作用就是让 EventBus 如何查找出订阅方法的策略。这些布尔类型的参数,在分析代码中可以逐步的了解到,先了解一些缓存对象,以更容易的了解源码:

  • subscriptionsByEventType: 内部是一个Map集合,可以根据 EventType 查找订阅事件。
  • typesBySubscriber: 根据订阅对象找到 EventType
  • stickyEvents: 粘性事件的并发缓存。
  • 事件投递者 : mainThreadPoster, backgroundPoster, asyncPoster 根据订阅注解 ThreadMode 去选择不同的投递者,不同投递者投递事件,接收函数会执行在不同的线程中。
  • subscriberMethodFinder:查找方法用的,内部维护了一个订阅方法的集合。

注册

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* Registers the given subscriber to receive events. Subscribers must call {@link #unregister(Object)} once they
* are no longer interested in receiving events.
* <p/>
* Subscribers have event handling methods that must be annotated by {@link Subscribe}.
* The {@link Subscribe} annotation also allows configuration like {@link
* ThreadMode} and priority.
*/
public void register(Object subscriber) {
Class<?> subscriberClass = subscriber.getClass();
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
synchronized (this) {
for (SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod);
}
}
}

调用 register(this) 的时候就把订阅者给传了进来,代码量很少,主要就两个步骤,第一个 findSubscriberMethods 找出一个 SubscriberMethod 的集合,然后就遍历 SubscriberMethod 去订阅事件,先看看 findSubscriberMethods() 里面到底做了什么,返回的是什么。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
if (subscriberMethods != null) {
return subscriberMethods;
}

if (ignoreGeneratedIndex) {
subscriberMethods = findUsingReflection(subscriberClass);
} else {
subscriberMethods = findUsingInfo(subscriberClass);
}
if (subscriberMethods.isEmpty()) {
throw new EventBusException("Subscriber " + subscriberClass
+ " and its super classes have no public methods with the @Subscribe annotation");
} else {
METHOD_CACHE.put(subscriberClass, subscriberMethods);
return subscriberMethods;
}
}

这里是通过 订阅者的类文件 查找里面对事件处理的订阅方法即添加了注解 @Subscribe 的方法。首先从缓存中查找(缓存的key是类文件class),如果找到了就立马返回。如果缓存中没有的话,则根据 ignoreGeneratedIndex 选择如何查找订阅方法,最后,找到订阅方法后,放入缓存,以免下次继续查找。ignoreGeneratedIndex 默认就是 false ,执行 findUsingInfo() 方法,但是这里先分析 findUsingReflection() ,因为默认配置的情况下还是会执行上面的 findUsingReflection(),就是通过反射来解析注解。

1
2
3
4
5
6
7
8
9
private List<SubscriberMethod> findUsingReflection(Class<?> subscriberClass) {
FindState findState = prepareFindState(); // 1
findState.initForSubscriber(subscriberClass);
while (findState.clazz != null) {
findUsingReflectionInSingleClass(findState); // 2
findState.moveToSuperclass();
}
return getMethodsAndRelease(findState);
}

在真正执行反射查找方法时,会传递一个 FindState,跟进 prepareFindState() 看下:

1
2
3
4
5
6
7
8
9
10
11
12
private FindState prepareFindState() {
synchronized (FIND_STATE_POOL) {
for (int i = 0; i < POOL_SIZE; i++) {
FindState state = FIND_STATE_POOL[i];
if (state != null) {
FIND_STATE_POOL[i] = null;
return state;
}
}
}
return new FindState();
}

这里第一步从池中拿出一个 FindState 对象,FindState 中维护的是对订阅方法查找结果的封装。其实,往后面会发现作者这里设计的非常精妙。第二步,initForSubscriber() 就是将订阅者传给 FindState 对象。第三步做的就是不断从订阅者和订阅者的父类去查找订阅方法,一起看 findUsingReflectionInSingleClass()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
private void findUsingReflectionInSingleClass(FindState findState) {
Method[] methods;
try {
// This is faster than getMethods, especially when subscribers are fat classes like Activities
methods = findState.clazz.getDeclaredMethods();
} catch (Throwable th) {
// Workaround for java.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149
methods = findState.clazz.getMethods();
findState.skipSuperClasses = true;
}
for (Method method : methods) {
int modifiers = method.getModifiers();
if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
Class<?>[] parameterTypes = method.getParameterTypes();
if (parameterTypes.length == 1) {
Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
if (subscribeAnnotation != null) {
Class<?> eventType = parameterTypes[0];
if (findState.checkAdd(method, eventType)) {
ThreadMode threadMode = subscribeAnnotation.threadMode();
findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
}
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException("@Subscribe method " + methodName +
"must have exactly 1 parameter but has " + parameterTypes.length);
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException(methodName +
" is a illegal @Subscribe method: must be public, non-static, and non-abstract");
}
}
}

这里才是真正的查找订阅者下的订阅方法。通过对订阅者方法的遍历,看有没有注解,有的话就解析注解,然后将找到的订阅方法的集合封装到 FindState 对象中的 subscriberMethods 集合中。解析完了之后,在看 findUsingReflection() 方法的最后,返回了 getMethodsAndRelease(FindState),将 FindState 传给了 getMethodsAndRelease(FindState) 方法,跟进去:

1
2
3
4
5
6
7
8
9
10
11
12
13
private List<SubscriberMethod> getMethodsAndRelease(FindState findState) {
List<SubscriberMethod> subscriberMethods = new ArrayList<>(findState.subscriberMethods);
findState.recycle();
synchronized (FIND_STATE_POOL) {
for (int i = 0; i < POOL_SIZE; i++) {
if (FIND_STATE_POOL[i] == null) {
FIND_STATE_POOL[i] = findState;
break;
}
}
}
return subscriberMethods;
}

从这里,可以知道作者设计 FindState池 的初心了,解析完了之后,将订阅方法赋给List集合,再回收 FindState ,继续接收解析,内存没有半点浪费。最后返回的是一个订阅方法的集合。这样,通过反射解析注解,找到订阅方法的方式已经分析完了。再看看通过apt处理器来找,apt处理是针对源码的处理,是执行在编译过程中的。所以性能要比反射好的多,也是推荐大家使用的方式。回到 findUsingInfo(),方法在没有配置时还是使用反射呢,一起看看:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
FindState findState = prepareFindState();
//将订阅类交给findState
findState.initForSubscriber(subscriberClass);
//扫描完当前类和其父类的订阅方法
while (findState.clazz != null) {
findState.subscriberInfo = getSubscriberInfo(findState);
if (findState.subscriberInfo != null) {
SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
for (SubscriberMethod subscriberMethod : array) {
//根据eventType对SubscriberMethod检查
//如果有这个类型的方法,或者有这个方法类型的子类就返回false
if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
findState.subscriberMethods.add(subscriberMethod);
}
}
} else {
//没有EventBusIndex的信息时使用反射
findUsingReflectionInSingleClass(findState);
}
//上移到父类
findState.moveToSuperclass();
}
//置空FindState池
return getMethodsAndRelease(findState);
}

上面查找订阅方法,和通过反射查找基本一致,主要看看 getSubscriberInfo(findstate)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private SubscriberInfo getSubscriberInfo(FindState findState) {
//查找是否有订阅信息,无则直接放回
if (findState.subscriberInfo != null && findState.subscriberInfo.getSuperSubscriberInfo() != null) {
SubscriberInfo superclassInfo = findState.subscriberInfo.getSuperSubscriberInfo();
if (findState.clazz == superclassInfo.getSubscriberClass()) {
return superclassInfo;
}
}
if (subscriberInfoIndexes != null) {
for (SubscriberInfoIndex index : subscriberInfoIndexes) {
SubscriberInfo info = index.getSubscriberInfo(findState.clazz);
if (info != null) {
return info;
}
}
}
return null;
}

上述代码主要针对缓存,从缓存中获取订阅信息, 当使用apt处理时从 subscriberInfoIndexes
查找订阅信息的。而subscriberInfoIndexes是从 EventBus Builder 对象中获取:

1
2
3
4
5
6
SubscriberMethodFinder(List<SubscriberInfoIndex> subscriberInfoIndexes, boolean strictMethodVerification,
boolean ignoreGeneratedIndex) {
this.subscriberInfoIndexes = subscriberInfoIndexes;
this.strictMethodVerification = strictMethodVerification;
this.ignoreGeneratedIndex = ignoreGeneratedIndex;
}

其中 subscriberInfoIndexes 默认为空,结合之前的代码,还是执行了 findUsingReflection() 方法,
那么其何时才得到赋值。要使 subscriberInfoIndexes 得到赋值需要引入 EventBusAnnotationProcessor 库。这个在 EventBus processor使用 片段已经做了介绍。

继续分析 findUsingInfo() 方法,需要注意:

1
2
3
4
5
6
7
for (SubscriberMethod subscriberMethod : array) {
//根据eventType对SubscriberMethod检查
//如果有这个类型的方法,或者有这个方法类型的子类就返回false
if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
findState.subscriberMethods.add(subscriberMethod);
}
}

这里添加订阅方法的时候做了各检查,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
boolean checkAdd(Method method, Class<?> eventType) {
// 2 level check: 1st level with event type only (fast), 2nd level with complete signature when required.
// Usually a subscriber doesn't have methods listening to the same event type.
Object existing = anyMethodByEventType.put(eventType, method);
if (existing == null) {
return true;
} else {
if (existing instanceof Method) {
//只有子类中没有发现这种类型的方法才返回true
if (!checkAddWithMethodSignature((Method) existing, eventType)) {
// Paranoia check
throw new IllegalStateException();
}
// Put any non-Method object to "consume" the existing Method
anyMethodByEventType.put(eventType, this);
}
return checkAddWithMethodSignature(method, eventType);
}
}

这里做了两步检查,第一步类型检查,第二步签名检查:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private boolean checkAddWithMethodSignature(Method method, Class<?> eventType) {
methodKeyBuilder.setLength(0);
methodKeyBuilder.append(method.getName());
methodKeyBuilder.append('>').append(eventType.getName());
//使用方法名称和事件类型做key,保持方法
String methodKey = methodKeyBuilder.toString();
Class<?> methodClass = method.getDeclaringClass();
Class<?> methodClassOld = subscriberClassByMethodKey.put(methodKey, methodClass);
//如果传递过来的methodclass为父类,则直接返回
if (methodClassOld == null || methodClassOld.isAssignableFrom(methodClass)) {
// Only add if not already found in a sub class
return true;
} else {
// Revert the put, old class is further down the class hierarchy
subscriberClassByMethodKey.put(methodKey, methodClassOld);
return false;
}
}

其实作者,这里又做了一个优化,将方法名和事件类型当作key,来保存方法,将传来的方法类型和我们签名的保存的比较,如果保存的是父类,就返回 true,如果是子类,就将传来的方法保存起来,返回 false。这样做的意图是,如果有父类的方法了,就没有必要添加子类的方法了,因为继承会执行到的。至此查找订阅方法的过程已经完全分析完了。看懂了之后,非常的过瘾。无论哪种方式查找,都返回了 SubscriberMethod 对象,我们看看它维护了什么属性:

1
2
3
4
5
6
7
8
9
/** Used internally by EventBus and generated subscriber indexes. */
public class SubscriberMethod {
final Method method;
final ThreadMode threadMode;
final Class<?> eventType;
final int priority;
final boolean sticky;
/** Used for efficient comparison */
String methodString;

订阅

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// Must be called in synchronized block
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
Class<?> eventType = subscriberMethod.eventType;
//根据订阅者和订阅方法构造一个订阅事件
Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
//根据eventType查找Subscription集合
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
//查看是否缓存过该订阅事件,没有则放进缓存
if (subscriptions == null) {
subscriptions = new CopyOnWriteArrayList<>();
subscriptionsByEventType.put(eventType, subscriptions);
} else {
if (subscriptions.contains(newSubscription)) {
throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
+ eventType);
}
}
//遍历订阅事件,找到比subsecriptions中订阅事件的位置,然后根据priority插进队列
int size = subscriptions.size();
for (int i = 0; i <= size; i++) {
if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
subscriptions.add(i, newSubscription);
break;
}
}
//根据订阅者查找Eventype的缓存
List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
if (subscribedEvents == null) {
subscribedEvents = new ArrayList<>();
typesBySubscriber.put(subscriber, subscribedEvents);
}
subscribedEvents.add(eventType);
//如果是粘性事件。立马处理
if (subscriberMethod.sticky) {
if (eventInheritance) {
// Existing sticky events of all subclasses of eventType have to be considered.
// Note: Iterating over all events may be inefficient with lots of sticky events,
// thus data structure should be changed to allow a more efficient lookup
// (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).
Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
for (Map.Entry<Class<?>, Object> entry : entries) {
Class<?> candidateEventType = entry.getKey();
if (eventType.isAssignableFrom(candidateEventType)) {
Object stickyEvent = entry.getValue();
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
} else {
Object stickyEvent = stickyEvents.get(eventType);
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
}

其实里面就做了两件事,将订阅方法和订阅者,封装到 subscriptionsByEventTypeypesBySubscriber ,至于这两个对象是干什么的呢?第一个是投递订阅事件的时候,就是根据 EventType 找到订阅事件,从而去分发事件,处理事件的;第二个是在调用 unregister(this) 的时候,根据订阅者找到 EventType,又根据 EventType 找到订阅事件,从而解绑用的。第二件事,就是如果是粘性事件的话,就立马投递、执行。

发布

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/** Posts the given event to the event bus. */
public void post(Object event) {
//每个线程维护一个投递状态
PostingThreadState postingState = currentPostingThreadState.get();
List<Object> eventQueue = postingState.eventQueue;
eventQueue.add(event);

if (!postingState.isPosting) {
postingState.isMainThread = isMainThread();
postingState.isPosting = true;
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
while (!eventQueue.isEmpty()) {
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}

post(Object object) 的方法的时候就执行了上面的代码,PostingThreadState 是维护了投递的状态,最后循环投递,直到 PostingThreadState 中的 EventQueue 为空。那么代码最终执行到:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
invokeSubscriber(subscription, event);
break;
case MAIN:
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
mainThreadPoster.enqueue(subscription, event);
}
break;
case MAIN_ORDERED:
if (mainThreadPoster != null) {
mainThreadPoster.enqueue(subscription, event);
} else {
// temporary: technically not correct as poster not decoupled from subscriber
invokeSubscriber(subscription, event);
}
break;
case BACKGROUND:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}

这里根据 ThreadMode 去处理事件了。由于篇幅的问题,就分析一种了,当线程模式是主线程的时候,意味着,需要执行的代码在主线程中操作。如果是主线程,就是通过 反射,直接运行订阅的方法,如果不是主线程,需要 mainThreadPoster 将订阅事件入队列,一起看看 mainThreadPoster 的工作原理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
public class HandlerPoster extends Handler implements Poster {

private final PendingPostQueue queue;
private final int maxMillisInsideHandleMessage;
private final EventBus eventBus;
private boolean handlerActive;

protected HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
super(looper);
this.eventBus = eventBus;
this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
queue = new PendingPostQueue();
}

public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
queue.enqueue(pendingPost);
if (!handlerActive) {
handlerActive = true;
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
}
}
}

@Override
public void handleMessage(Message msg) {
boolean rescheduled = false;
try {
long started = SystemClock.uptimeMillis();
while (true) {
PendingPost pendingPost = queue.poll();
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null) {
handlerActive = false;
return;
}
}
}
eventBus.invokeSubscriber(pendingPost);
long timeInMethod = SystemClock.uptimeMillis() - started;
if (timeInMethod >= maxMillisInsideHandleMessage) {
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
rescheduled = true;
return;
}
}
} finally {
handlerActive = rescheduled;
}
}
}

其实,在 EventBus 初始化的时候,mainThreadPoster 就已经获取主线程的Looper了,
就是用到了我们Android的消息处理机制:Looper,Handler 。至于消息队列是自己维护的一个
单向的链表。每次向Andorid的主线程Looper投递一个空消息,然后在 HandlerMessage() 方法里
面从自己维护的队列中取出 PendingPost 进行处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
final class PendingPost {
private final static List<PendingPost> pendingPostPool = new ArrayList<PendingPost>();

Object event;
Subscription subscription;
PendingPost next;
}
```
而 PendingPost 中维护的是订阅事件,EventType 和下一个 PendingPost 的地址。

### 反注册

```java
/** Unregisters the given subscriber from all event classes. */
public synchronized void unregister(Object subscriber) {
List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
if (subscribedTypes != null) {
for (Class<?> eventType : subscribedTypes) {
unsubscribeByEventType(subscriber, eventType);
}
typesBySubscriber.remove(subscriber);
} else {
logger.log(Level.WARNING, "Subscriber to unregister was not registered before: " + subscriber.getClass());
}
}

反注册就是通过EventBus中 typesBySubscriber 这个属性,通过订阅者去查找订阅事件,然后去一一解绑的。当然,反注册主要是为了提高效率的,不然订阅的事件太多,非常影响性能。

新特性-粘性事件

在订阅方法后半部分,关于粘性事件的处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
if (subscriberMethod.sticky) {
if (eventInheritance) {
// Existing sticky events of all subclasses of eventType have to be considered.
// Note: Iterating over all events may be inefficient with lots of sticky events,
// thus data structure should be changed to allow a more efficient lookup
// (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).
Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
for (Map.Entry<Class<?>, Object> entry : entries) {
Class<?> candidateEventType = entry.getKey();
if (eventType.isAssignableFrom(candidateEventType)) {
Object stickyEvent = entry.getValue();
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
} else {
Object stickyEvent = stickyEvents.get(eventType);
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}

这里看下 checkPostStickyEventToSubscription

1
2
3
4
5
6
7
private void checkPostStickyEventToSubscription(Subscription newSubscription, Object stickyEvent) {
if (stickyEvent != null) {
// If the subscriber is trying to abort the event, it will fail (event is not tracked in posting state)
// --> Strange corner case, which we don't take care of here.
postToSubscription(newSubscription, stickyEvent, isMainThread());
}
}

如果在注册的时候,指定要发布粘性事件,那么在订阅的时候,就立马调用 postToSubscription ,去发布了,至于它从缓存中 stickyEvents 中获取订阅事件,可能有疑问,什么时候把 EventType 放进去的呢?

1
2
3
4
5
6
7
8
9
10
11
/**
* Posts the given event to the event bus and holds on to the event (because it is sticky). The most recent sticky
* event of an event's type is kept in memory for future access by subscribers using {@link Subscribe#sticky()}.
*/
public void postSticky(Object event) {
synchronized (stickyEvents) {
stickyEvents.put(event.getClass(), event);
}
// Should be posted after it is putted, in case the subscriber wants to remove immediately
post(event);
}

粘性事件在调用 postSticky() 方法的时候就已经放入缓存。自此关于EventBus的源码也分析完了。

参考:
[0]. 官网git https://github.com/greenrobot/EventBus
[1]. http://www.10tiao.com/html/227/201607/2650236358/1.html