Skip to content

Commit 253b530

Browse files
Don't call stream.end() on Watch ended by server (#565)
1 parent 6ec78b4 commit 253b530

3 files changed

Lines changed: 89 additions & 77 deletions

File tree

handwritten/firestore/dev/src/reference.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ import {Timestamp} from './timestamp';
3333
import {DocumentData, OrderByDirection, Precondition, SetOptions, UpdateData, WhereFilterOp} from './types';
3434
import {autoId, requestTag} from './util';
3535
import {invalidArgumentMessage, validateEnumValue, validateFunction, validateInteger, validateMinNumberOfArguments} from './validate';
36-
import {Watch} from './watch';
36+
import {DocumentWatch, QueryWatch} from './watch';
3737
import {validateDocumentData, WriteBatch, WriteResult} from './write-batch';
3838

3939
import api = proto.google.firestore.v1;
@@ -448,7 +448,7 @@ export class DocumentReference {
448448
validateFunction('onNext', onNext);
449449
validateFunction('onError', onError, {optional: true});
450450

451-
const watch = Watch.forDocument(this);
451+
const watch = new DocumentWatch(this.firestore, this);
452452

453453
return watch.onSnapshot((readTime, size, docs) => {
454454
for (const document of docs()) {
@@ -1685,7 +1685,7 @@ export class Query {
16851685
validateFunction('onNext', onNext);
16861686
validateFunction('onError', onError, {optional: true});
16871687

1688-
const watch = Watch.forQuery(this);
1688+
const watch = new QueryWatch(this.firestore, this);
16891689

16901690
return watch.onSnapshot((readTime, size, docs, changes) => {
16911691
onNext(new QuerySnapshot(this, readTime, size, docs, changes));

handwritten/firestore/dev/src/watch.ts

Lines changed: 81 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -220,10 +220,8 @@ type DocumentChangeSet = {
220220
* @class
221221
* @private
222222
*/
223-
export class Watch {
224-
private readonly _firestore: Firestore;
225-
private readonly _target: api.ITarget;
226-
private readonly _comparator: DocumentComparator;
223+
abstract class Watch {
224+
protected readonly _firestore: Firestore;
227225
private readonly _backoff: ExponentialBackoff;
228226
private readonly _requestTag: string;
229227

@@ -232,53 +230,21 @@ export class Watch {
232230
* @hideconstructor
233231
*
234232
* @param firestore The Firestore Database client.
235-
* @param target A Firestore 'Target' proto denoting the target to listen on.
236-
* @param comparator A comparator for QueryDocumentSnapshots that is used to
237-
* order the document snapshots returned by this watch.
238233
*/
239-
constructor(
240-
firestore: Firestore, target: api.ITarget,
241-
comparator: DocumentComparator) {
234+
constructor(firestore: Firestore) {
242235
this._firestore = firestore;
243-
this._target = target;
244-
this._comparator = comparator;
245236
this._backoff = new ExponentialBackoff();
246237
this._requestTag = requestTag();
247238
}
248239

249-
/**
250-
* Creates a new Watch instance to listen on DocumentReferences.
251-
*
252-
* @private
253-
* @param documentRef - The document reference for this watch.
254-
* @returns A newly created Watch instance.
255-
*/
256-
static forDocument(documentRef: DocumentReference): Watch {
257-
return new Watch(
258-
documentRef.firestore, {
259-
documents: {
260-
documents: [documentRef.formattedName],
261-
},
262-
targetId: WATCH_TARGET_ID,
263-
},
264-
DOCUMENT_WATCH_COMPARATOR);
265-
}
240+
/** Returns a 'Target' proto denoting the target to listen on. */
241+
protected abstract getTarget(resumeToken?: Uint8Array): Promise<api.ITarget>;
266242

267243
/**
268-
* Creates a new Watch instance to listen on Queries.
269-
*
270-
* @private
271-
* @param query The query used for this watch.
272-
* @returns A newly created Watch instance.
244+
* Returns a comparator for QueryDocumentSnapshots that is used to order the
245+
* document snapshots returned by this watch.
273246
*/
274-
static forQuery(query: Query): Watch {
275-
return new Watch(
276-
query.firestore, {
277-
query: query.toProto(),
278-
targetId: WATCH_TARGET_ID,
279-
},
280-
query.comparator());
281-
}
247+
protected abstract getComparator(): DocumentComparator;
282248

283249
/**
284250
* Determines whether an error is considered permanent and should not be
@@ -343,7 +309,7 @@ export class Watch {
343309
onError: (error: Error) => void): () => void {
344310
// The sorted tree of QueryDocumentSnapshots as sent in the last snapshot.
345311
// We only look at the keys.
346-
let docTree = rbtree(this._comparator);
312+
let docTree = rbtree(this.getComparator());
347313
// A map of document names to QueryDocumentSnapshots for the last sent
348314
// snapshot.
349315
let docMap = new Map<string, QueryDocumentSnapshot>();
@@ -358,7 +324,7 @@ export class Watch {
358324
// aren't docs.
359325
let hasPushed = false;
360326
// The server assigns and updates the resume token.
361-
let resumeToken: Uint8Array|null = null;
327+
let resumeToken: Uint8Array|undefined = undefined;
362328

363329
// Indicates whether we are interested in data from the stream. Set to false
364330
// in the 'unsubscribe()' callback.
@@ -367,10 +333,7 @@ export class Watch {
367333
// Sentinel value for a document remove.
368334
const REMOVED = {} as DocumentSnapshotBuilder;
369335

370-
const request: api.IListenRequest = {
371-
database: this._firestore.formattedName,
372-
addTarget: this._target,
373-
};
336+
const request: api.IListenRequest = {};
374337

375338
// We may need to replace the underlying stream on reset events.
376339
// This is the one that will be returned and proxy the current one.
@@ -382,7 +345,7 @@ export class Watch {
382345
const resetDocs = () => {
383346
logger('Watch.onSnapshot', this._requestTag, 'Resetting documents');
384347
changeMap.clear();
385-
resumeToken = null;
348+
resumeToken = undefined;
386349

387350
docTree.forEach((snapshot: QueryDocumentSnapshot) => {
388351
// Mark each document as deleted. If documents are not deleted, they
@@ -414,46 +377,53 @@ export class Watch {
414377
* Clears the change map.
415378
*/
416379
const maybeReopenStream = (err: GrpcError) => {
380+
if (currentStream) {
381+
currentStream.unpipe(stream);
382+
currentStream = null;
383+
}
384+
417385
if (isActive && !this.isPermanentError(err)) {
418386
logger(
419387
'Watch.onSnapshot', this._requestTag,
420388
'Stream ended, re-opening after retryable error: ', err);
421-
request.addTarget!.resumeToken = resumeToken;
422389
changeMap.clear();
423390

424391
if (this.isResourceExhaustedError(err)) {
425392
this._backoff.resetToMax();
426393
}
427394

428-
resetStream();
395+
initStream();
429396
} else {
430397
closeStream(err);
431398
}
432399
};
433400

434401
/** Helper to restart the outgoing stream to the backend. */
435-
const resetStream = () => {
436-
logger('Watch.onSnapshot', this._requestTag, 'Opening new stream');
402+
const restartStream = () => {
403+
logger('Watch.onSnapshot', this._requestTag, 'Restarting stream');
437404
if (currentStream) {
438405
currentStream.unpipe(stream);
439406
currentStream.end();
440407
currentStream = null;
441-
initStream();
442408
}
409+
initStream();
443410
};
444411

445412
/**
446413
* Initializes a new stream to the backend with backoff.
447414
*/
448415
const initStream = () => {
449-
this._backoff.backoffAndWait().then(() => {
416+
this._backoff.backoffAndWait().then(async () => {
450417
if (!isActive) {
451418
logger(
452419
'Watch.onSnapshot', this._requestTag,
453420
'Not initializing inactive stream');
454421
return;
455422
}
456423

424+
request.database = await this._firestore.formattedName;
425+
request.addTarget = await this.getTarget(resumeToken);
426+
457427
// Note that we need to call the internal _listen API to pass additional
458428
// header values in readWriteStream.
459429
this._firestore
@@ -605,21 +575,21 @@ export class Watch {
605575

606576
changes.deletes.sort((name1, name2) => {
607577
// Deletes are sorted based on the order of the existing document.
608-
return this._comparator(
578+
return this.getComparator()(
609579
updatedMap.get(name1)!, updatedMap.get(name2)!);
610580
});
611581
changes.deletes.forEach(name => {
612582
const change = deleteDoc(name);
613583
appliedChanges.push(change);
614584
});
615585

616-
changes.adds.sort(this._comparator);
586+
changes.adds.sort(this.getComparator());
617587
changes.adds.forEach(snapshot => {
618588
const change = addDoc(snapshot);
619589
appliedChanges.push(change);
620590
});
621591

622-
changes.updates.sort(this._comparator);
592+
changes.updates.sort(this.getComparator());
623593
changes.updates.forEach(snapshot => {
624594
const change = modifyDoc(snapshot);
625595
if (change) {
@@ -639,7 +609,7 @@ export class Watch {
639609
* Assembles a new snapshot from the current set of changes and invokes the
640610
* user's callback. Clears the current changes on completion.
641611
*/
642-
const push = (readTime: Timestamp, nextResumeToken: Uint8Array|null) => {
612+
const push = (readTime: Timestamp, nextResumeToken?: Uint8Array) => {
643613
const changes = extractChanges(docMap, changeMap, readTime);
644614
const diff = computeSnapshot(docTree, docMap, changes);
645615

@@ -776,7 +746,7 @@ export class Watch {
776746
// We need to remove all the current results.
777747
resetDocs();
778748
// The filter didn't match, so re-issue the query.
779-
resetStream();
749+
restartStream();
780750
}
781751
} else {
782752
closeStream(new Error(
@@ -801,3 +771,54 @@ export class Watch {
801771
};
802772
}
803773
}
774+
775+
/**
776+
* Creates a new Watch instance to listen on DocumentReferences.
777+
*
778+
* @private
779+
*/
780+
export class DocumentWatch extends Watch {
781+
constructor(firestore: Firestore, private readonly ref: DocumentReference) {
782+
super(firestore);
783+
}
784+
785+
getComparator(): DocumentComparator {
786+
return DOCUMENT_WATCH_COMPARATOR;
787+
}
788+
789+
async getTarget(resumeToken?: Uint8Array):
790+
Promise<google.firestore.v1.ITarget> {
791+
const formattedName = await this.ref.formattedName;
792+
return {
793+
documents: {
794+
documents: [formattedName],
795+
},
796+
targetId: WATCH_TARGET_ID,
797+
resumeToken
798+
};
799+
}
800+
}
801+
802+
/**
803+
* Creates a new Watch instance to listen on Queries.
804+
*
805+
* @private
806+
*/
807+
export class QueryWatch extends Watch {
808+
private comparator: DocumentComparator;
809+
810+
constructor(firestore: Firestore, private readonly query: Query) {
811+
super(firestore);
812+
this.comparator = query.comparator();
813+
}
814+
815+
getComparator(): DocumentComparator {
816+
return this.query.comparator();
817+
}
818+
819+
async getTarget(resumeToken?: Uint8Array):
820+
Promise<google.firestore.v1.ITarget> {
821+
const query = await this.query.toProto();
822+
return {query, targetId: WATCH_TARGET_ID, resumeToken};
823+
}
824+
}

handwritten/firestore/dev/test/watch.ts

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,7 @@ class DeferredListener<T> {
209209
*/
210210
class StreamHelper {
211211
private readonly deferredListener =
212-
new DeferredListener<api.IListenResponse>();
212+
new DeferredListener<api.IListenRequest>();
213213
private backendStream: NodeJS.ReadWriteStream|null = null;
214214

215215
streamCount = 0; // The number of streams that the client has requested
@@ -242,12 +242,12 @@ class StreamHelper {
242242
/**
243243
* Returns a Promise with the next results from the underlying stream.
244244
*/
245-
await(type: string): Promise<api.IListenResponse|Error|undefined> {
245+
await(type: string): Promise<api.IListenRequest|Error|undefined> {
246246
return this.deferredListener.await(type);
247247
}
248248

249249
/** Waits for a destroyed stream to be re-opened. */
250-
awaitReopen(): Promise<api.IListenResponse> {
250+
awaitReopen(): Promise<api.IListenRequest> {
251251
return this.await('error')
252252
.then(() => this.await('close'))
253253
.then(() => this.awaitOpen());
@@ -257,9 +257,9 @@ class StreamHelper {
257257
* Waits for the stream to open and to receive its first message (the
258258
* AddTarget message).
259259
*/
260-
awaitOpen(): Promise<api.IListenResponse> {
260+
awaitOpen(): Promise<api.IListenRequest> {
261261
return this.await('open').then(() => {
262-
return this.await('data') as api.IListenResponse;
262+
return this.await('data') as api.IListenRequest;
263263
});
264264
}
265265

@@ -733,16 +733,10 @@ describe('Query watch', () => {
733733
return watchHelper.await('snapshot')
734734
.then(() => {
735735
streamHelper.close();
736-
return streamHelper.await('end');
737-
})
738-
.then(() => {
739736
return streamHelper.awaitOpen();
740737
})
741738
.then(() => {
742739
streamHelper.close();
743-
return streamHelper.await('end');
744-
})
745-
.then(() => {
746740
return streamHelper.awaitOpen();
747741
})
748742
.then(() => {
@@ -769,9 +763,6 @@ describe('Query watch', () => {
769763
})
770764
.then(() => {
771765
streamHelper.close();
772-
return streamHelper.await('end');
773-
})
774-
.then(() => {
775766
unsubscribe();
776767
expect(streamHelper.streamCount).to.equal(1);
777768
});

0 commit comments

Comments
 (0)