
Optimise ReplaySubject for specific cases

description

Currently the ReplaySubject is a very versatile class that covers five major use cases:
  1. Replay-One. Caching and replaying the last value.
  2. Replay-Many. Caching and replaying the last n values (i.e. a bufferSize > 1).
  3. Replay-Period. Cache data for a period of time for late subscribers
  4. Replay by count and time. A combination of caching by count and period.
  5. Replay-All. Cache and replay all values.
It seems to me that the most common usage* of Replay, "Replay(1)", incurs the cost of all the scheduling and buffering needed by the other cases (Stopwatches, Queues, ImmutableLists of ScheduledObservers and the relevant checks).

My proposal is this: as the bufferSize and window are readonly values of a ReplaySubject, a specialization of the inner workings of the ReplaySubject can be loaded at construction time. For example, if the constructor was called with a bufferSize of 1, then an internal implementation could be resolved that was specific to and optimized for the behavior required by a Replay-one subject:
  • use ImmutableList<IObserver<T>> instead of ImmutableList<ScheduledObserver<T>>,
  • no Stopwatch,
  • no internal Queue of cached values,
  • no time and queue depth checks on every OnNext.
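A minimal sketch of what such a Replay-one specialization might look like, using only the BCL interfaces. The name ReplayOneSubject<T> is hypothetical, a copy-on-write array stands in for Rx's internal ImmutableList, and observers are notified while the lock is held to keep ordering simple. It is an illustration of the idea, not the code in Rx or in any fork.

```csharp
using System;
using System.Threading;

// Hypothetical type for illustration only; not part of Rx.
// Caches just the latest value: no Stopwatch, no Queue, no ScheduledObserver.
public sealed class ReplayOneSubject<T> : IObservable<T>, IObserver<T>
{
    private static readonly IObserver<T>[] Empty = new IObserver<T>[0];
    private readonly object _gate = new object();
    private IObserver<T>[] _observers = Empty; // copy-on-write stand-in for ImmutableList
    private T _value;
    private bool _hasValue;
    private bool _isStopped;
    private Exception _error;

    public void OnNext(T value)
    {
        lock (_gate)
        {
            if (_isStopped) return;
            _value = value;      // overwrite the single cached value
            _hasValue = true;
            var snapshot = _observers;
            foreach (var observer in snapshot) observer.OnNext(value);
        }
    }

    public void OnError(Exception error)
    {
        if (error == null) throw new ArgumentNullException("error");
        Terminate(error);
    }

    public void OnCompleted() { Terminate(null); }

    private void Terminate(Exception error)
    {
        lock (_gate)
        {
            if (_isStopped) return;
            _isStopped = true;
            _error = error;
            var snapshot = _observers;
            _observers = Empty;
            foreach (var observer in snapshot)
            {
                if (error != null) observer.OnError(error); else observer.OnCompleted();
            }
        }
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        if (observer == null) throw new ArgumentNullException("observer");
        lock (_gate)
        {
            // Late subscribers get the cached value first, then any terminal notification.
            if (_hasValue) observer.OnNext(_value);
            if (_isStopped)
            {
                if (_error != null) observer.OnError(_error); else observer.OnCompleted();
                return new Subscription(null, null);
            }
            var next = new IObserver<T>[_observers.Length + 1];
            Array.Copy(_observers, next, _observers.Length);
            next[_observers.Length] = observer;
            _observers = next;
            return new Subscription(this, observer);
        }
    }

    private void Unsubscribe(IObserver<T> observer)
    {
        lock (_gate)
        {
            int i = Array.IndexOf(_observers, observer);
            if (i < 0) return;
            var next = new IObserver<T>[_observers.Length - 1];
            Array.Copy(_observers, 0, next, 0, i);
            Array.Copy(_observers, i + 1, next, i, _observers.Length - i - 1);
            _observers = next;
        }
    }

    private sealed class Subscription : IDisposable
    {
        private ReplayOneSubject<T> _parent;
        private readonly IObserver<T> _observer;

        public Subscription(ReplayOneSubject<T> parent, IObserver<T> observer)
        {
            _parent = parent;
            _observer = observer;
        }

        public void Dispose()
        {
            // Idempotent: only the first Dispose call unsubscribes.
            var parent = Interlocked.Exchange(ref _parent, null);
            if (parent != null) parent.Unsubscribe(_observer);
        }
    }
}
```

The only per-OnNext work here is one field write and a loop over the current observers, which is the saving the list above is after.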
For Replay(n) where n>1, an implementation with an internal queue could be used. This implementation would still have no need for the ImmutableList<ScheduledObserver<T>>, the Stopwatch or the time checks on the OnNext call.
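As a rough illustration of the Replay-Many buffer, here is a count-bounded queue with no time checks. CountBoundedBuffer<T> is a hypothetical helper name, not an Rx type, and it is not thread-safe by itself; the sketch assumes the owning subject calls it while holding its own lock.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper for illustration only; not part of Rx.
// Keeps at most `capacity` of the most recent values.
public sealed class CountBoundedBuffer<T>
{
    private readonly int _capacity;
    private readonly Queue<T> _queue = new Queue<T>();

    public CountBoundedBuffer(int capacity)
    {
        if (capacity < 0) throw new ArgumentOutOfRangeException("capacity");
        _capacity = capacity;
    }

    // Called from OnNext: enqueue, then trim by count only (no Stopwatch involved).
    public void Add(T value)
    {
        _queue.Enqueue(value);
        while (_queue.Count > _capacity) _queue.Dequeue();
    }

    // Called from Subscribe: the values to replay, oldest first.
    public T[] Snapshot()
    {
        return _queue.ToArray();
    }
}
```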

The two constructors that take a window and a bufferSize+window could share an internal implementation, as I doubt there would be much perf difference between the two.

Finally, the Replay-All behaviour, where the parameterless constructor is used, could use the Replay-Many implementation passing int.MaxValue as a param, but this would still needlessly call the Trim method on each OnNext, so again it is a candidate for specialization.
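The construction-time choice described above could be sketched roughly as follows. ReplayStrategy and ReplayStrategySelector are hypothetical names invented for this illustration; a null argument stands for "constructor parameter not supplied".

```csharp
using System;

// Hypothetical names for illustration only; not part of Rx.
public enum ReplayStrategy { ReplayOne, ReplayMany, ReplayAll, ReplayByTime }

public static class ReplayStrategySelector
{
    // bufferSize == null: no count limit was given; window == null: no time limit was given.
    public static ReplayStrategy Select(int? bufferSize, TimeSpan? window)
    {
        if (bufferSize.HasValue && bufferSize.Value < 0)
            throw new ArgumentOutOfRangeException("bufferSize");
        if (window.HasValue && window.Value < TimeSpan.Zero)
            throw new ArgumentOutOfRangeException("window");

        // Window-only and bufferSize+window share one time-aware implementation.
        if (window.HasValue) return ReplayStrategy.ReplayByTime;

        // Parameterless constructor: nothing is ever trimmed.
        if (!bufferSize.HasValue) return ReplayStrategy.ReplayAll;

        // Count-only: single-value cache for 1, bounded queue otherwise.
        return bufferSize.Value == 1 ? ReplayStrategy.ReplayOne : ReplayStrategy.ReplayMany;
    }
}
```

Because bufferSize and window never change after construction, this decision is made once and the per-OnNext path of the chosen implementation carries no checks it does not need.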

*In my opinion Replay(1) would be the most used operation of the ReplaySubject or the associated Replay extension method.

comments

davedev wrote Apr 17, 2013 at 8:42 PM

Hi Lee,

Out of curiosity, are you experiencing real problems with the overhead incurred by the count scenarios or did you simply notice that it's unnecessary in the implementation?

Note that you can use Publish(initial) instead of Replay(1). It uses BehaviorSubject internally, which is essentially just a specialization of ReplaySubject(1), perhaps due to its commonality as you've mentioned. If you don't need the initial value it can be excluded with Where, though in some cases you need to use Select also if there is no sensible value to indicate exclusion in a particular domain; e.g., integers can be projected into nullable integers.

For example:
IObservable<int> xs = GetObservable();
IObservable<int?> ys = xs.Select(x => new int?(x));
IConnectableObservable<int?> p = ys.Publish(initialValue: null);
IObservable<int> q = from x in p
                     where x.HasValue
                     select x.Value;
q.Subscribe(...);
...
p.Connect();
- Dave

LeeCampbell wrote Apr 17, 2013 at 9:46 PM

The short answer is "Yes, we are seeing perf issues". We swapped in a quickly modified version of the ReplaySubject and noticed a large change in the number of allocations being made.
In ReplaySubject the root of the allocations seems to be
private ImmutableList<ScheduledObserver<T>> _observers;
ScheduledObserver<T> in turn has
private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
which is excessively allocating arrays. I say excessively, but really I mean redundantly, as we have no scheduling requirements and so don't need the cost of the ScheduledObserver<T>.

I will try to create a test pack to reproduce our findings.

W.r.t. your proposed solution, thanks for the thought. However, I see this as a workaround/hack where perhaps a far better implementation can be provided. I imagine as Rx gains in popularity (which it certainly seems to be doing), there will be an expectation that it performs optimally.

I have seen that Rx has gathered quite a groundswell in the Capital Markets/Investment Banking industry, where data is often provided at a pretty quick rate (100s of values per second). With a high tick rate and unnecessary allocations happening, Garbage Collection can have an impact on performance. Essentially, I don't think that new users should have to learn a hack to get the performance they would assume, when the library could be optimized.

Having said this, I have taken a fork of the repo and am working on a proposed fix. I am not expecting someone else to do this for me, but thought I would raise an "Issue" for it so it can be tracked.

Lee

LeeCampbell wrote Jun 5, 2013 at 10:00 PM

FYI: Code fork is currently at
http://rx.codeplex.com/SourceControl/network/forks/LeeCampbell/Rx

I will be looking to get code reviews done on it (feel free anyone) and then submit it as a pull request once I have collated some performance tests to justify the change.

LeeCampbell wrote Jan 1 at 1:23 AM

Using a performance test suite (https://github.com/LeeCampbell/RxPerfTests) I can see large improvements are available for ReplaySubject instances created for Replay-one, Replay-Many and Replay-All.

As I have had trouble with CodePlex, I have moved the code changes to a GitHub fork (https://github.com/LeeCampbell/Rx.NET)

LeeCampbell wrote Jan 5 at 4:04 AM

raybooysen wrote Jan 6 at 12:34 PM

+1 on this.

On our project, for Replay(1) scenarios at least, we have a custom subject because of this specific issue.

Lee_Oades wrote Mar 26 at 11:38 AM

+1 Good work Lee.