RxJava 基础订阅流程解析

首先看看一个基础使用的例子:

//1. 创建 Observable Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> e) throws Exception { // 发送消息 e.onNext("哈哈哈哈"); } }); //2. 创建 Observer Observer<String> observer = new Observer<String>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(String s) { // 显示消息 Toast.makeText(BaseUseActivity.this, s, Toast.LENGTH_SHORT).show(); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }; //3. 建立订阅关系 observable.subscribe(observer);

问题:Observable 是怎么跟 Observer 建立关系并向其发送消息?

首先,我们来看 Observable 的 craete() 干了什么事情。

@CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { ObjectHelper.requireNonNull(source, "source is null"); return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); }

可以看到,create() 创建了一个 ObservableCreate 对象,并传入了我们传给 create() 的 ObservableOnSubscribe 对象。

Observable 是一个抽象类,而 ObservableCreate 是它的实现类。下面我们来看 Observable 的 subscribe(observer) 方法,这里我们传入了一个 Observer 接口的具体对象。

@SchedulerSupport(SchedulerSupport.NONE) @Override public final void subscribe(Observer<? super T> observer) { ObjectHelper.requireNonNull(observer, "observer is null"); try { observer = RxJavaPlugins.onSubscribe(this, observer); ObjectHelper.requireNonNull(observer, "Plugin returned null Observer"); subscribeActual(observer); } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { Exceptions.throwIfFatal(e); // cant call onError because no way to know if a Disposable has been set or not // cant call onSubscribe because the call might have set a Subscription already RxJavaPlugins.onError(e); NullPointerException npe = new NullPointerException("Actually not, but cant throw other exceptions due to RS"); npe.initCause(e); throw npe; } }

看以上代码,其实带有 RxJavaPlugins 字样的方法都是为了 Hook 而准备的,我们可以忽略。所以我们来看看 subscribeActual(observer) 方法。

protected abstract void subscribeActual(Observer<? super T> observer);

可以看到,subscribeActual() 是 Observable 类的一个抽象方法,那么其具体实现也就是在其子类 ObservableCreate 这个类里面。

@Override protected void subscribeActual(Observer<? super T> observer) { CreateEmitter<T> parent = new CreateEmitter<T>(observer); observer.onSubscribe(parent); try { source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); } }

也就是这段代码里面,真正实现了Observable 和 Observer 类的关联。

首先,创建了 CreateEmitter 类并传入了 Observer 对象。CreateEmitter 可以理解为是一个消息发射器,是 Observer 的一个内部类,代码如下。

static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable { private static final long serialVersionUID = -L; final Observer<? super T> observer; CreateEmitter(Observer<? super T> observer) { this.observer = observer; } @Override public void onNext(T t) { if (t == null) { onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources.")); return; } if (!isDisposed()) { observer.onNext(t); } } @Override public void onError(Throwable t) { if (t == null) { t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources."); } if (!isDisposed()) { try { observer.onError(t); } finally { dispose(); } } else { RxJavaPlugins.onError(t); } } @Override public void onComplete() { if (!isDisposed()) { try { observer.onComplete(); } finally { dispose(); } } } @Override public void setDisposable(Disposable d) { DisposableHelper.set(this, d); } @Override public void setCancellable(Cancellable c) { setDisposable(new CancellableDisposable(c)); } @Override public ObservableEmitter<T> serialize() { return new SerializedEmitter<T>(this); } @Override public void dispose() { DisposableHelper.dispose(this); } @Override public boolean isDisposed() { return DisposableHelper.isDisposed(get()); } }

创建了 CreateEmitter 后,接着就调用了 Observer 的 onSubscribe() 方法,将CreateEmitter 传入。这个方法接收的参数是一个 Disposable 接口,从以上代码可以看车,CreateEmitter 实现了 Disposable 接口,用于中断事件的传递和判断事件是否被中断。

最后,调用了 source.subscribe(CreateEmitter) 方法,并CreateEmitter 对象传入。source 也就是 ObservableOnSubscribe 接口对象。

现在我们再来看基础使用的例子,我们在 Observable.create() 里面传入了 ObservableOnSubscribe 接口,经过源码的处理,ObservableOnSubscribe 的 subscribe 的参数 ObservableEmitter 也就是源码创建的 CreateEmitter 对象,我们调用这个对象的 onNext 方法来发送消息。而 CreateEmitter 持有 Observer 的实例,CreateEmitter 又在它的 onNext 方法中调用 Observer 的 onNext 方法,实现了事件的传递。