Skip to content

Commit 5118e71

Browse files
bbeaudreaultapurtell
authored andcommitted
HBASE-26703 Allow configuration of IPC queue balancer (#4063)
Signed-off-by: Andrew Purtell <apurtell@apache.org> Conflicts: hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
1 parent 2a7b413 commit 5118e71

6 files changed

Lines changed: 263 additions & 48 deletions

File tree

hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,10 @@
1818
package org.apache.hadoop.hbase.ipc;
1919

2020
import java.util.concurrent.BlockingQueue;
21-
2221
import org.apache.hadoop.conf.Configuration;
2322
import org.apache.hadoop.hbase.Abortable;
2423
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
24+
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
2525
import org.apache.yetus.audience.InterfaceAudience;
2626
import org.apache.yetus.audience.InterfaceStability;
2727

@@ -47,18 +47,27 @@ public BalancedQueueRpcExecutor(final String name, final int handlerCount,
4747
final String callQueueType, final int maxQueueLength, final PriorityFunction priority,
4848
final Configuration conf, final Abortable abortable) {
4949
super(name, handlerCount, callQueueType, maxQueueLength, priority, conf, abortable);
50-
this.balancer = getBalancer(this.numCallQueues);
5150
initializeQueues(this.numCallQueues);
51+
this.balancer = getBalancer(name, conf, getQueues());
5252
}
5353

5454
@Override
5555
public boolean dispatch(final CallRunner callTask) throws InterruptedException {
56-
int queueIndex = balancer.getNextQueue();
56+
int queueIndex = balancer.getNextQueue(callTask);
5757
BlockingQueue<CallRunner> queue = queues.get(queueIndex);
5858
// that means we can overflow by at most <num reader> size (5), that's ok
5959
if (queue.size() >= currentQueueLimit) {
6060
return false;
6161
}
6262
return queue.offer(callTask);
6363
}
64+
65+
@Override
66+
public void onConfigurationChange(Configuration conf) {
67+
super.onConfigurationChange(conf);
68+
69+
if (balancer instanceof ConfigurationObserver) {
70+
((ConfigurationObserver) balancer).onConfigurationChange(conf);
71+
}
72+
}
6473
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/**
2+
3+
* Licensed to the Apache Software Foundation (ASF) under one
4+
* or more contributor license agreements. See the NOTICE file
5+
* distributed with this work for additional information
6+
* regarding copyright ownership. The ASF licenses this file
7+
* to you under the Apache License, Version 2.0 (the
8+
* "License"); you may not use this file except in compliance
9+
* with the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.apache.hadoop.hbase.ipc;
20+
21+
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
22+
import org.apache.yetus.audience.InterfaceAudience;
23+
import org.apache.yetus.audience.InterfaceStability;
24+
25+
/**
26+
* Interface for balancing requests across IPC queues
27+
*/
28+
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
29+
@InterfaceStability.Stable
30+
public interface QueueBalancer {
31+
/**
32+
* @return the index of the next queue to which a request should be inserted
33+
*/
34+
int getNextQueue(CallRunner callRunner);
35+
}

hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java

Lines changed: 39 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,22 +21,22 @@
2121

2222
import java.util.Queue;
2323
import java.util.concurrent.atomic.AtomicInteger;
24-
2524
import org.apache.hadoop.conf.Configuration;
2625
import org.apache.hadoop.hbase.Abortable;
2726
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
27+
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
2828
import org.apache.yetus.audience.InterfaceAudience;
2929
import org.apache.yetus.audience.InterfaceStability;
3030
import org.slf4j.Logger;
3131
import org.slf4j.LoggerFactory;
32+
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
3233
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Action;
3334
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
3435
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
35-
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;
3636
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
3737
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
3838
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
39-
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
39+
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;
4040

4141
/**
4242
* RPC Executor that uses different queues for reads and writes.
@@ -96,14 +96,17 @@ public RWQueueRpcExecutor(final String name, final int handlerCount, final int m
9696
numScanQueues = scanQueues;
9797
scanHandlersCount = scanHandlers;
9898

99-
this.writeBalancer = getBalancer(numWriteQueues);
100-
this.readBalancer = getBalancer(numReadQueues);
101-
this.scanBalancer = numScanQueues > 0 ? getBalancer(numScanQueues) : null;
102-
10399
initializeQueues(numWriteQueues);
104100
initializeQueues(numReadQueues);
105101
initializeQueues(numScanQueues);
106102

103+
this.writeBalancer = getBalancer(name, conf, queues.subList(0, numWriteQueues));
104+
this.readBalancer = getBalancer(name, conf, queues.subList(numWriteQueues, numWriteQueues + numReadQueues));
105+
this.scanBalancer = numScanQueues > 0 ?
106+
getBalancer(name, conf, queues.subList(numWriteQueues + numReadQueues,
107+
numWriteQueues + numReadQueues + numScanQueues)) :
108+
null;
109+
107110
LOG.info(getName() + " writeQueues=" + numWriteQueues + " writeHandlers=" + writeHandlersCount
108111
+ " readQueues=" + numReadQueues + " readHandlers=" + readHandlersCount + " scanQueues="
109112
+ numScanQueues + " scanHandlers=" + scanHandlersCount);
@@ -138,11 +141,11 @@ protected boolean dispatchTo(boolean toWriteQueue, boolean toScanQueue,
138141
final CallRunner callTask) {
139142
int queueIndex;
140143
if (toWriteQueue) {
141-
queueIndex = writeBalancer.getNextQueue();
144+
queueIndex = writeBalancer.getNextQueue(callTask);
142145
} else if (toScanQueue) {
143-
queueIndex = numWriteQueues + numReadQueues + scanBalancer.getNextQueue();
146+
queueIndex = numWriteQueues + numReadQueues + scanBalancer.getNextQueue(callTask);
144147
} else {
145-
queueIndex = numWriteQueues + readBalancer.getNextQueue();
148+
queueIndex = numWriteQueues + readBalancer.getNextQueue(callTask);
146149
}
147150

148151
Queue<CallRunner> queue = queues.get(queueIndex);
@@ -234,6 +237,18 @@ protected boolean isWriteRequest(final RequestHeader header, final Message param
234237
return false;
235238
}
236239

240+
QueueBalancer getWriteBalancer() {
241+
return writeBalancer;
242+
}
243+
244+
QueueBalancer getReadBalancer() {
245+
return readBalancer;
246+
}
247+
248+
QueueBalancer getScanBalancer() {
249+
return scanBalancer;
250+
}
251+
237252
private boolean isScanRequest(final RequestHeader header, final Message param) {
238253
return param instanceof ScanRequest;
239254
}
@@ -266,4 +281,18 @@ private static int calcNumWriters(final int count, final float readShare) {
266281
private static int calcNumReaders(final int count, final float readShare) {
267282
return count - calcNumWriters(count, readShare);
268283
}
284+
285+
@Override
286+
public void onConfigurationChange(Configuration conf) {
287+
super.onConfigurationChange(conf);
288+
propagateBalancerConfigChange(writeBalancer, conf);
289+
propagateBalancerConfigChange(readBalancer, conf);
290+
propagateBalancerConfigChange(scanBalancer, conf);
291+
}
292+
293+
private void propagateBalancerConfigChange(QueueBalancer balancer, Configuration conf) {
294+
if (balancer instanceof ConfigurationObserver) {
295+
((ConfigurationObserver) balancer).onConfigurationChange(conf);
296+
}
297+
}
269298
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/**
2+
3+
* Licensed to the Apache Software Foundation (ASF) under one
4+
* or more contributor license agreements. See the NOTICE file
5+
* distributed with this work for additional information
6+
* regarding copyright ownership. The ASF licenses this file
7+
* to you under the Apache License, Version 2.0 (the
8+
* "License"); you may not use this file except in compliance
9+
* with the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
package org.apache.hadoop.hbase.ipc;
21+
22+
import java.util.List;
23+
import java.util.concurrent.BlockingQueue;
24+
import java.util.concurrent.ThreadLocalRandom;
25+
import org.apache.hadoop.conf.Configuration;
26+
import org.apache.yetus.audience.InterfaceAudience;
27+
import org.apache.yetus.audience.InterfaceStability;
28+
29+
/**
30+
* Queue balancer that just randomly selects a queue in the range [0, num queues).
31+
*/
32+
@InterfaceAudience.Private
33+
@InterfaceStability.Stable
34+
public class RandomQueueBalancer implements QueueBalancer {
35+
private final int queueSize;
36+
private final List<BlockingQueue<CallRunner>> queues;
37+
38+
public RandomQueueBalancer(Configuration conf, String executorName, List<BlockingQueue<CallRunner>> queues) {
39+
this.queueSize = queues.size();
40+
this.queues = queues;
41+
}
42+
43+
@Override
44+
public int getNextQueue(CallRunner callRunner) {
45+
return ThreadLocalRandom.current().nextInt(queueSize);
46+
}
47+
48+
/**
49+
* Exposed for use in tests
50+
*/
51+
List<BlockingQueue<CallRunner>> getQueues() {
52+
return queues;
53+
}
54+
}

hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java

Lines changed: 14 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -20,28 +20,25 @@
2020

2121
import java.util.ArrayList;
2222
import java.util.Comparator;
23+
import java.util.HashMap;
2324
import java.util.List;
2425
import java.util.Locale;
26+
import java.util.Map;
2527
import java.util.Optional;
2628
import java.util.concurrent.BlockingQueue;
2729
import java.util.concurrent.LinkedBlockingQueue;
28-
import java.util.concurrent.ThreadLocalRandom;
2930
import java.util.concurrent.atomic.AtomicInteger;
3031
import java.util.concurrent.atomic.LongAdder;
31-
import java.util.Map;
32-
import java.util.HashMap;
33-
3432
import org.apache.hadoop.conf.Configuration;
3533
import org.apache.hadoop.hbase.Abortable;
3634
import org.apache.hadoop.hbase.HConstants;
3735
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
36+
import org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue;
37+
import org.apache.hadoop.hbase.util.ReflectionUtils;
3838
import org.apache.hbase.thirdparty.io.netty.util.internal.StringUtil;
3939
import org.apache.yetus.audience.InterfaceAudience;
4040
import org.slf4j.Logger;
4141
import org.slf4j.LoggerFactory;
42-
import org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue;
43-
import org.apache.hadoop.hbase.util.ReflectionUtils;
44-
4542
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
4643
import org.apache.hbase.thirdparty.com.google.common.base.Strings;
4744

@@ -71,6 +68,10 @@ public abstract class RpcExecutor {
7168
public static final String CALL_QUEUE_TYPE_CONF_KEY = "hbase.ipc.server.callqueue.type";
7269
public static final String CALL_QUEUE_TYPE_CONF_DEFAULT = CALL_QUEUE_TYPE_FIFO_CONF_VALUE;
7370

71+
public static final String CALL_QUEUE_QUEUE_BALANCER_CLASS = "hbase.ipc.server.callqueue.balancer.class";
72+
public static final Class<?> CALL_QUEUE_QUEUE_BALANCER_CLASS_DEFAULT = RandomQueueBalancer.class;
73+
74+
7475
// These 3 are only used by Codel executor
7576
public static final String CALL_QUEUE_CODEL_TARGET_DELAY = "hbase.ipc.server.callqueue.codel.target.delay";
7677
public static final String CALL_QUEUE_CODEL_INTERVAL = "hbase.ipc.server.callqueue.codel.interval";
@@ -295,19 +296,13 @@ protected void startHandlers(final String nameSuffix, final int numHandlers,
295296
handlers.size(), threadPrefix, qsize, port);
296297
}
297298

298-
public static abstract class QueueBalancer {
299-
/**
300-
* @return the index of the next queue to which a request should be inserted
301-
*/
302-
public abstract int getNextQueue();
303-
}
304-
305-
public static QueueBalancer getBalancer(int queueSize) {
306-
Preconditions.checkArgument(queueSize > 0, "Queue size is <= 0, must be at least 1");
307-
if (queueSize == 1) {
299+
public static QueueBalancer getBalancer(String executorName, Configuration conf, List<BlockingQueue<CallRunner>> queues) {
300+
Preconditions.checkArgument(queues.size() > 0, "Queue size is <= 0, must be at least 1");
301+
if (queues.size() == 1) {
308302
return ONE_QUEUE;
309303
} else {
310-
return new RandomQueueBalancer(queueSize);
304+
Class<?> balancerClass = conf.getClass(CALL_QUEUE_QUEUE_BALANCER_CLASS, CALL_QUEUE_QUEUE_BALANCER_CLASS_DEFAULT);
305+
return (QueueBalancer) ReflectionUtils.newInstance(balancerClass, conf, executorName, queues);
311306
}
312307
}
313308

@@ -316,27 +311,11 @@ public static QueueBalancer getBalancer(int queueSize) {
316311
*/
317312
private static QueueBalancer ONE_QUEUE = new QueueBalancer() {
318313
@Override
319-
public int getNextQueue() {
314+
public int getNextQueue(CallRunner callRunner) {
320315
return 0;
321316
}
322317
};
323318

324-
/**
325-
* Queue balancer that just randomly selects a queue in the range [0, num queues).
326-
*/
327-
private static class RandomQueueBalancer extends QueueBalancer {
328-
private final int queueSize;
329-
330-
public RandomQueueBalancer(int queueSize) {
331-
this.queueSize = queueSize;
332-
}
333-
334-
@Override
335-
public int getNextQueue() {
336-
return ThreadLocalRandom.current().nextInt(queueSize);
337-
}
338-
}
339-
340319
/**
341320
* Comparator used by the "normal callQueue" if DEADLINE_CALL_QUEUE_CONF_KEY is set to true. It
342321
* uses the calculated "deadline" e.g. to deprioritize long-running job If multiple requests have

0 commit comments

Comments
 (0)