RxJava的消息发送和线程切换实现原理

网友投稿 292 2023-01-19


RxJava的消息发送和线程切换实现原理

Rxjava是一个在Java虚拟机上的响应式扩展,通过使用可观察的序列将异步和基于事件的程序组合起来的一个库。

它扩展了观察者模式来支持数据/事件序列,并且添加了操作符,这些操作符允许你声明性地组合序列,同时抽象出要关注的问题:比如低级线程、同步、线程安全和并发数据结构等。

RxJava相信大家都非常了解吧,今天分享一下RxJava的消息发送和线程源码的分析。最后并分享一个相关demo,让大家更加熟悉我们天天都在用的框架。

消息订阅发送

首先让我们看看消息订阅发送最基本的代码组成:

Observable observable = Observable.create(new ObservableOnSubscribe() {

@Override

public void subscribe(ObservableEmitter emitter) throws Exception {

emitter.onNext("Jack1");

emitter.onNext("Jack2");

emitter.onNext("Jack3");

emitter.onComplete();

}

});

Observer observer = new Observer() {

@Override

public void onSubscribe(Disposable d) {

Log.d(TAG, "onSubscribe");

}

@Override

public void onNext(String s) {

Log.d(TAG, "onNext : " + s);

}

@Override

public void onError(Throwable e) {

Log.d(TAG, "onError : " + e.toString());

}

@Override

public void onComplete() {

Log.d(TAG, "onComplete");

}

};

observable.subscribe(observer);

代码很简单,observable为被观察者,observer为观察者,然后通过observable.subscribe(observer),把观察者和被观察者关联起来。被观察者发送消息(emitter.onNext("内容")),观察者就可以在onNext()方法里回调出来。

我们先来看Observable,创建是用Observable.create()方法进行创建,源码如下:

public static Observable create(ObservableOnSubscribe source) {

ObjectHelper.requireNonNull(source, "source is null");

return RxJavaPlugins.onAssembly(new ObservableCreate(source));

}

public static T requireNonNull(T object, String message) {

if (object == null) {

throw new NullPointerException(message);

}

return object;

}

public static Observable onAssembly(@NonNull Observable source) {

Function super Observable, ? extends Observable> f = onObservableAssembly;

if (f != null) {

return apply(f, source);

}

return source;

}

可以看出,create()方法里最主要的还是创建用ObservableOnSubscribe传入创建了一个ObservableCreate对象并且保存而已。

public final class ObservableCreate extends Observable {

final ObservableOnSubscribe source;

public ObservableCreate(ObservableOnSubscribe source) {

this.source = source;

}

}

接着是创建Observer,这比较简单只是单纯创建一个接口对象而已

public interface Observer {

void onSubscribe(@NonNull Disposable d);

void onNext(@NonNull T t);

void onError(@NonNull Throwable e);

void onComplete();

}

订阅发送消息

observable.subscribe(observer)的subscribe方法如下:

public final void subscribe(Observer super T> observer) {

ObjectHelper.requireNonNull(observer, "observer is null");

try {

observer = RxJavaPlugins.onSubscribe(this, observer);

ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

subscribeActual(observer);

} catch (NullPointerException e) { // NOPMD

throw e;

} catch (Throwable e) {

Exceptions.throwIfFatal(e);

RxJavaPlugins.onError(e);

NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");

npe.initCause(e);

throw npe;

}

}

//ObjectHelper.requireNonNull()方法

public static T requireNonNull(T object, String message) {

if (object == http://null) {

throw new NullPointerException(message);

}

return object;

}

//RxJavaPlugins.onSubscribe()方法

public static Observer super T> onSubscribe(@NonNull Observable source, @NonNull Observer super T> observer) {

BiFunction super Observable, ? super Observer, ? extends Observer> f = onObservableSubscribe;

if (f != null) {

return apply(f, source, observer);

}

return observer;

}

从上面源码可以看出requireNonNull()只是做非空判断而已,而RxJavaPlugins.onSubscribe()也只是返回最终的观察者而已。所以关键代码是抽象方法subscribeActual(observer);那么subscribeActual对应哪个代码段呢?

还记得Observable.create()创建的ObservableCreate类吗,这就是subscribeActual()具体实现类,源码如下:

protected void subscribeActual(Observer super T> observer) {

CreateEmitter parent = new CreateEmitter(observer);

observer.onSubscribe(parent);

try {

source.subscribe(parent);

} catch (Throwable ex) {

Exceptions.throwIfFatal(ex);

parent.onError(ex);

}

}

从上面的代码可以看出,首先创建了一个CreateEmitter对象并传入observer,然后回到observer的onSubscribe()方法,而source就是我们之前创建ObservableCreate传入的ObservableOnSubscribe对象。

class CreateEmitter extends AtomicReference

implements ObservableEmitter, Disposable {

}

而CreateEmitter又继承ObservableEmitter接口,又回调ObservableOnSubscribe的subscribe方法,对应着我们的:

Observable observable = Observable.create(new ObservableOnSubscribe() {

@Override

public void subscribe(ObservableEmitter emitter) throws Exception {

emitter.onNext("Jack1");

emitter.onNext("Jack2");

emitter.onNext("Jack3");

emitter.onComplete();

}

});

当它发送消息既调用emitter.onNext()方法时,既调用了CreateEmitter的onNext()方法:

public void onNext(T t) {

if (t == null) {

onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));

return;

}

if (!isDisposed()) {

observer.onNext(t);

}

}

可以看到最终又回调了观察者的onNext()方法,把被观察者的数据传输给了观察者。有人会问

isDisposed()是什么意思,是判断要不要终止传递的,我们看emitter.onComplete()源码:

public void onComplete() {

if (!isDisposed()) {

try {

observer.onComplete();

} finally {

dispose();

}

}

}

public static boolean dispose(AtomicReference field) {

Disposable current = field.get();

Disposable d = DISPOSED;

if (current != d) {

current = field.getAndSet(d);

if (current != d) {

if (current != null) {

current.dispose();

}

return true;

}

}

return false;

}

public static boolean isDisposed(Disposable d) {

return d == DISPOSED;

}

dispose()方法是终止消息传递,也就付了个DISPOSED常量,而isDisposed()方法就是判断这个常量而已。这就是整个消息订阅发送的过程,用的是观察者模式。

线程切换

在上面模板代码的基础上,线程切换只是改变了如下代码:

observable.subscribeOn(Schedulers.io())

.observeOn(androidSchedulers.mainThread())

.subscribe(observer);

下面我们对线程切换的源码进行一下分析,分为两部分:subscribeOn()和observeOn()

subscribeOn()

首先是subscribeOn()源码如下:

public final Observable subscribeOn(Scheduler scheduler) {

ObjectHelper.requireNonNull(scheduler, "scheduler is null");

return RxJavaPlugins.onAssembly(new ObservableSubscribeOn(this, scheduler));

}

我们传进去了一个Scheduler类,Scheduler是一个调度类,能够延时或周期性地去执行一个任务。

Scheduler有如下类型:

类型

使用方式

含义

使用场景

IoScheduler

Schedulers.io()

io操作线程

读写SD卡文件,查询数据库,访问网络等IO密集型操作

NewThreadScheduler

Schedulers.newThread()

创建新线程

耗时操作等

SingleScheduler

Schedulers.single()

单例线程

只需一个单例线程时

ComputationScheduler

Schedulers.computation()

CPU计算操作线程

图片压缩取样、xml,json解析等CPU密集型计算

TrampolineScheduler

Schedulers.trampoline()

当前线程

需要在当前线程立即执行任务时

HandlerScheduler

AndroidSchedulers.mainThread()

Android主线程

更新UI等

接着就没什么了,只是返回一个ObservableSubscribeOn对象而已。

observeOn()

首先看源码如下:

public final Observable observeOn(Scheduler scheduler) {

return observeOn(scheduler, false, bufferSize());

}

public final Observable observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {

ObjectHelper.requireNonNull(scheduler, "scheduler is null");

ObjectHelper.verifyPositive(bufferSize, "bufferSize");

return RxJavaPlugins.onAssembly(new ObservableObserveOn(this, scheduler, delayError, bufferSize));

}

这里也是没什么,只是最终返回一个ObservableObserveOn对象而已。

接着还是像原来那样调用subscribe()方法进行订阅,看起来好像整体变化不大,就是封装了一些对象而已,不过着恰恰是RxJava源码的精华,当他再次调用subscribeActual()方法时,已经不是之前的ObservableCreate()里subscribeActual方法了,而是最先调用ObservableObserveOn的subscribeActual()方法,对应源码如下:

protected void subscribeActual(Observer super T> observer) {

if (scheduler instanceof TrampolineScheduler) {

source.subscribe(observer);

} else {

Scheduler.Worker w = scheduler.createWorker();

source.subscribe(new ObserveOnObserver(observer, w, delayError, bufferSize));

}

}

在这里有两点要讲,一点是ObserveOnObserver是执行观察者的线程,后面还会详解,然后就是source.subscribe,这个source.subscribe调的是ObservableSubscribeOn的subscribe方法,而subscribe方法因为继承的也是Observable,是Observable里的方法,所以和上面的ObservableCreate一样的方法,所以会调用ObservableSubscribeOn里的subscribeActual()方法,对应的代码如下:

public void subscribeActual(final Observer super T> s) {

final SubscribeOnObserver parent = new SubscribeOnObserver(s);

s.onSubscribe(parent);

parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));

}

上面代码中,首先把ObserveOnObserver返回给来的用SubscribeOnObserver“包装”起来,然后在回调Observer的onSubscribe(),就是对应模板代码的onSubscribe()方法。

接着看SubscribeTask类的源码:

final class SubscribeTask implements Runnable {

private final SubscribeOnObserver parent;

SubscribeTask(SubscribeOnObserver parent) {

this.parent = parent;

}

@Override

public void run() {

source.subscribe(parent);

}

}

其中的source.subscribe(parent),就是我们执行子线程的回调方法,对应我们模板代码里的被观察者的subscribe()方法。它放在run()方法里,并且继承Runnable,说明这个类主要是线程运行。接着看scheduler.scheduleDirect()方法对应的源码如下:

public Disposable scheduleDirect(@NonNull Runnable run) {

return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);

}

public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {

final Worker w = createWorker();

final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

DisposeTask task = new DisposeTask(decoratedRun, w);

w.schedule(task, delay, unit);

return task;

}

在这里,createWorker()也是一个抽象方法,调用的是我们的调度类对应的Schedulers类里面的方法,这里是IoScheduler类,

public final class IoScheduler extends Scheduler{

final AtomicReference pool;

//省略....

public Worker createWorker() {

return new EventLoopWorker(pool.get());

}

static final class EventLoopWorker extends Scheduler.Worker {

private final CompositeDisposable tasks;

private final CachedWorkerPool pool;

private final ThreadWorker threadWorker;

final AtomicBoolean once = new AtomicBoolean();

EventLoopWorker(CachedWorkerPool pool) {

this.pool = pool;

this.tasks = new CompositeDisposable();

this.threadWorker = pool.get();

}

//省略....

@NonNull

@Override

public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {

if (tasks.isDisposed()) {

// don't schedule, we are unsubscribed

return EmptyDisposable.INSTANCE;

}

return threadWorker.scheduleActual(action, delayTime, unit, tasks);

}

}

}

static final class CachedWorkerPool implements Runnable {

//省略....

ThreadWorker get() {

if (allWorkers.isDisposed()) {

return SHUTDOWN_THREAD_WORKER;

}

while (!expiringWorkerQueue.isEmpty()) {

ThreadWorker threadWorker = expiringWorkerQueue.poll();

if (threadWorker != null) {

return threadWorker;

}

}

ThreadWorker w = new ThreadWorker(threadFactory);

allWorkers.add(w);

return w;

}

//省略....

}

这就是IoScheduler的createWorker()的方法,其实最主要的意思就是获取线程池,以便于生成子线程,让SubscribeTask()可以运行。然后直接调用 w.schedule(task, delay, unit)方法让它在线程池里执行。上面中那ThreadWorker的源码如下:

static final class ThreadWorker extends NewThreadWorker {

private long expirationTime;

ThreadWorker(ThreadFactory threadFactory) {

super(threadFactory);

this.expirationTime = 0L;

}

//省略代码....

}

public class NewThreadWorker extends Scheduler.Worker implements Disposable {

private final ScheduledExecutorService executor;

public NewThreadWorker(ThreadFactory threadFactory) {

executor = SchedulerPoolFactory.create(threadFactory);

}

public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {

Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

if (parent != null) {

if (!parent.add(sr)) {

return sr;

}

}

Future> f;

try {

if (delayTime <= 0) {

http:// f = executor.submit((Callable)sr);

} else {

f = executor.schedule((Callable)sr, delayTime, unit);

}

sr.setFuture(f);

} catch (RejectedExecutionException ex) {

if (parent != null) {

parent.remove(sr);

}

RxJavaPlugins.onError(ex);

}

return sr;

}

}

可以看到,这就调了原始的javaAPI来进行线程池操作。

然后最后一环在子线程调用source.subscribe(parent)方法,然后回调刚开始创建的ObservableCreate的subscribeActual(),既:

protected void subscribeActual(Observer super T> observer) {

CreateEmitter parent = new CreateEmitter(observer);

observer.onSubscribe(parent);

try {

source.subscribe(parent);

} catch (Throwable ex) {

Exceptions.throwIfFatal(ex);

parent.onError(ex);

}

}

进行消息的订阅绑定。

当我们在调用 emitter.onNext(内容)时,是在io线程里的,那回调的onNext()又是什么时候切换的?那就是前面为了整个流程流畅性没讲的在observeOn()里的ObserveOnObserver是执行观察者的线程的过程。

class ObserveOnObserver extends BasicIntQueueDisposable

implements Observer, Runnable {

//省略代码....

ObserveOnObserver(Observer super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {

this.actual = actual;

this.worker = worker;

this.delayError = delayError;

this.bufferSize = bufferSize;

}

@Override

public void onSubscribe(Disposable s) {

if (DisposableHelper.validate(this.s, s)) {

this.s = s;

if (s instanceof QueueDisposable) {

@SuppressWarnings("unchecked")

QueueDisposable qd = (QueueDisposable) s;

int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);

if (m == QueueDisposable.SYNC) {

sourceMode = m;

queue = qd;

done = true;

actual.onSubscribe(this);

schedule();

return;

}

if (m == QueueDisposable.ASYNC) {

sourceMode = m;

queue = qd;

actual.onSubscribe(this);

return;

}

}

queue = new SpscLinkedArrayQueue(bufferSize);

actual.onSubscribe(this);

}

}

@Override

public void onNext(T t) {

if (done) {

return;

}

if (sourceMode != QueueDisposable.ASYNC) {

queue.offer(t);

}

schedule();

}

void schedule() {

if (getAndIncrement() == 0) {

worker.schedule(this);

}

}

//省略代码....

}

当调用emitter.onNext(内容)方法,会调用上面的onNext()方法,然后在这个方法里会把数据压入一个队列,然后执行worker.schedule(this)方法,work是什么呢,还记得AndroidSchedulers.mainThread()吗,这个对应这个HandlerScheduler这个类,所以createWorker()对应着:

private static final class MainHolder {

static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));

}

public Worker createWorker() {

return new HandlerWorker(handler);

}

private static final class HandlerWorker extends Worker {

private final Handler handler;

private volatile boolean disposed;

HandlerWorker(Handler handler) {

this.handler = handler;

}

@Override

public Disposable schedule(Runnable run, long delay, TimeUnit unit) {

if (run == null) throw new NullPointerException("run == null");

if (unit == null) throw new NullPointerException("unit == null");

if (disposed) {

return Disposables.disposed();

}

run = RxJavaPlugins.onSchedule(run);

ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

Message message = Message.obtain(handler, scheduled);

message.obj = this; // Used as token for batch disposal of this worker's runnables.

handler.sendMessageDelayed(message, unit.toMillis(delay));

if (disposed) {

handler.removeCallbacks(scheduled);

return Disposables.disposed();

}

return scheduled;

}

}

在next()方法里,运用android自带的Handler消息机制,通过把方法包裹在Message里,同通过handler.sendMessageDelayed()发送消息,就会在ui线程里回调Next()方法,从而实现从子线程切换到android主线程的操作。我们在主线程拿到数据就可以进行各种在主线程的操作了。

总结一下:

ObservableCreate 一> ObservableSubscribeOn 一> ObservableObserveOn为初始化顺序

当调用observable.subscribe(observer)时的执行顺序

ObservableObserveOn 一> ObservableSubscribeOn 一> ObservableCreate

当发送消息的执行顺序

ObservableCreate 一> ObservableSubscribeOn 一> ObservableObserveOn

以上就是消息订阅和线程切换的源码的所有讲解了。

为了让你们理解更清楚,我仿照RxJava写了大概的消息订阅和线程切换的最基本代码和基本功能,以帮助你们理解

https://github.com/jack921/RxJava2Demo


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:开源的研发管理平台(开源的研发管理平台有哪些)
下一篇:开源 研发管理平台(开源官方网站)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~