转载请标明出处,维权必究:http://77blogs.com/?p=512
本文基于RxJava 3.0.0
1 2 |
implementation 'io.reactivex.rxjava3:rxandroid:3.0.0' implementation 'io.reactivex.rxjava3:rxjava:3.0.0' |
我们从最简单的开始,挑最主要的讲:
1 2 3 4 5 6 7 8 9 10 11 |
Disposable disposable = Observable.create(new ObservableOnSubscribe<Object>() { @Override public void subscribe(ObservableEmitter<Object> e) throws Exception { e.onNext(1); } }).subscribe(new Consumer<Object>() { @Override public void accept(Object o) throws Exception { Log.d("A", String.valueOf(o)); } }); |
我们先看create方法:
1 2 3 4 |
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { ObjectHelper.requireNonNull(source, "source is null"); return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); } |
该方法返回的是Observable对象,onAssembly方法可以占时不用理会,我们看
new ObservableCreate<T>(source):
1 2 3 |
public ObservableCreate(ObservableOnSubscribe<T> source) { this.source = source; } |
也就是我们最终返回的是ObservableCreate对象,他继承Observable,持有ObservableOnSubscribe的引用。
接下来看subscribe方法:
1 2 3 |
public final Disposable subscribe(Consumer<? super T> onNext) { return subscribe(onNext, Functions.ERROR_CONSUMER, Functions.EMPTY_ACTION, Functions.emptyConsumer()); } |
接着:
1 2 3 4 5 6 7 8 9 10 |
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) { ObjectHelper.requireNonNull(onNext, "onNext is null"); ObjectHelper.requireNonNull(onError, "onError is null"); ObjectHelper.requireNonNull(onComplete, "onComplete is null"); ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null"); LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe); subscribe(ls); return ls; } |
可以看出,我们传进来的Consumer对象被放进了LambdaObserver中。接下来重点是subscribe(ls);
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
public final void subscribe(Observer<? super T> observer) { ObjectHelper.requireNonNull(observer, "observer is null"); try { observer = RxJavaPlugins.onSubscribe(this, observer); ObjectHelper.requireNonNull(observer, "Plugin returned null Observer"); subscribeActual(observer); } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { Exceptions.throwIfFatal(e); // can't call onError because no way to know if a Disposable has been set or not // can't call onSubscribe because the call might have set a Subscription already RxJavaPlugins.onError(e); NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS"); npe.initCause(e); throw npe; } } |
subscribeActual(observer);是一个抽象方法,实现它的方法有很多,从上面的分析中可以知道,调用下面方法的是ObservableCreate对象。
1 2 3 4 5 6 |
.subscribe(new Consumer<Object>() { @Override public void accept(Object o) throws Exception { Log.d("A", String.valueOf(o)); } }); |
所以subscribeActual(observer);调用的是ObservableCreate里面的。
1 2 3 4 5 6 7 8 9 10 11 |
@Override protected void subscribeActual(Observer<? super T> observer) { CreateEmitter<T> parent = new CreateEmitter<T>(observer); observer.onSubscribe(parent); try { source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); } } |
这个方法里面构造了一个发射器CreateEmitter,传进去的observer就是上面所说的LambdaObserver,里面持有我们传进去的Consumer对象。
observer.onSubscribe(parent);调用的是LambdaObserver里面的onSubscribe;
1 2 3 4 5 6 7 8 9 10 |
@Override public void onSubscribe(Disposable s) { if (DisposableHelper.setOnce(this, s)) { try { onSubscribe.accept(this); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); onError(ex); } } |
这个方法里面的onSubscribe.accept(this);中的onSubscribe,是我们上面的方法中的第四个参数,是一个空的Consumer,所以不用理他:
1 2 3 |
public final Disposable subscribe(Consumer<? super T> onNext) { return subscribe(onNext, Functions.ERROR_CONSUMER, Functions.EMPTY_ACTION, Functions.emptyConsumer()); } |
我们回过头看subscribeActual方法中的source.subscribe(parent);
source是我们之前传进去的ObservableOnSubscribe对象,source.subscribe便是调用ObservableOnSubscribe的subscribe方法,传进去的是一个发射器。
到此我们总算调用到了外面这个方法了。
(也就是说必须调用subscribe方法,才能发射事件)
1 2 3 4 5 6 |
Disposable disposable = Observable.create(new ObservableOnSubscribe<Object>() { @Override public void subscribe(ObservableEmitter<Object> e) throws Exception { e.onNext(1); } }) |
我们再看e.onNext(1),在ObservableCreate的内部类CreateEmitter里面:
1 2 3 4 5 6 7 8 9 10 |
@Override public void onNext(T t) { if (t == null) { onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources.")); return; } if (!isDisposed()) { observer.onNext(t); } } |
这里的observer我们上面已经分析过,是构造发射器的时候传进去的LambdaObserver,LambdaObserver里面持有我们传进去的Consumer对象。
于是observer.onNext(t)便是调用LambdaObserver的onNext(t)
1 2 3 4 5 6 7 8 9 10 11 |
@Override public void onNext(T t) { if (!isDisposed()) { try { onNext.accept(t); } catch (Throwable e) { Exceptions.throwIfFatal(e); onError(e); } } } |
onNext便是它持有的的Consumer对象,于是onNext.accept(t)便回调到了外面这一层的accept:
1 2 3 4 5 6 7 8 9 10 11 |
Disposable disposable = Observable.create(new ObservableOnSubscribe<Object>() { @Override public void subscribe(ObservableEmitter<Object> e) throws Exception { e.onNext(1); } }).subscribe(new Consumer<Object>() { @Override public void accept(Object o) throws Exception { Log.d("A", String.valueOf(o)); } }); |
到此结束。