Skip to content

Commit 8c3d2b6

Browse files
authored
Pipe: Removed the failed partial insert from auth check & Implemented skipReportOnCommit for PipeRealtimeEvent to avoid premature report for unordered flush (#17561)
* schema * sptls * fix
1 parent 44398ab commit 8c3d2b6

4 files changed

Lines changed: 173 additions & 0 deletions

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,10 @@ private void checkTreePattern(final IDeviceID deviceID, final String[] measureme
325325
throws IllegalPathException {
326326
final List<MeasurementPath> measurementList = new ArrayList<>();
327327
for (final String measurement : measurements) {
328+
// Ignore failed measurements in partial inserts, consistent with downstream matching/parsing.
329+
if (measurement == null) {
330+
continue;
331+
}
328332
if (treePattern.matchesMeasurement(deviceID, measurement)) {
329333
measurementList.add(new MeasurementPath(deviceID, measurement));
330334
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,16 @@ public ProgressIndex getProgressIndex() {
164164
return event.getProgressIndex();
165165
}
166166

167+
@Override
168+
public void skipReportOnCommit() {
169+
event.skipReportOnCommit();
170+
}
171+
172+
@Override
173+
public boolean isShouldReportOnCommit() {
174+
return event.isShouldReportOnCommit();
175+
}
176+
167177
@Override
168178
public void skipParsingPattern() {
169179
event.skipParsingPattern();
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.db.pipe.event;
21+
22+
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
23+
import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
24+
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
25+
import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
26+
import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
27+
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
28+
import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
29+
30+
import org.junit.Assert;
31+
import org.junit.Test;
32+
33+
public class PipeRealtimeEventTest {
34+
35+
@Test
36+
public void testSkipReportOnCommitIsDelegatedToInnerEvent() {
37+
final TestEnrichedEvent innerEvent = new TestEnrichedEvent();
38+
final PipeRealtimeEvent realtimeEvent = new PipeRealtimeEvent(innerEvent, null, null);
39+
40+
Assert.assertTrue(innerEvent.isShouldReportOnCommit());
41+
Assert.assertTrue(realtimeEvent.isShouldReportOnCommit());
42+
43+
realtimeEvent.skipReportOnCommit();
44+
45+
Assert.assertFalse(innerEvent.isShouldReportOnCommit());
46+
Assert.assertFalse(realtimeEvent.isShouldReportOnCommit());
47+
}
48+
49+
private static class TestEnrichedEvent extends EnrichedEvent {
50+
51+
private TestEnrichedEvent() {
52+
super(null, 0, null, null, null, null, null, null, false, Long.MIN_VALUE, Long.MAX_VALUE);
53+
}
54+
55+
@Override
56+
public boolean internallyIncreaseResourceReferenceCount(final String holderMessage) {
57+
return true;
58+
}
59+
60+
@Override
61+
public boolean internallyDecreaseResourceReferenceCount(final String holderMessage) {
62+
return true;
63+
}
64+
65+
@Override
66+
public ProgressIndex getProgressIndex() {
67+
return MinimumProgressIndex.INSTANCE;
68+
}
69+
70+
@Override
71+
public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
72+
final String pipeName,
73+
final long creationTime,
74+
final PipeTaskMeta pipeTaskMeta,
75+
final TreePattern treePattern,
76+
final TablePattern tablePattern,
77+
final String userId,
78+
final String userName,
79+
final String cliHostname,
80+
final boolean skipIfNoPrivileges,
81+
final long startTime,
82+
final long endTime) {
83+
return this;
84+
}
85+
86+
@Override
87+
public boolean isGeneratedByPipe() {
88+
return false;
89+
}
90+
91+
@Override
92+
public boolean mayEventTimeOverlappedWithTimeRange() {
93+
return true;
94+
}
95+
96+
@Override
97+
public boolean mayEventPathsOverlappedWithPattern() {
98+
return true;
99+
}
100+
}
101+
}

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@
1919

2020
package org.apache.iotdb.db.pipe.event;
2121

22+
import org.apache.iotdb.common.rpc.thrift.TSStatus;
23+
import org.apache.iotdb.commons.audit.IAuditEntity;
24+
import org.apache.iotdb.commons.auth.entity.PrivilegeType;
2225
import org.apache.iotdb.commons.exception.IllegalPathException;
2326
import org.apache.iotdb.commons.exception.auth.AccessDeniedException;
2427
import org.apache.iotdb.commons.path.PartialPath;
@@ -471,4 +474,59 @@ public void testAuthCheck() {
471474
event.close();
472475
}
473476
}
477+
478+
@Test
479+
public void testAuthCheckIgnoresNullMeasurementInPartialInsert() throws Exception {
480+
insertRowNode.markFailedMeasurement(1);
481+
482+
final PipeInsertNodeTabletInsertionEvent event =
483+
new PipeInsertNodeTabletInsertionEvent(
484+
false,
485+
"root.db",
486+
insertRowNode,
487+
null,
488+
0,
489+
null,
490+
new PrefixTreePattern(pattern),
491+
new TablePattern(true, null, null),
492+
"0",
493+
"user",
494+
"localhost",
495+
false,
496+
Long.MIN_VALUE,
497+
Long.MAX_VALUE);
498+
final AccessControl oldControl = AuthorityChecker.getAccessControl();
499+
final NullMeasurementRejectingAccessControl accessControl =
500+
new NullMeasurementRejectingAccessControl();
501+
try {
502+
AuthorityChecker.setAccessControl(accessControl);
503+
504+
event.throwIfNoPrivilege();
505+
506+
Assert.assertFalse(accessControl.hasNullMeasurementPath);
507+
Assert.assertFalse(event.shouldParse4Privilege());
508+
} finally {
509+
AuthorityChecker.setAccessControl(oldControl);
510+
event.close();
511+
}
512+
}
513+
514+
private static class NullMeasurementRejectingAccessControl
515+
extends PipeTsFileInsertionEventTest.TestAccessControl {
516+
517+
private boolean hasNullMeasurementPath = false;
518+
519+
@Override
520+
public TSStatus checkSeriesPrivilege4Pipe(
521+
final IAuditEntity context,
522+
final java.util.List<? extends PartialPath> checkedPathsSupplier,
523+
final PrivilegeType permission) {
524+
hasNullMeasurementPath =
525+
checkedPathsSupplier.stream().anyMatch(path -> path.getFullPath().endsWith(".null"));
526+
return hasNullMeasurementPath
527+
? AuthorityChecker.getTSStatus(
528+
Collections.singletonList(0), checkedPathsSupplier, permission)
529+
: new TSStatus(org.apache.iotdb.rpc.TSStatusCode.SUCCESS_STATUS.getStatusCode());
530+
}
531+
}
474532
}

0 commit comments

Comments
 (0)