参考
Android RxJava:2.0 相对于 1.0的更新 & 变化(含 RxJava 1.0的更新使用)
RxJava 2.0 有什么不同 (译)
RxJava1 升级到 RxJava2 所踩过的坑
RxJava2.x与RxJava1.x的差异对比
RxJava2 vs RxJava1
Rxjava2最全面的解析
RxJava2 只看这一篇文章就够了
RxJava 并发之数据流发射太快如何办(背压(Backpressure))
Rxjava2 入门教程五:Flowable 背压支持——对 Flowable 最全面而详细的讲解
Rxjava2 入门教程六:Single、Completable、Maybe——简化版的 Observable
前言
说起来很惭愧, RxJava2已经出来很久了, 而在现在的项目中我也一直使用着它, 但是我一直都没怎么搞明白RxJava2和RxJava1之间的区别.今天就来好好搞明白.
依赖包更改
RxJava2和RxJava1是不能在一个项目中共存的, RxJava1的依赖方式是这样的
1 | compile 'io.reactivex:rxjava:1.3.4' |
而RxJava2的依赖方式是这样的
1 | implementation 'io.reactivex.rxjava2:rxjava:2.1.10' |
Nulls
RxJava2不再支持null值
我们先测试一段RxJava1的代码
1 | Observable.just(null).subscribe(new Subscriber<Object>() { |
输出结果:
1 | me.mundane.myrxjavasummary D/TestActivity: o = null |
再来一段RxJava2的代码
1 | Observable.just(null).subscribe(new Observer<Object>() { |
输出结果:
抛出了一个空指针异常, 具体位置是在这里
Observable和Flowable
关于背压的概念, 可以先看看这两篇博客
Android RxJava :图文详解 背压策略
RxJava 并发之数据流发射太快如何办(背压(Backpressure))
背压, 简单来说就是生产者生产数据的速度太快, 而消费者没有能力去处理数据.在同步操作中,这不是个问题,例如:
1 | // Produce |
虽然上面的消费者处理数据的速度慢,但是由于是同步调用的,所以当 o.onNext(1) 执行后,一直阻塞到消费者处理完才执行 o.onNext(2)
再看一个MissingBackpressureException的异常, 代码如下
1 | Observable.interval(1, TimeUnit.MILLISECONDS) |
其结果
在RxJava1中可以有一些Backpressure策略, 如onBackpressureBuffer和onBackpressureDrop, onBackpressureBuffer 会缓存所有当前无法消费的数据,直到 Observer 可以处理为止, onBackpressureDrop 会在消费者无法处理数据的时候就直接把该数据丢弃了.
在 RxJava1.X 中,同样是 Observable,有的不支持背压策略,导致某些情况下,显得特别麻烦,出了问题也很难排查.
RxJava2中把Observable不再支持背压,增加了被观察者的新实现Flowable来支持背压.
首先看一段RxJava2中的Observable的代码:
1 | Observable.create(new ObservableOnSubscribe<Long>() { |
运行起来之后发现内存占用暴增
直到最后就直接oom, 应用闪退
可见Observable不再会抛出MissingBackpressureException, 而是直接oom.由于上游通过 Observable 发射数据的速度大于下游通过 Consumer 接收处理数据的速度,而且上下游分别运行在不同的线程中,下游对数据的接收处理不会堵塞上游对数据的发射,造成上游数据积压,内存不断增加,最后便会导致内存溢出.
再来看Flowable的一段代码:
1 | Flowable.create(new FlowableOnSubscribe<Integer>() { |
也可以使用类似 RxJava1.x 的方式来控制
1 | Flowable.range(1,10000) |
这几个背压策略的含义如下:
以BUFFER举例来说, 可以无限制向里添加数据,不会抛出 MissingBackpressureException 异常,但会导致 OOM, 就和使用Observable一样.
只有在需要处理背压问题时,才需要使用 Flowable。
由于只有在上下游运行在不同的线程中,且上游发射数据的速度大于下游接收处理数据的速度时,才会产生背压问题;
所以,如果能够确定:
1、上下游运行在同一个线程中,
2、上下游工作在不同的线程中,但是下游处理数据的速度不慢于上游发射数据的速度,
3、上下游工作在不同的线程中,但是数据流中只有一条数据
则不会产生背压问题,就没有必要使用 Flowable,以免影响性能。
RxJava2 的接口方法都允许抛出异常
比如RxJava1的这段代码:
1 | Observable.interval(1, TimeUnit.MILLISECONDS) |
因为call方法没有抛出异常, 所以这里的Thread.sleep()我必须加上try…catch才可以.
而RxJava2的这段:
1 | Observable.create(new ObservableOnSubscribe<Long>() { |
在accept方法中允许抛出异常
Subscription
RxJava2中的Observer 中多了一个回调方法onSubscribe, 传递过来了一个Disposable的参数.Disposable相当于RxJava1中的Subscription, 用于解除订阅.同时这个订阅方法也改成了无返回值, 而不会再返回Disposable了
但是一些别的方法, 比如subscribe(Consumer)还是返回了一个Disposable
Maybe
Maybe可以发射 0 个或 1 个通知或错误的信号。因为最多有一个元素被发射,Maybe没有背压的概念。Maybe可发射一条单一的数据,以及发射一条完成通知,或者一条异常通知,其中完成通知和异常通知只能发射一个,发射数据只能在发射完成通知或者异常通知之前,否则发射数据无效。
代码举例:
1 | Maybe.create(new MaybeOnSubscribe<Integer>() { |
方法 void onSuccess(T t) 用来发射一条单一的数据,且一次订阅只能调用一次,不同于 Observale 的发射器 ObservableEmitter 中的 void onNext(@NonNull T value) 方法,在一次订阅中,可以多次调用多次发射.
doOnSubscribe的线程是在哪里指定的
这里主要是解决之前我一直以来的一个疑惑.首先我们看这段代码
1 | Observable.create(new ObservableOnSubscribe<Integer>() { |
测试结果:
可以看到我没有在任何地方指定主线程, doOnSybscribe就运行在主线程了.而加入我在doOnsubscribe后面跟上一句subscribeOn呢?代码如下:
1 | Observable.create(new ObservableOnSubscribe<Integer>() { |
测试结果:
由此我们可以得出结论, doOnSubscribe默认执行在主线程, 在它的后面跟上一个.subscribeOn()能够指定doOnSubscribe的执行线程.