The trouble with Where()

Apr 10, 2013 at 12:38 AM
Edited Jul 31, 2013 at 9:48 PM
I love Rx, but I have a problem I keep running into.

Let's say we have a single upstream IObservable<Foo> and N downstream sequences attached to it, each interested only in the Foos that satisfy some simple predicate (say, foo.bar == someKey).

Of course this is a simple job for the Where() operator:
IObservable<Foo> foos = ...;
foos.Where(foo => foo.bar == "abc").Subscribe(f => A(f));
foos.Where(foo => foo.bar == "xyz").Subscribe(f => B(f));
foos.Where(foo => foo.bar == "bla").Subscribe(f => C(f));
...
// Many more subscriptions for different bar values.
What essentially happens here is that for each Foo produced upstream, each of the N Where() predicates is evaluated once, so every Foo costs N predicate evaluations. It acts like a linear search for the subscribers that want this Foo. That's all well and good, and exactly what we (should) expect from using Where() here.

The problem I have is that in my case N may be very large, but the subset of subscribers that want any particular Foo is very small. Typically, there is only one for each Foo. This means I'm essentially doing a slow linear search when I could be doing a very efficient lookup to find the few downstream sequences that this Foo needs to be propagated to. My apps run in a very performance-critical environment and I cannot afford this inefficiency.
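To make the cost concrete, here is a minimal, Rx-free model (plain C#, no System.Reactive dependency) of the two routing strategies: N Where() subscriptions behave like a loop that tests every predicate for every Foo, whereas a keyed lookup does one hash probe. `Foo`, `FanOutCost`, and its methods are hypothetical illustrations, not Rx APIs.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical Foo mirroring the thread's example.
public sealed class Foo
{
    public readonly string bar;
    public Foo(string bar) { this.bar = bar; }
}

public static class FanOutCost
{
    // Models N separate foos.Where(...).Subscribe(...) chains:
    // every Foo is tested against every subscriber's predicate.
    // Returns how many predicate evaluations were performed.
    public static int LinearRoute(Foo foo, IReadOnlyList<KeyValuePair<string, Action<Foo>>> subscribers)
    {
        int evaluations = 0;
        foreach (var sub in subscribers)
        {
            evaluations++;                      // one predicate call per subscription
            if (foo.bar == sub.Key) sub.Value(foo);
        }
        return evaluations;
    }

    // Models a keyed lookup: a single hash probe, independent of N.
    // Returns true if a handler was found and invoked.
    public static bool KeyedRoute(Foo foo, IReadOnlyDictionary<string, Action<Foo>> byKey)
    {
        if (!byKey.TryGetValue(foo.bar, out var handler)) return false;
        handler(foo);
        return true;
    }
}
```

With 1,000 registered keys, `LinearRoute` performs 1,000 comparisons for every Foo even though only one handler fires, while `KeyedRoute` does a single dictionary lookup.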

I've racked my brain trying to find some elegant way of doing this more efficiently, but I can only come up with solutions that involve storing a lot of state (mapping subscribers, etc.) and managing concurrency very carefully, which defeats a lot of the purpose of using Rx in the first place. I would prefer some way of dealing with this in terms of existing operators. Has anyone dealt with this issue before, or does anyone know of a good solution? I'm happy to provide more details.
Apr 10, 2013 at 1:25 AM
Do you really need separate subscribers?

If you know all keys at compile time, then you can use one subscriber with a switch statement in its OnNext.
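A minimal sketch of the single-subscriber idea, assuming the keys are known at compile time. `SwitchRouter` and its three handler parameters are hypothetical stand-ins for the A, B and C methods in the question; with Rx you would attach it once, e.g. `foos.Subscribe(router.OnNext)`, instead of N Where() chains.

```csharp
using System;

// Hypothetical Foo mirroring the thread's example.
public sealed class Foo
{
    public readonly string bar;
    public Foo(string bar) { this.bar = bar; }
}

// One subscriber whose OnNext dispatches on the key with a switch,
// instead of N subscriptions that each run a Where() predicate.
public sealed class SwitchRouter
{
    private readonly Action<Foo> _a, _b, _c;

    public SwitchRouter(Action<Foo> a, Action<Foo> b, Action<Foo> c)
    {
        _a = a; _b = b; _c = c;
    }

    public void OnNext(Foo foo)
    {
        switch (foo.bar)
        {
            case "abc": _a(foo); break;
            case "xyz": _b(foo); break;
            case "bla": _c(foo); break;
            default: break;   // no handler for this key: ignore the Foo
        }
    }
}
```

Each Foo is examined once by a single switch; for larger string switches the C# compiler typically emits a hash-based jump rather than a chain of comparisons.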

If you want to assemble a list of "handlers" at runtime, then yes, I think you may need a dictionary to make the lookup fast. But I think you can get by with one subscriber and a ConcurrentDictionary. Use GetOrAdd to register new handlers for each new key. You can even return a disposable that removes the handler from the dictionary.

If you want to register multiple handlers per key, then you may need locks, unless you're OK with keys that map to empty handler lists sticking around.
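A minimal sketch of that approach, assuming handlers register at runtime and several handlers may share a key. `KeyedDispatcher` is a hypothetical name, not an Rx type. Lookups go through `ConcurrentDictionary.TryGetValue` without locking; registration and removal take a lock and swap in a fresh handler array (copy-on-write), and a key is removed when its last handler is disposed, so no empty entries linger.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical single-subscriber dispatcher: call Dispatch from the one
// upstream OnNext; each item costs one dictionary lookup, not N predicates.
public sealed class KeyedDispatcher<TKey, T>
{
    private readonly Func<T, TKey> _keySelector;
    // Handler arrays are never mutated in place (copy-on-write),
    // so Dispatch can read them without taking the lock.
    private readonly ConcurrentDictionary<TKey, Action<T>[]> _handlers =
        new ConcurrentDictionary<TKey, Action<T>[]>();
    private readonly object _gate = new object();   // serializes writers only

    public KeyedDispatcher(Func<T, TKey> keySelector) { _keySelector = keySelector; }

    public IDisposable Register(TKey key, Action<T> handler)
    {
        lock (_gate)
        {
            _handlers.TryGetValue(key, out var existing);   // null if key is new
            var updated = new Action<T>[(existing?.Length ?? 0) + 1];
            existing?.CopyTo(updated, 0);
            updated[updated.Length - 1] = handler;
            _handlers[key] = updated;
        }
        return new Registration(() => Unregister(key, handler));
    }

    private void Unregister(TKey key, Action<T> handler)
    {
        lock (_gate)
        {
            if (!_handlers.TryGetValue(key, out var existing)) return;
            int i = Array.IndexOf(existing, handler);
            if (i < 0) return;
            if (existing.Length == 1)
            {
                _handlers.TryRemove(key, out _);    // last handler gone: drop the key
                return;
            }
            var updated = new Action<T>[existing.Length - 1];
            Array.Copy(existing, 0, updated, 0, i);
            Array.Copy(existing, i + 1, updated, i, existing.Length - i - 1);
            _handlers[key] = updated;
        }
    }

    public void Dispatch(T item)
    {
        if (_handlers.TryGetValue(_keySelector(item), out var handlers))
            foreach (var h in handlers) h(item);
    }

    private sealed class Registration : IDisposable
    {
        private Action _onDispose;
        public Registration(Action onDispose) { _onDispose = onDispose; }
        // Interlocked.Exchange makes Dispose idempotent and thread-safe.
        public void Dispose() { Interlocked.Exchange(ref _onDispose, null)?.Invoke(); }
    }
}
```

With Rx, the one upstream subscription would then be something like `foos.Subscribe(dispatcher.Dispatch)`. Writers pay for an array copy, which seems a reasonable trade when registrations are rare compared with incoming Foos.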

Hope that helps.
Apr 10, 2013 at 2:53 AM
Edited Apr 10, 2013 at 2:55 AM
Hi,

How about using GroupBy or GroupByUntil with Publish?

For example: (Untested)
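IConnectableObservable<IGroupedObservable<string, Foo>> foosByBar = 
    (from foo in foos
     group foo by foo.bar)   // one dictionary lookup per Foo
    .Publish();              // share that single GroupBy among all observers

// Each observer waits for its group once, then receives every Foo in it.
foosByBar.Where(g => g.Key == "abc").Take(1).SelectMany(g => g).Subscribe(A);
foosByBar.Where(g => g.Key == "xyz").Take(1).SelectMany(g => g).Subscribe(B);
foosByBar.Where(g => g.Key == "bla").Take(1).SelectMany(g => g).Subscribe(C);

// Subscribe everything before connecting: groups are not replayed.
foosByBar.Connect();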
GroupBy uses a dictionary lookup on each value's key to find the group observable into which that value is pushed.

Publish broadcasts the group-by so that the dictionary lookup operation is shared by all observers.

Where / Take evaluates the predicate once per group (not once per Foo), and only until the matching group arrives; Take(1) then stops listening for new groups, and SelectMany receives a broadcast of every value in that group, shared with any other observers interested in the same key.

Note that GroupBy doesn't replay IGroupedObservable so you must set up all of your subscriptions before connecting. If you'd rather use RefCount than Connect, then perhaps you should consider applying the Replay operator to the result of GroupBy.

- Dave
Apr 10, 2013 at 3:42 PM
Dave, you are a gentleman and a scholar. Your suggestion is working like a charm. Not only that, my profiling shows the performance is just as good as, if not better than, my homegrown solution where I was mapping subscribers myself.

As for the Replay semantics, fortunately in my use case I'm not concerned with that because I'm getting an infinite and stateless upstream feed, i.e., I don't care about previous values for any particular key, and I already have to wait for a new value to arrive for each key anyway. So I just put the GroupBy behind a Publish/RefCount pair.

Thanks so much!