Skip to content

Commit c28e285

Browse files
committed
HBASE-28338 Bounded leak of FSDataInputStream buffers from checksum switching (apache#5660)
Signed-off-by: Duo Zhang <zhangduo@apache.org>
1 parent 972471b commit c28e285

2 files changed

Lines changed: 23 additions & 51 deletions

File tree

hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java

Lines changed: 9 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,13 @@
1919

2020
import java.io.Closeable;
2121
import java.io.IOException;
22-
import java.io.InputStream;
2322
import java.util.concurrent.atomic.AtomicInteger;
24-
import org.apache.hadoop.fs.CanUnbuffer;
2523
import org.apache.hadoop.fs.FSDataInputStream;
2624
import org.apache.hadoop.fs.FileSystem;
2725
import org.apache.hadoop.fs.Path;
2826
import org.apache.hadoop.hbase.fs.HFileSystem;
2927
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
3028
import org.apache.yetus.audience.InterfaceAudience;
31-
import org.slf4j.Logger;
32-
import org.slf4j.LoggerFactory;
3329

3430
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
3531

@@ -40,8 +36,6 @@
4036
*/
4137
@InterfaceAudience.Private
4238
public class FSDataInputStreamWrapper implements Closeable {
43-
private static final Logger LOG = LoggerFactory.getLogger(FSDataInputStreamWrapper.class);
44-
private static final boolean isLogTraceEnabled = LOG.isTraceEnabled();
4539

4640
private final HFileSystem hfs;
4741
private final Path path;
@@ -94,9 +88,6 @@ private static class ReadStatistics {
9488
long totalZeroCopyBytesRead;
9589
}
9690

97-
private Boolean instanceOfCanUnbuffer = null;
98-
private CanUnbuffer unbuffer = null;
99-
10091
protected Path readerPath;
10192

10293
public FSDataInputStreamWrapper(FileSystem fs, Path path) throws IOException {
@@ -314,41 +305,22 @@ public HFileSystem getHfs() {
314305
* stream, the current socket will be closed and a new socket will be opened to serve the
315306
* requests.
316307
*/
317-
@SuppressWarnings({ "rawtypes" })
318308
public void unbuffer() {
309+
// todo: it may make sense to always unbuffer both streams. we'd need to carefully
310+
// research the usages to know if that is safe. for now just do the current.
319311
FSDataInputStream stream = this.getStream(this.shouldUseHBaseChecksum());
320312
if (stream != null) {
321-
InputStream wrappedStream = stream.getWrappedStream();
322-
// CanUnbuffer interface was added as part of HDFS-7694 and the fix is available in Hadoop
323-
// 2.6.4+ and 2.7.1+ versions only so check whether the stream object implements the
324-
// CanUnbuffer interface or not and based on that call the unbuffer api.
325-
final Class<? extends InputStream> streamClass = wrappedStream.getClass();
326-
if (this.instanceOfCanUnbuffer == null) {
327-
// To ensure we compute whether the stream is instance of CanUnbuffer only once.
328-
this.instanceOfCanUnbuffer = false;
329-
if (wrappedStream instanceof CanUnbuffer) {
330-
this.unbuffer = (CanUnbuffer) wrappedStream;
331-
this.instanceOfCanUnbuffer = true;
332-
}
333-
}
334-
if (this.instanceOfCanUnbuffer) {
335-
try {
336-
this.unbuffer.unbuffer();
337-
} catch (UnsupportedOperationException e) {
338-
if (isLogTraceEnabled) {
339-
LOG.trace("Failed to invoke 'unbuffer' method in class " + streamClass
340-
+ " . So there may be the stream does not support unbuffering.", e);
341-
}
342-
}
343-
} else {
344-
if (isLogTraceEnabled) {
345-
LOG.trace("Failed to find 'unbuffer' method in class " + streamClass);
346-
}
347-
}
313+
stream.unbuffer();
348314
}
349315
}
350316

351317
public Path getReaderPath() {
352318
return readerPath;
353319
}
320+
321+
// For tests
322+
void setShouldUseHBaseChecksum() {
323+
useHBaseChecksumConfigured = true;
324+
useHBaseChecksum = true;
325+
}
354326
}

hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestFSDataInputStreamWrapper.java

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
*/
1818
package org.apache.hadoop.hbase.io;
1919

20+
import static org.junit.Assert.assertFalse;
2021
import static org.junit.Assert.assertTrue;
2122

2223
import java.io.IOException;
@@ -31,6 +32,7 @@
3132
import org.apache.hadoop.fs.FSInputStream;
3233
import org.apache.hadoop.fs.HasEnhancedByteBufferAccess;
3334
import org.apache.hadoop.fs.ReadOption;
35+
import org.apache.hadoop.fs.StreamCapabilities;
3436
import org.apache.hadoop.hbase.HBaseClassTestRule;
3537
import org.apache.hadoop.hbase.testclassification.SmallTests;
3638
import org.apache.hadoop.io.ByteBufferPool;
@@ -48,22 +50,22 @@ public class TestFSDataInputStreamWrapper {
4850
@Test
4951
public void testUnbuffer() throws Exception {
5052
InputStream pc = new ParentClass();
51-
FSDataInputStreamWrapper fsdisw1 = new FSDataInputStreamWrapper(new FSDataInputStream(pc));
53+
InputStream noChecksumPc = new ParentClass();
54+
FSDataInputStreamWrapper fsdisw1 =
55+
new FSDataInputStreamWrapper(new FSDataInputStream(pc), new FSDataInputStream(noChecksumPc));
5256
fsdisw1.unbuffer();
53-
// parent class should be true
57+
// should have called main stream unbuffer, but not no-checksum
5458
assertTrue(((ParentClass) pc).getIsCallUnbuffer());
59+
assertFalse(((ParentClass) noChecksumPc).getIsCallUnbuffer());
60+
// switch to checksums and call unbuffer again. should unbuffer the nochecksum stream now
61+
fsdisw1.setShouldUseHBaseChecksum();
62+
fsdisw1.unbuffer();
63+
assertTrue(((ParentClass) noChecksumPc).getIsCallUnbuffer());
5564
fsdisw1.close();
56-
57-
InputStream cc1 = new ChildClass1();
58-
FSDataInputStreamWrapper fsdisw2 = new FSDataInputStreamWrapper(new FSDataInputStream(cc1));
59-
fsdisw2.unbuffer();
60-
// child1 class should be true
61-
assertTrue(((ChildClass1) cc1).getIsCallUnbuffer());
62-
fsdisw2.close();
6365
}
6466

6567
private class ParentClass extends FSInputStream implements ByteBufferReadable, CanSetDropBehind,
66-
CanSetReadahead, HasEnhancedByteBufferAccess, CanUnbuffer {
68+
CanSetReadahead, HasEnhancedByteBufferAccess, CanUnbuffer, StreamCapabilities {
6769

6870
public boolean isCallUnbuffer = false;
6971

@@ -122,12 +124,10 @@ public long getPos() throws IOException {
122124
public boolean seekToNewSource(long paramLong) throws IOException {
123125
return false;
124126
}
125-
}
126127

127-
private class ChildClass1 extends ParentClass {
128128
@Override
129-
public void unbuffer() {
130-
isCallUnbuffer = true;
129+
public boolean hasCapability(String s) {
130+
return s.equals(StreamCapabilities.UNBUFFER);
131131
}
132132
}
133133
}

0 commit comments

Comments
 (0)