Skip to content

Commit 2ea083d

Browse files
authored
[To dev/1.3] Pipe: Fixed the event clear logic of drop pipe (#17560) (#17619)
* Pipe: Fixed the event clear logic of drop pipe (#17560) * drop-1 * wd * drop * fix * local * triple * by * spt * bug-fix * no-pipe-task-key * Update IoTDBDataRegionAsyncSink.java * triple * Fix * comp * comp-fix02 * drop-n
1 parent 35482e4 commit 2ea083d

20 files changed

Lines changed: 599 additions & 72 deletions

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -178,26 +178,34 @@ private void parseAndCollectEvent(final PipeSchemaRegionWritePlanEvent deleteDat
178178

179179
private void collectEvent(final Event event) {
180180
if (event instanceof EnrichedEvent) {
181-
if (!((EnrichedEvent) event).increaseReferenceCount(PipeEventCollector.class.getName())) {
181+
final EnrichedEvent enrichedEvent = (EnrichedEvent) event;
182+
if (!enrichedEvent.increaseReferenceCount(PipeEventCollector.class.getName())) {
182183
LOGGER.warn("PipeEventCollector: The event {} is already released, skipping it.", event);
183184
isFailedToIncreaseReferenceCount = true;
184185
return;
185186
}
186187

187188
// Assign a commit id for this event in order to report progress in order.
188189
PipeEventCommitManager.getInstance()
189-
.enrichWithCommitterKeyAndCommitId((EnrichedEvent) event, creationTime, regionId);
190+
.enrichWithCommitterKeyAndCommitId(enrichedEvent, creationTime, regionId);
190191

191-
// Assign a rebootTime for pipeConsensus
192-
((EnrichedEvent) event).setRebootTimes(PipeDataNodeAgent.runtime().getRebootTimes());
192+
// Assign a rebootTime for iotConsensusV2
193+
enrichedEvent.setRebootTimes(PipeDataNodeAgent.runtime().getRebootTimes());
194+
195+
if (enrichedEvent.getPipeName() != null
196+
&& pendingQueue.isPipeDropped(enrichedEvent.getPipeName(), creationTime, regionId)) {
197+
enrichedEvent.clearReferenceCount(PipeEventCollector.class.getName());
198+
return;
199+
}
193200
}
194201

195202
if (event instanceof PipeHeartbeatEvent) {
196203
((PipeHeartbeatEvent) event).recordConnectorQueueSize(pendingQueue);
197204
}
198205

199-
pendingQueue.offer(event);
200-
collectInvocationCount.incrementAndGet();
206+
if (pendingQueue.offer(event)) {
207+
collectInvocationCount.incrementAndGet();
208+
}
201209
}
202210

203211
public void resetFlags() {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,9 @@ public PipeRealtimePriorityBlockingQueue() {
7373

7474
@Override
7575
public boolean offer(final Event event) {
76-
checkBeforeOffer(event);
76+
if (!checkBeforeOffer(event)) {
77+
return false;
78+
}
7779

7880
if (event instanceof TsFileInsertionEvent) {
7981
tsfileInsertEventDeque.add((TsFileInsertionEvent) event);
@@ -356,13 +358,14 @@ public void discardAllEvents() {
356358
}
357359

358360
@Override
359-
public void discardEventsOfPipe(final String pipeNameToDrop, final int regionId) {
360-
super.discardEventsOfPipe(pipeNameToDrop, regionId);
361+
public void discardEventsOfPipe(
362+
final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) {
363+
super.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId);
361364
tsfileInsertEventDeque.removeIf(
362365
event -> {
363366
if (event instanceof EnrichedEvent
364-
&& pipeNameToDrop.equals(((EnrichedEvent) event).getPipeName())
365-
&& regionId == ((EnrichedEvent) event).getRegionId()) {
367+
&& isEventFromPipe(
368+
((EnrichedEvent) event), pipeNameToDrop, creationTimeToDrop, regionId)) {
366369
if (((EnrichedEvent) event)
367370
.clearReferenceCount(PipeRealtimePriorityBlockingQueue.class.getName())) {
368371
eventCounter.decreaseEventCount(event);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.iotdb.commons.pipe.config.PipeConfig;
2626
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
2727
import org.apache.iotdb.commons.pipe.sink.protocol.IoTDBSink;
28+
import org.apache.iotdb.commons.pipe.sink.protocol.PipeConnectorWithEventDiscard;
2829
import org.apache.iotdb.commons.utils.ErrorHandlingCommonUtils;
2930
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
3031
import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent;
@@ -199,9 +200,10 @@ public void close() {
199200
* When a pipe is dropped, the connector maybe reused and will not be closed. So we just discard
200201
* its queued events in the output pipe connector.
201202
*/
202-
public void discardEventsOfPipe(final String pipeNameToDrop, int regionId) {
203+
public void discardEventsOfPipe(
204+
final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) {
203205
// Try to remove the events as much as possible
204-
inputPendingQueue.discardEventsOfPipe(pipeNameToDrop, regionId);
206+
inputPendingQueue.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId);
205207

206208
try {
207209
increaseHighPriorityTaskCount();
@@ -215,6 +217,7 @@ public void discardEventsOfPipe(final String pipeNameToDrop, int regionId) {
215217
// will.
216218
if (lastEvent instanceof EnrichedEvent
217219
&& pipeNameToDrop.equals(((EnrichedEvent) lastEvent).getPipeName())
220+
&& creationTimeToDrop == ((EnrichedEvent) lastEvent).getCreationTime()
218221
&& regionId == ((EnrichedEvent) lastEvent).getRegionId()) {
219222
// Do not clear the last event's reference counts because it may be on transferring
220223
lastEvent = null;
@@ -238,6 +241,7 @@ public void discardEventsOfPipe(final String pipeNameToDrop, int regionId) {
238241
// "nonnull" detection.
239242
if (lastExceptionEvent instanceof EnrichedEvent
240243
&& pipeNameToDrop.equals(((EnrichedEvent) lastExceptionEvent).getPipeName())
244+
&& creationTimeToDrop == ((EnrichedEvent) lastExceptionEvent).getCreationTime()
241245
&& regionId == ((EnrichedEvent) lastExceptionEvent).getRegionId()) {
242246
clearReferenceCountAndReleaseLastExceptionEvent();
243247
}
@@ -246,8 +250,9 @@ public void discardEventsOfPipe(final String pipeNameToDrop, int regionId) {
246250
decreaseHighPriorityTaskCount();
247251
}
248252

249-
if (outputPipeConnector instanceof IoTDBSink) {
250-
((IoTDBSink) outputPipeConnector).discardEventsOfPipe(pipeNameToDrop, regionId);
253+
if (outputPipeConnector instanceof PipeConnectorWithEventDiscard) {
254+
((PipeConnectorWithEventDiscard) outputPipeConnector)
255+
.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId);
251256
}
252257
}
253258

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,12 +92,13 @@ public synchronized void register() {
9292
* {@link PipeSinkSubtask} should never be used again
9393
* @throws IllegalStateException if {@link PipeSinkSubtaskLifeCycle#registeredTaskCount} <= 0
9494
*/
95-
public synchronized boolean deregister(final String pipeNameToDeregister, int regionId) {
95+
public synchronized boolean deregister(
96+
final String pipeNameToDeregister, final long creationTimeToDeregister, final int regionId) {
9697
if (registeredTaskCount <= 0) {
9798
throw new IllegalStateException("registeredTaskCount <= 0");
9899
}
99100

100-
subtask.discardEventsOfPipe(pipeNameToDeregister, regionId);
101+
subtask.discardEventsOfPipe(pipeNameToDeregister, creationTimeToDeregister, regionId);
101102

102103
try {
103104
if (registeredTaskCount > 1) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,7 @@ public synchronized void deregister(
209209
// Shall not be empty
210210
final PipeSinkSubtaskExecutor executor = lifeCycles.get(0).executor;
211211

212-
lifeCycles.removeIf(o -> o.deregister(pipeName, regionId));
212+
lifeCycles.removeIf(o -> o.deregister(pipeName, creationTime, regionId));
213213

214214
if (lifeCycles.isEmpty()) {
215215
attributeSortedString2SubtaskLifeCycleMap.remove(attributeSortedString);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -154,10 +154,13 @@ public synchronized void close() {
154154
* Discard all events of the given pipe. This method only clears the reference count of the events
155155
* and discard them, but do not modify other objects (such as buffers) for simplicity.
156156
*/
157-
public synchronized void discardEventsOfPipe(final String pipeNameToDrop, final int regionId) {
157+
public synchronized void discardEventsOfPipe(
158+
final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) {
158159
events.removeIf(
159160
event -> {
160-
if (pipeNameToDrop.equals(event.getPipeName()) && regionId == event.getRegionId()) {
161+
if (pipeNameToDrop.equals(event.getPipeName())
162+
&& creationTimeToDrop == event.getCreationTime()
163+
&& regionId == event.getRegionId()) {
161164
event.clearReferenceCount(IoTDBDataRegionAsyncSink.class.getName());
162165
return true;
163166
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -195,9 +195,12 @@ public boolean isEmpty() {
195195
&& endPointToBatch.values().stream().allMatch(PipeTabletEventPlainBatch::isEmpty);
196196
}
197197

198-
public synchronized void discardEventsOfPipe(final String pipeNameToDrop, final int regionId) {
199-
defaultBatch.discardEventsOfPipe(pipeNameToDrop, regionId);
200-
endPointToBatch.values().forEach(batch -> batch.discardEventsOfPipe(pipeNameToDrop, regionId));
198+
public synchronized void discardEventsOfPipe(
199+
final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) {
200+
defaultBatch.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId);
201+
endPointToBatch
202+
.values()
203+
.forEach(batch -> batch.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId));
201204
}
202205

203206
public int size() {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java

Lines changed: 41 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.iotdb.commons.client.ThriftClient;
2424
import org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
2525
import org.apache.iotdb.commons.pipe.config.PipeConfig;
26+
import org.apache.iotdb.commons.pipe.datastructure.Triple;
2627
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
2728
import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
2829
import org.apache.iotdb.commons.pipe.sink.protocol.IoTDBSink;
@@ -74,6 +75,7 @@
7475
import java.util.List;
7576
import java.util.Map;
7677
import java.util.Objects;
78+
import java.util.Set;
7779
import java.util.concurrent.BlockingQueue;
7880
import java.util.concurrent.CompletableFuture;
7981
import java.util.concurrent.ConcurrentHashMap;
@@ -121,6 +123,10 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
121123
private final Map<PipeTransferTrackableHandler, PipeTransferTrackableHandler> pendingHandlers =
122124
new ConcurrentHashMap<>();
123125

126+
// Pipe name, creation time, region id
127+
private final Set<Triple<String, Long, Integer>> droppedPipeTaskKeys =
128+
ConcurrentHashMap.newKeySet();
129+
124130
private boolean enableSendTsFileLimit;
125131
private volatile boolean isConnectionException;
126132

@@ -660,8 +666,15 @@ private void retryTransfer(final PipeTsFileInsertionEvent tsFileInsertionEvent)
660666
public void addFailureEventToRetryQueue(final Event event, final Exception e) {
661667
isConnectionException =
662668
e instanceof PipeConnectionException || ThriftClient.isConnectionBroken(e);
663-
if (event instanceof EnrichedEvent && ((EnrichedEvent) event).isReleased()) {
664-
return;
669+
if (event instanceof EnrichedEvent) {
670+
final EnrichedEvent enrichedEvent = (EnrichedEvent) event;
671+
if (enrichedEvent.isReleased()) {
672+
return;
673+
}
674+
if (isDroppedPipe(enrichedEvent)) {
675+
enrichedEvent.clearReferenceCount(IoTDBDataRegionAsyncSink.class.getName());
676+
return;
677+
}
665678
}
666679

667680
if (isClosed.get()) {
@@ -707,15 +720,18 @@ public boolean isEnableSendTsFileLimit() {
707720
//////////////////////////// Operations for close ////////////////////////////
708721

709722
@Override
710-
public synchronized void discardEventsOfPipe(final String pipeNameToDrop, final int regionId) {
711-
if (isTabletBatchModeEnabled) {
712-
tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop, regionId);
723+
public synchronized void discardEventsOfPipe(
724+
final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) {
725+
droppedPipeTaskKeys.add(new Triple<>(pipeNameToDrop, creationTimeToDrop, regionId));
726+
727+
if (isTabletBatchModeEnabled && Objects.nonNull(tabletBatchBuilder)) {
728+
tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId);
713729
}
714730
retryEventQueue.removeIf(
715731
event -> {
716732
if (event instanceof EnrichedEvent
717-
&& pipeNameToDrop.equals(((EnrichedEvent) event).getPipeName())
718-
&& regionId == ((EnrichedEvent) event).getRegionId()) {
733+
&& isDroppedPipe(
734+
(EnrichedEvent) event, pipeNameToDrop, creationTimeToDrop, regionId)) {
719735
((EnrichedEvent) event).clearReferenceCount(IoTDBDataRegionAsyncSink.class.getName());
720736
retryEventQueueEventCounter.decreaseEventCount(event);
721737
return true;
@@ -726,8 +742,8 @@ public synchronized void discardEventsOfPipe(final String pipeNameToDrop, final
726742
retryTsFileQueue.removeIf(
727743
event -> {
728744
if (event instanceof EnrichedEvent
729-
&& pipeNameToDrop.equals(((EnrichedEvent) event).getPipeName())
730-
&& regionId == ((EnrichedEvent) event).getRegionId()) {
745+
&& isDroppedPipe(
746+
(EnrichedEvent) event, pipeNameToDrop, creationTimeToDrop, regionId)) {
731747
((EnrichedEvent) event).clearReferenceCount(IoTDBDataRegionAsyncSink.class.getName());
732748
retryEventQueueEventCounter.decreaseEventCount(event);
733749
return true;
@@ -771,6 +787,7 @@ public synchronized void close() {
771787

772788
// clear reference count of events in retry queue after closing async client
773789
clearRetryEventsReferenceCount();
790+
droppedPipeTaskKeys.clear();
774791

775792
super.close();
776793
}
@@ -827,6 +844,21 @@ public void setTransferTsFileCounter(AtomicInteger transferTsFileCounter) {
827844
this.transferTsFileCounter = transferTsFileCounter;
828845
}
829846

847+
private boolean isDroppedPipe(final EnrichedEvent event) {
848+
return droppedPipeTaskKeys.contains(
849+
new Triple<>(event.getPipeName(), event.getCreationTime(), event.getRegionId()));
850+
}
851+
852+
private static boolean isDroppedPipe(
853+
final EnrichedEvent event,
854+
final String pipeNameToDrop,
855+
final long creationTimeToDrop,
856+
final int regionId) {
857+
return pipeNameToDrop.equals(event.getPipeName())
858+
&& creationTimeToDrop == event.getCreationTime()
859+
&& regionId == event.getRegionId();
860+
}
861+
830862
@Override
831863
public void setTabletBatchSizeHistogram(Histogram tabletBatchSizeHistogram) {
832864
if (tabletBatchBuilder != null) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -521,9 +521,10 @@ public TPipeTransferReq compressIfNeeded(final TPipeTransferReq req) throws IOEx
521521
}
522522

523523
@Override
524-
public synchronized void discardEventsOfPipe(final String pipeNameToDrop, final int regionId) {
524+
public synchronized void discardEventsOfPipe(
525+
final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) {
525526
if (Objects.nonNull(tabletBatchBuilder)) {
526-
tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop, regionId);
527+
tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId);
527528
}
528529
}
529530

0 commit comments

Comments
 (0)