Reactive Overloads for Static Parameters

description

Several operators in Rx accept static parameters such as Int32 and TimeSpan. Given that Rx is a reactive framework, wouldn't it make sense to provide observable variants as well?

In many cases, reactive overloads would spare users the complex re-subscribing, publishing, scheduling and/or buffering that dynamic queries currently require, because the operators themselves would react to parameter changes without re-subscribing to the source.
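
To make the re-subscription problem concrete, here is a minimal sketch of one common way to emulate a dynamic Take count today, using only existing operators (Select, Take, Switch). It is an illustration, not something taken from Rx itself: the names DynamicTakeWorkaround, TakeDynamic, source and counts are hypothetical, and the semantics shown are only one possible reading of a "dynamic" Take.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class DynamicTakeWorkaround
{
    // Hypothetical helper: rebuilds the query whenever a new count arrives.
    // Switch unsubscribes from the previous inner query and subscribes to
    // the new one, which means a fresh subscription to source every time.
    public static IObservable<T> TakeDynamic<T>(IObservable<T> source, IObservable<int> counts)
    {
        return counts.Select(n => source.Take(n)).Switch();
    }

    public static List<int> Demo()
    {
        var source = new Subject<int>();   // hot source
        var counts = new Subject<int>();   // the "dynamic parameter"
        var results = new List<int>();

        using (TakeDynamic(source, counts).Subscribe(results.Add))
        {
            counts.OnNext(2);   // first query: source.Take(2)
            source.OnNext(1);
            source.OnNext(2);   // Take(2) completes here
            source.OnNext(3);   // lost: nothing is subscribed to source right now
            counts.OnNext(1);   // second query: a new subscription to source
            source.OnNext(4);
            source.OnNext(5);   // lost again
        }

        return results;         // 1, 2, 4
    }
}
```

With a hot source, values that arrive between queries are lost; with a cold source, every new count would replay the source's subscription side effects. Avoiding either usually means adding Publish or some buffering by hand, which is the complexity a built-in reactive overload could hide.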

In at least one known case it seems that it's absolutely required to prevent data loss: http://rx.codeplex.com/workitem/43

In general, the new signatures would simply replace the static parameters of their corresponding overloads with IObservable<T>, though in some cases overload conflicts may call for a new naming convention for the operators.

Note that this was already done for some overloads of Window, Buffer, Throttle, Sample, Delay and Timeout in Rx 2.0. However, rather than replacing TimeSpan parameters with IObservable<TimeSpan>, those overloads take an IObservable<TOther> parameter, because observable notifications themselves carry time.
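
As a small illustration of that existing pattern, the sketch below uses the Rx 2.0 Buffer overload that takes a boundary sequence. The element type of the boundary sequence (Unit here) is irrelevant; only the moment each notification arrives matters. The class name BoundaryBufferExample and the variable names are made up for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class BoundaryBufferExample
{
    public static List<string> Demo()
    {
        var source = new Subject<int>();
        var boundaries = new Subject<Unit>();   // plays the role of IObservable<TOther>
        var buffers = new List<string>();

        // Each boundary notification closes the current buffer and opens a new one.
        using (source.Buffer(boundaries)
                     .Subscribe(b => buffers.Add(string.Join(",", b))))
        {
            source.OnNext(1);
            source.OnNext(2);
            boundaries.OnNext(Unit.Default);   // closes the buffer holding 1, 2
            source.OnNext(3);
            boundaries.OnNext(Unit.Default);   // closes the buffer holding 3
        }

        return buffers;
    }
}
```

Because the boundaries are ordinary notifications, the buffer duration can vary freely over the lifetime of a single subscription to source.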

Furthermore, instead of overloading Skip and Take in Rx 1.0, different operators were introduced: SkipUntil and TakeUntil. It seems that the same basic principles apply.
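
For comparison, here is a short sketch of how SkipUntil and TakeUntil act as the "reactive" counterparts of Skip and Take: the cut-off point is given by a notification from another sequence rather than by a fixed count. The class name UntilExample and the variable names are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class UntilExample
{
    public static string Demo()
    {
        var source = new Subject<int>();
        var signal = new Subject<Unit>();
        var taken = new List<int>();
        var skipped = new List<int>();

        using (source.TakeUntil(signal).Subscribe(taken.Add))
        using (source.SkipUntil(signal).Subscribe(skipped.Add))
        {
            source.OnNext(1);
            source.OnNext(2);
            signal.OnNext(Unit.Default);   // TakeUntil stops here, SkipUntil starts
            source.OnNext(3);
            source.OnNext(4);
        }

        return "taken=" + string.Join(",", taken) + ";skipped=" + string.Join(",", skipped);
    }
}
```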

These operators are potential candidates for overloading as of Rx 2.0:

Buffer<TSource>(this IObservable<TSource>, int)
Buffer<TSource>(this IObservable<TSource>, int, int)
Buffer<TSource>(this IObservable<TSource>, TimeSpan, int)
Buffer<TSource>(this IObservable<TSource>, TimeSpan, int, IScheduler)
Buffer<TSource>(this IObservable<TSource>, TimeSpan, TimeSpan)
Buffer<TSource>(this IObservable<TSource>, TimeSpan, TimeSpan, IScheduler)

DistinctUntilChanged<TSource>(this IObservable<TSource>, IEqualityComparer<TSource>)

Merge<TSource>(this IEnumerable<IObservable<TSource>>, int)
Merge<TSource>(this IEnumerable<IObservable<TSource>>, int, IScheduler)
Merge<TSource>(this IObservable<IObservable<TSource>>, int)  (See the related work item above.)

ObserveOn<TSource>(this IObservable<TSource>, IScheduler)

Repeat<TSource>(this IObservable<TSource>, int)

Replay<TSource, TResult>(this IObservable<TSource>, Func<IObservable<TSource>,IObservable<TResult>>, int)
Replay<TSource, TResult>(this IObservable<TSource>, Func<IObservable<TSource>,IObservable<TResult>>, int, IScheduler)
Replay<TSource, TResult>(this IObservable<TSource>, Func<IObservable<TSource>,IObservable<TResult>>, int, TimeSpan)
Replay<TSource, TResult>(this IObservable<TSource>, Func<IObservable<TSource>,IObservable<TResult>>, int, TimeSpan, IScheduler)
Replay<TSource, TResult>(this IObservable<TSource>, Func<IObservable<TSource>,IObservable<TResult>>, TimeSpan)
Replay<TSource, TResult>(this IObservable<TSource>, Func<IObservable<TSource>,IObservable<TResult>>, TimeSpan, IScheduler)
Replay<TSource>(this IObservable<TSource>, int)
Replay<TSource>(this IObservable<TSource>, int, IScheduler)
Replay<TSource>(this IObservable<TSource>, int, TimeSpan)
Replay<TSource>(this IObservable<TSource>, int, TimeSpan, IScheduler)
Replay<TSource>(this IObservable<TSource>, TimeSpan)
Replay<TSource>(this IObservable<TSource>, TimeSpan, IScheduler)

Retry<TSource>(this IObservable<TSource>, int)

Skip<TSource>(this IObservable<TSource>, int)

SkipLast<TSource>(this IObservable<TSource>, int)
SkipLast<TSource>(this IObservable<TSource>, TimeSpan)
SkipLast<TSource>(this IObservable<TSource>, TimeSpan, IScheduler)

Take<TSource>(this IObservable<TSource>, int)
Take<TSource>(this IObservable<TSource>, int, IScheduler)

TakeLast<TSource>(this IObservable<TSource>, int)
TakeLast<TSource>(this IObservable<TSource>, int, IScheduler)
TakeLast<TSource>(this IObservable<TSource>, TimeSpan)
TakeLast<TSource>(this IObservable<TSource>, TimeSpan, IScheduler)
TakeLast<TSource>(this IObservable<TSource>, TimeSpan, IScheduler, IScheduler)

TakeLastBuffer<TSource>(this IObservable<TSource>, int)
TakeLastBuffer<TSource>(this IObservable<TSource>, TimeSpan)
TakeLastBuffer<TSource>(this IObservable<TSource>, TimeSpan, IScheduler)

Window<TSource>(this IObservable<TSource>, int)
Window<TSource>(this IObservable<TSource>, int, int)
Window<TSource>(this IObservable<TSource>, TimeSpan, int)
Window<TSource>(this IObservable<TSource>, TimeSpan, int, IScheduler)
Window<TSource>(this IObservable<TSource>, TimeSpan, TimeSpan)
Window<TSource>(this IObservable<TSource>, TimeSpan, TimeSpan, IScheduler)

Without giving much thought to semantics or to potential overload collisions, here are their corresponding reactive overloads:

Note: Is ObserveOn the only operator that should have an observable IScheduler parameter?

Buffer<TSource>(this IObservable<TSource>, IObservable<int>)
Buffer<TSource>(this IObservable<TSource>, IObservable<int>, IObservable<int>)
Buffer<TSource, TDuration>(this IObservable<TSource>, IObservable<TDuration>, IObservable<int>)
Buffer<TSource, TDuration>(this IObservable<TSource>, IObservable<TDuration>, IObservable<int>, IScheduler)
Buffer<TSource, TDuration, TShift>(this IObservable<TSource>, IObservable<TDuration>, IObservable<TShift>)
Buffer<TSource, TDuration, TShift>(this IObservable<TSource>, IObservable<TDuration>, IObservable<TShift>, IScheduler)

DistinctUntilChanged<TSource>(this IObservable<TSource>, IObservable<IEqualityComparer<TSource>>)

Merge<TSource>(this IEnumerable<IObservable<TSource>>, IObservable<int>)
Merge<TSource>(this IEnumerable<IObservable<TSource>>, IObservable<int>, IScheduler)
Merge<TSource>(this IObservable<IObservable<TSource>>, IObservable<int>)  (See the related work item above.)

ObserveOn<TSource>(this IObservable<TSource>, IObservable<IScheduler>)

Repeat<TSource>(this IObservable<TSource>, IObservable<int>)

Replay<TSource, TResult>(this IObservable<TSource>, Func<IObservable<TSource>,IObservable<TResult>>, IObservable<int>)
Replay<TSource, TResult>(this IObservable<TSource>, Func<IObservable<TSource>,IObservable<TResult>>, IObservable<int>, IScheduler)
Replay<TSource, TDuration, TResult>(this IObservable<TSource>, Func<IObservable<TSource>,IObservable<TResult>>, IObservable<int>, IObservable<TDuration>)
Replay<TSource, TDuration, TResult>(this IObservable<TSource>, Func<IObservable<TSource>,IObservable<TResult>>, IObservable<int>, IObservable<TDuration>, IScheduler)
Replay<TSource, TDuration, TResult>(this IObservable<TSource>, Func<IObservable<TSource>,IObservable<TResult>>, IObservable<TDuration>)
Replay<TSource, TDuration, TResult>(this IObservable<TSource>, Func<IObservable<TSource>,IObservable<TResult>>, IObservable<TDuration>, IScheduler)
Replay<TSource>(this IObservable<TSource>, IObservable<int>)
Replay<TSource>(this IObservable<TSource>, IObservable<int>, IScheduler)
Replay<TSource, TDuration>(this IObservable<TSource>, IObservable<int>, IObservable<TDuration>)
Replay<TSource, TDuration>(this IObservable<TSource>, IObservable<int>, IObservable<TDuration>, IScheduler)
Replay<TSource, TDuration>(this IObservable<TSource>, IObservable<TDuration>)
Replay<TSource, TDuration>(this IObservable<TSource>, IObservable<TDuration>, IScheduler)

Retry<TSource>(this IObservable<TSource>, IObservable<int>)

Skip<TSource>(this IObservable<TSource>, IObservable<int>)

SkipLast<TSource>(this IObservable<TSource>, IObservable<int>)
SkipLast<TSource, TDuration>(this IObservable<TSource>, IObservable<TDuration>)
SkipLast<TSource, TDuration>(this IObservable<TSource>, IObservable<TDuration>, IScheduler)

Take<TSource>(this IObservable<TSource>, IObservable<int>)
Take<TSource>(this IObservable<TSource>, IObservable<int>, IScheduler)

TakeLast<TSource>(this IObservable<TSource>, IObservable<int>)
TakeLast<TSource>(this IObservable<TSource>, IObservable<int>, IScheduler)
TakeLast<TSource, TDuration>(this IObservable<TSource>, IObservable<TDuration>)
TakeLast<TSource, TDuration>(this IObservable<TSource>, IObservable<TDuration>, IScheduler)
TakeLast<TSource, TDuration>(this IObservable<TSource>, IObservable<TDuration>, IScheduler, IScheduler)

TakeLastBuffer<TSource>(this IObservable<TSource>, IObservable<int>)
TakeLastBuffer<TSource, TDuration>(this IObservable<TSource>, IObservable<TDuration>)
TakeLastBuffer<TSource, TDuration>(this IObservable<TSource>, IObservable<TDuration>, IScheduler)

Window<TSource>(this IObservable<TSource>, IObservable<int>)
Window<TSource>(this IObservable<TSource>, IObservable<int>, IObservable<int>)
Window<TSource, TDuration>(this IObservable<TSource>, IObservable<TDuration>, IObservable<int>)
Window<TSource, TDuration>(this IObservable<TSource>, IObservable<TDuration>, IObservable<int>, IScheduler)
Window<TSource, TDuration, TShift>(this IObservable<TSource>, IObservable<TDuration>, IObservable<TShift>)
Window<TSource, TDuration, TShift>(this IObservable<TSource>, IObservable<TDuration>, IObservable<TShift>, IScheduler)
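
To give a feel for what one of these overloads might look like, here is a rough sketch of the first one, Buffer with an IObservable<int> count, written with Observable.Create so that source is subscribed to exactly once. This is not an Rx API and not a proposal for the final semantics. The name BufferDynamic is hypothetical, and the behavior rests on assumptions chosen only for the example: no buffer closes until the first count arrives, non-positive counts are treated as 1, a new count that is already satisfied closes the current buffer immediately, and completion of the count sequence is ignored.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class ReactiveBufferSketch
{
    // Hypothetical operator: a count-based Buffer whose count can change over time.
    public static IObservable<IList<T>> BufferDynamic<T>(
        this IObservable<T> source, IObservable<int> counts)
    {
        return Observable.Create<IList<T>>(observer =>
        {
            var gate = new object();      // serializes source and counts notifications
            var buffer = new List<T>();
            var count = int.MaxValue;     // assumption: wait for the first count

            Action flushIfFull = () =>
            {
                if (buffer.Count >= count)
                {
                    observer.OnNext(buffer);
                    buffer = new List<T>();
                }
            };

            var countSub = counts.Subscribe(
                n => { lock (gate) { count = Math.Max(1, n); flushIfFull(); } },
                ex => { lock (gate) { observer.OnError(ex); } });

            var sourceSub = source.Subscribe(
                x => { lock (gate) { buffer.Add(x); flushIfFull(); } },
                ex => { lock (gate) { observer.OnError(ex); } },
                () =>
                {
                    lock (gate)
                    {
                        // Emit whatever is left, then complete.
                        if (buffer.Count > 0) observer.OnNext(buffer);
                        observer.OnCompleted();
                    }
                });

            return new CompositeDisposable(countSub, sourceSub);
        });
    }

    public static List<string> Demo()
    {
        var source = new Subject<int>();
        var counts = new Subject<int>();
        var buffers = new List<string>();

        source.BufferDynamic(counts).Subscribe(b => buffers.Add(string.Join(",", b)));

        counts.OnNext(2);
        source.OnNext(1);
        source.OnNext(2);    // buffer of 2 closes
        counts.OnNext(3);    // count changes; source is not re-subscribed
        source.OnNext(3);
        source.OnNext(4);
        source.OnNext(5);    // buffer of 3 closes
        source.OnNext(6);
        source.OnCompleted();   // remaining buffer is emitted

        return buffers;
    }
}
```

Even this small sketch has to answer questions such as what happens before the first count arrives and whether a shrinking count should split an existing buffer, which is the kind of semantic detail each proposed overload would need to settle.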

comments

davedev wrote Oct 6 at 10:45 PM

I'm thinking now that Rxx's WindowIntrospective operator may in fact be another "dynamic" variant of ObserveOn. Although the scheduler doesn't vary, the size of the internal queue does.

See also: https://rx.codeplex.com/workitem/82

davedev wrote Oct 9 at 3:40 PM