前言
RxJava出来已经很久了, 但我至今对它仍然不是很熟悉, 今天就系统的总结一下RxJava的常见用法和操作符.
首先打开RxJava在github的主页
https://github.com/ReactiveX/RxJava
点进去查看介绍, 可以看到都是英文的.说点题外话, 以前我也是比较害怕英文文档的, 但是最近有点变化, 大概阅读源码的时候发现把源码上的英文注释看懂之后再去看源码远远比直接看源码去理解容易得多.英文一遍读不懂可以多读几遍, 就当做阅读理解一样, 只要备好工具(网易有道词典什么的), 理解句子中所有单词的意思, 最终理解出来的意思一般和原本表达的意思差不到哪里去的.如果像大段的英文文档, 就先用google翻译把整个页面翻译一下, 通读一遍找到你最感兴趣的信息(因为一般我们都是在一大段文章中找一个点而已), 找到之后恢复成英文页面再去阅读那个点.
比如我想找到rxjava最新的版本号, 我就找到了这里
文档说关于maven, gradle等依赖信息可以在超链接中找到.然后以我就点击那个超链接
可以看到RxJava1最新的版本号是1.3.4(本篇只讨论RxJava1)
以下是我的参考资料:
给 Android 开发者的 RxJava 详解
可能是东半球最全的RxJava使用场景小结
一些基础操作
关于rxjava的观察者模式, 举个例子, 就像电灯和开关.台灯作为观察者, 始终观察着开关的一举一动, 而开关就作为被观察者.开关一旦打开, 台灯就亮起;开关一旦关闭, 台灯就熄灭.
创建被观察者Observable
基于上面的例子, 我们把Observable(被观察者)取名为switcher(开关)
1 | // 开关作为被观察者 |
对于创建Observable还有两种更”偷懒”的方式
1 | // 创建被观察者的偷懒模式1 |
1 | // 创建被观察者的偷懒模式2 |
创建观察者Observer
我们把Observer(观察者)取名为light(电灯)
1 | // 电灯作为观察者, 对始终在观察者开关的动作, 对开关的动作而做出相应的反应 |
然后是被观察者订阅了观察者(这个逻辑和我们通常认为的不符, 但是方便了API的设计), 被观察者会持有观察者的引用
1 | switcher.subscribe(light); |
错误处理
1 | Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() { |
过滤(filter)
1 | Observable.just("on", "off", "on", "on") |
scheduler(线程切换)
1 | Observable.create(new Observable.OnSubscribe<Drawable>() { |
这里需要说明一下了, subscribeOn()
指定 subscribe()
所发生的线程,即 Observable.OnSubscribe
被激活时所处的线程。或者叫做事件产生的线程(数据发出的线程)。 observeOn()
: 指定 Subscriber
所运行在的线程。或者叫做事件消费的线程。Subscriber
并不是(严格说应该为『不一定是』,但这里不妨理解为『不是』)subscribe()
参数中的Subscriber
,而是 observeOn()
执行时的当前 Observable
所对应的 Subscriber
,即它的直接下级 Subscriber
。换句话说,observeOn()
指定的是它之后的操作所在的线程。因此如果有多次切换线程的需求,只要在每个想要切换线程的位置调用一次 observeOn()
即可
1 | Observable.just(1, 2, 3, 4) // IO 线程,由 subscribeOn() 指定 |
不同于 observeOn(),subscribeOn() 的位置放在哪里都可以,但它是只能调用一次的。
图中共有5处含有对事件的操作。由图中可以看出,①和②两处受第一个 subscribeOn()
影响,运行在红色线程;③和④处受第一个observeOn()
的影响,运行在绿色线程;⑤处受第二个observeOn()
影响,运行在紫色线程;而第二个subscribeOn()
,由于在通知过程中线程就被第一个 subscribeOn()
截断,因此对整个流程并没有任何影响。这里也就回答了前面的问题:当使用了多个subscribeOn()
的时候,只有第一个 subscribeOn()
起作用。
doOnSubscribe()
默认情况下,doOnSubscribe()
执行在subscribe()
发生的线程(其实这一点我持怀疑态度);而如果在doOnSubscribe()
之后有subscribeOn()
的话,它将执行在离它最近的 subscribeOn()
所指定的线程。
1 | Observable.create(new Observable.OnSubscribe<Integer>() { |
map(变换)
1 | Observable.just(R.mipmap.avatar) |
compose(对Observable整体的变换)
和lift()
的区别在于,lift()
是针对事件项和事件序列的,而compose()
是针对Observable
自身进行变换。compose()
方法其实挺有用的.
1 | Observable.Transformer<Integer, Drawable> transformer = new Observable.Transformer<Integer, Drawable>() { |
flatmap
1 | Student student1 = new Student(); |
doOnNext
1 | Observable.just(1, 2, 3) |
一些进阶的操作
timer(定时操作)
1 | Observable.timer(3, TimeUnit.SECONDS) |
interval(周期性操作)
1 | Observable.interval(2, TimeUnit.SECONDS) |
throttleFirst(在每次事件触发后的一定时间间隔内丢弃新的事件)
注: 这里引入了rxbinding
1 | // 只返回一秒内的第一个, 后续发射出来的全部丢弃 |
schedulePeriodically(轮询请求)
1 | Observable.create(new Observable.OnSubscribe<String>() { |
concat(将若干个Observable串联起来)
1 | Observable.concat(Observable.just(1, 2, 3), Observable.just(4, 5, 6)) |
zip(将若干个Observable发射的数据组合)
1 | Observable.zip(Observable.just(1, 2, 3), Observable.just("A", "B", "C"), new Func2<Integer, String, String>() { |
merge
1 | // 拼接两个Observable的输出,不保证顺序,按照事件产生的顺序发送给订阅者 |
combineLatest
注: 可以用来处理表单验证
1 | mFirstObservable = Observable.create(new Observable.OnSubscribe<String>() { |
上面的例子可以用rxbinding简化
1 | Observable<CharSequence> ObservableEmail = RxTextView.textChanges(mEmailView); |
combineLatest与zip的区别在于, zip发射的数据是一一对应的, 如果一个一直发射数据而另一个不发射数据, 下游是获取不到数据的.而combineLates则是: 只要有一个数据源发射了数据, 就会调用call方法
与retrofit结合
1 | public interface GankAPI { |
1 | private RetrofitManager() { |
1 | GankAPI gankAPI = APIFactory.createGankAPI(); |
使用compose简化线程指定
1 | gankAPI.getMeiziData(10, 1) |
进一步简化
1 | public class RxSchedulersHelper { |
1 | gankAPI.getMeiziData(10, 1) |
其实这里一些filter, map等统一操作都可以放在这个helper的compose里面
retrywhen()错误重连机制
1 | Observable.create(new Observable.OnSubscribe<Integer>() { |
github地址
https://github.com/mundane799699/AndroidProjects/tree/master/MyRxJavaSummary
总结的有点累, 写的有点乱, 毕竟不是太熟, 个人水平也有限.如有疏漏, 请帮助我指出, 感谢您的阅读.