RxJava源码解析一

转载请标明出处,维权必究:http://77blogs.com/?p=512

本文基于RxJava 3.0.0

我们从最简单的开始,挑最主要的讲:

我们先看create方法:

该方法返回的是Observable对象,onAssembly方法可以占时不用理会,我们看

new ObservableCreate<T>(source):

也就是我们最终返回的是ObservableCreate对象,他继承Observable,持有ObservableOnSubscribe的引用。

 

接下来看subscribe方法:

接着:

可以看出,我们传进来的Consumer对象被放进了LambdaObserver中。接下来重点是subscribe(ls);

subscribeActual(observer);是一个抽象方法,实现它的方法有很多,从上面的分析中可以知道,调用下面方法的是ObservableCreate对象。

所以subscribeActual(observer);调用的是ObservableCreate里面的。

这个方法里面构造了一个发射器CreateEmitter,传进去的observer就是上面所说的LambdaObserver,里面持有我们传进去的Consumer对象。

observer.onSubscribe(parent);调用的是LambdaObserver里面的onSubscribe;

这个方法里面的onSubscribe.accept(this);中的onSubscribe,是我们上面的方法中的第四个参数,是一个空的Consumer,所以不用理他:

我们回过头看subscribeActual方法中的source.subscribe(parent);

source是我们之前传进去的ObservableOnSubscribe对象,source.subscribe便是调用ObservableOnSubscribe的subscribe方法,传进去的是一个发射器。

到此我们总算调用到了外面这个方法了。

(也就是说必须调用subscribe方法,才能发射事件)

我们再看e.onNext(1),在ObservableCreate的内部类CreateEmitter里面:

这里的observer我们上面已经分析过,是构造发射器的时候传进去的LambdaObserver,LambdaObserver里面持有我们传进去的Consumer对象。

 

于是observer.onNext(t)便是调用LambdaObserver的onNext(t)

onNext便是它持有的的Consumer对象,于是onNext.accept(t)便回调到了外面这一层的accept:

到此结束。

RxJava的flatmap

转载请标明出处:http://77blogs.com/?p=183

对 Observable 发射的数据都应用(apply)一个函数,这个函数返回一个 Observable,然后合并这些 Observables,并且发送(emit)合并的结果。

flatMap() 的原理是这样的
1. 使用传入的事件对象创建一个 Observable 对象
2. 并不发送这个 Observable, 而是将它激活,于是它开始发送事件;
3. 每一个创建出来的 Observable 发送的事件,都被汇入同一个 Observable ,而这个 Observable 负责将这些事件统一交给 Subscriber 的回调方法。这三个步骤,把事件拆成了两级,通过一组新创建的 Observable 将初始的对象『铺平』之后通过统一路径分发了下去。而这个『铺平』就是 flatMap() 所谓的 flat。

flatMap 和 map 操作符很相像,flatMap 发送的是合并后的 Observables,map 操作符发送的是应用函数后返回的结果集。

createObservable方法如下:

打印结果为:

01-02 16:52:36.969 30827-30827/com.status.rxjavasample D/RxJavaHelper: 1
01-02 16:52:36.969 30827-30827/com.status.rxjavasample D/RxJavaHelper: 2
01-02 16:52:36.969 30827-30827/com.status.rxjavasample D/RxJavaHelper: 3

RxJava的map

转载请标明出处http://77blogs.com/?p=175

map的作用:对Observable发射的数据都应用一个函数,然后再发射最后的结果集。最后map()方法返回一个新的Observable。

举个例子:

 

首先遍历集合中的每一个项,然后给map进行映射更改,每一项都加一,然后再转换为集合,然后再将该集合传入map中,在map中加入新的项4,最后给观察者的回调方法,输出该集合。

打印的log为:

12-23 21:46:36.691 3072-3102/com.status.rxjavasample D/RxJavaHelper: [4]

中间可以有多个map操作符。

.map(new Function<Integer, Integer>(){}中的Function里面的泛型参数,第一个是指输入的类型,第二个是指输出的类型。

再看下面这个例子:

输出的log为:

12-30 21:45:25.713 32565-32611/com.status.rxjavasample D/RxJavaHelper: 1
12-30 21:45:25.713 32565-32611/com.status.rxjavasample D/RxJavaHelper: 2
12-30 21:45:25.713 32565-32611/com.status.rxjavasample D/RxJavaHelper: 3

RxJava的concat操作符

转载请标明出处,维权必究http://77blogs.com/?p=170

使用场景一:

现在要执行两个任务:

1、输出字符串0

2、输出字符串1

我们就可以使用concat来实现多个数据源。

1、输出字符串0的数据源:

2、输出字符串1的数据源:

3、接收多个数据源:

可以看到Log:

12-23 20:23:48.771 23643-23643/com.status.rxjavasample D/RxJavaHelper: 0
12-23 20:23:48.771 23643-23643/com.status.rxjavasample D/RxJavaHelper: 1

两个字符串都输出了,而且是有序的。

使用场景二、

获取数据,如果从本地缓存中获取得到数据,那么便不从网络获取,否则从网络获取。

我们将上面的1,2两个步骤分别当成从本地缓存获取数据和从网络缓存中获取数据,那么我们需要改变上面的3步骤。

唯一不同的是加上.firstElement()。

输出的log为:

12-23 20:29:11.731 24458-24458/com.status.rxjavasample D/RxJavaHelper: 0

firstElement操作符:按照顺序依次遍历被观察者中事件,事件不为空,则停止遍历。

RxJava基本使用

转载请标明出处:http://77blogs.com/?p=162

RxJava究竟是啥,从根本上来讲,它就是一个实现异步操作的库,并且能够使代码非常简洁。它的异步是使用观察者模式来实现的。

关于观察者模式的介绍,可以看我这一篇文章:

https://www.cnblogs.com/tangZH/p/11175120.html 

这里我主要讲RxJava的一些基本用法,基本案例,原理的话暂时不深究:

一、自己构造事件

RxJava 有四个基本概念:Observable (可观察者,即被观察者)、 Observer (观察者)、 subscribe (订阅)、事件。Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Observer

onNext():方法用来发送事件。

下面看看其他两个方法:

  • onCompleted(): 事件队列完结。RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。RxJava 规定,当不会再有新的 onNext() 发出时,需要触发 onCompleted() 方法作为标志。
  • onError(): 事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。
  • 在一个正确运行的事件序列中, onCompleted() 和 onError() 有且只有一个,并且是事件序列中的最后一个。需要注意的是,onCompleted() 和 onError() 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。

讲一下我们上面的例子,上面这个例子是采用简洁的链式调用来写的:

首先使用 create() 方法来创建一个 Observable ,并为它定义事件触发规则,然后通过emitter.onNext(i)传递出来,.subscribeOn(Schedulers.io())便是指定该事件产生的所在的线程为子线程,.observeOn(AndroidSchedulers.mainThread())指定观察者执行的线程为主线程。这时候为止返回的对象为Observable对象。

然后该Observable对象subscribe绑定观察者(也就是观察者进行订阅),里面有接收被观察者发出来的事件,有一个成功的方法,和一个失败的方法,这样就实现了由被观察者向观察传递事件。

二、对集合里的数据进行变换

且看,我们需要对某个集合里面的数据一一进行变换,然后发送出来执行其他操作。

上面便是对集合里面的每一项进行加一操作,然后再转换为String类型,然后toList(),组合成集合发送出来,最后在观察者方法中打印出每一项。

三、合并执行

定义两个被观察者,各自产生事件,然后合并在一起,发送给一个观察者。

首先定义我们上面第一个例子的被观察者,用于发送一个数字:

其次再定义我们上面第二个例子的被观察者:

最后将这两个被观察者的事件合并起来发送给一个观察者:

zip方法,顾名思义,有点类似与于打包的意思。

o为被观察者1返回的结果,o2为被观察2返回的结果,将这两个结果一起处理后发送给观察者。打印出来。

现在先介绍这几个,找个时间再整理一些其他的用法以及原理实现。