Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 31 additions & 10 deletions src/main/java/com/ibm/etcd/client/utils/RangeCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,13 @@

import javax.annotation.concurrent.GuardedBy;

import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.Iterators;
import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import com.ibm.etcd.client.EtcdClient;
import com.ibm.etcd.client.FutureListener;
Expand Down Expand Up @@ -210,22 +208,46 @@ protected ListenableFuture<Boolean> fullRefreshCache() {
.map(ResponseOp::getResponseRange)
.collect(Collectors.toList()), directExecutor());
}

final SettableFuture<Boolean> promise = new SettableFuture<Boolean>() {

final class StartPromise extends SettableFuture<Boolean> implements Runnable {
// Reference to parent future (etcd range request) held here just
// to be able to propagate cancellation if that happens before it completes.
volatile ListenableFuture<?> parent;
StartPromise(ListenableFuture<?> parent) {
this.parent = parent;
parent.addListener(this, directExecutor());
}
@Override
protected void interruptTask() {
if (rrfut.cancel(true)) {
public void run() {
// Ensure that we remove our reference to the "parent" future as soon
// as it completes, since RangeCache stores this promise in the
// startFuture field, and we want to allow the original future and its
// possibly-large value to be garbage collected.
parent = null;
}
@Override
protected void afterDone() {
if (!wasInterrupted()) {
return;
}
// propagate cancellation
final ListenableFuture<?> p = parent;
if (p != null && p.cancel(true)) {
return;
}
Watch theWatch;
synchronized(RangeCache.this) {
theWatch = watch;
closed = true;
Comment thread
njhill marked this conversation as resolved.
}
if (theWatch != null) {
theWatch.close();
}
}
};
}

final SettableFuture<Boolean> promise = new StartPromise(rrfut);

Futures.addCallback(rrfut, (FutureListener<List<RangeResponse>>) (rrs, err) -> {
if (rrs != null) try {
setupWatch(rrs, firstTime, promise);
Expand Down Expand Up @@ -323,7 +345,6 @@ public void onError(Throwable t) {
promise.setException(new CancellationException());
return;
}
assert startFuture == promise;
boolean isDone = promise.isDone();
if (isDone && promise.isCancelled()) {
return;
Expand Down Expand Up @@ -651,7 +672,7 @@ public boolean keyExistsRemote(ByteString key) {
/**
* Stores result of put operations
*/
public static class PutResult {
public static final class PutResult {
private final boolean succ;
private final KeyValue kv;
public PutResult(boolean success, KeyValue kv) {
Expand Down Expand Up @@ -1020,7 +1041,7 @@ public boolean isClosed() {

static class SettableFuture<V> extends AbstractFuture<V> {
@Override
public boolean set(@Nullable V value) {
public boolean set(V value) {
return super.set(value);
}
@Override
Expand Down