RxJava即Reactive Java,是响应式编程范式在Java上的一种实现。响应式编程范式的目标是,提高程序的容错性、降低软件模块的耦合性、提高程序相应速度。到目前为止,几乎所有主流语言都有相应的reactive库。其中,RxJava在android的开发中,应用的非常广泛。我使用RxJava也有一两个月了,期间断断续续的学习了RxJava。坦诚的说,刚开始的时候确实有点晕。首先,这种相应式的编程范式确实比较新颖,其次,我很好奇RxJava背后的实现原理,但一时间又想不通。既然想不通,我就打算看源码来分析,所以就有了这篇blog。
-
RxJava基本用法的实现
这里所说的基本用法是指,只使用Observable、Observer,而不使用其他高阶运算符的用法。毕竟研究问题,还是由浅入深相对容易理解。首先研究下面这个例子:
Observable<Integer> simple = Observable.just( 2, 3, 5, 8);
simple.subscribe(
(v) -> System.out.println(v),
(e) -> {
System.err.println("Error");
System.err.println(e.getMessage());
},
() -> System.out.println("ended!")
);
这个例子非常简单,就是把几个数字打印出去,最后输出“ended!”。但是这个例子又刚好包含了RxJava运行的主要原理,麻雀虽小五脏俱全哟!这里插一句话,这篇blog定位于研究RxJava原理,对入门阶段的同学可能并不合适。首先,研究Observable.just(),它的调用栈大致如下:
//Observable.java
public static <T> Observable<T> just(T t1, T t2, T t3, T t4) {
return from((T[])new Object[] { t1, t2, t3, t4 });
}
public static <T> Observable<T> from(T[] array) {
/* */
return create(new OnSubscribeFromArray<T>(array));
}
public static <T> Observable<T> create(OnSubscribe<T> f) {
return new Observable<T>(RxJavaHooks.onCreate(f));
}
protected Observable(OnSubscribe<T> f) {
this.onSubscribe = f;
}
其中RxJavaHooks是一个用于管理组件生命周期的辅助类,这里不做研究。我们可以看出,Observable.just()首先把几个整数组成了一个数组,然后使用它们构造了一个OnSubscribeFromArray类,最后,新建了一个Observable实例并把前面的数组类传递给该实例。那么这个OnSubscribeFromArray类又有什么玄机呢?继续查看它的源码。
public final class OnSubscribeFromArray<T> implements OnSubscribe<T> {
final T[] array;
public OnSubscribeFromArray(T[] array) {
this.array = array;
}
@Override
public void call(Subscriber<? super T> child) {
child.setProducer(new FromArrayProducer<T>(child, array));
}
/* */
}
这构造函数非常简单,就是把数组保存起来。值得注意的是这个call方法,这个方法定义于OnSubscribe接口。该方法接收一个Subscriber对象,然后通过调用setProducer方法给该对象传递一个FromArrayProducer实例。这个类的代码,我们等一会再来看,先卖个关子。
通过Observable.just(),程序创建了一个Observable实例simple。然后simple.subscribe()方法为simple添加了一个观察者,其调用栈如下。
//Observable.java
public final Subscription subscribe(final Action1<? super T> onNext, final Action1<Throwable> onError, final Action0 onCompleted) {
/* */
return subscribe(new ActionSubscriber<T>(onNext, onError, onCompleted));
}
public final Subscription subscribe(Subscriber<? super T> subscriber) {
return Observable.subscribe(subscriber, this);
}
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
//
try {
/* */
RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
return RxJavaHooks.onObservableReturn(subscriber);
} catch (Throwable e) {
/* */
return Subscriptions.unsubscribed();
}
}
这个Subscriber其实是用三个接口函数组成的。上述代码片段中,RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber)在功能上等价于observable.onSubscribe.call(subscriber)。而observable的onSubscribe字段指向的就是OnSubscribeFromArray。所以,这句程序实际上就是调用了OnSubscribeFromArray.call。call函数实际调用了Subscriber.setproducer()。
//Subscriber.java
public void setProducer(Producer p) {
/* */
if (passToSubscriber) {
subscriber.setProducer(producer);
} else {
if (toRequest == NOT_SET) {
producer.request(Long.MAX_VALUE);
} else {
producer.request(toRequest);
}
}
}
这里重点就是producer.request()。而此处的Producer是一个FromArrayProducer,它的request方法最终会调用下面或者类似逻辑的代码:
void fastPath() {
final Subscriber<? super T> child = this.child;
for (T t : array) {
if (child.isUnsubscribed()) {
return;
}
child.onNext(t);
}
if (child.isUnsubscribed()) {
return;
}
child.onCompleted();
}
看到了child的各个接口在这里被依次调用,一切真相都水落石出了。这边blog也基本写到尾声了,最后以一个图结束了吧!代码和插图更配哟! (
)
下一篇blog,准备解析一些高阶函数的实现原理。
发表回复