关于RxJava我会持续更新,从原理到使用一步步由浅至深带大家对RxJava有一个深入的学习,欢迎大家持续关注。如果觉得这里代码排版不是很舒服的读者可以关注我的微信公众号“IT工匠”,我会同步更新,另外微信公众号上还有很多互联网必备资源(涉及算法、数据结构、java、深度学习、计算机网络、python、Android等互联网技术资料),欢迎大家关注、交流。

android系统下的java编程详解(Android进阶必备RxJava全面学习)(1)

RxJava之所以强大的一大原因是其提供了功能完备的各类操作符,有了这些操作符,我们可以完成很多功能。所谓的操作符其实核心功能只有一个,那就是创建被观察者以及发送事件,如果看过我写的《RxJava全面学习(1)——原理介绍&入门使用》一文,一定会对Observable.create()方法有很深的印象,当时我们是使用Observable.create()方法去创建被观察者以及发送事件的,其实Observable.create()方法就是RxJava为我们提供的一个最基本的操作符,RxJava为我们提供的操作符大致有以下这些:

android系统下的java编程详解(Android进阶必备RxJava全面学习)(2)

本文我们来对RxJava中的功能性操作符进行学习,本文目录如下:

android系统下的java编程详解(Android进阶必备RxJava全面学习)(3)

功能性操作符

作用:在被观察者进行事件发送的时候完成一些比如线程调度、错误处理等的功能性任务。

连接观察者&被观察者

subscrible()

作用:连接观察者与被观察者,起到订阅的作用

具体使用:前面每一个实例都有用到。

线程调度

需求场景:快速指定观察者和被观察者所在的线程。

默认情况下观察者与被观察者都是工作在创建其的线程中,一般是主线程,要实现真正的异步任务,我们就需要让被观察者在子线程中生产时间、观察者在主线程中接收事件,比如在被观察者中执行耗时操作,将耗时操作的结果作为事件发送给观察者,观察者在UI线程中接收被观察者发送的事件(异步任务的处理结果),并更新UI。

为了实现上述场景,RxJava为我们提供了两个常用的线程调度的操作符:

  1. subscribeOn():指定被观察者所在的线程
  2. observeOn():指定观察者所在的线程

具体使用方法如下:

public void ThreadRun() { print("主线程:" Thread.currentThread().getId()); Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { print("被观察者所在的线程是:" Thread.currentThread().getId()); emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); } }).observeOn(Schedulers.newThread()).subscribeOn(Schedulers.newThread()).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { print("观察者所在的线程是:" Thread.currentThread().getId()); print("接收到事件:" integer); } }); }

执行结果如下:

android系统下的java编程详解(Android进阶必备RxJava全面学习)(4)

在上面的代码中,我们使用subscribeOn()和observeOn()分别指定了被观察者和观察者所在的线程,传入这两个方法中的实参Schedulers.newThread()代表相对于当前线程的新线程,我们还可以传入下面的几种值来达到不同的线程调度的目的:

android系统下的java编程详解(Android进阶必备RxJava全面学习)(5)

注意以下几点:

  1. 若多次使用subscribeOn()指定被观察者所在线程,则只有第一次是有效的
  2. 若多次使用observeOn()指定观察者所在线程,则每指定一次都会进行一次线程调度

延迟操作

作用:在被观察者发送事件之前进行一些延迟的操作

delay()

作用:使得被观察者延迟一段时间再发送事件

delay()方法具有多个重载形式:

// 1. 指定延迟时间 // 参数1:时间;参数2:时间单位 delay(long delay,TimeUnit unit) // 2. 指定延迟时间 & 调度器 // 参数1:时间;参数2:时间单位;参数3 :线程调度器 delay(long delay,TimeUnit unit,mScheduler scheduler) // 3. 指定延迟时间 & 错误延迟 // 错误延迟,即:若存在Error事件,则如常执行,执行后再抛出错误异常 // 参数1:时间;参数2 :时间单位;参数3:错误延迟参数 delay(long delay,TimeUnit unit,boolean delayError) // 4. 指定延迟时间 & 调度器 & 错误延迟 // 参数1 :时间;参数2:时间单位;参数3:线程调度器;参数4:错误延迟参数 delay(long delay,TimeUnit unit,mScheduler scheduler,boolean delayError): 指定延迟多长时间并添加调度器,错误通知可以设置是否延迟

具体使用:

public void delayRun() { print(df.format(new Date()) "开始"); Observable.just(1, 2, 3) .delay(3, TimeUnit.SECONDS) // 延迟3s再发送,由于使用类似,所以此处不作全部展示 .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { print(df.format(new Date()) "订阅"); } @Override public void onNext(Integer value) { print(df.format(new Date()) "-接收到了事件" value); } @Override public void onError(Throwable e) { print(df.format(new Date()) "-对Error事件作出响应"); } @Override public void onComplete() { print(df.format(new Date()) "-对Complete事件作出响应"); } }); }

执行结果:

android系统下的java编程详解(Android进阶必备RxJava全面学习)(6)

在事件的生命周期中操作

需求场景:在事件发送 & 接收的整个生命周期过程中进行操作,如发送事件前的初始化、发送事件后的回调请求等

do()

作用:在事件的生命周期中调用

具体的分很多小的操作符,如下图所示:

android系统下的java编程详解(Android进阶必备RxJava全面学习)(7)

具体使用:

public void doRun() { Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); emitter.onError(new NullPointerException()); emitter.onComplete(); } }).doOnEach(new Consumer<Notification<Integer>>() { @Override public void accept(Notification<Integer> integerNotification) throws Exception { print("doOnEach:" integerNotification.toString()); } }).doOnNext(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { print("doOnNext:" integer); } }).doOnError(new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { print("doOnError:" throwable.getMessage()); } }).doOnComplete(new Action() { @Override public void run() throws Exception { print("doOnComplete"); } }).doAfterNext(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { print("doAfterNext:" integer); } }).doOnTerminate(new Action() { @Override public void run() throws Exception { print("doOnTerminate"); } }).doAfterTerminate(new Action() { @Override public void run() throws Exception { print("doAfterTerminate"); } }).doOnSubscribe(new Consumer<Disposable>() { @Override public void accept(Disposable disposable) throws Exception { print("doOnSubscribe"); } }).doOnDispose(new Action() { @Override public void run() throws Exception { print("doOnDispose"); } }).doOnLifecycle(new Consumer<Disposable>() { @Override public void accept(Disposable disposable) throws Exception { print("doOnLifecycle-accept"); } }, new Action() { @Override public void run() throws Exception { print("doOnLifecycle-action"); } }).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { print("onSubscribe"); } @Override public void onNext(Integer value) { print("接收到了事件" value); } @Override public void onError(Throwable e) { print("对Error事件作出响应"); } @Override public void onComplete() { print("对Complete事件作出响应"); } }); }

运行结果:

android系统下的java编程详解(Android进阶必备RxJava全面学习)(8)

错误处理

需求场景:发送事件时出现错误的情况下的处理机制

常见的错误处理机制有两种:

  1. 发送数据
  1. 重试

onErrorReturn()

作用:捕捉错误事件,将错误事件转化为一个正常的事件,保证整个事件流不会出现错误事件。

具体使用:

public void onErrorRetrurnRun() { Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onError(new NullPointerException()); emitter.onNext(4); } }).onErrorReturn(new Function<Throwable, Integer>() { @Override public Integer apply(Throwable throwable) throws Exception { print("捕获异常:" throwable.getMessage()); return 3; } }).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { print("onSubscribe"); } @Override public void onNext(Integer value) { print("接收到了事件" value); } @Override public void onError(Throwable e) { print("对Error事件作出响应"); } @Override public void onComplete() { print("对Complete事件作出响应"); } }); }

运行结果:

android系统下的java编程详解(Android进阶必备RxJava全面学习)(9)

可以看到,使用了onErrorReturn()操作符之后当被观察者发送异常事件后会被捕捉,并且观察者可以接收到onErrorReturn()操作符的返回值,这样可以保证整个事件流不会出现错误事件,但是,当onErrorReturn()操作符捕捉到错误事件后,被观察者在错误事件之后发送的事件仍然是不会被继续发送的,就像上面例子中发送的事件”4”还是不会被发送,但是最终会发送一个Complete事件。

onErrorResumeNext()

作用:捕捉错误事件,将错误事件转化为一个新的被观察者进行发送

基本使用:

public void onErrorResumeNextRun() { Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onError(new Throwable("错误")); } }).onErrorResumeNext(new Function<Throwable, ObservableSource<? extends Integer>>() { @Override public ObservableSource<? extends Integer> apply(Throwable throwable) throws Exception { //如果捕捉到Throwable,发送一个新的被观察者 return Observable.just(3, 4); } }).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { print("onSubscribe"); } @Override public void onNext(Integer value) { print("接收到了事件" value); } @Override public void onError(Throwable e) { print("对Error事件作出响应"); } @Override public void onComplete() { print("对Complete事件作出响应"); } }); }

运行结果:

android系统下的java编程详解(Android进阶必备RxJava全面学习)(10)

注意:onErrorResumeNext()只能捕捉到Throwable类型的错误事件,如果其捕捉到Exception类型的错误事件,会将错误事件传递给观察者的onError()方法。,如果想要捕捉Exception类型的错误事件,应该使用onExceptionResumeNext(),同样,若onExceptionResumeNext()捕捉到了Throwable类型的错误事件,则会将错误事件传递给观察者的onError()方法。

retry()

作用:重试,即当发生错误事件时让被观察者重新发送事件

具体使用:

public void retryRun() { Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onError(new Throwable("错误")); emitter.onNext(3); } //设置retry()中参数为3,表示最多允许被观察者重复发送3次 }).retry(3).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { print("onSubscribe"); } @Override public void onNext(Integer value) { print("接收到了事件" value); } @Override public void onError(Throwable e) { print("对Error事件作出响应"); } @Override public void onComplete() { print("对Complete事件作出响应"); } }); }

运行结果:

android系统下的java编程详解(Android进阶必备RxJava全面学习)(11)

可以发现,当发送了错误事件之后,被观察者又重试了3次,知道第5次的时候才以错误事件终止。

retry()方法还有很多重载形式:

retry():出现错误时,让被观察者重新发送数据,若一直错误,则一直重新发送 retry(long time):出现错误时,让被观察者重新发送数据(具备重试次数限制),参数表示最多重试的次数 retry(Predicate predicate) :出现错误后,判断是否需要重新发送数据(若需要重新发送并且持续遇到错误,则持续重试),参数表示判断是否重新发送数据的逻辑 retry(new BiPredicate<Integer, Throwable>:出现错误后,判断是否需要重新发送数据(若需要重新发送 且持续遇到错误,则持续重试,参数表示判断是否需要重新发送数据的逻辑(传入当前重试次数 & 异常错误信息) retry(long time,Predicate predicate):出现错误后,判断是否需要重新发送数据(具备重试次数限制),参数表示重试次数以及判断逻辑

retryUntil()

作用:出现错误后,判断是否需要重新发送数据,若需要重新发送且持续遇到错误,则持续重试,作用类似于retry(Predicate predicate)

retryWhen()

作用:遇到错误时,将发生的错误传递给一个新的被观察者(Observable),并决定是否需要重新订阅原始被观察者(Observable)以及发送事件

Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { e.onNext(1); e.onNext(2); e.onError(new Exception("发生错误了")); e.onNext(3); } }) // 遇到error事件才会回调 .retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() { @Override public ObservableSource<?> apply(@NonNull Observable<Throwable> throwableObservable) throws Exception { // 参数Observable<Throwable>中的泛型 = 上游操作符抛出的异常,可通过该条件来判断异常的类型 // 返回Observable<?> = 新的被观察者 Observable(任意类型) // 此处有两种情况: // 1. 若 新的被观察者 Observable发送的事件 = Error事件,那么 原始Observable则不重新发送事件: // 2. 若 新的被观察者 Observable发送的事件 = Next事件 ,那么原始的Observable则重新发送事件: return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() { @Override public ObservableSource<?> apply(@NonNull Throwable throwable) throws Exception { // 1. 若返回的Observable发送的事件 = Error事件,则原始的Observable不重新发送事件 // 该异常错误信息可在观察者中的onError()中获得 return Observable.error(new Throwable("retryWhen终止啦")); // 2. 若返回的Observable发送的事件 = Next事件,则原始的Observable重新发送事件(若持续遇到错误,则持续重试) // return Observable.just(1); } }); } }) .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Integer value) { Log.d(TAG, "接收到了事件" value ); } @Override public void onError(Throwable e) { Log.d(TAG, "对Error事件作出响应" e.toString()); // 获取异常错误信息 } @Override public void onComplete() { Log.d(TAG, "对Complete事件作出响应"); } });

运行结果:

android系统下的java编程详解(Android进阶必备RxJava全面学习)(12)

重复发送

需求场景:重复不断地发送被观察者事件

repeat()

作用:无条件地、重复发送被观察者事件,具备重载方法,可设置重复创建次数

repeatWhen()

作用:有条件地、重复发送 被观察者事件

注意:

  1. 若新被观察者(Observable)返回1个Complete / Error事件,则不重新订阅 & 发送原来的 Observable
  2. 若新被观察者(Observable)返回其余事件时,则重新订阅 & 发送原来的 `Observable

,