RxJava源码解析一

转载请标明出处,维权必究:http://77blogs.com/?p=512

本文基于RxJava 3.0.0

我们从最简单的开始,挑最主要的讲:

我们先看create方法:

该方法返回的是Observable对象,onAssembly方法可以占时不用理会,我们看

new ObservableCreate<T>(source):

也就是我们最终返回的是ObservableCreate对象,他继承Observable,持有ObservableOnSubscribe的引用。

 

接下来看subscribe方法:

接着:

可以看出,我们传进来的Consumer对象被放进了LambdaObserver中。接下来重点是subscribe(ls);

subscribeActual(observer);是一个抽象方法,实现它的方法有很多,从上面的分析中可以知道,调用下面方法的是ObservableCreate对象。

所以subscribeActual(observer);调用的是ObservableCreate里面的。

这个方法里面构造了一个发射器CreateEmitter,传进去的observer就是上面所说的LambdaObserver,里面持有我们传进去的Consumer对象。

observer.onSubscribe(parent);调用的是LambdaObserver里面的onSubscribe;

这个方法里面的onSubscribe.accept(this);中的onSubscribe,是我们上面的方法中的第四个参数,是一个空的Consumer,所以不用理他:

我们回过头看subscribeActual方法中的source.subscribe(parent);

source是我们之前传进去的ObservableOnSubscribe对象,source.subscribe便是调用ObservableOnSubscribe的subscribe方法,传进去的是一个发射器。

到此我们总算调用到了外面这个方法了。

(也就是说必须调用subscribe方法,才能发射事件)

我们再看e.onNext(1),在ObservableCreate的内部类CreateEmitter里面:

这里的observer我们上面已经分析过,是构造发射器的时候传进去的LambdaObserver,LambdaObserver里面持有我们传进去的Consumer对象。

 

于是observer.onNext(t)便是调用LambdaObserver的onNext(t)

onNext便是它持有的的Consumer对象,于是onNext.accept(t)便回调到了外面这一层的accept:

到此结束。