Rx++ Return Operator proposal and a few other observations/suggestions

Dec 2, 2012 at 5:06 PM
Edited Dec 2, 2012 at 8:43 PM

 

    template <typename T>
    auto Return(T value)
       -> std::shared_ptr<Observable<T>>
    {
        return CreateObservable<T>(
            [=](std::shared_ptr<Observer<T>> observer) -> Disposable
        {
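            // Shared cancellation flag. It is atomic (requires <atomic>) because
            // the scheduled work may run on a different thread than the disposer.
            struct State 
            {
                std::atomic<bool> cancel;
            };
            auto state = std::make_shared<State>();
            state->cancel = false;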

            DefaultScheduler::Instance().Schedule(
                fix0([=](std::function<void()> self) // TODO:
            {
                try {
                    if (state->cancel)
                        return;

                    observer->OnNext(value);
                    observer->OnCompleted();
                } catch (...) {
                    observer->OnError(std::current_exception());
                }                
            }));

            return [=]{ state->cancel = true; };
        });
    }

 

Some comments on this proposal.

For cold observables to behave as expected, you practically need to schedule everything up front:

    int _tmain(int argc, _TCHAR* argv[])
    {
        int res = 0;
        DefaultScheduler::Instance().Schedule([argc,argv,&res]() { res = scheduledMain(argc,argv); });
        return res;
    }

 

I see the unclear intent of the original author (e.g. the // TODO comments) as a problem. A lot of code can be written in the same style as the Return operator above. Many operators will require a Scheduler, so any future change to DefaultScheduler may force the code that uses it to change.

It is not easy to be motivated to develop on such a foundation, especially since it is hard to predict, judging by his blog entries, whether Aaron will ever come back to C++ :)

Alternatively, we may try to settle on the interface for the Scheduler. As long as the operators use that interface, changes in implementation shouldn't (in principle) require an operator to change.
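
To illustrate what "operators use only the interface" could mean, here is a rough sketch under my own assumptions. None of it is existing Rx++ code: the Scheduler base class, ManualScheduler, the Callbacks struct (a simplified stand-in for Observer<T>) and the two-argument Return are hypothetical names. The scheduler keeps the current Schedule(function<void()>) shape but is passed in, so Return never mentions DefaultScheduler.

```cpp
#include <atomic>
#include <cassert>
#include <functional>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical stand-in for the library's Disposable.
typedef std::function<void()> Disposable;

// Hypothetical scheduler interface; operators would depend only on this.
struct Scheduler
{
    virtual ~Scheduler() {}
    virtual void Schedule(std::function<void()> work) = 0;
};

// Hypothetical test scheduler: queues work until RunAll() is called.
class ManualScheduler : public Scheduler
{
public:
    void Schedule(std::function<void()> work) override
    {
        assert(work);
        pending_.push_back(std::move(work));
    }

    void RunAll()
    {
        // Swap the queue out first so work scheduled during a run
        // is picked up in the next round.
        while (!pending_.empty()) {
            std::vector<std::function<void()>> batch;
            batch.swap(pending_);
            for (auto& work : batch) work();
        }
    }

private:
    std::vector<std::function<void()>> pending_;
};

// Simplified observer: two callbacks instead of the library's Observer<T>.
template <typename T>
struct Callbacks
{
    std::function<void(const T&)> onNext;
    std::function<void()> onCompleted;
};

// Return with the scheduler injected. The result is a subscribe function.
template <typename T>
std::function<Disposable(Callbacks<T>)> Return(T value, std::shared_ptr<Scheduler> scheduler)
{
    return [=](Callbacks<T> observer) -> Disposable {
        auto cancel = std::make_shared<std::atomic<bool>>(false);
        scheduler->Schedule([=] {
            if (cancel->load()) return;   // disposed before the work ran
            observer.onNext(value);
            observer.onCompleted();
        });
        return [=] { cancel->store(true); };
    };
}
```

With this shape, swapping the scheduler implementation (thread pool, message loop, test queue) would not touch the operator, which is the property I am after.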

I would propose to avoid diverging from existing .NET interfaces and operator semantics.
Divergence should be justified by either:

a) implementation language requirements (i.e. it simply can't be done in C++ as in C#)

b) improvements (i.e. C++ allows us to be very efficient in some cases, and the C++ implementation should take advantage of what the language offers)

Thus, I would lean towards having a C++ equivalent of the .NET Scheduler interface:

 

public virtual IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action)

 

(other methods omitted for brevity)
That said, the current C++ version, where Schedule() takes a function<void()> parameter, would be acceptable if it is the only way to keep the Scheduler implementation from blocking all other work.
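
For comparison, here is a rough sketch of how a C++ counterpart of that .NET signature might look. It is only an illustration under my own assumptions, not existing Rx++ code: the Scheduler base class, ScheduleCore, the QueueScheduler used to exercise it and the Disposable typedef are all hypothetical names. The point is that the action receives the scheduler and its state back, so an operator can reschedule itself without a fix0-style helper.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <memory>

// Hypothetical stand-in for the library's Disposable.
typedef std::function<void()> Disposable;

// Hypothetical C++ counterpart of the .NET scheduler (only Schedule shown).
class Scheduler
{
public:
    virtual ~Scheduler() {}

    // Mirrors Schedule<TState>(state, action): the action is called with
    // the scheduler and the state, and returns a Disposable.
    template <typename TState, typename Action>
    Disposable Schedule(TState state, Action action)
    {
        return ScheduleCore([state, action](Scheduler& self) -> Disposable {
            return action(self, state);
        });
    }

protected:
    typedef std::function<Disposable(Scheduler&)> Work;

    // Type-erased hook that concrete schedulers implement.
    virtual Disposable ScheduleCore(Work work) = 0;
};

// Hypothetical scheduler that queues work until RunAll() is called.
class QueueScheduler : public Scheduler
{
public:
    // Runs queued work in FIFO order, including work queued while running.
    void RunAll()
    {
        while (!queue_.empty()) {
            Item item = queue_.front();
            queue_.pop_front();
            if (!*item.cancelled) item.work(*this);
        }
    }

protected:
    Disposable ScheduleCore(Work work) override
    {
        assert(work);
        auto cancelled = std::make_shared<bool>(false);
        queue_.push_back(Item{work, cancelled});
        return [cancelled] { *cancelled = true; };
    }

private:
    struct Item
    {
        Work work;
        std::shared_ptr<bool> cancelled;
    };
    std::deque<Item> queue_;
};
```

The cancel flag here is a plain bool because this sketch is single-threaded; a real scheduler would need something thread-safe.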

A few other proposals:

While playing with Return I felt the need for more overloads of the existing binder subscribe method:

 

  template <class OnNext>
  auto subscribe(OnNext onNext) -> decltype(Subscribe(obj, onNext)) {
     return Subscribe(obj, onNext);
  }

namely overloads that also take OnCompleted and OnError, and maybe even an overload with an Observer parameter. It would simplify some code.
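
A minimal sketch of what I mean, assuming simplified stand-ins for the library types. Observer, Observable, CallbackObserver, Binder and the Single test source below are hypothetical and only approximate the real ones. I used the .NET argument order (onNext, onError, onCompleted), and the one-argument overload silently ignores errors, which is a choice of this sketch only.

```cpp
#include <cassert>
#include <exception>
#include <functional>
#include <memory>
#include <utility>

typedef std::function<void()> Disposable;

template <typename T>
struct Observer
{
    virtual ~Observer() {}
    virtual void OnNext(const T& value) = 0;
    virtual void OnError(const std::exception_ptr& error) = 0;
    virtual void OnCompleted() = 0;
};

template <typename T>
struct Observable
{
    virtual ~Observable() {}
    virtual Disposable Subscribe(std::shared_ptr<Observer<T>> observer) = 0;
};

// Adapts three callbacks to the Observer interface.
template <typename T>
class CallbackObserver : public Observer<T>
{
public:
    typedef std::function<void(const T&)> NextFn;
    typedef std::function<void(const std::exception_ptr&)> ErrorFn;
    typedef std::function<void()> CompletedFn;

    CallbackObserver(NextFn onNext, ErrorFn onError, CompletedFn onCompleted)
        : onNext_(std::move(onNext)), onError_(std::move(onError)),
          onCompleted_(std::move(onCompleted))
    {
        assert(onNext_ && onError_ && onCompleted_);
    }

    void OnNext(const T& value) override { onNext_(value); }
    void OnError(const std::exception_ptr& error) override { onError_(error); }
    void OnCompleted() override { onCompleted_(); }

private:
    NextFn onNext_;
    ErrorFn onError_;
    CompletedFn onCompleted_;
};

template <typename T>
class Binder
{
public:
    typedef CallbackObserver<T> Callbacks;

    explicit Binder(std::shared_ptr<Observable<T>> source) : obj(std::move(source)) {}

    // Overload taking a ready-made observer.
    Disposable subscribe(std::shared_ptr<Observer<T>> observer)
    {
        return obj->Subscribe(std::move(observer));
    }

    // Overload taking all three callbacks.
    Disposable subscribe(typename Callbacks::NextFn onNext,
                         typename Callbacks::ErrorFn onError,
                         typename Callbacks::CompletedFn onCompleted)
    {
        return subscribe(std::shared_ptr<Observer<T>>(std::make_shared<Callbacks>(
            std::move(onNext), std::move(onError), std::move(onCompleted))));
    }

    // OnNext only; errors and completion are ignored in this sketch.
    Disposable subscribe(typename Callbacks::NextFn onNext)
    {
        return subscribe(std::move(onNext), [](const std::exception_ptr&) {}, [] {});
    }

private:
    std::shared_ptr<Observable<T>> obj;
};

// Test source: synchronously emits one value, then completes.
template <typename T>
struct Single : Observable<T>
{
    T value;
    explicit Single(T v) : value(std::move(v)) {}
    Disposable Subscribe(std::shared_ptr<Observer<T>> observer) override
    {
        observer->OnNext(value);
        observer->OnCompleted();
        return [] {};
    }
};
```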

 

As somebody else also noted, it is better to keep any platform-specific implementation, such as ObserveOnDispatcherOp, in a separate header (or cpp file).

Dec 2, 2012 at 5:23 PM
Edited Dec 2, 2012 at 5:46 PM

Please note that in the above proposal I included fix0 only to illustrate the // TODO: comments, which apply to any operator that schedules again. The Return operator doesn't need it (even though you could schedule OnCompleted after OnNext), so the cleaned-up proposal is:

    template <typename T>
    auto Return(T value)
       -> std::shared_ptr<Observable<T>>
    {
        return CreateObservable<T>(
            [=](std::shared_ptr<Observer<T>> observer) -> Disposable
        {
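            // Shared cancellation flag. It is atomic (requires <atomic>) because
            // the scheduled work may run on a different thread than the disposer.
            struct State 
            {
                std::atomic<bool> cancel;
            };
            auto state = std::make_shared<State>();
            state->cancel = false;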

            DefaultScheduler::Instance().Schedule(
            [=]() 
            {
                try {
                    if (state->cancel)
                        return;

                    observer->OnNext(value);
                    observer->OnCompleted();
                } catch (...) {
                    observer->OnError(std::current_exception());
                }                
            });

            return [=]{ state->cancel = true; };
        });
    }

 
Dec 4, 2012 at 9:34 AM

I agree, the 'scheduler' part of the library should be redesigned.

There is now a 'bad' dependency on the DefaultScheduler that must be avoided.