ThinkChat🤖让你学习和工作更高效,注册即送10W Token,即刻开启你的AI之旅 广告
[TOC] 本系列文章为 Season_zlc 大神的[《给初学者的RxJava2.0教程》](http://www.jianshu.com/p/464fa025229e)系列学习笔记。 # 配置 添加 Gradle 配置: ```groovy implementation 'io.reactivex.rxjava2:rxjava:x.y.z' implementation 'io.reactivex.rxjava2:rxandroid:x.y.z' ``` # 基础 [原文链接](http://www.jianshu.com/p/464fa025229e) ![](https://img.kancloud.cn/f7/b3/f7b38edb806c5854a3dbde9b974c5e41_580x325.png) ## 上下游 * 上游和下游分别对应着 Observable 和 Observer * 水管中流动的是事件(对象),这个重要! ## 发送事件 * 只有调用了 subscribe 方法之后,上游才会发事件 * 上游发送了 onComplete 或 onError 事件后可以继续发送其他事件,而下游接收了 onComplete 或 onError 事件之后不再接收其他事件 * 上游可以不发送 onComplete 或 onError 事件,发送的话只能发送一次并且互斥,即不能同时发送 ## 水管机关 * 调用 subscribe 方法就把上游、下游两个水管连接起来了,同时会发送一个 onSubscribe 事件,并可以拿到一个 disposable 开关 * disposable 可以理解为两根管子之间的一个机关,调用其 dispose 方法时,可以将两根管道切断,导致下游收不到事件 * RxJava 内置了一个 disposable 容器 CompositeDisposable,可以存储 disposable,用来在 Activity 退出时调用所有机关的 dispose 方法,使下游不再接收事件 ```java Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { e.onNext(1); e.onNext(2); e.onNext(3); e.onComplete(); } }); Observer<Integer> observer = new Observer<Integer>() { Disposable mDisposable; @Override public void onSubscribe(Disposable d) { mDisposable = d; } @Override public void onNext(Integer integer) { } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }; observable.subscribe(observer); ``` # 事件源类型 事件源类型|描述 ---|--- Flowable|可以发送 N 个事件,支持背压 Observable|可以发送 N 个事件,不支持背压 Single|可以发送 1 个事件,或 1 个 onError 事件 Completable|不可以发送普通事件,可以发送 1 个 onComplete 事件或 1 个 onError 事件 Maybe|可以发射 1 个事件、1 个 onError 事件,或者什么都不发射 # 线程调度 [原文链接](http://www.jianshu.com/p/8818b98c44e2) ## 应用场景 Rxjava 最常见的应用场景是: 在子线程进行数据计算、网络请求操作等,然后回到主线程展示结果(成功或错误) ## 内置线程 * Schedulers.io():代表 IO 操作线程,用于网络、读写文件等 IO 密集型操作 * Schedulers.newThread():代表一个常规的新线程 * Schedulers.computation():代表 CPU 计算密集型操作,如需要大量计算操作时使用 * AndroidSchedulers.mainThread():Android 主线程 ## 线程调度 * subscribeOn 指定上游发送事件的线程 * observeOn 指定下游接收事件的线程 * 多次指定上游线程只有第一次有效,其他会被忽略 * 多次指定下游线程没问题,每调用一次 observeOn,下游线程就会切换一次 ```java observable.subscribeOn(Schedulers.newThread()) .observeOn(Schedulers.io()) .doOnNext(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { // 此处为 IO 线程 } }) .observeOn(AndroidSchedulers.mainThread()) .doOnNext(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { // 此处为主线程 } }) .observeOn(Schedulers.newThread()) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { // 此处为新的子线程 } }); ``` # 变换操作符 [原文链接](http://www.jianshu.com/p/128e662906af) ## Map 操作符 * 对上游发送的每一个事件应用一个函数,使得每一个事件都按照指定的函数变化 ```java Observable.fromArray(1, 2, 3) .map(new Function<Integer, String>() { @Override public String apply(Integer integer) throws Exception { return "Result is " + integer; } }) .subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.i(TAG, s); } }); ``` ## FlatMap 操作符 ![](https://img.kancloud.cn/7f/19/7f19a8efb4005e241437ed6cd54bdc89_647x763.png) ### 定义 * FlatMap 的作用是将一个事件,转换为,发送新的事件的,Observable * 被转换为 Observable 后开始发送新的事件 * 发送出来的,这些新的事件,会被合并到一个水管中,发给下游 * FlatMap 相当于直接转换了上游发射源所发射的事件类型 * Map 相当于转换了水管中流动的事件的类型 ### 其他 * FlatMap 不保证新事件的顺序,ConcatMap 可以保证 ### 代码示例 ```java public interface Api { @GET Observable<LoginResponse> login(@Body LoginRequest request); @GET Observable<RegisterResponse> register(@Body RegisterRequest request); } ``` ```java api.register(new RegisterRequest()) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .doOnNext(new Consumer<RegisterResponse>() { @Override public void accept(RegisterResponse registerResponse) throws Exception { // 先根据注册结果做一些事情 } }) .observeOn(Schedulers.io()) .flatMap(new Function<RegisterResponse, ObservableSource<LoginResponse>>() { @Override public ObservableSource<LoginResponse> apply(RegisterResponse registerResponse) throws Exception { return api.login(new LoginRequest()); } }) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<LoginResponse>() { @Override public void accept(LoginResponse loginResponse) throws Exception { // 登录成功 } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { // 登录失败 } }); ``` ## Zip 操作符 [原文链接](http://www.jianshu.com/p/bb58571cdb64) ### 定义 * Zip 可将多个 Observable 发送的事件结合到一起,然后发送这些组合到一起的事件 * 严格按照顺序执行 * 只发射与发射数据项数最少的那个 Observable 一样多的数据 ![](https://img.kancloud.cn/ea/49/ea49166f780a9b84b9f1c52753423abb_648x879.png) ### 代码示例 ```java public interface Api { @GET Observable<UserBaseInfoResponse> getUserBaseInfo(@Body UserBaseInfoRequest request); @GET Observable<UserExtraInfoResponse> getUserExtraInfo(@Body UserExtraInfoRequest request); } ``` ```java Observable<UserBaseInfoResponse> userBaseInfoObservable = api.getUserBaseInfo( new UserBaseInfoRequest()) .subscribeOn(Schedulers.io()); Observable<UserExtraInfoResponse> userExtraInfoObservable = api.getUserExtraInfo( new UserExtraInfoRequest()) .subscribeOn(Schedulers.io()); Observable.zip(userBaseInfoObservable, userExtraInfoObservable, new BiFunction<UserBaseInfoResponse, UserExtraInfoResponse, UserInfo>() { @Override public UserInfo apply(UserBaseInfoResponse userBaseInfoResponse, UserExtraInfoResponse userExtraInfoResponse) throws Exception { return new UserInfo(userBaseInfoResponse, userExtraInfoResponse); } }).observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<UserInfo>() { @Override public void accept(UserInfo userInfo) throws Exception { // do something } }); ``` # 背压 ## 上下游流速不均衡问题 [原文链接](http://www.jianshu.com/p/0f2d6c2387c9) ```java Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { for (int i = 0; ; i++) { e.onNext(i); } } }).subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Thread.sleep(2000); Log.e(TAG, "accept: " + integer); } }); ``` * 上游水管发送事件过快时,事件会被先保存到该水管的水缸中 * 当上下游处在不同线程,即为异步操作时,上下游流速不均衡时会造成内存溢出(水缸溢出) * 上下游为同一线程,即为同步操作时,不会出现内存溢出情况。因为上游需等待下游处理完毕才会发送下一个事件 ## 流速问题解决方案 [原文链接](http://www.jianshu.com/p/e4c6d7989356) * 从数量上治理,减少发送进入水缸事件的数量(会造成部分事件丢失) * 从时间上治理,减缓事件发送进入水缸的速度(事件不会丢失) 方案一 demo: ```java Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { for (int i = 0; ; i++) { e.onNext(i); } } }).subscribeOn(Schedulers.io()) .sample(2, TimeUnit.SECONDS) // sample 每隔 2 秒取样 .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Thread.sleep(2000); Log.e(TAG, "accept: " + integer); } }); } ``` 方案二 demo: ```java Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { for (int i = 0; ; i++) { e.onNext(i); Thread.sleep(2000); } } }).subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Thread.sleep(2000); Log.e(TAG, "accept: " + integer); } }); } ``` # Flowable * Flowable 采取响应式拉取 * Subscription 的 request 方法相当于下游告诉上游,下游它所能处理事件的能力 * Flowable 默认有一个大小为 128 的水缸,上下游工作在不同线程时,上游会先把事件贮存在水缸中 ```java Flowable.create(new FlowableOnSubscribe<Integer>() { @Override public void subscribe(FlowableEmitter<Integer> e) throws Exception { e.onNext(0); e.onNext(1); e.onNext(2); e.onComplete(); } // 此处第二个参数 BackpressureStrategy.ERROR 代表处理上下游流速不均问题的处理策略,此处为直接抛出错误 }, BackpressureStrategy.ERROR) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Subscriber<Integer>() { @Override public void onSubscribe(Subscription s) { s.request(Long.MAX_VALUE); } @Override public void onNext(Integer integer) { } @Override public void onError(Throwable t) { } @Override public void onComplete() { } }); ``` ## Flowable 处理上下游流速不均问题的策略 类型|策略描述 ---|--- BackpressureStrategy.ERROR|直接抛出 MissingBackpressureException 异常 BackpressureStrategy.BUFFER|上游的水缸更换为无限大的 BackpressureStrategy.DROP|水缸存不下的事件直接丢弃 BackpressureStrategy.LATEST|水缸内只保留最新的事件