企业🤖AI Agent构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
[TOC] 基于 io.reactivex:rxjava:1.3.8 源码分析分为三个方向简单订阅过程、变换过程和线程切换。 ## 简单订阅 ### 使用 ~~~ Observable.create(new Observable.<Integer>() { // 1. 创建被观察者(Observable) & 定义需发送的事件 @Override public void call(Subscriber<Integer> subScriber){ subScriber.onNext(1); } }).subscribe(new Subscriber<Integer>() { // 2. 创建观察者(Observer) & 定义响应事件的行为 // 3. 通过订阅(subscribe)连接观察者和被观察者 @Override public void onNext(Integer value) {} @Override public void onError(Throwable e) {} @Override public void onComplete() {} }); ~~~ ### Observable.create 实际上就是创建一个Observable对象,并且把OnSubscribe对象赋值到Observable对象的onSubscribe 字段里。 ~~~ public static <T> Observable<T> create(OnSubscribe<T> f){ //hook.onCreate(f) 实际上返回的还是f //Observable构造函数,实际上只是赋值到OnSubscribe return new Observable<T>(hook.onCreate(f)); } //hook实际上就是RxJava-ObservableExecutionHook,它的onCreate如下 public <T> OnSubscribe<T> OnCreate(OnSubscribe<T> f){ return f;f } //接着回来查看Observable的构造方法,如下 public Observable(OnSubscribe<T> f){ this.onSubscribe = f; } ~~~ ### Observable().subscribe 找到Observable对象的onSubscribe对象 把自己作为参数调用onSubscribe的对象的call方法。 ~~~ public final Subscription subscribe(Subscriber<? super T> subscriber) { return subscribe(subscriber, this); } static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) { ... RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber); return RxJavaHooks.onObservableReturn((Subscription)subscriber); ... } ~~~ #### RxJavaHooks.onObservableStart 可以理解为直接返回onSubscribe ~~~ public static <T> OnSubscribe<T> onObservableStart(Observable<T> instance, OnSubscribe<T> onSubscribe) { Func2<Observable, OnSubscribe, OnSubscribe> f = onObservableStart; return f != null ? (OnSubscribe)f.call(instance, onSubscribe) : onSubscribe; } ~~~ ## 变换过程 ### 使用 ~~~ Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<Integer> subScriber){ subScriber.onNext(1); } }).map(new Funcl<Integer,String>(){ @Override public Srting call(Integer integer){ return "a" + integer; r } }).subscribe(new Subscriber<String>() { @Override public void onNext(Integer value) {} @Override public void onError(Throwable e) {} @Override public void onComplete() {} }); ~~~ Observable.create还是创建一个Observable对象,并且把OnSubscribe对象赋值到Observable对象的onSubscribe 字段里。我们把这一步的叫做 obj1 ### Observable().map ~~~ public final <R> Observable<R> map(Func1<? super T, ? extends R> func) { return unsafeCreate(new OnSubscribeMap(this, func)); } public static <T> Observable<T> unsafeCreate(Observable.OnSubscribe<T> f) { return new Observable(RxJavaHooks.onCreate(f)); } ~~~ #### OnSubscribeMap ~~~ public final class OnSubscribeMap<T, R> implements OnSubscribe<R> { final Observable<T> source; final Func1<? super T, ? extends R> transformer; public OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer) { this.source = source; this.transformer = transformer; } public void call(Subscriber<? super R> o) { OnSubscribeMap.MapSubscriber<T, R> parent = new OnSubscribeMap.MapSubscriber(o, this.transformer); o.add(parent); this.source.unsafeSubscribe(parent); } static final class MapSubscriber<T, R> extends Subscriber<T> { final Subscriber<? super R> actual; final Func1<? super T, ? extends R> mapper; boolean done; public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) { this.actual = actual; this.mapper = mapper; } public void onNext(T t) { Object result; try { result = this.mapper.call(t); } catch (Throwable var4) { Exceptions.throwIfFatal(var4); this.unsubscribe(); this.onError(OnErrorThrowable.addValueAsLastCause(var4, t)); return; } this.actual.onNext(result); } public void onError(Throwable e) { if (this.done) { RxJavaHooks.onError(e); } else { this.done = true; this.actual.onError(e); } } public void onCompleted() { if (!this.done) { this.actual.onCompleted(); } } public void setProducer(Producer p) { this.actual.setProducer(p); } ~~~ 这里做就是 1. 生成第二个Observable obj2,然后他的onSubscribe 就是OnSubscribeMap 2. 执行最后Observable().subscribe时,就会调用obj2的onSubscribe的call 3. obj2的onSubscribe的call里,首先把自己new了一个MapSubscriber,主要功能就是完成参数转换,并且调用下一个的对应方法 4. 然后他又会调用unsafeSubscribe(obj1),就会调用obj1的onSubscribe的call ![](https://img.kancloud.cn/fd/98/fd98f1107f67ab638dc97e3ed768f914_582x323.png) ## 线程切换 ~~~ Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<Integer> subScriber){ subScriber.onNext(1); } }) .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Subscriber<String>() { @Override public void onNext(Integer value) {} @Override public void onError(Throwable e) {} @Override public void onComplete() {} }); ~~~ 根据【变换过程】的思路,我们很容易猜到,实际上subscribeOn 和observeOn肯定也是生成了一个Observable,然后自定义一个Observable.OnSubscribe,OnSubscribe里面实际上做的也是先线程切换,然后在执行下一个的对应方法 ### observeOn ~~~ public final class OperatorObserveOn<T> implements Operator<T, T> { private final Scheduler scheduler; private final boolean delayError; private final int bufferSize; public OperatorObserveOn(Scheduler scheduler, boolean delayError) { this(scheduler, delayError, RxRingBuffer.SIZE); } public OperatorObserveOn(Scheduler scheduler, boolean delayError, int bufferSize) { this.scheduler = scheduler; this.delayError = delayError; this.bufferSize = bufferSize > 0 ? bufferSize : RxRingBuffer.SIZE; } //核心 public Subscriber<? super T> call(Subscriber<? super T> child) { OperatorObserveOn.ObserveOnSubscriber<T> parent = new OperatorObserveOn.ObserveOnSubscriber(this.scheduler, child, this.delayError, this.bufferSize); parent.init(); return parent; } static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 { final Subscriber<? super T> child; final Worker recursiveScheduler ... public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child, boolean delayError, int bufferSize) { this.child = child; this.recursiveScheduler = scheduler.createWorker(); this.delayError = delayError; ... } void init() { Subscriber<? super T> localChild = this.child; localChild.setProducer(new Producer() { public void request(long n) { if (n > 0L) { BackpressureUtils.getAndAddRequest(ObserveOnSubscriber.this.requested, n); ObserveOnSubscriber.this.schedule(); } } }); localChild.add(this.recursiveScheduler); localChild.add(this); } public void onNext(T t) { if (!this.isUnsubscribed() && !this.finished) { if (!this.queue.offer(NotificationLite.next(t))) { this.onError(new MissingBackpressureException()); } else { this.schedule(); } } } public void onCompleted() { if (!this.isUnsubscribed() && !this.finished) { this.finished = true; this.schedule(); } } public void onError(Throwable e) { if (!this.isUnsubscribed() && !this.finished) { this.error = e; this.finished = true; this.schedule(); } else { RxJavaHooks.onError(e); } } protected void schedule() { if (this.counter.getAndIncrement() == 0L) { this.recursiveScheduler.schedule(this); } } //schedule(this) 已经在异步线程上了实际上执行的方法 public void call() { //代码太多贴出来不便,实际上就是根据error 和 finished 或者是t值调用child对应的onNext onError onCompleted方法 ~~~ ### subscribeOn ~~~ public final class OperatorSubscribeOn<T> implements OnSubscribe<T> { final Scheduler scheduler; final Observable<T> source; final boolean requestOn; public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler, boolean requestOn) { this.scheduler = scheduler; this.source = source; this.requestOn = requestOn; } public void call(Subscriber<? super T> subscriber) { Worker inner = this.scheduler.createWorker(); OperatorSubscribeOn.SubscribeOnSubscriber<T> parent = new OperatorSubscribeOn.SubscribeOnSubscriber(subscriber, this.requestOn, inner, this.source); subscriber.add(parent); subscriber.add(inner); //核心,在异步线程调用父Observable的方法 inner.schedule(parent); } static final class SubscribeOnSubscriber<T> extends Subscriber<T> implements Action0 { ... //这里的代码实际上就是调用子的 onNext onError onCompleted方法 } ~~~ ## 总结 1. 中间一步都会创建一个Observable 观察者 2. Observable都有一个 OnSubscribe onSubscribe, 这个实际上就是被出入的参数 3. 最后一个调用subScribe时,最后会调用上一个Observable的call,并且把自己当作参数出入 4. 上一个Observable会调用通过subScribe调用上一个Observable的call,一路上去,直到第一个Observable。 5. 第一个处理往后,会调用下一个的Observable的onSubscribe.onNext,下一个的调用下下个的。 6. 直到最后一个,调用结束 ## 参考资料 《Android进阶之光》