RxJava使用总结

前言

RxJava出来已经很久了, 但我至今对它仍然不是很熟悉, 今天就系统的总结一下RxJava的常见用法和操作符.
首先打开RxJava在github的主页
https://github.com/ReactiveX/RxJava

点进去查看介绍, 可以看到都是英文的.说点题外话, 以前我也是比较害怕英文文档的, 但是最近有点变化, 大概阅读源码的时候发现把源码上的英文注释看懂之后再去看源码远远比直接看源码去理解容易得多.英文一遍读不懂可以多读几遍, 就当做阅读理解一样, 只要备好工具(网易有道词典什么的), 理解句子中所有单词的意思, 最终理解出来的意思一般和原本表达的意思差不到哪里去的.如果像大段的英文文档, 就先用google翻译把整个页面翻译一下, 通读一遍找到你最感兴趣的信息(因为一般我们都是在一大段文章中找一个点而已), 找到之后恢复成英文页面再去阅读那个点.
比如我想找到rxjava最新的版本号, 我就找到了这里

文档说关于maven, gradle等依赖信息可以在超链接中找到.然后以我就点击那个超链接

可以看到RxJava1最新的版本号是1.3.4(本篇只讨论RxJava1)
以下是我的参考资料:
给 Android 开发者的 RxJava 详解
可能是东半球最全的RxJava使用场景小结

一些基础操作

关于rxjava的观察者模式, 举个例子, 就像电灯和开关.台灯作为观察者, 始终观察着开关的一举一动, 而开关就作为被观察者.开关一旦打开, 台灯就亮起;开关一旦关闭, 台灯就熄灭.

创建被观察者Observable

基于上面的例子, 我们把Observable(被观察者)取名为switcher(开关)

1
2
3
4
5
6
7
8
9
10
11
// 开关作为被观察者
Observable<String> switcher = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("on");
subscriber.onNext("off");
subscriber.onNext("on");
subscriber.onNext("on");
subscriber.onCompleted();
}
});

对于创建Observable还有两种更”偷懒”的方式

1
2
// 创建被观察者的偷懒模式1
Observable<String> switcher = Observable.just("on", "off", "on", "on");
1
2
3
// 创建被观察者的偷懒模式2
String[] k = {"on", "off", "on", "on"};
Observable<String> switcher2 = Observable.from(k);

创建观察者Observer

我们把Observer(观察者)取名为light(电灯)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 电灯作为观察者, 对始终在观察者开关的动作, 对开关的动作而做出相应的反应
Subscriber<String> light = new Subscriber<String>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted: ");
}

@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: ");
}

@Override
public void onNext(String s) {
Log.d(TAG, "onNext: " + s);
}
};

然后是被观察者订阅了观察者(这个逻辑和我们通常认为的不符, 但是方便了API的设计), 被观察者会持有观察者的引用

1
switcher.subscribe(light);

错误处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(1);
subscriber.onNext(2);
subscriber.onNext(0);
subscriber.onNext(3);
subscriber.onNext(5/0);
subscriber.onCompleted();
}
});

Subscriber<Integer> subscriber = new Subscriber<Integer>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted: ");
}

@Override
public void onError(Throwable e) {
// subscriber的onNext中出现的异常和observable和call方法中的给onNext()传参数出现的异常都会出现在这里
// 总之就是call()方法中发生的异常都会出现在这里
Log.e(TAG, "onError: " + e.getMessage());
}

@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + 10 / integer);
}
};

observable.subscribe(subscriber);

过滤(filter)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
Observable.just("on", "off", "on", "on")
.filter(new Func1<String, Boolean>() {
@Override
public Boolean call(String s) {
// 如果这里返回了true数据就会被回调到onNext, 否则返回了false就会被过滤掉
return TextUtils.equals("on", s);
}
})
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted: ");
}

@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: ");
}

@Override
public void onNext(String s) {
Log.d(TAG, "onNext: " + s);
}
});

scheduler(线程切换)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
Observable.create(new Observable.OnSubscribe<Drawable>() {
@Override
public void call(Subscriber<? super Drawable> subscriber) {
Log.d(TAG, "call: thread = " + Thread.currentThread().getName());
Drawable drawable = getResources().getDrawable(R.mipmap.avatar);
subscriber.onNext(drawable);
subscriber.onCompleted();
}
})
.subscribeOn(Schedulers.io()) // 指定 subscribe()发生在IO线程, 即在订阅这个过程发生在IO线程
.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber的回调发生在主线程
.subscribe(new Observer<Drawable>() {
@Override
public void onCompleted() {
}

@Override
public void onError(Throwable e) {
}

@Override
public void onNext(Drawable drawable) {
Log.d(TAG, "onNext: thread = " + Thread.currentThread().getName());
mIv.setImageDrawable(drawable);
}
});

这里需要说明一下了, subscribeOn()指定 subscribe() 所发生的线程,即 Observable.OnSubscribe 被激活时所处的线程。或者叫做事件产生的线程(数据发出的线程)。 observeOn(): 指定 Subscriber 所运行在的线程。或者叫做事件消费的线程。
Subscriber 并不是(严格说应该为『不一定是』,但这里不妨理解为『不是』)subscribe() 参数中的Subscriber,而是 observeOn()执行时的当前 Observable 所对应的 Subscriber ,即它的直接下级 Subscriber 。换句话说,observeOn() 指定的是它之后的操作所在的线程。因此如果有多次切换线程的需求,只要在每个想要切换线程的位置调用一次 observeOn() 即可

1
2
3
4
5
6
7
8
Observable.just(1, 2, 3, 4) // IO 线程,由 subscribeOn() 指定
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.map(mapOperator) // 新线程,由 observeOn() 指定
.observeOn(Schedulers.io())
.map(mapOperator2) // IO 线程,由 observeOn() 指定
.observeOn(AndroidSchedulers.mainThread)
.subscribe(subscriber); // Android 主线程,由 observeOn() 指定

不同于 observeOn(),subscribeOn() 的位置放在哪里都可以,但它是只能调用一次的。

图中共有5处含有对事件的操作。由图中可以看出,①和②两处受第一个 subscribeOn()影响,运行在红色线程;③和④处受第一个observeOn()的影响,运行在绿色线程;⑤处受第二个observeOn()影响,运行在紫色线程;而第二个subscribeOn(),由于在通知过程中线程就被第一个 subscribeOn()截断,因此对整个流程并没有任何影响。这里也就回答了前面的问题:当使用了多个subscribeOn()的时候,只有第一个 subscribeOn()起作用。

doOnSubscribe()

默认情况下,doOnSubscribe()执行在subscribe()发生的线程(其实这一点我持怀疑态度);而如果在doOnSubscribe()之后有subscribeOn()的话,它将执行在离它最近的 subscribeOn()所指定的线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
Log.d(TAG, "call: thread = " + Thread.currentThread().getName());
subscriber.onNext(1);
subscriber.onNext(2);
subscriber.onCompleted();
}
})
.subscribeOn(Schedulers.io()) // 指定事件发生在io线程
.doOnSubscribe(new Action0() { // doOnSubscribe在subscribe()调用后而且在事件发送前执行
@Override
public void call() {
Log.d(TAG, "call: thread = " + Thread.currentThread().getName());
Log.d(TAG, "call: 数据发送之前显示progressbar");
}
})
.subscribeOn(AndroidSchedulers.mainThread()) // 指定doOnSubscribe()发生在主线程
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d(TAG, "call: thread = " + Thread.currentThread().getName());
Log.d(TAG, "call: 输出最终的数据" + integer);
}
});

map(变换)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
Observable.just(R.mipmap.avatar)
.map(new Func1<Integer, Drawable>() {
@Override
public Drawable call(Integer integer) {
Drawable drawable = getResources().getDrawable(integer);
return drawable;
}
})
.subscribe(new Subscriber<Drawable>() {
@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(Drawable drawable) {
mIv.setImageDrawable(drawable);
}
});

compose(对Observable整体的变换)

lift()的区别在于,lift()是针对事件项和事件序列的,而compose()是针对Observable自身进行变换。compose()方法其实挺有用的.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
Observable.Transformer<Integer, Drawable> transformer = new Observable.Transformer<Integer, Drawable>() {
@Override
public Observable<Drawable> call(Observable<Integer> observable) {
return observable.map(new Func1<Integer, Drawable>() {
@Override
public Drawable call(Integer integer) {
Drawable drawable = getResources().getDrawable(integer);
return drawable;
}
}).map(new Func1<Drawable, Drawable>() {
@Override
public Drawable call(Drawable drawable) {
return drawable;
}
}); // observable.map后面可以继续.lift
}
};
Observable.just(R.mipmap.avatar)
// compose是为了将一系列的变换方法封装起来
.compose(transformer)
.subscribe(new Action1<Drawable>() {
@Override
public void call(Drawable drawable) {
mIv.setImageDrawable(drawable);
}
});

flatmap

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
Student student1 = new Student();
student1.setStudentName("小明");
List<Student.Course> courseList1 = new ArrayList<>();
courseList1.add(new Student.Course("语文"));
courseList1.add(new Student.Course("数学"));
student1.setCourses(courseList1);

Student student2 = new Student();
student2.setStudentName("小红");
List<Student.Course> courseList2 = new ArrayList<>();
courseList2.add(new Student.Course("英语"));
courseList2.add(new Student.Course("化学"));
student2.setCourses(courseList2);

Student[] students = {student1, student2};

Observable.from(students)
.flatMap(new Func1<Student, Observable<Student.Course>>() {
@Override
public Observable<Student.Course> call(Student student) {
return Observable.from(student.getCourses());
}
})
.subscribe(new Subscriber<Student.Course>() {
@Override
public void onCompleted() {
}

@Override
public void onError(Throwable e) {
}

@Override
public void onNext(Student.Course course) {
Log.d(TAG, "onNext: course = " + course.courseName);
}
});

doOnNext

1
2
3
4
5
6
7
8
9
10
11
12
13
Observable.just(1, 2, 3)
.doOnNext(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d(TAG, "doOnNext: integer = " + integer);
}
})
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d(TAG, "call: 最终输出 " + integer);
}
});

一些进阶的操作

timer(定时操作)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Observable.timer(3, TimeUnit.SECONDS)
.subscribe(new Subscriber<Long>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted: ");
}

@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: ");
}

@Override
public void onNext(Long aLong) {
Log.d(TAG, "onNext: " + aLong);
}
});

interval(周期性操作)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Observable.interval(2, TimeUnit.SECONDS)
.subscribe(new Subscriber<Long>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted: ");
}

@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: ");
}

@Override
public void onNext(Long aLong) {
Log.d(TAG, "onNext: " + aLong);
}
});

throttleFirst(在每次事件触发后的一定时间间隔内丢弃新的事件)

注: 这里引入了rxbinding

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 只返回一秒内的第一个, 后续发射出来的全部丢弃
RxView.clicks(button)
.throttleFirst(1, TimeUnit.SECONDS)
.subscribe(new Observer<Object>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted: ");
}

@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: ");
}

@Override
public void onNext(Object o) {
Log.d(TAG, "onNext: button clicked");
}
});

schedulePeriodically(轮询请求)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(final Subscriber<? super String> observer) {
Schedulers.newThread()
.createWorker()
.schedulePeriodically(new Action0() {
@Override
public void call() {
observer.onNext("呵呵");
}
}, 0, 3, TimeUnit.SECONDS);
}
}).subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.d(TAG, "call: " + s);
}
});

concat(将若干个Observable串联起来)

1
2
3
4
5
6
7
Observable.concat(Observable.just(1, 2, 3), Observable.just(4, 5, 6))
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.d(TAG, "concat: " + integer);
}
});

zip(将若干个Observable发射的数据组合)

1
2
3
4
5
6
7
8
9
10
11
12
Observable.zip(Observable.just(1, 2, 3), Observable.just("A", "B", "C"), new Func2<Integer, String, String>() {
@Override
public String call(Integer integer, String s) {
return integer + s;
}
})
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.d(TAG, "zip: " + s);
}
});

merge

1
2
3
4
5
6
7
8
9
// 拼接两个Observable的输出,不保证顺序,按照事件产生的顺序发送给订阅者
// 与concat的区别在于不保证顺序, 按照事件产生的顺序
Observable.merge(Observable.just("1", "2", "3"), Observable.just("A", "B", "C"))
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.d(TAG, "merge: " + s);
}
});

combineLatest

注: 可以用来处理表单验证

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
      mFirstObservable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(final Subscriber<? super String> subscriber) {
mEtFirst.addTextChangedListener(new TextWatcher() {
@Override
public void beforeTextChanged(CharSequence s, int start, int count, int after) {

}

@Override
public void onTextChanged(CharSequence s, int start, int before, int count) {
subscriber.onNext(s.toString());
}

@Override
public void afterTextChanged(Editable s) {

}
});
}
});

mSecondObservable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(final Subscriber<? super String> subscriber) {
mEtSecond.addTextChangedListener(new TextWatcher() {
@Override
public void beforeTextChanged(CharSequence s, int start, int count, int after) {

}

@Override
public void onTextChanged(CharSequence s, int start, int before, int count) {
subscriber.onNext(s.toString());
}

@Override
public void afterTextChanged(Editable s) {

}
});
}
});

Observable<Data> combineLatest = Observable.combineLatest(mFirstObservable, mSecondObservable, new Func2<String, String, Data>() {
@Override
public Data call(String s, String s2) {
return new Data(s, s2);
}
});

combineLatest.subscribe(new Action1<Data>() {
@Override
public void call(Data data) {
mTv.setText("first = " + data.first +", seoncd = " + data.second);
}
});

上面的例子可以用rxbinding简化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
Observable<CharSequence> ObservableEmail = RxTextView.textChanges(mEmailView);
Observable<CharSequence> ObservablePassword = RxTextView.textChanges(mPasswordView);

Observable.combineLatest(ObservableEmail, ObservablePassword, new Func2<CharSequence, CharSequence, Boolean>() {
@Override
public Boolean call(CharSequence email, CharSequence password) {
return isEmailValid(email.toString()) && isPasswordValid(password.toString());
}
}).subscribe(new Subscriber<Boolean>() {
@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(Boolean verify) {
if (verify) {
mEmailSignInButton.setEnabled(true);
} else {
mEmailSignInButton.setEnabled(false);
}
}
});

combineLatest与zip的区别在于, zip发射的数据是一一对应的, 如果一个一直发射数据而另一个不发射数据, 下游是获取不到数据的.而combineLates则是: 只要有一个数据源发射了数据, 就会调用call方法

与retrofit结合

1
2
3
4
5
public interface GankAPI {
// http://gank.io/api/data/福利/10/1
@GET("福利/{pagesize}/{page}")
Observable<GankMeiziResult> getMeiziData(@Path("pagesize") int pageSize, @Path("page") int page);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
private RetrofitManager() {
mRetrofit = new Retrofit.Builder()
.client(genericClient())
.baseUrl(BASE_URL)
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJavaCallAdapterFactory.create())
.build();
// 所有的请求方法都放在同一个interface里, 所以这里创建出来的class不会很多, 所以可以用这一个
mGankAPI = mRetrofit.create(GankAPI.class);
}

public GankAPI getGankAPI() {
return mGankAPI;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
GankAPI gankAPI = APIFactory.createGankAPI();
// 这里的gankAPI.getMeiziData(10, 1)代替之前的call
gankAPI.getMeiziData(10, 1)
.subscribeOn(Schedulers.io())
.doOnSubscribe(new Action0() {
@Override
public void call() {
showProgressDialog();
}
})
.subscribeOn(AndroidSchedulers.mainThread()) // 指定doOnSubscribe()所发生的线程, 其实这句代码可以不加的
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<GankMeiziResult>() {
@Override
public void onCompleted() {
hideProgressDialog();
}

@Override
public void onError(Throwable e) {
hideProgressDialog();
}

@Override
public void onNext(GankMeiziResult gankMeizhiResult) {
mMeiZiAdapter = new MeiZiAdapter(gankMeizhiResult.beauties);
mRv.setAdapter(mMeiZiAdapter);
}
});

使用compose简化线程指定

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
gankAPI.getMeiziData(10, 1)
.subscribeOn(Schedulers.io())
.doOnSubscribe(new Action0() {
@Override
public void call() {
// 貌似doOnSubsribe的线程不受前面subscribeOn()指定线程的影响, 默认为主线程
Log.d(TAG, "currentThread = " + Thread.currentThread().getName());
showProgressDialog();
}
})
.compose(new Observable.Transformer<GankMeiziResult, GankMeiziResult>() {
@Override
public Observable<GankMeiziResult> call(Observable<GankMeiziResult> observable) {
return observable.observeOn(AndroidSchedulers.mainThread());
}
})
.subscribe(new Subscriber<GankMeiziResult>() {
@Override
public void onCompleted() {
hideProgressDialog();
}

@Override
public void onError(Throwable e) {
hideProgressDialog();
}

@Override
public void onNext(GankMeiziResult gankMeizhiResult) {
mMeiZiAdapter = new MeiZiAdapter(gankMeizhiResult.beauties);
mRv.setAdapter(mMeiZiAdapter);
}
});

进一步简化

1
2
3
4
5
6
7
8
9
10
11
12
public class RxSchedulersHelper {
public static <T> Observable.Transformer<T, T> io2main() {
return new Observable.Transformer<T, T>() {
@Override
public Observable<T> call(Observable<T> observable) {
return observable
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
};
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
gankAPI.getMeiziData(10, 1)
.compose(RxSchedulersHelper.<GankMeiziResult>io2main())
.doOnSubscribe(new Action0() {
@Override
public void call() {
// 貌似doOnSubsribe的线程不受前面subscribeOn()指定线程的影响, 默认为主线程
Log.d(TAG, "currentThread = " + Thread.currentThread().getName());
showProgressDialog();
}
})
.subscribe(new Subscriber<GankMeiziResult>() {
@Override
public void onCompleted() {
hideProgressDialog();
}

@Override
public void onError(Throwable e) {
hideProgressDialog();
}

@Override
public void onNext(GankMeiziResult gankMeizhiResult) {
mMeiZiAdapter = new MeiZiAdapter(gankMeizhiResult.beauties);
mRv.setAdapter(mMeiZiAdapter);
}
});

其实这里一些filter, map等统一操作都可以放在这个helper的compose里面

retrywhen()错误重连机制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(1);
subscriber.onNext(2);
subscriber.onNext(3 / mDivisor);
subscriber.onCompleted();
}
})
.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
@Override
public Observable<?> call(final Observable<? extends Throwable> observable) {
return observable.flatMap(new Func1<Throwable, Observable<?>>() {
@Override
public Observable<?> call(Throwable throwable) {

if (throwable instanceof ArithmeticException) {
if (++retryCount <= maxRetries) {
Log.d(TAG, "正在重试");
if (retryCount == 3) {
mDivisor = 1;
}
return Observable.timer(2, TimeUnit.SECONDS);
}
}
return Observable.error(throwable);
}
});
}
})
.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted: ");
}

@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: ");
}

@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: integer = " + integer);
}
});

github地址

https://github.com/mundane799699/AndroidProjects/tree/master/MyRxJavaSummary
总结的有点累, 写的有点乱, 毕竟不是太熟, 个人水平也有限.如有疏漏, 请帮助我指出, 感谢您的阅读.

0%