Skip to content

Commit f6995bb

Browse files
authored
[To dev/1.3] Load: Fixed multiple bugs (#17565) (#17623)
* Load: Fixed multiple bugs (#17565) * fix * test-del
1 parent 2ea083d commit f6995bb

3 files changed

Lines changed: 154 additions & 19 deletions

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java

Lines changed: 36 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,34 @@ public LoadTsFileManager() {
128128
activeLoadAgent.start();
129129
}
130130

131+
private long getCleanupTaskDelayInMs() {
132+
return CONFIG.getLoadCleanupTaskExecutionDelayTimeSeconds() * 1000L;
133+
}
134+
135+
private void createCleanupTaskIfAbsent(final String uuid) {
136+
synchronized (uuid2CleanupTask) {
137+
if (uuid2CleanupTask.containsKey(uuid)) {
138+
return;
139+
}
140+
141+
final CleanupTask cleanupTask = new CleanupTask(uuid, getCleanupTaskDelayInMs());
142+
uuid2CleanupTask.put(uuid, cleanupTask);
143+
cleanupTaskQueue.add(cleanupTask);
144+
}
145+
}
146+
147+
private void rescheduleCleanupTask(final CleanupTask cleanupTask) {
148+
synchronized (uuid2CleanupTask) {
149+
if (uuid2CleanupTask.get(cleanupTask.uuid) != cleanupTask) {
150+
return;
151+
}
152+
153+
cleanupTaskQueue.remove(cleanupTask);
154+
cleanupTask.resetScheduledTime();
155+
cleanupTaskQueue.add(cleanupTask);
156+
}
157+
}
158+
131159
private void registerCleanupTaskExecutor() {
132160
PipeDataNodeAgent.runtime()
133161
.registerPeriodicalJob(
@@ -199,26 +227,13 @@ private void recover() {
199227
uuid2WriterManager.put(uuid, writerManager);
200228
writerManager.close();
201229

202-
synchronized (uuid2CleanupTask) {
203-
final CleanupTask cleanupTask =
204-
new CleanupTask(
205-
uuid, CONFIG.getLoadCleanupTaskExecutionDelayTimeSeconds() * 1000);
206-
uuid2CleanupTask.put(uuid, cleanupTask);
207-
cleanupTaskQueue.add(cleanupTask);
208-
}
230+
createCleanupTaskIfAbsent(uuid);
209231
});
210232
}
211233

212234
public void writeToDataRegion(DataRegion dataRegion, LoadTsFilePieceNode pieceNode, String uuid)
213235
throws IOException, PageException {
214-
if (!uuid2WriterManager.containsKey(uuid)) {
215-
synchronized (uuid2CleanupTask) {
216-
final CleanupTask cleanupTask =
217-
new CleanupTask(uuid, CONFIG.getLoadCleanupTaskExecutionDelayTimeSeconds() * 1000);
218-
uuid2CleanupTask.put(uuid, cleanupTask);
219-
cleanupTaskQueue.add(cleanupTask);
220-
}
221-
}
236+
createCleanupTaskIfAbsent(uuid);
222237

223238
final Optional<CleanupTask> cleanupTask = Optional.ofNullable(uuid2CleanupTask.get(uuid));
224239
cleanupTask.ifPresent(CleanupTask::markLoadTaskRunning);
@@ -293,6 +308,8 @@ public boolean loadAll(
293308
return false;
294309
}
295310

311+
createCleanupTaskIfAbsent(uuid);
312+
296313
final Optional<CleanupTask> cleanupTask = Optional.ofNullable(uuid2CleanupTask.get(uuid));
297314
cleanupTask.ifPresent(CleanupTask::markLoadTaskRunning);
298315
try {
@@ -315,9 +332,10 @@ public boolean deleteAll(String uuid) {
315332

316333
private void clean(String uuid) {
317334
synchronized (uuid2CleanupTask) {
318-
final CleanupTask cleanupTask = uuid2CleanupTask.get(uuid);
335+
final CleanupTask cleanupTask = uuid2CleanupTask.remove(uuid);
319336
if (cleanupTask != null) {
320337
cleanupTask.cancel();
338+
cleanupTaskQueue.remove(cleanupTask);
321339
}
322340
}
323341

@@ -744,12 +762,12 @@ private CleanupTask(String uuid, long delayInMs) {
744762

745763
public void markLoadTaskRunning() {
746764
isLoadTaskRunning = true;
747-
resetScheduledTime();
765+
rescheduleCleanupTask(this);
748766
}
749767

750768
public void markLoadTaskNotRunning() {
751769
isLoadTaskRunning = false;
752-
resetScheduledTime();
770+
rescheduleCleanupTask(this);
753771
}
754772

755773
public void resetScheduledTime() {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManager.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,18 @@ public synchronized LoadTsFileDataCacheMemoryBlock allocateDataCacheMemoryBlock(
126126
if (dataCacheMemoryBlock == null) {
127127
final long actuallyAllocateMemoryInBytes =
128128
tryAllocateFromQuery(MEMORY_TOTAL_SIZE_FROM_QUERY_IN_BYTES >> 2);
129-
dataCacheMemoryBlock = new LoadTsFileDataCacheMemoryBlock(actuallyAllocateMemoryInBytes);
129+
try {
130+
dataCacheMemoryBlock = new LoadTsFileDataCacheMemoryBlock(actuallyAllocateMemoryInBytes);
131+
} catch (RuntimeException e) {
132+
if (actuallyAllocateMemoryInBytes > 0) {
133+
try {
134+
releaseToQuery(actuallyAllocateMemoryInBytes);
135+
} catch (RuntimeException releaseException) {
136+
e.addSuppressed(releaseException);
137+
}
138+
}
139+
throw e;
140+
}
130141
LOGGER.info(
131142
"Create Data Cache Memory Block {}, allocate memory {}",
132143
dataCacheMemoryBlock,
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.db.storageengine.load.memory;
21+
22+
import org.apache.iotdb.db.exception.load.LoadRuntimeOutOfMemoryException;
23+
24+
import org.junit.Assert;
25+
import org.junit.Test;
26+
27+
import java.lang.reflect.Constructor;
28+
import java.lang.reflect.Field;
29+
import java.util.concurrent.atomic.AtomicLong;
30+
31+
import static org.mockito.ArgumentMatchers.anyLong;
32+
import static org.mockito.Mockito.doAnswer;
33+
import static org.mockito.Mockito.spy;
34+
35+
public class LoadTsFileMemoryManagerTest {
36+
37+
@Test
38+
public void testAllocateDataCacheMemoryBlockDoesNotDoubleCountMemory() throws Exception {
39+
final long allocatedMemoryInBytes = 2L * 1024 * 1024;
40+
final LoadTsFileMemoryManager manager = spy(newMemoryManager());
41+
42+
doAnswer(
43+
invocation -> {
44+
setUsedMemorySize(manager, allocatedMemoryInBytes);
45+
return allocatedMemoryInBytes;
46+
})
47+
.when(manager)
48+
.tryAllocateFromQuery(anyLong());
49+
50+
manager.allocateDataCacheMemoryBlock();
51+
52+
Assert.assertEquals(allocatedMemoryInBytes, manager.getUsedMemorySizeInBytes());
53+
Assert.assertNotNull(getDataCacheMemoryBlock(manager));
54+
}
55+
56+
@Test
57+
public void testAllocateDataCacheMemoryBlockRollsBackPartialAllocationOnFailure()
58+
throws Exception {
59+
final long allocatedMemoryInBytes = 512L;
60+
final LoadTsFileMemoryManager manager = spy(newMemoryManager());
61+
62+
doAnswer(
63+
invocation -> {
64+
setUsedMemorySize(manager, allocatedMemoryInBytes);
65+
return allocatedMemoryInBytes;
66+
})
67+
.when(manager)
68+
.tryAllocateFromQuery(anyLong());
69+
doAnswer(
70+
invocation -> {
71+
setUsedMemorySize(manager, 0L);
72+
return null;
73+
})
74+
.when(manager)
75+
.releaseToQuery(anyLong());
76+
77+
try {
78+
manager.allocateDataCacheMemoryBlock();
79+
Assert.fail("Expected LoadRuntimeOutOfMemoryException");
80+
} catch (LoadRuntimeOutOfMemoryException e) {
81+
Assert.assertEquals(0L, manager.getUsedMemorySizeInBytes());
82+
Assert.assertNull(getDataCacheMemoryBlock(manager));
83+
}
84+
}
85+
86+
private static LoadTsFileMemoryManager newMemoryManager() throws Exception {
87+
final Constructor<LoadTsFileMemoryManager> constructor =
88+
LoadTsFileMemoryManager.class.getDeclaredConstructor();
89+
constructor.setAccessible(true);
90+
return constructor.newInstance();
91+
}
92+
93+
private static void setUsedMemorySize(
94+
final LoadTsFileMemoryManager manager, final long usedMemorySizeInBytes) throws Exception {
95+
final Field field = LoadTsFileMemoryManager.class.getDeclaredField("usedMemorySizeInBytes");
96+
field.setAccessible(true);
97+
((AtomicLong) field.get(manager)).set(usedMemorySizeInBytes);
98+
}
99+
100+
private static LoadTsFileDataCacheMemoryBlock getDataCacheMemoryBlock(
101+
final LoadTsFileMemoryManager manager) throws Exception {
102+
final Field field = LoadTsFileMemoryManager.class.getDeclaredField("dataCacheMemoryBlock");
103+
field.setAccessible(true);
104+
return (LoadTsFileDataCacheMemoryBlock) field.get(manager);
105+
}
106+
}

0 commit comments

Comments
 (0)