Best practices for operator creation

Aug 20, 2013 at 5:37 PM
I've been recently developing reactive operators with initialization and finalization code like this:
static IObservable<int> OperatorCreate(IObservable<int> source)
{
    return Observable.Create<int>(observer =>
    {
        Console.WriteLine("initialization");
        var process = source
            .Do(xs => Console.WriteLine("process: {0}", xs))
            .Subscribe(observer);
        var close = Disposable.Create(() => Console.WriteLine("dispose"));
        return new CompositeDisposable(process, close);
    });
}
However, I realized I could also do it entirely using Reactive combinators like this:
static IObservable<int> OperatorCombinators(IObservable<int> source)
{
    return Observable.Defer(() =>
    {
        Console.WriteLine("initialization");
        return source
            .Do(xs => Console.WriteLine("process: {0}", xs))
            .Finally(() => Console.WriteLine("dispose"));
    });
}
Question is: are these two forms completely equivalent? If not, what are advantages/disadvantes of each relative to exception handling, performance, etc, etc.

Thanks!
Aug 21, 2013 at 11:13 AM
Edited Aug 21, 2013 at 11:17 AM
Hi,

You may be interested in this related discussion as well.
Question is: are these two forms completely equivalent?
Functionally, yes. Semantically, no.
If not, what are advantages/disadvantes of each relative to exception handling, performance, etc, etc.
Exception handling: no difference. Performance: not much, if any at all.

Ultimately, the difference is a matter of style.

Rx is a framework that allows us to query data. The idea is to declare what we want, not how to get it. A declarative syntax makes our code much more compact and closer to the specification, which makes it simpler to understand. Your Defer query reads like this:
  1. Defer the source to add a subscription side effect.
  2. Do a computation side effect for every element in the source.
  3. Finally, when the source terminates or is canceled, execute a side effect.
The query reads just like a specification.

Create is the most flexible generator that Rx has to offer. It gives us a context in which we can functionally or imperatively define any cold observable behavior that we may need. Most operators in Rx could be implemented in terms of Create alone. Your Create query reads like this:
  1. Create a new observable.
  2. Start with a subscription side effect.
  3. Create a query: Do a computation side effect for every element in the source.
  4. Subscribe the observer to the previous query.
  5. Create a disposable side effect.
  6. return a composite disposable that disposes of the previous subscription and then executes the previous disposable side effect, after the source terminates or is canceled.
Use the right tool for the job. You probably want to be declarative when possible, so your Defer query is certainly better in this case.

In general, it's better to write composite queries with existing operators rather than using Create. You should only fallback to Create when you need custom behavior that isn't easily or elegantly defined by composing existing operators, which shouldn't be very often. Create is also useful for when you need to create an observable from nothing; i.e., you have no observable source, as in your example, or anything that can be converted into one. However, Rx also fills the gap by providing specialized generators that are more semantic than Create, allowing us to create observables from nothing in a declarative way:

Rx Generators
  • Never
  • Empty
  • Throw
  • Return
  • Range
  • Timer
  • Interval
  • Generate
  • Start
Sometimes Create isn't necessary in more advanced scenarios as well; e.g., defining operators in a general-purpose library, such as Rx itself. Create isn't used internally by Rx – all closures are implemented manually for the sake of improving stack traces while debugging queries. Ironically, perhaps, the way that Rx implements its native operators seems to be quite the opposite of functional composition.

- Dave