Bug: Task-Related Operator Overloads Introduce Concurrency

description

Hi,

I just ran into a problem related to the unexpected introduction of concurrency when using Task-based asynchrony in conjunction with Rx in a Windows 8 app. I've had this problem before and thought that I was doing something wrong or that it's the correct behavior of some kind of new multi-threaded UI model in WinRT, yet now I realize that it seems to be a bug in Rx.

If you search the entire Rx codebase for ".ContinueWith(" you'll see that none of the usages specifies the ExecuteSynchronously flag, which allows the TPL to schedule the continuation on the thread pool, and it always does.

According to the documentation:
Creates a continuation that executes asynchronously when the target Task<TResult> completes.
But we don't want asynchronous execution, we want synchronous execution. I'd imagine this is true for all uses of ContinueWith throughout Rx, though I haven't looked at the finer details yet.

Executing ContinueWith asynchronously is a problem in general because the concurrency being introduced isn't related to the specified IScheduler, if one is supplied by the user, and it's entirely unexpected when using particular operators that should never introduce concurrency by default, as shown below. As a result, the threading context of notifications is incorrect and we have little control over it.
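
The difference between the two continuation modes can be observed with the TPL alone, without Rx. The following is only a minimal sketch: the class and method names are made up for illustration, and TaskScheduler.Default is passed explicitly as an assumption so that the result doesn't depend on any ambient task scheduler. It reports whether a continuation ran on the thread that completed the task.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo type; not part of Rx or the TPL.
public static class ContinuationDemo
{
    // Returns true when the continuation ran on the same thread that completed the task.
    public static bool RanOnCompletingThread(TaskContinuationOptions options)
    {
        var source = new TaskCompletionSource<int>();
        var done = new ManualResetEventSlim(false);
        int continuationThread = -1;

        source.Task.ContinueWith(
            t =>
            {
                // Record where the continuation actually executes.
                continuationThread = Environment.CurrentManagedThreadId;
                done.Set();
            },
            CancellationToken.None,
            options,
            TaskScheduler.Default); // assumption: default (thread pool) scheduler

        int completingThread = Environment.CurrentManagedThreadId;
        source.SetResult(42); // completes the task on the current thread
        done.Wait();          // block on an event rather than on the continuation task itself
        return continuationThread == completingThread;
    }

    public static void Main()
    {
        Console.WriteLine(RanOnCompletingThread(TaskContinuationOptions.None));
        Console.WriteLine(RanOnCompletingThread(TaskContinuationOptions.ExecuteSynchronously));
    }
}
```

With TaskContinuationOptions.None the continuation is queued to the thread pool, so the first call should report a different thread. With ExecuteSynchronously the continuation normally runs inline on the completing thread.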

There are two situations in which I've run into this problem in my code:
  1. The ToObservable extension for Task<T> always introduces concurrency. For example, the following code illustrates how the UI threading context is automatically captured by await, since I'm not specifying ConfigureAwait(false), yet it's lost with ToObservable. My expectation has always been that the observable returned by ToObservable simply generates a notification within the completion context of the converted Task, without introducing concurrency unnecessarily. (NOTE: I'm not actually sure whether forcing Rx's usage of ContinueWith to execute synchronously would solve this problem, though I expect that it will.)
private async void _Loaded(object sender, RoutedEventArgs e)
{
    Contract.Assert(Dispatcher.HasThreadAccess);  // true
    await DoSomethingAsync();
    Contract.Assert(Dispatcher.HasThreadAccess);  // true
    DoSomethingAsync().ToObservable().Subscribe(_ =>
    {
        Contract.Assert(Dispatcher.HasThreadAccess);  // false!
    });
}

private async Task DoSomethingAsync()
{
    await Task.Delay(TimeSpan.FromSeconds(1));

    Contract.Assert(Dispatcher.HasThreadAccess);  // true
}
  2. The overloads of SelectMany that compose an observable with a Task<T> incorrectly introduce concurrency. This is even worse than the previous issue. If we don't realize that Rx is the cause, we'll typically try to resolve the problem by composing another SelectMany query that awaits some kind of dispatcher yield operation. But if that query uses the same Task-related SelectMany overload, the problem occurs again; and if it uses an observable instead, that observable was likely converted from a Task via ToObservable, which as I've shown above is flawed in the same way, so the problem remains. This is what led me to believe a while ago that it was an issue related to Windows 8 rather than Rx. No matter what I did, the query was always running outside of the UI thread. Even ObserveOnDispatcher doesn't solve the problem (at least not in my particular situation).
Can anyone else confirm that this behavior is indeed a bug?

I'd be happy to fix it and have it ready in a day or two, before the next planned release hopefully, though it's also possibly a breaking change (for the better, I guess) so I'd like to get a green light from the Rx team before doing the work.

Thanks,
Dave

comments

malayeri wrote Oct 16, 2013 at 6:39 PM

I asked Bart de Smet for some more context, since he designed some of these operators, and here's his reply:
It’s a matter of defaults once more, but some that you can’t always control right now as a user. Given that the completion of a Task that’s being converted to an observable can trigger the whole downstream processing pipeline (which can take a long time to run), ExecuteSynchronously was deemed inappropriate (cf. MSDN “Only very short-running continuations should be executed synchronously.”). I’d only introduce synchronous execution if we have a scheduler around and use that one to schedule the continuation on:
task.ContinueWith(t => … scheduler.Schedule(…) …, ExecuteSynchronously)
It’s fine to invoke, synchronously, the continuation that makes a call to Schedule which effectively is asynchronous (unless the immediate scheduler would be used). IIRC, the ToObservable conversion for Task<T> doesn’t take a scheduler, but it could, effectively doing what’s shown above. It’d be great for the overload without an IScheduler to retain the original behavior though.
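
One possible reading of that pattern, sketched under assumptions: the extension method name ToObservableOn is hypothetical (it is not an Rx API), the code assumes the System.Reactive package is referenced, and the error and cancellation handling is a guess at reasonable behavior rather than what Rx itself does.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading.Tasks;

// Hypothetical helper type; the names here are illustrative only.
public static class TaskSchedulerSketch
{
    public static IObservable<T> ToObservableOn<T>(this Task<T> task, IScheduler scheduler)
    {
        return Observable.Create<T>(observer =>
        {
            // Disposing the subscription cancels the pending continuation.
            var cancel = new CancellationDisposable();

            task.ContinueWith(
                t =>
                {
                    // The continuation itself is short: it only hands off to the scheduler,
                    // which decides where the notifications are delivered.
                    scheduler.Schedule(() =>
                    {
                        if (t.IsFaulted)
                            observer.OnError(t.Exception.InnerException);
                        else if (t.IsCanceled)
                            observer.OnError(new TaskCanceledException(t));
                        else
                        {
                            observer.OnNext(t.Result);
                            observer.OnCompleted();
                        }
                    });
                },
                cancel.Token,
                TaskContinuationOptions.ExecuteSynchronously,
                TaskScheduler.Default);

            return cancel;
        });
    }

    public static void Main()
    {
        Task.FromResult(42)
            .ToObservableOn(Scheduler.Immediate)
            .Subscribe(x => Console.WriteLine(x));
    }
}
```

With a helper of this shape, the SelectMany case would be written by converting explicitly, for example xs.SelectMany(x => ftask(x).ToObservableOn(someScheduler)), where someScheduler is whichever IScheduler the caller wants notifications on.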

That leaves us with SelectMany (etc.) whose Task-returning methods are basically macros for using ToObservable. We wouldn’t want a bunch of overloads of those that add an IScheduler parameter for this reason alone, so I’d rather keep those untouched, stating they have the default ToObservable behavior with no parameter, and if one wants to control what’s going on, use the IScheduler-based overload by hand:
xs.SelectMany(x => ftask(x))   ==   xs.SelectMany(x => ftask(x).ToObservable(/* default behavior */))
If you want control, you have to convert yourself:
xs.SelectMany(x => ftask(x).ToObservable(/* specify some scheduler here */))
I’m curious though why ObserveOnDispatcher didn’t help in Dave’s case.
Does that help explain things?

davedev wrote Oct 17, 2013 at 12:15 AM

Yes, it does. Please thank Bart for me.

However, I'm not entirely convinced yet that it's the correct default behavior.

I now agree that Rx must respect ContinueWith's contract by only executing synchronously if the continuation is fast, per the cited MSDN documentation.

I also like the proposal for a new overload of ToObservable accepting a scheduler.

But I still don't like the default behavior for the parameterless ToObservable or SelectMany. Forcing a continuation onto the thread pool when it's just going to turn around and marshal back to the UI thread anyway seems wasteful, and for unsuspecting developers who fail to marshal back it could introduce race conditions or other threading bugs unexpectedly. (I'm proof that it happens :)

It seems that the primary use case nowadays for Task is to be used with await, in which we expect the synchronization context to be preserved unless it's explicitly avoided via ConfigureAwait, so I feel that converting a Task into an IObservable should respect the principle of least surprise and not force unnecessary concurrency.

Therefore, how about a compromise? The parameterless ToObservable operator could attempt to get a scheduler for the current synchronization context and pass it to Bart's proposed overload; otherwise, it would fall back to the existing behavior in Rx 2.0 (whatever ContinueWith decides). Furthermore, the proposed overload is perhaps sufficient for applying ConfigureAwait(false)-like behavior for conversions by simply passing in Scheduler.TaskPool.
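
A rough sketch of what this compromise might look like, under several assumptions: the method name ToObservableOnCurrentContext is hypothetical, the code assumes a System.Reactive release that ships a ToObservable overload accepting an IScheduler (the overload being proposed in this thread), and the context is captured when the conversion is called rather than at subscription time, which is only one of the possible designs.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper type; not part of Rx.
public static class ContextAwareConversion
{
    public static IObservable<T> ToObservableOnCurrentContext<T>(this Task<T> task)
    {
        // Capture the ambient context, much as await does by default.
        SynchronizationContext context = SynchronizationContext.Current;

        if (context == null)
        {
            // No context to respect: keep the existing default behavior.
            return task.ToObservable();
        }

        // Assumption: a scheduler-accepting ToObservable overload is available.
        return task.ToObservable(new SynchronizationContextScheduler(context));
    }
}
```

Called from a UI event handler, such a helper would deliver notifications through the UI's synchronization context; called where no context is set, it would behave like the existing parameterless conversion.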

At the very least, even if one could argue that this isn't a sensible default behavior for ToObservable, then would it at least be sensible for SelectMany?

For example, and to answer Bart's question about why ObserveOnDispatcher (alone) didn't help in my particular scenario, imagine a query that uses multiple consecutive SelectMany queries. I see this as a declarative program using observables analogous to an imperative program that uses await:
var q = from x in t1.ToObservable()
        from y in t2
        from z in t3
        select z;
==
var x = await t1;
var y = await t2;
var z = await t3;
All t* are Task<T>.

The problem now is that these two programs seem to be semantically the same, yet depending upon whether a synchronization context currently exists they'll behave differently. I find it really strange that Rx is actually the thing introducing concurrency, in this case. It seems like TPL was classically all about introducing concurrency to solve problems, while Rx has always been about taming concurrency. Async/await is now also about taming concurrency, yet interoperating with Rx introduces concurrency.

It's true that q could be "fixed" by applying ObserveOnDispatcher before subscribing; however, in reality it's not always so simple, or intuitive.

For example:
var q = from x in t1.ToObservable()
        from y in DoSomethingAsync()
        from z in DoSomethingElseAsync()
        select z;
The *Async methods return Task<T>.

In reality, it's often necessary to execute async methods on the UI thread, just like the default behavior of the await keyword when an async method begins executing on the UI thread.

To solve this problem in the above example, we have to explicitly force all observables onto the UI thread, even when we know that all of our async methods will complete on the UI thread. That's counterintuitive, IMHO.
var q = from x in t1.ToObservable().ObserveOnDispatcher()
        from y in DoSomethingAsync().ToObservable().ObserveOnDispatcher()
        from z in DoSomethingElseAsync().ToObservable().ObserveOnDispatcher()
        select z;
-OR, with the proposed overload-
var q = from x in t1.ToObservable().ObserveOnDispatcher()
        from y in DoSomethingAsync().ToObservable(DispatcherScheduler.Current)
        from z in DoSomethingElseAsync().ToObservable(DispatcherScheduler.Current)
        select z;
It's especially counterintuitive when we consider what's really happening with this "fix". The current Rx marshaling behavior goes from UI->ThreadPool->ThreadPool->ThreadPool->ThreadPool->ThreadPool because each use of ToObservable apparently marshals to a new pooled thread, even if the current continuation is already executing on a pooled thread. However, we want to begin each continuation on the UI instead, so we're avoiding the default behavior by introducing a no-op scheduling operation: UI->UI->UI->UI->UI->UI. In fact, what we actually want (and what I had expected) is synchronous behavior for continuations: UI->UI->UI.

Given async/await, it no longer seems like a sensible default behavior to require developers to add extra code on each line when converting from Task-based asynchrony to observables, which I suspect is the common case nowadays. At least, it certainly doesn't seem like it guides developers into a pit of success.

That example also illustrates the reason that ObserveOnDispatcher didn't work for me. I wasn't aware that Rx was introducing concurrency, so I was surprised to discover that adding ObserveOnDispatcher in one place, rather than in every place, wasn't enough to fix the problem. Of course, ObserveOnDispatcher is useless if the Dispatcher isn't available in the context of a subsequent from clause, and I should've realized that; although, ObserveOn(control.Dispatcher) still works. But it's all about expectations: I knew that my *Async methods were completing on the UI thread, but I was getting an error in Subscribe, so I was confused that applying ObserveOnDispatcher before Subscribe didn't work. After I applied it, I started getting threading errors in the other Async methods too (perhaps it was race conditions), until I realized that Rx was introducing concurrency in SelectMany. This was counterintuitive for me.

I guess it boils down to these questions:
  1. How do most developers use the conversion from Task to Observable, including overloads of SelectMany et al.?
  2. Do they expect await semantics or do they expect concurrency to be introduced automatically regardless of the thread on which the Task completes?
Thanks for considering the matter,
Dave