4343
4444import java .time .Clock ;
4545import java .util .ArrayDeque ;
46+ import java .util .ArrayList ;
4647import java .util .Collections ;
48+ import java .util .Iterator ;
4749import java .util .List ;
4850import java .util .Map ;
4951import java .util .Optional ;
5052import java .util .Set ;
53+ import java .util .concurrent .ConcurrentSkipListMap ;
5154import java .util .concurrent .CountDownLatch ;
5255import java .util .concurrent .TimeUnit ;
5356import java .util .concurrent .atomic .AtomicBoolean ;
9093import org .apache .bookkeeper .mledger .proto .MLDataFormats .MessageRange ;
9194import org .apache .bookkeeper .mledger .proto .MLDataFormats .PositionInfo ;
9295import org .apache .commons .lang3 .tuple .Pair ;
96+ import org .apache .pulsar .common .util .collections .BitSetRecyclable ;
9397import org .apache .pulsar .common .util .collections .ConcurrentOpenLongPairRangeSet ;
9498import org .apache .pulsar .common .util .collections .LongPairRangeSet ;
9599import org .apache .pulsar .common .util .collections .LongPairRangeSet .LongPairConsumer ;
@@ -156,6 +160,10 @@ public class ManagedCursorImpl implements ManagedCursor {
156160 return position ;
157161 };
158162 private final LongPairRangeSet <PositionImpl > individualDeletedMessages ;
163+
164+ // Maintain the deletion status for batch messages
165+ // (ledgerId, entryId) -> deletion indexes
166+ private final ConcurrentSkipListMap <PositionImpl , BitSetRecyclable > batchDeletedIndexes ;
159167 private final ReadWriteLock lock = new ReentrantReadWriteLock ();
160168
161169 private RateLimiter markDeleteLimiter ;
@@ -232,6 +240,11 @@ public interface VoidCallback {
232240 this .individualDeletedMessages = config .isUnackedRangesOpenCacheSetEnabled ()
233241 ? new ConcurrentOpenLongPairRangeSet <>(4096 , positionRangeConverter )
234242 : new LongPairRangeSet .DefaultRangeSet <>(positionRangeConverter );
243+ if (config .isDeletionAtBatchIndexLevelEnabled ()) {
244+ this .batchDeletedIndexes = new ConcurrentSkipListMap <>();
245+ } else {
246+ this .batchDeletedIndexes = null ;
247+ }
235248 this .digestType = BookKeeper .DigestType .fromApiDigestType (config .getDigestType ());
236249 STATE_UPDATER .set (this , State .Uninitialized );
237250 PENDING_MARK_DELETED_SUBMITTED_COUNT_UPDATER .set (this , 0 );
@@ -379,6 +392,10 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac
379392 if (positionInfo .getIndividualDeletedMessagesCount () > 0 ) {
380393 recoverIndividualDeletedMessages (positionInfo .getIndividualDeletedMessagesList ());
381394 }
395+ if (config .isDeletionAtBatchIndexLevelEnabled () && batchDeletedIndexes != null
396+ && positionInfo .getBatchedEntryDeletionIndexInfoCount () > 0 ) {
397+ recoverBatchDeletedIndexes (positionInfo .getBatchedEntryDeletionIndexInfoList ());
398+ }
382399 recoveredCursor (position , recoveredProperties , lh );
383400 callback .operationComplete ();
384401 }, null );
@@ -398,6 +415,25 @@ private void recoverIndividualDeletedMessages(List<MLDataFormats.MessageRange> i
398415 }
399416 }
400417
418+ private void recoverBatchDeletedIndexes (List <MLDataFormats .BatchedEntryDeletionIndexInfo > batchDeletedIndexInfoList ) {
419+ lock .writeLock ().lock ();
420+ try {
421+ this .batchDeletedIndexes .clear ();
422+ batchDeletedIndexInfoList .forEach (batchDeletedIndexInfo -> {
423+ if (batchDeletedIndexInfo .getDeleteSetCount () > 0 ) {
424+ long [] array = new long [batchDeletedIndexInfo .getDeleteSetCount ()];
425+ for (int i = 0 ; i < batchDeletedIndexInfo .getDeleteSetList ().size (); i ++) {
426+ array [i ] = batchDeletedIndexInfo .getDeleteSetList ().get (i );
427+ }
428+ this .batchDeletedIndexes .put (PositionImpl .get (batchDeletedIndexInfo .getPosition ().getLedgerId (),
429+ batchDeletedIndexInfo .getPosition ().getEntryId ()), BitSetRecyclable .create ().resetWords (array ));
430+ }
431+ });
432+ } finally {
433+ lock .writeLock ().unlock ();
434+ }
435+ }
436+
401437 private void recoveredCursor (PositionImpl position , Map <String , Long > properties ,
402438 LedgerHandle recoveredFromCursorLedger ) {
403439 // if the position was at a ledger that didn't exist (since it will be deleted if it was previously empty),
@@ -920,6 +956,10 @@ public void operationComplete() {
920956 lastMarkDeleteEntry = new MarkDeleteEntry (newMarkDeletePosition , Collections .emptyMap (),
921957 null , null );
922958 individualDeletedMessages .clear ();
959+ if (config .isDeletionAtBatchIndexLevelEnabled () && batchDeletedIndexes != null ) {
960+ batchDeletedIndexes .values ().forEach (BitSetRecyclable ::recycle );
961+ batchDeletedIndexes .clear ();
962+ }
923963
924964 PositionImpl oldReadPosition = readPosition ;
925965 if (oldReadPosition .compareTo (newPosition ) >= 0 ) {
@@ -1507,8 +1547,22 @@ public void asyncMarkDelete(final Position position, Map<String, Long> propertie
15071547 if (log .isDebugEnabled ()) {
15081548 log .debug ("[{}] Mark delete cursor {} up to position: {}" , ledger .getName (), name , position );
15091549 }
1550+
15101551 PositionImpl newPosition = (PositionImpl ) position ;
15111552
1553+ if (config .isDeletionAtBatchIndexLevelEnabled () && batchDeletedIndexes != null ) {
1554+ if (newPosition .ackSet != null ) {
1555+ batchDeletedIndexes .put (newPosition , BitSetRecyclable .create ().resetWords (newPosition .ackSet ));
1556+ newPosition = ledger .getPreviousPosition (newPosition );
1557+ }
1558+ Map <PositionImpl , BitSetRecyclable > subMap = batchDeletedIndexes .subMap (PositionImpl .earliest , newPosition );
1559+ subMap .values ().forEach (BitSetRecyclable ::recycle );
1560+ subMap .clear ();
1561+ } else if (newPosition .ackSet != null ) {
1562+ newPosition = ledger .getPreviousPosition (newPosition );
1563+ newPosition .ackSet = null ;
1564+ }
1565+
15121566 if (((PositionImpl ) ledger .getLastConfirmedEntry ()).compareTo (newPosition ) < 0 ) {
15131567 if (log .isDebugEnabled ()) {
15141568 log .debug (
@@ -1600,6 +1654,11 @@ public void operationComplete() {
16001654 try {
16011655 individualDeletedMessages .removeAtMost (mdEntry .newPosition .getLedgerId (),
16021656 mdEntry .newPosition .getEntryId ());
1657+ if (config .isDeletionAtBatchIndexLevelEnabled () && batchDeletedIndexes != null ) {
1658+ Map <PositionImpl , BitSetRecyclable > subMap = batchDeletedIndexes .subMap (PositionImpl .earliest , false , PositionImpl .get (mdEntry .newPosition .getLedgerId (), mdEntry .newPosition .getEntryId ()), true );
1659+ subMap .values ().forEach (BitSetRecyclable ::recycle );
1660+ subMap .clear ();
1661+ }
16031662 } finally {
16041663 lock .writeLock ().unlock ();
16051664 }
@@ -1722,35 +1781,62 @@ public void asyncDelete(Iterable<Position> positions, AsyncCallbacks.DeleteCallb
17221781
17231782 for (Position pos : positions ) {
17241783 PositionImpl position = (PositionImpl ) checkNotNull (pos );
1725-
17261784 if (((PositionImpl ) ledger .getLastConfirmedEntry ()).compareTo (position ) < 0 ) {
17271785 if (log .isDebugEnabled ()) {
17281786 log .debug (
1729- "[{}] Failed mark delete due to invalid markDelete {} is ahead of last-confirmed-entry {} for cursor [{}]" ,
1730- ledger .getName (), position , ledger .getLastConfirmedEntry (), name );
1787+ "[{}] Failed mark delete due to invalid markDelete {} is ahead of last-confirmed-entry {} for cursor [{}]" ,
1788+ ledger .getName (), position , ledger .getLastConfirmedEntry (), name );
17311789 }
17321790 callback .deleteFailed (new ManagedLedgerException ("Invalid mark deleted position" ), ctx );
17331791 return ;
17341792 }
17351793
17361794 if (individualDeletedMessages .contains (position .getLedgerId (), position .getEntryId ())
1737- || position .compareTo (markDeletePosition ) <= 0 ) {
1795+ || position .compareTo (markDeletePosition ) <= 0 ) {
1796+ if (config .isDeletionAtBatchIndexLevelEnabled () && batchDeletedIndexes != null ) {
1797+ BitSetRecyclable bitSetRecyclable = batchDeletedIndexes .remove (position );
1798+ if (bitSetRecyclable != null ) {
1799+ bitSetRecyclable .recycle ();
1800+ }
1801+ }
17381802 if (log .isDebugEnabled ()) {
17391803 log .debug ("[{}] [{}] Position was already deleted {}" , ledger .getName (), name , position );
17401804 }
17411805 continue ;
17421806 }
1743-
1744- // Add a range (prev, pos] to the set. Adding the previous entry as an open limit to the range will make
1745- // the RangeSet recognize the "continuity" between adjacent Positions
1746- PositionImpl previousPosition = ledger .getPreviousPosition (position );
1747- individualDeletedMessages .addOpenClosed (previousPosition .getLedgerId (), previousPosition .getEntryId (),
1807+ if (position .ackSet == null ) {
1808+ if (config .isDeletionAtBatchIndexLevelEnabled () && batchDeletedIndexes != null ) {
1809+ BitSetRecyclable bitSetRecyclable = batchDeletedIndexes .remove (position );
1810+ if (bitSetRecyclable != null ) {
1811+ bitSetRecyclable .recycle ();
1812+ }
1813+ }
1814+ // Add a range (prev, pos] to the set. Adding the previous entry as an open limit to the range will make
1815+ // the RangeSet recognize the "continuity" between adjacent Positions
1816+ PositionImpl previousPosition = ledger .getPreviousPosition (position );
1817+ individualDeletedMessages .addOpenClosed (previousPosition .getLedgerId (), previousPosition .getEntryId (),
17481818 position .getLedgerId (), position .getEntryId ());
1749- MSG_CONSUMED_COUNTER_UPDATER .incrementAndGet (this );
1819+ MSG_CONSUMED_COUNTER_UPDATER .incrementAndGet (this );
17501820
1751- if (log .isDebugEnabled ()) {
1752- log .debug ("[{}] [{}] Individually deleted messages: {}" , ledger .getName (), name ,
1821+ if (log .isDebugEnabled ()) {
1822+ log .debug ("[{}] [{}] Individually deleted messages: {}" , ledger .getName (), name ,
17531823 individualDeletedMessages );
1824+ }
1825+ } else if (config .isDeletionAtBatchIndexLevelEnabled () && batchDeletedIndexes != null ) {
1826+ BitSetRecyclable bitSet = batchDeletedIndexes .computeIfAbsent (position , (v ) -> BitSetRecyclable .create ().resetWords (position .ackSet ));
1827+ BitSetRecyclable givenBitSet = BitSetRecyclable .create ().resetWords (position .ackSet );
1828+ bitSet .and (givenBitSet );
1829+ givenBitSet .recycle ();
1830+ if (bitSet .isEmpty ()) {
1831+ PositionImpl previousPosition = ledger .getPreviousPosition (position );
1832+ individualDeletedMessages .addOpenClosed (previousPosition .getLedgerId (), previousPosition .getEntryId (),
1833+ position .getLedgerId (), position .getEntryId ());
1834+ ++messagesConsumedCounter ;
1835+ BitSetRecyclable bitSetRecyclable = batchDeletedIndexes .remove (position );
1836+ if (bitSetRecyclable != null ) {
1837+ bitSetRecyclable .recycle ();
1838+ }
1839+ }
17541840 }
17551841 }
17561842
@@ -2062,6 +2148,9 @@ private void persistPositionMetaStore(long cursorsLedgerId, PositionImpl positio
20622148 info .addAllProperties (buildPropertiesMap (properties ));
20632149 if (persistIndividualDeletedMessageRanges ) {
20642150 info .addAllIndividualDeletedMessages (buildIndividualDeletedMessageRanges ());
2151+ if (config .isDeletionAtBatchIndexLevelEnabled ()) {
2152+ info .addAllBatchedEntryDeletionIndexInfo (buildBatchEntryDeletionIndexInfoList ());
2153+ }
20652154 }
20662155
20672156 if (log .isDebugEnabled ()) {
@@ -2306,11 +2395,38 @@ private List<MLDataFormats.MessageRange> buildIndividualDeletedMessageRanges() {
23062395 }
23072396 }
23082397
2398+ private List <MLDataFormats .BatchedEntryDeletionIndexInfo > buildBatchEntryDeletionIndexInfoList () {
2399+ if (!config .isDeletionAtBatchIndexLevelEnabled () || batchDeletedIndexes == null || batchDeletedIndexes .isEmpty ()) {
2400+ return Collections .emptyList ();
2401+ }
2402+ MLDataFormats .NestedPositionInfo .Builder nestedPositionBuilder = MLDataFormats .NestedPositionInfo
2403+ .newBuilder ();
2404+ MLDataFormats .BatchedEntryDeletionIndexInfo .Builder batchDeletedIndexInfoBuilder = MLDataFormats .BatchedEntryDeletionIndexInfo
2405+ .newBuilder ();
2406+ List <MLDataFormats .BatchedEntryDeletionIndexInfo > result = Lists .newArrayList ();
2407+ Iterator <Map .Entry <PositionImpl , BitSetRecyclable >> iterator = batchDeletedIndexes .entrySet ().iterator ();
2408+ while (iterator .hasNext () && result .size () < config .getMaxBatchDeletedIndexToPersist ()) {
2409+ Map .Entry <PositionImpl , BitSetRecyclable > entry = iterator .next ();
2410+ nestedPositionBuilder .setLedgerId (entry .getKey ().getLedgerId ());
2411+ nestedPositionBuilder .setEntryId (entry .getKey ().getEntryId ());
2412+ batchDeletedIndexInfoBuilder .setPosition (nestedPositionBuilder .build ());
2413+ long [] array = entry .getValue ().toLongArray ();
2414+ List <Long > deleteSet = new ArrayList <>(array .length );
2415+ for (long l : array ) {
2416+ deleteSet .add (l );
2417+ }
2418+ batchDeletedIndexInfoBuilder .addAllDeleteSet (deleteSet );
2419+ result .add (batchDeletedIndexInfoBuilder .build ());
2420+ }
2421+ return result ;
2422+ }
2423+
23092424 void persistPositionToLedger (final LedgerHandle lh , MarkDeleteEntry mdEntry , final VoidCallback callback ) {
23102425 PositionImpl position = mdEntry .newPosition ;
23112426 PositionInfo pi = PositionInfo .newBuilder ().setLedgerId (position .getLedgerId ())
23122427 .setEntryId (position .getEntryId ())
23132428 .addAllIndividualDeletedMessages (buildIndividualDeletedMessageRanges ())
2429+ .addAllBatchedEntryDeletionIndexInfo (buildBatchEntryDeletionIndexInfoList ())
23142430 .addAllProperties (buildPropertiesMap (mdEntry .properties )).build ();
23152431
23162432
@@ -2693,6 +2809,16 @@ private ManagedCursorImpl cursorImpl() {
26932809 return this ;
26942810 }
26952811
2812+ @ Override
2813+ public long [] getDeletedBatchIndexesAsLongArray (PositionImpl position ) {
2814+ if (config .isDeletionAtBatchIndexLevelEnabled () && batchDeletedIndexes != null ) {
2815+ BitSetRecyclable bitSet = batchDeletedIndexes .get (position );
2816+ return bitSet == null ? null : bitSet .toLongArray ();
2817+ } else {
2818+ return null ;
2819+ }
2820+ }
2821+
26962822 void updateReadStats (int readEntriesCount , long readEntriesSize ) {
26972823 this .entriesReadCount += readEntriesCount ;
26982824 this .entriesReadSize += readEntriesSize ;
@@ -2723,7 +2849,5 @@ private int applyMaxSizeCap(int maxEntries, long maxSizeBytes) {
27232849
27242850 return Math .min (maxEntriesBasedOnSize , maxEntries );
27252851 }
2726-
2727-
27282852 private static final Logger log = LoggerFactory .getLogger (ManagedCursorImpl .class );
27292853}
0 commit comments