RxJava2学习笔记

参考

Android RxJava:2.0 相对于 1.0的更新 & 变化(含 RxJava 1.0的更新使用)
RxJava 2.0 有什么不同 (译)
RxJava1 升级到 RxJava2 所踩过的坑
RxJava2.x与RxJava1.x的差异对比
RxJava2 vs RxJava1
Rxjava2最全面的解析
RxJava2 只看这一篇文章就够了
RxJava 并发之数据流发射太快如何办(背压(Backpressure))
Rxjava2 入门教程五:Flowable 背压支持——对 Flowable 最全面而详细的讲解
Rxjava2 入门教程六:Single、Completable、Maybe——简化版的 Observable

前言

说起来很惭愧, RxJava2已经出来很久了, 而在现在的项目中我也一直使用着它, 但是我一直都没怎么搞明白RxJava2和RxJava1之间的区别.今天就来好好搞明白.

依赖包更改

RxJava2和RxJava1是不能在一个项目中共存的, RxJava1的依赖方式是这样的

1
2
compile 'io.reactivex:rxjava:1.3.4'
compile 'io.reactivex:rxandroid:1.2.1'

而RxJava2的依赖方式是这样的

1
2
implementation 'io.reactivex.rxjava2:rxjava:2.1.10'
implementation 'io.reactivex.rxjava2:rxandroid:2.0.2'

Nulls

RxJava2不再支持null值
我们先测试一段RxJava1的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Observable.just(null).subscribe(new Subscriber<Object>() {
@Override
public void onCompleted() {

Log.d(TAG, "onCompleted: ");
}

@Override
public void onError(Throwable e) {

Log.e(TAG, "onError: ", e);
}

@Override
public void onNext(Object o) {

Log.d(TAG, "o = " + o);
}
});

输出结果:

1
2
me.mundane.myrxjavasummary D/TestActivity: o = null
me.mundane.myrxjavasummary D/TestActivity: onCompleted:

再来一段RxJava2的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
Observable.just(null).subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {

Log.d(TAG, "onSubscribe: ");
}

@Override
public void onNext(Object o) {

Log.d(TAG, "onNext: o = " + o);
}

@Override
public void onError(Throwable e) {

Log.e(TAG, "onError: ", e);
}

@Override
public void onComplete() {

Log.d(TAG, "onComplete: ");
}
});

输出结果:

抛出了一个空指针异常, 具体位置是在这里

Observable和Flowable

关于背压的概念, 可以先看看这两篇博客
Android RxJava :图文详解 背压策略
RxJava 并发之数据流发射太快如何办(背压(Backpressure))
背压, 简单来说就是生产者生产数据的速度太快, 而消费者没有能力去处理数据.在同步操作中,这不是个问题,例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
 // Produce
Observable<Integer> producer = Observable.create(o -> {
o.onNext(1);
o.onNext(2);
o.onCompleted();
});
// Consume
producer.subscribe(i -> {
try {
Thread.sleep(1000);
System.out.println(i);
} catch (Exception e) { }
});

虽然上面的消费者处理数据的速度慢,但是由于是同步调用的,所以当 o.onNext(1) 执行后,一直阻塞到消费者处理完才执行 o.onNext(2)

再看一个MissingBackpressureException的异常, 代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
Observable.interval(1, TimeUnit.MILLISECONDS)
.observeOn(Schedulers.newThread())
.subscribe(new Action1<Long>() {
@Override
public void call(Long i) {
Log.d(TAG, "call: i = " + i);
try {
Thread.sleep(100);
} catch (Exception e) {
e.printStackTrace();
}
}
});

其结果

在RxJava1中可以有一些Backpressure策略, 如onBackpressureBuffer和onBackpressureDrop, onBackpressureBuffer 会缓存所有当前无法消费的数据,直到 Observer 可以处理为止, onBackpressureDrop 会在消费者无法处理数据的时候就直接把该数据丢弃了.
在 RxJava1.X 中,同样是 Observable,有的不支持背压策略,导致某些情况下,显得特别麻烦,出了问题也很难排查.
RxJava2中把Observable不再支持背压,增加了被观察者的新实现Flowable来支持背压.
首先看一段RxJava2中的Observable的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
 Observable.create(new ObservableOnSubscribe<Long>() {
@Override
public void subscribe(ObservableEmitter<Long> emitter) throws Exception {
long i = 0;
while (true) {
i++;
emitter.onNext(i);
}
}
}).observeOn(Schedulers.newThread()).
subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
// 每5000毫秒才消耗一个
Thread.sleep(5000);
Log.d(TAG, "accept: " + aLong);
}
});

运行起来之后发现内存占用暴增

直到最后就直接oom, 应用闪退

可见Observable不再会抛出MissingBackpressureException, 而是直接oom.由于上游通过 Observable 发射数据的速度大于下游通过 Consumer 接收处理数据的速度,而且上下游分别运行在不同的线程中,下游对数据的接收处理不会堵塞上游对数据的发射,造成上游数据积压,内存不断增加,最后便会导致内存溢出.
再来看Flowable的一段代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
Flowable.create(new FlowableOnSubscribe<Integer>() {

@Override
public void subscribe(FlowableEmitter<Integer> e) throws Exception {

for(int i=0;i<10000;i++){
e.onNext(i);
}
e.onComplete();
}
}, FlowableEmitter.BackpressureMode.ERROR) //指定背压处理策略,抛出异常
.subscribeOn(Schedulers.computation())
.observeOn(Schedulers.newThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d("JG", integer.toString());
Thread.sleep(1000);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.d("JG",throwable.toString());
}
});

也可以使用类似 RxJava1.x 的方式来控制

1
2
3
Flowable.range(1,10000)
.onBackpressureDrop()
.subscribe(integer -> Log.d("JG",integer.toString()));

这几个背压策略的含义如下:

以BUFFER举例来说, 可以无限制向里添加数据,不会抛出 MissingBackpressureException 异常,但会导致 OOM, 就和使用Observable一样.
只有在需要处理背压问题时,才需要使用 Flowable。

由于只有在上下游运行在不同的线程中,且上游发射数据的速度大于下游接收处理数据的速度时,才会产生背压问题;
所以,如果能够确定:
1、上下游运行在同一个线程中,
2、上下游工作在不同的线程中,但是下游处理数据的速度不慢于上游发射数据的速度,
3、上下游工作在不同的线程中,但是数据流中只有一条数据
则不会产生背压问题,就没有必要使用 Flowable,以免影响性能。

RxJava2 的接口方法都允许抛出异常

比如RxJava1的这段代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Observable.interval(1, TimeUnit.MILLISECONDS)
.onBackpressureDrop()
.observeOn(Schedulers.newThread())
.subscribe(new Action1<Long>() {
@Override
public void call(Long i) {
Log.d(TAG, "call: i = " + i);
try {
Thread.sleep(100);
} catch (Exception e) {
e.printStackTrace();
}
}
});

因为call方法没有抛出异常, 所以这里的Thread.sleep()我必须加上try…catch才可以.
而RxJava2的这段:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Observable.create(new ObservableOnSubscribe<Long>() {
@Override
public void subscribe(ObservableEmitter<Long> emitter) throws Exception {
long i = 0;
while (true) {
i++;
emitter.onNext(i);
}
}
}).observeOn(Schedulers.newThread()).
subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
// 每5000毫秒才消耗一个
Thread.sleep(5000);
Log.d(TAG, "accept: " + aLong);
}
});

在accept方法中允许抛出异常

Subscription

RxJava2中的Observer 中多了一个回调方法onSubscribe, 传递过来了一个Disposable的参数.Disposable相当于RxJava1中的Subscription, 用于解除订阅.同时这个订阅方法也改成了无返回值, 而不会再返回Disposable了

但是一些别的方法, 比如subscribe(Consumer)还是返回了一个Disposable

Maybe

Maybe可以发射 0 个或 1 个通知或错误的信号。因为最多有一个元素被发射,Maybe没有背压的概念。Maybe可发射一条单一的数据,以及发射一条完成通知,或者一条异常通知,其中完成通知和异常通知只能发射一个,发射数据只能在发射完成通知或者异常通知之前,否则发射数据无效。
代码举例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
Maybe.create(new MaybeOnSubscribe<Integer>() {
@Override
public void subscribe(MaybeEmitter<Integer> emitter) throws Exception {
emitter.onSuccess(1);
emitter.onSuccess(2);
emitter.onComplete();

}
}).subscribe(new MaybeObserver<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: ");
}

@Override
public void onSuccess(Integer integer) {

Log.d(TAG, "onSuccess: integer = " + integer);
}

@Override
public void onError(Throwable e) {

Log.e(TAG, "onError: ", e);
}

@Override
public void onComplete() {

Log.d(TAG, "onComplete: ");
}
});

方法 void onSuccess(T t) 用来发射一条单一的数据,且一次订阅只能调用一次,不同于 Observale 的发射器 ObservableEmitter 中的 void onNext(@NonNull T value) 方法,在一次订阅中,可以多次调用多次发射.

doOnSubscribe的线程是在哪里指定的

这里主要是解决之前我一直以来的一个疑惑.首先我们看这段代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "subscribe, currentThread = " + Thread.currentThread().getName());
emitter.onNext(1);
emitter.onComplete();
}
})
.subscribeOn(Schedulers.io())
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {

Log.d(TAG, "doOnSubscribe, currentThread = " + Thread.currentThread()
.getName());
}
})
.observeOn(Schedulers.io())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "integer = "
+ integer
+ ", currentThread = "
+ Thread.currentThread());
}
});

测试结果:

可以看到我没有在任何地方指定主线程, doOnSybscribe就运行在主线程了.而加入我在doOnsubscribe后面跟上一句subscribeOn呢?代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "subscribe, currentThread = " + Thread.currentThread().getName());
emitter.onNext(1);
emitter.onComplete();
}
})
.subscribeOn(Schedulers.io())
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {

Log.d(TAG, "doOnSubscribe, currentThread = " + Thread.currentThread()
.getName());
}
})
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "integer = "
+ integer
+ ", currentThread = "
+ Thread.currentThread());
}
});

测试结果:

由此我们可以得出结论, doOnSubscribe默认执行在主线程, 在它的后面跟上一个.subscribeOn()能够指定doOnSubscribe的执行线程.

0%