Merge with Dynamic Maximum Concurrency


Rx defines an overload of the Merge operator with a maxConcurrent parameter, which controls the level of concurrency by limiting the number of active subscriptions to inner observables. However, the maxConcurrent parameter is fixed when the query is built, and in some cases it's impossible to write a query that changes it dynamically. Unsubscribing from Merge and re-subscribing with a new value won't work for a hot source, because Merge holds pending inner observables in an internal buffer. Unsubscribing discards every inner observable that has been buffered but isn't yet active, and since the source is hot, those observables are never delivered again, so the data is lost permanently.
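For reference, here is a minimal sketch of the existing static overload, assuming the System.Reactive package is referenced. The class name and the timer-based inner sequences are made up for illustration only. The limit of 2 is baked into the query and cannot be changed once it is running.

```csharp
using System;
using System.Reactive.Linq;

class StaticMergeExample
{
    static void Main()
    {
        // Five cold inner sequences; each emits its own index once after 100 ms.
        IObservable<IObservable<int>> sources =
            Observable.Range(0, 5)
                .Select(i => Observable.Timer(TimeSpan.FromMilliseconds(100)).Select(_ => i));

        // Existing overload: at most two inner sequences are subscribed at a time.
        // The others wait in Merge's internal buffer until a slot frees up.
        IObservable<int> merged = sources.Merge(2);

        // Block until the merged sequence completes, then print what arrived.
        var results = merged.ToList().Wait();
        Console.WriteLine(string.Join(", ", results));
    }
}
```

Disposing the subscription to `merged` while inner sequences are still waiting in the buffer drops them, which is the data-loss problem described above when `sources` is hot.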

Consider adding a Merge overload such as the one shown in the following discussion:


It has the following signature:

IObservable<TSource> Merge<TSource>(this IObservable<IObservable<TSource>> source, IObservable<int> maxConcurrent)
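One possible shape for such an overload is sketched below. This is not the implementation from the linked discussion and not part of Rx; it is an illustration built on Observable.Create, and the class name DynamicMergeSketch is hypothetical. It makes several assumptions that the proposal does not spell out: the limit starts at 0 until maxConcurrent produces a value, a value of 0 or less pauses new subscriptions, lowering the limit does not dispose inner subscriptions that are already active, completion of maxConcurrent keeps the last limit, and an error from maxConcurrent fails the whole sequence.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;

public static class DynamicMergeSketch
{
    public static IObservable<TSource> Merge<TSource>(
        this IObservable<IObservable<TSource>> source,
        IObservable<int> maxConcurrent)
    {
        return Observable.Create<TSource>(observer =>
        {
            var gate = new object();                            // guards all state below
            var pending = new Queue<IObservable<TSource>>();    // buffered, not yet active
            var all = new CompositeDisposable();                // every live subscription
            int limit = 0, active = 0;                          // assumption: limit is 0 until set
            bool sourceDone = false, stopped = false;

            // Must be called while holding the gate.
            void Fail(Exception error)
            {
                if (stopped) return;
                stopped = true;
                observer.OnError(error);
                all.Dispose();
            }

            // Must be called while holding the gate. Starts buffered inner
            // sequences while slots are free, then checks for completion.
            void Drain()
            {
                while (!stopped && active < limit && pending.Count > 0)
                {
                    var inner = pending.Dequeue();
                    var slot = new SingleAssignmentDisposable();
                    all.Add(slot);
                    active++;
                    slot.Disposable = inner.Subscribe(
                        x => { lock (gate) { if (!stopped) observer.OnNext(x); } },
                        ex => { lock (gate) { Fail(ex); } },
                        () => { lock (gate) { all.Remove(slot); active--; Drain(); } });
                }

                if (!stopped && sourceDone && active == 0 && pending.Count == 0)
                {
                    stopped = true;
                    observer.OnCompleted();
                }
            }

            // Subscribe to the limit first so that a synchronously delivered
            // initial value is in place before the source starts emitting.
            all.Add(maxConcurrent.Subscribe(
                n => { lock (gate) { limit = n; Drain(); } },
                ex => { lock (gate) { Fail(ex); } }));

            all.Add(source.Subscribe(
                inner => { lock (gate) { pending.Enqueue(inner); Drain(); } },
                ex => { lock (gate) { Fail(ex); } },
                () => { lock (gate) { sourceDone = true; Drain(); } }));

            return all;
        });
    }
}
```

With this sketch, a caller could pass something like a `BehaviorSubject<int>` as maxConcurrent and call `OnNext` on it to raise or lower the limit while the query keeps running, so the buffered inner observables are never discarded. Note that if the limit stays at 0, the result never completes while anything is still buffered.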


davedev wrote Oct 9, 2014 at 5:19 PM