Skip to content

Commit fa9518b

Browse files
rmdmattinglybbeaudreault
authored andcommitted
HBASE-28175 Deep copy RpcLogDetails' param field (#5481)
Signed-off-by: Viraj Jasani <vjasani@apache.org> Signed-off-by: Bryan Beaudreault <bbeaudreault@apache.org>
1 parent 361bd51 commit fa9518b

3 files changed

Lines changed: 282 additions & 3 deletions

File tree

hbase-client/src/main/java/org/apache/hadoop/hbase/client/SlowLogParams.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ public boolean equals(Object o) {
8282
}
8383
SlowLogParams that = (SlowLogParams) o;
8484
return new EqualsBuilder().append(regionName, that.regionName).append(params, that.params)
85-
.append("scan", scan).isEquals();
85+
.append(scan, that.scan).isEquals();
8686
}
8787

8888
@Override

hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/RpcLogDetails.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,10 @@
2121
import org.apache.commons.lang3.builder.ToStringBuilder;
2222
import org.apache.hadoop.hbase.ipc.RpcCall;
2323
import org.apache.yetus.audience.InterfaceAudience;
24+
import org.slf4j.Logger;
25+
import org.slf4j.LoggerFactory;
2426

27+
import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
2528
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
2629

2730
/**
@@ -32,8 +35,10 @@ public class RpcLogDetails extends NamedQueuePayload {
3235

3336
public static final int SLOW_LOG_EVENT = 0;
3437

38+
private static final Logger LOG = LoggerFactory.getLogger(RpcLogDetails.class.getName());
39+
3540
private final RpcCall rpcCall;
36-
private final Message param;
41+
private Message param;
3742
private final String clientAddress;
3843
private final long responseSize;
3944
private final long blockBytesScanned;
@@ -47,7 +52,6 @@ public RpcLogDetails(RpcCall rpcCall, Message param, String clientAddress, long
4752
long blockBytesScanned, String className, boolean isSlowLog, boolean isLargeLog) {
4853
super(SLOW_LOG_EVENT);
4954
this.rpcCall = rpcCall;
50-
this.param = param;
5155
this.clientAddress = clientAddress;
5256
this.responseSize = responseSize;
5357
this.blockBytesScanned = blockBytesScanned;
@@ -60,6 +64,16 @@ public RpcLogDetails(RpcCall rpcCall, Message param, String clientAddress, long
6064
// would result in corrupted attributes
6165
this.connectionAttributes = rpcCall.getConnectionAttributes();
6266
this.requestAttributes = rpcCall.getRequestAttributes();
67+
68+
// We also need to deep copy the message because the CodedInputStream may be
69+
// overwritten before this slow log is consumed. Such overwriting could
70+
// cause the slow log payload to be corrupt
71+
try {
72+
this.param = param.newBuilderForType().mergeFrom(param.toByteArray()).build();
73+
} catch (InvalidProtocolBufferException e) {
74+
LOG.error("Failed to parse protobuf for message {}", param, e);
75+
this.param = param;
76+
}
6377
}
6478

6579
public RpcCall getRpcCall() {
Lines changed: 265 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,265 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.namequeues;
19+
20+
import static org.junit.Assert.assertArrayEquals;
21+
import static org.junit.Assert.assertEquals;
22+
import static org.junit.Assert.assertFalse;
23+
24+
import java.io.IOException;
25+
import java.net.InetAddress;
26+
import java.nio.ByteBuffer;
27+
import java.util.Arrays;
28+
import java.util.Collections;
29+
import java.util.Map;
30+
import java.util.Optional;
31+
import org.apache.hadoop.hbase.CellScanner;
32+
import org.apache.hadoop.hbase.HBaseClassTestRule;
33+
import org.apache.hadoop.hbase.ipc.RpcCall;
34+
import org.apache.hadoop.hbase.ipc.RpcCallback;
35+
import org.apache.hadoop.hbase.security.User;
36+
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
37+
import org.apache.hadoop.hbase.testclassification.SmallTests;
38+
import org.apache.hadoop.hbase.util.Bytes;
39+
import org.junit.ClassRule;
40+
import org.junit.Test;
41+
import org.junit.experimental.categories.Category;
42+
43+
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
44+
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
45+
import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream;
46+
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
47+
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
48+
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
49+
50+
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
51+
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
52+
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
53+
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
54+
55+
@Category({ RegionServerTests.class, SmallTests.class })
56+
public class TestRpcLogDetails {
57+
58+
@ClassRule
59+
public static final HBaseClassTestRule CLASS_RULE =
60+
HBaseClassTestRule.forClass(TestRpcLogDetails.class);
61+
62+
private final ClientProtos.Scan scan =
63+
ClientProtos.Scan.newBuilder().setStartRow(ByteString.copyFrom(Bytes.toBytes("abc")))
64+
.setStopRow(ByteString.copyFrom(Bytes.toBytes("xyz"))).build();
65+
private final ClientProtos.Scan otherScan =
66+
ClientProtos.Scan.newBuilder().setStartRow(ByteString.copyFrom(Bytes.toBytes("def")))
67+
.setStopRow(ByteString.copyFrom(Bytes.toBytes("uvw"))).build();
68+
private final ClientProtos.ScanRequest scanRequest = ClientProtos.ScanRequest
69+
.newBuilder(ClientProtos.ScanRequest.getDefaultInstance()).setScan(scan).build();
70+
private final ClientProtos.ScanRequest otherScanRequest = ClientProtos.ScanRequest
71+
.newBuilder(ClientProtos.ScanRequest.getDefaultInstance()).setScan(otherScan).build();
72+
73+
@Test
74+
public void itDeepCopiesRpcLogDetailsParams() throws IOException {
75+
ByteBuffer buffer = ByteBuffer.allocate(scanRequest.toByteArray().length);
76+
CodedInputStream cis = UnsafeByteOperations.unsafeWrap(buffer).newCodedInput();
77+
cis.enableAliasing(true);
78+
buffer.put(scanRequest.toByteArray());
79+
Message.Builder messageBuilder = ClientProtos.ScanRequest.newBuilder();
80+
ProtobufUtil.mergeFrom(messageBuilder, cis, buffer.capacity());
81+
Message message = messageBuilder.build();
82+
RpcLogDetails rpcLogDetails =
83+
new RpcLogDetails(getRpcCall(message), message, null, 0L, 0L, null, true, false);
84+
85+
// log's scan should be equal
86+
ClientProtos.Scan logScan = ((ClientProtos.ScanRequest) rpcLogDetails.getParam()).getScan();
87+
assertEquals(logScan, scan);
88+
89+
// ensure we have a different byte array for testing
90+
assertFalse(Arrays.equals(scanRequest.toByteArray(), otherScanRequest.toByteArray()));
91+
92+
// corrupt the underlying buffer
93+
buffer.position(0);
94+
buffer.put(otherScanRequest.toByteArray(), 0, otherScanRequest.toByteArray().length);
95+
assertArrayEquals(otherScanRequest.toByteArray(), buffer.array());
96+
97+
// log scan should still be original scan
98+
assertEquals(logScan, scan);
99+
}
100+
101+
@SuppressWarnings("checkstyle:methodlength")
102+
private static RpcCall getRpcCall(Message message) {
103+
RpcCall rpcCall = new RpcCall() {
104+
@Override
105+
public BlockingService getService() {
106+
return null;
107+
}
108+
109+
@Override
110+
public Descriptors.MethodDescriptor getMethod() {
111+
return null;
112+
}
113+
114+
@Override
115+
public Message getParam() {
116+
return message;
117+
}
118+
119+
@Override
120+
public CellScanner getCellScanner() {
121+
return null;
122+
}
123+
124+
@Override
125+
public long getReceiveTime() {
126+
return 0;
127+
}
128+
129+
@Override
130+
public long getStartTime() {
131+
return 0;
132+
}
133+
134+
@Override
135+
public void setStartTime(long startTime) {
136+
}
137+
138+
@Override
139+
public int getTimeout() {
140+
return 0;
141+
}
142+
143+
@Override
144+
public int getPriority() {
145+
return 0;
146+
}
147+
148+
@Override
149+
public long getDeadline() {
150+
return 0;
151+
}
152+
153+
@Override
154+
public long getSize() {
155+
return 0;
156+
}
157+
158+
@Override
159+
public RPCProtos.RequestHeader getHeader() {
160+
return null;
161+
}
162+
163+
@Override
164+
public Map<String, byte[]> getConnectionAttributes() {
165+
return Collections.emptyMap();
166+
}
167+
168+
@Override
169+
public Map<String, byte[]> getRequestAttributes() {
170+
return Collections.emptyMap();
171+
}
172+
173+
@Override
174+
public byte[] getRequestAttribute(String key) {
175+
return null;
176+
}
177+
178+
@Override
179+
public int getRemotePort() {
180+
return 0;
181+
}
182+
183+
@Override
184+
public void setResponse(Message param, CellScanner cells, Throwable errorThrowable,
185+
String error) {
186+
}
187+
188+
@Override
189+
public void sendResponseIfReady() throws IOException {
190+
}
191+
192+
@Override
193+
public void cleanup() {
194+
}
195+
196+
@Override
197+
public String toShortString() {
198+
return null;
199+
}
200+
201+
@Override
202+
public long disconnectSince() {
203+
return 0;
204+
}
205+
206+
@Override
207+
public boolean isClientCellBlockSupported() {
208+
return false;
209+
}
210+
211+
@Override
212+
public Optional<User> getRequestUser() {
213+
return null;
214+
}
215+
216+
@Override
217+
public InetAddress getRemoteAddress() {
218+
return null;
219+
}
220+
221+
@Override
222+
public HBaseProtos.VersionInfo getClientVersionInfo() {
223+
return null;
224+
}
225+
226+
@Override
227+
public void setCallBack(RpcCallback callback) {
228+
}
229+
230+
@Override
231+
public boolean isRetryImmediatelySupported() {
232+
return false;
233+
}
234+
235+
@Override
236+
public long getResponseCellSize() {
237+
return 0;
238+
}
239+
240+
@Override
241+
public void incrementResponseCellSize(long cellSize) {
242+
}
243+
244+
@Override
245+
public long getBlockBytesScanned() {
246+
return 0;
247+
}
248+
249+
@Override
250+
public void incrementBlockBytesScanned(long blockSize) {
251+
}
252+
253+
@Override
254+
public long getResponseExceptionSize() {
255+
return 0;
256+
}
257+
258+
@Override
259+
public void incrementResponseExceptionSize(long exceptionSize) {
260+
}
261+
};
262+
return rpcCall;
263+
}
264+
265+
}

0 commit comments

Comments
 (0)