Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build-env/install-etcd.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# Fail on any error
set -e

ETCD_VERSION=v3.3.18
ETCD_VERSION=v3.3.22

INSTALL_DIR="${1:-$HOME/etcd}"

Expand Down
18 changes: 9 additions & 9 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,15 @@
</distributionManagement>

<properties>
<grpc-version>1.26.0</grpc-version>
<netty-version>4.1.42.Final</netty-version>
<netty-tcnative-version>2.0.26.Final</netty-tcnative-version>
<protoc-version>3.11.0</protoc-version>
<grpc-version>1.30.0</grpc-version>
<netty-version>4.1.48.Final</netty-version>
<netty-tcnative-version>2.0.30.Final</netty-tcnative-version>
<protobuf-version>3.12.2</protobuf-version>
<gson-version>2.8.6</gson-version>
<slf4j-version>1.7.30</slf4j-version>
<junit-version>4.13</junit-version>
<guava-version>28.2-jre</guava-version>
<annotation-version>1.3.2</annotation-version>
<guava-version>29.0-jre</guava-version>
<annotation-version>6.0.53</annotation-version>

<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
Expand Down Expand Up @@ -129,8 +129,8 @@
gRPC stubs. "provided" scope is sufficient since this
annotation only has source retention. -->
<dependency>
<groupId>javax.annotation</groupId>
<artifactId>javax.annotation-api</artifactId>
<groupId>org.apache.tomcat</groupId>
<artifactId>annotations-api</artifactId>
<version>${annotation-version}</version>
<scope>provided</scope>
</dependency>
Expand Down Expand Up @@ -163,7 +163,7 @@
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:${protoc-version}:exe:${os.detected.classifier}</protocArtifact>
<protocArtifact>com.google.protobuf:protoc:${protobuf-version}:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc-version}:exe:${os.detected.classifier}</pluginArtifact>
</configuration>
Expand Down
103 changes: 100 additions & 3 deletions src/main/java/com/ibm/etcd/client/EtcdClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import javax.net.ssl.TrustManagerFactory;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.collect.ImmutableSet;
Expand All @@ -64,6 +65,7 @@

import io.grpc.CallCredentials;
import io.grpc.CallOptions;
import io.grpc.Deadline;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.Metadata.Key;
Expand All @@ -82,6 +84,7 @@
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.util.concurrent.FastThreadLocalThread;

Expand Down Expand Up @@ -131,18 +134,34 @@ public static class Builder {
this.chanBuilder = chanBuilder;
}

/**
* Set etcd credentials to use from {@link ByteString}s
*
* @param name
* @param password
*/
public Builder withCredentials(ByteString name, ByteString password) {
this.name = name;
this.password = password;
return this;
}

/**
* Set etcd credentials to use from {@link String}s as UTF-8.
*
* @param name
* @param password
*/
public Builder withCredentials(String name, String password) {
this.name = ByteString.copyFromUtf8(name);
this.password = ByteString.copyFromUtf8(password);
return this;
}

/**
* Attempt authentication immediately rather than if/when required.
* Applies only if etcd credentials have been provided.
*/
public Builder withImmediateAuth() {
preemptAuth = true;
return this;
Expand All @@ -156,16 +175,41 @@ public Builder withThreadCount(int threads) {
return this;
}

/**
* Control whether all RPC requests are sent via the underlying
* IO event loop. This helps to limit overall memory use due to
* internal thread-local buffer pooling.
* <p>
* Default is <code>true</code>
*
* @param sendViaEventLoop
*/
public Builder sendViaEventLoop(boolean sendViaEventLoop) {
this.sendViaEventLoop = sendViaEventLoop;
return this;
}

/**
* Provide executor to use for user call-backs. A default
* client-scoped executor will be used if not set.
*
* @param executor
*/
public Builder withUserExecutor(Executor executor) {
this.executor = executor;
return this;
}

/**
* Provide a default timeout to use for requests made by this client.
* This timeout value is used per-attempt, unlike {@link Deadline}s
* used per-call which also cover any/all retry attempts.
*
* The default for this default is 10 seconds
*
* @param value
* @param unit
*/
public Builder withDefaultTimeout(long value, TimeUnit unit) {
this.defaultTimeoutMs = TimeUnit.MILLISECONDS.convert(value, unit);
return this;
Expand All @@ -176,43 +220,80 @@ private SslContextBuilder sslBuilder() {
: (sslContextBuilder = GrpcSslContexts.forClient());
}

/**
* Disable TLS - to connect to insecure servers in development contexts
*/
public Builder withPlainText() {
chanBuilder.usePlaintext();
return this;
}

/**
* Provide CA certificate to use for TLS connection
*
* @param certSource
* @throws IOException if there is an error reading from the provided {@link ByteSource}
* @throws SSLException
*/
public Builder withCaCert(ByteSource certSource) throws IOException, SSLException {
try (InputStream cert = certSource.openStream()) {
chanBuilder.sslContext(sslBuilder().trustManager(cert).build());
}
return this;
}

/**
* Provide custom {@link TrustManagerFactory} to use for this
* client's TLS connection.
*
* @param tmf
* @throws SSLException
*/
public Builder withTrustManager(TrustManagerFactory tmf) throws SSLException {
chanBuilder.sslContext(sslBuilder().trustManager(tmf).build());
return this;
}

/**
* Configure the netty {@link SslContext} to be used by this client.
* The provided {@link Consumer} should make updates to the passed
* {@link SslContextBuilder} as needed, but should not call build().
*
* @param contextBuilder
* @throws SSLException
*/
public Builder withTlsConfig(Consumer<SslContextBuilder> contextBuilder) throws SSLException {
SslContextBuilder sslBuilder = sslBuilder();
contextBuilder.accept(sslBuilder);
chanBuilder.sslContext(sslBuilder.build());
return this;
}

/**
* Set the session timeout in seconds - this corresponds to the TTL of the
* session lease, see {@link EtcdClient#getSessionLease()}.
*
* @param timeoutSecs
*/
public Builder withSessionTimeoutSecs(int timeoutSecs) {
if (timeoutSecs < 1) {
throw new IllegalArgumentException("invalid session timeout: " + timeoutSecs);
}
Preconditions.checkArgument(timeoutSecs < 1, "invalid session timeout: %s", timeoutSecs);
this.sessTimeoutSecs = timeoutSecs;
return this;
}

/**
* Set the maximum inbound message size in bytes
*
* @param sizeInBytes
*/
public Builder withMaxInboundMessageSize(int sizeInBytes) {
chanBuilder.maxInboundMessageSize(sizeInBytes);
return this;
}

/**
* @return the built {@link EtcdClient} instance
*/
public EtcdClient build() {
return new EtcdClient(chanBuilder, defaultTimeoutMs, name, password,
preemptAuth, threads, executor, sendViaEventLoop, sessTimeoutSecs);
Expand All @@ -223,18 +304,34 @@ private static int defaultThreadCount() {
return Math.min(6, Runtime.getRuntime().availableProcessors());
}

/**
*
* @param host
* @param port
* @return
*/
public static Builder forEndpoint(String host, int port) {
String target = GrpcUtil.authorityFromHostAndPort(host, port);
return new Builder(NettyChannelBuilder.forTarget(target));
}

/**
*
* @param endpoints
* @return
*/
public static Builder forEndpoints(List<String> endpoints) {
NettyChannelBuilder ncb = NettyChannelBuilder
.forTarget(StaticEtcdNameResolverFactory.ETCD)
.nameResolverFactory(new StaticEtcdNameResolverFactory(endpoints));
return new Builder(ncb);
}

/**
*
* @param endpoints
* @return
*/
public static Builder forEndpoints(String endpoints) {
return forEndpoints(Arrays.asList(endpoints.split(",")));
}
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/ibm/etcd/client/FluentRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public final ListenableFuture<RespT> async() {
}
@Override
public final RespT sync() {
return GrpcClient.waitFor(this::async);
return client.waitForCall(this::async);
}
}
}
Loading