Skip to content

Commit ce4e692

Browse files
authored
HBASE-24718 : Generic NamedQueue framework for multiple use-cases (Refactor SlowLog responses) (#2110)
Closes #2052 Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org>
1 parent 33102a1 commit ce4e692

25 files changed

Lines changed: 1060 additions & 518 deletions

File tree

hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2055,6 +2055,11 @@ public static SlowLogResponseRequest buildSlowLogResponseRequest(
20552055
} else {
20562056
builder.setFilterByOperator(SlowLogResponseRequest.FilterByOperator.OR);
20572057
}
2058+
if (LogQueryFilter.Type.SLOW_LOG.equals(logQueryFilter.getType())) {
2059+
builder.setLogType(SlowLogResponseRequest.LogType.SLOW_LOG);
2060+
} else {
2061+
builder.setLogType(SlowLogResponseRequest.LogType.LARGE_LOG);
2062+
}
20582063
return builder.setLimit(logQueryFilter.getLimit()).build();
20592064
}
20602065

hbase-common/src/main/resources/hbase-default.xml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1996,4 +1996,16 @@ possible configurations would overwhelm and obscure the important.
19961996
too large batch request.
19971997
</description>
19981998
</property>
1999+
<property>
2000+
<name>hbase.namedqueue.provider.classes</name>
2001+
<value>org.apache.hadoop.hbase.namequeues.impl.SlowLogQueueService</value>
2002+
<description>
2003+
Default values for NamedQueueService implementors. This comma separated full class names
2004+
represent all implementors of NamedQueueService that we would like to be invoked by
2005+
LogEvent handler service. One example of NamedQueue service is SlowLogQueueService which
2006+
is used to store slow/large RPC logs in ringbuffer at each RegionServer.
2007+
All implementors of NamedQueueService should be found under package:
2008+
"org.apache.hadoop.hbase.namequeues.impl"
2009+
</description>
2010+
</property>
19992011
</configuration>

hbase-protocol-shaded/src/main/protobuf/Admin.proto

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -287,12 +287,18 @@ message SlowLogResponseRequest {
287287
OR = 1;
288288
}
289289

290+
enum LogType {
291+
SLOW_LOG = 0;
292+
LARGE_LOG = 1;
293+
}
294+
290295
optional string region_name = 1;
291296
optional string table_name = 2;
292297
optional string client_address = 3;
293298
optional string user_name = 4;
294299
optional uint32 limit = 5 [default = 10];
295300
optional FilterByOperator filter_by_operator = 6 [default = OR];
301+
optional LogType log_type = 7;
296302
}
297303

298304
message SlowLogResponses {

hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -39,15 +39,16 @@
3939
import org.apache.hadoop.hbase.CallQueueTooBigException;
4040
import org.apache.hadoop.hbase.CellScanner;
4141
import org.apache.hadoop.hbase.DoNotRetryIOException;
42+
import org.apache.hadoop.hbase.HConstants;
4243
import org.apache.hadoop.hbase.Server;
4344
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
4445
import org.apache.hadoop.hbase.exceptions.RequestTooBigException;
4546
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
4647
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
4748
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
4849
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
49-
import org.apache.hadoop.hbase.regionserver.slowlog.RpcLogDetails;
50-
import org.apache.hadoop.hbase.regionserver.slowlog.SlowLogRecorder;
50+
import org.apache.hadoop.hbase.namequeues.RpcLogDetails;
51+
import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
5152
import org.apache.hadoop.hbase.security.HBasePolicyProvider;
5253
import org.apache.hadoop.hbase.security.SaslUtil;
5354
import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
@@ -96,6 +97,7 @@ public abstract class RpcServer implements RpcServerInterface,
9697
private static final String MULTI_SERVICE_CALLS = "multi.service_calls";
9798

9899
private final boolean authorize;
100+
private final boolean isOnlineLogProviderEnabled;
99101
protected boolean isSecurityEnabled;
100102

101103
public static final byte CURRENT_VERSION = 0;
@@ -227,7 +229,7 @@ public abstract class RpcServer implements RpcServerInterface,
227229
/**
228230
* Use to add online slowlog responses
229231
*/
230-
private SlowLogRecorder slowLogRecorder;
232+
private NamedQueueRecorder namedQueueRecorder;
231233

232234
@FunctionalInterface
233235
protected interface CallCleanup {
@@ -302,6 +304,8 @@ public RpcServer(final Server server, final String name,
302304
saslProps = Collections.emptyMap();
303305
}
304306

307+
this.isOnlineLogProviderEnabled = conf.getBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY,
308+
HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED);
305309
this.scheduler = scheduler;
306310
}
307311

@@ -430,11 +434,11 @@ public Pair<Message, CellScanner> call(RpcCall call,
430434
tooLarge, tooSlow,
431435
status.getClient(), startTime, processingTime, qTime,
432436
responseSize, userName);
433-
if (this.slowLogRecorder != null) {
437+
if (this.namedQueueRecorder != null && this.isOnlineLogProviderEnabled) {
434438
// send logs to ring buffer owned by slowLogRecorder
435-
final String className = server == null ? StringUtils.EMPTY :
436-
server.getClass().getSimpleName();
437-
this.slowLogRecorder.addSlowLogPayload(
439+
final String className =
440+
server == null ? StringUtils.EMPTY : server.getClass().getSimpleName();
441+
this.namedQueueRecorder.addRecord(
438442
new RpcLogDetails(call, param, status.getClient(), responseSize, className, tooSlow,
439443
tooLarge));
440444
}
@@ -817,12 +821,8 @@ public void setRsRpcServices(RSRpcServices rsRpcServices) {
817821
}
818822

819823
@Override
820-
public void setSlowLogRecorder(SlowLogRecorder slowLogRecorder) {
821-
this.slowLogRecorder = slowLogRecorder;
824+
public void setNamedQueueRecorder(NamedQueueRecorder namedQueueRecorder) {
825+
this.namedQueueRecorder = namedQueueRecorder;
822826
}
823827

824-
@Override
825-
public SlowLogRecorder getSlowLogRecorder() {
826-
return slowLogRecorder;
827-
}
828828
}

hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
2828
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
2929
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
30-
import org.apache.hadoop.hbase.regionserver.slowlog.SlowLogRecorder;
30+
import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
3131
import org.apache.hadoop.hbase.util.Pair;
3232
import org.apache.hadoop.security.authorize.PolicyProvider;
3333
import org.apache.yetus.audience.InterfaceAudience;
@@ -102,12 +102,8 @@ Pair<Message, CellScanner> call(RpcCall call, MonitoredRPCHandler status)
102102
/**
103103
* Set Online SlowLog Provider
104104
*
105-
* @param slowLogRecorder instance of {@link SlowLogRecorder}
105+
* @param namedQueueRecorder instance of {@link NamedQueueRecorder}
106106
*/
107-
void setSlowLogRecorder(final SlowLogRecorder slowLogRecorder);
107+
void setNamedQueueRecorder(final NamedQueueRecorder namedQueueRecorder);
108108

109-
/**
110-
* @return Retrieve instance of {@link SlowLogRecorder} maintained by RpcServer
111-
*/
112-
SlowLogRecorder getSlowLogRecorder();
113109
}

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/DisruptorExceptionHandler.java renamed to hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/DisruptorExceptionHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
* limitations under the License.
1818
*/
1919

20-
package org.apache.hadoop.hbase.regionserver.slowlog;
20+
package org.apache.hadoop.hbase.namequeues;
2121

2222
import com.lmax.disruptor.ExceptionHandler;
2323
import org.apache.yetus.audience.InterfaceAudience;
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
/*
2+
*
3+
* Licensed to the Apache Software Foundation (ASF) under one
4+
* or more contributor license agreements. See the NOTICE file
5+
* distributed with this work for additional information
6+
* regarding copyright ownership. The ASF licenses this file
7+
* to you under the Apache License, Version 2.0 (the
8+
* "License"); you may not use this file except in compliance
9+
* with the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
package org.apache.hadoop.hbase.namequeues;
21+
22+
import com.lmax.disruptor.EventHandler;
23+
import com.lmax.disruptor.RingBuffer;
24+
25+
import java.lang.reflect.InvocationTargetException;
26+
import java.util.HashMap;
27+
import java.util.Map;
28+
29+
import org.apache.hadoop.conf.Configuration;
30+
import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
31+
import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
32+
import org.apache.yetus.audience.InterfaceAudience;
33+
import org.slf4j.Logger;
34+
import org.slf4j.LoggerFactory;
35+
36+
/**
37+
* Event Handler run by disruptor ringbuffer consumer.
38+
* Although this is generic implementation for namedQueue, it can have individual queue specific
39+
* logic.
40+
*/
41+
@InterfaceAudience.Private
42+
class LogEventHandler implements EventHandler<RingBufferEnvelope> {
43+
44+
private static final Logger LOG = LoggerFactory.getLogger(LogEventHandler.class);
45+
46+
// Map that binds namedQueues to corresponding queue service implementation.
47+
// If NamedQueue of specific type is enabled, corresponding service will be used to
48+
// insert and retrieve records.
49+
// Individual queue sizes should be determined based on their individual configs within
50+
// each service.
51+
private final Map<NamedQueuePayload.NamedQueueEvent, NamedQueueService> namedQueueServices =
52+
new HashMap<>();
53+
54+
private static final String NAMED_QUEUE_PROVIDER_CLASSES = "hbase.namedqueue.provider.classes";
55+
56+
LogEventHandler(final Configuration conf) {
57+
for (String implName : conf.getStringCollection(NAMED_QUEUE_PROVIDER_CLASSES)) {
58+
Class<?> clz;
59+
try {
60+
clz = Class.forName(implName);
61+
} catch (ClassNotFoundException e) {
62+
LOG.warn("Failed to find NamedQueueService implementor class {}", implName, e);
63+
continue;
64+
}
65+
66+
if (!NamedQueueService.class.isAssignableFrom(clz)) {
67+
LOG.warn("Class {} is not implementor of NamedQueueService.", clz);
68+
continue;
69+
}
70+
71+
// add all service mappings here
72+
try {
73+
NamedQueueService namedQueueService =
74+
(NamedQueueService) clz.getConstructor(Configuration.class).newInstance(conf);
75+
namedQueueServices.put(namedQueueService.getEvent(), namedQueueService);
76+
} catch (InstantiationException | IllegalAccessException | NoSuchMethodException
77+
| InvocationTargetException e) {
78+
LOG.warn("Unable to instantiate/add NamedQueueService implementor {} to service map.",
79+
clz);
80+
}
81+
}
82+
}
83+
84+
/**
85+
* Called when a publisher has published an event to the {@link RingBuffer}.
86+
* This is generic consumer of disruptor ringbuffer and for each new namedQueue that we
87+
* add, we should also provide specific consumer logic here.
88+
*
89+
* @param event published to the {@link RingBuffer}
90+
* @param sequence of the event being processed
91+
* @param endOfBatch flag to indicate if this is the last event in a batch from
92+
* the {@link RingBuffer}
93+
*/
94+
@Override
95+
public void onEvent(RingBufferEnvelope event, long sequence, boolean endOfBatch) {
96+
final NamedQueuePayload namedQueuePayload = event.getPayload();
97+
// consume ringbuffer payload based on event type
98+
namedQueueServices.get(namedQueuePayload.getNamedQueueEvent())
99+
.consumeEventFromDisruptor(namedQueuePayload);
100+
}
101+
102+
/**
103+
* Cleans up queues maintained by services.
104+
*
105+
* @param namedQueueEvent type of queue to clear
106+
* @return true if queue is cleaned up, false otherwise
107+
*/
108+
boolean clearNamedQueue(NamedQueuePayload.NamedQueueEvent namedQueueEvent) {
109+
return namedQueueServices.get(namedQueueEvent).clearNamedQueue();
110+
}
111+
112+
/**
113+
* Add all in memory queue records to system table. The implementors can use system table
114+
* or direct HDFS file or ZK as persistence system.
115+
*/
116+
void persistAll(NamedQueuePayload.NamedQueueEvent namedQueueEvent) {
117+
namedQueueServices.get(namedQueueEvent).persistAll();
118+
}
119+
120+
/**
121+
* Retrieve in memory queue records from ringbuffer
122+
*
123+
* @param request namedQueue request with event type
124+
* @return queue records from ringbuffer after filter (if applied)
125+
*/
126+
NamedQueueGetResponse getNamedQueueRecords(NamedQueueGetRequest request) {
127+
return namedQueueServices.get(request.getNamedQueueEvent()).getNamedQueueRecords(request);
128+
}
129+
130+
}

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/LogHandlerUtils.java renamed to hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/LogHandlerUtils.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
* limitations under the License.
1818
*/
1919

20-
package org.apache.hadoop.hbase.regionserver.slowlog;
20+
package org.apache.hadoop.hbase.namequeues;
2121

2222
import java.util.ArrayList;
2323
import java.util.List;
@@ -30,7 +30,7 @@
3030
* Event Handler utility class
3131
*/
3232
@InterfaceAudience.Private
33-
class LogHandlerUtils {
33+
public class LogHandlerUtils {
3434

3535
private static int getTotalFiltersCount(AdminProtos.SlowLogResponseRequest request) {
3636
int totalFilters = 0;
@@ -91,7 +91,7 @@ private static List<TooSlowLog.SlowLogPayload> filterLogs(
9191
return filteredSlowLogPayloads;
9292
}
9393

94-
static List<TooSlowLog.SlowLogPayload> getFilteredLogs(
94+
public static List<TooSlowLog.SlowLogPayload> getFilteredLogs(
9595
AdminProtos.SlowLogResponseRequest request, List<TooSlowLog.SlowLogPayload> logPayloadList) {
9696
int totalFilters = getTotalFiltersCount(request);
9797
if (totalFilters > 0) {
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
*
3+
* Licensed to the Apache Software Foundation (ASF) under one
4+
* or more contributor license agreements. See the NOTICE file
5+
* distributed with this work for additional information
6+
* regarding copyright ownership. The ASF licenses this file
7+
* to you under the Apache License, Version 2.0 (the
8+
* "License"); you may not use this file except in compliance
9+
* with the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
package org.apache.hadoop.hbase.namequeues;
21+
22+
import org.apache.yetus.audience.InterfaceAudience;
23+
24+
/**
25+
* Base payload to be prepared by client to send various namedQueue events for in-memory
26+
* ring buffer storage in either HMaster or RegionServer.
27+
* e.g slowLog responses
28+
*/
29+
@InterfaceAudience.Private
30+
public class NamedQueuePayload {
31+
32+
public enum NamedQueueEvent {
33+
SLOW_LOG
34+
}
35+
36+
private final NamedQueueEvent namedQueueEvent;
37+
38+
public NamedQueuePayload(NamedQueueEvent namedQueueEvent) {
39+
if (namedQueueEvent == null) {
40+
throw new RuntimeException("NamedQueuePayload with null namedQueueEvent");
41+
}
42+
this.namedQueueEvent = namedQueueEvent;
43+
}
44+
45+
public NamedQueueEvent getNamedQueueEvent() {
46+
return namedQueueEvent;
47+
}
48+
49+
}

0 commit comments

Comments
 (0)