Group By implementation not thread safe


It looks like the GroupBy implementation is not thread safe. Internally, the Sink's OnNext calls Dictionary.Add without any locking:
if (!_map.TryGetValue(key, out writer)) {
    writer = new Subject<TElement>();
    _map.Add(key, writer);
    fireNewMapEntry = true;
}
I noticed this when subscribing to an event pattern whose event is raised by multiple threads at the same time (in my case, 24 threads that can call TransportMessageReceived concurrently):
                    h => _bus.Transport.TransportMessageReceived += h,
                    h => _bus.Transport.TransportMessageReceived -= h);
Then I simply subscribe to it with a group by expression:
var messagesReceived = from e in observable
                       group e by "something" into c
                       select c;

subscription = messagesReceived.Subscribe(r => Console.WriteLine("Triggered"));
This occasionally throws a NullReferenceException in Dictionary.Insert, because Dictionary is not safe for concurrent writes by default.


davedev wrote Jul 3, 2013 at 11:50 PM


That is the correct behavior.

One of Rx's contracts is that notifications must be pushed serially; i.e., you must not invoke OnNext concurrently within a single observable. There are probably many Rx operators that may cause threading bugs if you do not satisfy this contract.

See §4.2 in the Rx Design Guidelines document.
  • Dave

Bnaya wrote Aug 10, 2013 at 7:15 PM

Isn't it time to reconsider this limitation?
It is true that this is the current contract, but many operators could be made thread safe with
a little effort.
I believe it should change, perhaps by adding a set of thread-safe operators.
Without that, callers have to synchronize themselves, and we may not be able to get full
utilization of a multi-core environment (see Amdahl's law).

davedev wrote Aug 10, 2013 at 10:54 PM

I reject the premise. A thread-safe operator is thread safe because it ensures synchronization, so whether Rx does it or you do it yourself (e.g., via the Synchronize operator), it ends up being the same thing. A set of thread-safe operators would essentially be the set of existing operators with the Synchronize operator added.
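
For illustration, here is a minimal sketch of the Synchronize approach applied to the scenario from the original post. It assumes the System.Reactive package; the Transport class and its MessageReceived event are hypothetical stand-ins for _bus.Transport.TransportMessageReceived, and the Run helper exists only for this example. Synchronize serializes the concurrent event callbacks before they reach GroupBy, so the operator's internal dictionary is only ever touched by one thread at a time.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class SynchronizedGroupByDemo
{
    // Hypothetical stand-in for the transport in the original post.
    public sealed class Transport
    {
        public event EventHandler<EventArgs> MessageReceived;

        public void Raise()
        {
            var handler = MessageReceived;
            if (handler != null) handler(this, EventArgs.Empty);
        }
    }

    // Raises the event from several threads at once and returns how many
    // notifications reached the grouped subscriber.
    public static int Run(int threads, int messagesPerThread)
    {
        var transport = new Transport();
        var received = 0;

        var observable = Observable.FromEventPattern<EventArgs>(
            h => transport.MessageReceived += h,
            h => transport.MessageReceived -= h);

        var messagesReceived = observable
            .Synchronize()              // restore the serial OnNext contract
            .GroupBy(e => "something"); // now only sees one call at a time

        // The increment is safe only because everything downstream of
        // Synchronize runs under its internal lock.
        using (messagesReceived.Subscribe(g => g.Subscribe(_ => received++)))
        {
            Parallel.For(0, threads, t =>
            {
                for (var i = 0; i < messagesPerThread; i++)
                {
                    transport.Raise();
                }
            });
        }

        return received;
    }
}
```

Calling SynchronizedGroupByDemo.Run(24, 1000) mirrors the 24-thread case above. Removing the Synchronize call brings back the race on the dictionary (and on the counter), which is the behavior reported at the top of this thread.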