Creating a custom, long-running observable

Dec 19, 2012 at 12:24 PM

Hi,

I want to wrap running an external process in an IObservable - output lines would get emitted OnNext, and the observable would get completed when the external process completes.

I have been looking around the Rx source code, and as far as I can see, multi-valued observables are created by deriving from a Producer<T> class - but that is unfortunately internal to Rx, so it's not meant to be used by mere mortals.

And Bart De Smet has stated that we should never create observables by implementing the interface, so what is the recommended way of doing something like this?

be well
-h-

Dec 19, 2012 at 4:01 PM

What do you mean by multi-valued observables? What you're describing sounds like a job for Observable.Create(). In the subscription delegate, start your external process and redirect the output to onNext(). That's all I can tell you without more details.

http://www.introtorx.com/content/v1.0.10621.0/04_CreatingObservableSequences.html#CreationOfObservables
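
A rough sketch of what that could look like, assuming a blocking read loop that is moved onto a pool thread with SubscribeOn. The name ProcessLines.Stdout is made up for illustration, and stderr, exit codes and cancellation are deliberately left out:

```csharp
using System;
using System.Diagnostics;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;

public static class ProcessLines
{
    // Hypothetical helper: emits each stdout line of the process via OnNext.
    public static IObservable<string> Stdout(string fileName, string arguments)
    {
        return Observable.Create<string>(observer =>
        {
            var startInfo = new ProcessStartInfo(fileName, arguments)
            {
                UseShellExecute = false,        // required for redirection
                RedirectStandardOutput = true,
                CreateNoWindow = true
            };

            using (var process = Process.Start(startInfo))
            {
                string line;
                // ReadLine returns null once the process closes its stdout.
                while ((line = process.StandardOutput.ReadLine()) != null)
                    observer.OnNext(line);
                process.WaitForExit();
            }

            observer.OnCompleted();
            // Nothing left to cancel: the loop has finished by the time we return.
            return Disposable.Empty;
        })
        // Run the blocking loop on a pool thread, not on the subscriber's thread.
        .SubscribeOn(TaskPoolScheduler.Default);
    }
}
```

Note that unsubscribing early only stops delivery in this version; it does not stop the process.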

Dec 19, 2012 at 4:20 PM

Hi,

I agree with adhemar82 that you should clarify what you want in more detail.  An example of the expected output may help.  But I'd like to add that there are two possibilities:

  1. Multiple calls to OnNext.  This can be achieved in several ways, including using Observable.Create.
  2. Multiple values per OnNext.  This can be achieved by changing the data type.  For example, you can use IObservable<Tuple<T1, T2>>.
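
To make the difference concrete, here is a small sketch of both shapes. The class and method names are only for illustration:

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;

public static class TwoShapes
{
    public static void Demo()
    {
        // 1. Several OnNext calls, one value each.
        IObservable<string> many = Observable.Create<string>(observer =>
        {
            observer.OnNext("first");
            observer.OnNext("second");
            observer.OnCompleted();
            return Disposable.Empty;
        });

        // 2. A single OnNext call that carries two values in a Tuple.
        IObservable<Tuple<string, int>> paired =
            Observable.Return(Tuple.Create("line", 1));

        many.Subscribe(s => Console.WriteLine(s));
        paired.Subscribe(t => Console.WriteLine(t.Item1 + " " + t.Item2));
    }
}
```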

- Dave

Dec 19, 2012 at 4:22 PM

Hi,

Also see this discussion:

http://social.msdn.microsoft.com/Forums/en/rx/thread/49da113d-6d2a-4ce9-85c4-3acdbfa28f56

- Dave

Dec 21, 2012 at 1:38 PM

Thanks for the feedback.

The full set of requirements is:

  1. Run an external program, specifying filename, arguments and working directory.
  2. The observable created should:
    1. Emit output lines as they happen (modulo buffering) for both standard output and standard error. It should be possible to see which lines came from stdout and which from stderr.
    2. If the program terminates with an exit code of 0, the observable completes normally.
    3. If the program terminates with an exit code != 0, the observable terminates with an error.
  3. If there is more than one subscriber, the external program should still only run once.
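
Requirements 2.1 and 3 can be sketched roughly as follows, assuming a small OutputLine type for tagging lines and Publish/Connect for sharing a single run. OutputLine and SharingSketch are made-up names, and the Observable.Create body is only a stand-in for actually starting the program:

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical type: one output line plus a flag saying where it came from.
public sealed class OutputLine
{
    public OutputLine(bool isError, string text) { IsError = isError; Text = text; }
    public bool IsError { get; private set; }
    public string Text { get; private set; }
}

public static class SharingSketch
{
    // Returns how many times the subscription logic ran.
    public static int Demo()
    {
        int runs = 0;

        // Stand-in for starting the external program.
        IObservable<OutputLine> source = Observable.Create<OutputLine>(observer =>
        {
            runs++;
            observer.OnNext(new OutputLine(false, "hello"));
            observer.OnNext(new OutputLine(true, "warning"));
            observer.OnCompleted();
            return Disposable.Empty;
        });

        // Publish shares one underlying subscription among all subscribers.
        IConnectableObservable<OutputLine> shared = source.Publish();
        shared.Subscribe(l => Console.WriteLine("A " + (l.IsError ? "stderr" : "stdout") + ": " + l.Text));
        shared.Subscribe(l => Console.WriteLine("B " + (l.IsError ? "stderr" : "stdout") + ": " + l.Text));

        shared.Connect();   // the source is subscribed to once, here
        return runs;        // 1, even with two subscribers
    }
}
```

With Publish, subscribers that arrive after Connect miss earlier lines; Replay() is the usual alternative when late subscribers need the full output.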

The link that Dave provided hints at a solution. Unfortunately, it suffers from the problem that the Process.Exited event fires before all output has been flushed from the buffers, so some output is lost.
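
A minimal sketch of one way around that timing issue, assuming the parameterless Process.WaitForExit() is called inside the Exited handler so that both redirected streams are drained before the sequence terminates. This is an illustration only, not the code from any of the linked posts; ProcessRunSketch.Run and the "out: " / "err: " prefixes are made up:

```csharp
using System;
using System.ComponentModel;
using System.Diagnostics;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading;

public static class ProcessRunSketch
{
    // Hypothetical helper; the prefixes stand in for a proper stdout/stderr tag.
    public static IObservable<string> Run(string fileName, string arguments, string workingDirectory)
    {
        return Observable.Create<string>(observer =>
        {
            var process = new Process
            {
                StartInfo = new ProcessStartInfo(fileName, arguments)
                {
                    WorkingDirectory = workingDirectory,
                    UseShellExecute = false,
                    RedirectStandardOutput = true,
                    RedirectStandardError = true,
                    CreateNoWindow = true
                },
                EnableRaisingEvents = true          // needed for Exited to fire
            };
            var gate = new object();                // stdout/stderr arrive on different threads
            var started = new ManualResetEventSlim();

            process.OutputDataReceived += (s, e) =>
            {
                if (e.Data != null) lock (gate) observer.OnNext("out: " + e.Data);
            };
            process.ErrorDataReceived += (s, e) =>
            {
                if (e.Data != null) lock (gate) observer.OnNext("err: " + e.Data);
            };
            process.Exited += (s, e) =>
            {
                try
                {
                    started.Wait();                 // reading must have begun first
                    process.WaitForExit();          // waits for both streams to reach end of stream
                    int exitCode = process.ExitCode;
                    lock (gate)
                    {
                        if (exitCode == 0) observer.OnCompleted();
                        else observer.OnError(new InvalidOperationException("Exit code " + exitCode));
                    }
                }
                finally { process.Dispose(); }
            };

            process.Start();
            process.BeginOutputReadLine();
            process.BeginErrorReadLine();
            started.Set();

            // Unsubscribing early kills the process; the Exited handler cleans up.
            return Disposable.Create(() =>
            {
                try { if (!process.HasExited) process.Kill(); }
                catch (InvalidOperationException) { }   // already exited or disposed
                catch (Win32Exception) { }              // could not be terminated
            });
        });
    }
}
```

Wrapping the result in Publish() or Replay() would keep the program from being started once per subscriber.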

However, after some more searching, I found this on Stack Overflow:

http://stackoverflow.com/questions/11702012/create-iconnectableobservablestring-from-process-output-buffer-issue

This is almost what I want, except that stdout and stderr are mixed and there is a way to specify stdin, which I don't want.

Working from there, I now have the following code, which seems to work:

https://bitbucket.org/corvuscorax/rxprocessextensions/src

(I quickly packaged it up into a test project that runs a Python script to test the output functionality.)

Again: thanks for the feedback.

be well
-h-