android RxJava 线程切换原理探索

使用Rxjava进行切换线程很简单,一行代码让耗时操作去子线程执行,再来一行代码回主线程进行结果监听。

那么问题就来了,Rxjava 是怎么实现的线程切换呢?

脑海里的答案是什么?

线程池 or Handler 还是什么?

emmmmm,这个答案 只对了一半

我的答案是:

耗时操作、主线程操作。确实是线程池和handler。但是具体的切换操作是 Scheduler。它是个抽象类,把耗时操作、结果监听 ,包装成 Runnable。丢到Scheduler的实现类去执行。

为什么说Rxjava的线程切换是由Scheduler完成的呢?

来,从源码中找答案的支撑。

阅读建议:rxjava代码比较绕,建议静下心来,打着断点一步步看。

首先来看一段代码(图1)

图1

通过调用subscribeOn() 和observeOn 方法,完成了线程的切换。

众所周知,RxJava是出了名的绕。为了方便分析,将(图1)进行代码拆分(图2)。


图2

testRxjavaThreadSwitch2 中的代码块调度逻辑与testRxjavaThreadSwitch1 完成相等。

分析前我们要先定下分析的方法(图3所示)。


图3

代码的调用链是由一到四步组合而成。链的关键在第四步。没有进行subscribe的话,这段Rxjava 代码是不会执行。代码真正的执行又是从第四步,一步步的封装打包。传递到第一步去。

可以理解为是U型结构。

代码链的分析:

分析第一步。Observable.create方法。点到源码中去看

图4


图5

总结: 真正的实现类是ObservableCreate。持有的对象source是我们穿进去的 接口类ObservableOnSubscribe

第二步:create.subscribeOn的源码

图6

总结:因为第一步的实现类是ObservableCreate,所以create.subscribeOn 实际是


ObservableCreate.subscribeOn。 返回的函数 new ObservableSubscribeOn<>(this, scheduler) 中的this 是 ObservableCreate对象,scheduler 是我们传入的

Schedulers.computation()。所以第二步的实现类是ObservableSubscribeOn

第三步:
observableonSubscribeOn.observeOn的源码:

图7

总结:第二步的实现类是ObservableSubscribeOn。所以
observableonSubscribeOn.observeOn的调用实际是
ObservableSubscribeOn.observeOn。返回函数new ObservableObserveOn<>(this, scheduler, delayError, bufferSize)。中的this是ObservableSubscribeOn,scheduler是我们传入的
AndroidSchedulers.mainThread()。

因此,第三步的实现类是ObservableObserveOn


第四步:
observableObserveOn.subscribe的源码:

图8

总结:因为我们只传入了一个Consumer,所以Rxjava为我们补齐了onError和onComplete

第三步的实现类是ObservableObserveOn。所以第四步的
observableObserveOn.subscribe的调用实际是
ObservableObserveOn.subscribe。

至此,调用链串联完成。让我们回顾总结下代码真实调用。

第一步:Observable.create 方法构造个ObservableCreate作为返回对象

第二步:
ObservableCreate.subscribeOn 构造ObservableSubscribeOn作为返回对象

第三步:
ObservableSubscribeOn.observeOn 构造ObservableObserveOn作为返回对象

第四步:
ObservableObserveOn.subscribe。调用subscribe,rxjava代码链 成!!开始执行。

执行链的分析:

我们先看一段代码:

图9

抽象类 Observable。是 第一二三步实现类ObservableCreate、ObservableSubscribeOn、ObservableObserveOn的父类,当在实现类中调用(图9)中的subscribe方法。实际是执行到了 Observable类中的subscribe。由subscribe方法来进行各种验证,然后调用subscribeActual来进行统一分发到实现类中,去做具体的实现行为。

让我们回顾第四步:

一次包装:


图10

(图10)所示,将我们传入的Consumer包装成LambdaObserver。然后传递ObservableObserveOn中的subscribeActual执行。代码运行起来打个断点验证下

二次包装:

图11

在(图11)中我们可以看到第四步的Consumer包装类LambdaObserver传递了过来。

再把LambdaObserver包装成ObserveOnObserver(二次包装)。ObserveOnObserver持有 LambdaObserver和我们传入的AndroidSchedulers。交给source(第二步的实现类)ObservableSubscribeOn 执行。

三次包装:

图12

(图12)所示,第二次包装的ObserveOnObserver包装成SubscribeOnObserver。

这个时候要注意了!!!!!!!!!!!!!!!!!!!!!

图13

scheduler.scheduleDirect(new SubscribeTask(parent))

这行代码标识,我们层层包装的observer (SubscribeOnObserver->ObserveOnObserver->LambdaObserver->Consumer)。要包装成SubscribeTask(继承了Runnable接口) 这个Runnable,交给scheduler调度。

图14

看(图14)的红圈,代码已经到子线程执行了。

加快我们的分析。run代码块中的source 是ObservableCreate,而parent对象则是我们层层包裹的observer。按照老方法。下一步会执行到
ObservableCreate.subscribeActual。

图15

四次包装:

看这个传入的observer。是不是我们上层包装的observer。到了这层 会在次包装。包装后的效果(CreateEmitter->SubscribeOnObserver->ObserveOnObserver->LambdaObserver->Consumer)。还记得我们代码链分析的第一步吗?(图15)中的source,是我们传入的ObservableOnSubscribe接口类

在回到我们的测试代码

图16

(图16)中箭头指向的it,也就是我们所常见的发射器emitter。就是我们经过四次包装,最终生成的CreateEmitter。

至此,切换到子线程的逻辑,跑通了!!!!!!

说了这么多。才跑通了切换到子线程。是不是已经进入混沌状态了。

我们在总结下关键点:

Rxjava,会按照我们代码书写顺序生成个调用链,最后的subscribe方法标志着调用链形成。然后从下往上,把Consumer经过层层包装。传递到顶层作为emitter发射器的实现类。其中会在
ObservableSubscribeOn.subscribeActual方法中,生成个SubscribeTask的Runnable。将代码的执行切换到子线程。

我们继续,攻坚最后一步。子线程在切换到主线程。

在观察(图16),发射器调用了it.onNext("111111")。因为发射器的实现类是CreateEmitter。

图17

还记得通过四次包装的observer吗?接下来我们拆快递了(CreateEmitter->SubscribeOnObserver->ObserveOnObserver->LambdaObserver->Consumer)。快递长这样。

剩下的就不一步步分析了,打着断点一步步跟就行。接下来只分析关键点。拆快递拆到ObserveOnObserver层,看(图18)

图18

经过onNext的调用后,执行schedule()方法,交给worker进行调用,worker是我们传入的
AndroidSchedulers.mainThread()。实现类是HandlerScheduler。

图19

(图19)中的红框,老哥们看了都会懂吧!此时还在子线程执行呢。马上要把我们的数据包装成ScheduledRunnable作为个message,传递到handler中执行了。

当包装的ScheduledRunnable的run执行的时候代表什么???

代表已经回到主线程了。后续的代码执行会继续进行对observer拆包。最终把数据回调到我们subscribe传入的Consumer中。

至此我们这篇文章的探索的Rxjava线程切换原理。在源码层需要找寻的答案,已经找到了。

最后的最后在总结下。

Rxjava,会按照我们代码书写顺序生成个调用链,最后的subscribe方法标志着调用链形成。然后从下往上,把Consumer经过层层包装。传递到顶层作为emitter发射器的实现类。其中会在
ObservableSubscribeOn.subscribeActual方法中,生成个SubscribeTask的Runnable。将代码的执行切换到子线程。

发射器emitter调用onNext,开始对Consumer的包装进行拆包。当拆包拆到ObserveOnObserver层时。会有个worker调度,也就是开始文章开篇说的Scheduler。生成个Runnable的message交给handler分发。将代码的执行切换到主线程。

在总结的精炼点:

Rxjava,通过把要切换线程的任务包装成Runnable交给Scheduler调度。具体是线程池或handler,交由Scheduler实现类实现。


写在最后。Rxjava源码很绕很复杂。当我们耐心跑通一个流程后,其他的各种操作符的理解都会变得容易起来。耐下心来,细读源码,与大神在不同时空的思想碰撞,获益良多。

相关文章

Java微服务架构选型:优雅拆分与高效整合

Java微服务架构选型:优雅拆分与高效整合在当今的软件开发领域,微服务架构已经成为一种主流趋势。它将单一庞大的应用程序划分为多个小型、自治的服务,每个服务负责特定的功能模块。对于使用Java语言的开发...

Java 9 到 Java 16 的版本演进:一次模块化革命和语言的持续进化

Java 9 到 Java 16 的版本演进:一次模块化革命和语言的持续进化在这个飞速发展的时代,Java 的发布节奏也变得越来越快,从传统的“龟速”升级到如今的“火车模式”。从 2017 年 9 月...

Java 拆分PDF页面

在操作PDF文档时,拆分PDF页面,意味着将一页的文本内容展示在多个较小些的页面上。Free Spire.PDF for Java控件支持从水平或垂直方向来将PDF页面拆分为多个页面。本文将演示相关代...

项目案例:Java多线程批量拆分List导入数据库

一、前言二、直接把list怼进Mysql三、分组把list导入Mysql中四、多线程分批导入Mysql五、小结一、前言前两天做了一个导入的功能,导入开始的时候非常慢,导入2w条数据要1分多钟,后来一点...

value中存储过多的元素-Redis大key多key拆分方案

背景在我的项目中,会存在一个DG下拥有10w+的学生,每个学生在进入直播之前,都需要通过校验,查询是否是这个直播所关联DG下的学生;为了提高并发,我们把大纲和学生的关系存入Redis中,使用set存储...

如何将长字符串定义拆分成多行?

在编程的过程中,我们常常会遇到需要处理长字符串的情况。当字符串特别长时,将其全部写在一行会让代码变得难以阅读和维护。那么,怎样才能把长字符串的定义拆分成多行呢?这是不少开发者都会遇到并想要解决的问题。...