Skip to content

Commit a0ce67e

Browse files
committed
stream: cache minimum cursor counts
Cache the minimum consumer cursor and track how many consumers are currently positioned at that cursor. This avoids scanning every consumer on each trim attempt when multiple consumers advance through shared buffers. Update share() and broadcast() to recompute the minimum cursor only when the last consumer at the cached minimum advances or detaches. Signed-off-by: Kamat, Trivikram <16024985+trivikr@users.noreply.github.com> Assisted-by: openai:gpt-5.5
1 parent a37c61d commit a0ce67e

4 files changed

Lines changed: 208 additions & 54 deletions

File tree

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
'use strict';
2+
3+
const common = require('../common.js');
4+
5+
const bench = common.createBenchmark(main, {
6+
consumers: [8, 32, 128],
7+
batches: [1e4],
8+
backpressure: ['block'],
9+
n: [5],
10+
}, {
11+
flags: ['--experimental-stream-iter'],
12+
});
13+
14+
async function main({ consumers, batches, backpressure, n }) {
15+
const { share, array } = require('stream/iter');
16+
const chunk = Buffer.alloc(1024);
17+
const totalOps = batches * consumers * n;
18+
19+
async function* source() {
20+
for (let i = 0; i < batches; i++) {
21+
yield [chunk];
22+
}
23+
}
24+
25+
bench.start();
26+
for (let i = 0; i < n; i++) {
27+
const shared = share(source(), { highWaterMark: 64, backpressure });
28+
const readers = Array.from({ length: consumers }, () => array(shared.pull()));
29+
await Promise.all(readers);
30+
}
31+
bench.end(totalOps);
32+
}

lib/internal/streams/iter/broadcast.js

Lines changed: 50 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ class BroadcastImpl {
9292
#options;
9393
#writer = null;
9494
#cachedMinCursor = 0;
95-
#minCursorDirty = false;
95+
#cachedMinCursorConsumers = 0;
9696

9797
constructor(options) {
9898
this.#options = options;
@@ -154,9 +154,11 @@ class BroadcastImpl {
154154
// since this consumer may now be the slowest.
155155
if (this.#consumers.size === 1) {
156156
this.#cachedMinCursor = state.cursor;
157-
this.#minCursorDirty = false;
157+
this.#cachedMinCursorConsumers = 1;
158+
} else if (state.cursor === this.#cachedMinCursor) {
159+
this.#cachedMinCursorConsumers++;
158160
} else {
159-
this.#minCursorDirty = true;
161+
this.#recomputeMinCursor();
160162
}
161163
const self = this;
162164

@@ -167,9 +169,9 @@ class BroadcastImpl {
167169
state.detached = true;
168170
state.resolve = null;
169171
state.reject = null;
170-
self.#consumers.delete(state);
171-
self.#minCursorDirty = true;
172-
self.#tryTrimBuffer();
172+
if (self.#deleteConsumer(state)) {
173+
self.#tryTrimBuffer();
174+
}
173175
}
174176

175177
return {
@@ -186,19 +188,21 @@ class BroadcastImpl {
186188
const bufferIndex = state.cursor - self.#bufferStart;
187189
if (bufferIndex < self.#buffer.length) {
188190
const chunk = self.#buffer.get(bufferIndex);
189-
// If this consumer was at the min cursor, mark dirty
190-
if (state.cursor <= self.#cachedMinCursor) {
191-
self.#minCursorDirty = true;
192-
}
191+
const cursor = state.cursor;
193192
state.cursor++;
194-
self.#tryTrimBuffer();
193+
if (cursor === self.#cachedMinCursor &&
194+
--self.#cachedMinCursorConsumers === 0) {
195+
self.#tryTrimBuffer();
196+
}
195197
return PromiseResolve(
196198
{ __proto__: null, done: false, value: chunk });
197199
}
198200

199201
if (self.#error) {
200202
state.detached = true;
201-
self.#consumers.delete(state);
203+
if (self.#deleteConsumer(state)) {
204+
self.#tryTrimBuffer();
205+
}
202206
return PromiseReject(self.#error);
203207
}
204208

@@ -274,9 +278,11 @@ class BroadcastImpl {
274278
this.#bufferStart++;
275279
for (const consumer of this.#consumers) {
276280
if (consumer.cursor < this.#bufferStart) {
281+
this.#deleteConsumerFromMin(consumer);
277282
consumer.cursor = this.#bufferStart;
278283
}
279284
}
285+
this.#recomputeMinCursor();
280286
break;
281287
case 'drop-newest':
282288
return true;
@@ -297,6 +303,7 @@ class BroadcastImpl {
297303
const bufferIndex = consumer.cursor - this.#bufferStart;
298304
if (bufferIndex < this.#buffer.length) {
299305
const chunk = this.#buffer.get(bufferIndex);
306+
this.#advanceConsumer(consumer);
300307
consumer.cursor++;
301308
consumer.resolve({ __proto__: null, done: false, value: chunk });
302309
} else {
@@ -306,6 +313,9 @@ class BroadcastImpl {
306313
consumer.reject = null;
307314
}
308315
}
316+
if (this.#consumers.size > 0 && this.#cachedMinCursorConsumers === 0) {
317+
this.#tryTrimBuffer();
318+
}
309319
}
310320

311321
[kAbort](reason) {
@@ -343,13 +353,14 @@ class BroadcastImpl {
343353
// Private methods
344354

345355
#recomputeMinCursor() {
346-
this.#cachedMinCursor = getMinCursor(
356+
const [minCursor, minCursorConsumers] = getMinCursor(
347357
this.#consumers, this.#bufferStart + this.#buffer.length);
348-
this.#minCursorDirty = false;
358+
this.#cachedMinCursor = minCursor;
359+
this.#cachedMinCursorConsumers = minCursorConsumers;
349360
}
350361

351362
#tryTrimBuffer() {
352-
if (this.#minCursorDirty) {
363+
if (this.#cachedMinCursorConsumers === 0) {
353364
this.#recomputeMinCursor();
354365
}
355366
const trimCount = this.#cachedMinCursor - this.#bufferStart;
@@ -364,6 +375,26 @@ class BroadcastImpl {
364375
}
365376
}
366377

378+
#advanceConsumer(consumer) {
379+
if (consumer.cursor === this.#cachedMinCursor) {
380+
this.#cachedMinCursorConsumers--;
381+
}
382+
}
383+
384+
#deleteConsumerFromMin(consumer) {
385+
if (consumer.cursor === this.#cachedMinCursor) {
386+
this.#cachedMinCursorConsumers--;
387+
}
388+
}
389+
390+
#deleteConsumer(consumer) {
391+
if (this.#consumers.delete(consumer)) {
392+
this.#deleteConsumerFromMin(consumer);
393+
return this.#cachedMinCursorConsumers === 0;
394+
}
395+
return false;
396+
}
397+
367398
#notifyConsumers() {
368399
const waiters = this.#waiters;
369400
if (waiters.length === 0) return;
@@ -376,9 +407,7 @@ class BroadcastImpl {
376407
const bufferIndex = consumer.cursor - this.#bufferStart;
377408
if (bufferIndex < this.#buffer.length) {
378409
const chunk = this.#buffer.get(bufferIndex);
379-
if (consumer.cursor <= this.#cachedMinCursor) {
380-
this.#minCursorDirty = true;
381-
}
410+
this.#advanceConsumer(consumer);
382411
consumer.cursor++;
383412
const resolve = consumer.resolve;
384413
consumer.resolve = null;
@@ -390,6 +419,9 @@ class BroadcastImpl {
390419
}
391420
}
392421
}
422+
if (this.#cachedMinCursorConsumers === 0) {
423+
this.#tryTrimBuffer();
424+
}
393425
}
394426
}
395427

0 commit comments

Comments
 (0)