RxJava学习入门1.基本概念和常用的创建操作符

createh51个月前 (02-01)技术教程12


一、简介

1. 什么是响应式编程

响应式编程是一种编程范式,旨在方便地表达静态或动态的数据流,并自动将变化的值通过数据流进行传播。RxJava是响应式编程的一个实现,它基于Java虚拟机,可以方便地在Java环境中使用。

响应式编程的基本原理包括:

  1. 响应式编程通过异步和数据流来构建事物关系,这种关系也可以理解为业务逻辑。
  2. 数据流和异步是实现响应式编程的关键,异步和数据流都是为了正确地构建事物的关系。
  3. 响应式编程可以自动将变化的值通过数据流进行传播,使得相关的计算模型可以自动更新。

2. ReactiveX简介

ReactiveX是一种响应式编程库,提供了处理异步和事件驱动操作的一组操作符和工具。它基于数据流的响应式编程,使用函数式编程风格和链式调用处理数据流。ReactiveX包含观察者模式、迭代器模式和函数式编程概念,简化了异步编程和事件处理。它可以在许多领域广泛应用,如前端开发、后端服务、移动应用程序和响应式UI。

ReactiveX的主要组成部分包括被观察者/观察者、操作符和调度器。被观察者可以产生异步或事件驱动的数据流,而观察者则订阅这些数据流并接收数据。操作符用于处理数据流,包括转换、过滤、合并和调度等操作。调度器用于管理后台任务和事件循环,以支持高效的异步处理。

ReactiveX提供了一种更简单、更灵活的编程范式,使代码可读性更高,减少了bug的产生。它与传统的回调方法和Promise相比,具有更好的组合性和可读性。ReactiveX也可以与其他编程语言和框架集成,如Java、C#、Swift和Angular等。

ReactiveX 有很多种实现,如RxAndroid、RxJS、RxSwift、RxRuby、RxCpp等。

3. RxJava简介

RxJava是ReactiveX的一种Java实现,用于建立可扩展、异步和事件驱动的应用程序。它基于观察者模式和迭代器模式,通过引入数据流的概念,使得在Java环境中处理异步操作和事件驱动的程序更加容易。

RxJava的主要特点包括:

  • 异步编程:RxJava提供了一种异步编程的方式,可以将耗时的任务如网络请求、文件读写等放到后台线程执行,避免阻塞主线程,提升应用的响应速度和用户体验。
  • 事件驱动:RxJava通过将事件封装成流数据,让你可以像处理数据流一样处理事件。例如,你可以使用 RxJava 来处理按钮点击事件、网络请求的响应事件等。
  • 可组合性:RxJava支持通过操作符组合多个异步任务,使得代码更加简洁、可读性更高。
  • 响应式编程:RxJava支持响应式编程,即当数据源发生变化时,会自动通知观察者进行相应的处理,使得代码更加健壮和灵活。
  • RxJava可以应用于各种场景,如网络请求、文件读写、UI事件处理等。它提供了一种简洁、灵活和可组合的编程方式,使得处理异步任务和事件流变得更加容易。

RxJava目前有三个主要分支,分别为RxJava1.x,RxJava2.x,RxJava3.x。 本文示例基于RxJava3.x

4. 几个重要的概念

(1) 观察者: Observer

观察事件变化并处理的主要角色。

(2) 消费者: Consumer

理解为一种特殊的观察者。

(3) 被观察者: 触发事件并决定什么时候发送事件的主要角色。

异常和完成也是一种事件。 - Observable,Flowable,Single,Completable,Maybe五种被观察者。 - Flowable支持背压 - Signle,Completable,Maybe是简化版的Observable。

(4) 订阅:观察者和被观察者建立关联

观察者订阅被观察者。

二、 创建操作符

1. 什么是操作符

在RxJava中,操作符是一种函数,用于处理数据流,对事件进行转换、过滤、组合等操作。RxJava提供了许多操作符,如map、filter、reduce、concat等,这些操作符可以帮助开发人员更加灵活和方便地处理异步和事件驱动的程序。通过操作符,你可以对数据流进行各种复杂的操作,从而构建出更加复杂的应用程序。

RxJava有5类操作符,分别是: - 创建操作符(Creation Operators):用于创建被观察者(Observable)对象以及发送事件。 - 转换操作符(Transforming Operators):用于变换被观察者(Observable)发送的事件。这些- - 操作符包括:map、flatMap、concatMap、switchMap、buffer、groupBy等等。 - 合并操作符(Combining Operators):用于组合多个被观察者(Observable)并合并需要发送的事件。这些操作符包括:concat、merge、mergeArray、concatArray、reduce、collect、startWith、zip、count等。 - 功能性操作符(Functional Operators):用于辅助被观察者(Observable)发送事件时实现一些功能性需求,如错误处理,线程调度等。 - 过滤操作符(Filtering Operators):用于将Observable发送的数据进行过滤和选择。

2. 项目引入RxJava

(1) maven 引用

<dependency>
            <groupId>io.reactivex.rxjava3</groupId>
            <artifactId>rxjava</artifactId>
            <version>3.0.0</version>
        </dependency>

(2) gradle 引用

dependencies {
  implementation 'io.reactivex.rxjava3:rxjava:3.0.0'
}

3. 创建操作符项目示例


在这里插入图片描述


package org.example;

import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Observer;
import io.reactivex.rxjava3.disposables.Disposable;

public class Main {
    public static void main(String[] args) {
        Observable.create(emitter -> {
            // 事件产生的地方
            emitter.onNext("1");
            emitter.onNext("2");
            emitter.onNext("3");
            emitter.onComplete();
        }).subscribe(new Observer<Object>() {
            @Override
            public void onSubscribe( Disposable d) {
                System.out.println("建立订阅时调用: onSubscribe");
            }

            @Override
            public void onNext( Object o) {
                System.out.println("调用 onNext");
            }

            @Override
            public void onError( Throwable e) {
                System.out.println("调用 onError");
            }

            @Override
            public void onComplete() {
                System.out.println("订阅执行完成 onComplete");
            }
        });
    }
}

执行效果:

建立订阅时调用: onSubscribe
调用 onNext
调用 onNext
调用 onNext
订阅执行完成 onComplete


在这里插入图片描述

说明:Observer 去观察 Observable,可以在接口中写观察后的逻辑代码。为了方便书写和复用,后面将Observer定义单独拿出来。

3. 消费者

消费者与订阅者相比会简化处理逻辑。

package org.example;

import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Observer;
import io.reactivex.rxjava3.disposables.Disposable;
import org.jetbrains.annotations.NotNull;

public class Main {
    public static void main(String[] args) {
        Observer observer = new Observer<>() {
            @Override
            public void onSubscribe(@NotNull Disposable d) {
                System.out.println("建立订阅时调用: onSubscribe");
            }

            @Override
            public void onNext(@NotNull Object o) {
                System.out.println("调用 onNext");
            }

            @Override
            public void onError(@NotNull Throwable e) {
                System.out.println("调用 onError");
            }

            @Override
            public void onComplete() {
                System.out.println("订阅执行完成 onComplete");
            }
        };
        Observable.create(emitter -> {
            // 事件产生的地方
            emitter.onNext("1");
            emitter.onNext("2");
            emitter.onNext("3");

            emitter.onComplete();
        }).subscribe(o -> {
            System.out.println("accept 对象" + o);
        }, throwable -> {
            System.out.println("异常发生" + throwable);
        });
    }
}

运行效果:

accept 对象1
accept 对象2
accept 对象3

4. just 操作符

just操作符用来大幅简化Observable的创建,但最多只能传10个事件。

package org.example;

import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Observer;
import io.reactivex.rxjava3.disposables.Disposable;
import org.jetbrains.annotations.NotNull;

public class Main {
    public static void main(String[] args) {
        Observer observer = new Observer<>() {
            @Override
            public void onSubscribe(@NotNull Disposable d) {
                System.out.println("建立订阅时调用: onSubscribe");
            }

            @Override
            public void onNext(@NotNull Object o) {
                System.out.println("调用 onNext:" + o);
            }

            @Override
            public void onError(@NotNull Throwable e) {
                System.out.println("调用 onError");
            }

            @Override
            public void onComplete() {
                System.out.println("订阅执行完成 onComplete");
            }
        };

        Observable.just("1","ABC","2").subscribe(observer);
    }
}

执行效果:

建立订阅时调用: onSubscribe
调用 onNext:1
调用 onNext:ABC
调用 onNext:2
订阅执行完成 onComplete

5. fromArray操作符,类似于just,但数组不受10个限制

这里沿用上面的observer。

Observable.fromArray("1","ABC","2")
       .subscribe(observer);


在这里插入图片描述


6. fromIterable 迭代器操作符

ArrayList<String> list = new ArrayList<>();
        list.add("1");
        list.add("2");
        Observable.fromIterable(list)
            .subscribe(observer);


在这里插入图片描述


7. fromFuture操作符

package org.example;

import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Observer;
import io.reactivex.rxjava3.disposables.Disposable;
import org.jetbrains.annotations.NotNull;

import java.util.concurrent.*;

public class Main {
    public static void main(String[] args) {
        Observer observer = new Observer<>() {
            @Override
            public void onSubscribe(@NotNull Disposable d) {
                System.out.println("建立订阅时调用: onSubscribe");
            }

            @Override
            public void onNext(@NotNull Object o) {
                System.out.println("调用 onNext:" + o);
            }

            @Override
            public void onError(@NotNull Throwable e) {
                System.out.println("调用 onError");
            }

            @Override
            public void onComplete() {
                System.out.println("订阅执行完成 onComplete");
            }
        };

        Observable.fromFuture(new Future<>() {
            @Override
            public boolean cancel(boolean b) {
                return false;
            }

            @Override
            public boolean isCancelled() {
                return false;
            }

            @Override
            public boolean isDone() {
                return false;
            }

            @Override
            public Object get() {
                return "abc";
            }

            @Override
            public Object get(long l, TimeUnit timeUnit) {
                return null;
            }
        }).subscribe(observer);
    }
}


在这里插入图片描述


8. fromCallable

Observable.fromCallable(new Callable<Object>() {
            @Override
            public Object call() throws Exception {
                return "ABC";
            }
        }).subscribe(observer);


在这里插入图片描述

相关文章

Java简介(“java”)

/* *作者:呆萌老师 *?csdn认证讲师 *?51cto高级讲师 *?腾讯课堂认证讲师 *?网易云课堂认证讲师 *?华为开发者学堂认证讲师 *?爱奇艺千人名师计划成员 *在这里给大家分享技术、知识...

Java怎么学?看完这篇文章就知道了

当今,Java编程在日常生活中的运用越来越广泛,相关岗位很有发展前景,因此不少人都想要学习Java。那今天咱们就来聊聊,Java怎么学?这个问题小编非常拿手,下面,我就给大家提供一份详细的Java学...

六种常用事务解决方案,你方唱罢,我登场(没有最好只有更好)

1 事务概念在分布式系统中,为了保证数据的高可用,通常,我们会将数据保留多个副本(replica),这些副本会放置在不同的物理的机器上。为了对用户提供正确的 CRUD 等语义,我们需要保证这些放置在不...

关于Java正则和转义中\\和\\\\的理解

定义一个转义字符的目的是开始一个字符序列,使得转义字符开头的该字符序列具有不同于该字符序列单独出现时的语义。转义就是指转换该字符的原本意义,从而变成另外的意义。\作为Java的转义字符1.在java字...

简单了解下Java中锁的概念和原理(java 锁的是什么)

你好,这里是codetrend专栏“高并发编程基础”。Java提供了很多种锁的接口和实现,通过对各种锁的使用发现理解锁的概念是很重要的。Java的锁通过java代码实现,go语言的锁通过go实现,py...

玩转Java注解-元注解、内置注解、自定义注解的原理和实现

前言Java 注解(Annotation)又称 Java 标注,是 JDK5.0 引入的一种注释机制。 重点:和 Javadoc 不同,Java 标注可以通过反射获取标注内容。 大话空话不用说太多,简...