Skip to content

Commit ea45c4f

Browse files
authored
[To dev/1.3] Pipe: Fixed the first-chunk calculation bug for scan parser (#17597) (#17622)
* Pipe: Fixed the first-chunk calculation bug for scan parser (#17597) * Update TsFileInsertionDataContainerTest.java * Update TsFileInsertionDataContainerTest.java
1 parent f6995bb commit ea45c4f

2 files changed

Lines changed: 67 additions & 5 deletions

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -605,11 +605,6 @@ private void moveToNextChunkReader() throws IOException, IllegalStateException {
605605
final long chunkSize = timeChunkSize + valueChunkSize;
606606
if (chunkSize + chunkHeader.getDataSize()
607607
> allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) {
608-
if (valueChunkList.size() == 1
609-
&& chunkSize > allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) {
610-
PipeDataNodeResourceManager.memory()
611-
.forceResize(allocatedMemoryBlockForChunk, chunkSize);
612-
}
613608
needReturn = recordAlignedChunk(valueChunkList, marker);
614609
}
615610
}
@@ -619,9 +614,11 @@ private void moveToNextChunkReader() throws IOException, IllegalStateException {
619614
firstChunkHeader4NextSequentialValueChunks = chunkHeader;
620615
return;
621616
}
617+
resizeChunkMemoryBlockIfFirstValueChunkExceedsLimit(valueChunkList, chunkHeader);
622618
} else {
623619
chunkHeader = firstChunkHeader4NextSequentialValueChunks;
624620
firstChunkHeader4NextSequentialValueChunks = null;
621+
resizeChunkMemoryBlockIfFirstValueChunkExceedsLimit(valueChunkList, chunkHeader);
625622
}
626623

627624
Chunk chunk =
@@ -690,6 +687,20 @@ private boolean recordAlignedChunk(final List<Chunk> valueChunkList, final byte
690687
return false;
691688
}
692689

690+
private void resizeChunkMemoryBlockIfFirstValueChunkExceedsLimit(
691+
final List<Chunk> valueChunkList, final ChunkHeader valueChunkHeader) {
692+
if (!valueChunkList.isEmpty() || lastIndex < 0) {
693+
return;
694+
}
695+
696+
final long chunkSize =
697+
PipeMemoryWeightUtil.calculateChunkRamBytesUsed(timeChunkList.get(lastIndex))
698+
+ valueChunkHeader.getDataSize();
699+
if (chunkSize > allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) {
700+
PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlockForChunk, chunkSize);
701+
}
702+
}
703+
693704
@Override
694705
public void close() {
695706
super.close();

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDataContainer;
2828
import org.apache.iotdb.db.pipe.event.common.tsfile.container.query.TsFileInsertionQueryDataContainer;
2929
import org.apache.iotdb.db.pipe.event.common.tsfile.container.scan.TsFileInsertionScanDataContainer;
30+
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
3031
import org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionTestFileWriter;
3132
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
3233
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
@@ -56,6 +57,7 @@
5657

5758
import java.io.File;
5859
import java.io.IOException;
60+
import java.lang.reflect.Field;
5961
import java.time.LocalDate;
6062
import java.util.ArrayList;
6163
import java.util.Arrays;
@@ -119,6 +121,47 @@ public void testScanContainer() throws Exception {
119121
System.out.println(System.currentTimeMillis() - startTime);
120122
}
121123

124+
@Test
125+
public void testScanParserResizesChunkMemoryForFirstAlignedValueChunk() throws Exception {
126+
final long originalPipeMaxReaderChunkSize =
127+
PipeConfig.getInstance().getPipeMaxReaderChunkSize();
128+
CommonDescriptor.getInstance().getConfig().setPipeMaxReaderChunkSize(0);
129+
130+
alignedTsFile = new File("single-aligned-value-chunk.tsfile");
131+
final List<MeasurementSchema> schemaList = new ArrayList<>();
132+
schemaList.add(new MeasurementSchema("s1", TSDataType.INT64));
133+
134+
final Tablet tablet = new Tablet("root.sg.d", schemaList, 2);
135+
tablet.rowSize = 2;
136+
tablet.addTimestamp(0, 1);
137+
tablet.addValue("s1", 0, 1L);
138+
tablet.addTimestamp(1, 2);
139+
tablet.addValue("s1", 1, 2L);
140+
141+
try {
142+
try (final TsFileWriter writer = new TsFileWriter(alignedTsFile)) {
143+
writer.registerAlignedTimeseries(new Path("root.sg.d"), schemaList);
144+
writer.writeAligned(tablet);
145+
}
146+
147+
try (final TsFileInsertionScanDataContainer parser =
148+
new TsFileInsertionScanDataContainer(
149+
alignedTsFile,
150+
new PrefixPipePattern("root"),
151+
Long.MIN_VALUE,
152+
Long.MAX_VALUE,
153+
null,
154+
null,
155+
false)) {
156+
Assert.assertTrue(getAllocatedChunkMemory(parser).getMemoryUsageInBytes() > 0);
157+
}
158+
} finally {
159+
CommonDescriptor.getInstance()
160+
.getConfig()
161+
.setPipeMaxReaderChunkSize(originalPipeMaxReaderChunkSize);
162+
}
163+
}
164+
122165
public void testToTabletInsertionEvents(final boolean isQuery) throws Exception {
123166
// Test empty chunk
124167
testMixedTsFileWithEmptyChunk(isQuery);
@@ -645,4 +688,12 @@ private int getNonNullSize(final Tablet tablet) {
645688
}
646689
return count;
647690
}
691+
692+
private PipeMemoryBlock getAllocatedChunkMemory(final TsFileInsertionScanDataContainer parser)
693+
throws NoSuchFieldException, IllegalAccessException {
694+
final Field field =
695+
TsFileInsertionScanDataContainer.class.getDeclaredField("allocatedMemoryBlockForChunk");
696+
field.setAccessible(true);
697+
return (PipeMemoryBlock) field.get(parser);
698+
}
648699
}

0 commit comments

Comments
 (0)