Skip to content

Commit 82c354e

Browse files
authored
HBASE-28085 Configurably use scanner timeout as rpc timeout for scanner next calls (#5402)
Signed-off-by: Nick Dimiduk <ndimiduk@apache.org>
Signed-off-by: Duo Zhang <zhangduo@apache.org>
1 parent ff09667 commit 82c354e

10 files changed

Lines changed: 247 additions & 62 deletions

File tree

hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,11 @@ public ClientAsyncPrefetchScanner(Configuration configuration, Scan scan, TableN
6666
ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory,
6767
RpcControllerFactory rpcControllerFactory, ExecutorService pool, int scanReadRpcTimeout,
6868
int scannerTimeout, int replicaCallTimeoutMicroSecondScan,
69-
Map<String, byte[]> requestAttributes) throws IOException {
69+
ConnectionConfiguration connectionConfiguration, Map<String, byte[]> requestAttributes)
70+
throws IOException {
7071
super(configuration, scan, name, connection, rpcCallerFactory, rpcControllerFactory, pool,
71-
scanReadRpcTimeout, scannerTimeout, replicaCallTimeoutMicroSecondScan, requestAttributes);
72+
scanReadRpcTimeout, scannerTimeout, replicaCallTimeoutMicroSecondScan,
73+
connectionConfiguration, requestAttributes);
7274
exceptionsQueue = new ConcurrentLinkedQueue<>();
7375
final Context context = Context.current();
7476
final Runnable runnable = context.wrap(new PrefetchRunnable());

hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
import org.apache.commons.lang3.mutable.MutableBoolean;
3434
import org.apache.hadoop.conf.Configuration;
3535
import org.apache.hadoop.hbase.DoNotRetryIOException;
36-
import org.apache.hadoop.hbase.HConstants;
3736
import org.apache.hadoop.hbase.HRegionInfo;
3837
import org.apache.hadoop.hbase.NotServingRegionException;
3938
import org.apache.hadoop.hbase.TableName;
@@ -78,6 +77,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
7877
protected final TableName tableName;
7978
protected final int readRpcTimeout;
8079
protected final int scannerTimeout;
80+
private final boolean useScannerTimeoutForNextCalls;
8181
protected boolean scanMetricsPublished = false;
8282
protected RpcRetryingCaller<Result[]> caller;
8383
protected RpcControllerFactory rpcControllerFactory;
@@ -104,7 +104,8 @@ public abstract class ClientScanner extends AbstractClientScanner {
104104
public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName,
105105
ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
106106
RpcControllerFactory controllerFactory, ExecutorService pool, int scanReadRpcTimeout,
107-
int scannerTimeout, int primaryOperationTimeout, Map<String, byte[]> requestAttributes)
107+
int scannerTimeout, int primaryOperationTimeout,
108+
ConnectionConfiguration connectionConfiguration, Map<String, byte[]> requestAttributes)
108109
throws IOException {
109110
if (LOG.isTraceEnabled()) {
110111
LOG.trace(
@@ -116,16 +117,15 @@ public ClientScanner(final Configuration conf, final Scan scan, final TableName
116117
this.connection = connection;
117118
this.pool = pool;
118119
this.primaryOperationTimeout = primaryOperationTimeout;
119-
this.retries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
120-
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
120+
this.retries = connectionConfiguration.getRetriesNumber();
121121
if (scan.getMaxResultSize() > 0) {
122122
this.maxScannerResultSize = scan.getMaxResultSize();
123123
} else {
124-
this.maxScannerResultSize = conf.getLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
125-
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
124+
this.maxScannerResultSize = connectionConfiguration.getScannerMaxResultSize();
126125
}
127126
this.readRpcTimeout = scanReadRpcTimeout;
128127
this.scannerTimeout = scannerTimeout;
128+
this.useScannerTimeoutForNextCalls = connectionConfiguration.isUseScannerTimeoutForNextCalls();
129129
this.requestAttributes = requestAttributes;
130130

131131
// check if application wants to collect scan metrics
@@ -135,8 +135,7 @@ public ClientScanner(final Configuration conf, final Scan scan, final TableName
135135
if (this.scan.getCaching() > 0) {
136136
this.caching = this.scan.getCaching();
137137
} else {
138-
this.caching = conf.getInt(HConstants.HBASE_CLIENT_SCANNER_CACHING,
139-
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
138+
this.caching = connectionConfiguration.getScannerCaching();
140139
}
141140

142141
this.caller = rpcFactory.<Result[]> newCaller();
@@ -255,7 +254,7 @@ protected boolean moveToNextRegion() {
255254
this.currentRegion = null;
256255
this.callable = new ScannerCallableWithReplicas(getTable(), getConnection(),
257256
createScannerCallable(), pool, primaryOperationTimeout, scan, getRetries(), readRpcTimeout,
258-
scannerTimeout, caching, conf, caller);
257+
scannerTimeout, useScannerTimeoutForNextCalls, caching, conf, caller);
259258
this.callable.setCaching(this.caching);
260259
incRegionCountMetrics(scanMetrics);
261260
return true;

hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,11 @@ public ClientSimpleScanner(Configuration configuration, Scan scan, TableName nam
3939
ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory,
4040
RpcControllerFactory rpcControllerFactory, ExecutorService pool, int scanReadRpcTimeout,
4141
int scannerTimeout, int replicaCallTimeoutMicroSecondScan,
42-
Map<String, byte[]> requestAttributes) throws IOException {
42+
ConnectionConfiguration connectionConfiguration, Map<String, byte[]> requestAttributes)
43+
throws IOException {
4344
super(configuration, scan, name, connection, rpcCallerFactory, rpcControllerFactory, pool,
44-
scanReadRpcTimeout, scannerTimeout, replicaCallTimeoutMicroSecondScan, requestAttributes);
45+
scanReadRpcTimeout, scannerTimeout, replicaCallTimeoutMicroSecondScan,
46+
connectionConfiguration, requestAttributes);
4547
}
4648

4749
@Override

hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,12 @@ public class ConnectionConfiguration {
7676
public static final String HBASE_CLIENT_META_SCANNER_TIMEOUT =
7777
"hbase.client.meta.scanner.timeout.period";
7878

79+
public static final String HBASE_CLIENT_USE_SCANNER_TIMEOUT_PERIOD_FOR_NEXT_CALLS =
80+
"hbase.client.use.scanner.timeout.period.for.next.calls";
81+
82+
public static final boolean HBASE_CLIENT_USE_SCANNER_TIMEOUT_PERIOD_FOR_NEXT_CALLS_DEFAULT =
83+
false;
84+
7985
private final long writeBufferSize;
8086
private final long writeBufferPeriodicFlushTimeoutMs;
8187
private final long writeBufferPeriodicFlushTimerTickMs;
@@ -99,6 +105,7 @@ public class ConnectionConfiguration {
99105
private final boolean clientScannerAsyncPrefetch;
100106
private final long pauseMs;
101107
private final long pauseMsForServerOverloaded;
108+
private final boolean useScannerTimeoutForNextCalls;
102109

103110
/**
104111
* Constructor
@@ -158,6 +165,9 @@ public class ConnectionConfiguration {
158165
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
159166

160167
this.metaScanTimeout = conf.getInt(HBASE_CLIENT_META_SCANNER_TIMEOUT, scanTimeout);
168+
this.useScannerTimeoutForNextCalls =
169+
conf.getBoolean(HBASE_CLIENT_USE_SCANNER_TIMEOUT_PERIOD_FOR_NEXT_CALLS,
170+
HBASE_CLIENT_USE_SCANNER_TIMEOUT_PERIOD_FOR_NEXT_CALLS_DEFAULT);
161171

162172
long pauseMs = conf.getLong(HBASE_CLIENT_PAUSE, DEFAULT_HBASE_CLIENT_PAUSE);
163173
long pauseMsForServerOverloaded = conf.getLong(HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED,
@@ -201,6 +211,8 @@ protected ConnectionConfiguration() {
201211
this.metaScanTimeout = scanTimeout;
202212
this.pauseMs = DEFAULT_HBASE_CLIENT_PAUSE;
203213
this.pauseMsForServerOverloaded = DEFAULT_HBASE_CLIENT_PAUSE;
214+
this.useScannerTimeoutForNextCalls =
215+
HBASE_CLIENT_USE_SCANNER_TIMEOUT_PERIOD_FOR_NEXT_CALLS_DEFAULT;
204216
}
205217

206218
public int getReadRpcTimeout() {
@@ -275,6 +287,10 @@ public int getScanTimeout() {
275287
return scanTimeout;
276288
}
277289

290+
public boolean isUseScannerTimeoutForNextCalls() {
291+
return useScannerTimeoutForNextCalls;
292+
}
293+
278294
public int getMetaScanTimeout() {
279295
return metaScanTimeout;
280296
}

hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1052,7 +1052,7 @@ private RegionLocations locateRegionInMeta(TableName tableName, byte[] row, bool
10521052
ReversedClientScanner rcs = new ReversedClientScanner(conf, s, TableName.META_TABLE_NAME,
10531053
this, rpcCallerFactory, rpcControllerFactory, getMetaLookupPool(),
10541054
connectionConfig.getMetaReadRpcTimeout(), connectionConfig.getMetaScanTimeout(),
1055-
metaReplicaCallTimeoutScanInMicroSecond, Collections.emptyMap())) {
1055+
metaReplicaCallTimeoutScanInMicroSecond, connectionConfig, Collections.emptyMap())) {
10561056
boolean tableNotFound = true;
10571057
for (;;) {
10581058
Result regionInfoRow = rcs.next();

hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -321,16 +321,16 @@ public ResultScanner getScanner(Scan scan) throws IOException {
321321
if (scan.isReversed()) {
322322
return new ReversedClientScanner(getConfiguration(), scan, getName(), connection,
323323
rpcCallerFactory, rpcControllerFactory, pool, scanReadRpcTimeout, scanTimeout,
324-
replicaTimeout, requestAttributes);
324+
replicaTimeout, connConfiguration, requestAttributes);
325325
} else {
326326
if (async) {
327327
return new ClientAsyncPrefetchScanner(getConfiguration(), scan, getName(), connection,
328328
rpcCallerFactory, rpcControllerFactory, pool, scanReadRpcTimeout, scanTimeout,
329-
replicaTimeout, requestAttributes);
329+
replicaTimeout, connConfiguration, requestAttributes);
330330
} else {
331331
return new ClientSimpleScanner(getConfiguration(), scan, getName(), connection,
332332
rpcCallerFactory, rpcControllerFactory, pool, scanReadRpcTimeout, scanTimeout,
333-
replicaTimeout, requestAttributes);
333+
replicaTimeout, connConfiguration, requestAttributes);
334334
}
335335
}
336336
}

hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,12 @@ public class ReversedClientScanner extends ClientScanner {
4040
public ReversedClientScanner(Configuration conf, Scan scan, TableName tableName,
4141
ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
4242
RpcControllerFactory controllerFactory, ExecutorService pool, int scanReadRpcTimeout,
43-
int scannerTimeout, int primaryOperationTimeout, Map<String, byte[]> requestAttributes)
43+
int scannerTimeout, int primaryOperationTimeout,
44+
ConnectionConfiguration connectionConfiguration, Map<String, byte[]> requestAttributes)
4445
throws IOException {
4546
super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool,
46-
scanReadRpcTimeout, scannerTimeout, primaryOperationTimeout, requestAttributes);
47+
scanReadRpcTimeout, scannerTimeout, primaryOperationTimeout, connectionConfiguration,
48+
requestAttributes);
4749
}
4850

4951
@Override

hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java

Lines changed: 40 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
5757
AtomicBoolean replicaSwitched = new AtomicBoolean(false);
5858
private final ClusterConnection cConnection;
5959
protected final ExecutorService pool;
60+
private final boolean useScannerTimeoutForNextCalls;
6061
protected final int timeBeforeReplicas;
6162
private final Scan scan;
6263
private final int retries;
@@ -72,11 +73,12 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
7273

7374
public ScannerCallableWithReplicas(TableName tableName, ClusterConnection cConnection,
7475
ScannerCallable baseCallable, ExecutorService pool, int timeBeforeReplicas, Scan scan,
75-
int retries, int readRpcTimeout, int scannerTimeout, int caching, Configuration conf,
76-
RpcRetryingCaller<Result[]> caller) {
76+
int retries, int readRpcTimeout, int scannerTimeout, boolean useScannerTimeoutForNextCalls,
77+
int caching, Configuration conf, RpcRetryingCaller<Result[]> caller) {
7778
this.currentScannerCallable = baseCallable;
7879
this.cConnection = cConnection;
7980
this.pool = pool;
81+
this.useScannerTimeoutForNextCalls = useScannerTimeoutForNextCalls;
8082
if (timeBeforeReplicas < 0) {
8183
throw new IllegalArgumentException("Invalid value of operation timeout on the primary");
8284
}
@@ -187,9 +189,12 @@ public Result[] call(int timeout) throws IOException {
187189
pool, regionReplication * 5);
188190

189191
AtomicBoolean done = new AtomicBoolean(false);
192+
// make sure we use the same rpcTimeout for current and other replicas
193+
int rpcTimeoutForCall = getRpcTimeout();
194+
190195
replicaSwitched.set(false);
191196
// submit call for the primary replica or user specified replica
192-
addCallsForCurrentReplica(cs);
197+
addCallsForCurrentReplica(cs, rpcTimeoutForCall);
193198
int startIndex = 0;
194199

195200
try {
@@ -234,7 +239,7 @@ public Result[] call(int timeout) throws IOException {
234239
endIndex = 1;
235240
} else {
236241
// TODO: this may be an overkill for large region replication
237-
addCallsForOtherReplicas(cs, 0, regionReplication - 1);
242+
addCallsForOtherReplicas(cs, 0, regionReplication - 1, rpcTimeoutForCall);
238243
}
239244

240245
try {
@@ -326,15 +331,41 @@ public Cursor getCursor() {
326331
return currentScannerCallable != null ? currentScannerCallable.getCursor() : null;
327332
}
328333

329-
private void
330-
addCallsForCurrentReplica(ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs) {
334+
private void addCallsForCurrentReplica(
335+
ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, int rpcTimeout) {
331336
RetryingRPC retryingOnReplica = new RetryingRPC(currentScannerCallable);
332337
outstandingCallables.add(currentScannerCallable);
333-
cs.submit(retryingOnReplica, readRpcTimeout, scannerTimeout, currentScannerCallable.id);
338+
cs.submit(retryingOnReplica, rpcTimeout, scannerTimeout, currentScannerCallable.id);
339+
}
340+
341+
/**
342+
* As we have a call sequence for scan, it is useless to have a different rpc timeout which is
343+
* less than the scan timeout. If the server does not respond in time (usually this will not happen
344+
* as we have heartbeat now), we will get an OutOfOrderScannerNextException when resending the
345+
* next request and the only way to fix this is to close the scanner and open a new one.
346+
* <p>
347+
* The legacy behavior of ScannerCallable has been to use readRpcTimeout despite the above. If
348+
* using legacy behavior, we always use readRpcTimeout.
349+
* <p>
350+
* If new behavior is enabled, we determine the rpc timeout to use based on whether the scanner is
351+
* open. If the scanner is open, use scannerTimeout; otherwise use readRpcTimeout.
352+
*/
353+
private int getRpcTimeout() {
354+
if (useScannerTimeoutForNextCalls) {
355+
return isNextCall() ? scannerTimeout : readRpcTimeout;
356+
} else {
357+
return readRpcTimeout;
358+
}
359+
}
360+
361+
private boolean isNextCall() {
362+
return currentScannerCallable != null && currentScannerCallable.scannerId != -1
363+
&& !currentScannerCallable.renew && !currentScannerCallable.closed;
334364
}
335365

336366
private void addCallsForOtherReplicas(
337-
ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, int min, int max) {
367+
ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, int min, int max,
368+
int rpcTimeout) {
338369

339370
for (int id = min; id <= max; id++) {
340371
if (currentScannerCallable.id == id) {
@@ -344,7 +375,7 @@ private void addCallsForOtherReplicas(
344375
setStartRowForReplicaCallable(s);
345376
outstandingCallables.add(s);
346377
RetryingRPC retryingOnReplica = new RetryingRPC(s);
347-
cs.submit(retryingOnReplica, readRpcTimeout, scannerTimeout, id);
378+
cs.submit(retryingOnReplica, rpcTimeout, scannerTimeout, id);
348379
}
349380
}
350381

hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -108,12 +108,12 @@ private static class MockClientScanner extends ClientSimpleScanner {
108108

109109
public MockClientScanner(final Configuration conf, final Scan scan, final TableName tableName,
110110
ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
111-
RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout)
112-
throws IOException {
111+
RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout,
112+
ConnectionConfiguration connectionConfig) throws IOException {
113113
super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool,
114114
HConstants.DEFAULT_HBASE_RPC_TIMEOUT,
115115
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, primaryOperationTimeout,
116-
Collections.emptyMap());
116+
connectionConfig, Collections.emptyMap());
117117
}
118118

119119
@Override
@@ -178,7 +178,7 @@ public Result[] answer(InvocationOnMock invocation) throws Throwable {
178178

179179
try (MockClientScanner scanner =
180180
new MockClientScanner(conf, scan, TableName.valueOf(name.getMethodName()), clusterConn,
181-
rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
181+
rpcFactory, controllerFactory, pool, Integer.MAX_VALUE, connectionConfig)) {
182182

183183
scanner.setRpcFinished(true);
184184

@@ -242,7 +242,7 @@ public Result[] answer(InvocationOnMock invocation) throws Throwable {
242242

243243
try (MockClientScanner scanner =
244244
new MockClientScanner(conf, scan, TableName.valueOf(name.getMethodName()), clusterConn,
245-
rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
245+
rpcFactory, controllerFactory, pool, Integer.MAX_VALUE, connectionConfig)) {
246246
InOrder inOrder = Mockito.inOrder(caller);
247247

248248
scanner.loadCache();
@@ -305,7 +305,7 @@ public Result[] answer(InvocationOnMock invocation) throws Throwable {
305305

306306
try (MockClientScanner scanner =
307307
new MockClientScanner(conf, scan, TableName.valueOf(name.getMethodName()), clusterConn,
308-
rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
308+
rpcFactory, controllerFactory, pool, Integer.MAX_VALUE, connectionConfig)) {
309309
InOrder inOrder = Mockito.inOrder(caller);
310310

311311
scanner.loadCache();
@@ -376,7 +376,7 @@ public Result[] answer(InvocationOnMock invocation) throws Throwable {
376376

377377
try (MockClientScanner scanner =
378378
new MockClientScanner(conf, scan, TableName.valueOf(name.getMethodName()), clusterConn,
379-
rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
379+
rpcFactory, controllerFactory, pool, Integer.MAX_VALUE, connectionConfig)) {
380380
scanner.setRpcFinished(true);
381381

382382
InOrder inOrder = Mockito.inOrder(caller);
@@ -443,7 +443,7 @@ public Result[] answer(InvocationOnMock invocation) throws Throwable {
443443

444444
try (MockClientScanner scanner =
445445
new MockClientScanner(conf, scan, TableName.valueOf(name.getMethodName()), clusterConn,
446-
rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
446+
rpcFactory, controllerFactory, pool, Integer.MAX_VALUE, connectionConfig)) {
447447
InOrder inOrder = Mockito.inOrder(caller);
448448
scanner.setRpcFinished(true);
449449

@@ -488,7 +488,7 @@ public void testExceptionsFromReplicasArePropagated() throws IOException {
488488

489489
try (MockClientScanner scanner =
490490
new MockClientScanner(conf, scan, TableName.valueOf(name.getMethodName()), clusterConn,
491-
rpcFactory, new RpcControllerFactory(conf), pool, Integer.MAX_VALUE)) {
491+
rpcFactory, new RpcControllerFactory(conf), pool, Integer.MAX_VALUE, connectionConfig)) {
492492
Iterator<Result> iter = scanner.iterator();
493493
while (iter.hasNext()) {
494494
iter.next();

0 commit comments

Comments
 (0)