基本使用

retryWhen 操作符的作用是在上游发射了一个错误事件时,可以根据条件自己决定是否进行重试,让上游重新发送事件。在开发中也是一个非常有用的操作符,例如它可以用来在网络请求失败时要求其进行重试,重新去进行网络请求。

下面是一个简单的使用案例:

Observable
        .create((ObservableOnSubscribe<String>) e -> {
            e.onNext("A");
            e.onNext("B");
            e.onError(new RuntimeException("always on error!"));
            e.onNext("C");
            e.onComplete();
        })
        .retryWhen(attempts -> {
            System.out.println("retryWhen: apply()");
            return attempts.flatMap(throwable -> {
                System.out.println("attempts: flatMap apply() -> " + throwable);
                return Observable.just("just retry!");
            });
        })
        .subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("onSubscribe()");
            }

            @Override
            public void onNext(String s) {
                System.out.println("onNext(): " + s);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onError(): " + e);
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete()");
            }
        });

输出结果如下:

retryWhen: apply()
onSubscribe()
onNext(): A
onNext(): B
attempts: flatMap apply() -> java.lang.RuntimeException: always on error!
onNext(): A
onNext(): B
attempts: flatMap apply() -> java.lang.RuntimeException: always on error!
onNext(): A
onNext(): B
attempts: flatMap apply() -> java.lang.RuntimeException: always on error!
...

如果将以上代码中的:

return Observable.just("just retry!");

更改为下面这样:

return Observable.error(new RuntimeException("shutdown!"))

再看输出结果:

retryWhen: apply()
onSubscribe()
onNext(): A
onNext(): B
attempts: flatMap apply()
onError(): java.lang.RuntimeException: shutdown!

为了下面理解方便,将上面的代码改写成下面 👇 这样,代码逻辑一模一样,只是不使用链式调用并且去掉了 retryWhen 的 lambda 表达式,拆分出上游 upStream 和下游 downStream 的概念:

// 上游
Observable<String> upStream = Observable.create(e -> {
    e.onNext("A");
    e.onNext("B");
    e.onError(new RuntimeException("always on error!"));
    e.onNext("C");
    e.onComplete();
});

// 下游
Observer<String> downStream = new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        System.out.println("onSubscribe()");
    }

    @Override
    public void onNext(String s) {
        System.out.println("onNext(): " + s);
    }

    @Override
    public void onError(Throwable e) {
        System.out.println("onError(): " + e);
    }

    @Override
    public void onComplete() {
        System.out.println("onComplete()");
    }
};

// retryWhen
upStream.retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
    @Override
    public ObservableSource<?> apply(Observable<Throwable> attempts) throws Exception {
        System.out.println("retryWhen: apply()");
        return attempts.flatMap(throwable -> {
            System.out.println("attempts: flatMap apply() -> " + throwable);
            Thread.sleep(1000);
            return Observable.just("just retry");
        });
    }
}).subscribe(downStream);

以上就是 retryWhen 操作符的基本使用,如果在 retryWhenreturn 返回的流发射了:

  • 一个正常事件,即代表重试,会重新发射上游的事件。
  • 一个错误事件,即代表不想重试,会立即调用下游的 onError() 方法,并且会将这个错误事件中的异常信息作为参数传过去。

从上面的输出还可以得出一个结论,retryWhenFunctionapply() 方法只会被调用一次,并且是在下游 onSubscribe() 方法之前调用,并不是在每次上游出错都会去调用

retryWhenFunctionapply() 方法中有一个 Observable<Throwable> attempts 参数,这是一个流,是 retryWhen 操作符内部 ( 准确的是 ObservableRetryWhen 内 ) 的一个流,当上游出错时,虽然 retryWhenFunctionapply() 方法不会被调用,但是会将每次错误信息从 attempts 中发射出去,所以上游每出现错误时,attempts 就会发射一次,所以我们可以根据 attempts 的错误信息来决定是否重试。

以上结论会在下面源码分析中得到论证。

流程分析

首先看 create()

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

例子中创建上游时使用的是 create() 操作符,此时流 Observable 的实际类型为 ObservableCreate ,然后经过 retryWhen

public final Observable<T> retryWhen(
    final Function<? super Observable<Throwable>, ? extends ObservableSource<?>> handler) {
    ObjectHelper.requireNonNull(handler, "handler is null");
    return RxJavaPlugins.onAssembly(new ObservableRetryWhen<T>(this, handler));
}

经过 retryWhen 操作符后,流的类型变为 ObservableRetryWhen ,然后下游订阅上游调用 subscribe() 时,调用的是 ObservableRetryWhensubscribe() 方法,内部会最终调用其 subscribeActual() 方法:

@Override
protected void subscribeActual(Observer<? super T> observer) {
  	// 信号源,即上文中提到的 attempts
    Subject<Throwable> signaller = PublishSubject.<Throwable>create().toSerialized();
		
    ObservableSource<?> other;

    try {
      	// 所以 other 就是我们在 retryWhen 里面返回的那个流,即 Observable.just("just retry!")
        other = ObjectHelper.requireNonNull(handler.apply(signaller), "The handler returned a null ObservableSource");
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        EmptyDisposable.error(ex, observer);
        return;
    }

    RepeatWhenObserver<T> parent = new RepeatWhenObserver<T>(observer, signaller, source);
    observer.onSubscribe(parent);

    other.subscribe(parent.inner);

    parent.subscribeNext();
}

signaller

ObservableRetryWhen 被订阅时首先会创建一个信号源 signaller,即上文中提到的 attempts ,用来发射上游出现的错误。

other

other = ObjectHelper.requireNonNull(handler.apply(signaller)

handler 就是 retryWhen 的参数 Function ,可以看到在调用 Functionapply() 函数时将 signaller 作为参数 传过去了,就是刚刚外面提到的 attempts 参数,所以 other 就是 Function 的返回值,即使用 retryWhen 操作符返回的那个流:

Observable.just("just retry!")

RepeatWhenObserver

RepeatWhenObserverObservableRetryWhen 内部将原始的上游 source ( upstream ) 以及原始的下游 observer ( downstream ) 封装起来,通过其变量名 parent 也可以看出其代表的意思为 retryWhen 操作符的上游。

订阅关系

other.subscribe(parent.inner);

parent.innerRepeatWhenObserver 的内部类 InnerRepeatObserver 订阅了我们返回的流。下面会提到,这里只需要知道 parent.inner 订阅了我们返回的 Observable.just("just retry!") 即可。

parent.subscribeNext();

再看 RepeatWhenObserversubscribeNext() 方法:

void subscribeNext() {
    if (wip.getAndIncrement() == 0) {

        do {
            if (isDisposed()) {
                return;
            }

            if (!active) {
                active = true;
                source.subscribe(this);
            }
        } while (wip.decrementAndGet() != 0);
    }
}

内部会让 RepeatWhenObserver 自己订阅我们的原始上游 source ( upstream ) 。

被观察者 观察者
upstream ( 原始流 ) RepeatWhenObserver
other ( retryWhen 中我们返回的流 ) InnerRepeatObserver

重试机制

在将重试机制之前,会先分析正常的流程,即上游不会抛出错误不重试的流程,然后再分析错误重试的流程,即上游发射错误事件触发重试的流程。

正常不重试流程

还是按照上面的例子,上游 upstream 首先发射 A ,由订阅关系得知会触发 RepeatWhenObserveronNext() 方法:

@Override
public void onNext(T t) {
    HalfSerializer.onNext(downstream, t, this, error);
}
public static <T> void onNext(Observer<? super T> observer, T value,
        AtomicInteger wip, AtomicThrowable error) {
    if (wip.get() == 0 && wip.compareAndSet(0, 1)) {
        observer.onNext(value);
        ...
    }
}

此处的 observer 就是真正的下游 downstream。可以看到内部会调用 downstreamonNext() 方法,完成转发操作。

错误重试流程

按照上面的例子,上游 upstream 模拟出错会发射 RuntimeException("always on error!") 事件,由订阅关系得知会触发 RepeatWhenObserveronError() 方法:

@Override
public void onError(Throwable e) {
    DisposableHelper.replace(upstream, null);
    active = false;
    signaller.onNext(e);
}

很明显,在这里拿到上游的异常信息后,将异常信息通过 signaller 流发射了出去。signallerretryWhen 参数 Functionapply() 方法的参数 attempts

这里先不必探究谁会订阅 signaller 流,我们先看 other 参数,就是我们在 retryWhen 中返回的流,它是被 InnerRepeatObserver 所订阅着的,如果我们返回的流 other 发射了一个正常时间,来看看会怎样,它会触发 InnerRepeatObserveronNext() 方法:

final class InnerRepeatObserver extends AtomicReference<Disposable> implements Observer<Object> {

    private static final long serialVersionUID = 3254781284376480842L;

    @Override
    public void onSubscribe(Disposable d) {
        DisposableHelper.setOnce(this, d);
    }

    @Override
    public void onNext(Object t) {
        innerNext();
    }

    @Override
    public void onError(Throwable e) {
        innerError(e);
    }

    @Override
    public void onComplete() {
        innerComplete();
    }
}

InnerRepeatObserverRepeatWhenObserver 的一个内部类,所以它的 onNext() 中调用的 innerNext() 方法为 RepeatWhenObserver 的:

void innerNext() {
    subscribeNext();
}

这样就清楚了,里面又调用 RepeatWhenObserversubscribeNext() 方法,上面已经分析过了,该方法会让 RepeatWhenObserver 自己订阅我们的原始上游 source ( upstream ) 。

所以我们只要让 other 即我们返回的流发射一次 onNext() 事件即可完成重新订阅达到重试,那怎么触发 other 发射事件呢?答案就是上面的 signaller 。上游出错时,会导致 signaller 将错误信息当成正常时间发射出去,所以我们只要让 signaller 触发 other 即可,可以理解为只要将 othersignaller 相连接即可:

upStream.retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
    @Override
    public ObservableSource<?> apply(Observable<Throwable> attempts) throws Exception {
        System.out.println("retryWhen: apply()");
        return attempts.flatMap(throwable -> {
            System.out.println("attempts: flatMap apply() -> " + throwable);
            Thread.sleep(1000);
            return Observable.just("just retry");
        });
    }
}).subscribe(downStream);

attempts.flatMap 中,我们还可以拿到 attempts 中的错误信息来判断是否重试,如果重试只要发射一个正常事件即可,向上面的例子一样,那不想重试了呢?只要发射一个 onError() 错误事件即可。由上面 InnerRepeatObserver 源码可以看到如果发射了一个错误事件,会调用 RepeatWhenObserverinnerError() 方法:

void innerError(Throwable ex) {
    DisposableHelper.dispose(upstream);
    HalfSerializer.onError(downstream, ex, this, error);
}
public static void onError(Observer<?> observer, Throwable ex,
        AtomicInteger wip, AtomicThrowable error) {
    if (error.addThrowable(ex)) {
        if (wip.getAndIncrement() == 0) {
            observer.onError(error.terminate());
        }
    } else {
        RxJavaPlugins.onError(ex);
    }
}

可以看到内部会调用 downstreamonError() 方法,完成转发操作。

至此已经分析完了,下面附上流程图方便理解:

RWO : RepeatWhenObserver

IRO : InnerRepeatObserver