RxJava 2 | retryWhen 操作符源码分析
基本使用
retryWhen
操作符的作用是在上游发射了一个错误事件时,可以根据条件自己决定是否进行重试,让上游重新发送事件。在开发中也是一个非常有用的操作符,例如它可以用来在网络请求失败时要求其进行重试,重新去进行网络请求。
下面是一个简单的使用案例:
Observable
.create((ObservableOnSubscribe<String>) e -> {
e.onNext("A");
e.onNext("B");
e.onError(new RuntimeException("always on error!"));
e.onNext("C");
e.onComplete();
})
.retryWhen(attempts -> {
System.out.println("retryWhen: apply()");
return attempts.flatMap(throwable -> {
System.out.println("attempts: flatMap apply() -> " + throwable);
return Observable.just("just retry!");
});
})
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("onSubscribe()");
}
@Override
public void onNext(String s) {
System.out.println("onNext(): " + s);
}
@Override
public void onError(Throwable e) {
System.out.println("onError(): " + e);
}
@Override
public void onComplete() {
System.out.println("onComplete()");
}
});
输出结果如下:
retryWhen: apply()
onSubscribe()
onNext(): A
onNext(): B
attempts: flatMap apply() -> java.lang.RuntimeException: always on error!
onNext(): A
onNext(): B
attempts: flatMap apply() -> java.lang.RuntimeException: always on error!
onNext(): A
onNext(): B
attempts: flatMap apply() -> java.lang.RuntimeException: always on error!
...
如果将以上代码中的:
return Observable.just("just retry!");
更改为下面这样:
return Observable.error(new RuntimeException("shutdown!"))
再看输出结果:
retryWhen: apply()
onSubscribe()
onNext(): A
onNext(): B
attempts: flatMap apply()
onError(): java.lang.RuntimeException: shutdown!
为了下面理解方便,将上面的代码改写成下面 👇 这样,代码逻辑一模一样,只是不使用链式调用并且去掉了 retryWhen
的 lambda 表达式,拆分出上游 upStream
和下游 downStream
的概念:
// 上游
Observable<String> upStream = Observable.create(e -> {
e.onNext("A");
e.onNext("B");
e.onError(new RuntimeException("always on error!"));
e.onNext("C");
e.onComplete();
});
// 下游
Observer<String> downStream = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("onSubscribe()");
}
@Override
public void onNext(String s) {
System.out.println("onNext(): " + s);
}
@Override
public void onError(Throwable e) {
System.out.println("onError(): " + e);
}
@Override
public void onComplete() {
System.out.println("onComplete()");
}
};
// retryWhen
upStream.retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Observable<Throwable> attempts) throws Exception {
System.out.println("retryWhen: apply()");
return attempts.flatMap(throwable -> {
System.out.println("attempts: flatMap apply() -> " + throwable);
Thread.sleep(1000);
return Observable.just("just retry");
});
}
}).subscribe(downStream);
以上就是 retryWhen
操作符的基本使用,如果在 retryWhen
中 return
返回的流发射了:
- 一个正常事件,即代表重试,会重新发射上游的事件。
- 一个错误事件,即代表不想重试,会立即调用下游的
onError()
方法,并且会将这个错误事件中的异常信息作为参数传过去。
从上面的输出还可以得出一个结论,retryWhen
的 Function
的 apply()
方法只会被调用一次,并且是在下游 onSubscribe()
方法之前调用,并不是在每次上游出错都会去调用。
在 retryWhen
的 Function
的 apply()
方法中有一个 Observable<Throwable> attempts
参数,这是一个流,是 retryWhen
操作符内部 ( 准确的是 ObservableRetryWhen
内 ) 的一个流,当上游出错时,虽然 retryWhen
的 Function
的 apply()
方法不会被调用,但是会将每次错误信息从 attempts
中发射出去,所以上游每出现错误时,attempts
就会发射一次,所以我们可以根据 attempts
的错误信息来决定是否重试。
以上结论会在下面源码分析中得到论证。
流程分析
首先看 create()
:
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
例子中创建上游时使用的是 create()
操作符,此时流 Observable
的实际类型为 ObservableCreate
,然后经过 retryWhen
:
public final Observable<T> retryWhen(
final Function<? super Observable<Throwable>, ? extends ObservableSource<?>> handler) {
ObjectHelper.requireNonNull(handler, "handler is null");
return RxJavaPlugins.onAssembly(new ObservableRetryWhen<T>(this, handler));
}
经过 retryWhen
操作符后,流的类型变为 ObservableRetryWhen
,然后下游订阅上游调用 subscribe()
时,调用的是 ObservableRetryWhen
的 subscribe()
方法,内部会最终调用其 subscribeActual()
方法:
@Override
protected void subscribeActual(Observer<? super T> observer) {
// 信号源,即上文中提到的 attempts
Subject<Throwable> signaller = PublishSubject.<Throwable>create().toSerialized();
ObservableSource<?> other;
try {
// 所以 other 就是我们在 retryWhen 里面返回的那个流,即 Observable.just("just retry!")
other = ObjectHelper.requireNonNull(handler.apply(signaller), "The handler returned a null ObservableSource");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
EmptyDisposable.error(ex, observer);
return;
}
RepeatWhenObserver<T> parent = new RepeatWhenObserver<T>(observer, signaller, source);
observer.onSubscribe(parent);
other.subscribe(parent.inner);
parent.subscribeNext();
}
signaller
在 ObservableRetryWhen
被订阅时首先会创建一个信号源 signaller
,即上文中提到的 attempts
,用来发射上游出现的错误。
other
other = ObjectHelper.requireNonNull(handler.apply(signaller)
handler
就是 retryWhen
的参数 Function
,可以看到在调用 Function
的 apply()
函数时将 signaller
作为参数 传过去了,就是刚刚外面提到的 attempts
参数,所以 other
就是 Function
的返回值,即使用 retryWhen
操作符返回的那个流:
Observable.just("just retry!")
RepeatWhenObserver
RepeatWhenObserver
在 ObservableRetryWhen
内部将原始的上游 source
( upstream
) 以及原始的下游 observer
( downstream
) 封装起来,通过其变量名 parent
也可以看出其代表的意思为 retryWhen
操作符的上游。
订阅关系
other.subscribe(parent.inner);
parent.inner
即 RepeatWhenObserver
的内部类 InnerRepeatObserver
订阅了我们返回的流。下面会提到,这里只需要知道 parent.inner
订阅了我们返回的 Observable.just("just retry!")
即可。
parent.subscribeNext();
再看 RepeatWhenObserver
的 subscribeNext()
方法:
void subscribeNext() {
if (wip.getAndIncrement() == 0) {
do {
if (isDisposed()) {
return;
}
if (!active) {
active = true;
source.subscribe(this);
}
} while (wip.decrementAndGet() != 0);
}
}
内部会让 RepeatWhenObserver
自己订阅我们的原始上游 source
( upstream
) 。
被观察者 | 观察者 |
---|---|
upstream ( 原始流 ) | RepeatWhenObserver |
other ( retryWhen 中我们返回的流 ) | InnerRepeatObserver |
重试机制
在将重试机制之前,会先分析正常的流程,即上游不会抛出错误不重试的流程,然后再分析错误重试的流程,即上游发射错误事件触发重试的流程。
正常不重试流程
还是按照上面的例子,上游 upstream
首先发射 A
,由订阅关系得知会触发 RepeatWhenObserver
的 onNext()
方法:
@Override
public void onNext(T t) {
HalfSerializer.onNext(downstream, t, this, error);
}
public static <T> void onNext(Observer<? super T> observer, T value,
AtomicInteger wip, AtomicThrowable error) {
if (wip.get() == 0 && wip.compareAndSet(0, 1)) {
observer.onNext(value);
...
}
}
此处的 observer
就是真正的下游 downstream
。可以看到内部会调用 downstream
的 onNext()
方法,完成转发操作。
错误重试流程
按照上面的例子,上游 upstream
模拟出错会发射 RuntimeException("always on error!")
事件,由订阅关系得知会触发 RepeatWhenObserver
的 onError()
方法:
@Override
public void onError(Throwable e) {
DisposableHelper.replace(upstream, null);
active = false;
signaller.onNext(e);
}
很明显,在这里拿到上游的异常信息后,将异常信息通过 signaller
流发射了出去。signaller
即 retryWhen
参数 Function
中 apply()
方法的参数 attempts
。
这里先不必探究谁会订阅 signaller
流,我们先看 other
参数,就是我们在 retryWhen
中返回的流,它是被 InnerRepeatObserver
所订阅着的,如果我们返回的流 other
发射了一个正常时间,来看看会怎样,它会触发 InnerRepeatObserver
的 onNext()
方法:
final class InnerRepeatObserver extends AtomicReference<Disposable> implements Observer<Object> {
private static final long serialVersionUID = 3254781284376480842L;
@Override
public void onSubscribe(Disposable d) {
DisposableHelper.setOnce(this, d);
}
@Override
public void onNext(Object t) {
innerNext();
}
@Override
public void onError(Throwable e) {
innerError(e);
}
@Override
public void onComplete() {
innerComplete();
}
}
InnerRepeatObserver
是 RepeatWhenObserver
的一个内部类,所以它的 onNext()
中调用的 innerNext()
方法为 RepeatWhenObserver
的:
void innerNext() {
subscribeNext();
}
这样就清楚了,里面又调用 RepeatWhenObserver
的 subscribeNext()
方法,上面已经分析过了,该方法会让 RepeatWhenObserver
自己订阅我们的原始上游 source
( upstream
) 。
所以我们只要让 other
即我们返回的流发射一次 onNext()
事件即可完成重新订阅达到重试,那怎么触发 other
发射事件呢?答案就是上面的 signaller
。上游出错时,会导致 signaller
将错误信息当成正常时间发射出去,所以我们只要让 signaller
触发 other
即可,可以理解为只要将 other
和 signaller
相连接即可:
upStream.retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Observable<Throwable> attempts) throws Exception {
System.out.println("retryWhen: apply()");
return attempts.flatMap(throwable -> {
System.out.println("attempts: flatMap apply() -> " + throwable);
Thread.sleep(1000);
return Observable.just("just retry");
});
}
}).subscribe(downStream);
在 attempts.flatMap
中,我们还可以拿到 attempts
中的错误信息来判断是否重试,如果重试只要发射一个正常事件即可,向上面的例子一样,那不想重试了呢?只要发射一个 onError()
错误事件即可。由上面 InnerRepeatObserver
源码可以看到如果发射了一个错误事件,会调用 RepeatWhenObserver
的 innerError()
方法:
void innerError(Throwable ex) {
DisposableHelper.dispose(upstream);
HalfSerializer.onError(downstream, ex, this, error);
}
public static void onError(Observer<?> observer, Throwable ex,
AtomicInteger wip, AtomicThrowable error) {
if (error.addThrowable(ex)) {
if (wip.getAndIncrement() == 0) {
observer.onError(error.terminate());
}
} else {
RxJavaPlugins.onError(ex);
}
}
可以看到内部会调用 downstream
的 onError()
方法,完成转发操作。
至此已经分析完了,下面附上流程图方便理解:
RWO :
RepeatWhenObserver
IRO :
InnerRepeatObserver