RxJava学习入门1.基本概念和常用的创建操作符
一、简介
1. 什么是响应式编程
响应式编程是一种编程范式,旨在方便地表达静态或动态的数据流,并自动将变化的值通过数据流进行传播。RxJava是响应式编程的一个实现,它基于Java虚拟机,可以方便地在Java环境中使用。
响应式编程的基本原理包括:
- 响应式编程通过异步和数据流来构建事物关系,这种关系也可以理解为业务逻辑。
- 数据流和异步是实现响应式编程的关键,异步和数据流都是为了正确地构建事物的关系。
- 响应式编程可以自动将变化的值通过数据流进行传播,使得相关的计算模型可以自动更新。
2. ReactiveX简介
ReactiveX是一种响应式编程库,提供了处理异步和事件驱动操作的一组操作符和工具。它基于数据流的响应式编程,使用函数式编程风格和链式调用处理数据流。ReactiveX包含观察者模式、迭代器模式和函数式编程概念,简化了异步编程和事件处理。它可以在许多领域广泛应用,如前端开发、后端服务、移动应用程序和响应式UI。
ReactiveX的主要组成部分包括被观察者/观察者、操作符和调度器。被观察者可以产生异步或事件驱动的数据流,而观察者则订阅这些数据流并接收数据。操作符用于处理数据流,包括转换、过滤、合并和调度等操作。调度器用于管理后台任务和事件循环,以支持高效的异步处理。
ReactiveX提供了一种更简单、更灵活的编程范式,使代码可读性更高,减少了bug的产生。它与传统的回调方法和Promise相比,具有更好的组合性和可读性。ReactiveX也可以与其他编程语言和框架集成,如Java、C#、Swift和Angular等。
ReactiveX 有很多种实现,如RxAndroid、RxJS、RxSwift、RxRuby、RxCpp等。
3. RxJava简介
RxJava是ReactiveX的一种Java实现,用于建立可扩展、异步和事件驱动的应用程序。它基于观察者模式和迭代器模式,通过引入数据流的概念,使得在Java环境中处理异步操作和事件驱动的程序更加容易。
RxJava的主要特点包括:
- 异步编程:RxJava提供了一种异步编程的方式,可以将耗时的任务如网络请求、文件读写等放到后台线程执行,避免阻塞主线程,提升应用的响应速度和用户体验。
- 事件驱动:RxJava通过将事件封装成流数据,让你可以像处理数据流一样处理事件。例如,你可以使用 RxJava 来处理按钮点击事件、网络请求的响应事件等。
- 可组合性:RxJava支持通过操作符组合多个异步任务,使得代码更加简洁、可读性更高。
- 响应式编程:RxJava支持响应式编程,即当数据源发生变化时,会自动通知观察者进行相应的处理,使得代码更加健壮和灵活。
- RxJava可以应用于各种场景,如网络请求、文件读写、UI事件处理等。它提供了一种简洁、灵活和可组合的编程方式,使得处理异步任务和事件流变得更加容易。
RxJava目前有三个主要分支,分别为RxJava1.x,RxJava2.x,RxJava3.x。 本文示例基于RxJava3.x
4. 几个重要的概念
(1) 观察者: Observer
观察事件变化并处理的主要角色。
(2) 消费者: Consumer
理解为一种特殊的观察者。
(3) 被观察者: 触发事件并决定什么时候发送事件的主要角色。
异常和完成也是一种事件。 - Observable,Flowable,Single,Completable,Maybe五种被观察者。 - Flowable支持背压 - Signle,Completable,Maybe是简化版的Observable。
(4) 订阅:观察者和被观察者建立关联
观察者订阅被观察者。
二、 创建操作符
1. 什么是操作符
在RxJava中,操作符是一种函数,用于处理数据流,对事件进行转换、过滤、组合等操作。RxJava提供了许多操作符,如map、filter、reduce、concat等,这些操作符可以帮助开发人员更加灵活和方便地处理异步和事件驱动的程序。通过操作符,你可以对数据流进行各种复杂的操作,从而构建出更加复杂的应用程序。
RxJava有5类操作符,分别是: - 创建操作符(Creation Operators):用于创建被观察者(Observable)对象以及发送事件。 - 转换操作符(Transforming Operators):用于变换被观察者(Observable)发送的事件。这些- - 操作符包括:map、flatMap、concatMap、switchMap、buffer、groupBy等等。 - 合并操作符(Combining Operators):用于组合多个被观察者(Observable)并合并需要发送的事件。这些操作符包括:concat、merge、mergeArray、concatArray、reduce、collect、startWith、zip、count等。 - 功能性操作符(Functional Operators):用于辅助被观察者(Observable)发送事件时实现一些功能性需求,如错误处理,线程调度等。 - 过滤操作符(Filtering Operators):用于将Observable发送的数据进行过滤和选择。
2. 项目引入RxJava
(1) maven 引用
<dependency>
<groupId>io.reactivex.rxjava3</groupId>
<artifactId>rxjava</artifactId>
<version>3.0.0</version>
</dependency>
(2) gradle 引用
dependencies {
implementation 'io.reactivex.rxjava3:rxjava:3.0.0'
}
3. 创建操作符项目示例
在这里插入图片描述
package org.example;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Observer;
import io.reactivex.rxjava3.disposables.Disposable;
public class Main {
public static void main(String[] args) {
Observable.create(emitter -> {
// 事件产生的地方
emitter.onNext("1");
emitter.onNext("2");
emitter.onNext("3");
emitter.onComplete();
}).subscribe(new Observer<Object>() {
@Override
public void onSubscribe( Disposable d) {
System.out.println("建立订阅时调用: onSubscribe");
}
@Override
public void onNext( Object o) {
System.out.println("调用 onNext");
}
@Override
public void onError( Throwable e) {
System.out.println("调用 onError");
}
@Override
public void onComplete() {
System.out.println("订阅执行完成 onComplete");
}
});
}
}
执行效果:
建立订阅时调用: onSubscribe
调用 onNext
调用 onNext
调用 onNext
订阅执行完成 onComplete
在这里插入图片描述
说明:Observer 去观察 Observable,可以在接口中写观察后的逻辑代码。为了方便书写和复用,后面将Observer定义单独拿出来。
3. 消费者
消费者与订阅者相比会简化处理逻辑。
package org.example;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Observer;
import io.reactivex.rxjava3.disposables.Disposable;
import org.jetbrains.annotations.NotNull;
public class Main {
public static void main(String[] args) {
Observer observer = new Observer<>() {
@Override
public void onSubscribe(@NotNull Disposable d) {
System.out.println("建立订阅时调用: onSubscribe");
}
@Override
public void onNext(@NotNull Object o) {
System.out.println("调用 onNext");
}
@Override
public void onError(@NotNull Throwable e) {
System.out.println("调用 onError");
}
@Override
public void onComplete() {
System.out.println("订阅执行完成 onComplete");
}
};
Observable.create(emitter -> {
// 事件产生的地方
emitter.onNext("1");
emitter.onNext("2");
emitter.onNext("3");
emitter.onComplete();
}).subscribe(o -> {
System.out.println("accept 对象" + o);
}, throwable -> {
System.out.println("异常发生" + throwable);
});
}
}
运行效果:
accept 对象1
accept 对象2
accept 对象3
4. just 操作符
just操作符用来大幅简化Observable的创建,但最多只能传10个事件。
package org.example;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Observer;
import io.reactivex.rxjava3.disposables.Disposable;
import org.jetbrains.annotations.NotNull;
public class Main {
public static void main(String[] args) {
Observer observer = new Observer<>() {
@Override
public void onSubscribe(@NotNull Disposable d) {
System.out.println("建立订阅时调用: onSubscribe");
}
@Override
public void onNext(@NotNull Object o) {
System.out.println("调用 onNext:" + o);
}
@Override
public void onError(@NotNull Throwable e) {
System.out.println("调用 onError");
}
@Override
public void onComplete() {
System.out.println("订阅执行完成 onComplete");
}
};
Observable.just("1","ABC","2").subscribe(observer);
}
}
执行效果:
建立订阅时调用: onSubscribe
调用 onNext:1
调用 onNext:ABC
调用 onNext:2
订阅执行完成 onComplete
5. fromArray操作符,类似于just,但数组不受10个限制
这里沿用上面的observer。
Observable.fromArray("1","ABC","2")
.subscribe(observer);
在这里插入图片描述
6. fromIterable 迭代器操作符
ArrayList<String> list = new ArrayList<>();
list.add("1");
list.add("2");
Observable.fromIterable(list)
.subscribe(observer);
在这里插入图片描述
7. fromFuture操作符
package org.example;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Observer;
import io.reactivex.rxjava3.disposables.Disposable;
import org.jetbrains.annotations.NotNull;
import java.util.concurrent.*;
public class Main {
public static void main(String[] args) {
Observer observer = new Observer<>() {
@Override
public void onSubscribe(@NotNull Disposable d) {
System.out.println("建立订阅时调用: onSubscribe");
}
@Override
public void onNext(@NotNull Object o) {
System.out.println("调用 onNext:" + o);
}
@Override
public void onError(@NotNull Throwable e) {
System.out.println("调用 onError");
}
@Override
public void onComplete() {
System.out.println("订阅执行完成 onComplete");
}
};
Observable.fromFuture(new Future<>() {
@Override
public boolean cancel(boolean b) {
return false;
}
@Override
public boolean isCancelled() {
return false;
}
@Override
public boolean isDone() {
return false;
}
@Override
public Object get() {
return "abc";
}
@Override
public Object get(long l, TimeUnit timeUnit) {
return null;
}
}).subscribe(observer);
}
}
在这里插入图片描述
8. fromCallable
Observable.fromCallable(new Callable<Object>() {
@Override
public Object call() throws Exception {
return "ABC";
}
}).subscribe(observer);
在这里插入图片描述