[TOC]
基于 io.reactivex:rxjava:1.3.8
源码分析分为三个方向简单订阅过程、变换过程和线程切换。
## 简单订阅
### 使用
~~~
Observable.create(new Observable.<Integer>() {
// 1. 创建被观察者(Observable) & 定义需发送的事件
@Override
public void call(Subscriber<Integer> subScriber){
subScriber.onNext(1);
}
}).subscribe(new Subscriber<Integer>() {
// 2. 创建观察者(Observer) & 定义响应事件的行为
// 3. 通过订阅(subscribe)连接观察者和被观察者
@Override
public void onNext(Integer value) {}
@Override
public void onError(Throwable e) {}
@Override
public void onComplete() {}
});
~~~
### Observable.create
实际上就是创建一个Observable对象,并且把OnSubscribe对象赋值到Observable对象的onSubscribe 字段里。
~~~
public static <T> Observable<T> create(OnSubscribe<T> f){
//hook.onCreate(f) 实际上返回的还是f
//Observable构造函数,实际上只是赋值到OnSubscribe
return new Observable<T>(hook.onCreate(f));
}
//hook实际上就是RxJava-ObservableExecutionHook,它的onCreate如下
public <T> OnSubscribe<T> OnCreate(OnSubscribe<T> f){
return f;f
}
//接着回来查看Observable的构造方法,如下
public Observable(OnSubscribe<T> f){
this.onSubscribe = f;
}
~~~
### Observable().subscribe
找到Observable对象的onSubscribe对象
把自己作为参数调用onSubscribe的对象的call方法。
~~~
public final Subscription subscribe(Subscriber<? super T> subscriber) {
return subscribe(subscriber, this);
}
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
...
RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
return RxJavaHooks.onObservableReturn((Subscription)subscriber);
...
}
~~~
#### RxJavaHooks.onObservableStart
可以理解为直接返回onSubscribe
~~~
public static <T> OnSubscribe<T> onObservableStart(Observable<T> instance, OnSubscribe<T> onSubscribe) {
Func2<Observable, OnSubscribe, OnSubscribe> f = onObservableStart;
return f != null ? (OnSubscribe)f.call(instance, onSubscribe) : onSubscribe;
}
~~~
## 变换过程
### 使用
~~~
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<Integer> subScriber){
subScriber.onNext(1);
}
}).map(new Funcl<Integer,String>(){
@Override
public Srting call(Integer integer){
return "a" + integer;
r
}
}).subscribe(new Subscriber<String>() {
@Override
public void onNext(Integer value) {}
@Override
public void onError(Throwable e) {}
@Override
public void onComplete() {}
});
~~~
Observable.create还是创建一个Observable对象,并且把OnSubscribe对象赋值到Observable对象的onSubscribe 字段里。我们把这一步的叫做 obj1
### Observable().map
~~~
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
return unsafeCreate(new OnSubscribeMap(this, func));
}
public static <T> Observable<T> unsafeCreate(Observable.OnSubscribe<T> f) {
return new Observable(RxJavaHooks.onCreate(f));
}
~~~
#### OnSubscribeMap
~~~
public final class OnSubscribeMap<T, R> implements OnSubscribe<R> {
final Observable<T> source;
final Func1<? super T, ? extends R> transformer;
public OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer) {
this.source = source;
this.transformer = transformer;
}
public void call(Subscriber<? super R> o) {
OnSubscribeMap.MapSubscriber<T, R> parent = new OnSubscribeMap.MapSubscriber(o, this.transformer);
o.add(parent);
this.source.unsafeSubscribe(parent);
}
static final class MapSubscriber<T, R> extends Subscriber<T> {
final Subscriber<? super R> actual;
final Func1<? super T, ? extends R> mapper;
boolean done;
public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
this.actual = actual;
this.mapper = mapper;
}
public void onNext(T t) {
Object result;
try {
result = this.mapper.call(t);
} catch (Throwable var4) {
Exceptions.throwIfFatal(var4);
this.unsubscribe();
this.onError(OnErrorThrowable.addValueAsLastCause(var4, t));
return;
}
this.actual.onNext(result);
}
public void onError(Throwable e) {
if (this.done) {
RxJavaHooks.onError(e);
} else {
this.done = true;
this.actual.onError(e);
}
}
public void onCompleted() {
if (!this.done) {
this.actual.onCompleted();
}
}
public void setProducer(Producer p) {
this.actual.setProducer(p);
}
~~~
这里做就是
1. 生成第二个Observable obj2,然后他的onSubscribe 就是OnSubscribeMap
2. 执行最后Observable().subscribe时,就会调用obj2的onSubscribe的call
3. obj2的onSubscribe的call里,首先把自己new了一个MapSubscriber,主要功能就是完成参数转换,并且调用下一个的对应方法
4. 然后他又会调用unsafeSubscribe(obj1),就会调用obj1的onSubscribe的call
![](https://img.kancloud.cn/fd/98/fd98f1107f67ab638dc97e3ed768f914_582x323.png)
## 线程切换
~~~
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<Integer> subScriber){
subScriber.onNext(1);
}
})
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<String>() {
@Override
public void onNext(Integer value) {}
@Override
public void onError(Throwable e) {}
@Override
public void onComplete() {}
});
~~~
根据【变换过程】的思路,我们很容易猜到,实际上subscribeOn 和observeOn肯定也是生成了一个Observable,然后自定义一个Observable.OnSubscribe,OnSubscribe里面实际上做的也是先线程切换,然后在执行下一个的对应方法
### observeOn
~~~
public final class OperatorObserveOn<T> implements Operator<T, T> {
private final Scheduler scheduler;
private final boolean delayError;
private final int bufferSize;
public OperatorObserveOn(Scheduler scheduler, boolean delayError) {
this(scheduler, delayError, RxRingBuffer.SIZE);
}
public OperatorObserveOn(Scheduler scheduler, boolean delayError, int bufferSize) {
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize > 0 ? bufferSize : RxRingBuffer.SIZE;
}
//核心
public Subscriber<? super T> call(Subscriber<? super T> child) {
OperatorObserveOn.ObserveOnSubscriber<T> parent = new OperatorObserveOn.ObserveOnSubscriber(this.scheduler, child, this.delayError, this.bufferSize);
parent.init();
return parent;
}
static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 {
final Subscriber<? super T> child;
final Worker recursiveScheduler
...
public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child, boolean delayError, int bufferSize) {
this.child = child;
this.recursiveScheduler = scheduler.createWorker();
this.delayError = delayError;
...
}
void init() {
Subscriber<? super T> localChild = this.child;
localChild.setProducer(new Producer() {
public void request(long n) {
if (n > 0L) {
BackpressureUtils.getAndAddRequest(ObserveOnSubscriber.this.requested, n);
ObserveOnSubscriber.this.schedule();
}
}
});
localChild.add(this.recursiveScheduler);
localChild.add(this);
}
public void onNext(T t) {
if (!this.isUnsubscribed() && !this.finished) {
if (!this.queue.offer(NotificationLite.next(t))) {
this.onError(new MissingBackpressureException());
} else {
this.schedule();
}
}
}
public void onCompleted() {
if (!this.isUnsubscribed() && !this.finished) {
this.finished = true;
this.schedule();
}
}
public void onError(Throwable e) {
if (!this.isUnsubscribed() && !this.finished) {
this.error = e;
this.finished = true;
this.schedule();
} else {
RxJavaHooks.onError(e);
}
}
protected void schedule() {
if (this.counter.getAndIncrement() == 0L) {
this.recursiveScheduler.schedule(this);
}
}
//schedule(this) 已经在异步线程上了实际上执行的方法
public void call() {
//代码太多贴出来不便,实际上就是根据error 和 finished 或者是t值调用child对应的onNext onError onCompleted方法
~~~
### subscribeOn
~~~
public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {
final Scheduler scheduler;
final Observable<T> source;
final boolean requestOn;
public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler, boolean requestOn) {
this.scheduler = scheduler;
this.source = source;
this.requestOn = requestOn;
}
public void call(Subscriber<? super T> subscriber) {
Worker inner = this.scheduler.createWorker();
OperatorSubscribeOn.SubscribeOnSubscriber<T> parent = new OperatorSubscribeOn.SubscribeOnSubscriber(subscriber, this.requestOn, inner, this.source);
subscriber.add(parent);
subscriber.add(inner);
//核心,在异步线程调用父Observable的方法
inner.schedule(parent);
}
static final class SubscribeOnSubscriber<T> extends Subscriber<T> implements Action0 {
...
//这里的代码实际上就是调用子的 onNext onError onCompleted方法
}
~~~
## 总结
1. 中间一步都会创建一个Observable 观察者
2. Observable都有一个 OnSubscribe onSubscribe, 这个实际上就是被出入的参数
3. 最后一个调用subScribe时,最后会调用上一个Observable的call,并且把自己当作参数出入
4. 上一个Observable会调用通过subScribe调用上一个Observable的call,一路上去,直到第一个Observable。
5. 第一个处理往后,会调用下一个的Observable的onSubscribe.onNext,下一个的调用下下个的。
6. 直到最后一个,调用结束
## 参考资料
《Android进阶之光》
- Android
- 四大组件
- Activity
- Fragment
- Service
- 序列化
- Handler
- Hander介绍
- MessageQueue详细
- 启动流程
- 系统启动流程
- 应用启动流程
- Activity启动流程
- View
- view绘制
- view事件传递
- choreographer
- LayoutInflater
- UI渲染概念
- Binder
- Binder原理
- Binder最大数据
- Binder小结
- Android组件
- ListView原理
- RecyclerView原理
- SharePreferences
- AsyncTask
- Sqlite
- SQLCipher加密
- 迁移与修复
- Sqlite内核
- Sqlite优化v2
- sqlite索引
- sqlite之wal
- sqlite之锁机制
- 网络
- 基础
- TCP
- HTTP
- HTTP1.1
- HTTP2.0
- HTTPS
- HTTP3.0
- HTTP进化图
- HTTP小结
- 实践
- 网络优化
- Json
- ProtoBuffer
- 断点续传
- 性能
- 卡顿
- 卡顿监控
- ANR
- ANR监控
- 内存
- 内存问题与优化
- 图片内存优化
- 线下内存监控
- 线上内存监控
- 启动优化
- 死锁监控
- 崩溃监控
- 包体积优化
- UI渲染优化
- UI常规优化
- I/O监控
- 电量监控
- 第三方框架
- 网络框架
- Volley
- Okhttp
- 网络框架n问
- OkHttp原理N问
- 设计模式
- EventBus
- Rxjava
- 图片
- ImageWoker
- Gilde的优化
- APT
- 依赖注入
- APT
- ARouter
- ButterKnife
- MMKV
- Jetpack
- 协程
- MVI
- Startup
- DataBinder
- 黑科技
- hook
- 运行期Java-hook技术
- 编译期hook
- ASM
- Transform增量编译
- 运行期Native-hook技术
- 热修复
- 插件化
- AAB
- Shadow
- 虚拟机
- 其他
- UI自动化
- JavaParser
- Android Line
- 编译
- 疑难杂症
- Android11滑动异常
- 方案
- 工业化
- 模块化
- 隐私合规
- 动态化
- 项目管理
- 业务启动优化
- 业务架构设计
- 性能优化case
- 性能优化-排查思路
- 性能优化-现有方案
- 登录
- 搜索
- C++
- NDK入门
- 跨平台
- H5
- Flutter
- Flutter 性能优化
- 数据跨平台