@@ -149,10 +149,10 @@ export class DirectLine implements IBotConnection {
149149 return throwError ( errorFailedToConnect ) ;
150150
151151 case ConnectionStatus . ExpiredToken :
152- return throwError ( errorExpiredToken ) ;
152+ return of ( connectionStatus ) ;
153153
154154 default :
155- return of ( null ) ;
155+ return of ( connectionStatus ) ;
156156 }
157157 } )
158158 )
@@ -253,7 +253,10 @@ export class DirectLine implements IBotConnection {
253253 // if the token is expired there's no reason to keep trying
254254 this . expiredToken ( ) ;
255255 return throwError ( error ) ;
256+ } else if ( error . status === 404 ) {
257+ throwError ( error ) ;
256258 }
259+
257260 return of ( error ) ;
258261 } ) ,
259262 delay ( timeout ) ,
@@ -417,37 +420,60 @@ export class DirectLine implements IBotConnection {
417420 }
418421
419422 private pollingGetActivity$ ( ) {
420- return interval ( this . pollingInterval )
421- . pipe (
422- combineLatest ( this . checkConnection ( ) ) ,
423- mergeMap ( _ =>
423+ const poller$ : Observable < AjaxResponse > = Observable . create ( ( subscriber : Subscriber < any > ) => {
424+ // A BehaviorSubject to trigger polling. Since it is a BehaviorSubject
425+ // the first event is produced immediately.
426+ const trigger$ = new BehaviorSubject < any > ( { } ) ;
427+
428+ trigger$ . subscribe ( ( ) => {
429+ if ( this . connectionStatus$ . getValue ( ) === ConnectionStatus . Online ) {
430+ const startTimestamp = Date . now ( ) ;
431+
424432 ajax ( {
425- method : "GET" ,
426- url : `${ this . domain } /conversations/${ this . conversationId } /activities?watermark=${ this . watermark } ` ,
427- timeout,
428433 headers : {
429- "Accept" : "application/json" ,
430- "Authorization" : `Bearer ${ this . token } `
431- }
432- } )
433- . pipe (
434- catchError ( error => {
435- if ( error . status === 403 ) {
436- // This is slightly ugly. We want to update this.connectionStatus$ to ExpiredToken so that subsequent
437- // calls to checkConnection will throw an error. But when we do so, it causes this.checkConnection()
438- // to immediately throw an error, which is caught by the catch() below and transformed into an empty
439- // object. Then next() returns, and we emit an empty object. Which means one 403 is causing
440- // two empty objects to be emitted. Which is harmless but, again, slightly ugly.
441- this . expiredToken ( ) ;
434+ Accept : 'application/json' ,
435+ Authorization : `Bearer ${ this . token } `
436+ } ,
437+ method : 'GET' ,
438+ url : `${ this . domain } /conversations/${ this . conversationId } /activities?watermark=${ this . watermark } ` ,
439+ timeout
440+ } ) . subscribe (
441+ ( result : AjaxResponse ) => {
442+ subscriber . next ( result ) ;
443+ setTimeout ( ( ) => trigger$ . next ( null ) , Math . max ( 0 , this . pollingInterval - Date . now ( ) + startTimestamp ) ) ;
444+ } ,
445+ ( error : any ) => {
446+ switch ( error . status ) {
447+ case 403 :
448+ this . connectionStatus$ . next ( ConnectionStatus . ExpiredToken ) ;
449+ setTimeout ( ( ) => trigger$ . next ( null ) , this . pollingInterval ) ;
450+ break ;
451+
452+ case 404 :
453+ this . connectionStatus$ . next ( ConnectionStatus . Ended ) ;
454+ break ;
455+
456+ default :
457+ // propagate the error
458+ subscriber . error ( error ) ;
459+ break ;
442460 }
443- return empty ( ) ;
444- } ) ,
445- // .do(ajaxResponse => konsole.log("getActivityGroup ajaxResponse", ajaxResponse))
446- map ( ajaxResponse => ajaxResponse . response as ActivityGroup ) ,
447- mergeMap ( activityGroup => this . observableFromActivityGroup ( activityGroup ) )
448- )
449- ) ,
450- catchError ( error => empty ( ) )
461+ }
462+ ) ;
463+ }
464+ } ) ;
465+ } ) ;
466+
467+ return this . checkConnection ( )
468+ . pipe (
469+ mergeMap ( _ =>
470+ poller$
471+ . pipe (
472+ catchError ( ( ) => empty ( ) ) ,
473+ map ( ajaxResponse => ajaxResponse . response as ActivityGroup ) ,
474+ mergeMap ( activityGroup => this . observableFromActivityGroup ( activityGroup ) )
475+ )
476+ )
451477 ) ;
452478 }
453479
@@ -467,6 +493,7 @@ export class DirectLine implements IBotConnection {
467493 // but it's simpler just to always fetch a new one.
468494 . pipe (
469495 retryWhen ( error$ => error$ . pipe (
496+ delay ( 3000 ) ,
470497 mergeMap ( error => this . reconnectToConversation ( )
471498 ) ) )
472499 )
@@ -543,7 +570,10 @@ export class DirectLine implements IBotConnection {
543570 // token has expired. We can't recover from this here, but the embedding
544571 // website might eventually call reconnect() with a new token and streamUrl.
545572 this . expiredToken ( ) ;
573+ } else if ( error . status === 404 ) {
574+ throwError ( errorConversationEnded ) ;
546575 }
576+
547577 return of ( error ) ;
548578 } ) ,
549579 delay ( timeout ) ,
0 commit comments