Observer<T>.OnError's assumptions seem to be inconsistent with Stubs.Throw

description

One of the overloads of IObservable.Subscribe is:
public static IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext)
Internally, the unspecified onError parameter is set to Stubs.Throw which is a simple lambda that rethrows the exception passed in.

The code in Observer<T>.OnError is:
public void OnError(Exception error)
{
    foreach (var observer in _observers.Data)
        observer.OnError(error);
}
Since Stubs.Throw throws the exception, the exception propagates up through the foreach loop, and the remaining observers in _observers.Data never have their OnError called. The exception itself is swallowed somewhere inside Rx.
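
The behavior described above can be reproduced without Rx. The following is a minimal sketch that uses only the BCL IObserver<T> interface. DelegateObserver and FanOutDemo are hypothetical names made up for illustration, and the rethrowing lambda is an assumption standing in for Stubs.Throw; none of this is Rx's actual implementation.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for an observer built from delegates; not Rx's own type.
sealed class DelegateObserver<T> : IObserver<T>
{
    private readonly Action<T> _onNext;
    private readonly Action<Exception> _onError;

    public DelegateObserver(Action<T> onNext, Action<Exception> onError)
    {
        _onNext = onNext;
        _onError = onError;
    }

    public void OnNext(T value) { _onNext(value); }
    public void OnError(Exception error) { _onError(error); }
    public void OnCompleted() { }
}

static class FanOutDemo
{
    // Returns the names of the parties that actually observed the error.
    public static List<string> Run()
    {
        var notified = new List<string>();

        var observers = new List<IObserver<int>>
        {
            // No onError supplied: assumed to default to a handler that rethrows.
            new DelegateObserver<int>(_ => { }, ex => { throw ex; }),
            // Supplies its own onError handler, but is registered second.
            new DelegateObserver<int>(_ => { }, ex => notified.Add("second")),
        };

        try
        {
            // Same shape as the loop quoted above: no try-catch around each call.
            foreach (var observer in observers)
                observer.OnError(new InvalidOperationException("source failed"));
        }
        catch (InvalidOperationException)
        {
            // The rethrow from the first observer ends the loop early.
            notified.Add("caller");
        }

        return notified;
    }

    static void Main()
    {
        Console.WriteLine(string.Join(", ", Run()));
    }
}
```

In this sketch Run() returns only "caller": the second observer's handler never runs because the first observer's rethrow unwinds the loop.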

It seems to me that either Observer<T>.OnError should wrap observer.OnError in a try-catch, or Stubs.Throw should swallow the exception instead of throwing it. By not passing an onError parameter, the user of IObservable.Subscribe wishes the error to be ignored only for that Subscribe. The other subscribers registered with their own onError callbacks should be unaffected.

I've worked around this for now by making a new extension method on IObservable which passes ex => { } for the onError parameter to IObservable.Subscribe.
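
A rough sketch of that kind of workaround is shown here. It is written against the BCL interfaces only so that it is self-contained; with Rx referenced, the same effect would come from calling the Subscribe overload that takes an onError delegate. SubscribeIgnoringErrors, IgnoringObserver, FailingSource and WorkaroundDemo are hypothetical names, not part of Rx.

```csharp
using System;

static class IgnoreErrorsExtensions
{
    // Hypothetical extension: subscribes with an onError handler that does nothing.
    public static IDisposable SubscribeIgnoringErrors<T>(this IObservable<T> source, Action<T> onNext)
    {
        return source.Subscribe(new IgnoringObserver<T>(onNext));
    }

    private sealed class IgnoringObserver<T> : IObserver<T>
    {
        private readonly Action<T> _onNext;
        public IgnoringObserver(Action<T> onNext) { _onNext = onNext; }

        public void OnNext(T value) { _onNext(value); }
        public void OnError(Exception error) { /* deliberately ignored */ }
        public void OnCompleted() { }
    }
}

// Hypothetical source that pushes one value and then fails.
sealed class FailingSource : IObservable<int>
{
    public IDisposable Subscribe(IObserver<int> observer)
    {
        observer.OnNext(1);
        observer.OnError(new InvalidOperationException("source failed"));
        return new NoopDisposable();
    }

    private sealed class NoopDisposable : IDisposable
    {
        public void Dispose() { }
    }
}

static class WorkaroundDemo
{
    // Returns the sum of the values seen; the error does not escape Subscribe.
    public static int Run()
    {
        var seen = 0;
        new FailingSource().SubscribeIgnoringErrors(x => seen += x);
        return seen;
    }

    static void Main()
    {
        Console.WriteLine(Run());
    }
}
```

Here Run() returns 1 and no exception reaches the caller, which is exactly the silent-failure trade-off discussed in the comments.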

comments

davedev wrote Jan 21, 2013 at 2:21 AM

I believe that Rx actually implements the correct behavior. See the Rx Design Guidelines, §6.4 Note:

"Note: do not protect calls to Subscribe, Dispose, OnNext, OnError and OnCompleted methods"

In summary, OnError isn't supposed to be called when OnError, OnCompleted or OnNext throws because an exception from these methods indicates an unhandled exception in user code. It's the user's responsibility to handle the exception, or else it goes unhandled and the application may crash.

Thus if you don't supply an onError handler when calling Subscribe, then you're opting to not catch the exception in your own code. It's similar to iterating an enumerable and not putting a try...catch block around the foreach statement.

In other words, the default behavior of Subscribe throws an exception because you've decided to not handle it yourself, and an exception thrown by OnError unwinds the stack (because Rx won't catch it). If you want to handle the exception, then simply pass in an onError handler.

Creating a Subscribe method that passes in a no-op onError handler by default is a global solution, but it's not safe. In general, it can make debugging your queries much harder.

Arnavion wrote Jan 21, 2013 at 4:33 AM

I don't think the analogy with IEnumerable works.

With an IEnumerable, if one foreach loop doesn't handle an exception, another foreach loop will be unaffected (assuming the unhandled exception didn't crash the application).

With IObservable though, one observer not providing its own onError handler means all other subscribers for the same observable don't have their own onError called because of the misbehaving Stubs.Throw handler.

For example, in my case specifically, none of my code throws an unhandled exception. I have a Subject as my IObservable and multiple observers registered with it using Subscribe. When something exceptional happens as part of processing the next unit of the observable, I catch the exception and call Subject.OnError(Exception) with it. It is the default onError handler registered by one of the subscribers that is throwing the exception unhandled and breaks the application.

This is totally different from the case of IEnumerables, where enumerators do not interfere with each other.

davedev wrote Jan 21, 2013 at 2:01 PM

"I don't think the analogy with IEnumerable works."
It does, but perhaps you're missing the larger point. Sorry if I wasn't clear. The analogy was simply meant to justify the default behavior of Subscribe throwing an exception. The analogy is that eliding an onError handler when calling Subscribe is like not putting a try...catch block around a foreach block.

For example:
var enumerator = xs.GetEnumerator();

while (enumerator.MoveNext())  // assume this method succeeds, similar to calling IObserver<T>.OnNext
{
  try
  {
    while (enumerator.MoveNext()) { }  // assume this method throws, similar to calling IObserver<T>.OnError
  }
  catch  // similar to passing an onError argument to the Subscribe method
  {
    throw;   // rethrowing the exception is similar to Subscribe's default onError handler.
             // Note that it's also the default behavior in C# when you don't specify a catch block.
  }
}
Above, the example shows that re-throwing an exception observed from an enumerable is similar to the default behavior of Subscribe; namely, that an uncaught exception unwinds the stack. This is the abort semantics referred to in the Rx Design Guidelines, §6.6.
"With an IEnumerable, if one foreach loop doesn't handle an exception, another foreach loop will be unaffected [snip]"
Well, to be clear, two foreach loops implies two separate enumerables, yet your issue was about a single observable with multiple observers, similar to the example above. Though I think your point is that the outer loop in the example is able to catch the exception when it's rethrown; however, if the example were written reactively, then the second observer wouldn't have a chance to handle the exception due to the behavior of Rx's implementation of IObserver<T>.

The major differences between enumerables and observables are the style in which the code is written and their relationship to time. The example above is imperative and synchronous, yet reactive code is written with continuations and is asynchronous.

So if Rx were to automatically catch the exception and pass it to other observers, that would reverse the default abort semantics on which we typically depend to debug our applications. In the above example, it would be as if C# inserted an implicit catch block around each while loop to suppress the exception. Clearly we don't need that in imperative code, and we also wouldn't want it. Why should observers be any different? An unhandled exception is an unhandled exception.

If you actually wanted that functionality, you could simply pass in an onError handler that doesn't rethrow the exception. Similarly, my example above could catch but not rethrow the exception; however, notice that in the imperative style of coding, suppressing an exception would allow the observer to continue pulling from the source, which of course may result in additional exceptions. In the observable world, which pushes notifications, an observer can't control whether OnNext will be called again simply by passing in an onError handler; however, an observer may control whether other observers will see the exception by catching it via an onError handler.

Another argument for the existing behavior is safety. Consider the note in the Rx Design Guidelines §6.4:
"do not protect calls to Subscribe, Dispose, OnNext, OnError and OnCompleted methods. These calls are on the edge of the monad. Calling the OnError method from these places will lead to unexpected behavior."
My interpretation of the above warning is that an exception thrown by a call to OnError implies that the program's state may be corrupted (a safe assumption in general when an exception is thrown). An OnError handler could be user code or it could be additional operators. Rx simply cannot know whether or not it's safe to continue pushing the error to other observers when one observer throws. What if subsequent observers depend upon corrupted state? That could cause additional corruption or loss of user data. The only safe thing to do in library code is to not catch the exception at all.

Furthermore, catching an exception in library code makes debugging programs much harder. It may seem like debugging would be easier if every observer is guaranteed to see the exception, but actually a fail fast approach to error handling is better because it:
  1. allows an attached debugger to see that an exception was unhandled and immediately break into user code at the correct place.
  2. retains the complete original stack trace. This is especially useful for logging when a debugger is not attached.
  3. minimizes side-effects that can occur while the program unwinds the stack. This can make it much easier to track down the cause and effects of an unhandled exception, and to reverse side-effects if necessary.

Arnavion wrote Jan 21, 2013 at 4:14 PM

"Well, to be clear, two foreach loops implies two separate enumerables"
No, it implies a single IEnumerable with two enumerators. Consider an IEnumerator implementation that maintains a single index i and each call to MoveNext increments i and accesses underlyingEnumerable[i].
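class ArrayEnumerator : IEnumerator<string>  // requires System.Collections.Generic
{
    private readonly string[] underlyingEnumerable;
    private int i;

    public ArrayEnumerator(string[] source) { underlyingEnumerable = source; }

    public string Current { get; private set; }
    object System.Collections.IEnumerator.Current { get { return Current; } }

    public bool MoveNext()
    {
        Current = underlyingEnumerable[i]; // This throws an IndexOutOfRangeException for i = 6, say.
        i++;
        return true;
    }

    public void Reset() { i = 0; }
    public void Dispose() { }
}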
If accessing underlyingEnumerable[6] in enumerator1 throws an exception, enumerator2 will still be able to see the exception for itself when it accesses its own underlyingEnumerable[6].

When I think of passing an IObserver to an IObservable, I think in terms of the contract of IObserver. The contract states that it has an OnError method which will be called by the IObservable in case of an error.
"The OnError method, which is typically called by the provider to indicate that data is unavailable, inaccessible, or corrupted, or that the provider has experienced some other error condition." (MSDN)
This contract appears to be invalidated not because of code in my observer, nor because of code in the observable, but because of a separate observer which did not pass in its own onError handler and happened to be subscribed to the observable before my own.

That said, I do see your point about how the presence of an exception potentially means the program is hosed, and so Rx should not try to prolong execution in this case. I take it the expected coding style for a situation where an exception should be handled by the observer instead of being considered non-fatal is to not treat it as an exception destined for OnError, but a part of the message sent via OnNext?
class ObservableResult
{
    string Result { get; }
    Exception Exception { get; } // null implies no exception and Result is valid
}
And so I would use an IObservable<ObservableResult> instead of an IObservable<string>?

That does sound like it inverts concerns somewhat, since now if the observable encounters a particular exception while preparing the next result, the observable itself must decide if all its observers will be able to handle it (put it in ObservableResult and call OnNext) or not (call OnError). Is my understanding correct?

If so, I can live with that, since I wish to handle all possible exceptions from a particular observer and it's all right for the rest to get automatically deregistered. If the exception is so serious that it warrants an immediate termination of the application, then there's nothing I could have done anyway.
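
To make that idea concrete, here is a small sketch of carrying recoverable failures through OnNext. The Ok and Failed factory methods, ResultDemo, and the use of int.Parse as the failing step are all assumptions made for illustration; the onNext delegate simply stands in for an observer's OnNext.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical result type following the ObservableResult idea above.
sealed class ObservableResult
{
    public string Result { get; private set; }
    public Exception Exception { get; private set; } // null means Result is valid

    public static ObservableResult Ok(string result)
    {
        return new ObservableResult { Result = result };
    }

    public static ObservableResult Failed(Exception exception)
    {
        return new ObservableResult { Exception = exception };
    }
}

static class ResultDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();

        // Stand-in for observer.OnNext: the observer decides how to treat each result.
        Action<ObservableResult> onNext = r =>
            log.Add(r.Exception == null
                ? "value: " + r.Result
                : "recoverable: " + r.Exception.GetType().Name);

        foreach (var input in new[] { "1", "x", "3" })
        {
            ObservableResult result;
            try
            {
                int.Parse(input); // assumed processing step that may fail
                result = ObservableResult.Ok(input);
            }
            catch (FormatException ex)
            {
                // Treated as non-fatal: wrapped in the message instead of sent to OnError.
                result = ObservableResult.Failed(ex);
            }

            onNext(result);
        }

        return log;
    }

    static void Main()
    {
        foreach (var line in Run())
            Console.WriteLine(line);
    }
}
```

In this sketch the sequence keeps going after the bad input, so Run() yields "value: 1", "recoverable: FormatException", "value: 3". Anything the source considers fatal would still go to OnError.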

Arnavion wrote Jan 21, 2013 at 4:24 PM

"I take it the expected coding style for a situation where an exception should be handled by the observer instead of being considered non-fatal"
Whoops! I meant "considered fatal".

davedev wrote Jan 21, 2013 at 6:41 PM

"No, it implies a single IEnumerable with two enumerators"
Not in general, which is what I meant. But yes, you're absolutely correct, when compared to a single observable with multiple observers, which is the point of this discussion.

But it's irrelevant anyway because the behavior of my example could be the same as if I had used two enumerators over a single shared enumerable. I had used nested while loops, but perhaps I should've avoided the loops altogether. Two sequential calls to MoveNext would have sufficed. Though I understand that you're thinking more abstractly, in the sense that the two enumerators don't even have to be used in the same method, or even at the same "time". They could be used asynchronously, or even concurrently. I guess that's a more accurate analogy; however, the point of the analogy in the first place was to reason about enumerables in a purely imperative, synchronous world. Thus, an exception thrown by one enumerator should have an immediate effect on the other: the stack should unwind and prevent any further use of either enumerator, unless of course you place a try...catch block and don't rethrow the exception. The fact that you can even do that is a major difference between imperative style vs. continuation passing style.
"If accessing underlyingEnumerable[6] in enumerator1 throws an exception, enumerator2 will still be able to see the exception for itself when it accesses its own underlyingEnumerable[6]."
Perhaps, but that's just an assumption. Where is that contract defined for a shared enumerable? IEnumerable<T> doesn't mention it anywhere.

An enumerable isn't necessarily required to cache and replay an exception. For example, it could simply return false on subsequent calls to MoveNext instead of throwing again.

However, I agree that there doesn't seem to be any reason not to simply throw it again. But only because enumerable is a pull model. In a push model, where an observable invokes a method on an observer, and that observer throws an exception, then the observable doing the "pushing" shouldn't catch it and try to invoke other observers, for the reasons given previously. It's a safe design for a pull model, but not for a push model.
"I think in terms of the contract of IObserver [snip]"
"This contract appears to be invalidated [snip]"
Yes, but consider why it's being invalidated: an exceptional circumstance.

We've established that the default behavior of Subscribe is correct, to throw when an onError handler isn't specified. It correlates to the expected behavior in enumerables; namely, that C# doesn't provide implicit catch statements. So if Subscribe must throw by default, meaning that an observer's OnError method is throwing an exception, which may or may not be the exception that was passed to it, this is clearly an exceptional circumstance. It makes perfect sense that the contract is broken. Rx doesn't know what the exception means, so it shouldn't assume that it's safe to invoke other observers. But I think we're in agreement on this now.
"the expected coding style for a situation where an exception should be handled by the observer instead of being considered fatal is to not treat it as an exception destined for OnError, but a part of the message sent via OnNext?"
Exactly. In Rxx, I'll often model this with: Either<T, Exception>. For example, see the Retry operator overloads.
"[snip] the observable itself must decide if all its observers will be able to handle it (put it in ObservableResult and call OnNext) or not (call OnError). Is my understanding correct?"
Sort of. :)

Ultimately, the decision that you're making when defining an observable is whether or not you consider an exception fatal in terms of query abort semantics (§6.6). That's it. If you choose to make an exception fatal, by passing it to OnError, then how far it reaches is entirely up to the observers in the query; e.g., the Catch operator suppresses exceptions and continues the observable. The observers at the end of the query are typically user code, so it makes sense that the user has full control over whether they want to catch a fatal exception (by supplying an onError handler), and allowing other observers to see the exception (by not rethrowing it). It's not the responsibility of observables.

Arnavion wrote Jan 21, 2013 at 10:30 PM

Thanks, davedev. I get it now.