2121
2222import rx .*;
2323import rx .Observable ;
24- import rx .exceptions .Exceptions ;
25- import rx .exceptions .OnErrorThrowable ;
24+ import rx .exceptions .*;
2625import rx .functions .*;
26+ import rx .internal .util .OpenHashSet ;
2727import rx .observables .ConnectableObservable ;
2828import rx .schedulers .Timestamped ;
2929import rx .subscriptions .Subscriptions ;
@@ -303,8 +303,16 @@ static final class ReplaySubscriber<T> extends Subscriber<T> implements Subscrip
303303 /** Indicates a terminated ReplaySubscriber. */
304304 static final InnerProducer [] TERMINATED = new InnerProducer [0 ];
305305
306- /** Tracks the subscribed producers. */
307- final AtomicReference <InnerProducer []> producers ;
306+ /** Indicates no further InnerProducers are accepted. */
307+ volatile boolean terminated ;
308+ /** Tracks the subscribed producers. Guarded by itself. */
309+ final OpenHashSet <InnerProducer <T >> producers ;
310+ /** Contains a copy of the producers. Modified only from the source side. */
311+ InnerProducer <T >[] producersCache ;
312+ /** Contains number of modifications to the producers set.*/
313+ volatile long producersVersion ;
314+ /** Contains the number of modifications that the producersCache holds. */
315+ long producersCacheVersion ;
308316 /**
309317 * Atomically changed from false to true by connect to make sure the
310318 * connection is only performed by one thread.
@@ -324,12 +332,19 @@ static final class ReplaySubscriber<T> extends Subscriber<T> implements Subscrip
324332 /** The upstream producer. */
325333 volatile Producer producer ;
326334
335+ /** The queue that holds producers with request changes that need to be coordinated. */
336+ List <InnerProducer <T >> coordinationQueue ;
337+ /** Indicate that all request amounts should be considered. */
338+ boolean coordinateAll ;
339+
340+ @ SuppressWarnings ("unchecked" )
327341 public ReplaySubscriber (AtomicReference <ReplaySubscriber <T >> current ,
328342 ReplayBuffer <T > buffer ) {
329343 this .buffer = buffer ;
330344
331345 this .nl = NotificationLite .instance ();
332- this .producers = new AtomicReference <InnerProducer []>(EMPTY );
346+ this .producers = new OpenHashSet <InnerProducer <T >>();
347+ this .producersCache = EMPTY ;
333348 this .shouldConnect = new AtomicBoolean ();
334349 // make sure the source doesn't produce values until the child subscribers
335350 // expressed their request amounts
@@ -340,7 +355,15 @@ void init() {
340355 add (Subscriptions .create (new Action0 () {
341356 @ Override
342357 public void call () {
343- ReplaySubscriber .this .producers .getAndSet (TERMINATED );
358+ if (!terminated ) {
359+ synchronized (producers ) {
360+ if (!terminated ) {
361+ producers .terminate ();
362+ producersVersion ++;
363+ terminated = true ;
364+ }
365+ }
366+ }
344367 // unlike OperatorPublish, we can't null out the terminated so
345368 // late subscribers can still get replay
346369 // current.compareAndSet(ReplaySubscriber.this, null);
@@ -359,76 +382,34 @@ boolean add(InnerProducer<T> producer) {
359382 if (producer == null ) {
360383 throw new NullPointerException ();
361384 }
362- // the state can change so we do a CAS loop to achieve atomicity
363- for (;;) {
364- // get the current producer array
365- InnerProducer [] c = producers .get ();
366- // if this subscriber-to-source reached a terminal state by receiving
367- // an onError or onCompleted, just refuse to add the new producer
368- if (c == TERMINATED ) {
385+ if (terminated ) {
386+ return false ;
387+ }
388+ synchronized (producers ) {
389+ if (terminated ) {
369390 return false ;
370391 }
371- // we perform a copy-on-write logic
372- int len = c .length ;
373- InnerProducer [] u = new InnerProducer [len + 1 ];
374- System .arraycopy (c , 0 , u , 0 , len );
375- u [len ] = producer ;
376- // try setting the producers array
377- if (producers .compareAndSet (c , u )) {
378- return true ;
379- }
380- // if failed, some other operation succeeded (another add, remove or termination)
381- // so retry
392+
393+ producers .add (producer );
394+ producersVersion ++;
382395 }
396+ return true ;
383397 }
384398
385399 /**
386400 * Atomically removes the given producer from the producers array.
387401 * @param producer the producer to remove
388402 */
389403 void remove (InnerProducer <T > producer ) {
390- // the state can change so we do a CAS loop to achieve atomicity
391- for (;;) {
392- // let's read the current producers array
393- InnerProducer [] c = producers .get ();
394- // if it is either empty or terminated, there is nothing to remove so we quit
395- if (c == EMPTY || c == TERMINATED ) {
396- return ;
397- }
398- // let's find the supplied producer in the array
399- // although this is O(n), we don't expect too many child subscribers in general
400- int j = -1 ;
401- int len = c .length ;
402- for (int i = 0 ; i < len ; i ++) {
403- if (c [i ].equals (producer )) {
404- j = i ;
405- break ;
406- }
407- }
408- // we didn't find it so just quit
409- if (j < 0 ) {
410- return ;
411- }
412- // we do copy-on-write logic here
413- InnerProducer [] u ;
414- // we don't create a new empty array if producer was the single inhabitant
415- // but rather reuse an empty array
416- if (len == 1 ) {
417- u = EMPTY ;
418- } else {
419- // otherwise, create a new array one less in size
420- u = new InnerProducer [len - 1 ];
421- // copy elements being before the given producer
422- System .arraycopy (c , 0 , u , 0 , j );
423- // copy elements being after the given producer
424- System .arraycopy (c , j + 1 , u , j , len - j - 1 );
425- }
426- // try setting this new array as
427- if (producers .compareAndSet (c , u )) {
404+ if (terminated ) {
405+ return ;
406+ }
407+ synchronized (producers ) {
408+ if (terminated ) {
428409 return ;
429410 }
430- // if we failed, it means something else happened
431- // (a concurrent add/remove or termination), we need to retry
411+ producers . remove ( producer );
412+ producersVersion ++;
432413 }
433414 }
434415
@@ -439,7 +420,7 @@ public void setProducer(Producer p) {
439420 throw new IllegalStateException ("Only a single producer can be set on a Subscriber." );
440421 }
441422 producer = p ;
442- manageRequests ();
423+ manageRequests (null );
443424 replay ();
444425 }
445426
@@ -482,81 +463,157 @@ public void onCompleted() {
482463 /**
483464 * Coordinates the request amounts of various child Subscribers.
484465 */
485- void manageRequests () {
466+ void manageRequests (InnerProducer < T > inner ) {
486467 // if the upstream has completed, no more requesting is possible
487468 if (isUnsubscribed ()) {
488469 return ;
489470 }
490471 synchronized (this ) {
491472 if (emitting ) {
473+ if (inner != null ) {
474+ List <InnerProducer <T >> q = coordinationQueue ;
475+ if (q == null ) {
476+ q = new ArrayList <InnerProducer <T >>();
477+ coordinationQueue = q ;
478+ }
479+ q .add (inner );
480+ } else {
481+ coordinateAll = true ;
482+ }
492483 missed = true ;
493484 return ;
494485 }
495486 emitting = true ;
496487 }
488+
489+ long ri = maxChildRequested ;
490+ long maxTotalRequested ;
491+
492+ if (inner != null ) {
493+ maxTotalRequested = Math .max (ri , inner .totalRequested .get ());
494+ } else {
495+ maxTotalRequested = ri ;
496+
497+ InnerProducer <T >[] a = copyProducers ();
498+ for (InnerProducer <T > rp : a ) {
499+ if (rp != null ) {
500+ maxTotalRequested = Math .max (maxTotalRequested , rp .totalRequested .get ());
501+ }
502+ }
503+
504+ }
505+ makeRequest (maxTotalRequested , ri );
506+
497507 for (;;) {
498508 // if the upstream has completed, no more requesting is possible
499509 if (isUnsubscribed ()) {
500510 return ;
501511 }
502512
503- @ SuppressWarnings ("unchecked" )
504- InnerProducer <T >[] a = producers .get ();
505-
506- long ri = maxChildRequested ;
507- long maxTotalRequests = ri ;
508-
509- for (InnerProducer <T > rp : a ) {
510- maxTotalRequests = Math .max (maxTotalRequests , rp .totalRequested .get ());
513+ List <InnerProducer <T >> q ;
514+ boolean all ;
515+ synchronized (this ) {
516+ if (!missed ) {
517+ emitting = false ;
518+ return ;
519+ }
520+ missed = false ;
521+ q = coordinationQueue ;
522+ coordinationQueue = null ;
523+ all = coordinateAll ;
524+ coordinateAll = false ;
511525 }
512526
513- long ur = maxUpstreamRequested ;
514- Producer p = producer ;
527+ ri = maxChildRequested ;
528+ maxTotalRequested = ri ;
515529
516- long diff = maxTotalRequests - ri ;
517- if (diff != 0 ) {
518- maxChildRequested = maxTotalRequests ;
519- if (p != null ) {
520- if (ur != 0L ) {
521- maxUpstreamRequested = 0L ;
522- p .request (ur + diff );
523- } else {
524- p .request (diff );
525- }
526- } else {
527- // collect upstream request amounts until there is a producer for them
528- long u = ur + diff ;
529- if (u < 0 ) {
530- u = Long .MAX_VALUE ;
530+ if (q != null ) {
531+ for (InnerProducer <T > rp : q ) {
532+ maxTotalRequested = Math .max (maxTotalRequested , rp .totalRequested .get ());
533+ }
534+ }
535+
536+ if (all ) {
537+ InnerProducer <T >[] a = copyProducers ();
538+ for (InnerProducer <T > rp : a ) {
539+ if (rp != null ) {
540+ maxTotalRequested = Math .max (maxTotalRequested , rp .totalRequested .get ());
531541 }
532- maxUpstreamRequested = u ;
533542 }
534- } else
535- // if there were outstanding upstream requests and we have a producer
536- if (ur != 0L && p != null ) {
537- maxUpstreamRequested = 0L ;
538- // fire the accumulated requests
539- p .request (ur );
540543 }
541544
542- synchronized (this ) {
543- if (!missed ) {
544- emitting = false ;
545- return ;
545+ makeRequest (maxTotalRequested , ri );
546+ }
547+ }
548+
549+ InnerProducer <T >[] copyProducers () {
550+ synchronized (producers ) {
551+ Object [] a = producers .values ();
552+ int n = a .length ;
553+ @ SuppressWarnings ("unchecked" )
554+ InnerProducer <T >[] result = new InnerProducer [n ];
555+ System .arraycopy (a , 0 , result , 0 , n );
556+ return result ;
557+ }
558+ }
559+
560+ void makeRequest (long maxTotalRequests , long previousTotalRequests ) {
561+ long ur = maxUpstreamRequested ;
562+ Producer p = producer ;
563+
564+ long diff = maxTotalRequests - previousTotalRequests ;
565+ if (diff != 0 ) {
566+ maxChildRequested = maxTotalRequests ;
567+ if (p != null ) {
568+ if (ur != 0L ) {
569+ maxUpstreamRequested = 0L ;
570+ p .request (ur + diff );
571+ } else {
572+ p .request (diff );
546573 }
547- missed = false ;
574+ } else {
575+ // collect upstream request amounts until there is a producer for them
576+ long u = ur + diff ;
577+ if (u < 0 ) {
578+ u = Long .MAX_VALUE ;
579+ }
580+ maxUpstreamRequested = u ;
548581 }
582+ } else
583+ // if there were outstanding upstream requests and we have a producer
584+ if (ur != 0L && p != null ) {
585+ maxUpstreamRequested = 0L ;
586+ // fire the accumulated requests
587+ p .request (ur );
549588 }
550589 }
551590
552591 /**
553592 * Tries to replay the buffer contents to all known subscribers.
554593 */
594+ @ SuppressWarnings ("unchecked" )
555595 void replay () {
556- @ SuppressWarnings ("unchecked" )
557- InnerProducer <T >[] a = producers .get ();
558- for (InnerProducer <T > rp : a ) {
559- buffer .replay (rp );
596+ InnerProducer <T >[] pc = producersCache ;
597+ if (producersCacheVersion != producersVersion ) {
598+ synchronized (producers ) {
599+ pc = producersCache ;
600+ // if the producers hasn't changed do nothing
601+ // otherwise make a copy of the current set of producers
602+ Object [] a = producers .values ();
603+ int n = a .length ;
604+ if (pc .length != n ) {
605+ pc = new InnerProducer [n ];
606+ producersCache = pc ;
607+ }
608+ System .arraycopy (a , 0 , pc , 0 , n );
609+ producersCacheVersion = producersVersion ;
610+ }
611+ }
612+ ReplayBuffer <T > b = buffer ;
613+ for (InnerProducer <T > rp : pc ) {
614+ if (rp != null ) {
615+ b .replay (rp );
616+ }
560617 }
561618 }
562619 }
@@ -635,7 +692,7 @@ public void request(long n) {
635692 addTotalRequested (n );
636693 // if successful, notify the parent dispatcher this child can receive more
637694 // elements
638- parent .manageRequests ();
695+ parent .manageRequests (this );
639696
640697 parent .buffer .replay (this );
641698 return ;
@@ -716,7 +773,7 @@ public void unsubscribe() {
716773 // let's assume this child had 0 requested before the unsubscription while
717774 // the others had non-zero. By removing this 'blocking' child, the others
718775 // are now free to receive events
719- parent .manageRequests ();
776+ parent .manageRequests (this );
720777 }
721778 }
722779 }
@@ -856,8 +913,6 @@ public void replay(InnerProducer<T> output) {
856913
857914 /**
858915 * Represents a node in a bounded replay buffer's linked list.
859- *
860- * @param <T> the contained value type
861916 */
862917 static final class Node extends AtomicReference <Node > {
863918 /** */
0 commit comments