Skip to content

Commit 8bfcc0a

Browse files
committed
HBASE-23157 WAL unflushed seqId tracking may be wrong when Durability.ASYNC_WAL is used
1 parent da9a53e commit 8bfcc0a

34 files changed

Lines changed: 287 additions & 157 deletions

hbase-common/src/main/java/org/apache/hadoop/hbase/util/ImmutableByteArray.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,4 +51,8 @@ public static ImmutableByteArray wrap(byte[] b) {
5151
public String toStringUtf8() {
5252
return Bytes.toString(b);
5353
}
54+
55+
public String toStringBinary() {
56+
return Bytes.toStringBinary(b);
57+
}
5458
}

hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -135,10 +135,10 @@ public void testPartialRead() throws Exception {
135135
long ts = System.currentTimeMillis();
136136
WALEdit edit = new WALEdit();
137137
edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), ts, value));
138-
log.append(info, getWalKeyImpl(ts, scopes), edit, true);
138+
log.append(info, getWalKeyImpl(ts, scopes), edit, true, false);
139139
edit = new WALEdit();
140140
edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), ts+1, value));
141-
log.append(info, getWalKeyImpl(ts+1, scopes), edit, true);
141+
log.append(info, getWalKeyImpl(ts+1, scopes), edit, true, false);
142142
log.sync();
143143
LOG.info("Before 1st WAL roll " + log.toString());
144144
log.rollWriter();
@@ -149,10 +149,10 @@ public void testPartialRead() throws Exception {
149149

150150
edit = new WALEdit();
151151
edit.add(new KeyValue(rowName, family, Bytes.toBytes("3"), ts1+1, value));
152-
log.append(info, getWalKeyImpl(ts1+1, scopes), edit, true);
152+
log.append(info, getWalKeyImpl(ts1+1, scopes), edit, true, false);
153153
edit = new WALEdit();
154154
edit.add(new KeyValue(rowName, family, Bytes.toBytes("4"), ts1+2, value));
155-
log.append(info, getWalKeyImpl(ts1+2, scopes), edit, true);
155+
log.append(info, getWalKeyImpl(ts1+2, scopes), edit, true, false);
156156
log.sync();
157157
log.shutdown();
158158
walfactory.shutdown();
@@ -193,7 +193,8 @@ public void testWALRecordReader() throws Exception {
193193
WALEdit edit = new WALEdit();
194194
edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"),
195195
System.currentTimeMillis(), value));
196-
long txid = log.append(info, getWalKeyImpl(System.currentTimeMillis(), scopes), edit, true);
196+
long txid =
197+
log.append(info, getWalKeyImpl(System.currentTimeMillis(), scopes), edit, true, false);
197198
log.sync(txid);
198199

199200
Thread.sleep(1); // make sure 2nd log gets a later timestamp
@@ -203,7 +204,7 @@ public void testWALRecordReader() throws Exception {
203204
edit = new WALEdit();
204205
edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"),
205206
System.currentTimeMillis(), value));
206-
txid = log.append(info, getWalKeyImpl(System.currentTimeMillis(), scopes), edit, true);
207+
txid = log.append(info, getWalKeyImpl(System.currentTimeMillis(), scopes), edit, true, false);
207208
log.sync(txid);
208209
log.shutdown();
209210
walfactory.shutdown();
@@ -253,17 +254,17 @@ public void testWALRecordReaderActiveArchiveTolerance() throws Exception {
253254
WAL log = walfactory.getWAL(info);
254255
byte [] value = Bytes.toBytes("value");
255256
WALEdit edit = new WALEdit();
256-
edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"),
257-
System.currentTimeMillis(), value));
258-
long txid = log.append(info, getWalKeyImpl(System.currentTimeMillis(), scopes), edit, true);
257+
edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), System.currentTimeMillis(), value));
258+
long txid =
259+
log.append(info, getWalKeyImpl(System.currentTimeMillis(), scopes), edit, true, false);
259260
log.sync(txid);
260261

261262
Thread.sleep(10); // make sure 2nd edit gets a later timestamp
262263

263264
edit = new WALEdit();
264265
edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"),
265266
System.currentTimeMillis(), value));
266-
txid = log.append(info, getWalKeyImpl(System.currentTimeMillis(), scopes), edit, true);
267+
txid = log.append(info, getWalKeyImpl(System.currentTimeMillis(), scopes), edit, true, false);
267268
log.sync(txid);
268269
log.shutdown();
269270

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7990,7 +7990,7 @@ private WriteEntry doWALAppend(WALEdit walEdit, Durability durability, List<UUID
79907990
}
79917991
WriteEntry writeEntry = null;
79927992
try {
7993-
long txid = this.wal.append(this.getRegionInfo(), walKey, walEdit, true);
7993+
long txid = this.wal.append(this.getRegionInfo(), walKey, walEdit, true, false);
79947994
// Call sync on our edit.
79957995
if (txid != 0) {
79967996
sync(txid, durability);

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1001,8 +1001,13 @@ protected final boolean append(W writer, FSWALEntry entry) throws IOException {
10011001
doAppend(writer, entry);
10021002
assert highestUnsyncedTxid < entry.getTxid();
10031003
highestUnsyncedTxid = entry.getTxid();
1004-
sequenceIdAccounting.update(encodedRegionName, entry.getFamilyNames(), regionSequenceId,
1005-
entry.isInMemStore());
1004+
if (entry.isCloseRegion()) {
1005+
// let's clean all the records of this region
1006+
sequenceIdAccounting.onRegionClose(encodedRegionName);
1007+
} else {
1008+
sequenceIdAccounting.update(encodedRegionName, entry.getFamilyNames(), regionSequenceId,
1009+
entry.isInMemStore());
1010+
}
10061011
coprocessorHost.postWALWrite(entry.getRegionInfo(), entry.getKey(), entry.getEdit());
10071012
// Update metrics.
10081013
postAppend(entry, EnvironmentEdgeManager.currentTime() - start);
@@ -1052,11 +1057,11 @@ protected final void postSync(long timeInNanos, int handlerSyncs) {
10521057
}
10531058

10541059
protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKeyImpl key,
1055-
WALEdit edits, boolean inMemstore, RingBuffer<RingBufferTruck> ringBuffer)
1056-
throws IOException {
1060+
WALEdit edits, boolean inMemstore, boolean closeRegion, RingBuffer<RingBufferTruck> ringBuffer)
1061+
throws IOException {
10571062
if (this.closed) {
10581063
throw new IOException(
1059-
"Cannot append; log is closed, regionName = " + hri.getRegionNameAsString());
1064+
"Cannot append; log is closed, regionName = " + hri.getRegionNameAsString());
10601065
}
10611066
MutableLong txidHolder = new MutableLong();
10621067
MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin(() -> {
@@ -1066,7 +1071,7 @@ protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKe
10661071
ServerCall<?> rpcCall = RpcServer.getCurrentCall().filter(c -> c instanceof ServerCall)
10671072
.filter(c -> c.getCellScanner() != null).map(c -> (ServerCall) c).orElse(null);
10681073
try (TraceScope scope = TraceUtil.createTrace(implClassName + ".append")) {
1069-
FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore, rpcCall);
1074+
FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore, closeRegion, rpcCall);
10701075
entry.stampRegionSequenceId(we);
10711076
ringBuffer.get(txid).load(entry);
10721077
} finally {
@@ -1115,8 +1120,8 @@ public OptionalLong getLogFileSizeIfBeingWritten(Path path) {
11151120
* this append; otherwise, you will just have to wait on the WriteEntry to get filled in.
11161121
*/
11171122
@Override
1118-
public abstract long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore)
1119-
throws IOException;
1123+
public abstract long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore,
1124+
boolean closeRegion) throws IOException;
11201125

11211126
protected abstract void doAppend(W writer, FSWALEntry entry) throws IOException;
11221127

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -615,13 +615,13 @@ protected boolean markerEditOnly() {
615615
}
616616

617617
@Override
618-
public long append(RegionInfo hri, WALKeyImpl key, WALEdit edits, boolean inMemstore)
619-
throws IOException {
618+
public long append(RegionInfo hri, WALKeyImpl key, WALEdit edits, boolean inMemstore,
619+
boolean closeRegion) throws IOException {
620620
if (markerEditOnly() && !edits.isMetaEdit()) {
621621
throw new IOException("WAL is closing, only marker edit is allowed");
622622
}
623-
long txid =
624-
stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, waitingConsumePayloads);
623+
long txid = stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, closeRegion,
624+
waitingConsumePayloads);
625625
if (shouldScheduleConsumer()) {
626626
consumeExecutor.execute(consumer);
627627
}

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -433,12 +433,10 @@ protected void doShutdown() throws IOException {
433433
}
434434
}
435435

436-
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NULL_ON_SOME_PATH_EXCEPTION",
437-
justification = "Will never be null")
438436
@Override
439437
public long append(final RegionInfo hri, final WALKeyImpl key, final WALEdit edits,
440-
final boolean inMemstore) throws IOException {
441-
return stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore,
438+
final boolean inMemstore, boolean closeRegion) throws IOException {
439+
return stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, closeRegion,
442440
disruptor.getRingBuffer());
443441
}
444442

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,14 +51,16 @@ class FSWALEntry extends Entry {
5151
// they are only in memory and held here while passing over the ring buffer.
5252
private final transient long txid;
5353
private final transient boolean inMemstore;
54+
private final transient boolean closeRegion;
5455
private final transient RegionInfo regionInfo;
5556
private final transient Set<byte[]> familyNames;
5657
private final transient ServerCall<?> rpcCall;
5758

5859
FSWALEntry(final long txid, final WALKeyImpl key, final WALEdit edit, final RegionInfo regionInfo,
59-
final boolean inMemstore, ServerCall<?> rpcCall) {
60+
final boolean inMemstore, boolean closeRegion, ServerCall<?> rpcCall) {
6061
super(key, edit);
6162
this.inMemstore = inMemstore;
63+
this.closeRegion = closeRegion;
6264
this.regionInfo = regionInfo;
6365
this.txid = txid;
6466
if (inMemstore) {
@@ -98,6 +100,10 @@ boolean isInMemStore() {
98100
return this.inMemstore;
99101
}
100102

103+
boolean isCloseRegion() {
104+
return closeRegion;
105+
}
106+
101107
RegionInfo getRegionInfo() {
102108
return this.regionInfo;
103109
}

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.util.Set;
2929
import java.util.concurrent.ConcurrentHashMap;
3030
import java.util.concurrent.ConcurrentMap;
31+
import java.util.stream.Collectors;
3132
import org.apache.hadoop.hbase.HConstants;
3233
import org.apache.hadoop.hbase.util.Bytes;
3334
import org.apache.hadoop.hbase.util.ImmutableByteArray;
@@ -184,6 +185,30 @@ void update(byte[] encodedRegionName, Set<byte[]> families, long sequenceid,
184185
}
185186
}
186187

188+
/**
189+
* Clear all the records of the given region as it is going to be closed.
190+
* <p/>
191+
* We will call this once we get the region close marker. We need this because, if we use
192+
* Durability.ASYNC_WAL, after calling startCacheFlush we may still get some ongoing WAL entries
193+
* that have not been processed yet. This will lead to orphan records in the
194+
* lowestUnflushedSequenceIds and then cause too many WAL files.
195+
* <p/>
196+
* See HBASE-23157 for more details.
197+
*/
198+
void onRegionClose(byte[] encodedRegionName) {
199+
synchronized (tieLock) {
200+
this.lowestUnflushedSequenceIds.remove(encodedRegionName);
201+
Map<ImmutableByteArray, Long> flushing = this.flushingSequenceIds.remove(encodedRegionName);
202+
if (flushing != null) {
203+
LOG.warn("Still have flushing records when closing {}, {}",
204+
Bytes.toString(encodedRegionName),
205+
flushing.entrySet().stream().map(e -> e.getKey().toStringBinary() + "->" + e.getValue())
206+
.collect(Collectors.joining(",", "{", "}")));
207+
}
208+
}
209+
this.highestSequenceIds.remove(encodedRegionName);
210+
}
211+
187212
/**
188213
* Update the store sequence id, e.g., upon executing in-memory compaction
189214
*/

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java

Lines changed: 41 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -59,20 +59,19 @@ private WALUtil() {
5959
}
6060

6161
/**
62-
* Write the marker that a compaction has succeeded and is about to be committed.
63-
* This provides info to the HMaster to allow it to recover the compaction if this regionserver
64-
* dies in the middle. It also prevents the compaction from finishing if this regionserver has
65-
* already lost its lease on the log.
66-
*
67-
* <p>This write is for internal use only. Not for external client consumption.
62+
* Write the marker that a compaction has succeeded and is about to be committed. This provides
63+
* info to the HMaster to allow it to recover the compaction if this regionserver dies in the
64+
* middle. It also prevents the compaction from finishing if this regionserver has already lost
65+
* its lease on the log.
66+
* <p/>
67+
* This write is for internal use only. Not for external client consumption.
6868
* @param mvcc Used by WAL to get sequence Id for the waledit.
6969
*/
7070
public static WALKeyImpl writeCompactionMarker(WAL wal,
71-
NavigableMap<byte[], Integer> replicationScope, RegionInfo hri, final CompactionDescriptor c,
72-
MultiVersionConcurrencyControl mvcc)
73-
throws IOException {
71+
NavigableMap<byte[], Integer> replicationScope, RegionInfo hri, final CompactionDescriptor c,
72+
MultiVersionConcurrencyControl mvcc) throws IOException {
7473
WALKeyImpl walKey =
75-
writeMarker(wal, replicationScope, hri, WALEdit.createCompaction(hri, c), mvcc, null);
74+
writeMarker(wal, replicationScope, hri, WALEdit.createCompaction(hri, c), false, mvcc, null);
7675
if (LOG.isTraceEnabled()) {
7776
LOG.trace("Appended compaction marker " + TextFormat.shortDebugString(c));
7877
}
@@ -81,30 +80,30 @@ public static WALKeyImpl writeCompactionMarker(WAL wal,
8180

8281
/**
8382
* Write a flush marker indicating the start, abort, or completion of a region flush.
84-
*
85-
* <p>This write is for internal use only. Not for external client consumption.
83+
* <p/>
84+
* This write is for internal use only. Not for external client consumption.
8685
*/
8786
public static WALKeyImpl writeFlushMarker(WAL wal, NavigableMap<byte[], Integer> replicationScope,
88-
RegionInfo hri, final FlushDescriptor f, boolean sync, MultiVersionConcurrencyControl mvcc)
89-
throws IOException {
87+
RegionInfo hri, final FlushDescriptor f, boolean sync, MultiVersionConcurrencyControl mvcc)
88+
throws IOException {
9089
WALKeyImpl walKey = doFullAppendTransaction(wal, replicationScope, hri,
91-
WALEdit.createFlushWALEdit(hri, f), mvcc, null, sync);
90+
WALEdit.createFlushWALEdit(hri, f), false, mvcc, null, sync);
9291
if (LOG.isTraceEnabled()) {
9392
LOG.trace("Appended flush marker " + TextFormat.shortDebugString(f));
9493
}
9594
return walKey;
9695
}
9796

9897
/**
99-
* Write a region open marker indicating that the region is opened.
100-
* This write is for internal use only. Not for external client consumption.
98+
* Write a region event marker (for example, region open or region close). This write is for
99+
* internal use only. Not for external client consumption.
101100
*/
102101
public static WALKeyImpl writeRegionEventMarker(WAL wal,
103-
NavigableMap<byte[], Integer> replicationScope, RegionInfo hri,
104-
final RegionEventDescriptor r, final MultiVersionConcurrencyControl mvcc)
105-
throws IOException {
106-
WALKeyImpl walKey = writeMarker(wal, replicationScope, hri,
107-
WALEdit.createRegionEventWALEdit(hri, r), mvcc, null);
102+
NavigableMap<byte[], Integer> replicationScope, RegionInfo hri, final RegionEventDescriptor r,
103+
final MultiVersionConcurrencyControl mvcc) throws IOException {
104+
WALKeyImpl walKey =
105+
writeMarker(wal, replicationScope, hri, WALEdit.createRegionEventWALEdit(hri, r),
106+
r.getEventType() == RegionEventDescriptor.EventType.REGION_CLOSE, mvcc, null);
108107
if (LOG.isTraceEnabled()) {
109108
LOG.trace("Appended region event marker " + TextFormat.shortDebugString(r));
110109
}
@@ -122,48 +121,44 @@ public static WALKeyImpl writeRegionEventMarker(WAL wal,
122121
* @throws IOException We will throw an IOException if we can not append to the HLog.
123122
*/
124123
public static WALKeyImpl writeBulkLoadMarkerAndSync(final WAL wal,
125-
final NavigableMap<byte[], Integer> replicationScope, final RegionInfo hri,
126-
final WALProtos.BulkLoadDescriptor desc, final MultiVersionConcurrencyControl mvcc)
127-
throws IOException {
128-
WALKeyImpl walKey =
129-
writeMarker(wal, replicationScope, hri, WALEdit.createBulkLoadEvent(hri, desc), mvcc, null);
124+
final NavigableMap<byte[], Integer> replicationScope, final RegionInfo hri,
125+
final WALProtos.BulkLoadDescriptor desc, final MultiVersionConcurrencyControl mvcc)
126+
throws IOException {
127+
WALKeyImpl walKey = writeMarker(wal, replicationScope, hri,
128+
WALEdit.createBulkLoadEvent(hri, desc), false, mvcc, null);
130129
if (LOG.isTraceEnabled()) {
131130
LOG.trace("Appended Bulk Load marker " + TextFormat.shortDebugString(desc));
132131
}
133132
return walKey;
134133
}
135134

136135
private static WALKeyImpl writeMarker(final WAL wal,
137-
final NavigableMap<byte[], Integer> replicationScope,
138-
final RegionInfo hri,
139-
final WALEdit edit,
140-
final MultiVersionConcurrencyControl mvcc,
141-
final Map<String, byte[]> extendedAttributes)
142-
throws IOException {
136+
final NavigableMap<byte[], Integer> replicationScope, final RegionInfo hri, final WALEdit edit,
137+
boolean closeRegion, final MultiVersionConcurrencyControl mvcc,
138+
final Map<String, byte[]> extendedAttributes) throws IOException {
143139
// If sync == true in below, then timeout is not used; safe to pass UNSPECIFIED_TIMEOUT
144-
return doFullAppendTransaction(wal, replicationScope, hri,
145-
edit, mvcc, extendedAttributes, true);
140+
return doFullAppendTransaction(wal, replicationScope, hri, edit, closeRegion, mvcc,
141+
extendedAttributes, true);
146142
}
147143

148144
/**
149-
* A 'full' WAL transaction involves starting an mvcc transaction followed by an append,
150-
* an optional sync, and then a call to complete the mvcc transaction. This method does it all.
151-
* Good for case of adding a single edit or marker to the WAL.
152-
*
153-
* <p>This write is for internal use only. Not for external client consumption.
145+
* A 'full' WAL transaction involves starting an mvcc transaction followed by an append, an
146+
* optional sync, and then a call to complete the mvcc transaction. This method does it all. Good
147+
* for case of adding a single edit or marker to the WAL.
148+
* <p/>
149+
* This write is for internal use only. Not for external client consumption.
154150
* @return WALKeyImpl that was added to the WAL.
155151
*/
156152
public static WALKeyImpl doFullAppendTransaction(final WAL wal,
157-
final NavigableMap<byte[], Integer> replicationScope, final RegionInfo hri,
158-
final WALEdit edit, final MultiVersionConcurrencyControl mvcc,
159-
final Map<String, byte[]> extendedAttributes, final boolean sync)
160-
throws IOException {
153+
final NavigableMap<byte[], Integer> replicationScope, final RegionInfo hri, final WALEdit edit,
154+
boolean closeRegion, final MultiVersionConcurrencyControl mvcc,
155+
final Map<String, byte[]> extendedAttributes, final boolean sync) throws IOException {
161156
// TODO: Pass in current time to use?
162157
WALKeyImpl walKey = new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(),
163-
System.currentTimeMillis(), mvcc, replicationScope, extendedAttributes);
158+
System.currentTimeMillis(), mvcc, replicationScope, extendedAttributes);
164159
long trx = MultiVersionConcurrencyControl.NONE;
165160
try {
166-
trx = wal.append(hri, walKey, edit, false);
161+
trx = wal.append(hri, walKey, edit, false, closeRegion);
167162
if (sync) {
168163
wal.sync(trx);
169164
}

hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -161,8 +161,8 @@ public void close() {
161161
}
162162

163163
@Override
164-
public long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore)
165-
throws IOException {
164+
public long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore,
165+
boolean closeRegion) throws IOException {
166166
WriteEntry writeEntry = key.getMvcc().begin();
167167
if (!edits.isReplay()) {
168168
for (Cell cell : edits.getCells()) {

0 commit comments

Comments
 (0)