Skip to content

Commit 2c72225

Browse files
shahrs87virajjasani
authored andcommitted
HBASE-25536 Remove 0 length wal file from logQueue if it belongs to old sources (#2908)
Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org> Signed-off-by: Geoffrey Jacoby <gjacoby@apache.org> Signed-off-by: Bharath Vissapragada <bharathv@apache.org> Signed-off-by: Viraj Jasani <vjasani@apache.org>
1 parent b930ea1 commit 2c72225

2 files changed

Lines changed: 33 additions & 1 deletion

File tree

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -246,8 +246,10 @@ private void handleEmptyWALEntryBatch(Path currentPath) throws InterruptedExcept
246246
// (highly likely we've closed the current log), we've hit the max retries, and autorecovery is
247247
// enabled, then dump the log
248248
private void handleEofException(IOException e) {
249+
// Dump the log even if logQueue size is 1 if the source is from recovered Source
250+
// since we don't add current log to recovered source queue so it is safe to remove.
249251
if ((e instanceof EOFException || e.getCause() instanceof EOFException) &&
250-
logQueue.size() > 1 && this.eofAutoRecovery) {
252+
(source.isRecovered() || logQueue.size() > 1) && this.eofAutoRecovery) {
251253
try {
252254
if (fs.getFileStatus(logQueue.peek()).getLen() == 0) {
253255
LOG.warn("Forcing removal of 0 length log in queue: " + logQueue.peek());

hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import java.util.concurrent.atomic.AtomicInteger;
4343
import java.util.concurrent.atomic.AtomicLong;
4444
import org.apache.hadoop.conf.Configuration;
45+
import org.apache.hadoop.fs.FSDataOutputStream;
4546
import org.apache.hadoop.fs.FileSystem;
4647
import org.apache.hadoop.fs.Path;
4748
import org.apache.hadoop.hbase.Cell;
@@ -652,4 +653,33 @@ public void testReadBeyondCommittedLength() throws IOException, InterruptedExcep
652653
assertFalse(entryStream.hasNext());
653654
}
654655
}
656+
657+
/*
658+
Test removal of 0 length log from logQueue if the source is a recovered source and
659+
size of logQueue is only 1.
660+
*/
661+
@Test
662+
public void testEOFExceptionForRecoveredQueue() throws Exception {
663+
PriorityBlockingQueue<Path> queue = new PriorityBlockingQueue<>();
664+
// Create a 0 length log.
665+
Path emptyLog = new Path("emptyLog");
666+
FSDataOutputStream fsdos = fs.create(emptyLog);
667+
fsdos.close();
668+
assertEquals(0, fs.getFileStatus(emptyLog).getLen());
669+
queue.add(emptyLog);
670+
671+
Configuration conf = new Configuration(CONF);
672+
// Override the max retries multiplier to fail fast.
673+
conf.setInt("replication.source.maxretriesmultiplier", 1);
674+
conf.setBoolean("replication.source.eof.autorecovery", true);
675+
// Create a reader thread with source as recovered source.
676+
ReplicationSource source = mockReplicationSource(true, conf);
677+
when(source.isPeerEnabled()).thenReturn(true);
678+
ReplicationSourceWALReader reader =
679+
new ReplicationSourceWALReader(fs, conf, queue, 0, getDummyFilter(), source);
680+
reader.run();
681+
// ReplicationSourceWALReaderThread#handleEofException method will
682+
// remove empty log from logQueue.
683+
assertEquals(0, queue.size());
684+
}
655685
}

0 commit comments

Comments
 (0)