RxJava2基础
RxJava核心思想是观察者模式和响应式编程,因此其核心的东西主要有两个:Observable(被观察者) 和 Observer(观察者),Observable可以发出一系列的 事件(例如网络请求、复杂计算、数据库操作、文件读取等),事件执行结束后交给Observer的回调处理。
- RxJava2 的观察者模式
观察者模式是对象的行为模式,也叫做发布-订阅(Publish/Subscribe)模式、模型-视图(Model/View)模式、源-监听器(Source/Listener)模式或从属者(Dependents)模式。
- RxJava2 响应式编程结构
什么是响应式编程?举个栗子,a = b + c; 这句代码将b+c的值赋给a,而之后如果b和c的值改变了不会影响到a,然而,对于响应式编程,之后b和c的值的改变也动态影响着a,意味着a会随着b和c的变化而变化。
响应式编程的组成为Observable/Operator/Subscriber,RxJava在响应式编程中的基本流程如下:
- Observable发出一系列事件,他是事件的产生者;
- Subscriber负责处理事件,他是事件的消费者;
- Operator是对Observable发出的事件进行修改和变换;
- 若事件从产生到消费不需要其他处理,则可以省略掉中间的Operator,从而流程变为Obsevable -> Subscriber;
- Subscriber通常在主线程执行,所以原则上不要去处理太多的事务,而这些复杂的处理则交给Operator;
这个流程,可以简单的理解为:Observable -> Operator1 -> …….-> OperatorN -> Subscriber。
- 背压
背压简单理解即生产者的生产速度大于消费者的消费速度带来的问题,这个并非新概念,只是在RxJava2中新增了新的实现者Flowable及其子类:
初步体验
使用前需要添加相应的依赖:1
2compile 'io.reactivex.rxjava2:rxjava:2.0.7'
compile 'org.reactivestreams:reactive-streams:1.0.0'
注意:不同的版本在使用上可能略微有差异。
RxJava2的使用主要有三个步骤:
- 创建一个Observable
1 | //被观察者 |
- 创建一个Observer
1 | //观察者,观察者中onError和onComplete两个方法是互斥的,只有其中一个会执行。 |
- 建立订阅关系
1 | observable.subscribe(observer); |
通过以上可以看出,首先,创建Observable时,回调的是ObservableEmitter,字面意思即发射器,用于发射数据(onNext)和通知(onError/onComplete);其次,创建的Observer中多了一个回调方法onSubscrible,传递参数为Disposable,Disposable用于解除订阅关系。
包解析和类说明
操作符
目前只解析标准包中的操作符。对于扩展包,待查阅相应资料再补上。
创建操作符:用于创建Observable
create:使用OnSubscribe从头创建一个Observable。需要注意的是,使用该方法创建时,建议在ObservableOnSubscribe#subscribe方法中检查订阅状态,以便及时停止发射数据或者运算。
1
2
3
4
5
6Observable.create(new ObservableOnSubscribe<String >() {
public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {
}
});from*: 将一个Iterable, 一个Future, 或者一个数组,内部通过代理的方式转换成一个Observable。Future转换为OnSubscribe是通过OnSubscribeToObservableFuture进行的,Iterable转换通过OnSubscribeFromIterable进行。数组通过OnSubscribeFromArray转换。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50List<String > list=new ArrayList<>();
list.add("abc");
Observable.fromArray(list).subscribe(new Observer<List<String>>() {
public void onSubscribe(Disposable disposable) {
}
public void onNext(List<String> strings) {
}
public void onError(Throwable throwable) {
}
public void onComplete() {
}
});
Future<String> future = Executors.newSingleThreadExecutor().submit(new Callable<String>() {
public String call() throws Exception {
return "abc";
}
});
Observable.fromFuture(future).subscribe(new Observer<String>() {
public void onSubscribe(Disposable disposable) {
}
public void onNext(String s) {
}
public void onError(Throwable throwable) {
}
public void onComplete() {
}
});just: 将一个或多个对象转换成发射这个或这些对象的一个Observable。如果是单个对象,内部创建的是ScalarSynchronousObservable对象。如果是多个对象,则是调用了from方法创建。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22List<String> strings = new ArrayList<>();
Observable.just(strings).subscribe(new Observer<List<String>>() {
public void onSubscribe(Disposable disposable) {
}
public void onNext(List<String> strings) {
}
public void onError(Throwable throwable) {
}
public void onComplete() {
}
});empty: 创建一个什么都不做直接通知完成的Observable。
- error: 创建一个什么都不做直接通知错误的Observable。
never: 创建一个什么都不做的Observable。
1
2
3Observable.empty(); //直接调用onCompleted。
Observable.error(new Throwable("adb"));//直接调用onError。这里可以自定义异常
Observable.never();//啥都不做timer: 创建一个在给定的延时之后发射数据项为0的Observable,内部通过OnSubscribeTimerOnce工作。
1 | Observable.timer(1000, TimeUnit.MILLISECONDS).subscribe(new Consumer<Long>() { |
interval:创建一个按照给定时间间隔发射从0开始的整数序列的Observable,内部通过ObservableInterval工作。
1
2
3
4
5
6Observable.interval(2,TimeUnit.MILLISECONDS).subscribe(new Consumer<Long>() {
public void accept(Long aLong) throws Exception {
}
});range:创建一个发射指定范围的整数序列的Observable。
1
2
3
4
5
6Observable.range(3,9).subscribe(new Consumer<Integer>() {
public void accept(Integer integer) throws Exception {
}
});defer:只有当订阅者订阅才创建Observable,为每个订阅者创建一个新的Observable。
1
2
3
4
5
6
7
8
9
10
11Observable.defer(new Callable<ObservableSource<String >>() {
public ObservableSource<String > call() throws Exception {
return null;
}
}).subscribe(new Consumer<String>() {
public void accept(String s) throws Exception {
}
});
合并操作
用户组合多个Observable。
注意 为了使结构更加清晰以及缩小代码量,之后例子都使用Lamdda表达式。
concat:按顺序链接多个Observable,需要注意的是Observable。concat(a,b)等价与a.concatWith(b),内部调用的是concatArray。
1
2
3
4Observable<Integer> observable1=Observable.just(1,2,3,5);
Observable<Integer> observable2=Observable.just(2,5,6,8);
Observable.concat(observable1,observable2).subscribe(item -> System.out.println(item));
observable1.concatWith(observable2);startWith:在数据序列的开头增加一个数据项,其内部也是调用concatArray。
1
2
3Observable.just(1,2,3,4,5)
.startWith(1)
.subscribe(integer -> System.out.print(integer));merge:将多个Observable合并为一个,不同于concat,merge不是按照添加顺序链接,而是按照时间线来链接。其中mergeDelayError将异常延迟到其它没有错误的Observable发送完毕后才发射。而merge则是一遇到异常将停止发射数据,发送onError通知。
1
2
3Observable<String > observable1=Observable.just("4","3","1");
Observable<String > observable2=Observable.just("4","3","1");
Observable.merge(observable1,observable2).subscribe(s -> System.out.println(s));zip:使用一个函数组合多个Observab发射的数据集合,然后发射这个结果。然后再发射这个结果。如果多个Observable发射的数据量不一样,则以最少的Observable为标准进行压合。内部通过OperatorZip进行压合。
1
2
3
4
5
6
7
8Observable<String > observable1=Observable.just("4","3","1");
Observable<String > observable2=Observable.just("4","3","1");
Observable.zip(observable1, observable2, new BiFunction<String, String, String >() {
public String apply(String s, String s2) throws Exception {
return s+" and "+s2;
}
}).subscribe(item -> System.out.println(item));
过滤操作
filter:过滤数据,内部通过ObservableFilter所虑数据。
1
2
3Observable.just(3, 4, 5, 6)
.filter(integer -> integer > 3)
.subscribe(integer -> System.out.println(integer));ofType:过滤制定的类型。
1
2
3Observable.just(3, 4, 5, "aad")
.ofType(Integer.class)
.subscribe(integer -> System.out.println(integer));take:只发射开始的N想数据或者一定时间内的数据,内部通过ObservableTake和ObservableTakeUntil过滤数据。
1
2
3
4Observable.just(3, 4, 5, "aad")
.take(3)
.take(100,TimeUnit.MILLISECONDS)
.subscribe(integer -> System.out.println(integer));takeLast:只发射最后的N项数据或者一定时间内的数据。
1
2
3
4Observable.just(3, 4, 5)
.takeLast(2)
.take(100,TimeUnit.MILLISECONDS)
.subscribe(integer -> System.out.println(integer));last/lastOrError:只发射最后一项数据。
- first/firstOrError:只发射第一项数据。
- skip:跳过开始的N项数据或者一定时间内的数据。
- skiplast:跳过最后的N想数据或者一定时间内的数据。
- elementAt/elementAtOrError:发射某项数据,如果超过了范围可以指定默认值。
- ignoreElements:丢弃所有数据,只发射错误或者正确终止的通知。
- distinct:过滤重复数据。
- distinctUntilChanged:过滤掉连续重复的数据。
- throttleFirst:定期发射Observable发射的第一项数据。
- throttleWithTimeout/debounce:发射数据时,如果两次数据的发射间隔小于指定时间,就会丢弃前一次的数据,直到指定时间内都没有新数据发射时才进行发射。
- sample/throttleLast:定期发射Observable最近的数据。
- timeout:如果原始Observable过了指定的一段时长没有发射数据,就发射一个异常或者使用备用的Observable。
条件或者布尔操作
all :判断所有的数据项是否满足某个条件,内部通过ObservableAllSingle实现。
1
2
3Observable.just(3, 4, 5)
.all(integer -> integer>3)
.subscribe(integer -> System.out.println(integer));//得到boolean值contains:判断在发射的所有数据项中是否包含指定的数据。
1
2
3Observable.just(3, 4, 5)
.contains(3)
.subscribe(integer -> System.out.println(integer)); //得到boolean值sequenceEqual:用于判断两个Observable发射的数据是否相同(数据,发射顺序,终止状态)。
1
2
3
4Observable<String> observable1 = Observable.just("4", "3", "1");
Observable<String> observable2 = Observable.just("4", "3", "1");
Observable.sequenceEqual(observable1,observable2)
.subscribe(integer -> System.out.println(integer)); //得到boolean值isEmpty:用于判断Observable发射完毕时,有没有发射数据。有数据false,如果只收到了onComplete通知则为true。
1
2
3Observable.just(3,2,5,4)
.isEmpty()
.subscribe(integer -> System.out.println(integer)); //得到boolean值amb:给定多个Observable,只让第一个发射数据的Observable发射全部数据,其他的Observable将会被忽略。
1
2ambArray:给定多个Observable,只让第一个发射数据的Observable发射全部数据,其他的Observable将会被忽略。
1
2
3
4Observable<Integer> observable1 = Observable.just(4,3,2,1);
Observable<Integer> observable2 = Observable.just(6,7,8,9);
Observable.ambArray(observable1,observable2)
.subscribe(integer -> System.out.println(integer));switchIfEmpty:如果原始Observable正常终止后仍然没有发射任何的数据,就使用备用的Observable。
1
2
3Observable.empty()
.switchIfEmpty(Observable.just(2, 3, 4))
.subscribe(integer -> System.out.println(integer));defaultIfEmpty:如果原始Observable正常终止后仍然没有发射任何数据,就发射一个默认值,内部调用的switchIfEmpty。
1
2
3Observable.empty()
.defaultIfEmpty(Observable.just(2, 3, 4))
.subscribe(integer -> System.out.println(integer));takeUtil:当发射的数据满足某个条件后(包含该数据),或者第二个Observable发送完毕,终止第一个Observable发送数据。
1
2
3Observable.just(2,3,4,5)
.takeUntil(integer -> integer==4)
.subscribe(integer -> System.out.println(integer)); //得到2,3,4takeWhile:当发射的数据满足某个条件时(不包含该数据),Observab终止发射数据。
1
2
3Observable.just(2,3,4,5)
.takeWhile(integer -> integer==4)
.subscribe(integer -> System.out.println(integer));skipUntil:丢弃Observable发射的数据,直到第二个Observable发送数据。(丢弃条件数据)
- skipWhile:丢弃Observable发射的数据,直到一个指定的条件不成立(不丢弃条件数据)。
聚合操作
reduce/reduceInto:对序列使用reduce()函数并发射最终的结果。
1
2
3Observable.just(2,3,4,5)
.reduce((integer, integer2) -> integer+integer2)
.subscribe(integer -> System.out.println(integer)); //得到14collect:使用collect收集数据到一个可变的数据结构。
1
2
3
4Observable.just(2,3,4,5)
.collect((Callable<List<Integer>>) () -> new ArrayList<>(),
(s, integer) -> s.add(integer))
.subscribe(integer -> System.out.println(integer)); //得到列表[2, 3, 4, 5]count:计算发射的数量。
转换操作
toList:收集原始Observable发射的所有数据到一个列表,然后返回列表。
1
2
3Observable.just(2,3,4,5)
.toList()
.subscribe(integer -> System.out.println(integer)); //得到列表[2, 3, 4, 5]toSortedList:收集原始Observable发射的所有数据到一个有序的列表,然后返回这个列表。
1
2
3Observable.just(8,3,6,5)
.toSortedList((o1, o2) -> o1-o2)
.subscribe(integer -> System.out.println(integer));toMap:将序列数据转换为一个Map,可以根据数据项生成key和生成value。
1
2
3
4Observable.just(8,3,6,5)
//根据数据项生成map的key,//根据数据项生成map的kvalue
.toMap(integer -> integer+"-", integer -> integer+"+")
.subscribe(integer -> System.out.println(integer)); //得到{8-=8+, 6-=6+, 5-=5+, 3-=3+}toMultiMap:类似toMap,不同的地方在于map的value是一个集合。
变换操作
map:对Observable发射的每一项数据都应用一个函数来变换。
1
2
3Observable.just(8,3,6,5)
.map(integer -> "item"+integer)
.subscribe(integer -> System.out.println(integer)); //得到item8,item3,item6,item5cast:在发射之前将Observable发射的所有数据转换为指定类型。
flatMap:将Observable发射的数据变换为Observable集合,然后将这些Observable发射的数据平坦化的放进一个单独的Observable,内部采用merge合并。
1
2
3
4
5
6Observable.just(8,3,6,5)
.flatMap((Function<Integer, ObservableSource<?>>) integer -> Observable.create((ObservableOnSubscribe<String>) observableEmitter -> {
observableEmitter.onNext(integer+"");
observableEmitter.onComplete();
}))
.subscribe(integer -> System.out.println(integer)); //得到8,3,6,5flatMapIterable:和flatMap的作用一样,只不过生产的是Iterable而不是Observable。
1
2
3
4
Observable.just(8,3,6,5)
.flatMapIterable(integer -> Arrays.asList("item"+integer))
.subscribe(integer -> System.out.println(integer));concatMap:类似于flatMap,由于内部使用concat合并,所以是按照顺序连接发射。
SwitchMap:和flatMap很像,将Observable发射的数据变换为Observable集合,将原始Observable发射一个新的数据(Observable)时,它将取消订阅前一个Observable。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16Observable.create((ObservableOnSubscribe<Integer>) observableEmitter -> {
for(int i=0;i<7;i++){
observableEmitter.onNext(i);
Thread.sleep(500);
}
observableEmitter.onComplete();
})
.switchMap((Function<Integer, ObservableSource<?>>) integer -> Observable.create(new ObservableOnSubscribe<Integer>() {
public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
observableEmitter.onNext(integer*10);
Thread.sleep(500);
observableEmitter.onComplete();
}
}))
.subscribe(integer -> System.out.println(integer)); //得到 0 10 20scan:与reduce很像,对Observable发射的每一项数据应用一个函数,然后按顺序一次发射每个值。
1
2
3Observable.just(2,3,4)
.scan((integer, integer2) -> integer+integer2)
.subscribe(integer -> System.out.println(integer)); //得到 2 5 9groupBy:将Observab分拆为Observable集合,将原始Observable发射的数据按key分组,每一个Observable发射一组不同的数据。
1
2
3
4
5
6
7Observable.just(2,3,4)
.groupBy(integer -> integer%2==0?"偶数":"奇数")
.subscribe(stringIntegerGroupedObservable -> stringIntegerGroupedObservable.subscribe(integer ->
System.out.println(stringIntegerGroupedObservable.getKey()+":"+integer)));//得到
//偶数:2
//奇数:3
//偶数:4buffer:定期从Observable收集数据到一个集合,然后把这些数据集合打包发射,而不是一次发射一个。
1
2
3Observable.just(2,3,4)
.buffer(2)
.subscribe(integers -> System.out.println(integers)); //得到 [2, 3] [4]window:定期将来自Observable的数据分拆成一些Observable窗口,饭后发射这些窗口数据,而不是每次发射一项。
1
2
3Observable.just(2,3,4)
.window(2)
.subscribe(integerObservable -> integerObservable.subscribe(integer -> System.out.println(integer)));
错误处理/重试机制
onErrorResumeNext:当原始Observable在遇到错误时,使用备用的Observable。
1
2
3
4Observable.just(2,"3",4)
.cast(Integer.class)
.onErrorResumeNext(Observable.just(5,6,7))
.subscribe(integer -> System.out.println(integer)); //得到 2,5,6,7onErrorReturn:当原始Observable在遇到错误时发射一个特定的数据。
1
2
3
4Observable.just(2,"3",4)
.cast(Integer.class)
.onErrorReturn(throwable -> 0)
.subscribe(integer -> System.out.println(integer)); //得到2,0retry:当原始Observable在遇到错误时进行重试。
1
2
3
4Observable.just(2,"3",4)
.cast(Integer.class)
.retry(3)
.subscribe(integer -> System.out.println(integer),throwable ->System.out.println("error"));retryWhen:当原始Observable在遇到错误时,将错误传递给另一个Observable类决定是否要重新订阅这个Observable。
1
2
3
4Observable.just(2,"3",4)
.cast(Integer.class)
.retryWhen(throwableObservable -> throwableObservable.retry(1))
.subscribe(integer -> System.out.println(integer),throwable ->System.out.println("error"));
链接操作
ConnectableObservable与普通的Observable差不多,但是可连接的Observable在被订阅时并不开始发射数据,只有在它的connect()被调用时才开始。用这种方法,你可以等所有的潜在订阅者都订阅了这个Observable之后才开始发射数据。
ConnectableObservable.connect()指示一个可连接的Observable开始发射数据。
Observable.publish()将一个Observable转换为一个可连接的Observable 。
Observable.replay()确保所有的订阅者看到相同的数据序列的ConnectableObservable,即使它们在Observable开始发射数据之后才订阅。
ConnectableObservable.refCount()让一个可连接的Observable表现得像一个普通的Observable。1
2
3
4ConnectableObservable co= ConnectableObservable.just(1,2,3)
.publish();
co.subscribe(integer -> System.out.println(integer));
co.connect();
阻塞操作
BlockingObservable是一个阻塞的Observable。普通的Observable 转换为 BlockingObservable,可以使用 Observable.blocking( )方法。内部通过CountDownLatch实现了阻塞操作。
以下的操作符可以用于BlockingObservable,如果是普通的Observable,务必使用Observable.blocking()转为阻塞Observable后使用,否则达不到预期的效果。
- blockingForEach:对BlockingObservable发射的每一项数据调用一个方法,会阻塞到Observable完成。
1
2
3Observable.just(2, 3, 4)
.observeOn(Schedulers.newThread())
.blockingForEach(integer ->System.out.println(integer) );
工具集
materialize:将Observable转换成一个通知列表。
1
2
3Observable.just(2, 3, 4)
.materialize()
.subscribe(integerNotification ->System.out.println(integerNotification.isOnNext()) );dematerialize:与上面的作用相反,将通知逆转回一个Observable。?
timestamp:给Observable发射的每个数据项添加一个时间戳。
1
2
3Observable.just(2, 3, 4)
.timestamp()
.subscribe(integerTimed ->System.out.println( integerTimed.time()+" "+integerTimed.value()));timeInterval:给Observable发射的两个数据项间添加一个时间差,实现在OperatorTimeInterval中。?
- serialize:强制Observable按次序发射数据并且要求功能是完好的。
- cache: 缓存Observable发射的数据序列并发射相同的数据序列给后续的订阅者。
- observeOn: 指定观察者观察Observable的调度器。
- subscribeOn: 指定Observable执行任务的调度器。
doOnEach: 注册一个动作,对Observable发射的每个数据项使用
1
2
3Observable.just(2, 3, 4)
.doOnEach(integerNotification -> integerNotification.getValue())
.subscribe(integer -> System.out.println(integer));doOnCompleted: 注册一个动作,对正常完成的Observable使用。
- doOnError: 注册一个动作,对发生错误的Observable使用。
doOnTerminate:注册一个动作,对完成的Observable使用,无论是否发生错误。
1
2
3Observable.just(2, 3, 4)
.doOnTerminate(() -> System.out.println("do torminate"))
.subscribe(integer -> System.out.println(integer));doOnSubscribe: 注册一个动作,在观察者订阅时使用。内部由OperatorDoOnSubscribe实现。
1
2
3Observable.just(2, 3, 4)
.doOnSubscribe(disposable -> disposable.isDisposed())
.subscribe(integer -> System.out.println(integer));doOnUnsubscribe: 注册一个动作,在观察者取消订阅时使用。内部由OperatorDoOnUnsubscribe实现,在call中加入一个解绑动作。
doFinally: 注册一个动作,在Observable完成时使用。
1
2
3Observable.just(2, 3, 4)
.doFinally(() -> {})
.subscribe(integer -> System.out.println(integer));delay: 延时发射Observable的结果。即让原始Observable在发射每项数据之前都暂停一段指定的时间段。效果是Observable发射的数据项在时间上向前整体平移了一个增量(除了onError,它会即时通知)。
delaySubscription: 延时处理订阅请求。实现在OnSubscribeDelaySubscription中
1
2
3Observable.just(2, 3, 4)
.delaySubscription(200,TimeUnit.MILLISECONDS)
.subscribe(integer -> System.out.println(integer));single/singleOrError: 强制返回单个数据,否则抛出异常或默认数据。
参考: