Skip to content

Commit ccd1d8a

Browse files
vaijoshtaklwu
authored andcommitted
HBASE-27498: Added logic in ConnectionImplementation.getKeepAliveMasterService to avoid expensive rpc calls in synchronized block (#4889)
Signed-off-by: Tak Lon (Stephen) Wu <taklwu@apache.org>
1 parent f1bfda1 commit ccd1d8a

2 files changed

Lines changed: 222 additions & 21 deletions

File tree

hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java

Lines changed: 62 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,8 @@
8989
import org.slf4j.Logger;
9090
import org.slf4j.LoggerFactory;
9191

92+
import org.apache.hbase.thirdparty.com.google.common.base.Supplier;
93+
import org.apache.hbase.thirdparty.com.google.common.base.Suppliers;
9294
import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
9395
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
9496
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
@@ -158,6 +160,9 @@
158160
@InterfaceAudience.Private
159161
class ConnectionImplementation implements ClusterConnection, Closeable {
160162
public static final String RETRIES_BY_SERVER_KEY = "hbase.client.retries.by.server";
163+
164+
public static final String MASTER_STATE_CACHE_TIMEOUT_SEC =
165+
"hbase.client.master.state.cache.timeout.sec";
161166
private static final Logger LOG = LoggerFactory.getLogger(ConnectionImplementation.class);
162167

163168
private static final String RESOLVE_HOSTNAME_ON_FAIL_KEY = "hbase.resolve.hostnames.on.failure";
@@ -239,6 +244,12 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
239244
/** lock guards against multiple threads trying to query the meta region at the same time */
240245
private final ReentrantLock userRegionLock = new ReentrantLock();
241246

247+
/**
248+
* Supplier to get masterState.By default uses simple supplier without TTL cache. When
249+
* hbase.client.master.state.cache.timeout.sec > 0 it uses TTL Cache.
250+
*/
251+
private final Supplier<Boolean> masterStateSupplier;
252+
242253
private ChoreService choreService;
243254

244255
/**
@@ -368,6 +379,39 @@ public void newDead(ServerName sn) {
368379
default:
369380
// Doing nothing
370381
}
382+
383+
long masterStateCacheTimeout = conf.getLong(MASTER_STATE_CACHE_TIMEOUT_SEC, 0);
384+
385+
Supplier<Boolean> masterConnSupplier = masterConnectionStateSupplier();
386+
if (masterStateCacheTimeout <= 0L) {
387+
this.masterStateSupplier = masterConnSupplier;
388+
} else {
389+
this.masterStateSupplier = Suppliers.memoizeWithExpiration(masterConnSupplier,
390+
masterStateCacheTimeout, TimeUnit.SECONDS);
391+
}
392+
}
393+
394+
/**
395+
* Visible for tests
396+
*/
397+
Supplier<Boolean> masterConnectionStateSupplier() {
398+
return () -> {
399+
if (this.masterServiceState.getStub() == null) {
400+
return false;
401+
}
402+
try {
403+
LOG.info("Getting master state using rpc call");
404+
return this.masterServiceState.isMasterRunning();
405+
} catch (UndeclaredThrowableException e) {
406+
// It's somehow messy, but we can receive exceptions such as
407+
// java.net.ConnectException but they're not declared. So we catch it...
408+
LOG.info("Master connection is not running anymore", e.getUndeclaredThrowable());
409+
return false;
410+
} catch (IOException se) {
411+
LOG.warn("Checking master connection", se);
412+
return false;
413+
}
414+
};
371415
}
372416

373417
private void spawnRenewalChore(final UserGroupInformation user) {
@@ -1216,7 +1260,6 @@ public void addError() {
12161260
* Class to make a MasterServiceStubMaker stub.
12171261
*/
12181262
private final class MasterServiceStubMaker {
1219-
12201263
private void isMasterRunning(MasterProtos.MasterService.BlockingInterface stub)
12211264
throws IOException {
12221265
try {
@@ -1318,6 +1361,13 @@ public BlockingInterface getClient(ServerName serverName) throws IOException {
13181361

13191362
final MasterServiceState masterServiceState = new MasterServiceState(this);
13201363

1364+
/**
1365+
* Visible for tests
1366+
*/
1367+
MasterServiceState getMasterServiceState() {
1368+
return this.masterServiceState;
1369+
}
1370+
13211371
@Override
13221372
public MasterKeepAliveConnection getMaster() throws IOException {
13231373
return getKeepAliveMasterService();
@@ -1328,13 +1378,16 @@ private void resetMasterServiceState(final MasterServiceState mss) {
13281378
}
13291379

13301380
private MasterKeepAliveConnection getKeepAliveMasterService() throws IOException {
1331-
synchronized (masterLock) {
1332-
if (!isKeepAliveMasterConnectedAndRunning(this.masterServiceState)) {
1333-
MasterServiceStubMaker stubMaker = new MasterServiceStubMaker();
1334-
this.masterServiceState.stub = stubMaker.makeStub();
1381+
if (!isKeepAliveMasterConnectedAndRunning()) {
1382+
synchronized (masterLock) {
1383+
if (!isKeepAliveMasterConnectedAndRunning()) {
1384+
MasterServiceStubMaker stubMaker = new MasterServiceStubMaker();
1385+
this.masterServiceState.stub = stubMaker.makeStub();
1386+
}
1387+
resetMasterServiceState(this.masterServiceState);
13351388
}
1336-
resetMasterServiceState(this.masterServiceState);
13371389
}
1390+
13381391
// Ugly delegation just so we can add in a Close method.
13391392
final MasterProtos.MasterService.BlockingInterface stub = this.masterServiceState.stub;
13401393
return new MasterKeepAliveConnection() {
@@ -1887,21 +1940,9 @@ private static void release(MasterServiceState mss) {
18871940
}
18881941
}
18891942

1890-
private boolean isKeepAliveMasterConnectedAndRunning(MasterServiceState mss) {
1891-
if (mss.getStub() == null) {
1892-
return false;
1893-
}
1894-
try {
1895-
return mss.isMasterRunning();
1896-
} catch (UndeclaredThrowableException e) {
1897-
// It's somehow messy, but we can receive exceptions such as
1898-
// java.net.ConnectException but they're not declared. So we catch it...
1899-
LOG.info("Master connection is not running anymore", e.getUndeclaredThrowable());
1900-
return false;
1901-
} catch (IOException se) {
1902-
LOG.warn("Checking master connection", se);
1903-
return false;
1904-
}
1943+
private boolean isKeepAliveMasterConnectedAndRunning() {
1944+
LOG.info("Getting master connection state from TTL Cache");
1945+
return masterStateSupplier.get();
19051946
}
19061947

19071948
void releaseMaster(MasterServiceState mss) {
Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.client;
19+
20+
import java.io.IOException;
21+
import java.lang.reflect.InvocationTargetException;
22+
import java.lang.reflect.Method;
23+
import java.lang.reflect.UndeclaredThrowableException;
24+
import org.apache.commons.lang3.reflect.FieldUtils;
25+
import org.apache.hadoop.conf.Configuration;
26+
import org.apache.hadoop.hbase.HBaseClassTestRule;
27+
import org.apache.hadoop.hbase.IntegrationTestingUtility;
28+
import org.apache.hadoop.hbase.security.UserProvider;
29+
import org.apache.hadoop.hbase.testclassification.ClientTests;
30+
import org.apache.hadoop.hbase.testclassification.MediumTests;
31+
import org.junit.AfterClass;
32+
import org.junit.Assert;
33+
import org.junit.BeforeClass;
34+
import org.junit.ClassRule;
35+
import org.junit.Test;
36+
import org.junit.experimental.categories.Category;
37+
import org.junit.runner.RunWith;
38+
import org.mockito.Mockito;
39+
import org.mockito.junit.MockitoJUnitRunner;
40+
41+
@Category({ ClientTests.class, MediumTests.class })
42+
@RunWith(MockitoJUnitRunner.class)
43+
public class TestConnectionImplementation {
44+
@ClassRule
45+
public static final HBaseClassTestRule CLASS_RULE =
46+
HBaseClassTestRule.forClass(TestConnectionImplementation.class);
47+
private static final IntegrationTestingUtility TEST_UTIL = new IntegrationTestingUtility();
48+
49+
@BeforeClass
50+
public static void beforeClass() throws Exception {
51+
TEST_UTIL.startMiniCluster(1);
52+
}
53+
54+
@AfterClass
55+
public static void afterClass() throws Exception {
56+
TEST_UTIL.shutdownMiniCluster();
57+
}
58+
59+
@Test
60+
public void testGetMaster_noCachedMasterState() throws IOException, IllegalAccessException {
61+
Configuration conf = TEST_UTIL.getConfiguration();
62+
conf.setLong(ConnectionImplementation.MASTER_STATE_CACHE_TIMEOUT_SEC, 0L);
63+
ConnectionImplementation conn =
64+
new ConnectionImplementation(conf, null, UserProvider.instantiate(conf).getCurrent());
65+
ConnectionImplementation.MasterServiceState masterServiceState = spyMasterServiceState(conn);
66+
conn.getMaster(); // This initializes the stubs but don't call isMasterRunning
67+
conn.getMaster(); // Calls isMasterRunning since stubs are initialized. Invocation 1
68+
conn.getMaster(); // Calls isMasterRunning since stubs are initialized. Invocation 2
69+
Mockito.verify(masterServiceState, Mockito.times(2)).isMasterRunning();
70+
conn.close();
71+
}
72+
73+
@Test
74+
public void testGetMaster_masterStateCacheHit() throws IOException, IllegalAccessException {
75+
Configuration conf = TEST_UTIL.getConfiguration();
76+
conf.setLong(ConnectionImplementation.MASTER_STATE_CACHE_TIMEOUT_SEC, 15L);
77+
ConnectionImplementation conn =
78+
new ConnectionImplementation(conf, null, UserProvider.instantiate(conf).getCurrent());
79+
ConnectionImplementation.MasterServiceState masterServiceState = spyMasterServiceState(conn);
80+
conn.getMaster(); // This initializes the stubs but don't call isMasterRunning
81+
conn.getMaster(); // Uses cached value, don't call isMasterRunning
82+
conn.getMaster(); // Uses cached value, don't call isMasterRunning
83+
Mockito.verify(masterServiceState, Mockito.times(0)).isMasterRunning();
84+
conn.close();
85+
}
86+
87+
@Test
88+
public void testGetMaster_masterStateCacheMiss()
89+
throws IOException, InterruptedException, IllegalAccessException {
90+
Configuration conf = TEST_UTIL.getConfiguration();
91+
conf.setLong(ConnectionImplementation.MASTER_STATE_CACHE_TIMEOUT_SEC, 5L);
92+
ConnectionImplementation conn =
93+
new ConnectionImplementation(conf, null, UserProvider.instantiate(conf).getCurrent());
94+
ConnectionImplementation.MasterServiceState masterServiceState = spyMasterServiceState(conn);
95+
conn.getMaster(); // This initializes the stubs but don't call isMasterRunning
96+
conn.getMaster(); // Uses cached value, don't call isMasterRunning
97+
conn.getMaster(); // Uses cached value, don't call isMasterRunning
98+
Thread.sleep(10000);
99+
conn.getMaster(); // Calls isMasterRunning after cache expiry
100+
Mockito.verify(masterServiceState, Mockito.times(1)).isMasterRunning();
101+
conn.close();
102+
}
103+
104+
@Test
105+
public void testIsKeepAliveMasterConnectedAndRunning_UndeclaredThrowableException()
106+
throws IOException, IllegalAccessException, NoSuchMethodException, InvocationTargetException {
107+
Configuration conf = TEST_UTIL.getConfiguration();
108+
conf.setLong(ConnectionImplementation.MASTER_STATE_CACHE_TIMEOUT_SEC, 0);
109+
ConnectionImplementation conn =
110+
new ConnectionImplementation(conf, null, UserProvider.instantiate(conf).getCurrent());
111+
conn.getMaster(); // Initializes stubs
112+
113+
ConnectionImplementation.MasterServiceState masterServiceState = spyMasterServiceState(conn);
114+
Mockito.doThrow(new UndeclaredThrowableException(new Exception("DUMMY EXCEPTION")))
115+
.when(masterServiceState).isMasterRunning();
116+
117+
// Verify that masterState is "false" because of to injected exception
118+
boolean isKeepAliveMasterRunning =
119+
(boolean) getIsKeepAliveMasterConnectedAndRunningMethod().invoke(conn);
120+
Assert.assertFalse(isKeepAliveMasterRunning);
121+
conn.close();
122+
}
123+
124+
@Test
125+
public void testIsKeepAliveMasterConnectedAndRunning_IOException()
126+
throws IOException, IllegalAccessException, NoSuchMethodException, InvocationTargetException {
127+
Configuration conf = TEST_UTIL.getConfiguration();
128+
conf.setLong(ConnectionImplementation.MASTER_STATE_CACHE_TIMEOUT_SEC, 0);
129+
ConnectionImplementation conn =
130+
new ConnectionImplementation(conf, null, UserProvider.instantiate(conf).getCurrent());
131+
conn.getMaster();
132+
133+
ConnectionImplementation.MasterServiceState masterServiceState = spyMasterServiceState(conn);
134+
Mockito.doThrow(new IOException("DUMMY EXCEPTION")).when(masterServiceState).isMasterRunning();
135+
136+
boolean isKeepAliveMasterRunning =
137+
(boolean) getIsKeepAliveMasterConnectedAndRunningMethod().invoke(conn);
138+
139+
// Verify that masterState is "false" because of to injected exception
140+
Assert.assertFalse(isKeepAliveMasterRunning);
141+
conn.close();
142+
}
143+
144+
// Spy the masterServiceState object using reflection
145+
private ConnectionImplementation.MasterServiceState
146+
spyMasterServiceState(ConnectionImplementation conn) throws IllegalAccessException {
147+
ConnectionImplementation.MasterServiceState spiedMasterServiceState =
148+
Mockito.spy(conn.getMasterServiceState());
149+
FieldUtils.writeDeclaredField(conn, "masterServiceState", spiedMasterServiceState, true);
150+
return spiedMasterServiceState;
151+
}
152+
153+
// Get isKeepAliveMasterConnectedAndRunning using reflection
154+
private Method getIsKeepAliveMasterConnectedAndRunningMethod() throws NoSuchMethodException {
155+
Method method =
156+
ConnectionImplementation.class.getDeclaredMethod("isKeepAliveMasterConnectedAndRunning");
157+
method.setAccessible(true);
158+
return method;
159+
}
160+
}

0 commit comments

Comments
 (0)