Skip to content

Commit e824aff

Browse files
committed
2.x: Completable type for onError|onComplete only flows + unit tests
1 parent aa400d1 commit e824aff

13 files changed

+6636
-0
lines changed

src/main/java/io/reactivex/Completable.java

Lines changed: 2251 additions & 0 deletions
Large diffs are not rendered by default.
Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
/**
2+
* Copyright 2015 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.completable;
15+
16+
import java.util.concurrent.atomic.*;
17+
18+
import org.reactivestreams.*;
19+
20+
import io.reactivex.*;
21+
import io.reactivex.Completable.*;
22+
import io.reactivex.disposables.Disposable;
23+
import io.reactivex.exceptions.MissingBackpressureException;
24+
import io.reactivex.internal.disposables.SerialResource;
25+
import io.reactivex.internal.queue.SpscArrayQueue;
26+
import io.reactivex.internal.subscriptions.SubscriptionHelper;
27+
import io.reactivex.plugins.RxJavaPlugins;
28+
29+
public final class CompletableOnSubscribeConcat implements CompletableOnSubscribe {
30+
final Observable<? extends Completable> sources;
31+
final int prefetch;
32+
33+
public CompletableOnSubscribeConcat(Observable<? extends Completable> sources, int prefetch) {
34+
this.sources = sources;
35+
this.prefetch = prefetch;
36+
}
37+
38+
@Override
39+
public void accept(CompletableSubscriber s) {
40+
CompletableConcatSubscriber parent = new CompletableConcatSubscriber(s, prefetch);
41+
sources.subscribe(parent);
42+
}
43+
44+
static final class CompletableConcatSubscriber
45+
extends AtomicInteger
46+
implements Subscriber<Completable>, Disposable {
47+
/** */
48+
private static final long serialVersionUID = 7412667182931235013L;
49+
final CompletableSubscriber actual;
50+
final int prefetch;
51+
final SerialResource<Disposable> sr;
52+
53+
final SpscArrayQueue<Completable> queue;
54+
55+
Subscription s;
56+
57+
volatile boolean done;
58+
59+
volatile int once;
60+
static final AtomicIntegerFieldUpdater<CompletableConcatSubscriber> ONCE =
61+
AtomicIntegerFieldUpdater.newUpdater(CompletableConcatSubscriber.class, "once");
62+
63+
final ConcatInnerSubscriber inner;
64+
65+
public CompletableConcatSubscriber(CompletableSubscriber actual, int prefetch) {
66+
this.actual = actual;
67+
this.prefetch = prefetch;
68+
this.queue = new SpscArrayQueue<>(prefetch);
69+
this.sr = new SerialResource<>(Disposable::dispose);
70+
this.inner = new ConcatInnerSubscriber();
71+
}
72+
73+
@Override
74+
public void onSubscribe(Subscription s) {
75+
if (SubscriptionHelper.validateSubscription(this.s, s)) {
76+
return;
77+
}
78+
this.s = s;
79+
actual.onSubscribe(this);
80+
s.request(prefetch);
81+
}
82+
83+
@Override
84+
public void onNext(Completable t) {
85+
if (!queue.offer(t)) {
86+
onError(new MissingBackpressureException());
87+
return;
88+
}
89+
if (getAndIncrement() == 0) {
90+
next();
91+
}
92+
}
93+
94+
@Override
95+
public void onError(Throwable t) {
96+
if (ONCE.compareAndSet(this, 0, 1)) {
97+
actual.onError(t);
98+
return;
99+
}
100+
RxJavaPlugins.onError(t);
101+
}
102+
103+
@Override
104+
public void onComplete() {
105+
if (done) {
106+
return;
107+
}
108+
done = true;
109+
if (getAndIncrement() == 0) {
110+
next();
111+
}
112+
}
113+
114+
void innerError(Throwable e) {
115+
s.cancel();
116+
onError(e);
117+
}
118+
119+
void innerComplete() {
120+
if (decrementAndGet() != 0) {
121+
next();
122+
}
123+
if (!done) {
124+
s.request(1);
125+
}
126+
}
127+
128+
@Override
129+
public void dispose() {
130+
s.cancel();
131+
sr.dispose();
132+
}
133+
134+
void next() {
135+
boolean d = done;
136+
Completable c = queue.poll();
137+
if (c == null) {
138+
if (d) {
139+
if (ONCE.compareAndSet(this, 0, 1)) {
140+
actual.onComplete();
141+
}
142+
return;
143+
}
144+
RxJavaPlugins.onError(new IllegalStateException("Queue is empty?!"));
145+
return;
146+
}
147+
148+
c.subscribe(inner);
149+
}
150+
151+
final class ConcatInnerSubscriber implements CompletableSubscriber {
152+
@Override
153+
public void onSubscribe(Disposable d) {
154+
sr.set(d);
155+
}
156+
157+
@Override
158+
public void onError(Throwable e) {
159+
innerError(e);
160+
}
161+
162+
@Override
163+
public void onComplete() {
164+
innerComplete();
165+
}
166+
}
167+
}
168+
}
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
/**
2+
* Copyright 2015 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.completable;
15+
16+
import java.util.concurrent.atomic.AtomicInteger;
17+
18+
import io.reactivex.Completable;
19+
import io.reactivex.Completable.*;
20+
import io.reactivex.disposables.*;
21+
22+
public final class CompletableOnSubscribeConcatArray implements CompletableOnSubscribe {
23+
final Completable[] sources;
24+
25+
public CompletableOnSubscribeConcatArray(Completable[] sources) {
26+
this.sources = sources;
27+
}
28+
29+
@Override
30+
public void accept(CompletableSubscriber s) {
31+
ConcatInnerSubscriber inner = new ConcatInnerSubscriber(s, sources);
32+
s.onSubscribe(inner.sd);
33+
inner.next();
34+
}
35+
36+
static final class ConcatInnerSubscriber extends AtomicInteger implements CompletableSubscriber {
37+
/** */
38+
private static final long serialVersionUID = -7965400327305809232L;
39+
40+
final CompletableSubscriber actual;
41+
final Completable[] sources;
42+
43+
int index;
44+
45+
final SerialDisposable sd;
46+
47+
public ConcatInnerSubscriber(CompletableSubscriber actual, Completable[] sources) {
48+
this.actual = actual;
49+
this.sources = sources;
50+
this.sd = new SerialDisposable();
51+
}
52+
53+
@Override
54+
public void onSubscribe(Disposable d) {
55+
sd.set(d);
56+
}
57+
58+
@Override
59+
public void onError(Throwable e) {
60+
actual.onError(e);
61+
}
62+
63+
@Override
64+
public void onComplete() {
65+
next();
66+
}
67+
68+
void next() {
69+
if (sd.isDisposed()) {
70+
return;
71+
}
72+
73+
if (getAndIncrement() != 0) {
74+
return;
75+
}
76+
77+
Completable[] a = sources;
78+
do {
79+
if (sd.isDisposed()) {
80+
return;
81+
}
82+
83+
int idx = index++;
84+
if (idx == a.length) {
85+
actual.onComplete();
86+
return;
87+
}
88+
89+
a[idx].subscribe(this);
90+
} while (decrementAndGet() != 0);
91+
}
92+
}
93+
}
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
/**
2+
* Copyright 2015 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.completable;
15+
16+
import java.util.Iterator;
17+
import java.util.concurrent.atomic.AtomicInteger;
18+
19+
import io.reactivex.Completable;
20+
import io.reactivex.Completable.*;
21+
import io.reactivex.disposables.*;
22+
import io.reactivex.internal.disposables.EmptyDisposable;
23+
24+
public final class CompletableOnSubscribeConcatIterable implements CompletableOnSubscribe {
25+
final Iterable<? extends Completable> sources;
26+
27+
public CompletableOnSubscribeConcatIterable(Iterable<? extends Completable> sources) {
28+
this.sources = sources;
29+
}
30+
31+
@Override
32+
public void accept(CompletableSubscriber s) {
33+
34+
Iterator<? extends Completable> it;
35+
36+
try {
37+
it = sources.iterator();
38+
} catch (Throwable e) {
39+
s.onSubscribe(EmptyDisposable.INSTANCE);
40+
s.onError(e);
41+
return;
42+
}
43+
44+
if (it == null) {
45+
s.onSubscribe(EmptyDisposable.INSTANCE);
46+
s.onError(new NullPointerException("The iterator returned is null"));
47+
return;
48+
}
49+
50+
ConcatInnerSubscriber inner = new ConcatInnerSubscriber(s, it);
51+
s.onSubscribe(inner.sd);
52+
inner.next();
53+
}
54+
55+
static final class ConcatInnerSubscriber extends AtomicInteger implements CompletableSubscriber {
56+
/** */
57+
private static final long serialVersionUID = -7965400327305809232L;
58+
59+
final CompletableSubscriber actual;
60+
final Iterator<? extends Completable> sources;
61+
62+
int index;
63+
64+
final SerialDisposable sd;
65+
66+
public ConcatInnerSubscriber(CompletableSubscriber actual, Iterator<? extends Completable> sources) {
67+
this.actual = actual;
68+
this.sources = sources;
69+
this.sd = new SerialDisposable();
70+
}
71+
72+
@Override
73+
public void onSubscribe(Disposable d) {
74+
sd.set(d);
75+
}
76+
77+
@Override
78+
public void onError(Throwable e) {
79+
actual.onError(e);
80+
}
81+
82+
@Override
83+
public void onComplete() {
84+
next();
85+
}
86+
87+
void next() {
88+
if (sd.isDisposed()) {
89+
return;
90+
}
91+
92+
if (getAndIncrement() != 0) {
93+
return;
94+
}
95+
96+
Iterator<? extends Completable> a = sources;
97+
do {
98+
if (sd.isDisposed()) {
99+
return;
100+
}
101+
102+
boolean b;
103+
try {
104+
b = a.hasNext();
105+
} catch (Throwable ex) {
106+
actual.onError(ex);
107+
return;
108+
}
109+
110+
if (!b) {
111+
actual.onComplete();
112+
return;
113+
}
114+
115+
Completable c;
116+
117+
try {
118+
c = a.next();
119+
} catch (Throwable ex) {
120+
actual.onError(ex);
121+
return;
122+
}
123+
124+
if (c == null) {
125+
actual.onError(new NullPointerException("The completable returned is null"));
126+
return;
127+
}
128+
129+
c.subscribe(this);
130+
} while (decrementAndGet() != 0);
131+
}
132+
}
133+
}

0 commit comments

Comments
 (0)