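Introduction

Operators at a glance

Operator: processes the data that has been emitted and emits the result again.

Change propagation: an operator applies a change to the data, and the changed data propagates downstream.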
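To make the two definitions concrete, here is a minimal sketch that does not use RxJava at all. The names Source, map, and OperatorSketch are hypothetical and exist only for this illustration: an operator returns a new source whose items are the transformed items of the original one, so the change reaches whoever subscribes downstream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

public class OperatorSketch {

    /** Hypothetical minimal source: pushes every item to the given consumer. */
    interface Source<T> {
        void subscribe(Consumer<? super T> downstream);

        /** An operator: returns a new Source that transforms items on their way downstream. */
        default <R> Source<R> map(Function<? super T, ? extends R> mapper) {
            // Subscribing to the new source subscribes to this one with a transforming consumer.
            return downstream -> subscribe(item -> downstream.accept(mapper.apply(item)));
        }
    }

    static List<Integer> demo() {
        Source<String> numbers = downstream -> {
            downstream.accept("1");
            downstream.accept("2");
        };
        List<Integer> received = new ArrayList<>();
        // Same transformation as the RxJava examples in this article: parse, then add 2.
        numbers.map(s -> Integer.parseInt(s) + 2).subscribe(received::add);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [3, 4]
    }
}
```

The real RxJava types also carry completion, errors, and unsubscription; this sketch leaves them out on purpose.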


1. RxJava1 operator source code analysis

1. The Func1 interface

2. The Operator interface

1.1 RxJava1 example

Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        if (!subscriber.isUnsubscribed()) {
            subscriber.onNext("1");
            subscriber.onNext("2");
            subscriber.onCompleted();
        }
    }
})
// Transformation: parse each String and add 2
.map(new Func1<String, Integer>() {
    @Override
    public Integer call(String s) {
        return Integer.parseInt(s) + 2;
    }
})
.subscribe(new Observer<Integer>() {
    @Override
    public void onCompleted() {
        Log.d("kpioneer", "onCompleted:");
    }

    @Override
    public void onError(Throwable e) {
    }

    @Override
    public void onNext(Integer integer) {
        Log.d("kpioneer", "onNext:" + integer + ",integer instanceOf" + integer.getClass());
    }
});

Output

06-11 10:10:20.008 14148-14148/com.haocai.rxjavademo D/kpioneer: onNext:3,integer instanceOfclass java.lang.Integer
06-11 10:10:20.008 14148-14148/com.haocai.rxjavademo D/kpioneer: onNext:4,integer instanceOfclass java.lang.Integer
06-11 10:10:20.008 14148-14148/com.haocai.rxjavademo D/kpioneer: onCompleted:

1.2 RxJava1 operator source code


The OnSubscribeMap class in RxJava1

public final class OnSubscribeMap<T, R> implements OnSubscribe<R> {
    final Observable<T> source;
    final Func1<? super T, ? extends R> transformer;

    public OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer) {
        this.source = source;
        this.transformer = transformer;
    }

    @Override
    public void call(final Subscriber<? super R> o) {
        MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
        o.add(parent);
        source.unsafeSubscribe(parent);
    }

    static final class MapSubscriber<T, R> extends Subscriber<T> {
        final Subscriber<? super R> actual;
        final Func1<? super T, ? extends R> mapper;
        boolean done;

        public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
            this.actual = actual;
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
            R result;
            try {
                result = mapper.call(t);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                unsubscribe();
                onError(OnErrorThrowable.addValueAsLastCause(ex, t));
                return;
            }
            actual.onNext(result);
        }

        @Override
        public void onError(Throwable e) {
            if (done) {
                RxJavaHooks.onError(e);
                return;
            }
            done = true;
            actual.onError(e);
        }

        @Override
        public void onCompleted() {
            if (done) {
                return;
            }
            actual.onCompleted();
        }

        @Override
        public void setProducer(Producer p) {
            actual.setProducer(p);
        }
    }
}

The OnSubscribeLift class in RxJava1

public final class OnSubscribeLift<T, R> implements OnSubscribe<R> {
    final OnSubscribe<T> parent;
    final Operator<? extends R, ? super T> operator;

    public OnSubscribeLift(OnSubscribe<T> parent, Operator<? extends R, ? super T> operator) {
        this.parent = parent;
        this.operator = operator;
    }

    @Override
    public void call(Subscriber<? super R> o) {
        try {
            Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o);
            try {
                // new Subscriber created and being subscribed with so 'onStart' it
                st.onStart();
                parent.call(st);
            } catch (Throwable e) {
                // localized capture of errors rather than it skipping all operators
                // and ending up in the try/catch of the subscribe method which then
                // prevents onErrorResumeNext and other similar approaches to error handling
                Exceptions.throwIfFatal(e);
                st.onError(e);
            }
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // if the lift function failed all we can do is pass the error to the final Subscriber
            // as we don't have the operator available to us
            o.onError(e);
        }
    }
}

1.3 How the transformation works (the core operator, lift)

1. Take the original OnSubscribe and the current Operator.

2. Create a new OnSubscribe and return a new Observable built on it.

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
    return unsafeCreate(new OnSubscribeLift<T, R>(onSubscribe, operator));
}

3. Wrap the old Subscriber in a new Subscriber.

MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
o.add(parent);
source.unsafeSubscribe(parent);

4. Perform the transformation inside the new Subscriber, then pass the result on to the old Subscriber.

@Override
public void onNext(T t) {
    R result;
    try {
        result = mapper.call(t);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        unsubscribe();
        onError(OnErrorThrowable.addValueAsLastCause(ex, t));
        return;
    }
    actual.onNext(result);
}


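Analysis

The core implementation relies on a proxy mechanism: the new Subscriber stands in for the original one, transforms each item it receives, and forwards the result to the original Subscriber.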
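The order in which the pieces run is easy to lose in the library source. The following sketch records that order in a list. It is an illustration only, not RxJava code: Sink, OnSubscribe, Operator, and lift here are simplified, hypothetical stand-ins, and error handling and unsubscription are omitted.

```java
import java.util.ArrayList;
import java.util.List;

public class ProxyChainSketch {

    /** Hypothetical stand-in for Subscriber: only receives items. */
    interface Sink<T> {
        void onNext(T item);
    }

    /** Hypothetical stand-in for OnSubscribe: emits to the sink it is called with. */
    interface OnSubscribe<T> {
        void call(Sink<T> sink);
    }

    /** Hypothetical stand-in for Operator: wraps the downstream sink in a proxy. */
    interface Operator<R, T> {
        Sink<T> wrap(Sink<R> downstream);
    }

    /** The lift idea: a new OnSubscribe that hands the parent a proxy instead of the real sink. */
    static <T, R> OnSubscribe<R> lift(OnSubscribe<T> parent, Operator<R, T> operator) {
        return downstream -> parent.call(operator.wrap(downstream));
    }

    static List<String> trace() {
        List<String> log = new ArrayList<>();
        OnSubscribe<String> source = sink -> {
            log.add("source emits 1");
            sink.onNext("1");
        };
        Operator<Integer, String> addTwo = downstream -> {
            log.add("operator wraps downstream");
            return item -> {
                log.add("proxy transforms " + item);
                downstream.onNext(Integer.parseInt(item) + 2);
            };
        };
        OnSubscribe<Integer> lifted = lift(source, addTwo);
        lifted.call(item -> log.add("downstream receives " + item));
        return log;
    }

    public static void main(String[] args) {
        trace().forEach(System.out::println);
    }
}
```

The trace shows that wrapping happens first, at subscription time, and that every item then passes through the proxy before it reaches the real receiver.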


2. RxJava2 operator source code analysis

2.1 RxJava2 example

// Without backpressure: Observable
Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> e) throws Exception {
        if (!e.isDisposed()) {
            e.onNext("1");
            e.onNext("2");
            e.onComplete();
        }
    }
})
.map(new Function<String, Integer>() {
    @Override
    public Integer apply(String s) throws Exception {
        return Integer.parseInt(s) + 2;
    }
})
.subscribe(new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d("kpioneer", "onSubscribe:");
    }

    @Override
    public void onNext(Integer value) {
        Log.d("kpioneer", "onNext:" + value);
    }

    @Override
    public void onError(Throwable e) {
    }

    @Override
    public void onComplete() {
        Log.d("kpioneer", "onComplete");
    }
});

// With backpressure: Flowable
Flowable.create(new FlowableOnSubscribe<String>() {
    @Override
    public void subscribe(FlowableEmitter<String> e) throws Exception {
        if (!e.isCancelled()) {
            e.onNext("1");
            e.onNext("2");
            e.onComplete();
        }
    }
}, BackpressureStrategy.DROP)
.map(new Function<String, Integer>() {
    @Override
    public Integer apply(String s) throws Exception {
        return Integer.parseInt(s) + 2;
    }
})
.subscribe(new Subscriber<Integer>() {
    @Override
    public void onSubscribe(Subscription s) {
        s.request(Long.MAX_VALUE);
        Log.d("kpioneer", "onSubscribe");
    }

    @Override
    public void onNext(Integer integer) {
        Log.d("kpioneer", "onNext:" + integer);
    }

    @Override
    public void onError(Throwable t) {
    }

    @Override
    public void onComplete() {
        Log.d("kpioneer", "onComplete");
    }
});

Output

06-11 10:20:56.688 16675-16675/com.haocai.rxjavademo D/kpioneer: onSubscribe:
06-11 10:20:56.688 16675-16675/com.haocai.rxjavademo D/kpioneer: onNext:3
06-11 10:20:56.698 16675-16675/com.haocai.rxjavademo D/kpioneer: onNext:4
06-11 10:20:56.698 16675-16675/com.haocai.rxjavademo D/kpioneer: onComplete
06-11 10:20:56.758 16675-16675/com.haocai.rxjavademo D/kpioneer: onSubscribe
06-11 10:20:56.758 16675-16675/com.haocai.rxjavademo D/kpioneer: onNext:3
06-11 10:20:56.758 16675-16675/com.haocai.rxjavademo D/kpioneer: onNext:4
06-11 10:20:56.768 16675-16675/com.haocai.rxjavademo D/kpioneer: onComplete

2.2 RxJava2 operator source code

The Function interface

ObservableMap: without backpressure

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;

    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        this.function = function;
    }

    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }

    static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;

        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }
            if (sourceMode != NONE) {
                actual.onNext(null);
                return;
            }
            U v;
            try {
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            actual.onNext(v);
        }

        @Override
        public int requestFusion(int mode) {
            return transitiveBoundaryFusion(mode);
        }

        @Nullable
        @Override
        public U poll() throws Exception {
            T t = qs.poll();
            return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
        }
    }
}

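1. ObservableMap extends the abstract class AbstractObservableWithUpstream.

2. It implements the subscribeActual method that it inherits through AbstractObservableWithUpstream.

3. The original (upstream) Observable is subscribed with the wrapping MapObserver, which performs the transformation.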

ObservableLift

public final class ObservableLift<R, T> extends AbstractObservableWithUpstream<T, R> {
    /** The actual operator. */
    final ObservableOperator<? extends R, ? super T> operator;

    public ObservableLift(ObservableSource<T> source, ObservableOperator<? extends R, ? super T> operator) {
        super(source);
        this.operator = operator;
    }

    @Override
    public void subscribeActual(Observer<? super R> s) {
        Observer<? super T> observer;
        try {
            observer = ObjectHelper.requireNonNull(operator.apply(s), "Operator " + operator + " returned a null Observer");
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Disposable already
            RxJavaPlugins.onError(e);
            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
        source.subscribe(observer);
    }
}

FlowableMap: with backpressure

public final class FlowableMap<T, U> extends AbstractFlowableWithUpstream<T, U> {
    final Function<? super T, ? extends U> mapper;

    public FlowableMap(Flowable<T> source, Function<? super T, ? extends U> mapper) {
        super(source);
        this.mapper = mapper;
    }

    @Override
    protected void subscribeActual(Subscriber<? super U> s) {
        if (s instanceof ConditionalSubscriber) {
            source.subscribe(new MapConditionalSubscriber<T, U>((ConditionalSubscriber<? super U>)s, mapper));
        } else {
            source.subscribe(new MapSubscriber<T, U>(s, mapper));
        }
    }

    static final class MapSubscriber<T, U> extends BasicFuseableSubscriber<T, U> {
        final Function<? super T, ? extends U> mapper;

        MapSubscriber(Subscriber<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }
            if (sourceMode != NONE) {
                actual.onNext(null);
                return;
            }
            U v;
            try {
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            actual.onNext(v);
        }

        @Override
        public int requestFusion(int mode) {
            return transitiveBoundaryFusion(mode);
        }

        @Nullable
        @Override
        public U poll() throws Exception {
            T t = qs.poll();
            return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
        }
    }

    static final class MapConditionalSubscriber<T, U> extends BasicFuseableConditionalSubscriber<T, U> {
        final Function<? super T, ? extends U> mapper;

        MapConditionalSubscriber(ConditionalSubscriber<? super U> actual, Function<? super T, ? extends U> function) {
            super(actual);
            this.mapper = function;
        }

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }
            if (sourceMode != NONE) {
                actual.onNext(null);
                return;
            }
            U v;
            try {
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            actual.onNext(v);
        }

        @Override
        public boolean tryOnNext(T t) {
            if (done) {
                return false;
            }
            U v;
            try {
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return true;
            }
            return actual.tryOnNext(v);
        }

        @Override
        public int requestFusion(int mode) {
            return transitiveBoundaryFusion(mode);
        }

        @Nullable
        @Override
        public U poll() throws Exception {
            T t = qs.poll();
            return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
        }
    }
}

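1. FlowableMap extends AbstractFlowableWithUpstream.

2. It implements the subscribeActual method that it inherits through AbstractFlowableWithUpstream.

3. The original (upstream) Flowable is subscribed with the wrapping Subscriber, which performs the transformation.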

FlowableLift

public final class FlowableLift<R, T> extends AbstractFlowableWithUpstream<T, R> {
    /** The actual operator. */
    final FlowableOperator<? extends R, ? super T> operator;

    public FlowableLift(Flowable<T> source, FlowableOperator<? extends R, ? super T> operator) {
        super(source);
        this.operator = operator;
    }

    @Override
    public void subscribeActual(Subscriber<? super R> s) {
        try {
            Subscriber<? super T> st = operator.apply(s);
            if (st == null) {
                throw new NullPointerException("Operator " + operator + " returned a null Subscriber");
            }
            source.subscribe(st);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Subscription has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);
            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }
}

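2.3 The Operator interface

1. Implement this interface.
2. The transformation takes effect inside subscribeActual.
3. It is the extension point for custom operators.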
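As an illustration of the three points above, here is a sketch of a custom filter operator plugged in through lift. It does not depend on RxJava: MiniObserver, MiniOperator, MiniObservable, and filter are hypothetical, simplified stand-ins (no onSubscribe, no onError, no disposal), chosen only to mirror the shape of Observer, ObservableOperator, and Observable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class CustomOperatorSketch {

    /** Hypothetical, simplified stand-in for Observer. */
    interface MiniObserver<T> {
        void onNext(T item);
        void onComplete();
    }

    /** Hypothetical stand-in for ObservableOperator: maps a downstream observer to an upstream one. */
    interface MiniOperator<D, U> {
        MiniObserver<U> apply(MiniObserver<D> downstream);
    }

    abstract static class MiniObservable<T> {
        protected abstract void subscribeActual(MiniObserver<T> observer);

        final void subscribe(MiniObserver<T> observer) {
            subscribeActual(observer);
        }

        /** lift: subscribe the upstream with whatever observer the operator returns. */
        final <R> MiniObservable<R> lift(MiniOperator<R, T> operator) {
            final MiniObservable<T> upstream = this;
            return new MiniObservable<R>() {
                @Override
                protected void subscribeActual(MiniObserver<R> observer) {
                    upstream.subscribe(operator.apply(observer));
                }
            };
        }

        @SafeVarargs
        static <T> MiniObservable<T> just(T... items) {
            return new MiniObservable<T>() {
                @Override
                protected void subscribeActual(MiniObserver<T> observer) {
                    for (T item : items) {
                        observer.onNext(item);
                    }
                    observer.onComplete();
                }
            };
        }
    }

    /** A custom operator: forwards only the items accepted by the predicate. */
    static <T> MiniOperator<T, T> filter(Predicate<? super T> keep) {
        return downstream -> new MiniObserver<T>() {
            @Override
            public void onNext(T item) {
                if (keep.test(item)) {
                    downstream.onNext(item);
                }
            }

            @Override
            public void onComplete() {
                downstream.onComplete();
            }
        };
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        MiniOperator<Integer, Integer> evens = filter(n -> n % 2 == 0);
        MiniObservable.just(1, 2, 3, 4).lift(evens).subscribe(new MiniObserver<Integer>() {
            @Override
            public void onNext(Integer item) {
                log.add("onNext:" + item);
            }

            @Override
            public void onComplete() {
                log.add("onComplete");
            }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [onNext:2, onNext:4, onComplete]
    }
}
```

The operator itself never touches the source; it only decides what the proxy observer forwards, which is why new operators can be added without changing the observable type.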

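Analysis

In RxJava2, both the variant with backpressure (Flowable) and the variant without it (Observable) build their core implementation on the same proxy mechanism.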

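3. Imitating RxJava1 operators

The Operator interface

  1. Operator is the abstract interface for all operators
  2. Each operator implements Operator to perform its specific transformation

The lift operator

  1. The basic mechanism behind every transformation
  2. Each operator implements Operator and is applied through lift

The map operator

  1. The most basic operator
  2. As the name suggests, it maps each item to another value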

public class Caller<T> {
    final OnCall<T> onCall;

    public Caller(OnCall<T> onCall) {
        this.onCall = onCall;
    }

    public static <T> Caller<T> create(OnCall<T> onCall) {
        return new Caller<>(onCall);
    }

    public Calling call(Receiver<T> receiver) {
        this.onCall.call(receiver);
        return receiver;
    }

    // Wraps the current OnCall together with the operator in a new Caller
    public final <R> Caller<R> lift(final Operator<R, T> operator) {
        return create(new OnCallLift<>(onCall, operator));
    }

    // map is implemented on top of lift
    public final <R> Caller<R> map(Func1<T, R> func) {
        return lift(new MapOperator<T, R>(func));
    }

    public interface OnCall<T> extends Action1<Receiver<T>> {
    }

    public interface Operator<R, T> extends Func1<Receiver<R>, Receiver<T>> {
    }
}

public interface Func1<T, R> {
    R call(T t);
}

public class MapOperator<T, R> implements Caller.Operator<R, T> {
    private final Func1<T, R> mapper;

    public MapOperator(Func1<T, R> mapper) {
        this.mapper = mapper;
    }

    @Override
    public Receiver<T> call(Receiver<R> rReceiver) {
        return new MapReceiver<>(rReceiver, this.mapper);
    }
}

public class MapReceiver<T, R> extends Receiver<T> {
    private final Receiver<R> actual;
    private final Func1<T, R> mapper;

    public MapReceiver(Receiver<R> actual, Func1<T, R> mapper) {
        this.actual = actual;
        this.mapper = mapper;
    }

    @Override
    public void onCompleted() {
        this.actual.onCompleted();
    }

    @Override
    public void onError(Throwable t) {
        this.actual.onError(t);
    }

    @Override
    public void onReceive(T t) {
        // Transform the item, then hand it to the wrapped Receiver
        R tR = this.mapper.call(t);
        this.actual.onReceive(tR);
    }
}

public class OnCallLift<T, R> implements Caller.OnCall<R> {
    private final Caller.OnCall<T> parent;
    private final Caller.Operator<R, T> operator;

    public OnCallLift(Caller.OnCall<T> parent, Caller.Operator<R, T> operator) {
        this.parent = parent;
        this.operator = operator;
    }

    @Override
    public void call(Receiver<R> rReceiver) {
        // Let the operator wrap the downstream Receiver, then call the parent with the wrapper
        Receiver<T> tReceiver = this.operator.call(rReceiver);
        this.parent.call(tReceiver);
    }
}

Usage

public class Lesson2_2Activity extends AppCompatActivity {
    @Override
    protected void onCreate(final Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_custom_test);
        ButterKnife.bind(this);
    }

    @OnClick(R.id.testDo)
    public void onViewClicked() {
        Caller.create(new Caller.OnCall<String>() {
            @Override
            public void call(Receiver<String> stringReceiver) {
                if (!stringReceiver.isUnCalled()) {
                    stringReceiver.onReceive("1");
                    stringReceiver.onReceive("2");
                    stringReceiver.onCompleted();
                }
            }
        })
        .map(new Func1<String, Integer>() {
            @Override
            public Integer call(String s) {
                return Integer.parseInt(s) + 2;
            }
        })
        .call(new Receiver<Integer>() {
            @Override
            public void onCompleted() {
                Log.d("kpioneer", "onCompleted");
            }

            @Override
            public void onError(Throwable t) {
            }

            @Override
            public void onReceive(Integer integer) {
                Log.d("kpioneer", "onReceive:" + integer);
            }
        });
    }
}

Log output

06-11 16:45:26.988 10850-10850/com.haocai.rxjavademo D/kpioneer: onReceive:3
06-11 16:45:26.988 10850-10850/com.haocai.rxjavademo D/kpioneer: onReceive:4
06-11 16:45:26.988 10850-10850/com.haocai.rxjavademo D/kpioneer: onCompleted
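One difference from the RxJava1 source is worth noting: MapSubscriber catches exceptions thrown by the mapper and routes them to onError, while the MapReceiver in this imitation calls the mapper unguarded. A possible way to close that gap is sketched below. Receiver is reduced to a hypothetical three-method interface so the example is self-contained, SafeMapReceiver is an invented name, and catching only RuntimeException is a simplification of what RxJava does with Throwable and Exceptions.throwIfFatal.

```java
import java.util.ArrayList;
import java.util.List;

public class SafeMapSketch {

    /** Hypothetical, simplified stand-in for the Receiver class used in the imitation. */
    interface Receiver<T> {
        void onReceive(T t);
        void onError(Throwable t);
        void onCompleted();
    }

    interface Func1<T, R> {
        R call(T t);
    }

    /** A map proxy that reports mapper failures through onError and then stops forwarding. */
    static final class SafeMapReceiver<T, R> implements Receiver<T> {
        private final Receiver<R> actual;
        private final Func1<T, R> mapper;
        private boolean done;

        SafeMapReceiver(Receiver<R> actual, Func1<T, R> mapper) {
            this.actual = actual;
            this.mapper = mapper;
        }

        @Override
        public void onReceive(T t) {
            if (done) {
                return;
            }
            R result;
            try {
                result = mapper.call(t);
            } catch (RuntimeException ex) {
                onError(ex);
                return;
            }
            actual.onReceive(result);
        }

        @Override
        public void onError(Throwable t) {
            if (done) {
                return;
            }
            done = true;
            actual.onError(t);
        }

        @Override
        public void onCompleted() {
            if (done) {
                return;
            }
            done = true;
            actual.onCompleted();
        }
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        Receiver<String> receiver = new SafeMapReceiver<String, Integer>(new Receiver<Integer>() {
            @Override
            public void onReceive(Integer i) {
                log.add("onReceive:" + i);
            }

            @Override
            public void onError(Throwable t) {
                log.add("onError:" + t.getClass().getSimpleName());
            }

            @Override
            public void onCompleted() {
                log.add("onCompleted");
            }
        }, Integer::parseInt);
        receiver.onReceive("1");
        receiver.onReceive("x"); // not a number: the mapper throws
        receiver.onReceive("3"); // ignored, the stream already terminated
        receiver.onCompleted();  // ignored for the same reason
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [onReceive:1, onError:NumberFormatException]
    }
}
```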

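3. Imitating RxJava2 operators (without backpressure)

CallerWithUpstream (similar to AbstractObservableWithUpstream)

  1. An abstract class
  2. Has a callActual method
  3. An operator is implemented by implementing this method

The map operator

  1. The most basic operator
  2. As the name suggests, it maps each item to another value

The CallerOperator interface

  1. The transformation takes effect inside callActual
  2. Can be used to add further operators

Relevant code: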

public abstract class Caller<T> {
    public static <T> Caller<T> create(CallerOnCall<T> callerOnCall) {
        return new CallerCreate<>(callerOnCall);
    }

    public void call(Callee<T> callee) {
        callActual(callee);
    }

    protected abstract void callActual(Callee<T> callee);

    public <R> Caller<R> lift(CallerOperator<R, T> operator) {
        return new CallerLift<>(this, operator);
    }

    public <R> Caller<R> map(Function<T, R> function) {
        return new CallerMap<>(this, function);
    }
}

public interface CallerOperator<T, R> {
    Callee<R> call(Callee<T> callee);
}

/**
 * Created by Xionghu on 2018/6/11.
 * Desc: returns the source Caller
 */
public interface CallerSource<T> {
    Caller<T> source();
}

public abstract class CallerWithUpstream<T, R> extends Caller<R> implements CallerSource<T> {
    protected final Caller<T> source;

    public CallerWithUpstream(Caller<T> source) {
        this.source = source;
    }

    @Override
    public Caller<T> source() {
        return source;
    }
}

public class CallerLift<R, T> extends CallerWithUpstream<T, R> {
    private final CallerOperator<R, T> mOperator;

    public CallerLift(Caller<T> source, CallerOperator<R, T> mOperator) {
        super(source);
        this.mOperator = mOperator;
    }

    @Override
    protected void callActual(Callee<R> callee) {
        // Let the operator wrap the downstream Callee, then call the source with the wrapper
        Callee<T> tCallee = mOperator.call(callee);
        source.call(tCallee);
    }
}

public interface Function<T, R> {
    R call(T t);
}

public class CallerMap<T, R> extends CallerWithUpstream<T, R> {
    private Function<T, R> function;

    public CallerMap(Caller<T> source, Function<T, R> function) {
        super(source);
        this.function = function;
    }

    @Override
    protected void callActual(Callee<R> callee) {
        source.call(new MapCallee<>(callee, function));
    }

    static class MapCallee<T, R> implements Callee<T> {
        private final Callee<R> mCallee;
        private final Function<T, R> mFunction;

        public MapCallee(Callee<R> mCallee, Function<T, R> mFunction) {
            this.mCallee = mCallee;
            this.mFunction = mFunction;
        }

        @Override
        public void onCall(Release release) {
            mCallee.onCall(release);
        }

        @Override
        public void onReceive(T t) {
            // Transform the item, then hand it to the wrapped Callee
            R tR = mFunction.call(t);
            mCallee.onReceive(tR);
        }

        @Override
        public void onCompleted() {
            mCallee.onCompleted();
        }

        @Override
        public void onError(Throwable t) {
            mCallee.onError(t);
        }
    }
}

/**
 * Created by Xionghu on 2018/6/11.
 * Desc: imitation of RxJava2 operators, without backpressure
 */
public class Lesson2_3Activity extends AppCompatActivity {
    @Override
    protected void onCreate(final Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_custom_test);
        ButterKnife.bind(this);
    }

    @OnClick(R.id.testDo)
    public void onViewClicked() {
        // Transformation through map
        Caller.create(new CallerOnCall<String>() {
            @Override
            public void call(CallerEmitter<String> callerEmitter) {
                callerEmitter.onReceive("1");
                callerEmitter.onReceive("2");
                callerEmitter.onCompleted();
            }
        })
        .map(new Function<String, Integer>() {
            @Override
            public Integer call(String s) {
                return Integer.parseInt(s);
            }
        })
        .call(new Callee<Integer>() {
            @Override
            public void onCall(Release release) {
                Log.d("kpioneer", "onCall");
            }

            @Override
            public void onReceive(Integer integer) {
                Log.d("kpioneer", "onReceive:" + integer);
            }

            @Override
            public void onCompleted() {
                Log.d("kpioneer", "onCompleted");
            }

            @Override
            public void onError(Throwable t) {
            }
        });

        // The same transformation written as a custom operator and applied through lift
        Caller.create(new CallerOnCall<String>() {
            @Override
            public void call(CallerEmitter<String> callerEmitter) {
                callerEmitter.onReceive("3");
                callerEmitter.onReceive("4");
                callerEmitter.onCompleted();
            }
        })
        .lift(new CallerOperator<Integer, String>() {
            @Override
            public Callee<String> call(final Callee<Integer> callee) {
                return new Callee<String>() {
                    @Override
                    public void onCall(Release release) {
                        callee.onCall(release);
                    }

                    @Override
                    public void onReceive(String s) {
                        callee.onReceive(Integer.parseInt(s));
                    }

                    @Override
                    public void onCompleted() {
                        callee.onCompleted();
                    }

                    @Override
                    public void onError(Throwable t) {
                        callee.onError(t);
                    }
                };
            }
        })
        .call(new Callee<Integer>() {
            @Override
            public void onCall(Release release) {
                Log.d("kpioneer", "onCall");
            }

            @Override
            public void onReceive(Integer integer) {
                Log.d("kpioneer", "onReceive:" + integer);
            }

            @Override
            public void onCompleted() {
                Log.d("kpioneer", "onCompleted");
            }

            @Override
            public void onError(Throwable t) {
                Log.d("kpioneer", "onError");
            }
        });
    }
}

06-11 18:03:27.268 24409-24409/com.haocai.rxjavademo D/kpioneer: onCall
06-11 18:03:27.268 24409-24409/com.haocai.rxjavademo D/kpioneer: onReceive:1
06-11 18:03:27.268 24409-24409/com.haocai.rxjavademo D/kpioneer: onReceive:2
06-11 18:03:27.268 24409-24409/com.haocai.rxjavademo D/kpioneer: onCompleted
06-11 18:03:27.268 24409-24409/com.haocai.rxjavademo D/kpioneer: onCall
06-11 18:03:27.268 24409-24409/com.haocai.rxjavademo D/kpioneer: onReceive:3
06-11 18:03:27.268 24409-24409/com.haocai.rxjavademo D/kpioneer: onReceive:4
06-11 18:03:27.268 24409-24409/com.haocai.rxjavademo D/kpioneer: onCompleted

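4. Imitating RxJava2 operators (with backpressure)

The TelephonerOperator interface

  1. The transformation takes effect inside callActual
  2. Can be used to add further operators

TelephonerWithUpstream (similar to AbstractFlowableWithUpstream)

  1. An abstract class
  2. Has a callActual method
  3. An operator is implemented by implementing this method

Relevant source code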

public abstract class Telephoner<T> {
    public static <T> Telephoner<T> create(TelephonerOnCall<T> telephonerOnCall) {
        return new TelephonerCreate<>(telephonerOnCall);
    }

    public void call(Receiver<T> receiver) {
        callActual(receiver);
    }

    protected abstract void callActual(Receiver<T> receiver);

    public <R> Telephoner<R> map(Function<T, R> function) {
        return new TelephonerMap<>(this, function);
    }

    public <R> Telephoner<R> lift(TelephonerOperator<R, T> telephonerOperator) {
        return new TelephonerLift<>(this, telephonerOperator);
    }
}

/**
 * Created by Xionghu on 2018/6/12.
 * Desc: the lift operator
 */
public class TelephonerLift<R, T> extends TelephonerWithUpstream<T, R> {
    private final TelephonerOperator<R, T> operator;

    public TelephonerLift(Telephoner<T> source, TelephonerOperator<R, T> operator) {
        super(source);
        this.operator = operator;
    }

    @Override
    protected void callActual(Receiver<R> receiver) {
        Receiver<T> tReceiver = operator.call(receiver);
        source.call(tReceiver);
    }
}

import com.haocai.mylibrary.rxJava2.Function;

/**
 * Created by Xionghu on 2018/6/12.
 * Desc: the map operator
 */
public class TelephonerMap<T, R> extends TelephonerWithUpstream<T, R> {
    private Function<T, R> trFunction;

    public TelephonerMap(Telephoner<T> source, Function<T, R> trFunction) {
        super(source);
        this.trFunction = trFunction;
    }

    @Override
    protected void callActual(Receiver<R> receiver) {
        source.call(new MapReceiver<>(receiver, trFunction));
    }

    static class MapReceiver<T, R> implements Receiver<T> {
        private final Receiver<R> rReceiver;
        private final Function<T, R> trFunction;

        public MapReceiver(Receiver<R> rReceiver, Function<T, R> trFunction) {
            this.rReceiver = rReceiver;
            this.trFunction = trFunction;
        }

        @Override
        public void onCall(Drop d) {
            // The Drop handle is passed through unchanged, so demand reaches the source
            rReceiver.onCall(d);
        }

        @Override
        public void onReceive(T t) {
            R tr = trFunction.call(t);
            rReceiver.onReceive(tr);
        }

        @Override
        public void onError(Throwable t) {
            rReceiver.onError(t);
        }

        @Override
        public void onCompleted() {
            rReceiver.onCompleted();
        }
    }
}

/**
 * Created by Xionghu on 2018/6/12.
 * Desc: the operator interface
 */
public interface TelephonerOperator<T, R> {
    Receiver<R> call(Receiver<T> callee);
}

/**
 * Created by Xionghu on 2018/6/11.
 * Desc: returns the source Telephoner
 */
public interface TelephonerSource<T> {
    Telephoner<T> source();
}

public abstract class TelephonerWithUpstream<T, R> extends Telephoner<R> implements TelephonerSource<T> {
    protected final Telephoner<T> source;

    public TelephonerWithUpstream(Telephoner<T> source) {
        this.source = source;
    }

    @Override
    public Telephoner<T> source() {
        return source;
    }
}

import android.os.Bundle;
import android.support.v7.app.AppCompatActivity;
import android.util.Log;

import com.haocai.mylibrary.rxJava2.Function;
import com.haocai.mylibrary.rxJava2.backpressure.Drop;
import com.haocai.mylibrary.rxJava2.backpressure.Receiver;
import com.haocai.mylibrary.rxJava2.backpressure.Telephoner;
import com.haocai.mylibrary.rxJava2.backpressure.TelephonerEmitter;
import com.haocai.mylibrary.rxJava2.backpressure.TelephonerOnCall;
import com.haocai.mylibrary.rxJava2.backpressure.TelephonerOperator;
import com.haocai.rxjavademo.R;

import butterknife.ButterKnife;
import butterknife.OnClick;

/**
 * Created by Xionghu on 2018/6/11.
 * Desc: imitation of RxJava2 operators, with backpressure
 */
public class Lesson2_4Activity extends AppCompatActivity {
    @Override
    protected void onCreate(final Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_custom_test);
        ButterKnife.bind(this);
    }

    @OnClick(R.id.testDo)
    public void onViewClicked() {
        // Transformation through map
        Telephoner.create(new TelephonerOnCall<String>() {
            @Override
            public void call(TelephonerEmitter<String> telephonerEmitter) {
                telephonerEmitter.onReceive("1");
                telephonerEmitter.onReceive("2");
                telephonerEmitter.onCompleted();
            }
        })
        .map(new Function<String, Integer>() {
            @Override
            public Integer call(String s) {
                return Integer.parseInt(s);
            }
        })
        .call(new Receiver<Integer>() {
            @Override
            public void onCall(Drop d) {
                d.request(Long.MAX_VALUE);
                Log.d("kpioneer", "onCall");
            }

            @Override
            public void onReceive(Integer integer) {
                Log.d("kpioneer", "onReceive:" + integer);
            }

            @Override
            public void onError(Throwable t) {
                Log.d("kpioneer", "onError");
            }

            @Override
            public void onCompleted() {
                Log.d("kpioneer", "onCompleted");
            }
        });

        // The same transformation written as a custom operator and applied through lift
        Telephoner.create(new TelephonerOnCall<String>() {
            @Override
            public void call(TelephonerEmitter<String> telephonerEmitter) {
                telephonerEmitter.onReceive("3");
                telephonerEmitter.onReceive("4");
                telephonerEmitter.onCompleted();
            }
        })
        .lift(new TelephonerOperator<Integer, String>() {
            @Override
            public Receiver<String> call(final Receiver<Integer> receiver) {
                return new Receiver<String>() {
                    @Override
                    public void onCall(Drop d) {
                        receiver.onCall(d);
                    }

                    @Override
                    public void onReceive(String s) {
                        receiver.onReceive(Integer.parseInt(s));
                    }

                    @Override
                    public void onError(Throwable t) {
                        receiver.onError(t);
                    }

                    @Override
                    public void onCompleted() {
                        receiver.onCompleted();
                    }
                };
            }
        })
        .call(new Receiver<Integer>() {
            @Override
            public void onCall(Drop d) {
                d.request(Long.MAX_VALUE);
                Log.d("kpioneer", "onCall");
            }

            @Override
            public void onReceive(Integer integer) {
                Log.d("kpioneer", "onReceive:" + integer);
            }

            @Override
            public void onError(Throwable t) {
                Log.d("kpioneer", "onError");
            }

            @Override
            public void onCompleted() {
                Log.d("kpioneer", "onCompleted");
            }
        });
    }
}

06-12 09:56:50.108 22364-22364/com.haocai.rxjavademo D/kpioneer: onCall
06-12 09:56:50.108 22364-22364/com.haocai.rxjavademo D/kpioneer: onReceive:1
06-12 09:56:50.108 22364-22364/com.haocai.rxjavademo D/kpioneer: onReceive:2
06-12 09:56:50.108 22364-22364/com.haocai.rxjavademo D/kpioneer: onCompleted
06-12 09:56:50.108 22364-22364/com.haocai.rxjavademo D/kpioneer: onCall
06-12 09:56:50.108 22364-22364/com.haocai.rxjavademo D/kpioneer: onReceive:3
06-12 09:56:50.108 22364-22364/com.haocai.rxjavademo D/kpioneer: onReceive:4
06-12 09:56:50.108 22364-22364/com.haocai.rxjavademo D/kpioneer: onCompleted
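Both receivers in this demo request Long.MAX_VALUE, so the log cannot show backpressure at work. The sketch below illustrates what changes when the receiver asks for only two items. It uses the JDK's java.util.concurrent.Flow interfaces (Java 9 or later) rather than the Telephoner classes; fromList and map are hypothetical helpers written for this example, and the source is a simplification that ignores re-entrant requests and invalid request amounts.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Function;

public class BackpressureMapSketch {

    /** Emits the list items synchronously, but never more than the subscriber has requested. */
    static <T> Flow.Publisher<T> fromList(List<T> items) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private final Iterator<T> it = items.iterator();
            private boolean done;

            @Override
            public void request(long n) {
                for (long i = 0; i < n && !done && it.hasNext(); i++) {
                    subscriber.onNext(it.next());
                }
                if (!done && !it.hasNext()) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    /** A map stage: transforms items and passes the Subscription through unchanged. */
    static <T, R> Flow.Publisher<R> map(Flow.Publisher<T> source, Function<? super T, ? extends R> mapper) {
        return downstream -> source.subscribe(new Flow.Subscriber<T>() {
            @Override
            public void onSubscribe(Flow.Subscription s) {
                downstream.onSubscribe(s); // demand from downstream reaches the source directly
            }

            @Override
            public void onNext(T item) {
                downstream.onNext(mapper.apply(item));
            }

            @Override
            public void onError(Throwable t) {
                downstream.onError(t);
            }

            @Override
            public void onComplete() {
                downstream.onComplete();
            }
        });
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        Flow.Publisher<String> source = fromList(List.of("1", "2", "3"));
        Flow.Publisher<Integer> mapped = map(source, Integer::parseInt);
        mapped.subscribe(new Flow.Subscriber<Integer>() {
            @Override
            public void onSubscribe(Flow.Subscription s) {
                s.request(2); // ask for two items only
            }

            @Override
            public void onNext(Integer item) {
                log.add("onNext:" + item);
            }

            @Override
            public void onError(Throwable t) {
                log.add("onError");
            }

            @Override
            public void onComplete() {
                log.add("onComplete");
            }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [onNext:1, onNext:2]
    }
}
```

Because the map stage forwards the Subscription as it is, the third item is never emitted and onComplete is not called until more items are requested. This mirrors how MapReceiver above forwards the Drop handle in onCall.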

Conclusion

This concludes the source code analysis of RxJava operators and the imitation implementations built from it.
