RxJava2 наблюдаемый прием бросает UndeliverableException

Как я понимаю, RxJava2 values.take(1) создает другое Observable, которое содержит только один элемент из исходного Observable. Который НЕ ДОЛЖЕН бросать исключение, поскольку он отфильтровывается эффектом take(1) поскольку это произошло вторым.

как в следующем fragmentе кода

  Observable values = Observable.create(o -> { o.onNext(1); o.onError(new Exception("Oops")); }); values.take(1) .subscribe( System.out::println, e -> System.out.println("Error: " + e.getMessage()), () -> System.out.println("Completed") ); 

Вывод

 1 Completed io.reactivex.exceptions.UndeliverableException: java.lang.Exception: Oops at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:366) at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onError(ObservableCreate.java:83) at ch02.lambda$main$0(ch02.java:28) at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40) at io.reactivex.Observable.subscribe(Observable.java:10841) at io.reactivex.internal.operators.observable.ObservableTake.subscribeActual(ObservableTake.java:30) at io.reactivex.Observable.subscribe(Observable.java:10841) at io.reactivex.Observable.subscribe(Observable.java:10827) at io.reactivex.Observable.subscribe(Observable.java:10787) at ch02.main(ch02.java:32) Caused by: java.lang.Exception: Oops ... 8 more Exception in thread "main" io.reactivex.exceptions.UndeliverableException: java.lang.Exception: Oops at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:366) at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onError(ObservableCreate.java:83) at ch02.lambda$main$0(ch02.java:28) at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40) at io.reactivex.Observable.subscribe(Observable.java:10841) at io.reactivex.internal.operators.observable.ObservableTake.subscribeActual(ObservableTake.java:30) at io.reactivex.Observable.subscribe(Observable.java:10841) at io.reactivex.Observable.subscribe(Observable.java:10827) at io.reactivex.Observable.subscribe(Observable.java:10787) at ch02.main(ch02.java:32) Caused by: java.lang.Exception: Oops ... 8 more 

Мои вопросы :

  1. Правильно ли я понимаю это?
  2. Что действительно происходит, чтобы вызвать исключение.
  3. Как это решить от потребителя?

    1. Да, но поскольку наблюдаемые «концы» не означают, что код, запущенный внутри create(...) , остановлен. Чтобы быть в полной безопасности, в этом случае вам нужно использовать o.isDisposed() чтобы узнать, завершился ли наблюдаемый downstream.
    2. Исключение есть потому, что RxJava 2 имеет политику NEVER, позволяющую onError вызов onError . Он либо доставляется вниз, либо UndeliverableException как глобальное UndeliverableException если наблюдаемое уже завершено. Разработчик Observable должен «правильно» обрабатывать случай, когда наблюдаемое завершено, и возникает Исключение.
    3. Проблема заключается в том, что производитель ( Observable ) и потребитель ( Subscriber ) не согласны с тем, когда stream заканчивается. Так как продюсер переживает потребитель в этом случае, проблема может быть решена только у производителя.

    @Kiskae в предыдущем комментарии правильно ответил о причине возникновения такого исключения.

    Здесь ссылка на официальный документ по этой теме: RxJava2-wiki .

    Иногда вы не можете изменить это поведение, так что есть способ справиться с этим UndeliverableException . Вот fragment кода о том, как избежать сбоев и неправильного поведения:

     RxJavaPlugins.setErrorHandler(e -> { if (e instanceof UndeliverableException) { e = e.getCause(); } if ((e instanceof IOException) || (e instanceof SocketException)) { // fine, irrelevant network problem or API that throws on cancellation return; } if (e instanceof InterruptedException) { // fine, some blocking code was interrupted by a dispose call return; } if ((e instanceof NullPointerException) || (e instanceof IllegalArgumentException)) { // that's likely a bug in the application Thread.currentThread().getUncaughtExceptionHandler() .handleException(Thread.currentThread(), e); return; } if (e instanceof IllegalStateException) { // that's a bug in RxJava or in a custom operator Thread.currentThread().getUncaughtExceptionHandler() .handleException(Thread.currentThread(), e); return; } Log.warning("Undeliverable exception received, not sure what to do", e); }); 

    Этот код взят из ссылки выше.

    Важная заметка. Этот подход устанавливает глобальный обработчик ошибок в RxJava, поэтому, если вы можете избавиться от этих исключений – это будет лучший вариант.