Subscription IDisposable from Observable.Generate can cause StackOverflowException?

Jan 31, 2013 at 3:49 PM
Consider this toy program:
public static void Main(string[] args)
{
    var generate = Observable.Generate(
        Unit.Default,
        u => true,
        u => Unit.Default,
        u => Unit.Default,
        u => TimeSpan.FromTicks(1000));
    var generateSub = generate.Subscribe(x => { });
    Console.ReadLine();
    generateSub.Dispose();
}
If I run this program for even a second, I get a StackOverflowException in mscorlib.dll. This happens during the last line, the Dispose() of the subscription. Obviously this is a short interval but even with a longer interval (say hundreds of milliseconds), the same will happen if the program runs for long enough. I wrote the program above to track down an issue we were seeing in a much larger app, on shutdown, after it had run for at least an hour or so.

I've looked into the code for Generate() a bit since then and I assume this is due to some recursion that happens under the hood. I've since refactored our app to not use Generate(), but my question is, is this dangerous behaviour by design? It seems that at the very least, Observable.Generate() should come with a hefty warning label that it should not be used for long-running sequences (or with short intervals). It took us quite some time and effort to track this issue down to Rx being the problem.

I love the power of Rx, and I'm a strong evangelist for it in my team, but that's hard to do when we find these subtle, undocumented caveats...
Jan 31, 2013 at 6:19 PM
Edited Jan 31, 2013 at 6:21 PM
Hi,

I was able to repro on .NET 4.5.

Looking at the code, it appears that Generate schedules each iteration by invoking Schedule recursively. The scheduler builds up a chain of disposables that gets executed when the subscription is disposed. So it seems that the number of calls is directly proportional to the number of iterations.
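A minimal sketch of that effect, assuming a hypothetical ChainedDisposable class in place of Rx's own disposables (none of the names below come from Rx). Each link disposes the link created by the following iteration, so the nesting depth of Dispose calls equals the number of iterations:

```csharp
using System;

// Hypothetical stand-in for the disposable created by one scheduled iteration.
// Disposing it disposes the disposable of the following iteration.
sealed class ChainedDisposable : IDisposable
{
    private readonly IDisposable _next;

    public static int Depth;    // current nesting depth of Dispose calls
    public static int MaxDepth; // deepest nesting observed so far

    public ChainedDisposable(IDisposable next)
    {
        _next = next;
    }

    public void Dispose()
    {
        Depth++;
        if (Depth > MaxDepth)
            MaxDepth = Depth;

        // One more stack frame per link in the chain.
        if (_next != null)
            _next.Dispose();

        Depth--;
    }
}

static class ChainDemo
{
    // Builds a chain with one link per iteration and returns its head.
    public static IDisposable Build(int iterations)
    {
        IDisposable head = null;
        for (var i = 0; i < iterations; i++)
            head = new ChainedDisposable(head);
        return head;
    }
}
```

Calling ChainDemo.Build(n).Dispose() leaves ChainedDisposable.MaxDepth equal to n, so a large enough n would be expected to exhaust the stack in the same way.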

Perhaps this could be solved in one of two ways:
  • Change Generate so that it uses a loop to schedule recursively; however, I assume the Rx team didn't do this already because it would mean avoiding the concurrency abstraction layer, and thus Rx's timer optimizations would have to be duplicated.
  • Change the types in the System.Reactive.Disposables namespace so that they can reach into the internals of each other and dispose recursively in a loop. I'd imagine that an ICompositeDisposable interface could be useful; i.e., all of the disposable classes would implement this interface, and in their Dispose implementations they would check to see whether their composited disposables implement ICompositeDisposable, and if so, reach in and dispose them in a loop, recursively.
Assuming that my analysis is correct, I wonder how the Rx team feels about the second option. It seems like a nice general optimization to me.
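A rough sketch of the second option, under stated assumptions: the interface name ICompositeDisposable is taken from the proposal above, but its DetachChildren member, the Node class and the IterativeDispose helper are all hypothetical and are not part of Rx. The point is only that an explicit work list replaces the call stack:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical shape for the proposed interface; DetachChildren is an assumed member.
interface ICompositeDisposable : IDisposable
{
    // Hands over the inner disposables and forgets them.
    IDisposable[] DetachChildren();
}

// Hypothetical disposable that wraps at most one inner disposable.
sealed class Node : ICompositeDisposable
{
    private IDisposable _inner;

    public Node(IDisposable inner)
    {
        _inner = inner;
    }

    public IDisposable[] DetachChildren()
    {
        var inner = _inner;
        _inner = null;
        return inner == null ? new IDisposable[0] : new[] { inner };
    }

    public void Dispose()
    {
        IterativeDispose.Run(this);
    }
}

static class IterativeDispose
{
    // Disposes a whole chain using a heap-allocated stack instead of recursion.
    // Returns the number of disposables visited.
    public static int Run(IDisposable root)
    {
        var visited = 0;
        var pending = new Stack<IDisposable>();
        pending.Push(root);

        while (pending.Count > 0)
        {
            var current = pending.Pop();
            visited++;

            var composite = current as ICompositeDisposable;
            if (composite != null)
            {
                // Reach in and queue the children rather than recursing.
                foreach (var child in composite.DetachChildren())
                    pending.Push(child);
            }
            else
            {
                current.Dispose(); // a leaf: dispose it directly
            }
        }

        return visited;
    }
}
```

The stack depth stays constant however long the chain is, but the loop still visits every link once, so the work done by Dispose remains proportional to the number of iterations.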

-- Dave
Jan 31, 2013 at 10:45 PM
Hi,

FYI, I've just implemented the second option (ICompositeDisposable) in Rx and it seems to have solved the problem. Note that only CompositeDisposable, SingleAssignmentDisposable, MultipleAssignmentDisposable and SerialDisposable were affected.

Unfortunately, it seems that eliminating the stack overflow has revealed that the time it takes to execute Dispose is directly proportional to how long the observable was running, and it's not necessarily cheap. Perhaps ICompositeDisposable is a good general optimization to avoid stack overflows in Rx, but this behavior may be an indication that Generate could use some additional performance tweaking of its own.

-- Dave
Jan 31, 2013 at 10:59 PM
Is there some reason we need disposable chaining at all? Why not use a cancellation token approach and pass the same "cookie" object from one iteration to the next? Then Dispose can just change the state of the cookie and tell the head iteration to avoid the next reschedule.
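One way the cookie idea could look, as a sketch only. It deliberately uses System.Threading.Timer instead of an Rx scheduler, and LoopCookie and CookieLoop are hypothetical names, so this shows the shape of the approach rather than a change to Generate:

```csharp
using System;
using System.Threading;

// Hypothetical "cookie": one object shared by every iteration of the loop.
sealed class LoopCookie : IDisposable
{
    private int _cancelled; // 0 = running, 1 = cancelled

    public bool IsCancelled
    {
        get { return Volatile.Read(ref _cancelled) == 1; }
    }

    // Constant-time, no matter how many iterations have already run.
    public void Dispose()
    {
        Interlocked.Exchange(ref _cancelled, 1);
    }
}

static class CookieLoop
{
    // Invokes onNext repeatedly, waiting 'interval' before each call,
    // until the returned cookie is disposed.
    public static IDisposable Start(TimeSpan interval, Action onNext)
    {
        var cookie = new LoopCookie();

        // Created unarmed; this constructor passes the timer itself as the state object.
        var timer = new Timer(state =>
        {
            var self = (Timer)state;
            if (cookie.IsCancelled)
            {
                self.Dispose(); // stop here instead of rescheduling
                return;
            }

            onNext();
            self.Change(interval, Timeout.InfiniteTimeSpan); // re-arm for one more shot
        });

        timer.Change(interval, Timeout.InfiniteTimeSpan); // arm the first shot
        return cookie;
    }
}
```

Only one cookie and one timer exist for the lifetime of the loop, so nothing accumulates per iteration. A callback that is already running when Dispose is called may still deliver one more onNext.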
Feb 1, 2013 at 12:39 AM
Hi,

Generate simply calls Schedule recursively and the concurrency abstraction layer (CAL) handles the timer. To support a tokenized approach, without duplicating scheduler/CAL internals in Generate, I think the IScheduler interface would have to be updated. Not sure that's the best approach.

Alternatively, Generate could be rewritten to perform its own recursion via a loop, which was the first option I proposed above. That way a single disposable could be used. But as noted, it may break encapsulation of the scheduler/CAL. This also may not be the best approach.

The disposable chaining works fine with the ICompositeDisposable optimization in place. Though now I've noticed a direct correlation between running time and disposal time, so the next question is: does it matter?

In my quick test, running the OP's code in DEBUG mode with Visual Studio attached and pressing Enter after ~10 seconds caused the call to generateSub.Dispose(); to take ~3 seconds. Running it for ~4 seconds caused Dispose to take ~1 second. I didn't test a release build without the debugger attached, but if it's similar results then it does seem like it matters. Generate could be running for much longer than 10 seconds in a production environment. And consider the effect on memory usage.

But perhaps Generate does exactly what it was intended to do. There are alternatives. As noted, the OP has already worked around the issue.
-- Dave
Feb 1, 2013 at 5:12 AM
While it is not directly related to the OP (and should probably be a new thread, if there is any interest) I would love to discuss the code below.

It comes from recursive scheduler and I am interested both in its current C# and possible C++ implementation.

By discussing I mean opinions, explanations and insights regarding the implementation, performance, heap and stack consumption, where, how and if to use C++ shared pointer for disposables, composite disposable in particular:
        static IDisposable InvokeRec1<TState>(IScheduler scheduler, Pair<TState, Action<TState, Action<TState>>> pair)
        {
            var group = new CompositeDisposable(1);
            var gate = new object();
            var state = pair.First;
            var action = pair.Second;

            Action<TState> recursiveAction = null;
            recursiveAction = state1 => action(state1, state2 =>
            {
                var isAdded = false;
                var isDone = false;
                var d = default(IDisposable);
                d = scheduler.Schedule(state2, (scheduler1, state3) =>
                {
                    lock (gate)
                    {
                        if (isAdded)
                            group.Remove(d);
                        else
                            isDone = true;
                    }
                    recursiveAction(state3);
                    return Disposable.Empty;
                });

                lock (gate)
                {
                    if (!isDone)
                    {
                        group.Add(d);
                        isAdded = true;
                    }
                }
            });

            recursiveAction(state);

            return group;
        }
Feb 1, 2013 at 5:33 PM
Dave,

Is your code accessible somewhere, maybe on a branch in the codeplex repo?
Feb 1, 2013 at 5:53 PM
No, I simply "cloned" the latest source code to my local computer and made the changes. I'm not sure how I could make it public. I guess I'd have to create another fork first, but I'm not sure what to do next. Do you know the proper GIT commands that I need to use?
-- Dave
Feb 1, 2013 at 7:56 PM
davedev wrote:
But perhaps Generate does exactly what it was intended to do. There are alternatives. As noted, the OP has already worked around the issue.
True, but the workaround isn't as nice as it was before. Also, there should be better documentation of this behaviour, and a clear warning. Others could easily fall into the trap of using long-running Generate()'s. This is my main point.

I also agree with your point that the Dispose() running time should not depend on how long the Generate() sequence ran. That's not intuitive.
Feb 2, 2013 at 1:05 AM
Edited Feb 2, 2013 at 3:08 AM
In my experiments with this toy example, with a breakpoint inside the OnNext() lambda, this looks to me like some sort of race condition.

For TimeSpan of 1000 ticks I can clearly see the stack constantly growing, invoking the recursive procedure:
System.Reactive.Core.dll!System.Reactive.AnonymousSafeObserver<System.Reactive.Unit>.OnNext(System.Reactive.Unit value) + 0x57 bytes    
    System.Reactive.Linq.dll!System.Reactive.Linq.Observαble.Generate<System.Reactive.Unit,System.Reactive.Unit>.δ.InvokeRec(System.Reactive.Concurrency.IScheduler self, System.Reactive.Unit state) + 0x72 bytes  
and finally crashing with stack overflow.

For 10,000 ticks, or when using Generate without a time selector parameter, the stack stays at a constant size, even after millions of invocations.

To the best of my understanding, calling Schedule with a recursive Action ends up running the algorithm whose simplest form I quoted in my previous post.
The Schedule method that takes a TimeSpan runs literally the code below (or some variation of it):
        static IDisposable InvokeRec2<TState>(IScheduler scheduler, Pair<TState, Action<TState, Action<TState, TimeSpan>>> pair)
        {
            var group = new CompositeDisposable(1);
            var gate = new object();
            var state = pair.First;
            var action = pair.Second;

            Action<TState> recursiveAction = null;
            recursiveAction = state1 => action(state1, (state2, dueTime1) =>
            {
                var isAdded = false;
                var isDone = false;
                var d = default(IDisposable);
                d = scheduler.Schedule(state2, dueTime1, (scheduler1, state3) =>
                {
                    lock (gate)
                    {
                        if (isAdded)
                            group.Remove(d);
                        else
                            isDone = true;
                    }
                    recursiveAction(state3);
                    return Disposable.Empty;
                });
                
                lock (gate)
                {
                    if (!isDone)
                    {
                        group.Add(d);
                        isAdded = true;
                    }
                }
            });

            recursiveAction(state);

            return group;
        }

Feb 2, 2013 at 4:35 AM
Edited Feb 2, 2013 at 4:46 AM
Hi,

I can repro your results, though I disagree with your conclusion. Changing the number of ticks to 10,000 doesn't cause a stack overflow. Edit: however, it also produces nowhere near as many iterations as 1000 ticks does, because it introduces a longer delay; i.e., 10,000 ticks = 1 ms between iterations and 1000 ticks = 0.1 ms between iterations, so 1000 ticks yields many more iterations in the same period of time.

For example, change the OP's code to count the number of iterations:
var count = 0;
var generateSub = generate.Subscribe(_ => count++);
Console.ReadLine();
Console.WriteLine(count);
1000 ticks produced 15657 iterations for me after only ~2 seconds in DEBUG mode with the debugger attached. Changing the number of ticks to 10000 and running the same program again produces only ~100 iterations after ~2 seconds. These results are consistent for me. So it's clear that the stack overflow isn't due to a race condition; instead, it's directly related to the number of iterations. And I can reproduce the stack overflow every time with 1000 ticks, so it seems highly unlikely to me that a race condition could produce such consistent results for only one particular value.
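The tick arithmetic can be checked with a short sketch. TickMath is a hypothetical helper; the only facts it relies on are that one tick is 100 ns and that TimeSpan exposes Ticks and TotalMilliseconds:

```csharp
using System;

static class TickMath
{
    // Length of the interval in milliseconds (1 tick = 100 ns, 10,000 ticks = 1 ms).
    public static double Milliseconds(long ticks)
    {
        return TimeSpan.FromTicks(ticks).TotalMilliseconds;
    }

    // Upper bound on the iteration count if every interval were honoured exactly
    // and each iteration took no time of its own.
    public static double MaxIterations(long ticks, TimeSpan elapsed)
    {
        return (double)elapsed.Ticks / ticks;
    }
}
```

For a 2 second run this gives an upper bound of 20,000 iterations at 1000 ticks and 2,000 at 10,000 ticks. The counts reported above (15657 and ~100) are both below those bounds, and they differ by far more than the factor of ten between the two intervals.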

Furthermore, the code that the OP's example executes for Generate with a TimeSpan doesn't invoke the recursive action that you posted. It invokes the scheduler's Schedule methods directly and recursively, verified with breakpoints and by stepping through.

http://rx.codeplex.com/SourceControl/changeset/view/791e000c00ed#Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Generate.cs

The constructor being called by the OP's example is this:
public Generate(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, Func<TState, TimeSpan> timeSelector, IScheduler scheduler)
Note that it sets _timeSelectorR, so Run chooses the second Sink implementation: δ

The sink's Run method contains:
return _parent._scheduler.Schedule(_parent._initialState, InvokeRec);
which calls Schedule directly on the IScheduler interface, not an extension method:
IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action);
It passes InvokeRec as the action, which contains:
return self.Schedule(state, time, InvokeRec);
that calls a different Schedule method directly on the IScheduler interface:
IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action);
It's clear that the managed stack is being used to perform the recursion. The specified scheduler isn't aware of the recursion, so it simply builds up a chain of disposables.

Generate uses DefaultScheduler by default. Here are its implementations of the relevant Schedule methods. Note that s_cal represents the concurrency abstraction layer (CAL).
public override IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action)
{
    if (action == null)
        throw new ArgumentNullException("action");

    var d = new SingleAssignmentDisposable();

    var cancel = s_cal.QueueUserWorkItem(_ =>
    {
        if (!d.IsDisposed)
            d.Disposable = action(this, state);
    }, null);

    return new CompositeDisposable(
        d,
        cancel
    );
}

public override IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action)
{
    if (action == null)
        throw new ArgumentNullException("action");

    var dt = Scheduler.Normalize(dueTime);
    if (dt.Ticks == 0)
        return Schedule(state, action);

    var d = new SingleAssignmentDisposable();

    var cancel = s_cal.StartTimer(_ =>
    {
        if (!d.IsDisposed)
            d.Disposable = action(this, state);
    }, null, dt);

    return new CompositeDisposable(
        d,
        cancel
    );
}
It's clear that the disposables are being chained together, without any coalescing. Notice that action returns a disposable and that the call to Schedule also returns a disposable; the implementation of Generate returns the latter as the former. Thus each call to Schedule (once per iteration) creates a new SingleAssignmentDisposable and combines it into a new CompositeDisposable, and that SingleAssignmentDisposable ends up holding the CompositeDisposable of the following iteration.

-- Dave
Feb 2, 2013 at 5:38 AM
Edited Feb 2, 2013 at 8:35 AM
Ah, good to understand it better!

I was looking at non-optimized implementation with AnonymousObservable, which is easier to analyze, and it appeared that, barring Producer and Sink optimizations, they invoke the same underlying extension method.

Also confusing was that the version with FromTicks(0) doesn't degrade to the version with no time selector:
         var generate = Observable.Generate(
            Unit.Default,
            u => true,
            u => Unit.Default,
            u => Unit.Default);
which has constant stack and runs merrily forever. I would guess that it simply doesn't use CompositeDisposable then?!

EDIT1: Ah, I see that the version with no time selector runs on a different scheduler:
   internal static IScheduler Iteration { get { return CurrentThreadScheduler.Instance; } }
and changing the scheduler doesn't help the timed implementation, even if the stack appears to grow more slowly with ImmediateScheduler:
         var generate1 = Observable.Generate(
            Unit.Default,
            u => true,
            u => Unit.Default,
            u => Unit.Default,
            u => TimeSpan.FromTicks(1000),
            CurrentThreadScheduler.Instance);

         var generate = Observable.Generate(
            Unit.Default,
            u => true,
            u => Unit.Default,
            u => Unit.Default,
            u => TimeSpan.FromTicks(1000),
            ImmediateScheduler.Instance);
EDIT2

Sorry, I still can't run and debug with the full C# source code, so I am playing with extracts of code, with the hope of understanding this fully.

In my next test, with the unoptimized implementation, I see notably higher CPU consumption but a fairly stable stack size. Go figure!
      public static void Main(string[] args)
      {
         var generate3 = Generate_(
            Unit.Default,
            u => true,
            u => Unit.Default,
            u => Unit.Default,
            u => TimeSpan.FromTicks(10),
            DefaultScheduler.Instance);

         var generateSub = generate3.Subscribe(x => { });
         Console.ReadLine();
         generateSub.Dispose();
      }

      private static IObservable<TResult> Generate_<TState, TResult>(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, Func<TState, TimeSpan> timeSelector, IScheduler scheduler)
      {
         return new AnonymousObservable<TResult>(observer =>
         {
            var state = initialState;
            var first = true;
            var hasResult = false;
            var result = default(TResult);
            var time = default(TimeSpan);
            return scheduler.Schedule(TimeSpan.Zero, self =>
            {
               if (hasResult)
                  observer.OnNext(result);
               try
               {
                  if (first)
                     first = false;
                  else
                     state = iterate(state);
                  hasResult = condition(state);
                  if (hasResult)
                  {
                     result = resultSelector(state);
                     time = timeSelector(state);
                  }
               }
               catch (Exception exception)
               {
                  observer.OnError(exception);
                  return;
               }

               if (hasResult)
                  self(time);
               else
                  observer.OnCompleted();
            });
         });
      }
Feb 2, 2013 at 6:54 PM
Edited Feb 3, 2013 at 9:55 PM
Just to confirm:
Indeed, both the optimized and non-optimized versions of Generate end up calling the default scheduler in DefaultScheduler.cs:
        public override IDisposable Schedule<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action)
        {
            if (action == null)
                throw new ArgumentNullException("action");

            var dt = Scheduler.Normalize(dueTime);
            if (dt.Ticks == 0)
                return Schedule(state, action);

            var d = new SingleAssignmentDisposable();

            var cancel = s_cal.StartTimer(_ =>
            {
                if (!d.IsDisposed)
                    d.Disposable = action(this, state);
            }, null, dt);

            return new CompositeDisposable(
                d,
                cancel
            );
        }
except that the optimized version calls OnNext from the timer started above, which invokes the action in Generate.cs, class δ: Sink:
            private IDisposable InvokeRec(IScheduler self, TState state)
            {
                var time = default(TimeSpan);

                if (_hasResult)
                    base._observer.OnNext(_result);
                try
                {
                    if (_first)
                        _first = false;
                    else
                        state = _parent._iterate(state);
                    _hasResult = _parent._condition(state);
                    if (_hasResult)
                    {
                        _result = _parent._resultSelector(state);
                        time = _parent._timeSelectorR(state);
                    }
                }
                catch (Exception exception)
                {
                    base._observer.OnError(exception);
                    base.Dispose();
                    return Disposable.Empty;
                }

                if (!_hasResult)
                {
                    base._observer.OnCompleted();
                    base.Dispose();
                    return Disposable.Empty;
                }

                return self.Schedule(state, time, InvokeRec);
            }
as Dave correctly pointed out.

For the non-optimized version of Generate, the DefaultScheduler timer action is indeed InvokeRec2 from Scheduler.Recursive.cs, as quoted in my previous post.

This is all with the rx-ef6a42709f49.zip source code, compiled in Debug for Desktop CLR version 4.0. I have problems compiling the 4.5 version on my desktop, and I am sorry for any confusion caused by my running the non-optimized version (which is not built by default) and reporting results for that case.

Additionally, with the (default) optimized version I sometimes get an OutOfMemoryException rather than a StackOverflowException (the latter is still more frequent in my trials so far). In any case, using Windows Task Manager I can always see memory for the optimized Generate growing rather fast. The code never calls the Dispose or Remove methods in CompositeDisposable.cs (verified by putting breakpoints on entry to these methods).

By contrast, the non-optimized version calls Dispose and Remove in CompositeDisposable.cs often; memory and stack consumption appear constant at a 1000-tick interval.

I find this interesting for a possible C++ re-implementation. The current C++ implementation has only one scheduler, which I can at best characterize as a mix of the equivalents of the .NET immediate, current-thread and new-thread schedulers. In C++, removing a Disposable object from the container in CompositeDisposable has important implications for the Disposable implementation: removal from a container requires the Disposable object to have an equality operator, so that it can be found. The current C++ Disposable doesn't have any equality (in good part because std::function has no == operator).

EDIT: put equality instead of identity
Dec 19, 2013 at 4:21 PM
Has anything happened about this behaviour? I just stumbled across it when a long-running Observable.Generate instance gradually ate up most of my machine's memory over a period of a fortnight or so. I've now worked around it, but the problem took quite a long time to track down so it would be good either to have a warning in the documentation somewhere, or an implementation without the memory leak.

(Of course people only come to a place like this to complain so I'd like to counterbalance the paragraph above by thanking the Rx team for a fantastic product that has made my life enormously better over the last couple of years.)