Skip to content

Commit 5c9ee97

Browse files
authored
HBASE-25536 Remove 0 length wal file from logQueue if it belongs to old sources (#2912) (#2908)
Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org> Signed-off-by: Geoffrey Jacoby <gjacoby@apache.org> Signed-off-by: Bharath Vissapragada <bharathv@apache.org> Signed-off-by: Viraj Jasani <vjasani@apache.org>
1 parent cf337c4 commit 5c9ee97

2 files changed

Lines changed: 44 additions & 1 deletion

File tree

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ public class ReplicationSourceWALReaderThread extends Thread {
8181
private long totalBufferQuota;
8282

8383
private ReplicationSource source;
84+
private ReplicationSourceManager manager;
8485

8586
/**
8687
* Creates a reader worker for a given WAL queue. Reads WAL entries off a given queue, batches the
@@ -121,6 +122,7 @@ public ReplicationSourceWALReaderThread(ReplicationSourceManager manager,
121122
this.metrics = metrics;
122123
this.entryBatchQueue = new LinkedBlockingQueue<>(batchCount);
123124
this.source = source;
125+
this.manager = manager;
124126
LOG.info("peerClusterZnode=" + replicationQueueInfo.getPeerClusterZnode()
125127
+ ", ReplicationSourceWALReaderThread : " + replicationQueueInfo.getPeerId()
126128
+ " inited, replicationBatchSizeCapacity=" + replicationBatchSizeCapacity
@@ -229,7 +231,10 @@ private void resetStream(WALEntryStream stream) throws IOException {
229231
// (highly likely we've closed the current log), we've hit the max retries, and autorecovery is
230232
// enabled, then dump the log
231233
private void handleEofException(Exception e) {
232-
if (e.getCause() instanceof EOFException && logQueue.size() > 1
234+
boolean isRecoveredSource = manager.getOldSources().contains(source);
235+
// Dump the log even if logQueue size is 1 if the source is from recovered Source since we don't
236+
// add current log to recovered source queue so it is safe to remove.
237+
if (e.getCause() instanceof EOFException && (isRecoveredSource || logQueue.size() > 1)
233238
&& conf.getBoolean("replication.source.eof.autorecovery", false)) {
234239
try {
235240
if (fs.getFileStatus(logQueue.peek()).getLen() == 0) {

hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333

3434
import java.io.IOException;
3535
import java.util.ArrayList;
36+
import java.util.Arrays;
3637
import java.util.Collections;
3738
import java.util.HashMap;
3839
import java.util.List;
@@ -49,6 +50,7 @@
4950
import java.util.concurrent.atomic.AtomicLong;
5051

5152
import org.apache.hadoop.conf.Configuration;
53+
import org.apache.hadoop.fs.FSDataOutputStream;
5254
import org.apache.hadoop.fs.FileSystem;
5355
import org.apache.hadoop.fs.Path;
5456
import org.apache.hadoop.hbase.Cell;
@@ -692,4 +694,40 @@ public WALEntryBatch call() throws Exception {
692694
assertEquals(walPath, entryBatch.getLastWalPath());
693695
assertEquals(3, entryBatch.getNbRowKeys());
694696
}
697+
698+
/*
699+
Test removal of 0 length log from logQueue if the source is a recovered source and
700+
size of logQueue is only 1.
701+
*/
702+
@Test
703+
public void testEOFExceptionForRecoveredQueue() throws Exception {
704+
PriorityBlockingQueue<Path> queue = new PriorityBlockingQueue<>();
705+
// Create a 0 length log.
706+
Path emptyLog = new Path("emptyLog");
707+
FSDataOutputStream fsdos = fs.create(emptyLog);
708+
fsdos.close();
709+
assertEquals(0, fs.getFileStatus(emptyLog).getLen());
710+
queue.add(emptyLog);
711+
712+
ReplicationSource source = Mockito.mock(ReplicationSource.class);
713+
714+
ReplicationSourceManager mockSourceManager = mock(ReplicationSourceManager.class);
715+
// Make it look like the source is from recovered source.
716+
when(mockSourceManager.getOldSources())
717+
.thenReturn(new ArrayList<>(Arrays.asList((ReplicationSourceInterface)source)));
718+
when(source.isPeerEnabled()).thenReturn(true);
719+
when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
720+
// Override the max retries multiplier to fail fast.
721+
conf.setInt("replication.source.maxretriesmultiplier", 1);
722+
conf.setBoolean("replication.source.eof.autorecovery", true);
723+
// Create a reader thread.
724+
ReplicationSourceWALReaderThread reader =
725+
new ReplicationSourceWALReaderThread(mockSourceManager, getRecoveredQueueInfo(),
726+
queue, 0, fs, conf, getDummyFilter(),
727+
new MetricsSource("1"), (ReplicationSource) source);
728+
reader.run();
729+
// ReplicationSourceWALReaderThread#handleEofException method will
730+
// remove empty log from logQueue.
731+
assertEquals(0, queue.size());
732+
}
695733
}

0 commit comments

Comments
 (0)