Apr 10, 2013 at 1:38 AM
Edited Jul 31, 2013 at 10:48 PM
I love Rx, but I have a problem I keep running into.
Let's say we have a single upstream
, and N downsteam sequences attached to it, where each is only interested in those Foos that satisfy some simple predicate (say
foo.bar == someKey
Of course this is a simple job for the
IObservable<Foo> foos = ...;
foos.Where(foo => foo.bar == "abc").Subscribe(f => A(f));
foos.Where(foo => foo.bar == "xyz").Subscribe(f => B(f));
foos.Where(foo => foo.bar == "bla").Subscribe(f => C(f));
// Many more subscriptions for different bar values.
What will essentially happen here is that for each
produced upstream, the
predicate will be evaluated for that
N times. It acts like a linear search to find all subscribers that want this
. That's all well and good, and exactly what we (should) expect from using
The problem I have is that in my case, N may be very large, but the subset of subscribers that want any particular
is very small. Typically, there will be only one for each
. This means I'm essentially doing a slow linear search when I could be doing a very efficient lookup to find the few downstream sequences that this
needs to be propagated to. My apps run in a very performance critical environment and I cannot afford this inefficiency.
I've racked my brain trying to find some elegant way of doing this more efficiently, but I can only come up with solutions that involve storing a lot of state (mapping subscribers, etc) and having to manage concurrency very carefully, which defeats a lot of
the purpose of using Rx in the first place. I would prefer some way of dealing with this in terms of existing operators. Has anyone dealt with this issue before, or know of a good solution? I'm happy to provide more details.