From 0729bac45fe35b4b949cf18cb55be7a5566c1910 Mon Sep 17 00:00:00 2001 From: Tian Jiang Date: Sat, 9 May 2026 18:06:06 +0800 Subject: [PATCH 1/3] Fix that WALBuffer waits for flush instead of file-roll --- .idea/icon.png | Bin 6736 -> 0 bytes .../dataregion/wal/buffer/IWALBuffer.java | 6 +- .../dataregion/wal/buffer/WALBuffer.java | 18 +- .../dataregion/wal/node/WALNode.java | 12 +- .../wal/node/WALNodeWaitForRollFileTest.java | 420 ++++++++++++++++++ 5 files changed, 443 insertions(+), 13 deletions(-) delete mode 100644 .idea/icon.png create mode 100644 iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNodeWaitForRollFileTest.java diff --git a/.idea/icon.png b/.idea/icon.png deleted file mode 100644 index 493aca9320ef7493bea080592c34e5ac4623c63b..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 6736 zcma)BcU;rivJV~UO+lq3p(>Ds-h1d>AP}WPfY2eKgAzm`6akeYRf?j32m$G0L=XgN zvM8NU1Vp4M2nh0myLWf*+jsBh{c)0<^Zm}8nK?7_E5Tcs=`%B&V*mgE%!UT=OXP3& zlR-~Q{tTSYv<3iZ&Z4dDuy&><%5DLE5Eu6VR}>`NFOZA_0IC|{fi7-7C@jzw<%#xJ z18=mx00YtPYG7MAQ4$2E{5E+cJiZru!i}Z0*atCXuGpL3ulNI=(ur9!GKVN^0 za=04!mtJM^?~`RH82F2V^-%-cnOXpK0)kOMIfxtt23BVPss_7zC|`o>{UJ=Ase!$) z*g$0{G%PF(5+(x)2=;_ZDJdyIVbV}(X-P6e5)ef;2V&6y z{=gHRF0KI~ST!)1j068=nP1?a%Kb5acu$T4G~6W+Dg}W-|4t6{3cv=wVSxQz}PC*+fB1g}T=|6kyHVuS{I3@MC8HDI8xTx3 zj_iZXFJu3L40UuYf&)CzzGMt@NnaaisH-C{r6eyeDGibO#XUJUWeapT%GVB#_Cxt& z$T_A4miohyzfk0VLVxXUMJ_J#7Qd@JIa_5zKNnAwE!rLH^_RB4027qI=Rc5>Gx7~N zNpc&1H2GAee;N2o$nGyp4Xh*&{o6*!e?TW?162K$Ov(mm3^pJ*;@8sewPb|~`hE8M z)EE7$h=9Ofsif@UcCx=3I50TCJ;V*={>uciy5CVufCo0rB^agUNzNiQu$G4hnrvwV zkX)Z=Pk&S}P#Q>%>mQN*Q!300Mb`Mg4)bpr1O3A~=t*h+%RE)+KO9i~Mfh7UAj|!} zM(#1>W(EDTTahP!_B)h6Ig^6P-Rz6q(H#JQA{I@~iGSn*03enTlF?_Zt;Lm~@yx(T z=8TgqB{oEwEvIyS zFP!N5y7g)u;s|qmE6dK_mgve|XkjfWsw$}3m|9V#_o1ke7(;gk~cRq0fq z0TKfMuymdbicE2q^8f&+r6F9)Iz0blK~x+6NOSiKm403N%MT1KwOIM^)(bO9DtI3Y zuJV3-_3b9!Cna)wie7wK#useDj8`4YlA3JnFVcl5m8=r%!D5E?;4%<6BVVI%$78TC ztp9Cp#B%?c$~D;s7c0i|VDE5x8z@v#tu)=a0qu}H8N?q@eoDP`FPu0gGC0-Rk&)iXyK zaZ*Ij!jW3S!9+!js7uLS9})NXmEh`1D{egp^ffljs(alA@uU5%vsmRZZGNuu;T6(b z#DQrsYq~tlsjBu^7|Mp)5hY#&qzW&ZEqc7D2K}+!r;P1)KjHZS8KeBP(#yEJ@h1Hcf?+x9IWN-h=0kR1RQPmtZK=lB%CXd zy(WM8n=Wzi?&#OPZ^$##dIe+?n49pvE`StD7U=M@1<8Qqvv?=9-N&4=x&z7-tMjSC zt*+JmH;v_qWRo`9Gj{6pA#s?qOyu3geS7}!cHw-AUWau zoW7xv{`Ybd?cVB+QIFRV*e<4<$F@>VBD5cW@+=9Lakg?+BSDGOsGT^1f(-%+vy4{K zcH#ylx2fELJ-Ck($Q4spH+_{S=6Gi1{8;zL-qkRpsk)#viXM|AhRsC6!LzCcEpjeR zaYuBpnDMG}g-^q`)_Ye~o#1C&oiUxmcM?Q!Ib!@whf!~S?9et)_#5y!scjxClL z4Z(w0;eHyNb|OKCw9YAn7IdbVJ%RULD67MbVWOsc4CIUbCB!v`wHqrxBW}*}4%SNN z1#+mDLKvFX&7Ad7&%&U6FBM7A936pU0#cEAY=i`+((&TEo!*J!4YGGf1H>!!sm=qL zp0Ku~(9;*cQ>pLp{y^wW>Js@9fuS}Cb8(DdV_RA;nvj5>XC9%`q2`#c6uTT6JeSq> zg@UH+Xha84ke};&&-3A3v7w=9~IAPDlqaqsA?~DpLK99QKSs53ij(^I(3R=Of`On z9{0636;Oyti0)`VE%up5d7xB%<5bng&#O#46BRJt*R=KbSf}#TdBEuyHUS-e#6vyL z7n7=C)Q&VabHyCxLastKTR9wVSs^rTtCHqlp6lxkz&*J;`qWlnXa4F3Q4`=ow2^kP z<`nRBm-*tpNRsU-9s{N^~7--%Izh6X{P@yvA61Yk4f4y`0UP4pQwFg!HgsOsEfA-8sE7VTgZ~IT! zQ6A>X1rAR^xg?!+Pj9zV*r;I$(TgN06!;X{0j^?><{W>z45B=rjk5T#dOsMkLEWhy z@t(Ov!*zT)VyjvU4A0#a*H3*xzgAh*F1b7-4w{(3k^~}d32}DmgX??*Q9EDp3u^3w zka(?}o9VenY<{1-7w5b$c$J2x73Vb^Q@7i1zl*v%s)G6!p|Z*6oIcnhsnlM8o5U4e ztc02rv}SEH0Sw$1b)TeE;wIkcqIP&YwI!3;MUoOc^IMr~)doWQbc7W{_e4nQox*vl zjiRIRTL9O_-{L}RpWE8Bt=Fe4a87MV5mRZbBRQv*q7}7Ap>*~+*0_Sk+KQ-#_s(sh zrxr((K186I%7aY9j@PMFmw2t31LMXhYp*~wQ>N0`UPjvcLMbjTX>#CJ4E}*PEU9BY9?QkV@zb|g^-cp{NZqqx@>Bgi` zhQ+wsUyAg2wbXe{)Z|A#5hn{5($YD{=U>tho8?lxBy zB5*W--c|enVxMn(wy!9*lwjwg|2$g2vUrx-Q8KUeUDD_zH)z1o3USSPXcquiyq?Ze z!OHGLoF+0OKd5BwSdwNbZ`RXq9XX}ZyBeL&{0+XvCe#<|LvmY8j$v#vM1JXCl(U>V zACz-M3F5fJL&rPElcIf>Be8XhHJub+z}6ASu3g2)Js%c|*qtib1a-5S!{2djZ?e0h zoJy(+FYST6Z{p`&=tHCx%g2ok^laC!GY-u)6;G$jVbx1MwXHP=K}mY?w|8C6a7+Ya zhE0=69iG(~x87IOq|Kq7Dfm`ctP^OrPnL9*6pac&pPoiv6{Vu1ag;Vf#+X``vT$~} z#Ts*T&uH=3(&>5|k>4pRnL3SdQV%5R0=lS1v>j%+c$EV%I+o0rBmM* zTB3RU+~04yt+U?~~}NC#tPJrFtQAbS6-kv&%>D6ShxKq|3CJa-U&7 zVIW!g1uIW$S*i^8W&>j&k89PLIdEs_8=GwMUCgbuqNiN7EqWEy1+mr)m(L;#11Nr0 zRY5*lN^oY6(t%d4W+}^^8s?II?rAnM&ON+@(BDmd0hW1O512NYV8U-5fC&5vB>s4X zuLSCKyc%z<)VZwhlq6LALW`nidf&xL4HjN=P9qEwQiTC-3!B&=Gzv!=G{Bw1XHC^9 zSRe%F>Bv?-X-+*0H9nV^ZtWlIwP)uk-X^!p_N-OZZ^YmC%j7lT3cyDbOX?T*i4Dl% zWdA$unMuQor%Wy(qLNX$V*L0qBhc#Fv))yKXNp-S^<4t<9cJB}h)k97hmmSW2$pKxt?PMXs=o4|v zk1cSTG6^$NNBPvY_;RehV{u&85DU)w3C(ug^4R;w#~M0ZH=bvp9yStMvI#9O8&h?o zh0u~>eYeD}Bv;}ZY~EClmapeK*wDPYVWwc&1}U%~rYs|V)J3+3bz>YD>{l=GycT>~ z6}wjdxnj2?HlMyP%i=rJzWmPz<@6)5$A)~9lShS=zQtAQN}_BfjP{zZV4X->*w|hr zWq+53O!83j8g@0yB2RN_O|figL9u2=kAIPQ>UmgCYZ%giFgVRMDYWzbnpGNhp;n_`EB}E+H_3+Kx3)jwU1@H{Y z@fnmXsr0TsA6e%7LfhI-2R7LlNmK|bfyxsNQoF6DzXwCLOIbCoKmGKN77#3Rw;<-* z0wED!K}q7WhftVAyf3DD$lO}*fkoIL-LzVQ&>gAi{TTz@5*-HrkbJ`Br~{32;|rk{ zh*?|N84GIKuQ9ZT(qGE9LB_y^9uKecIe6-8&v+NSOZKi9tk1HMSbJBgtd^sIT9`@N z(1^Q)7Gqd_UsELi@JfRg9ghy zzEgsJa|vqklLZtdmi$m2WpK-l@N?n00rt@M*d}?4$(1plw0dSO zBWnnOYZ48$*Qy&Qf-Y`On?bbdinRx5tu0~ot?Nz(6T$2cL9bmKoekTnT_reXKfmHM zcrx`vX25eU&A|N)zIRo@*glMZ{!woSBN0-ul&zcfk|WAEuXHQ{WcDOGW955!^7bg^ zAc|wG($}c&Yvmf=%l}mdQvU4LalR#0`OYyXIZ_qHX{SvX<#;D?gQL#cb5ng*v2raw zU;Vhfq3R1g%^8WsVWy+Pj{$<|i$#gilV!5`l@X44uN@LED}-=%MFCU3APv$dQbTq( z)@xM@o>dG~`Me>&_Q=A|)3Kjl^!tv?Qe1sckafs@`Fu6SAsD2;uoC@ruKKs^bAd_) zj;}LgjZJPS&^=3Pp}BpFp~!sygHiaLN5@7+=aJArNHgxqw`tDDSBg4~>I+Mq&V?`v zIcTk$I63O+cJ4V*4X0UWt}?7>uDS{bedWe|QDegYRAIfK6XeQQ(Bd4g+PD?S*)?hX zWww@@4y+Z6Nlz~mtl0g);5|y1q7%PzC`}z@q59r5U9;850-@ntvX}S8KuYZTgSpX4 zwk+U?w8PAFN9+TApzZ@I?n{z!N0@{MxhV3ucSkuGb_#_i}7EUd8>qy#o vg-y>W2dQ!xG-}*_@e{_85j@U~A3V?i-Zp7)sCBnJ`6I|s*9=~*?Hu<%{Y29Z diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/IWALBuffer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/IWALBuffer.java index 978ea78116268..082dcca8c05bb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/IWALBuffer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/IWALBuffer.java @@ -51,7 +51,7 @@ public interface IWALBuffer extends AutoCloseable { * * @throws InterruptedException when interrupted by the flush thread */ - void waitForFlush() throws InterruptedException; + void waitForRollFile() throws InterruptedException; /** * Wait for next flush operation done, if the predicate == true after entering a locked @@ -60,14 +60,14 @@ public interface IWALBuffer extends AutoCloseable { * @param waitPredicate the condition which should be satisfied before waiting. * @throws InterruptedException when interrupted by the flush thread */ - public void waitForFlush(Predicate waitPredicate) throws InterruptedException; + public void waitForRollFile(Predicate waitPredicate) throws InterruptedException; /** * Wait for next flush operation done. * * @throws InterruptedException when interrupted by the flush thread */ - boolean waitForFlush(long time, TimeUnit unit) throws InterruptedException; + boolean waitForRollFile(long time, TimeUnit unit) throws InterruptedException; /** Return true when all wal entries all consumed and flushed. */ boolean isAllWALEntriesConsumed(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALBuffer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALBuffer.java index a7d79f92b5753..d46ee6c7deb34 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALBuffer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALBuffer.java @@ -90,6 +90,7 @@ public class WALBuffer extends AbstractWALBuffer { private final Lock buffersLock = new ReentrantLock(); // condition to guarantee correctness of switching buffers private final Condition idleBufferReadyCondition = buffersLock.newCondition(); + private final Condition rollLogWriterCondition = buffersLock.newCondition(); // last writer position when fsync is called, help record each entry's position private long lastFsyncPosition; @@ -170,6 +171,13 @@ private int getCompressedByteBufferSize(int size) { protected File rollLogWriter(long searchIndex, WALFileStatus fileStatus) throws IOException { File file = super.rollLogWriter(searchIndex, fileStatus); currentWALFileWriter.setCompressedByteBuffer(compressedByteBuffer); + buffersLock.lock(); + try { + // notify WALReader that new file is generated, and it can read new file + rollLogWriterCondition.signalAll(); + } finally { + buffersLock.unlock(); + } return file; } @@ -656,7 +664,7 @@ private void switchSyncingBufferToIdle() { } @Override - public void waitForFlush() throws InterruptedException { + public void waitForRollFile() throws InterruptedException { buffersLock.lock(); try { idleBufferReadyCondition.await(); @@ -666,11 +674,11 @@ public void waitForFlush() throws InterruptedException { } @Override - public void waitForFlush(Predicate waitPredicate) throws InterruptedException { + public void waitForRollFile(Predicate waitPredicate) throws InterruptedException { buffersLock.lock(); try { if (waitPredicate.test(this)) { - idleBufferReadyCondition.await(); + rollLogWriterCondition.await(); } } finally { buffersLock.unlock(); @@ -678,10 +686,10 @@ public void waitForFlush(Predicate waitPredicate) throws InterruptedE } @Override - public boolean waitForFlush(long time, TimeUnit unit) throws InterruptedException { + public boolean waitForRollFile(long time, TimeUnit unit) throws InterruptedException { buffersLock.lock(); try { - return idleBufferReadyCondition.await(time, unit); + return rollLogWriterCondition.await(time, unit); } finally { buffersLock.unlock(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java index 07dd4d78f6605..4152f1f8f0779 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java @@ -96,7 +96,7 @@ public class WALNode implements IWALNode { // no iot consensus, all insert nodes can be safely deleted public static final long DEFAULT_SAFELY_DELETED_SEARCH_INDEX = Long.MAX_VALUE; // timeout threshold when waiting for next wal entry - private static final long WAIT_FOR_NEXT_WAL_ENTRY_TIMEOUT_IN_SEC = 30; + public static final long WAIT_FOR_NEXT_WAL_ENTRY_TIMEOUT_IN_SEC = 30; private static final WritingMetrics WRITING_METRICS = WritingMetrics.getInstance(); // unique identifier of this WALNode @@ -761,6 +761,8 @@ public boolean hasNext() { notFirstFile.set(true); } + tryToCollectInsertNodeAndBumpIndex.run(); + // update file index and version id if (currentFileIndex >= filesToSearch.length - 1) { needUpdatingFilesToSearch = true; @@ -792,7 +794,7 @@ public void waitForNextReady() throws InterruptedException { while (!hasNext()) { if (!walFileRolled) { boolean timeout = - !buffer.waitForFlush(WAIT_FOR_NEXT_WAL_ENTRY_TIMEOUT_IN_SEC, TimeUnit.SECONDS); + !buffer.waitForRollFile(WAIT_FOR_NEXT_WAL_ENTRY_TIMEOUT_IN_SEC, TimeUnit.SECONDS); if (timeout) { bufferLastSearchIndex = buffer.getCurrentSearchIndex(); logger.info( @@ -805,7 +807,7 @@ public void waitForNextReady() throws InterruptedException { } else { // only wait when the search index of the buffer remains the same as the previous check long finalBufferLastSearchIndex = bufferLastSearchIndex; - buffer.waitForFlush(buf -> buf.getCurrentSearchIndex() == finalBufferLastSearchIndex); + buffer.waitForRollFile(buf -> buf.getCurrentSearchIndex() == finalBufferLastSearchIndex); } } } @@ -814,8 +816,8 @@ public void waitForNextReady() throws InterruptedException { public void waitForNextReady(long time, TimeUnit unit) throws InterruptedException, TimeoutException { if (!hasNext()) { - boolean timeout = !buffer.waitForFlush(time, unit); - if (timeout || !hasNext()) { + boolean timeout = !buffer.waitForRollFile(time, unit); + if (timeout && !hasNext()) { throw new TimeoutException(); } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNodeWaitForRollFileTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNodeWaitForRollFileTest.java new file mode 100644 index 0000000000000..9cd159b51a85d --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNodeWaitForRollFileTest.java @@ -0,0 +1,420 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.storageengine.dataregion.wal.node; + +import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.consensus.ConsensusFactory; +import org.apache.iotdb.consensus.iot.log.ConsensusReqReader; +import org.apache.iotdb.db.conf.IoTDBConfig; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode; +import org.apache.iotdb.db.storageengine.StorageEngine; +import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable; +import org.apache.iotdb.db.storageengine.dataregion.memtable.PrimitiveMemTable; +import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALMode; +import org.apache.iotdb.db.utils.EnvironmentUtils; +import org.apache.iotdb.db.utils.constant.TestConstant; + +import org.apache.tsfile.common.conf.TSFileConfig; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.file.metadata.enums.TSEncoding; +import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.utils.BitMap; +import org.apache.tsfile.write.schema.MeasurementSchema; +import org.awaitility.Awaitility; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.util.Collections; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class WALNodeWaitForRollFileTest { + private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); + private static final String identifier = String.valueOf(Integer.MAX_VALUE); + private static final String logDirectory = + TestConstant.BASE_OUTPUT_PATH.concat("wal-roll-file-test"); + private static final String databasePath = "root.test_sg"; + private static final String devicePath = databasePath + ".test_d"; + private static final String dataRegionId = "1"; + private WALMode prevMode; + private String prevConsensus; + private WALNode walNode; + private long originWALThreshold; + + @Before + public void setUp() throws Exception { + originWALThreshold = config.getWalFileSizeThresholdInByte(); + EnvironmentUtils.cleanDir(logDirectory); + prevMode = config.getWalMode(); + prevConsensus = config.getDataRegionConsensusProtocolClass(); + config.setWalMode(WALMode.SYNC); + config.setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS); + config.setWalFileSizeThresholdInByte(2 * 1024 * 1024); + walNode = new WALNode(identifier, logDirectory); + } + + @After + public void tearDown() throws Exception { + walNode.close(); + config.setWalMode(prevMode); + config.setDataRegionConsensusProtocolClass(prevConsensus); + config.setWalFileSizeThresholdInByte(originWALThreshold); + EnvironmentUtils.cleanDir(logDirectory); + StorageEngine.getInstance().reset(); + } + + /** + * Verifies that waitForNextReady(time, unit) throws TimeoutException when no WAL data is + * available at the requested search index. This uses waitForRollFile internally. + */ + @Test + public void testWaitForNextReadyTimesOutWhenNoData() throws Exception { + ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(1); + assertFalse(iterator.hasNext()); + try { + iterator.waitForNextReady(1, TimeUnit.SECONDS); + fail("Expected TimeoutException"); + } catch (TimeoutException e) { + // expected + } + } + + /** + * Verifies that waitForNextReady(time, unit) does NOT wake up from a buffer flush alone — it + * requires a WAL file roll. This is the core behavioral change: the old waitForFlush would return + * on any buffer sync, but waitForRollFile only returns when a new WAL file is created. + */ + @Test + public void testWaitForNextReadyNotWokenByFlushWithoutRoll() throws Exception { + IMemTable memTable = new PrimitiveMemTable(databasePath, dataRegionId); + walNode.onMemTableCreated(memTable, logDirectory + File.separator + "test.tsfile"); + + // write a small amount of data (not enough to trigger roll) + InsertTabletNode insertTabletNode = getInsertTabletNode(devicePath, new long[] {1}); + insertTabletNode.setSearchIndex(1); + walNode.log( + memTable.getMemTableId(), + insertTabletNode, + Collections.singletonList(new int[] {0, insertTabletNode.getRowCount()})); + + Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> walNode.isAllWALEntriesConsumed()); + + // data is flushed to buffer but no WAL file roll happened yet, iterator at search index 1 + // should not find data (because the current-writing WAL file is not readable by the iterator) + ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(1); + + try { + long start = System.currentTimeMillis(); + iterator.waitForNextReady(2, TimeUnit.SECONDS); + if (System.currentTimeMillis() - start + < WALNode.WAIT_FOR_NEXT_WAL_ENTRY_TIMEOUT_IN_SEC * 1000) { + fail("The data should not be found before timeout"); + } + } catch (TimeoutException e) { + // expected: flush happened but no roll, so waitForRollFile timed out + } + } + + /** + * Verifies that waitForNextReady succeeds after a WAL file roll makes data readable. The iterator + * should wake up when rollLogWriter signals the rollLogWriterCondition. + */ + @Test + public void testWaitForNextReadySucceedsAfterRollFile() throws Exception { + IMemTable memTable = new PrimitiveMemTable(databasePath, dataRegionId); + walNode.onMemTableCreated(memTable, logDirectory + File.separator + "test.tsfile"); + + // write data with search index + for (int i = 1; i <= 5; i++) { + InsertTabletNode insertTabletNode = getInsertTabletNode(devicePath, new long[] {i}); + insertTabletNode.setSearchIndex(i); + walNode.log( + memTable.getMemTableId(), + insertTabletNode, + Collections.singletonList(new int[] {0, insertTabletNode.getRowCount()})); + } + + Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> walNode.isAllWALEntriesConsumed()); + + // roll the WAL file so the data is in a closed file readable by the iterator + walNode.rollWALFile(); + Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> walNode.isAllWALEntriesConsumed()); + + // iterator at search index 1 should find the data after roll + ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(1); + assertTrue(iterator.hasNext()); + assertNotNull(iterator.next()); + } + + /** + * Verifies that waitForNextReady wakes up when a WAL file roll is triggered concurrently. A + * background thread rolls the WAL file while the main thread waits on the iterator. + */ + @Test(timeout = 30000) + public void testWaitForNextReadyWakesUpOnConcurrentRoll() throws Exception { + IMemTable memTable = new PrimitiveMemTable(databasePath, dataRegionId); + walNode.onMemTableCreated(memTable, logDirectory + File.separator + "test.tsfile"); + + // write data with search index + InsertTabletNode insertTabletNode = getInsertTabletNode(devicePath, new long[] {1}); + insertTabletNode.setSearchIndex(1); + walNode.log( + memTable.getMemTableId(), + insertTabletNode, + Collections.singletonList(new int[] {0, insertTabletNode.getRowCount()})); + + Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> walNode.isAllWALEntriesConsumed()); + + ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(1); + + AtomicBoolean found = new AtomicBoolean(false); + AtomicReference error = new AtomicReference<>(); + ExecutorService executor = Executors.newSingleThreadExecutor(); + + // background: wait for data to become available via waitForNextReady + Future waitFuture = + executor.submit( + () -> { + try { + iterator.waitForNextReady(15, TimeUnit.SECONDS); + if (iterator.hasNext()) { + found.set(true); + } + } catch (Exception e) { + error.set(e); + } + }); + + // give the waiter thread time to start blocking + Thread.sleep(500); + + // trigger WAL file roll — this should signal rollLogWriterCondition and wake up the iterator + walNode.rollWALFile(); + Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> walNode.isAllWALEntriesConsumed()); + + waitFuture.get(20, TimeUnit.SECONDS); + executor.shutdown(); + + if (error.get() != null) { + throw error.get(); + } + assertTrue("Iterator should have found data after WAL file roll", found.get()); + } + + /** + * Verifies that the no-arg waitForNextReady eventually proceeds when enough data is written to + * trigger an automatic WAL file roll (file size exceeds threshold). Uses a small WAL file size + * threshold to trigger the roll quickly. + */ + @Test(timeout = 60000) + public void testWaitForNextReadyWithAutoRollOnSizeThreshold() throws Exception { + // use small WAL file size to trigger auto-roll + config.setWalFileSizeThresholdInByte(1024); + + try { + IMemTable memTable = new PrimitiveMemTable(databasePath, dataRegionId); + walNode.onMemTableCreated(memTable, logDirectory + File.separator + "test.tsfile"); + + // write initial data with search index + InsertTabletNode first = getInsertTabletNode(devicePath, new long[] {1}); + first.setSearchIndex(1); + walNode.log( + memTable.getMemTableId(), + first, + Collections.singletonList(new int[] {0, first.getRowCount()})); + + Awaitility.await() + .atMost(10, TimeUnit.SECONDS) + .until(() -> walNode.isAllWALEntriesConsumed()); + + ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(1); + + AtomicBoolean found = new AtomicBoolean(false); + AtomicReference error = new AtomicReference<>(); + ExecutorService executor = Executors.newSingleThreadExecutor(); + + Future waitFuture = + executor.submit( + () -> { + try { + iterator.waitForNextReady(30, TimeUnit.SECONDS); + if (iterator.hasNext()) { + found.set(true); + } + } catch (Exception e) { + error.set(e); + } + }); + + Thread.sleep(500); + + // write more data to exceed the small threshold and trigger auto-roll + for (int i = 2; i <= 50; i++) { + InsertTabletNode node = getInsertTabletNode(devicePath, new long[] {i}); + node.setSearchIndex(i); + walNode.log( + memTable.getMemTableId(), + node, + Collections.singletonList(new int[] {0, node.getRowCount()})); + } + + waitFuture.get(40, TimeUnit.SECONDS); + executor.shutdown(); + + if (error.get() != null) { + fail("waitForNextReady threw unexpected exception: " + error.get().getMessage()); + } + assertTrue("Iterator should have found data after auto WAL file roll", found.get()); + } finally { + config.setWalFileSizeThresholdInByte(2 * 1024 * 1024); + } + } + + /** + * Verifies that the no-arg waitForNextReady() automatically triggers a WAL file roll after the + * timeout expires (WAIT_FOR_NEXT_WAL_ENTRY_TIMEOUT_IN_SEC = 30s). The flow is: data written to + * buffer → waitForRollFile(30s) times out → rollWALFile() called → data moves to closed file → + * hasNext() returns true → method returns. + */ + @Test(timeout = 120000) + public void testWaitForNextReadyAutoTriggersRollOnTimeout() throws Exception { + IMemTable memTable = new PrimitiveMemTable(databasePath, dataRegionId); + walNode.onMemTableCreated(memTable, logDirectory + File.separator + "test.tsfile"); + + // write data with search index — stays in the current (active) WAL file + InsertTabletNode insertTabletNode = getInsertTabletNode(devicePath, new long[] {1}); + insertTabletNode.setSearchIndex(1); + walNode.log( + memTable.getMemTableId(), + insertTabletNode, + Collections.singletonList(new int[] {0, insertTabletNode.getRowCount()})); + + Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> walNode.isAllWALEntriesConsumed()); + + // iterator cannot read the active WAL file, so hasNext() should be false + ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(1); + assertFalse("Data should not be visible before WAL file roll", iterator.hasNext()); + + AtomicBoolean found = new AtomicBoolean(false); + AtomicReference error = new AtomicReference<>(); + ExecutorService executor = Executors.newSingleThreadExecutor(); + + long startTime = System.currentTimeMillis(); + + // call the no-arg waitForNextReady() — it should: + // 1) wait 30s for rollLogWriterCondition (timeout) + // 2) auto-call rollWALFile() + // 3) data becomes readable, hasNext() returns true, method returns + Future waitFuture = + executor.submit( + () -> { + try { + iterator.waitForNextReady(); + if (iterator.hasNext()) { + found.set(true); + } + } catch (Exception e) { + error.set(e); + } + }); + + waitFuture.get(90, TimeUnit.SECONDS); + executor.shutdown(); + + long elapsed = System.currentTimeMillis() - startTime; + + if (error.get() != null) { + fail("waitForNextReady() threw unexpected exception: " + error.get().getMessage()); + } + assertTrue("Iterator should have found data after auto-triggered WAL file roll", found.get()); + assertTrue( + "Should have waited at least 30s for the timeout to trigger auto-roll, but only waited " + + elapsed + + "ms", + elapsed >= TimeUnit.SECONDS.toMillis(WALNode.WAIT_FOR_NEXT_WAL_ENTRY_TIMEOUT_IN_SEC - 1)); + } + + private InsertTabletNode getInsertTabletNode(String devicePath, long[] times) + throws IllegalPathException { + String[] measurements = new String[] {"s1", "s2", "s3", "s4", "s5", "s6"}; + TSDataType[] dataTypes = new TSDataType[6]; + dataTypes[0] = TSDataType.DOUBLE; + dataTypes[1] = TSDataType.FLOAT; + dataTypes[2] = TSDataType.INT64; + dataTypes[3] = TSDataType.INT32; + dataTypes[4] = TSDataType.BOOLEAN; + dataTypes[5] = TSDataType.TEXT; + + Object[] columns = new Object[6]; + columns[0] = new double[times.length]; + columns[1] = new float[times.length]; + columns[2] = new long[times.length]; + columns[3] = new int[times.length]; + columns[4] = new boolean[times.length]; + columns[5] = new Binary[times.length]; + + for (int r = 0; r < times.length; r++) { + ((double[]) columns[0])[r] = 1.0d + r; + ((float[]) columns[1])[r] = 2.0f + r; + ((long[]) columns[2])[r] = 10000L + r; + ((int[]) columns[3])[r] = 100 + r; + ((boolean[]) columns[4])[r] = (r % 2 == 0); + ((Binary[]) columns[5])[r] = new Binary("hh" + r, TSFileConfig.STRING_CHARSET); + } + + BitMap[] bitMaps = new BitMap[dataTypes.length]; + for (int i = 0; i < dataTypes.length; i++) { + if (bitMaps[i] == null) { + bitMaps[i] = new BitMap(times.length); + } + bitMaps[i].mark(i % times.length); + } + MeasurementSchema[] schemas = new MeasurementSchema[6]; + for (int i = 0; i < 6; i++) { + schemas[i] = new MeasurementSchema(measurements[i], dataTypes[i], TSEncoding.PLAIN); + } + + return new InsertTabletNode( + new PlanNodeId(""), + new PartialPath(devicePath), + false, + measurements, + dataTypes, + schemas, + times, + bitMaps, + columns, + times.length); + } +} From 2c9222bf803d9225c5d84676388261127437b05b Mon Sep 17 00:00:00 2001 From: Tian Jiang Date: Sat, 9 May 2026 18:38:52 +0800 Subject: [PATCH 2/3] fix import --- .../dataregion/wal/node/WALNodeWaitForRollFileTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNodeWaitForRollFileTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNodeWaitForRollFileTest.java index 9cd159b51a85d..198af65c7f2ef 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNodeWaitForRollFileTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNodeWaitForRollFileTest.java @@ -20,11 +20,11 @@ import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.consensus.ConsensusFactory; import org.apache.iotdb.consensus.iot.log.ConsensusReqReader; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode; import org.apache.iotdb.db.storageengine.StorageEngine; import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable; From 44d3b74bfae7e238501bedc90f21c537ebdb9e12 Mon Sep 17 00:00:00 2001 From: Tian Jiang Date: Sat, 9 May 2026 20:03:02 +0800 Subject: [PATCH 3/3] fix import --- .../iotdb/db/storageengine/dataregion/wal/node/WALNode.java | 2 -- .../dataregion/wal/node/WALNodeWaitForRollFileTest.java | 5 +++++ 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java index 07e8e7c867432..98993c563f9cf 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java @@ -761,8 +761,6 @@ public boolean hasNext() { notFirstFile.set(true); } - tryToCollectInsertNodeAndBumpIndex.run(); - // update file index and version id if (currentFileIndex >= filesToSearch.length - 1) { needUpdatingFilesToSearch = true; diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNodeWaitForRollFileTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNodeWaitForRollFileTest.java index 198af65c7f2ef..2977841d5e6b6 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNodeWaitForRollFileTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNodeWaitForRollFileTest.java @@ -25,6 +25,7 @@ import org.apache.iotdb.consensus.iot.log.ConsensusReqReader; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.ContinuousSameSearchIndexSeparatorNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode; import org.apache.iotdb.db.storageengine.StorageEngine; import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable; @@ -193,6 +194,8 @@ public void testWaitForNextReadyWakesUpOnConcurrentRoll() throws Exception { memTable.getMemTableId(), insertTabletNode, Collections.singletonList(new int[] {0, insertTabletNode.getRowCount()})); + walNode.log( + memTable.getMemTableId(), new ContinuousSameSearchIndexSeparatorNode(new PlanNodeId(""))); Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> walNode.isAllWALEntriesConsumed()); @@ -319,6 +322,8 @@ public void testWaitForNextReadyAutoTriggersRollOnTimeout() throws Exception { memTable.getMemTableId(), insertTabletNode, Collections.singletonList(new int[] {0, insertTabletNode.getRowCount()})); + walNode.log( + memTable.getMemTableId(), new ContinuousSameSearchIndexSeparatorNode(new PlanNodeId(""))); Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> walNode.isAllWALEntriesConsumed());