android RxJava 线程切换原理探索
使用Rxjava进行切换线程很简单,一行代码让耗时操作去子线程执行,再来一行代码回主线程进行结果监听。
那么问题就来了,Rxjava 是怎么实现的线程切换呢?
脑海里的答案是什么?
线程池 or Handler 还是什么?
emmmmm,这个答案 只对了一半
我的答案是:
耗时操作、主线程操作。确实是线程池和handler。但是具体的切换操作是 Scheduler。它是个抽象类,把耗时操作、结果监听 ,包装成 Runnable。丢到Scheduler的实现类去执行。
为什么说Rxjava的线程切换是由Scheduler完成的呢?
来,从源码中找答案的支撑。
阅读建议:rxjava代码比较绕,建议静下心来,打着断点一步步看。
首先来看一段代码(图1)
图1
通过调用subscribeOn() 和observeOn 方法,完成了线程的切换。
众所周知,RxJava是出了名的绕。为了方便分析,将(图1)进行代码拆分(图2)。
图2
testRxjavaThreadSwitch2 中的代码块调度逻辑与testRxjavaThreadSwitch1 完成相等。
分析前我们要先定下分析的方法(图3所示)。
图3
代码的调用链是由一到四步组合而成。链的关键在第四步。没有进行subscribe的话,这段Rxjava 代码是不会执行。代码真正的执行又是从第四步,一步步的封装打包。传递到第一步去。
可以理解为是U型结构。
代码链的分析:
分析第一步。Observable.create方法。点到源码中去看
图4
图5
总结: 真正的实现类是ObservableCreate。持有的对象source是我们穿进去的 接口类ObservableOnSubscribe。
第二步:create.subscribeOn的源码
图6
总结:因为第一步的实现类是ObservableCreate,所以create.subscribeOn 实际是
ObservableCreate.subscribeOn。 返回的函数 new ObservableSubscribeOn<>(this, scheduler) 中的this 是 ObservableCreate对象,scheduler 是我们传入的
Schedulers.computation()。所以第二步的实现类是ObservableSubscribeOn。
第三步:
observableonSubscribeOn.observeOn的源码:
图7
总结:第二步的实现类是ObservableSubscribeOn。所以
observableonSubscribeOn.observeOn的调用实际是
ObservableSubscribeOn.observeOn。返回函数new ObservableObserveOn<>(this, scheduler, delayError, bufferSize)。中的this是ObservableSubscribeOn,scheduler是我们传入的
AndroidSchedulers.mainThread()。
因此,第三步的实现类是ObservableObserveOn
第四步:
observableObserveOn.subscribe的源码:
图8
总结:因为我们只传入了一个Consumer,所以Rxjava为我们补齐了onError和onComplete
第三步的实现类是ObservableObserveOn。所以第四步的
observableObserveOn.subscribe的调用实际是ObservableObserveOn.subscribe。
至此,调用链串联完成。让我们回顾总结下代码真实调用。
第一步:Observable.create 方法构造个ObservableCreate作为返回对象
第二步:
ObservableCreate.subscribeOn 构造ObservableSubscribeOn作为返回对象
第三步:
ObservableSubscribeOn.observeOn 构造ObservableObserveOn作为返回对象
第四步:
ObservableObserveOn.subscribe。调用subscribe,rxjava代码链 成!!开始执行。
执行链的分析:
我们先看一段代码:
图9
抽象类 Observable。是 第一二三步实现类ObservableCreate、ObservableSubscribeOn、ObservableObserveOn的父类,当在实现类中调用(图9)中的subscribe方法。实际是执行到了 Observable类中的subscribe。由subscribe方法来进行各种验证,然后调用subscribeActual来进行统一分发到实现类中,去做具体的实现行为。
让我们回顾第四步:
一次包装:
图10
(图10)所示,将我们传入的Consumer包装成LambdaObserver。然后传递ObservableObserveOn中的subscribeActual执行。代码运行起来打个断点验证下
二次包装:
图11
在(图11)中我们可以看到第四步的Consumer包装类LambdaObserver传递了过来。
再把LambdaObserver包装成ObserveOnObserver(二次包装)。ObserveOnObserver持有 LambdaObserver和我们传入的AndroidSchedulers。交给source(第二步的实现类)ObservableSubscribeOn 执行。
三次包装:
图12
(图12)所示,第二次包装的ObserveOnObserver包装成SubscribeOnObserver。
这个时候要注意了!!!!!!!!!!!!!!!!!!!!!
图13
scheduler.scheduleDirect(new SubscribeTask(parent))
这行代码标识,我们层层包装的observer (SubscribeOnObserver->ObserveOnObserver->LambdaObserver->Consumer)。要包装成SubscribeTask(继承了Runnable接口) 这个Runnable,交给scheduler调度。
图14
看(图14)的红圈,代码已经到子线程执行了。
加快我们的分析。run代码块中的source 是ObservableCreate,而parent对象则是我们层层包裹的observer。按照老方法。下一步会执行到
ObservableCreate.subscribeActual。
图15
四次包装:
看这个传入的observer。是不是我们上层包装的observer。到了这层 会在次包装。包装后的效果(CreateEmitter->SubscribeOnObserver->ObserveOnObserver->LambdaObserver->Consumer)。还记得我们代码链分析的第一步吗?(图15)中的source,是我们传入的ObservableOnSubscribe接口类。
在回到我们的测试代码
图16
(图16)中箭头指向的it,也就是我们所常见的发射器emitter。就是我们经过四次包装,最终生成的CreateEmitter。
至此,切换到子线程的逻辑,跑通了!!!!!!
说了这么多。才跑通了切换到子线程。是不是已经进入混沌状态了。
我们在总结下关键点:
Rxjava,会按照我们代码书写顺序生成个调用链,最后的subscribe方法标志着调用链形成。然后从下往上,把Consumer经过层层包装。传递到顶层作为emitter发射器的实现类。其中会在
ObservableSubscribeOn.subscribeActual方法中,生成个SubscribeTask的Runnable。将代码的执行切换到子线程。
我们继续,攻坚最后一步。子线程在切换到主线程。
在观察(图16),发射器调用了it.onNext("111111")。因为发射器的实现类是CreateEmitter。
图17
还记得通过四次包装的observer吗?接下来我们拆快递了(CreateEmitter->SubscribeOnObserver->ObserveOnObserver->LambdaObserver->Consumer)。快递长这样。
剩下的就不一步步分析了,打着断点一步步跟就行。接下来只分析关键点。拆快递拆到ObserveOnObserver层,看(图18)
图18
经过onNext的调用后,执行schedule()方法,交给worker进行调用,worker是我们传入的
AndroidSchedulers.mainThread()。实现类是HandlerScheduler。
图19
(图19)中的红框,老哥们看了都会懂吧!此时还在子线程执行呢。马上要把我们的数据包装成ScheduledRunnable作为个message,传递到handler中执行了。
当包装的ScheduledRunnable的run执行的时候代表什么???
代表已经回到主线程了。后续的代码执行会继续进行对observer拆包。最终把数据回调到我们subscribe传入的Consumer中。
至此我们这篇文章的探索的Rxjava线程切换原理。在源码层需要找寻的答案,已经找到了。
最后的最后在总结下。
Rxjava,会按照我们代码书写顺序生成个调用链,最后的subscribe方法标志着调用链形成。然后从下往上,把Consumer经过层层包装。传递到顶层作为emitter发射器的实现类。其中会在
ObservableSubscribeOn.subscribeActual方法中,生成个SubscribeTask的Runnable。将代码的执行切换到子线程。
发射器emitter调用onNext,开始对Consumer的包装进行拆包。当拆包拆到ObserveOnObserver层时。会有个worker调度,也就是开始文章开篇说的Scheduler。生成个Runnable的message交给handler分发。将代码的执行切换到主线程。
在总结的精炼点:
Rxjava,通过把要切换线程的任务包装成Runnable交给Scheduler调度。具体是线程池或handler,交由Scheduler实现类实现。
写在最后。Rxjava源码很绕很复杂。当我们耐心跑通一个流程后,其他的各种操作符的理解都会变得容易起来。耐下心来,细读源码,与大神在不同时空的思想碰撞,获益良多。