Skip to content

Commit f7a7d21

Browse files
author
Pranav Joshi
committed
post voice traffic only to socket
1 parent 13f7a42 commit f7a7d21

3 files changed

Lines changed: 220 additions & 7 deletions

File tree

src/directLine.mock.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,18 @@ const notImplemented = (): never => { throw new Error('not implemented') };
1111

1212
export const mockActivity = (text: string): DirectLineExport.Activity => ({ type: 'message', from: { id: 'sender' }, text });
1313

14+
export const mockVoiceActivity = (): DirectLineExport.Activity => ({
15+
type: 'event',
16+
from: { id: 'sender' },
17+
name: 'voiceLiveEvent',
18+
value: {
19+
voiceLiveEvent: {
20+
type: 'type',
21+
delta: 'base64AudioChunk'
22+
}
23+
}
24+
});
25+
1426
// MOCK DirectLine Server (shared state used by Observable.ajax and WebSocket mocks)
1527

1628
interface ActivitySocket {

src/directLine.test.ts

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,4 +243,152 @@ describe('MockSuite', () => {
243243
expect(actualError.status).toStrictEqual(429);
244244
expect(endTime - startTime).toStrictEqual(10);
245245
});
246+
247+
test('VoiceActivityWebSocket', () => {
248+
const voiceActivity = DirectLineMock.mockVoiceActivity();
249+
directline = new DirectLineExport.DirectLine({ ...services, webSocket: true });
250+
251+
const actual: Array<DirectLineExport.Activity> = [];
252+
subscriptions.push(directline.activity$.subscribe(a => actual.push(a)));
253+
254+
let postActivityCompleted = false;
255+
let postActivityError: any;
256+
257+
const scenario = function* (): IterableIterator<Observable<unknown>> {
258+
yield Observable.timer(200, scheduler);
259+
yield directline.postActivity(voiceActivity)
260+
.do(() => postActivityCompleted = true)
261+
.catch(error => {
262+
postActivityError = error;
263+
return Observable.empty();
264+
});
265+
yield Observable.timer(200, scheduler);
266+
};
267+
268+
subscriptions.push(lazyConcat(scenario()).observeOn(scheduler).subscribe());
269+
scheduler.flush();
270+
271+
// Assert that voice activity was sent successfully without errors
272+
expect(postActivityCompleted).toBe(true);
273+
expect(postActivityError).toBeUndefined();
274+
});
275+
276+
test('VoiceActivityWithoutWebSocket', () => {
277+
const voiceActivity = DirectLineMock.mockVoiceActivity();
278+
directline = new DirectLineExport.DirectLine({ ...services, webSocket: false });
279+
280+
let actualError: any;
281+
282+
const scenario = function* (): IterableIterator<Observable<unknown>> {
283+
yield Observable.timer(200, scheduler);
284+
yield directline.postActivity(voiceActivity).catch(error => {
285+
actualError = error;
286+
return Observable.empty();
287+
});
288+
};
289+
290+
subscriptions.push(lazyConcat(scenario()).observeOn(scheduler).subscribe());
291+
scheduler.flush();
292+
293+
expect(actualError.message).toContain('Voice activities require WebSocket to be enabled');
294+
});
295+
296+
test('VoiceVsTextActivityRouting', () => {
297+
const voiceActivity = DirectLineMock.mockVoiceActivity();
298+
const textActivity = DirectLineMock.mockActivity('hello');
299+
300+
directline = new DirectLineExport.DirectLine({ ...services, webSocket: true });
301+
302+
const actual: Array<DirectLineExport.Activity> = [];
303+
subscriptions.push(directline.activity$.subscribe(a => actual.push(a)));
304+
305+
let voiceCompleted = false;
306+
let textCompleted = false;
307+
let voiceError: any;
308+
let textError: any;
309+
310+
const scenario = function* (): IterableIterator<Observable<unknown>> {
311+
yield Observable.timer(200, scheduler);
312+
313+
// Send text activity (should go through HTTP/Ajax)
314+
yield directline.postActivity(textActivity)
315+
.do(() => textCompleted = true)
316+
.catch(error => {
317+
textError = error;
318+
return Observable.empty();
319+
});
320+
321+
yield Observable.timer(100, scheduler);
322+
323+
// Send voice activity (should go through WebSocket)
324+
yield directline.postActivity(voiceActivity)
325+
.do(() => voiceCompleted = true)
326+
.catch(error => {
327+
voiceError = error;
328+
return Observable.empty();
329+
});
330+
331+
yield Observable.timer(200, scheduler);
332+
};
333+
334+
subscriptions.push(lazyConcat(scenario()).observeOn(scheduler).subscribe());
335+
scheduler.flush();
336+
337+
// Both should complete successfully but through different paths
338+
expect(textCompleted).toBe(true);
339+
expect(voiceCompleted).toBe(true);
340+
expect(textError).toBeUndefined();
341+
expect(voiceError).toBeUndefined();
342+
343+
// Text activity should echo back, voice activity should not
344+
expect(actual).toContainEqual(textActivity);
345+
expect(actual).not.toContainEqual(voiceActivity);
346+
});
347+
348+
test('InvalidVoiceActivityStructures', () => {
349+
const invalidStructures: DirectLineExport.Activity[] = [
350+
{ type: 'event', from: { id: 'user' }, value: null } as any,
351+
{ type: 'event', from: { id: 'user' }, value: { voiceLiveEvent: null } } as any,
352+
{ type: 'event', from: { id: 'user' }, value: { voiceLiveEvent: {} } },
353+
{ type: 'event', from: { id: 'user' }, value: { notVoice: { data: 'test' } } } as any
354+
];
355+
356+
directline = new DirectLineExport.DirectLine({ ...services, webSocket: true });
357+
358+
const actual: Array<DirectLineExport.Activity> = [];
359+
subscriptions.push(directline.activity$.subscribe(a => actual.push(a)));
360+
361+
let completedCount = 0;
362+
let errorCount = 0;
363+
364+
const scenario = function* (): IterableIterator<Observable<unknown>> {
365+
yield Observable.timer(200, scheduler);
366+
367+
// Send each invalid structure - should all go through HTTP path
368+
for (const invalidActivity of invalidStructures) {
369+
yield directline.postActivity(invalidActivity)
370+
.do(() => completedCount++)
371+
.catch(error => {
372+
errorCount++;
373+
return Observable.empty();
374+
});
375+
yield Observable.timer(100, scheduler);
376+
}
377+
378+
yield Observable.timer(200, scheduler);
379+
};
380+
381+
subscriptions.push(lazyConcat(scenario()).observeOn(scheduler).subscribe());
382+
scheduler.flush();
383+
384+
// All invalid structures should complete successfully through HTTP path
385+
expect(completedCount).toBe(invalidStructures.length);
386+
expect(errorCount).toBe(0);
387+
388+
// All invalid structures should echo back (confirming they went through HTTP, not WebSocket)
389+
expect(actual).toHaveLength(invalidStructures.length);
390+
invalidStructures.forEach(invalidActivity => {
391+
expect(actual).toContainEqual(invalidActivity);
392+
});
393+
});
246394
});

src/directLine.ts

Lines changed: 60 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -470,6 +470,7 @@ export class DirectLine implements IBotConnection {
470470
public referenceGrammarId: string;
471471
private timeout = 20 * 1000;
472472
private retries: number;
473+
private webSocketConnection: WebSocket | null = null;
473474

474475
private localeOnStartConversation: string;
475476
private userIdOnStartConversation: string;
@@ -765,6 +766,32 @@ export class DirectLine implements IBotConnection {
765766
if (activity.type === "message" && activity.attachments && activity.attachments.length > 0)
766767
return this.postMessageWithAttachments(activity);
767768

769+
// if it is voice activity, send it through webSocket as voice over http is not supported in ABS.
770+
// ABS limitation - client to server push is not being processed over web socket for text.
771+
// Once it is implemented, we can remove this and send all traffic to the webSocket
772+
if (this.isVoiceEventActivity(activity)) {
773+
if (!this.webSocket) {
774+
return Observable.throw(new Error('Voice activities require WebSocket to be enabled'), this.services.scheduler);
775+
}
776+
return this.checkConnection(true)
777+
.flatMap(_ =>
778+
Observable.create((subscriber: Subscriber<any>) => {
779+
const envelope = { activities: [activity] };
780+
try {
781+
if (!this.webSocketConnection || this.webSocketConnection.readyState !== WebSocket.OPEN) {
782+
throw new Error('WebSocket connection not ready for voice activities');
783+
}
784+
this.webSocketConnection.send(JSON.stringify(envelope));
785+
subscriber.next(envelope);
786+
subscriber.complete();
787+
} catch (e) {
788+
subscriber.error(e);
789+
}
790+
})
791+
)
792+
.catch(error => this.catchExpiredToken(error));
793+
}
794+
768795
// If we're not connected to the bot, get connected
769796
// Will throw an error if we are not connected
770797
konsole.log("postActivity", activity);
@@ -786,6 +813,32 @@ export class DirectLine implements IBotConnection {
786813
.catch(error => this.catchExpiredToken(error));
787814
}
788815

816+
// Until activity protocol changes for multi-modal output are ratified, this method
817+
// identifies voice event activities using the given activity example below as payload
818+
// to send voice chunks over activity protocol. The activity structure shown serves as
819+
// the current solution for transmitting voice data:
820+
// { "type": "event", "value": { "voiceLiveEvent": { "type": "response.audio.delta", "delta": "<base64 chunk>" } } }
821+
private isVoiceEventActivity(activity: Activity) {
822+
if (activity.type !== 'event') {
823+
return false;
824+
}
825+
826+
if (!activity?.value || typeof activity?.value !== 'object') {
827+
return false;
828+
}
829+
830+
const vle = activity?.value?.voiceLiveEvent;
831+
if (!vle || typeof vle !== 'object') {
832+
return false;
833+
}
834+
835+
if (Object.keys(vle).length === 0) {
836+
return false;
837+
}
838+
839+
return true;
840+
}
841+
789842
private postMessageWithAttachments(message: Message) {
790843
const { attachments } = message;
791844
// We clean the attachments but making sure every attachment has unique name.
@@ -938,26 +991,26 @@ export class DirectLine implements IBotConnection {
938991
private observableWebSocket<T>() {
939992
return Observable.create((subscriber: Subscriber<T>) => {
940993
konsole.log("creating WebSocket", this.streamUrl);
941-
const ws = new this.services.WebSocket(this.streamUrl);
994+
this.webSocketConnection = new this.services.WebSocket(this.streamUrl);
942995
let sub: Subscription;
943996
let closed: boolean;
944997

945-
ws.onopen = open => {
998+
this.webSocketConnection.onopen = open => {
946999
konsole.log("WebSocket open", open);
9471000
// Chrome is pretty bad at noticing when a WebSocket connection is broken.
9481001
// If we periodically ping the server with empty messages, it helps Chrome
9491002
// realize when connection breaks, and close the socket. We then throw an
9501003
// error, and that give us the opportunity to attempt to reconnect.
9511004
sub = Observable.interval(this.timeout, this.services.scheduler).subscribe(_ => {
9521005
try {
953-
ws.send("")
1006+
this.webSocketConnection.send("")
9541007
} catch(e) {
9551008
konsole.log("Ping error", e);
9561009
}
9571010
});
9581011
}
9591012

960-
ws.onclose = close => {
1013+
this.webSocketConnection.onclose = close => {
9611014
konsole.log("WebSocket close", close);
9621015
if (sub) sub.unsubscribe();
9631016

@@ -967,7 +1020,7 @@ export class DirectLine implements IBotConnection {
9671020
closed = true;
9681021
}
9691022

970-
ws.onerror = error => {
1023+
this.webSocketConnection.onerror = error => {
9711024
konsole.log("WebSocket error", error);
9721025
if (sub) sub.unsubscribe();
9731026

@@ -977,14 +1030,14 @@ export class DirectLine implements IBotConnection {
9771030
closed = true;
9781031
}
9791032

980-
ws.onmessage = message => message.data && subscriber.next(JSON.parse(message.data));
1033+
this.webSocketConnection.onmessage = message => message.data && subscriber.next(JSON.parse(message.data));
9811034

9821035
// This is the 'unsubscribe' method, which is called when this observable is disposed.
9831036
// When the WebSocket closes itself, we throw an error, and this function is eventually called.
9841037
// When the observable is closed first (e.g. when tearing down a WebChat instance) then
9851038
// we need to manually close the WebSocket.
9861039
return () => {
987-
if (ws.readyState === 0 || ws.readyState === 1) ws.close();
1040+
if (this.webSocketConnection.readyState === 0 || this.webSocketConnection.readyState === 1) this.webSocketConnection.close();
9881041
}
9891042
}) as Observable<T>
9901043
}

0 commit comments

Comments
 (0)