RxJava2 线程调度的方法

网友投稿 261 2023-01-11


RxJava2 线程调度的方法

subscribeOn和observeOn负责线程切换,同时某些操作符也默认指定了线程.

我们这里不分析在线程中怎么执行的.只看如何切换到某个指定线程.

subscribeOn

Observable.subscribeOn()在方法内部生成了一个ObservableSubscribeOn对象.

主要看一下ObservableSubscribeOn的subscribeActual方法.

@Override

public void subscribeActual(final Observer super T> observer) {

final SubscribeOnObserver parent = new SubscribeOnObserver(observer);

//调用下游的Observer的onSubscribe方法

observer.onSubscribe(parent);

//通过SubscribeTask执行了上游Observable的subscribeActual方法

parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));

}

scheduler.scheduleDirect(Runnable)用于执行SubscribeTask这个任务.SubscribeTask本身是Runnable的实现类.看一下其run方法.

@Overrhttp://ide

public void run() {

//上游的Observable.subscribe方法被切换到了新的线程

source.subscribe(parent);

}

首先可以得出结论:subscribeOn将上游的Observable的subscribe方法切换到了新的线程.

如果多次调用subscribeOn切换线程,会有什么效果?

由下往上,每次调用subscribeOn,都会导致上游的Observable的subscribeActual切换到指定的线程.那么最后一次调用的切换最上游的创建型操作符的subscribeActual的执行线程.如果操作符有默认执行线程怎么办?

操作符默认线程

如果是创建型操作符,处于最上游,那么subscribeOn的线程切换对它不起作用.天高皇帝远,县官不如现管.就是这个道理.

如果是其它操作符,会是怎样的?

以操作符timeout为例:它对应ObservableTimeoutTimed和TimeoutObserver

@Override

public void onNext(T t) {

downstream.onNext(t);

//超时计时

startTimeout(idx + 1);

}

void startTimeout(long nextIndex) {

//交给操作符默认的线程执行

task.replace(worker.schedule(new TimeoutTask(nextIndex, this), timeout, unit));

}

@Override

public void onError(Throwable t) {

downstream.onError(t);

}

@Override

public void onComplete() {

downstream.onComplete();

}

}

@Override

public void onTimeout(long idx) {

downstream.onError(new TimeoutException(timeoutMessage(timeout, unit)));

}

//TimeoutTask.java

static final class TimeoutTask implements Runnable {

@Override

public void run() {

parent.onTimeout(idx);

}

}

可以看到操作符默认的执行线程只用来做超时计时任务,如果超时了,会在操作符的默认线程执行onError方法..操作符默认线程对下游的observer造成什么影响要做具体对待.

observeOn

observeOn对应ObservableObserveOn和ObserveOnObserver.

//ObservableObserveOn.java

@Override

protected void subscribeActual(Observer super T> observer) {

if (scheduler instanceof TrampolineScheduler) {

source.subscribe(observer);

} else {

Scheduler.Worker w = scheduler.createWorker();

source.subscribe(new ObserveOnObserver(observer, w, delayError, bufferSize));

}

}

//ObserveOnObserver.java

@Override

public void onSubscribe(Disposable d) {

if (DisposableHelper.validate(this.upstream, d)) {

if (d instanceof QueueDisposable) {

if (m == QueueDisposable.SYNC) {

//执行下游Observer的onSubscribe方法

downstream.onSubscribe(this);

schedule();

return;

}

if (m == QueueDisposable.ASYNC) {

//执行下游Observer的onSubscribe方法

downstream.onSubscribe(this);

return;

}

}

//执行下游Observer的onSubscribe方法

downstream.onSubscribe(this);

}

}

@Override

public void onNext(T t) {

//省略

schedule();

}

@Override

public void onError(Throwable t) {

//省略

schedule();

}

void schedule() {

if (getAndIncrement() == 0) {

/*

ObserveOnObserver是Runnable的实现类.交给线程池执行

*/

worker.schedule(this);

}

}

void drainNormal() {

final Observer super T> a = downstream;

for (;;) {

for (;;) {

T v;

try {

v = q.poll();

} catch (Throwable ex) {

a.onError(ex);

return;

}

//执行下游Observer的onNext方法

a.onNext(v);

}

}

}

void drainFused() {

for (;;) {

if (!delayError && d && ex != null) {

//执行下游Observer的onError方法

downstream.onError(error);

return;

}

downstream.onNext(null);

if (d) {

ex = error;

if (ex != null) {

//执行下游Observer的onError方法

downstream.onError(ex);

} else {

//执行下游Observer的onComplete方法

downstream.onComplete();

}

return;

}

}

}

//执行线程任务

@Override

public void run() {

if (outputFused) {

drainFused();

} else {

drainNormal();

}

}

从上面可以看出ObservableObserveOn在其subscribeActual方法中并没有切换上游Observable的subscribe方法的执行线程.但是ObserveOnObserver在其onNext,onError和onComplete中通过schedule()方法将下游Observer的各个方法切换到了新的线程.

得出结论: observeOn负责切换的是下游Observer的各个方法的执行线程

如果下游多次通过observeOn切换线程,会有什么效果?

每次切换都会对其下游造成影响,直到遇到下一个observeOn为止.

Observer(onSubscribe,onNext,onError,onComplete)

onNext,onError,onComplete与上游最近的observeOn所切换的线程保持一致.onSubscribe则不同.

遇到线程切换的时候,会首先在对应的Observable的subscribeActual方法内,先调用observer.onSubscribe方法.而observer.onSubscribe会逐级向上传递直到最上游,而最上游的observer.onSubscribe是在subscribeActual方法内调用,这是在主线程执行的.所以onSubscribe方法无论如何都是在主线程执行.

doOnSubscribe

.doOnSubscribe(new Consumer

@Override

public void accept(Disposable disposable) throws Exception {

}

})

我们要看的是方法accept的执行线程.

通过源码找到对应的DisposableLambdaObserver.

@Override

public void onSubscribe(Disposable d) {

//在这里调用了accept方法.

onSubscribe.accept(d);

}

这就要看上游在哪个线程执行了Observer.onSubscribe(disposable)方法.

在创建型操作符的subscribeActual方法和subscribeOn对应的Observable的subscribeActual方法内调用了Observer.onSubscribe(disposable)方法.那么这两处的执行线程就决定了onSubscribe.accept(d);的执行线程.

doFinally

对应ObservableDoFinally和DoFinallyObserver

//DoFinallyObserver.java

@Override

public void onError(Throwable t) {

runFinally();

}

@Override

public void onComplete() {

runFinally();

}

@Override

public void dispose() {

runFinally();

}

void runFinally() {

onFinally.run();

}

可以看到与它所对应的DoFinallyObserver的onError,onComplete,dispose方法的执行线程有关,这三个方法的执行线程又受到上游的observeOn的影响.如果没有observeOn,则会受到最上游的observable.subscribeActual方法影响.

doOnError

对应ObservableDoOnEach和DoOnEachObserver

//DoOnEachObserver.java

@Override

public void onError(Throwable t) {

onError.accept(t);

}

和自身对应的observer.onError所在线程保持一致.

doOnNext

对应ObservableDoOnEach和DoOnEachObserver

//DoOnEachObserver.java

@Override

public void onNext(T t) {

onNext.accept(t);

}

和自身对应的observer.onNext所在线程保持一致.

操作符对应方法参数的执行线程

包io.reactivex.functions下的接口类一般用于处理上游数据然后往下传递.这些接口类的方法一般在对应的observer.onNext中调用.所以他们的线程保持一致.

总结:

subscribeOn由下往上逐级切换Observable.subscribe的执行线程,不受observeOn影响,也不受具有默认指定线程的非创建型操作符影响,但是会被更上游的subscribeOn夺取线程切换的权利,直到最上游.如果最上游的创建型操作符也有默认执行线程,那么任何一个subscribeOn的线程切换不起作用.subscribeOn由下向上到达最上游后,然后由上往下影响下游的observer的执行线程.遇到observeOn会被夺取线程切换的权利.observeOn影响的是下游的observer的执行线程,由上往下,遇到另一个observeOn会移交线程控制权力,遇到指定默认线程非创建型的操作符,要视具体情况对待.

@Override

public void accept(Disposable disposable) throws Exception {

}

})

我们要看的是方法accept的执行线程.

通过源码找到对应的DisposableLambdaObserver.

@Override

public void onSubscribe(Disposable d) {

//在这里调用了accept方法.

onSubscribe.accept(d);

}

这就要看上游在哪个线程执行了Observer.onSubscribe(disposable)方法.

在创建型操作符的subscribeActual方法和subscribeOn对应的Observable的subscribeActual方法内调用了Observer.onSubscribe(disposable)方法.那么这两处的执行线程就决定了onSubscribe.accept(d);的执行线程.

doFinally

对应ObservableDoFinally和DoFinallyObserver

//DoFinallyObserver.java

@Override

public void onError(Throwable t) {

runFinally();

}

@Override

public void onComplete() {

runFinally();

}

@Override

public void dispose() {

runFinally();

}

void runFinally() {

onFinally.run();

}

可以看到与它所对应的DoFinallyObserver的onError,onComplete,dispose方法的执行线程有关,这三个方法的执行线程又受到上游的observeOn的影响.如果没有observeOn,则会受到最上游的observable.subscribeActual方法影响.

doOnError

对应ObservableDoOnEach和DoOnEachObserver

//DoOnEachObserver.java

@Override

public void onError(Throwable t) {

onError.accept(t);

}

和自身对应的observer.onError所在线程保持一致.

doOnNext

对应ObservableDoOnEach和DoOnEachObserver

//DoOnEachObserver.java

@Override

public void onNext(T t) {

onNext.accept(t);

}

和自身对应的observer.onNext所在线程保持一致.

操作符对应方法参数的执行线程

包io.reactivex.functions下的接口类一般用于处理上游数据然后往下传递.这些接口类的方法一般在对应的observer.onNext中调用.所以他们的线程保持一致.

总结:

subscribeOn由下往上逐级切换Observable.subscribe的执行线程,不受observeOn影响,也不受具有默认指定线程的非创建型操作符影响,但是会被更上游的subscribeOn夺取线程切换的权利,直到最上游.如果最上游的创建型操作符也有默认执行线程,那么任何一个subscribeOn的线程切换不起作用.subscribeOn由下向上到达最上游后,然后由上往下影响下游的observer的执行线程.遇到observeOn会被夺取线程切换的权利.observeOn影响的是下游的observer的执行线程,由上往下,遇到另一个observeOn会移交线程控制权力,遇到指定默认线程非创建型的操作符,要视具体情况对待.


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:注解方式实现接口限流(接口限流怎么实现)
下一篇:如何对接口测试用例(接口测试的用例)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~