@@ -130,10 +130,10 @@ export class DirectLine implements IBotConnection {
130130 return throwError ( errorFailedToConnect ) ;
131131
132132 case ConnectionStatus . ExpiredToken :
133- return throwError ( errorExpiredToken ) ;
133+ return of ( connectionStatus ) ;
134134
135135 default :
136- return of ( null ) ;
136+ return of ( connectionStatus ) ;
137137 }
138138 } )
139139 )
@@ -212,7 +212,10 @@ export class DirectLine implements IBotConnection {
212212 // if the token is expired there's no reason to keep trying
213213 this . expiredToken ( ) ;
214214 return throwError ( error ) ;
215+ } else if ( error . status === 404 ) {
216+ throwError ( error ) ;
215217 }
218+
216219 return of ( error ) ;
217220 } ) ,
218221 delay ( timeout ) ,
@@ -370,37 +373,60 @@ export class DirectLine implements IBotConnection {
370373 }
371374
372375 private pollingGetActivity$ ( ) {
373- return interval ( this . pollingInterval )
374- . pipe (
375- combineLatest ( this . checkConnection ( ) ) ,
376- mergeMap ( _ =>
376+ const poller$ : Observable < AjaxResponse > = Observable . create ( ( subscriber : Subscriber < any > ) => {
377+ // A BehaviorSubject to trigger polling. Since it is a BehaviorSubject
378+ // the first event is produced immediately.
379+ const trigger$ = new BehaviorSubject < any > ( { } ) ;
380+
381+ trigger$ . subscribe ( ( ) => {
382+ if ( this . connectionStatus$ . getValue ( ) === ConnectionStatus . Online ) {
383+ const startTimestamp = Date . now ( ) ;
384+
377385 ajax ( {
378- method : "GET" ,
379- url : `${ this . domain } /conversations/${ this . conversationId } /activities?watermark=${ this . watermark } ` ,
380- timeout,
381386 headers : {
382- "Accept" : "application/json" ,
383- "Authorization" : `Bearer ${ this . token } `
384- }
385- } )
386- . pipe (
387- catchError ( error => {
388- if ( error . status === 403 ) {
389- // This is slightly ugly. We want to update this.connectionStatus$ to ExpiredToken so that subsequent
390- // calls to checkConnection will throw an error. But when we do so, it causes this.checkConnection()
391- // to immediately throw an error, which is caught by the catch() below and transformed into an empty
392- // object. Then next() returns, and we emit an empty object. Which means one 403 is causing
393- // two empty objects to be emitted. Which is harmless but, again, slightly ugly.
394- this . expiredToken ( ) ;
387+ Accept : 'application/json' ,
388+ Authorization : `Bearer ${ this . token } `
389+ } ,
390+ method : 'GET' ,
391+ url : `${ this . domain } /conversations/${ this . conversationId } /activities?watermark=${ this . watermark } ` ,
392+ timeout
393+ } ) . subscribe (
394+ ( result : AjaxResponse ) => {
395+ subscriber . next ( result ) ;
396+ setTimeout ( ( ) => trigger$ . next ( null ) , Math . max ( 0 , this . pollingInterval - Date . now ( ) + startTimestamp ) ) ;
397+ } ,
398+ ( error : any ) => {
399+ switch ( error . status ) {
400+ case 403 :
401+ this . connectionStatus$ . next ( ConnectionStatus . ExpiredToken ) ;
402+ setTimeout ( ( ) => trigger$ . next ( null ) , this . pollingInterval ) ;
403+ break ;
404+
405+ case 404 :
406+ this . connectionStatus$ . next ( ConnectionStatus . Ended ) ;
407+ break ;
408+
409+ default :
410+ // propagate the error
411+ subscriber . error ( error ) ;
412+ break ;
395413 }
396- return empty ( ) ;
397- } ) ,
398- // .do(ajaxResponse => konsole.log("getActivityGroup ajaxResponse", ajaxResponse))
399- map ( ajaxResponse => ajaxResponse . response as ActivityGroup ) ,
400- mergeMap ( activityGroup => this . observableFromActivityGroup ( activityGroup ) )
401- )
402- ) ,
403- catchError ( error => empty ( ) )
414+ }
415+ ) ;
416+ }
417+ } ) ;
418+ } ) ;
419+
420+ return this . checkConnection ( )
421+ . pipe (
422+ mergeMap ( _ =>
423+ poller$
424+ . pipe (
425+ catchError ( ( ) => empty ( ) ) ,
426+ map ( ajaxResponse => ajaxResponse . response as ActivityGroup ) ,
427+ mergeMap ( activityGroup => this . observableFromActivityGroup ( activityGroup ) )
428+ )
429+ )
404430 ) ;
405431 }
406432
@@ -420,6 +446,7 @@ export class DirectLine implements IBotConnection {
420446 // but it's simpler just to always fetch a new one.
421447 . pipe (
422448 retryWhen ( error$ => error$ . pipe (
449+ delay ( 3000 ) ,
423450 mergeMap ( error => this . reconnectToConversation ( )
424451 ) ) )
425452 )
@@ -491,7 +518,10 @@ export class DirectLine implements IBotConnection {
491518 // token has expired. We can't recover from this here, but the embedding
492519 // website might eventually call reconnect() with a new token and streamUrl.
493520 this . expiredToken ( ) ;
521+ } else if ( error . status === 404 ) {
522+ throwError ( errorConversationEnded ) ;
494523 }
524+
495525 return of ( error ) ;
496526 } ) ,
497527 delay ( timeout ) ,
0 commit comments