Skip to content

Commit 5bae04e

Browse files
authored
HBASE-26784 Use HIGH_QOS for ResultScanner.close requests (#4163)
Signed-off-by: Duo Zhang <zhangduo@apache.org> Signed-off-by: Andrew Purtell <apurtell@apache.org> Signed-off-by: Xiaolin Ha <haxiaolin@apache.org>
1 parent 2e2764f commit 5bae04e

10 files changed

Lines changed: 469 additions & 81 deletions

File tree

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,7 @@ private void startScan(OpenScannerResponse resp) {
188188
private CompletableFuture<OpenScannerResponse> openScanner(int replicaId) {
189189
return conn.callerFactory.<OpenScannerResponse> single().table(tableName)
190190
.row(scan.getStartRow()).replicaId(replicaId).locateType(getLocateType(scan))
191+
.priority(scan.getPriority())
191192
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
192193
.operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
193194
.pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import java.util.concurrent.TimeUnit;
3737
import org.apache.hadoop.hbase.CallQueueTooBigException;
3838
import org.apache.hadoop.hbase.DoNotRetryIOException;
39+
import org.apache.hadoop.hbase.HConstants;
3940
import org.apache.hadoop.hbase.HRegionLocation;
4041
import org.apache.hadoop.hbase.NotServingRegionException;
4142
import org.apache.hadoop.hbase.UnknownScannerException;
@@ -347,7 +348,7 @@ private long remainingTimeNs() {
347348

348349
private void closeScanner() {
349350
incRPCCallsMetrics(scanMetrics, regionServerRemote);
350-
resetController(controller, rpcTimeoutNs, priority);
351+
resetController(controller, rpcTimeoutNs, HConstants.HIGH_QOS);
351352
ScanRequest req = RequestConverter.buildScanRequest(this.scannerId, 0, true, false);
352353
stub.scan(controller, req, resp -> {
353354
if (controller.failed()) {

hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.hadoop.conf.Configuration;
3030
import org.apache.hadoop.hbase.DoNotRetryIOException;
3131
import org.apache.hadoop.hbase.HBaseIOException;
32+
import org.apache.hadoop.hbase.HConstants;
3233
import org.apache.hadoop.hbase.HRegionInfo;
3334
import org.apache.hadoop.hbase.HRegionLocation;
3435
import org.apache.hadoop.hbase.NotServingRegionException;
@@ -38,6 +39,7 @@
3839
import org.apache.hadoop.hbase.UnknownScannerException;
3940
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
4041
import org.apache.hadoop.hbase.exceptions.ScannerResetException;
42+
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
4143
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
4244
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
4345
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -309,8 +311,16 @@ private void close() {
309311
incRPCCallsMetrics(scanMetrics, isRegionServerRemote);
310312
ScanRequest request =
311313
RequestConverter.buildScanRequest(this.scannerId, 0, true, this.scanMetrics != null);
314+
HBaseRpcController controller = rpcControllerFactory.newController();
315+
// pull in the original priority, but then try to set to HIGH.
316+
// it will take whatever is highest.
317+
controller.setPriority(controller.getPriority());
318+
controller.setPriority(HConstants.HIGH_QOS);
319+
if (controller.hasCallTimeout()) {
320+
controller.setCallTimeout(controller.getCallTimeout());
321+
}
312322
try {
313-
getStub().scan(getRpcController(), request);
323+
getStub().scan(controller, request);
314324
} catch (Exception e) {
315325
throw ProtobufUtil.handleRemoteException(e);
316326
}

hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRpcPriority.java

Lines changed: 117 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,14 @@
1717
*/
1818
package org.apache.hadoop.hbase.client;
1919

20+
import static org.apache.hadoop.hbase.HConstants.HIGH_QOS;
2021
import static org.apache.hadoop.hbase.HConstants.NORMAL_QOS;
2122
import static org.apache.hadoop.hbase.HConstants.SYSTEMTABLE_QOS;
2223
import static org.apache.hadoop.hbase.NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR;
24+
import static org.junit.Assert.assertEquals;
25+
import static org.junit.Assert.assertFalse;
2326
import static org.junit.Assert.assertNotNull;
27+
import static org.junit.Assert.assertTrue;
2428
import static org.mockito.ArgumentMatchers.any;
2529
import static org.mockito.ArgumentMatchers.anyInt;
2630
import static org.mockito.ArgumentMatchers.anyLong;
@@ -33,8 +37,13 @@
3337

3438
import java.io.IOException;
3539
import java.util.Arrays;
40+
import java.util.Optional;
3641
import java.util.concurrent.CompletableFuture;
42+
import java.util.concurrent.ExecutorService;
43+
import java.util.concurrent.Executors;
44+
import java.util.concurrent.TimeUnit;
3745
import java.util.concurrent.atomic.AtomicInteger;
46+
3847
import org.apache.hadoop.conf.Configuration;
3948
import org.apache.hadoop.hbase.Cell;
4049
import org.apache.hadoop.hbase.Cell.Type;
@@ -59,9 +68,7 @@
5968
import org.mockito.ArgumentMatcher;
6069
import org.mockito.invocation.InvocationOnMock;
6170
import org.mockito.stubbing.Answer;
62-
6371
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
64-
6572
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
6673
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
6774
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
@@ -77,6 +84,7 @@
7784
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
7885
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
7986

87+
8088
/**
8189
* Confirm that we will set the priority in {@link HBaseRpcController} for several table operations.
8290
*/
@@ -91,41 +99,18 @@ public class TestAsyncTableRpcPriority {
9199

92100
private ClientService.Interface stub;
93101

102+
private ExecutorService threadPool;
103+
94104
private AsyncConnection conn;
95105

96106
@Rule
97107
public TestName name = new TestName();
98108

99109
@Before
100110
public void setUp() throws IOException {
111+
this.threadPool = Executors.newSingleThreadExecutor();
101112
stub = mock(ClientService.Interface.class);
102-
AtomicInteger scanNextCalled = new AtomicInteger(0);
103-
doAnswer(new Answer<Void>() {
104113

105-
@Override
106-
public Void answer(InvocationOnMock invocation) throws Throwable {
107-
ScanRequest req = invocation.getArgument(1);
108-
RpcCallback<ScanResponse> done = invocation.getArgument(2);
109-
if (!req.hasScannerId()) {
110-
done.run(ScanResponse.newBuilder().setScannerId(1).setTtl(800)
111-
.setMoreResultsInRegion(true).setMoreResults(true).build());
112-
} else {
113-
if (req.hasCloseScanner() && req.getCloseScanner()) {
114-
done.run(ScanResponse.getDefaultInstance());
115-
} else {
116-
Cell cell = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(Type.Put)
117-
.setRow(Bytes.toBytes(scanNextCalled.incrementAndGet()))
118-
.setFamily(Bytes.toBytes("cf")).setQualifier(Bytes.toBytes("cq"))
119-
.setValue(Bytes.toBytes("v")).build();
120-
Result result = Result.create(Arrays.asList(cell));
121-
done.run(
122-
ScanResponse.newBuilder().setScannerId(1).setTtl(800).setMoreResultsInRegion(true)
123-
.setMoreResults(true).addResults(ProtobufUtil.toResult(result)).build());
124-
}
125-
}
126-
return null;
127-
}
128-
}).when(stub).scan(any(HBaseRpcController.class), any(ScanRequest.class), any());
129114
doAnswer(new Answer<Void>() {
130115

131116
@Override
@@ -218,6 +203,16 @@ public boolean matches(HBaseRpcController controller) {
218203
});
219204
}
220205

206+
private ScanRequest assertScannerCloseRequest() {
207+
return argThat(new ArgumentMatcher<ScanRequest>() {
208+
209+
@Override
210+
public boolean matches(ScanRequest request) {
211+
return request.hasCloseScanner() && request.getCloseScanner();
212+
}
213+
});
214+
}
215+
221216
@Test
222217
public void testGet() {
223218
conn.getTable(TableName.valueOf(name.getMethodName()))
@@ -478,53 +473,112 @@ public void testCheckAndMutateMetaTable() throws IOException {
478473
any(ClientProtos.MultiRequest.class), any());
479474
}
480475

476+
private CompletableFuture<Void> mockScanReturnRenewFuture(int scanPriority) {
477+
int scannerId = 1;
478+
CompletableFuture<Void> future = new CompletableFuture<>();
479+
AtomicInteger scanNextCalled = new AtomicInteger(0);
480+
doAnswer(new Answer<Void>() {
481+
482+
@SuppressWarnings("FutureReturnValueIgnored")
483+
@Override
484+
public Void answer(InvocationOnMock invocation) throws Throwable {
485+
threadPool.submit(() -> {
486+
ScanRequest req = invocation.getArgument(1);
487+
RpcCallback<ScanResponse> done = invocation.getArgument(2);
488+
if (!req.hasScannerId()) {
489+
done.run(
490+
ScanResponse.newBuilder().setScannerId(scannerId).setTtl(800)
491+
.setMoreResultsInRegion(true).setMoreResults(true).build());
492+
} else {
493+
if (req.hasRenew() && req.getRenew()) {
494+
future.complete(null);
495+
}
496+
497+
assertFalse("close scanner should not come in with scan priority " + scanPriority,
498+
req.hasCloseScanner() && req.getCloseScanner());
499+
500+
Cell cell = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
501+
.setType(Type.Put).setRow(Bytes.toBytes(scanNextCalled.incrementAndGet()))
502+
.setFamily(Bytes.toBytes("cf")).setQualifier(Bytes.toBytes("cq"))
503+
.setValue(Bytes.toBytes("v")).build();
504+
Result result = Result.create(Arrays.asList(cell));
505+
done.run(
506+
ScanResponse.newBuilder().setScannerId(scannerId).setTtl(800)
507+
.setMoreResultsInRegion(true).setMoreResults(true)
508+
.addResults(ProtobufUtil.toResult(result)).build());
509+
}
510+
});
511+
return null;
512+
}
513+
}).when(stub).scan(assertPriority(scanPriority), any(ScanRequest.class), any());
514+
515+
doAnswer(new Answer<Void>() {
516+
517+
@SuppressWarnings("FutureReturnValueIgnored")
518+
@Override
519+
public Void answer(InvocationOnMock invocation) throws Throwable {
520+
threadPool.submit(() -> {
521+
ScanRequest req = invocation.getArgument(1);
522+
RpcCallback<ScanResponse> done = invocation.getArgument(2);
523+
assertTrue("close request should have scannerId", req.hasScannerId());
524+
assertEquals("close request's scannerId should match", scannerId,
525+
req.getScannerId());
526+
assertTrue("close request should have closerScanner set",
527+
req.hasCloseScanner() && req.getCloseScanner());
528+
529+
done.run(ScanResponse.getDefaultInstance());
530+
});
531+
return null;
532+
}
533+
}).when(stub).scan(assertPriority(HIGH_QOS), assertScannerCloseRequest(), any());
534+
return future;
535+
}
536+
481537
@Test
482-
public void testScan() throws IOException, InterruptedException {
483-
try (ResultScanner scanner = conn.getTable(TableName.valueOf(name.getMethodName()))
484-
.getScanner(new Scan().setCaching(1).setMaxResultSize(1).setPriority(19))) {
485-
assertNotNull(scanner.next());
486-
Thread.sleep(1000);
487-
}
488-
Thread.sleep(1000);
489-
// open, next, several renew lease, and then close
490-
verify(stub, atLeast(4)).scan(assertPriority(19), any(ScanRequest.class), any());
538+
public void testScan() throws Exception {
539+
CompletableFuture<Void> renewFuture = mockScanReturnRenewFuture(19);
540+
testForTable(TableName.valueOf(name.getMethodName()), renewFuture, Optional.of(19));
491541
}
492542

493543
@Test
494-
public void testScanNormalTable() throws IOException, InterruptedException {
495-
try (ResultScanner scanner = conn.getTable(TableName.valueOf(name.getMethodName()))
496-
.getScanner(new Scan().setCaching(1).setMaxResultSize(1))) {
497-
assertNotNull(scanner.next());
498-
Thread.sleep(1000);
499-
}
500-
Thread.sleep(1000);
501-
// open, next, several renew lease, and then close
502-
verify(stub, atLeast(4)).scan(assertPriority(NORMAL_QOS), any(ScanRequest.class), any());
544+
public void testScanNormalTable() throws Exception {
545+
CompletableFuture<Void> renewFuture = mockScanReturnRenewFuture(NORMAL_QOS);
546+
testForTable(TableName.valueOf(name.getMethodName()), renewFuture, Optional.of(NORMAL_QOS));
503547
}
504548

505549
@Test
506-
public void testScanSystemTable() throws IOException, InterruptedException {
507-
try (ResultScanner scanner =
508-
conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName()))
509-
.getScanner(new Scan().setCaching(1).setMaxResultSize(1))) {
510-
assertNotNull(scanner.next());
511-
Thread.sleep(1000);
512-
}
513-
Thread.sleep(1000);
514-
// open, next, several renew lease, and then close
515-
verify(stub, atLeast(4)).scan(assertPriority(SYSTEMTABLE_QOS), any(ScanRequest.class), any());
550+
public void testScanSystemTable() throws Exception {
551+
CompletableFuture<Void> renewFuture = mockScanReturnRenewFuture(SYSTEMTABLE_QOS);
552+
testForTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName()),
553+
renewFuture, Optional.empty());
516554
}
517555

518556
@Test
519-
public void testScanMetaTable() throws IOException, InterruptedException {
520-
try (ResultScanner scanner = conn.getTable(TableName.META_TABLE_NAME)
521-
.getScanner(new Scan().setCaching(1).setMaxResultSize(1))) {
557+
public void testScanMetaTable() throws Exception {
558+
CompletableFuture<Void> renewFuture = mockScanReturnRenewFuture(SYSTEMTABLE_QOS);
559+
testForTable(TableName.META_TABLE_NAME, renewFuture, Optional.empty());
560+
}
561+
562+
private void testForTable(TableName tableName, CompletableFuture<Void> renewFuture,
563+
Optional<Integer> priority) throws Exception {
564+
Scan scan = new Scan().setCaching(1).setMaxResultSize(1);
565+
priority.ifPresent(scan::setPriority);
566+
567+
try (ResultScanner scanner = conn.getTable(tableName).getScanner(scan)) {
522568
assertNotNull(scanner.next());
523-
Thread.sleep(1000);
569+
// wait for at least one renew to come in before closing
570+
renewFuture.join();
524571
}
525-
Thread.sleep(1000);
526-
// open, next, several renew lease, and then close
527-
verify(stub, atLeast(4)).scan(assertPriority(SYSTEMTABLE_QOS), any(ScanRequest.class), any());
572+
573+
// ensures the close thread has time to finish before asserting
574+
threadPool.shutdown();
575+
threadPool.awaitTermination(5, TimeUnit.SECONDS);
576+
577+
// just verify that the calls happened. verification of priority occurred in the mocking
578+
// open, next, then one or more lease renewals, then close
579+
verify(stub, atLeast(4)).scan(any(), any(ScanRequest.class), any());
580+
// additionally, explicitly check for a close request
581+
verify(stub, times(1)).scan(any(), assertScannerCloseRequest(), any());
528582
}
529583

530584
@Test

0 commit comments

Comments
 (0)