Alternative to Publish for subscriptions done after an Exception

Aug 12, 2013 at 7:19 PM
Edited Aug 12, 2013 at 7:20 PM
I need to share a single push data connection between multiple observers (i.e. a single "hot" observable). However, I don't want to keep the connection active all the time... Ideally the connection would be opened only when there is at least one observer, and would be closed again when all observers unsubscribe.

I tried using Publish/RefCount to this effect, with something like:
var source = Observable.Using(
    // open connection here
    () => { Console.WriteLine("opened"); return Disposable.Create(() => Console.WriteLine("closed")); },
    // push data
    resource => Observable.Timer(DateTimeOffset.Now, TimeSpan.FromSeconds(1)))
    .Publish()
    .RefCount();

// Register one observer
var s1 = source.Subscribe(xs => Console.WriteLine("s1: {0}", xs));
Console.ReadLine();
s1.Dispose();

// Register another observer
var s2 = source.Subscribe(xs => Console.WriteLine("s2: {0}", xs));
Console.ReadLine();
s2.Dispose();
This seemed to work fine, with the output being something like:
opened
s1: 0
s1: 1
s1: 2
closed

opened
s2: 0
s2: 1
closed
which seemed to indicate that the connection was being restarted successfully.

However, in the real scenario, sometimes there is an exception establishing the connection, which must be propagated down to the subscribers so that they can decide what to do (e.g. give up and report to the user or try again).

I simulated this in the following way:
var exceptionState = false;
var source = Observable.Using(
    () =>
    {
        // open connection (throws or not depending on state)
        if (exceptionState) throw new Exception("connection failed");
        Console.WriteLine("opened");
        return Disposable.Create(() => Console.WriteLine("closed"));
    },
    // push data
    resource => Observable.Timer(DateTimeOffset.Now, TimeSpan.FromSeconds(1)))
    .Publish()
    .RefCount();

// Successful connection
exceptionState = false;
var s1 = source.Subscribe(xs => Console.WriteLine("s1: {0}", xs), ex => Console.WriteLine("s1 failed"));
Console.ReadLine();
s1.Dispose();

// Failing connection
exceptionState = true;
var s2 = source.Subscribe(xs => Console.WriteLine("s2: {0}", xs), ex => Console.WriteLine("s2 failed"));
Console.ReadLine();
s2.Dispose();

// Successful connection
exceptionState = false;
var s3 = source.Subscribe(xs => Console.WriteLine("s3: {0}", xs), ex => Console.WriteLine("s3 failed"));
Console.ReadLine();
s3.Dispose();
However, to my surprise, this was the output:
opened
s1: 0
s1: 1
s1: 2
closed

s2 failed

s3 failed
opened
closed
I was surprised that the 3rd observer could not restart the sequence; even more surprisingly, the connection was indeed opened and closed successfully, but instead of pushing down the data, the observable broadcast the previous exception.

I kind of understand this behavior would make sense when Publishing a sequence, since it is the most reasonable way to enforce Rx semantics of sending out only one completed or error message...

I guess what I'm looking for is not so much sharing exactly one subscription, but more like an intermediate between "hot" and "cold" semantics. Every observer gets its own sequence which may or may not throw an exception (like a "cold" observable), but overlapping observers would share an existing subscription, if already available (like a "hot" observable).

Is there any way to do this with existing Rx operators? If not, what would be the easiest way to implement it? Implement a custom subject for Multicast? Create a custom Observable operator?

I hope I was able to explain this clearly enough... Thanks in advance for any help!
Aug 13, 2013 at 7:27 AM
Hi,

The failure of s2 and s3 is correct; however, I wouldn't have expected s3 to open and close the resource. That seems unnecessary. I was able to reproduce it, though, and after looking at RefCount it seems to be acceptable behavior at least. RefCount subscribes to the source first and then it connects, regardless of whether the source calls OnError synchronously. Perhaps I could argue that it shouldn't call Connect if the source fails synchronously, though changing it now may be too much of a breaking change anyway.
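
To illustrate why s3 sees the old exception: Publish multicasts through a single Subject<T>, and a subject that has already received OnError hands that same error to every later subscriber. Here's a minimal sketch of just that part (FaultedSubjectDemo is a made-up name for illustration, not something from Rx or Rxx):

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

// Hypothetical demo class, for illustration only.
public static class FaultedSubjectDemo
{
    // Returns the notifications seen by an observer that subscribes
    // after the subject has already received OnError.
    public static List<string> Run()
    {
        var log = new List<string>();
        var subject = new Subject<long>();

        // Simulate the failed connection: the subject is now terminated.
        subject.OnError(new Exception("connection failed"));

        // A late observer is handed the stored exception right away.
        subject.Subscribe(
            x => log.Add("next: " + x),
            ex => log.Add("error: " + ex.Message),
            () => log.Add("completed"));

        // Values pushed after termination are ignored.
        subject.OnNext(42);

        return log;
    }
}
```

The late observer only ever gets the error notification, which matches what s3 reports even though the resource was opened again.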

To answer your particular question, Rxx provides a new overload of the Multicast operator that accepts a subject factory as a parameter. A new subject is created each time that RefCount decides to reconnect, rather than keeping the same faulted subject around. As a result, the new subject knows nothing about the previous exception and s3 in your example gets a new sequence.
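
Roughly, the idea can be sketched like this. It is only an illustration under my own assumptions, not the Rxx source: ReconnectableSketch and MulticastReconnectable are hypothetical names, and the sketch relies on RefCount subscribing observers to the connectable, calling Connect for the first one, and disposing the connection once the last one is gone.

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical type: a connectable observable that asks the factory for a
// fresh subject after each connection has been disposed.
public sealed class ReconnectableSketch<T> : IConnectableObservable<T>
{
    private readonly object gate = new object();
    private readonly IObservable<T> source;
    private readonly Func<ISubject<T>> subjectFactory;
    private ISubject<T> subject;      // subject for the current connection cycle
    private IDisposable connection;   // null while disconnected

    public ReconnectableSketch(IObservable<T> source, Func<ISubject<T>> subjectFactory)
    {
        this.source = source;
        this.subjectFactory = subjectFactory;
    }

    // Callers must hold the gate.
    private ISubject<T> GetOrCreateSubject()
    {
        return subject ?? (subject = subjectFactory());
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        ISubject<T> current;
        lock (gate) { current = GetOrCreateSubject(); }
        return current.Subscribe(observer);
    }

    public IDisposable Connect()
    {
        lock (gate)
        {
            if (connection != null) return connection;

            var inner = source.Subscribe(GetOrCreateSubject());
            connection = Disposable.Create(() =>
            {
                lock (gate)
                {
                    inner.Dispose();
                    // Forget the (possibly faulted) subject so that the next
                    // cycle starts with a new one.
                    subject = null;
                    connection = null;
                }
            });
            return connection;
        }
    }
}

public static class ReconnectableSketchExtensions
{
    // Hypothetical operator name, chosen to avoid implying it is the Rxx overload.
    public static IConnectableObservable<T> MulticastReconnectable<T>(
        this IObservable<T> source, Func<ISubject<T>> subjectFactory)
    {
        return new ReconnectableSketch<T>(source, subjectFactory);
    }
}
```

With something like this, .MulticastReconnectable(() => new Subject<long>()).RefCount() would stand in for .Publish().RefCount(): the faulted subject is dropped when the last observer unsubscribes, so the next observer starts from a clean one.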

Here's the source: https://rxx.codeplex.com/SourceControl/latest#Main/Source/Rxx/System/Reactive/Linq/Observable2 - Multicast.cs
NOTE: Don't click that link; you must copy and paste it, including the trailing - Multicast.cs. CodePlex linking is apparently broken.

Here's a related lab.

In your original query, simply replace Publish with Rxx's Multicast like this:
...
// push data
resource => Observable.Timer(DateTimeOffset.Now, TimeSpan.FromSeconds(1)))
.Multicast(() => new Subject<long>())
.RefCount();
Example output:
opened
s1: 0
s1: 1
s1: 2
closed

s2 failed

opened
s3: 0
s3: 1
s3: 2
closed
- Dave
Aug 13, 2013 at 9:37 AM
Thanks so much for looking into this! The multicast overload is exactly what I was looking for :)
Brief aside: any reason why Rxx has a different open-source license than Rx?

Thanks again.
Aug 13, 2013 at 11:21 AM
Hi,
any reason why Rxx has a different open-source license than Rx?
Yes, it's simply because Rxx was open source before Rx and once Rx finally became open source I never really thought about changing the license to match.

Is this a problem for you at all?

- Dave
Aug 13, 2013 at 11:54 AM
I wanted to include this in my own open-source project, which has a BSD license that is less restrictive than the Ms-PL. No worries, though. As far as I can tell, I only need the internal ReconnectableObservable logic, which I found available in this MSDN post. From what I understand of the MSDN terms of use, this makes this particular implementation public domain for all practical purposes, correct?

I'm really sorry to bother you with these concerns, really... It always makes me sad when these technicalities get in the way of honest code-sharing...
Thanks so much again for pointing me in the right direction!

Loved your blog post about defining "hot" and "cold" observables based on side-effects, by the way. I had had this intuition before and it was really nice to see it so well formalized.
Aug 13, 2013 at 12:20 PM
Hi,
I wanted to include this in my own open-source project
Rxx is also distributed as a NuGet package. Would you consider adding a NuGet reference instead? It would make even more sense if you're planning on distributing your project via NuGet as well. Rxx includes many useful features that you may want to depend on.

Admittedly, I had decided not to deploy Rxx 2.0 and instead encouraged the community to build it from the source code themselves while I worked on transferring several of Rxx's features into Rx. However, I've received a couple of requests for an "official" Rxx 2.0 deployment, so I know people are interested in it. Please let me know if you're giving this an up-vote, while I'm still reconsidering.
which has a BSD license that is less restrictive than the Ms-PL.
I see. I actually chose Ms-PL because I thought it was quite unrestrictive. But I'd be happy to change Rxx's license to match Rx. That seems like the right thing to do, if you're sure that Rx's license is less restrictive than Ms-PL.
I only need the internal ReconnectableObservable logic, which I found available [snip]
Yep, that's it. James Miles actually added it to Rxx himself.
MSDN terms of use [snip]
I'd also assume that it's very unrestrictive. Though I can assure you there won't be any problem if you rip the code straight from Rxx. That's why I linked you to it in the first place.
Loved your blog post
Thanks, I appreciate it!

- Dave
Aug 13, 2013 at 12:55 PM
davedev wrote:
Rxx is also distributed as a NuGet package. Would you consider adding a NuGet reference instead? It would make even more sense if you're planning on distributing your project via NuGet as well. Rxx includes many useful features that you may want to depend on.
I'm actually thinking of distributing my project through NuGet, yes, so this may make more sense, especially if I decide to include more Rxx operators (I'll look it up). I'm a bit torn whether to add a whole new dependency just for a single operator, but that is definitely my threshold; two would already be justification enough to add it in, for maintenance purposes.
Admittedly, I had decided not to deploy Rxx 2.0 and instead encouraged the community to build it from the source code themselves while I worked on transferring several of Rxx's features into Rx. However, I've received a couple of requests for an "official" Rxx 2.0 deployment, so I know people are interested in it. Please let me know if you're giving this an up-vote, while I'm still reconsidering.
I would definitely up-vote an Rxx 2.0 deployment through NuGet, since it seems an ideal way to manage dependencies between open-source projects (apt-get for .NET, yay!)
which has a BSD license that is less restrictive than the Ms-PL.
I see. I actually chose Ms-PL because I thought it was quite unrestrictive. But I'd be happy to change Rxx's license to match Rx. That seems like the right thing to do, if you're sure that Rx's license is less restrictive than Ms-PL.
I'm not entirely sure that Ms-PL is more restrictive than Apache License (Rx license), but I do know that it's more restrictive than BSD because of clause 3.D:

(D) If you distribute any portion of the software in source code form, you may do so only under this license by including a complete copy of this license with your distribution. If you distribute any portion of the software in compiled or object code form, you may only do so under a license that complies with this license.

Which I interpret to mean that if I include a portion of Ms-PL code into a BSD/MIT project, I cannot redistribute the whole thing only under BSD. You can do it the other way around, since BSD/MIT allows you to change the license, just as long as you keep the copyright notice, hence the reason I call it less restrictive.
I only need the internal ReconnectableObservable logic, which I found available [snip]
Yep, that's it. James Miles actually added it to Rxx himself.
MSDN terms of use [snip]
I'd also assume that it's very unrestrictive. Though I can assure you there won't be any problem if you rip the code straight from Rxx. That's why I linked you to it in the first place.
Cool, thanks so much! I'll consider adding Rxx as a NuGet dependency anyway.
Aug 13, 2013 at 8:54 PM
Edited Aug 16, 2013 at 9:56 PM
which has a BSD license that is less restrictive than the Ms-PL.
I see. I actually chose Ms-PL because I thought it was quite unrestrictive. But I'd be happy to change Rxx's license to match Rx. That seems like the right thing to do, if you're sure that Rx's license is less restrictive than Ms-PL.
I'm not entirely sure that Ms-PL is more restrictive than Apache License (Rx license), but I do know that it's more restrictive than BSD because of clause 3.D:

(D) If you distribute any portion of the software in source code form, you may do so only under this license by including a complete copy of this license with your distribution. If you distribute any portion of the software in compiled or object code form, you may only do so under a license that complies with this license.

Which I interpret to mean that if I include a portion of Ms-PL code into a BSD/MIT project, I cannot redistribute the whole thing only under BSD. You can do it the other way around, since BSD/MIT allows you to change the license, just as long as you keep the copyright notice, hence the reason I call it less restrictive.
Apache 2.0 might meet your needs, but please check the specific license terms. Note that most other MS OSS projects nowadays also tend to use Apache 2.0, so if you want to use the same license for related projects, that would be the best candidate.

With regard to NuGet, I'm a fan, so I like the idea of an Rxx NuGet package. However, I don't know if I plan to take a dependency on Rxx anytime soon, so count this as a weak vote. :)

(edited to be clear I am not offering legal advice nor trying to interpret a license.)
Aug 13, 2013 at 10:12 PM
Hi,

It's done. Rxx is now under Apache License 2.0 (Apache).

Thanks to both of you for your feedback.

- Dave
Aug 14, 2013 at 2:44 AM
That was fast! Apache does sound like a nice OSS license :)

One last thought on the Publish question:
I started using the reconnectable Multicast(() => new Subject<T>()) in a lot of places, enough for me to feel like I should write a Publish variant. I decided to call it PublishReconnectable.

Somehow I feel it would be a nice variant to have available, although I understand it could be confusing if you're not familiar with the subtleties of subscription...

Cheers,
Aug 14, 2013 at 4:04 AM
Hi,

Yep, we've already got a work item for it, though it's not very popular ;)

- Dave