Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/directLine.mock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,18 @@ const notImplemented = (): never => { throw new Error('not implemented') };

export const mockActivity = (text: string): DirectLineExport.Activity => ({ type: 'message', from: { id: 'sender' }, text });

export const mockVoiceActivity = (): DirectLineExport.Activity => ({
type: 'event',
from: { id: 'sender' },
name: 'voiceLiveEvent',
value: {
voiceLiveEvent: {
type: 'type',
delta: 'base64AudioChunk'
}
}
});

// MOCK DirectLine Server (shared state used by Observable.ajax and WebSocket mocks)

interface ActivitySocket {
Expand Down
148 changes: 148 additions & 0 deletions src/directLine.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -243,4 +243,152 @@ describe('MockSuite', () => {
expect(actualError.status).toStrictEqual(429);
expect(endTime - startTime).toStrictEqual(10);
});

test('VoiceActivityWebSocket', () => {
const voiceActivity = DirectLineMock.mockVoiceActivity();
directline = new DirectLineExport.DirectLine({ ...services, webSocket: true });

const actual: Array<DirectLineExport.Activity> = [];
subscriptions.push(directline.activity$.subscribe(a => actual.push(a)));

let postActivityCompleted = false;
let postActivityError: any;

const scenario = function* (): IterableIterator<Observable<unknown>> {
yield Observable.timer(200, scheduler);
yield directline.postActivity(voiceActivity)
.do(() => postActivityCompleted = true)
.catch(error => {
postActivityError = error;
return Observable.empty();
});
yield Observable.timer(200, scheduler);
};

subscriptions.push(lazyConcat(scenario()).observeOn(scheduler).subscribe());
scheduler.flush();

// Assert that voice activity was sent successfully without errors
expect(postActivityCompleted).toBe(true);
expect(postActivityError).toBeUndefined();
});

test('VoiceActivityWithoutWebSocket', () => {
const voiceActivity = DirectLineMock.mockVoiceActivity();
directline = new DirectLineExport.DirectLine({ ...services, webSocket: false });

let actualError: any;

const scenario = function* (): IterableIterator<Observable<unknown>> {
yield Observable.timer(200, scheduler);
yield directline.postActivity(voiceActivity).catch(error => {
actualError = error;
return Observable.empty();
});
};

subscriptions.push(lazyConcat(scenario()).observeOn(scheduler).subscribe());
scheduler.flush();

expect(actualError.message).toContain('Voice activities require WebSocket to be enabled');
});

test('VoiceVsTextActivityRouting', () => {
const voiceActivity = DirectLineMock.mockVoiceActivity();
Comment thread
pranavjoshi001 marked this conversation as resolved.
Outdated
const textActivity = DirectLineMock.mockActivity('hello');

directline = new DirectLineExport.DirectLine({ ...services, webSocket: true });

const actual: Array<DirectLineExport.Activity> = [];
subscriptions.push(directline.activity$.subscribe(a => actual.push(a)));

let voiceCompleted = false;
let textCompleted = false;
let voiceError: any;
let textError: any;

const scenario = function* (): IterableIterator<Observable<unknown>> {
yield Observable.timer(200, scheduler);

// Send text activity (should go through HTTP/Ajax)
yield directline.postActivity(textActivity)
.do(() => textCompleted = true)
.catch(error => {
textError = error;
return Observable.empty();
});

yield Observable.timer(100, scheduler);

// Send voice activity (should go through WebSocket)
yield directline.postActivity(voiceActivity)
.do(() => voiceCompleted = true)
.catch(error => {
voiceError = error;
return Observable.empty();
});

yield Observable.timer(200, scheduler);
};

subscriptions.push(lazyConcat(scenario()).observeOn(scheduler).subscribe());
scheduler.flush();

// Both should complete successfully but through different paths
expect(textCompleted).toBe(true);
expect(voiceCompleted).toBe(true);
expect(textError).toBeUndefined();
expect(voiceError).toBeUndefined();

// Text activity should echo back, voice activity should not
expect(actual).toContainEqual(textActivity);
expect(actual).not.toContainEqual(voiceActivity);
});

test('InvalidVoiceActivityStructures', () => {
const invalidStructures: DirectLineExport.Activity[] = [
Comment thread
pranavjoshi001 marked this conversation as resolved.
Outdated
{ type: 'event', from: { id: 'user' }, value: null } as any,
Comment thread
pranavjoshi001 marked this conversation as resolved.
Outdated
{ type: 'event', from: { id: 'user' }, value: { voiceLiveEvent: null } } as any,
{ type: 'event', from: { id: 'user' }, value: { voiceLiveEvent: {} } },
{ type: 'event', from: { id: 'user' }, value: { notVoice: { data: 'test' } } } as any
];

directline = new DirectLineExport.DirectLine({ ...services, webSocket: true });

const actual: Array<DirectLineExport.Activity> = [];
subscriptions.push(directline.activity$.subscribe(a => actual.push(a)));

let completedCount = 0;
let errorCount = 0;

const scenario = function* (): IterableIterator<Observable<unknown>> {
yield Observable.timer(200, scheduler);

// Send each invalid structure - should all go through HTTP path
for (const invalidActivity of invalidStructures) {
yield directline.postActivity(invalidActivity)
.do(() => completedCount++)
.catch(error => {
errorCount++;
return Observable.empty();
});
yield Observable.timer(100, scheduler);
}

yield Observable.timer(200, scheduler);
};

subscriptions.push(lazyConcat(scenario()).observeOn(scheduler).subscribe());
scheduler.flush();

// All invalid structures should complete successfully through HTTP path
expect(completedCount).toBe(invalidStructures.length);
expect(errorCount).toBe(0);

// All invalid structures should echo back (confirming they went through HTTP, not WebSocket)
expect(actual).toHaveLength(invalidStructures.length);
invalidStructures.forEach(invalidActivity => {
expect(actual).toContainEqual(invalidActivity);
});
});
});
67 changes: 60 additions & 7 deletions src/directLine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,7 @@ export class DirectLine implements IBotConnection {
public referenceGrammarId: string;
private timeout = 20 * 1000;
private retries: number;
private webSocketConnection: WebSocket | null = null;

private localeOnStartConversation: string;
private userIdOnStartConversation: string;
Expand Down Expand Up @@ -765,6 +766,32 @@ export class DirectLine implements IBotConnection {
if (activity.type === "message" && activity.attachments && activity.attachments.length > 0)
return this.postMessageWithAttachments(activity);

// if it is voice activity, send it through webSocket as voice over http is not supported in ABS.
// ABS limitation - client to server push is not being processed over web socket for text.
// Once it is implemented, we can remove this and send all traffic to the webSocket
if (this.isVoiceEventActivity(activity)) {
if (!this.webSocket) {
return Observable.throw(new Error('Voice activities require WebSocket to be enabled'), this.services.scheduler);
}
return this.checkConnection(true)
.flatMap(_ =>
Observable.create((subscriber: Subscriber<any>) => {
const envelope = { activities: [activity] };
try {
if (!this.webSocketConnection || this.webSocketConnection.readyState !== WebSocket.OPEN) {
Comment thread
pranavjoshi001 marked this conversation as resolved.
Outdated
throw new Error('WebSocket connection not ready for voice activities');
}
this.webSocketConnection.send(JSON.stringify(envelope));
subscriber.next(envelope);
Comment thread
pranavjoshi001 marked this conversation as resolved.
Outdated
subscriber.complete();
} catch (e) {
subscriber.error(e);
}
})
)
.catch(error => this.catchExpiredToken(error));
}

// If we're not connected to the bot, get connected
// Will throw an error if we are not connected
konsole.log("postActivity", activity);
Expand All @@ -786,6 +813,32 @@ export class DirectLine implements IBotConnection {
.catch(error => this.catchExpiredToken(error));
}

// Until activity protocol changes for multi-modal output are ratified, this method
// identifies voice event activities using the given activity example below as payload
// to send voice chunks over activity protocol. The activity structure shown serves as
// the current solution for transmitting voice data:
// { "type": "event", "value": { "voiceLiveEvent": { "type": "response.audio.delta", "delta": "<base64 chunk>" } } }
private isVoiceEventActivity(activity: Activity) {
Comment thread
pranavjoshi001 marked this conversation as resolved.
Outdated
if (activity.type !== 'event') {
Comment thread
pranavjoshi001 marked this conversation as resolved.
Outdated
return false;
}

if (!activity?.value || typeof activity?.value !== 'object') {
return false;
}

const vle = activity?.value?.voiceLiveEvent;
if (!vle || typeof vle !== 'object') {
return false;
}

if (Object.keys(vle).length === 0) {
return false;
}

return true;
}

private postMessageWithAttachments(message: Message) {
const { attachments } = message;
// We clean the attachments but making sure every attachment has unique name.
Expand Down Expand Up @@ -938,26 +991,26 @@ export class DirectLine implements IBotConnection {
private observableWebSocket<T>() {
return Observable.create((subscriber: Subscriber<T>) => {
konsole.log("creating WebSocket", this.streamUrl);
const ws = new this.services.WebSocket(this.streamUrl);
this.webSocketConnection = new this.services.WebSocket(this.streamUrl);
let sub: Subscription;
let closed: boolean;

ws.onopen = open => {
this.webSocketConnection.onopen = open => {
konsole.log("WebSocket open", open);
// Chrome is pretty bad at noticing when a WebSocket connection is broken.
// If we periodically ping the server with empty messages, it helps Chrome
// realize when connection breaks, and close the socket. We then throw an
// error, and that give us the opportunity to attempt to reconnect.
sub = Observable.interval(this.timeout, this.services.scheduler).subscribe(_ => {
try {
ws.send("")
this.webSocketConnection.send("")
} catch(e) {
konsole.log("Ping error", e);
}
});
}

ws.onclose = close => {
this.webSocketConnection.onclose = close => {
konsole.log("WebSocket close", close);
if (sub) sub.unsubscribe();

Expand All @@ -967,7 +1020,7 @@ export class DirectLine implements IBotConnection {
closed = true;
}

ws.onerror = error => {
this.webSocketConnection.onerror = error => {
konsole.log("WebSocket error", error);
if (sub) sub.unsubscribe();

Expand All @@ -977,14 +1030,14 @@ export class DirectLine implements IBotConnection {
closed = true;
}

ws.onmessage = message => message.data && subscriber.next(JSON.parse(message.data));
this.webSocketConnection.onmessage = message => message.data && subscriber.next(JSON.parse(message.data));

// This is the 'unsubscribe' method, which is called when this observable is disposed.
// When the WebSocket closes itself, we throw an error, and this function is eventually called.
// When the observable is closed first (e.g. when tearing down a WebChat instance) then
// we need to manually close the WebSocket.
return () => {
if (ws.readyState === 0 || ws.readyState === 1) ws.close();
if (this.webSocketConnection.readyState === 0 || this.webSocketConnection.readyState === 1) this.webSocketConnection.close();
}
}) as Observable<T>
}
Expand Down
Loading