Skip to content

Commit 8a32f1c

Browse files
committed
stream: cache minimum cursor count in share
Track the number of consumers at the cached minimum cursor in share() so the minimum is only recomputed when the last consumer at that cursor advances or detaches. This avoids scanning every consumer on each trim attempt when multiple consumers advance through a shared buffer. Signed-off-by: Kamat, Trivikram <16024985+trivikr@users.noreply.github.com> Assisted-by: openai:gpt-5.5
1 parent a37c61d commit 8a32f1c

4 files changed

Lines changed: 160 additions & 37 deletions

File tree

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
'use strict';
2+
3+
const common = require('../common.js');
4+
5+
const bench = common.createBenchmark(main, {
6+
consumers: [8, 32, 128],
7+
batches: [1e4],
8+
backpressure: ['block'],
9+
n: [5],
10+
}, {
11+
flags: ['--experimental-stream-iter'],
12+
});
13+
14+
async function main({ consumers, batches, backpressure, n }) {
15+
const { share, array } = require('stream/iter');
16+
const chunk = Buffer.alloc(1024);
17+
const totalOps = batches * consumers * n;
18+
19+
async function* source() {
20+
for (let i = 0; i < batches; i++) {
21+
yield [chunk];
22+
}
23+
}
24+
25+
bench.start();
26+
for (let i = 0; i < n; i++) {
27+
const shared = share(source(), { highWaterMark: 64, backpressure });
28+
const readers = Array.from({ length: consumers }, () => array(shared.pull()));
29+
await Promise.all(readers);
30+
}
31+
bench.end(totalOps);
32+
}

lib/internal/streams/iter/broadcast.js

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -343,8 +343,9 @@ class BroadcastImpl {
343343
// Private methods
344344

345345
#recomputeMinCursor() {
346-
this.#cachedMinCursor = getMinCursor(
346+
const [minCursor] = getMinCursor(
347347
this.#consumers, this.#bufferStart + this.#buffer.length);
348+
this.#cachedMinCursor = minCursor;
348349
this.#minCursorDirty = false;
349350
}
350351

lib/internal/streams/iter/share.js

Lines changed: 114 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,8 @@ class ShareImpl {
7777
#cancelled = false;
7878
#pulling = false;
7979
#pullWaiters = [];
80+
#cachedMinCursor = 0;
81+
#cachedMinCursorConsumers = 0;
8082

8183
constructor(source, options) {
8284
this.#source = source;
@@ -114,6 +116,14 @@ class ShareImpl {
114116
};
115117

116118
this.#consumers.add(state);
119+
if (this.#consumers.size === 1) {
120+
this.#cachedMinCursor = state.cursor;
121+
this.#cachedMinCursorConsumers = 1;
122+
} else if (state.cursor === this.#cachedMinCursor) {
123+
this.#cachedMinCursorConsumers++;
124+
} else {
125+
this.#recomputeMinCursor();
126+
}
117127
const self = this;
118128

119129
return {
@@ -139,22 +149,26 @@ class ShareImpl {
139149

140150
if (self.#cancelled) {
141151
state.detached = true;
142-
self.#consumers.delete(state);
152+
self.#deleteConsumer(state);
143153
return { __proto__: null, done: true, value: undefined };
144154
}
145155

146156
// Check if data is available in buffer
147157
const bufferIndex = state.cursor - self.#bufferStart;
148158
if (bufferIndex < self.#buffer.length) {
149159
const chunk = self.#buffer.get(bufferIndex);
160+
const cursor = state.cursor;
150161
state.cursor++;
151-
self.#tryTrimBuffer();
162+
if (cursor === self.#cachedMinCursor &&
163+
--self.#cachedMinCursorConsumers === 0) {
164+
self.#tryTrimBuffer();
165+
}
152166
return { __proto__: null, done: false, value: chunk };
153167
}
154168

155169
if (self.#sourceExhausted) {
156170
state.detached = true;
157-
self.#consumers.delete(state);
171+
self.#deleteConsumer(state);
158172
if (self.#sourceError) throw self.#sourceError;
159173
return { __proto__: null, done: true, value: undefined };
160174
}
@@ -163,7 +177,7 @@ class ShareImpl {
163177
const canPull = await self.#waitForBufferSpace();
164178
if (!canPull) {
165179
state.detached = true;
166-
self.#consumers.delete(state);
180+
self.#deleteConsumer(state);
167181
if (self.#sourceError) throw self.#sourceError;
168182
return { __proto__: null, done: true, value: undefined };
169183
}
@@ -176,17 +190,19 @@ class ShareImpl {
176190
state.detached = true;
177191
state.resolve = null;
178192
state.reject = null;
179-
self.#consumers.delete(state);
180-
self.#tryTrimBuffer();
193+
if (self.#deleteConsumer(state)) {
194+
self.#tryTrimBuffer();
195+
}
181196
return { __proto__: null, done: true, value: undefined };
182197
},
183198

184199
async throw() {
185200
state.detached = true;
186201
state.resolve = null;
187202
state.reject = null;
188-
self.#consumers.delete(state);
189-
self.#tryTrimBuffer();
203+
if (self.#deleteConsumer(state)) {
204+
self.#tryTrimBuffer();
205+
}
190206
return { __proto__: null, done: true, value: undefined };
191207
},
192208
};
@@ -254,9 +270,11 @@ class ShareImpl {
254270
this.#bufferStart++;
255271
for (const consumer of this.#consumers) {
256272
if (consumer.cursor < this.#bufferStart) {
273+
this.#deleteConsumerFromMin(consumer);
257274
consumer.cursor = this.#bufferStart;
258275
}
259276
}
277+
this.#recomputeMinCursor();
260278
return true;
261279
case 'drop-newest':
262280
return true;
@@ -324,18 +342,41 @@ class ShareImpl {
324342
}
325343

326344
#tryTrimBuffer() {
327-
const minCursor = getMinCursor(
328-
this.#consumers, this.#bufferStart + this.#buffer.length);
329-
const trimCount = minCursor - this.#bufferStart;
345+
if (this.#cachedMinCursorConsumers === 0) {
346+
this.#recomputeMinCursor();
347+
}
348+
const trimCount = this.#cachedMinCursor - this.#bufferStart;
330349
if (trimCount > 0) {
331350
this.#buffer.trimFront(trimCount);
332-
this.#bufferStart = minCursor;
351+
this.#bufferStart = this.#cachedMinCursor;
333352
for (let i = 0; i < this.#pullWaiters.length; i++) {
334353
this.#pullWaiters[i]();
335354
}
336355
this.#pullWaiters = [];
337356
}
338357
}
358+
359+
#recomputeMinCursor() {
360+
const [minCursor, minCursorConsumers] = getMinCursor(
361+
this.#consumers, this.#bufferStart + this.#buffer.length);
362+
this.#cachedMinCursor = minCursor;
363+
this.#cachedMinCursorConsumers = minCursorConsumers;
364+
}
365+
366+
#deleteConsumerFromMin(consumer) {
367+
if (consumer.cursor === this.#cachedMinCursor) {
368+
this.#cachedMinCursorConsumers--;
369+
}
370+
}
371+
372+
#deleteConsumer(consumer) {
373+
if (this.#consumers.delete(consumer)) {
374+
const wasAtMin = consumer.cursor === this.#cachedMinCursor;
375+
this.#deleteConsumerFromMin(consumer);
376+
return wasAtMin && this.#cachedMinCursorConsumers === 0;
377+
}
378+
return false;
379+
}
339380
}
340381

341382
// =============================================================================
@@ -352,6 +393,8 @@ class SyncShareImpl {
352393
#sourceExhausted = false;
353394
#sourceError = null;
354395
#cancelled = false;
396+
#cachedMinCursor = 0;
397+
#cachedMinCursorConsumers = 0;
355398

356399
constructor(source, options) {
357400
this.#source = source;
@@ -383,6 +426,14 @@ class SyncShareImpl {
383426
};
384427

385428
this.#consumers.add(state);
429+
if (this.#consumers.size === 1) {
430+
this.#cachedMinCursor = state.cursor;
431+
this.#cachedMinCursorConsumers = 1;
432+
} else if (state.cursor === this.#cachedMinCursor) {
433+
this.#cachedMinCursorConsumers++;
434+
} else {
435+
this.#recomputeMinCursor();
436+
}
386437
const self = this;
387438

388439
return {
@@ -396,26 +447,30 @@ class SyncShareImpl {
396447
}
397448
if (self.#sourceError) {
398449
state.detached = true;
399-
self.#consumers.delete(state);
450+
self.#deleteConsumer(state);
400451
throw self.#sourceError;
401452
}
402453
if (self.#cancelled) {
403454
state.detached = true;
404-
self.#consumers.delete(state);
455+
self.#deleteConsumer(state);
405456
return { __proto__: null, done: true, value: undefined };
406457
}
407458

408459
const bufferIndex = state.cursor - self.#bufferStart;
409460
if (bufferIndex < self.#buffer.length) {
410461
const chunk = self.#buffer.get(bufferIndex);
462+
const cursor = state.cursor;
411463
state.cursor++;
412-
self.#tryTrimBuffer();
464+
if (cursor === self.#cachedMinCursor &&
465+
--self.#cachedMinCursorConsumers === 0) {
466+
self.#tryTrimBuffer();
467+
}
413468
return { __proto__: null, done: false, value: chunk };
414469
}
415470

416471
if (self.#sourceExhausted) {
417472
state.detached = true;
418-
self.#consumers.delete(state);
473+
self.#deleteConsumer(state);
419474
return { __proto__: null, done: true, value: undefined };
420475
}
421476

@@ -436,13 +491,15 @@ class SyncShareImpl {
436491
self.#bufferStart++;
437492
for (const consumer of self.#consumers) {
438493
if (consumer.cursor < self.#bufferStart) {
494+
self.#deleteConsumerFromMin(consumer);
439495
consumer.cursor = self.#bufferStart;
440496
}
441497
}
498+
self.#recomputeMinCursor();
442499
break;
443500
case 'drop-newest':
444501
state.detached = true;
445-
self.#consumers.delete(state);
502+
self.#deleteConsumer(state);
446503
return { __proto__: null, done: true, value: undefined };
447504
}
448505
}
@@ -451,21 +508,25 @@ class SyncShareImpl {
451508

452509
if (self.#sourceError) {
453510
state.detached = true;
454-
self.#consumers.delete(state);
511+
self.#deleteConsumer(state);
455512
throw self.#sourceError;
456513
}
457514

458515
const newBufferIndex = state.cursor - self.#bufferStart;
459516
if (newBufferIndex < self.#buffer.length) {
460517
const chunk = self.#buffer.get(newBufferIndex);
518+
const cursor = state.cursor;
461519
state.cursor++;
462-
self.#tryTrimBuffer();
520+
if (cursor === self.#cachedMinCursor &&
521+
--self.#cachedMinCursorConsumers === 0) {
522+
self.#tryTrimBuffer();
523+
}
463524
return { __proto__: null, done: false, value: chunk };
464525
}
465526

466527
if (self.#sourceExhausted) {
467528
state.detached = true;
468-
self.#consumers.delete(state);
529+
self.#deleteConsumer(state);
469530
return { __proto__: null, done: true, value: undefined };
470531
}
471532

@@ -474,15 +535,17 @@ class SyncShareImpl {
474535

475536
return() {
476537
state.detached = true;
477-
self.#consumers.delete(state);
478-
self.#tryTrimBuffer();
538+
if (self.#deleteConsumer(state)) {
539+
self.#tryTrimBuffer();
540+
}
479541
return { __proto__: null, done: true, value: undefined };
480542
},
481543

482544
throw() {
483545
state.detached = true;
484-
self.#consumers.delete(state);
485-
self.#tryTrimBuffer();
546+
if (self.#deleteConsumer(state)) {
547+
self.#tryTrimBuffer();
548+
}
486549
return { __proto__: null, done: true, value: undefined };
487550
},
488551
};
@@ -532,13 +595,36 @@ class SyncShareImpl {
532595
}
533596

534597
#tryTrimBuffer() {
535-
const minCursor = getMinCursor(
536-
this.#consumers, this.#bufferStart + this.#buffer.length);
537-
const trimCount = minCursor - this.#bufferStart;
598+
if (this.#cachedMinCursorConsumers === 0) {
599+
this.#recomputeMinCursor();
600+
}
601+
const trimCount = this.#cachedMinCursor - this.#bufferStart;
538602
if (trimCount > 0) {
539603
this.#buffer.trimFront(trimCount);
540-
this.#bufferStart = minCursor;
604+
this.#bufferStart = this.#cachedMinCursor;
605+
}
606+
}
607+
608+
#recomputeMinCursor() {
609+
const [minCursor, minCursorConsumers] = getMinCursor(
610+
this.#consumers, this.#bufferStart + this.#buffer.length);
611+
this.#cachedMinCursor = minCursor;
612+
this.#cachedMinCursorConsumers = minCursorConsumers;
613+
}
614+
615+
#deleteConsumerFromMin(consumer) {
616+
if (consumer.cursor === this.#cachedMinCursor) {
617+
this.#cachedMinCursorConsumers--;
618+
}
619+
}
620+
621+
#deleteConsumer(consumer) {
622+
if (this.#consumers.delete(consumer)) {
623+
const wasAtMin = consumer.cursor === this.#cachedMinCursor;
624+
this.#deleteConsumerFromMin(consumer);
625+
return wasAtMin && this.#cachedMinCursorConsumers === 0;
541626
}
627+
return false;
542628
}
543629
}
544630

lib/internal/streams/iter/utils.js

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -70,20 +70,24 @@ function onSignalAbort(signal, handler) {
7070
}
7171

7272
/**
73-
* Compute the minimum cursor across a set of consumers.
74-
* Returns fallback if the set is empty.
73+
* Compute the minimum cursor across a set of consumers and count how many
74+
* consumers are at that cursor.
7575
* @param {Set} consumers - Set of objects with a `cursor` property
76-
* @param {number} fallback - Value to return when set is empty
77-
* @returns {number}
76+
* @param {number} fallback - Cursor to return when set is empty
77+
* @returns {[number, number]}
7878
*/
7979
function getMinCursor(consumers, fallback) {
80-
let min = Infinity;
80+
let minCursor = fallback;
81+
let minCursorConsumers = 0;
8182
for (const consumer of consumers) {
82-
if (consumer.cursor < min) {
83-
min = consumer.cursor;
83+
if (consumer.cursor < minCursor) {
84+
minCursor = consumer.cursor;
85+
minCursorConsumers = 1;
86+
} else if (consumer.cursor === minCursor) {
87+
minCursorConsumers++;
8488
}
8589
}
86-
return min === Infinity ? fallback : min;
90+
return [minCursor, minCursorConsumers];
8791
}
8892

8993
/**

0 commit comments

Comments
 (0)