[TOC]
本系列文章为 Season_zlc 大神的[《给初学者的RxJava2.0教程》](http://www.jianshu.com/p/464fa025229e)系列学习笔记。
# 配置
添加 Gradle 配置:
```groovy
implementation 'io.reactivex.rxjava2:rxjava:x.y.z'
implementation 'io.reactivex.rxjava2:rxandroid:x.y.z'
```
# 基础
[原文链接](http://www.jianshu.com/p/464fa025229e)
![](https://img.kancloud.cn/f7/b3/f7b38edb806c5854a3dbde9b974c5e41_580x325.png)
## 上下游
* 上游和下游分别对应着 Observable 和 Observer
* 水管中流动的是事件(对象),这个重要!
## 发送事件
* 只有调用了 subscribe 方法之后,上游才会发事件
* 上游发送了 onComplete 或 onError 事件后可以继续发送其他事件,而下游接收了 onComplete 或 onError 事件之后不再接收其他事件
* 上游可以不发送 onComplete 或 onError 事件,发送的话只能发送一次并且互斥,即不能同时发送
## 水管机关
* 调用 subscribe 方法就把上游、下游两个水管连接起来了,同时会发送一个 onSubscribe 事件,并可以拿到一个 disposable 开关
* disposable 可以理解为两根管子之间的一个机关,调用其 dispose 方法时,可以将两根管道切断,导致下游收不到事件
* RxJava 内置了一个 disposable 容器 CompositeDisposable,可以存储 disposable,用来在 Activity 退出时调用所有机关的 dispose 方法,使下游不再接收事件
```java
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}
});
Observer<Integer> observer = new Observer<Integer>() {
Disposable mDisposable;
@Override
public void onSubscribe(Disposable d) {
mDisposable = d;
}
@Override
public void onNext(Integer integer) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
};
observable.subscribe(observer);
```
# 事件源类型
事件源类型|描述
---|---
Flowable|可以发送 N 个事件,支持背压
Observable|可以发送 N 个事件,不支持背压
Single|可以发送 1 个事件,或 1 个 onError 事件
Completable|不可以发送普通事件,可以发送 1 个 onComplete 事件或 1 个 onError 事件
Maybe|可以发射 1 个事件、1 个 onError 事件,或者什么都不发射
# 线程调度
[原文链接](http://www.jianshu.com/p/8818b98c44e2)
## 应用场景
Rxjava 最常见的应用场景是:
在子线程进行数据计算、网络请求操作等,然后回到主线程展示结果(成功或错误)
## 内置线程
* Schedulers.io():代表 IO 操作线程,用于网络、读写文件等 IO 密集型操作
* Schedulers.newThread():代表一个常规的新线程
* Schedulers.computation():代表 CPU 计算密集型操作,如需要大量计算操作时使用
* AndroidSchedulers.mainThread():Android 主线程
## 线程调度
* subscribeOn 指定上游发送事件的线程
* observeOn 指定下游接收事件的线程
* 多次指定上游线程只有第一次有效,其他会被忽略
* 多次指定下游线程没问题,每调用一次 observeOn,下游线程就会切换一次
```java
observable.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.io())
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
// 此处为 IO 线程
}
})
.observeOn(AndroidSchedulers.mainThread())
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
// 此处为主线程
}
})
.observeOn(Schedulers.newThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
// 此处为新的子线程
}
});
```
# 变换操作符
[原文链接](http://www.jianshu.com/p/128e662906af)
## Map 操作符
* 对上游发送的每一个事件应用一个函数,使得每一个事件都按照指定的函数变化
```java
Observable.fromArray(1, 2, 3)
.map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return "Result is " + integer;
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.i(TAG, s);
}
});
```
## FlatMap 操作符
![](https://img.kancloud.cn/7f/19/7f19a8efb4005e241437ed6cd54bdc89_647x763.png)
### 定义
* FlatMap 的作用是将一个事件,转换为,发送新的事件的,Observable
* 被转换为 Observable 后开始发送新的事件
* 发送出来的,这些新的事件,会被合并到一个水管中,发给下游
* FlatMap 相当于直接转换了上游发射源所发射的事件类型
* Map 相当于转换了水管中流动的事件的类型
### 其他
* FlatMap 不保证新事件的顺序,ConcatMap 可以保证
### 代码示例
```java
public interface Api {
@GET
Observable<LoginResponse> login(@Body LoginRequest request);
@GET
Observable<RegisterResponse> register(@Body RegisterRequest request);
}
```
```java
api.register(new RegisterRequest())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.doOnNext(new Consumer<RegisterResponse>() {
@Override
public void accept(RegisterResponse registerResponse) throws Exception {
// 先根据注册结果做一些事情
}
})
.observeOn(Schedulers.io())
.flatMap(new Function<RegisterResponse, ObservableSource<LoginResponse>>() {
@Override
public ObservableSource<LoginResponse> apply(RegisterResponse registerResponse) throws Exception {
return api.login(new LoginRequest());
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<LoginResponse>() {
@Override
public void accept(LoginResponse loginResponse) throws Exception {
// 登录成功
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
// 登录失败
}
});
```
## Zip 操作符
[原文链接](http://www.jianshu.com/p/bb58571cdb64)
### 定义
* Zip 可将多个 Observable 发送的事件结合到一起,然后发送这些组合到一起的事件
* 严格按照顺序执行
* 只发射与发射数据项数最少的那个 Observable 一样多的数据
![](https://img.kancloud.cn/ea/49/ea49166f780a9b84b9f1c52753423abb_648x879.png)
### 代码示例
```java
public interface Api {
@GET
Observable<UserBaseInfoResponse> getUserBaseInfo(@Body UserBaseInfoRequest request);
@GET
Observable<UserExtraInfoResponse> getUserExtraInfo(@Body UserExtraInfoRequest request);
}
```
```java
Observable<UserBaseInfoResponse> userBaseInfoObservable = api.getUserBaseInfo(
new UserBaseInfoRequest())
.subscribeOn(Schedulers.io());
Observable<UserExtraInfoResponse> userExtraInfoObservable = api.getUserExtraInfo(
new UserExtraInfoRequest())
.subscribeOn(Schedulers.io());
Observable.zip(userBaseInfoObservable, userExtraInfoObservable, new BiFunction<UserBaseInfoResponse,
UserExtraInfoResponse, UserInfo>() {
@Override
public UserInfo apply(UserBaseInfoResponse userBaseInfoResponse, UserExtraInfoResponse
userExtraInfoResponse) throws Exception {
return new UserInfo(userBaseInfoResponse, userExtraInfoResponse);
}
}).observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<UserInfo>() {
@Override
public void accept(UserInfo userInfo) throws Exception {
// do something
}
});
```
# 背压
## 上下游流速不均衡问题
[原文链接](http://www.jianshu.com/p/0f2d6c2387c9)
```java
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
for (int i = 0; ; i++) {
e.onNext(i);
}
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Thread.sleep(2000);
Log.e(TAG, "accept: " + integer);
}
});
```
* 上游水管发送事件过快时,事件会被先保存到该水管的水缸中
* 当上下游处在不同线程,即为异步操作时,上下游流速不均衡时会造成内存溢出(水缸溢出)
* 上下游为同一线程,即为同步操作时,不会出现内存溢出情况。因为上游需等待下游处理完毕才会发送下一个事件
## 流速问题解决方案
[原文链接](http://www.jianshu.com/p/e4c6d7989356)
* 从数量上治理,减少发送进入水缸事件的数量(会造成部分事件丢失)
* 从时间上治理,减缓事件发送进入水缸的速度(事件不会丢失)
方案一 demo:
```java
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
for (int i = 0; ; i++) {
e.onNext(i);
}
}
}).subscribeOn(Schedulers.io())
.sample(2, TimeUnit.SECONDS) // sample 每隔 2 秒取样
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Thread.sleep(2000);
Log.e(TAG, "accept: " + integer);
}
});
}
```
方案二 demo:
```java
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
for (int i = 0; ; i++) {
e.onNext(i);
Thread.sleep(2000);
}
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Thread.sleep(2000);
Log.e(TAG, "accept: " + integer);
}
});
}
```
# Flowable
* Flowable 采取响应式拉取
* Subscription 的 request 方法相当于下游告诉上游,下游它所能处理事件的能力
* Flowable 默认有一个大小为 128 的水缸,上下游工作在不同线程时,上游会先把事件贮存在水缸中
```java
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> e) throws Exception {
e.onNext(0);
e.onNext(1);
e.onNext(2);
e.onComplete();
}
// 此处第二个参数 BackpressureStrategy.ERROR 代表处理上下游流速不均问题的处理策略,此处为直接抛出错误
}, BackpressureStrategy.ERROR)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(Integer integer) {
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
});
```
## Flowable 处理上下游流速不均问题的策略
类型|策略描述
---|---
BackpressureStrategy.ERROR|直接抛出 MissingBackpressureException 异常
BackpressureStrategy.BUFFER|上游的水缸更换为无限大的
BackpressureStrategy.DROP|水缸存不下的事件直接丢弃
BackpressureStrategy.LATEST|水缸内只保留最新的事件
- 导读
- Java知识
- Java基本程序设计结构
- 【基础知识】Java基础
- 【源码分析】Okio
- 【源码分析】深入理解i++和++i
- 【专题分析】JVM与GC
- 【面试清单】Java基本程序设计结构
- 对象与类
- 【基础知识】对象与类
- 【专题分析】Java类加载过程
- 【面试清单】对象与类
- 泛型
- 【基础知识】泛型
- 【面试清单】泛型
- 集合
- 【基础知识】集合
- 【源码分析】SparseArray
- 【面试清单】集合
- 多线程
- 【基础知识】多线程
- 【源码分析】ThreadPoolExecutor源码分析
- 【专题分析】volatile关键字
- 【面试清单】多线程
- Java新特性
- 【专题分析】Lambda表达式
- 【专题分析】注解
- 【面试清单】Java新特性
- Effective Java笔记
- Android知识
- Activity
- 【基础知识】Activity
- 【专题分析】运行时权限
- 【专题分析】使用Intent打开三方应用
- 【源码分析】Activity的工作过程
- 【面试清单】Activity
- 架构组件
- 【专题分析】MVC、MVP与MVVM
- 【专题分析】数据绑定
- 【面试清单】架构组件
- 界面
- 【专题分析】自定义View
- 【专题分析】ImageView的ScaleType属性
- 【专题分析】ConstraintLayout 使用
- 【专题分析】搞懂点九图
- 【专题分析】Adapter
- 【源码分析】LayoutInflater
- 【源码分析】ViewStub
- 【源码分析】View三大流程
- 【源码分析】触摸事件分发机制
- 【源码分析】按键事件分发机制
- 【源码分析】Android窗口机制
- 【面试清单】界面
- 动画和过渡
- 【基础知识】动画和过渡
- 【面试清单】动画和过渡
- 图片和图形
- 【专题分析】图片加载
- 【面试清单】图片和图形
- 后台任务
- 应用数据和文件
- 基于网络的内容
- 多线程与多进程
- 【基础知识】多线程与多进程
- 【源码分析】Handler
- 【源码分析】AsyncTask
- 【专题分析】Service
- 【源码分析】Parcelable
- 【专题分析】Binder
- 【源码分析】Messenger
- 【面试清单】多线程与多进程
- 应用优化
- 【专题分析】布局优化
- 【专题分析】绘制优化
- 【专题分析】内存优化
- 【专题分析】启动优化
- 【专题分析】电池优化
- 【专题分析】包大小优化
- 【面试清单】应用优化
- Android新特性
- 【专题分析】状态栏、ActionBar和导航栏
- 【专题分析】应用图标、通知栏适配
- 【专题分析】Android新版本重要变更
- 【专题分析】唯一标识符的最佳做法
- 开源库源码分析
- 【源码分析】BaseRecyclerViewAdapterHelper
- 【源码分析】ButterKnife
- 【源码分析】Dagger2
- 【源码分析】EventBus3(一)
- 【源码分析】EventBus3(二)
- 【源码分析】Glide
- 【源码分析】OkHttp
- 【源码分析】Retrofit
- 其他知识
- Flutter
- 原生开发与跨平台开发
- 整体归纳
- 状态及状态管理
- 零碎知识点
- 添加Flutter到现有应用
- Git知识
- Git命令
- .gitignore文件
- 设计模式
- 创建型模式
- 结构型模式
- 行为型模式
- RxJava
- 基础
- Linux知识
- 环境变量
- Linux命令
- ADB命令
- 算法
- 常见数据结构及实现
- 数组
- 排序算法
- 链表
- 二叉树
- 栈和队列
- 算法时间复杂度
- 常见算法思想
- 其他技术
- 正则表达式
- 编码格式
- HTTP与HTTPS
- 【面试清单】其他知识
- 开发归纳
- Android零碎问题
- 其他零碎问题
- 开发思路