在上一篇blog中,我们主要讨论了一些关于map和flatMap实现方面的知识。但是flatMap远比map复杂,flatMap涉及了merge算子方面的东西。所以这篇博文将重点分析merge算子的实现原理。merge算子的功能是把多个Observable整合到一起,并创建一个新的Observable。听起来这个功能其实并不是很难理解,merge就像两江交汇一般,把各支流的水汇聚到一起。《Learning Reactive Programming》一书中有非常形象的配图。下图就描述了merge的功能:
从视觉上就能看得出来,merge与flatMap非常相似,而这也是flatMap底层使用merge实现的原因。一个例子让你明白flatMap的作用,代码如下。
int threadCt = Runtime.getRuntime().availableProcessors() + 1;
ExecutorService executor = Executors.newFixedThreadPool(threadCt);
Scheduler scheduler = Schedulers.from(executor);
Observable<Integer> simple1 = Observable
.just( 1, 2, 3, 4);
Observable<Integer> fm1 = simple1.flatMap(
v -> Observable.just(v, v).subscribeOn(scheduler));
fm1.subscribe(
(v) -> System.out.println(v),
(e) -> {
System.err.println("Error");
System.err.println(e.getMessage());
},
() -> System.out.println("ended!")
);
这段程序的输出序列是:4, 4, 1, 1, 2, 3, 3, 2, ended!当然,这段程序的执行结果是不定的,因为这里的flapMap算子使用了并行执行的方式。先简单解释一下这段程序,前三行是在设置线程池,simple1是一个简单的整数序列值,传递给flatMap是一个这样的函数,它接受一个整数并返回一个包含两个整数拷贝的Observable。simple1中的每个值都将通过这个函数,所以会有4个中间Observable产生。然后RxJava通过merge运算符把多个Observable连接到一起。由于多线程并行运算,所以merge的结果就会显得没有规律。接下来就轮到merge运算符登场了。
merge功能的实现
首先,思考merge实现的逻辑,可以预见merge有缓冲区,因为会存在多线程传递数据的问题。事实上也确实如此,在搜寻代码时,我首先尝试寻找队列之类的数据结构,结果大有收获。且看如下代码逻辑:
public static <T> Observable<T> merge(Observable<? extends Observable<? extends T>> source) {
if (source.getClass() == ScalarSynchronousObservable.class) {
return ((ScalarSynchronousObservable<T>)source).scalarFlatMap((Func1)UtilityFunctions.identity());
}
return source.lift(OperatorMerge.<T>instance(false));
}
在这里lift的功能很简单,基本上就是构建一个“傀儡型”的OnSubscribe。真活还是OperatorMerge来干的,OperatorMerge提供了一个call函数,供傀儡OnSubscribe调用。该call函数的代码如下:
public Subscriber<Observable<? extends T>> call(final Subscriber<? super T> child) {
MergeSubscriber<T> subscriber = new MergeSubscriber<T>(child, delayErrors, maxConcurrent);
MergeProducer<T> producer = new MergeProducer<T>(subscriber);
subscriber.producer = producer;
child.add(subscriber);
child.setProducer(producer);
return subscriber;
}
看到这里,大概也可以估计出merge的设计核心就在于MergeSubscriber和MergeProducer了。起初,我以为主要的逻辑应该在Producer中,然而事实上,重要的逻辑其实是放在MergeSubscriber中。这是因为Producer持有了Subscriber的应用,在下一级请求的时候,Producer只是简单的通知了Subscriber去emit一个值。所以,重要的逻辑还是需要到MergeSubscriber里边看。重要的代码如下:
public void onNext(Observable<? extends T> t) {
if (t == null) {
return;
}
if (t == Observable.empty()) {
emitEmpty();
} else
if (t instanceof ScalarSynchronousObservable) {
tryEmit(((ScalarSynchronousObservable<? extends T>)t).get());
} else {
InnerSubscriber<T> inner = new InnerSubscriber<T>(this, uniqueId++);
addInner(inner);
t.unsafeSubscribe(inner);
emit();
}
}
void addInner(InnerSubscriber inner) {
getOrCreateComposite().add(inner);
synchronized (innerGuard) {
InnerSubscriber>[] a = innerSubscribers;
int n = a.length;
InnerSubscriber>[] b = new InnerSubscriber>[n + 1];
System.arraycopy(a, 0, b, 0, n);
b[n] = inner;
innerSubscribers = b;
}
}
作为一个Subscriber,最重要的逻辑当然是onNext了。onNext中最重要的逻辑就是addInner了,addInner的作用是使用RCU的方式增加merge的源。至于MergeSubscriber的emit函数的作用其实就是把变量从InnerSubscriber搬到下一级的subscriber。哈哈,一切都水落石出了!
至此,merge的流程就分析完毕了。对于下一篇blog,暂时想不到很想写的主题,改天再说了!
发表回复