diff --git a/eslint-suppressions.json b/eslint-suppressions.json index 62d7af273e..cef941a545 100644 --- a/eslint-suppressions.json +++ b/eslint-suppressions.json @@ -862,34 +862,45 @@ "count": 1 } }, - "packages/core-backend/src/AccountActivityService.test.ts": { + "packages/core-backend/src/api/shared-types.ts": { "no-restricted-syntax": { - "count": 2 + "count": 1 } }, - "packages/core-backend/src/AccountActivityService.ts": { + "packages/core-backend/src/index.ts": { "no-restricted-syntax": { - "count": 1 + "count": 4 } }, - "packages/core-backend/src/BackendWebSocketService.test.ts": { + "packages/core-backend/src/ws/AccountActivityService.test.ts": { "no-restricted-syntax": { - "count": 1 + "count": 2 } }, - "packages/core-backend/src/BackendWebSocketService.ts": { + "packages/core-backend/src/ws/AccountActivityService.ts": { "no-restricted-syntax": { - "count": 5 + "count": 1 } }, - "packages/core-backend/src/api/shared-types.ts": { + "packages/core-backend/src/ws/BackendWebSocketService.test.ts": { "no-restricted-syntax": { "count": 1 } }, - "packages/core-backend/src/index.ts": { + "packages/core-backend/src/ws/BackendWebSocketService.ts": { "no-restricted-syntax": { + "count": 5 + } + }, + "packages/core-backend/src/ws/ohlcv/OHLCVService.test.ts": { + "@typescript-eslint/naming-convention": { "count": 2 + }, + "id-length": { + "count": 1 + }, + "no-restricted-syntax": { + "count": 1 } }, "packages/delegation-controller/src/DelegationController.test.ts": { diff --git a/packages/core-backend/CHANGELOG.md b/packages/core-backend/CHANGELOG.md index 08cdccdd2e..1d28aaa4b1 100644 --- a/packages/core-backend/CHANGELOG.md +++ b/packages/core-backend/CHANGELOG.md @@ -7,6 +7,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- Add `OHLCVService` for real-time OHLCV (candlestick) data streaming via WebSocket ([#8695](https://github.com/MetaMask/core/pull/8695)) + - Wraps `BackendWebSocketService` through the messenger pattern to provide subscribe/unsubscribe semantics for market-data OHLCV channels + - Includes reference counting, grace-period unsubscribe, idempotency checks, chain-status forwarding, and automatic resubscription on reconnect +- Export new types `OHLCVBar`, `OHLCVSubscriptionOptions`, `OHLCVSystemNotificationData`, `OHLCVServiceOptions`, `OHLCVServiceActions`, `OHLCVServiceAllowedActions`, `OHLCVServiceBarUpdatedEvent`, `OHLCVServiceChainStatusChangedEvent`, `OHLCVServiceSubscriptionErrorEvent`, `OHLCVServiceEvents`, `OHLCVServiceAllowedEvents`, and `OHLCVServiceMessenger` ([#8695](https://github.com/MetaMask/core/pull/8695)) +- Export new constants `OHLCV_SERVICE_ALLOWED_ACTIONS` and `OHLCV_SERVICE_ALLOWED_EVENTS` for configuring the messenger ([#8695](https://github.com/MetaMask/core/pull/8695)) + ### Changed - Bump `@metamask/accounts-controller` from `^38.1.0` to `^38.1.1` ([#8774](https://github.com/MetaMask/core/pull/8774)) diff --git a/packages/core-backend/package.json b/packages/core-backend/package.json index 1dbe1eb9ec..5dd31f7409 100644 --- a/packages/core-backend/package.json +++ b/packages/core-backend/package.json @@ -60,6 +60,7 @@ "@metamask/profile-sync-controller": "^28.0.2", "@metamask/utils": "^11.9.0", "@tanstack/query-core": "^5.62.16", + "async-mutex": "^0.5.0", "uuid": "^8.3.2" }, "devDependencies": { diff --git a/packages/core-backend/src/index.ts b/packages/core-backend/src/index.ts index 8245157118..6197c068f8 100644 --- a/packages/core-backend/src/index.ts +++ b/packages/core-backend/src/index.ts @@ -9,7 +9,7 @@ export { getCloseReason, WebSocketState, WebSocketEventType, -} from './BackendWebSocketService'; +} from './ws/BackendWebSocketService'; export type { BackendWebSocketServiceOptions, @@ -24,7 +24,7 @@ export type { BackendWebSocketServiceConnectionStateChangedEvent, BackendWebSocketServiceEvents, BackendWebSocketServiceMessenger, -} from './BackendWebSocketService'; +} from './ws/BackendWebSocketService'; // ============================================================================ // ACCOUNT ACTIVITY SERVICE @@ -34,7 +34,7 @@ export { AccountActivityService, ACCOUNT_ACTIVITY_SERVICE_ALLOWED_ACTIONS, ACCOUNT_ACTIVITY_SERVICE_ALLOWED_EVENTS, -} from './AccountActivityService'; +} from './ws/AccountActivityService'; export type { SystemNotificationData, @@ -49,7 +49,7 @@ export type { AccountActivityServiceEvents, AllowedEvents as AccountActivityServiceAllowedEvents, AccountActivityServiceMessenger, -} from './AccountActivityService'; +} from './ws/AccountActivityService'; // ============================================================================ // SHARED TYPES @@ -80,6 +80,31 @@ export type { ApiPlatformClientServiceMessenger, } from './ApiPlatformClientService'; +// ============================================================================ +// OHLCV SERVICE +// ============================================================================ + +export { + OHLCVService, + OHLCV_SERVICE_ALLOWED_ACTIONS, + OHLCV_SERVICE_ALLOWED_EVENTS, +} from './ws/ohlcv'; + +export type { + OHLCVBar, + OHLCVSubscriptionOptions, + OHLCVSystemNotificationData, + OHLCVServiceOptions, + OHLCVServiceActions, + OHLCVServiceAllowedActions, + OHLCVServiceBarUpdatedEvent, + OHLCVServiceChainStatusChangedEvent, + OHLCVServiceSubscriptionErrorEvent, + OHLCVServiceEvents, + OHLCVServiceAllowedEvents, + OHLCVServiceMessenger, +} from './ws/ohlcv'; + // ============================================================================ // API PLATFORM CLIENT // ============================================================================ diff --git a/packages/core-backend/src/AccountActivityService-method-action-types.ts b/packages/core-backend/src/ws/AccountActivityService-method-action-types.ts similarity index 100% rename from packages/core-backend/src/AccountActivityService-method-action-types.ts rename to packages/core-backend/src/ws/AccountActivityService-method-action-types.ts diff --git a/packages/core-backend/src/AccountActivityService.test.ts b/packages/core-backend/src/ws/AccountActivityService.test.ts similarity index 99% rename from packages/core-backend/src/AccountActivityService.test.ts rename to packages/core-backend/src/ws/AccountActivityService.test.ts index 0dd8630ee1..ac5ea12dbc 100644 --- a/packages/core-backend/src/AccountActivityService.test.ts +++ b/packages/core-backend/src/ws/AccountActivityService.test.ts @@ -7,7 +7,9 @@ import type { } from '@metamask/messenger'; import type { Hex } from '@metamask/utils'; -import { flushPromises } from '../../../tests/helpers'; +import { flushPromises } from '../../../../tests/helpers'; +import type { Transaction, BalanceUpdate } from '../types'; +import type { AccountActivityMessage } from '../types'; import { AccountActivityService } from './AccountActivityService'; import type { AccountActivityServiceMessenger, @@ -15,8 +17,6 @@ import type { } from './AccountActivityService'; import type { ServerNotificationMessage } from './BackendWebSocketService'; import { WebSocketState } from './BackendWebSocketService'; -import type { Transaction, BalanceUpdate } from './types'; -import type { AccountActivityMessage } from './types'; type AllAccountActivityServiceActions = MessengerActions; diff --git a/packages/core-backend/src/AccountActivityService.ts b/packages/core-backend/src/ws/AccountActivityService.ts similarity index 99% rename from packages/core-backend/src/AccountActivityService.ts rename to packages/core-backend/src/ws/AccountActivityService.ts index a8b61a427e..88b97f1729 100644 --- a/packages/core-backend/src/AccountActivityService.ts +++ b/packages/core-backend/src/ws/AccountActivityService.ts @@ -13,6 +13,12 @@ import type { TraceCallback } from '@metamask/controller-utils'; import type { InternalAccount } from '@metamask/keyring-internal-api'; import type { Messenger } from '@metamask/messenger'; +import { projectLogger, createModuleLogger } from '../logger'; +import type { + Transaction, + AccountActivityMessage, + BalanceUpdate, +} from '../types'; import type { AccountActivityServiceMethodActions } from './AccountActivityService-method-action-types'; import type { WebSocketConnectionInfo, @@ -21,12 +27,6 @@ import type { } from './BackendWebSocketService'; import { WebSocketState } from './BackendWebSocketService'; import type { BackendWebSocketServiceMethodActions } from './BackendWebSocketService-method-action-types'; -import { projectLogger, createModuleLogger } from './logger'; -import type { - Transaction, - AccountActivityMessage, - BalanceUpdate, -} from './types'; // ============================================================================= // Types and Constants diff --git a/packages/core-backend/src/BackendWebSocketService-method-action-types.ts b/packages/core-backend/src/ws/BackendWebSocketService-method-action-types.ts similarity index 100% rename from packages/core-backend/src/BackendWebSocketService-method-action-types.ts rename to packages/core-backend/src/ws/BackendWebSocketService-method-action-types.ts diff --git a/packages/core-backend/src/BackendWebSocketService.test.ts b/packages/core-backend/src/ws/BackendWebSocketService.test.ts similarity index 99% rename from packages/core-backend/src/BackendWebSocketService.test.ts rename to packages/core-backend/src/ws/BackendWebSocketService.test.ts index cf113e1ba7..9c17e430c2 100644 --- a/packages/core-backend/src/BackendWebSocketService.test.ts +++ b/packages/core-backend/src/ws/BackendWebSocketService.test.ts @@ -5,7 +5,7 @@ import type { MockAnyNamespace, } from '@metamask/messenger'; -import { flushPromises } from '../../../tests/helpers'; +import { flushPromises } from '../../../../tests/helpers'; import { BackendWebSocketService, getCloseReason, diff --git a/packages/core-backend/src/BackendWebSocketService.ts b/packages/core-backend/src/ws/BackendWebSocketService.ts similarity index 99% rename from packages/core-backend/src/BackendWebSocketService.ts rename to packages/core-backend/src/ws/BackendWebSocketService.ts index d8ca9139d5..c71075c372 100644 --- a/packages/core-backend/src/BackendWebSocketService.ts +++ b/packages/core-backend/src/ws/BackendWebSocketService.ts @@ -9,8 +9,8 @@ import type { AuthenticationController } from '@metamask/profile-sync-controller import { getErrorMessage } from '@metamask/utils'; import { v4 as uuidV4 } from 'uuid'; +import { projectLogger, createModuleLogger } from '../logger'; import type { BackendWebSocketServiceMethodActions } from './BackendWebSocketService-method-action-types'; -import { projectLogger, createModuleLogger } from './logger'; const SERVICE_NAME = 'BackendWebSocketService' as const; diff --git a/packages/core-backend/src/ws/ohlcv/OHLCVService-method-action-types.ts b/packages/core-backend/src/ws/ohlcv/OHLCVService-method-action-types.ts new file mode 100644 index 0000000000..6c30d85382 --- /dev/null +++ b/packages/core-backend/src/ws/ohlcv/OHLCVService-method-action-types.ts @@ -0,0 +1,40 @@ +/** + * This file is auto generated. + * Do not edit manually. + */ + +import type { OHLCVService } from './OHLCVService'; + +/** + * Subscribe to an OHLCV channel. If this is the first subscriber for the + * given asset/interval/currency combination a WebSocket subscription is + * created. Additional calls for the same combination only bump the reference + * count. + * + * @param options - The subscription parameters. + * @returns A promise that resolves once the subscription is established. + */ +export type OHLCVServiceSubscribeAction = { + type: `OHLCVService:subscribe`; + handler: OHLCVService['subscribe']; +}; + +/** + * Unsubscribe from an OHLCV channel. Decrements the reference count and, + * when it reaches zero, starts a grace-period timer before actually + * unsubscribing from the WebSocket to absorb rapid navigation patterns. + * + * @param options - The subscription parameters to unsubscribe from. + * @returns A promise that resolves once the unsubscription is processed. + */ +export type OHLCVServiceUnsubscribeAction = { + type: `OHLCVService:unsubscribe`; + handler: OHLCVService['unsubscribe']; +}; + +/** + * Union of all OHLCVService action types. + */ +export type OHLCVServiceMethodActions = + | OHLCVServiceSubscribeAction + | OHLCVServiceUnsubscribeAction; diff --git a/packages/core-backend/src/ws/ohlcv/OHLCVService.test.ts b/packages/core-backend/src/ws/ohlcv/OHLCVService.test.ts new file mode 100644 index 0000000000..92aac6a00e --- /dev/null +++ b/packages/core-backend/src/ws/ohlcv/OHLCVService.test.ts @@ -0,0 +1,1130 @@ +import { Messenger, MOCK_ANY_NAMESPACE } from '@metamask/messenger'; +import type { + MessengerActions, + MessengerEvents, + MockAnyNamespace, +} from '@metamask/messenger'; + +import { flushPromises } from '../../../../../tests/helpers'; +import type { ServerNotificationMessage } from '../BackendWebSocketService'; +import { WebSocketState } from '../BackendWebSocketService'; +import { OHLCVService } from './OHLCVService'; +import type { OHLCVServiceMessenger } from './OHLCVService'; +import type { OHLCVSubscriptionOptions } from './types'; + +// ============================================================================= +// Test Helpers +// ============================================================================= + +type AllOHLCVServiceActions = MessengerActions; +type AllOHLCVServiceEvents = MessengerEvents; + +type RootMessenger = Messenger< + MockAnyNamespace, + AllOHLCVServiceActions, + AllOHLCVServiceEvents +>; + +const completeAsyncOperations = async (timeoutMs = 0): Promise => { + // Multiple rounds are needed because the channel lock chains promises + // through .then(), requiring several microtask ticks to fully settle. + for (let i = 0; i < 5; i++) { + await flushPromises(); + } + if (timeoutMs > 0) { + await new Promise((resolve) => setTimeout(resolve, timeoutMs)); + } + await flushPromises(); +}; + +function getRootMessenger(): RootMessenger { + return new Messenger({ namespace: MOCK_ANY_NAMESPACE }); +} + +const getMessenger = (): { + rootMessenger: RootMessenger; + messenger: OHLCVServiceMessenger; + mocks: { + connect: jest.Mock; + subscribe: jest.Mock; + channelHasSubscription: jest.Mock; + getSubscriptionsByChannel: jest.Mock; + findSubscriptionsByChannelPrefix: jest.Mock; + forceReconnection: jest.Mock; + addChannelCallback: jest.Mock; + removeChannelCallback: jest.Mock; + getConnectionInfo: jest.Mock; + }; +} => { + const rootMessenger = getRootMessenger(); + const messenger: OHLCVServiceMessenger = new Messenger< + 'OHLCVService', + AllOHLCVServiceActions, + AllOHLCVServiceEvents, + RootMessenger + >({ + namespace: 'OHLCVService', + parent: rootMessenger, + }); + + rootMessenger.delegate({ + actions: [ + 'BackendWebSocketService:connect', + 'BackendWebSocketService:forceReconnection', + 'BackendWebSocketService:subscribe', + 'BackendWebSocketService:getConnectionInfo', + 'BackendWebSocketService:channelHasSubscription', + 'BackendWebSocketService:getSubscriptionsByChannel', + 'BackendWebSocketService:findSubscriptionsByChannelPrefix', + 'BackendWebSocketService:addChannelCallback', + 'BackendWebSocketService:removeChannelCallback', + ], + events: ['BackendWebSocketService:connectionStateChanged'], + messenger, + }); + + const mockConnect = jest.fn(); + const mockForceReconnection = jest.fn(); + const mockSubscribe = jest.fn(); + const mockChannelHasSubscription = jest.fn().mockReturnValue(false); + const mockGetSubscriptionsByChannel = jest.fn().mockReturnValue([]); + const mockFindSubscriptionsByChannelPrefix = jest.fn().mockReturnValue([]); + const mockAddChannelCallback = jest.fn(); + const mockRemoveChannelCallback = jest.fn(); + const mockGetConnectionInfo = jest.fn(); + + rootMessenger.registerActionHandler( + 'BackendWebSocketService:connect', + mockConnect, + ); + rootMessenger.registerActionHandler( + 'BackendWebSocketService:forceReconnection', + mockForceReconnection, + ); + rootMessenger.registerActionHandler( + 'BackendWebSocketService:subscribe', + mockSubscribe, + ); + rootMessenger.registerActionHandler( + 'BackendWebSocketService:channelHasSubscription', + mockChannelHasSubscription, + ); + rootMessenger.registerActionHandler( + 'BackendWebSocketService:getSubscriptionsByChannel', + mockGetSubscriptionsByChannel, + ); + rootMessenger.registerActionHandler( + 'BackendWebSocketService:findSubscriptionsByChannelPrefix', + mockFindSubscriptionsByChannelPrefix, + ); + rootMessenger.registerActionHandler( + 'BackendWebSocketService:addChannelCallback', + mockAddChannelCallback, + ); + rootMessenger.registerActionHandler( + 'BackendWebSocketService:removeChannelCallback', + mockRemoveChannelCallback, + ); + rootMessenger.registerActionHandler( + 'BackendWebSocketService:getConnectionInfo', + mockGetConnectionInfo, + ); + + return { + rootMessenger, + messenger, + mocks: { + connect: mockConnect, + subscribe: mockSubscribe, + channelHasSubscription: mockChannelHasSubscription, + getSubscriptionsByChannel: mockGetSubscriptionsByChannel, + findSubscriptionsByChannelPrefix: mockFindSubscriptionsByChannelPrefix, + forceReconnection: mockForceReconnection, + addChannelCallback: mockAddChannelCallback, + removeChannelCallback: mockRemoveChannelCallback, + getConnectionInfo: mockGetConnectionInfo, + }, + }; +}; + +type WithServiceCallback = (payload: { + service: OHLCVService; + messenger: OHLCVServiceMessenger; + rootMessenger: RootMessenger; + mocks: ReturnType['mocks']; + destroy: () => void; +}) => Promise | R; + +async function withService(fn: WithServiceCallback): Promise { + const setup = getMessenger(); + const service = new OHLCVService({ messenger: setup.messenger }); + service.init(); + + try { + return await fn({ + service, + messenger: setup.messenger, + rootMessenger: setup.rootMessenger, + mocks: setup.mocks, + destroy: () => service.destroy(), + }); + } finally { + service.destroy(); + } +} + +const getSystemNotificationCallback = (mocks: { + addChannelCallback: jest.Mock; +}): ((notification: ServerNotificationMessage) => void) => { + const call = mocks.addChannelCallback.mock.calls.find( + (c: unknown[]) => + c[0] && + typeof c[0] === 'object' && + 'channelName' in c[0] && + (c[0] as { channelName: string }).channelName === + 'system-notifications.v1.market-data.v1', + ); + + if (!call) { + throw new Error('system notification callback not registered'); + } + + return (call[0] as { callback: (n: ServerNotificationMessage) => void }) + .callback; +}; + +// ============================================================================= +// Shared Constants +// ============================================================================= + +const SUB_OPTS: OHLCVSubscriptionOptions = { + assetId: 'eip155:8453/erc20:0x833589fCD6eDb6E08f4c7C32D4f71b54bdA02913', + interval: '1m', + currency: 'usd', +}; + +const EXPECTED_CHANNEL = + 'market-data.v1.eip155:8453/erc20:0x833589fCD6eDb6E08f4c7C32D4f71b54bdA02913.1m.usd'; + +const BASE_CONNECTION_INFO = { + url: 'ws://test', + timeout: 10000, + reconnectDelay: 500, + maxReconnectDelay: 5000, + requestTimeout: 30000, +}; + +// ============================================================================= +// Tests +// ============================================================================= + +describe('OHLCVService', () => { + beforeEach(() => { + jest.useFakeTimers(); + }); + + afterEach(() => { + jest.useRealTimers(); + }); + + // =========================================================================== + // Constructor + // =========================================================================== + + describe('constructor', () => { + it('should register method action handlers and system-notifications callback', async () => { + await withService(async ({ service, mocks }) => { + expect(service).toBeInstanceOf(OHLCVService); + expect(service.name).toBe('OHLCVService'); + + expect(mocks.addChannelCallback).toHaveBeenCalledWith({ + channelName: 'system-notifications.v1.market-data.v1', + callback: expect.any(Function), + }); + }); + }); + }); + + // =========================================================================== + // Subscribe + // =========================================================================== + + describe('subscribe', () => { + it('should connect and create a WebSocket subscription for a new channel', async () => { + await withService(async ({ service, mocks }) => { + await service.subscribe(SUB_OPTS); + + expect(mocks.connect).toHaveBeenCalledTimes(1); + expect(mocks.channelHasSubscription).toHaveBeenCalledWith( + EXPECTED_CHANNEL, + ); + expect(mocks.subscribe).toHaveBeenCalledWith({ + channels: [EXPECTED_CHANNEL], + channelType: 'market-data.v1', + callback: expect.any(Function), + }); + }); + }); + + it('should skip WS subscribe if the channel already has a subscription', async () => { + await withService(async ({ service, mocks }) => { + mocks.channelHasSubscription.mockReturnValue(true); + + await service.subscribe(SUB_OPTS); + + expect(mocks.subscribe).not.toHaveBeenCalled(); + }); + }); + + it('should increment refCount on duplicate subscribe without WS traffic', async () => { + await withService(async ({ service, mocks }) => { + await service.subscribe(SUB_OPTS); + mocks.subscribe.mockClear(); + mocks.connect.mockClear(); + + await service.subscribe(SUB_OPTS); + + expect(mocks.connect).not.toHaveBeenCalled(); + expect(mocks.subscribe).not.toHaveBeenCalled(); + }); + }); + + it('should force reconnection when subscribe fails', async () => { + await withService(async ({ service, mocks, messenger }) => { + mocks.connect.mockRejectedValueOnce(new Error('connection failed')); + + const errorListener = jest.fn(); + messenger.subscribe('OHLCVService:subscriptionError', errorListener); + + await service.subscribe(SUB_OPTS); + + expect(errorListener).toHaveBeenCalledWith({ + channel: EXPECTED_CHANNEL, + error: expect.stringContaining('connection failed'), + operation: 'subscribe', + }); + expect(mocks.forceReconnection).toHaveBeenCalledTimes(1); + }); + }); + + it('should publish barUpdated events when WebSocket delivers data', async () => { + await withService(async ({ service, mocks, messenger }) => { + let capturedCallback: (n: ServerNotificationMessage) => void = + jest.fn(); + + mocks.subscribe.mockImplementation((opts) => { + capturedCallback = opts.callback; + return Promise.resolve(); + }); + + await service.subscribe(SUB_OPTS); + + const barListener = jest.fn(); + messenger.subscribe('OHLCVService:barUpdated', barListener); + + capturedCallback({ + event: 'data', + subscriptionId: 'sub-1', + timestamp: 1776364071003, + channel: EXPECTED_CHANNEL, + data: { + timestamp: 1776364020, + open: 74.099, + high: 74.1, + low: 74.083, + close: 74.099, + volume: 5806.43, + }, + } as ServerNotificationMessage); + + expect(barListener).toHaveBeenCalledWith({ + channel: EXPECTED_CHANNEL, + bar: { + timestamp: 1776364020, + open: 74.099, + high: 74.1, + low: 74.083, + close: 74.099, + volume: 5806.43, + }, + }); + }); + }); + }); + + // =========================================================================== + // Unsubscribe + // =========================================================================== + + describe('unsubscribe', () => { + it('should be a no-op if channel was never subscribed', async () => { + await withService(async ({ service, mocks }) => { + await service.unsubscribe(SUB_OPTS); + + expect(mocks.getSubscriptionsByChannel).not.toHaveBeenCalled(); + }); + }); + + it('should decrement refCount without unsubscribing when other consumers remain', async () => { + await withService(async ({ service, mocks }) => { + await service.subscribe(SUB_OPTS); + await service.subscribe(SUB_OPTS); + + await service.unsubscribe(SUB_OPTS); + + // No timer should have been started, no WS unsubscribe + jest.advanceTimersByTime(5000); + await completeAsyncOperations(); + expect(mocks.getSubscriptionsByChannel).not.toHaveBeenCalled(); + }); + }); + + it('should start a grace-period timer and unsubscribe after expiry', async () => { + await withService(async ({ service, mocks }) => { + const mockUnsub = jest.fn(); + mocks.getSubscriptionsByChannel.mockReturnValue([ + { unsubscribe: mockUnsub }, + ]); + + await service.subscribe(SUB_OPTS); + await service.unsubscribe(SUB_OPTS); + + // Before grace period expires — still subscribed + expect(mockUnsub).not.toHaveBeenCalled(); + + jest.advanceTimersByTime(3000); + await completeAsyncOperations(); + + expect(mocks.getSubscriptionsByChannel).toHaveBeenCalledWith( + EXPECTED_CHANNEL, + ); + expect(mockUnsub).toHaveBeenCalledTimes(1); + }); + }); + }); + + // =========================================================================== + // Grace Period — Re-subscribe During Grace + // =========================================================================== + + describe('grace period', () => { + it('should cancel grace-period timer if re-subscribed before expiry', async () => { + await withService(async ({ service, mocks }) => { + const mockUnsub = jest.fn(); + mocks.getSubscriptionsByChannel.mockReturnValue([ + { unsubscribe: mockUnsub }, + ]); + + await service.subscribe(SUB_OPTS); + await service.unsubscribe(SUB_OPTS); + + // WS subscription still exists (no disconnect happened) + mocks.channelHasSubscription.mockReturnValue(true); + + // Re-subscribe during grace period + jest.advanceTimersByTime(1000); + mocks.subscribe.mockClear(); + mocks.connect.mockClear(); + await service.subscribe(SUB_OPTS); + + // Should NOT have called connect/subscribe again — subscription is still alive + expect(mocks.connect).not.toHaveBeenCalled(); + expect(mocks.subscribe).not.toHaveBeenCalled(); + + // Advance past original grace period — should NOT unsubscribe + jest.advanceTimersByTime(5000); + await completeAsyncOperations(); + expect(mockUnsub).not.toHaveBeenCalled(); + }); + }); + + it('should not flush same-channel grace period (reuse instead)', async () => { + await withService(async ({ service, mocks }) => { + const mockUnsub = jest.fn(); + mocks.getSubscriptionsByChannel.mockReturnValue([ + { unsubscribe: mockUnsub }, + ]); + + await service.subscribe(SUB_OPTS); + await service.unsubscribe(SUB_OPTS); + + // WS subscription still exists (no disconnect happened) + mocks.channelHasSubscription.mockReturnValue(true); + + // Re-subscribe to the SAME channel — should cancel grace, not flush + mocks.subscribe.mockClear(); + mocks.connect.mockClear(); + await service.subscribe(SUB_OPTS); + + // Should NOT have unsubscribed (grace was cancelled, not flushed) + expect(mockUnsub).not.toHaveBeenCalled(); + // Should NOT have created a new WS subscription (reused existing) + expect(mocks.subscribe).not.toHaveBeenCalled(); + }); + }); + + it('should unsubscribe old channels via grace period during rapid time-range switching', async () => { + const opts1m = SUB_OPTS; + const opts1h: OHLCVSubscriptionOptions = { + ...SUB_OPTS, + interval: '1h', + }; + + await withService(async ({ service, mocks }) => { + const mockUnsub = jest.fn(); + mocks.getSubscriptionsByChannel.mockReturnValue([ + { unsubscribe: mockUnsub }, + ]); + + // Subscribe 1m → unsubscribe → subscribe 1h + await service.subscribe(opts1m); + await service.unsubscribe(opts1m); + await service.subscribe(opts1h); + + // 1m is in grace period, not yet unsubscribed + expect(mockUnsub).not.toHaveBeenCalled(); + + // Grace period expires — old channel cleaned up + jest.advanceTimersByTime(3000); + await completeAsyncOperations(); + expect(mockUnsub).toHaveBeenCalledTimes(1); + }); + }); + }); + + // =========================================================================== + // Reference Counting + // =========================================================================== + + describe('reference counting', () => { + it('should share a single WS subscription across multiple consumers', async () => { + await withService(async ({ service, mocks }) => { + await service.subscribe(SUB_OPTS); + await service.subscribe(SUB_OPTS); + await service.subscribe(SUB_OPTS); + + // Only one WS subscribe call + expect(mocks.subscribe).toHaveBeenCalledTimes(1); + + // Unsubscribe twice — refCount goes from 3 → 1 + await service.unsubscribe(SUB_OPTS); + await service.unsubscribe(SUB_OPTS); + + jest.advanceTimersByTime(5000); + await completeAsyncOperations(); + + // Still has one consumer — no WS unsubscribe + expect(mocks.getSubscriptionsByChannel).not.toHaveBeenCalled(); + }); + }); + + it('should unsubscribe from WS when all consumers leave and grace expires', async () => { + await withService(async ({ service, mocks }) => { + const mockUnsub = jest.fn(); + mocks.getSubscriptionsByChannel.mockReturnValue([ + { unsubscribe: mockUnsub }, + ]); + + await service.subscribe(SUB_OPTS); + await service.subscribe(SUB_OPTS); + + await service.unsubscribe(SUB_OPTS); + await service.unsubscribe(SUB_OPTS); + + jest.advanceTimersByTime(3000); + await completeAsyncOperations(); + + expect(mockUnsub).toHaveBeenCalledTimes(1); + }); + }); + }); + + // =========================================================================== + // Race Condition — Per-Channel Locking + // =========================================================================== + + describe('per-channel locking', () => { + it('should serialize concurrent subscribes so refCount is correct', async () => { + await withService(async ({ service, mocks }) => { + let connectResolve!: () => void; + mocks.connect.mockImplementation( + () => + new Promise((resolve) => { + connectResolve = resolve; + }), + ); + + const p1 = service.subscribe(SUB_OPTS); + // Let the microtask tick so `connect` is called and `connectResolve` is assigned + await flushPromises(); + + const p2 = service.subscribe(SUB_OPTS); + + // p1 is waiting on connect, p2 is queued behind it via the lock + connectResolve(); + mocks.connect.mockResolvedValue(undefined); + await p1; + await p2; + + expect(mocks.subscribe).toHaveBeenCalledTimes(1); + + const mockUnsub = jest.fn(); + mocks.getSubscriptionsByChannel.mockReturnValue([ + { unsubscribe: mockUnsub }, + ]); + + // refCount must be 2 — first unsubscribe drops it to 1, no grace timer + await service.unsubscribe(SUB_OPTS); + jest.advanceTimersByTime(5000); + await completeAsyncOperations(); + expect(mockUnsub).not.toHaveBeenCalled(); + + // Second unsubscribe drops refCount to 0 → grace timer → WS unsubscribe + await service.unsubscribe(SUB_OPTS); + jest.advanceTimersByTime(3000); + await completeAsyncOperations(); + expect(mockUnsub).toHaveBeenCalledTimes(1); + }); + }); + + it('should serialize concurrent subscribe + unsubscribe so refCount never corrupts', async () => { + await withService(async ({ service, mocks }) => { + let connectResolve!: () => void; + mocks.connect.mockImplementation( + () => + new Promise((resolve) => { + connectResolve = resolve; + }), + ); + + const pSub = service.subscribe(SUB_OPTS); + await flushPromises(); + + const pUnsub = service.unsubscribe(SUB_OPTS); + + connectResolve(); + await pSub; + await pUnsub; + + // After subscribe then unsubscribe, refCount is 0 → grace timer starts + // Advance past grace period + const mockUnsub = jest.fn(); + mocks.getSubscriptionsByChannel.mockReturnValue([ + { unsubscribe: mockUnsub }, + ]); + + jest.advanceTimersByTime(3000); + await completeAsyncOperations(); + + expect(mockUnsub).toHaveBeenCalledTimes(1); + }); + }); + + it('should create a fresh WS subscription when subscribe races with grace-period unsubscribe', async () => { + await withService(async ({ service, mocks }) => { + let unsubResolve!: () => void; + const mockUnsub = jest.fn( + () => + new Promise((resolve) => { + unsubResolve = resolve; + }), + ); + mocks.getSubscriptionsByChannel.mockReturnValue([ + { unsubscribe: mockUnsub }, + ]); + + await service.subscribe(SUB_OPTS); + await service.unsubscribe(SUB_OPTS); + + jest.advanceTimersByTime(3000); + await flushPromises(); + + const subscribePromise = service.subscribe(SUB_OPTS); + unsubResolve(); + await subscribePromise; + + expect(mocks.subscribe).toHaveBeenCalledTimes(2); + }); + }); + }); + + // =========================================================================== + // Reconnect Resilience + // =========================================================================== + + describe('reconnect', () => { + it('should resubscribe active channels on WebSocket CONNECTED', async () => { + await withService(async ({ service, mocks, rootMessenger }) => { + await service.subscribe(SUB_OPTS); + mocks.subscribe.mockClear(); + mocks.channelHasSubscription.mockReturnValue(false); + + rootMessenger.publish( + 'BackendWebSocketService:connectionStateChanged', + { + ...BASE_CONNECTION_INFO, + state: WebSocketState.CONNECTED, + connectedAt: Date.now(), + reconnectAttempts: 0, + }, + ); + await completeAsyncOperations(); + + expect(mocks.subscribe).toHaveBeenCalledWith({ + channels: [EXPECTED_CHANNEL], + channelType: 'market-data.v1', + callback: expect.any(Function), + }); + }); + }); + + it('should skip resubscribe if channel already has a subscription after reconnect', async () => { + await withService(async ({ service, mocks, rootMessenger }) => { + await service.subscribe(SUB_OPTS); + mocks.subscribe.mockClear(); + mocks.channelHasSubscription.mockReturnValue(true); + + rootMessenger.publish( + 'BackendWebSocketService:connectionStateChanged', + { + ...BASE_CONNECTION_INFO, + state: WebSocketState.CONNECTED, + connectedAt: Date.now(), + reconnectAttempts: 0, + }, + ); + await completeAsyncOperations(); + + expect(mocks.subscribe).not.toHaveBeenCalled(); + }); + }); + + it('should not resubscribe channels in grace period (refCount === 0)', async () => { + await withService(async ({ service, mocks, rootMessenger }) => { + await service.subscribe(SUB_OPTS); + await service.unsubscribe(SUB_OPTS); + + // Channel is now in grace period (refCount === 0, timer running) + mocks.subscribe.mockClear(); + mocks.channelHasSubscription.mockReturnValue(false); + + rootMessenger.publish( + 'BackendWebSocketService:connectionStateChanged', + { + ...BASE_CONNECTION_INFO, + state: WebSocketState.CONNECTED, + connectedAt: Date.now(), + reconnectAttempts: 0, + }, + ); + await completeAsyncOperations(); + + expect(mocks.subscribe).not.toHaveBeenCalled(); + }); + }); + + it('should recreate WS subscription when re-subscribing during grace period after disconnect', async () => { + await withService( + async ({ service, mocks, messenger, rootMessenger }) => { + // 1. Subscribe — creates WS subscription, refCount = 1 + await service.subscribe(SUB_OPTS); + + // 2. Unsubscribe — refCount = 0, grace-period timer starts + await service.unsubscribe(SUB_OPTS); + + // 3. Disconnect — BackendWebSocketService clears all server-side + // subscriptions. channelHasSubscription now returns false. + mocks.channelHasSubscription.mockReturnValue(false); + rootMessenger.publish( + 'BackendWebSocketService:connectionStateChanged', + { + ...BASE_CONNECTION_INFO, + state: WebSocketState.DISCONNECTED, + connectedAt: undefined, + reconnectAttempts: 0, + }, + ); + await completeAsyncOperations(); + + // 4. Reconnect — resubscribeActiveChannels skips this channel + // because refCount is 0 (correct behaviour). + mocks.subscribe.mockClear(); + mocks.connect.mockClear(); + rootMessenger.publish( + 'BackendWebSocketService:connectionStateChanged', + { + ...BASE_CONNECTION_INFO, + state: WebSocketState.CONNECTED, + connectedAt: Date.now(), + reconnectAttempts: 1, + }, + ); + await completeAsyncOperations(); + expect(mocks.subscribe).not.toHaveBeenCalled(); + + // 5. User re-subscribes BEFORE grace timer fires. + // The grace-period branch cancels the timer and bumps refCount, + // but the underlying WS subscription no longer exists. + // The fix must detect this and create a fresh WS subscription. + mocks.subscribe.mockClear(); + mocks.connect.mockClear(); + await service.subscribe(SUB_OPTS); + + expect(mocks.connect).toHaveBeenCalledTimes(1); + expect(mocks.subscribe).toHaveBeenCalledWith({ + channels: [EXPECTED_CHANNEL], + channelType: 'market-data.v1', + callback: expect.any(Function), + }); + + // 6. Verify bar updates are delivered through the new subscription. + const capturedCallback = mocks.subscribe.mock.calls[0][0].callback; + const barListener = jest.fn(); + messenger.subscribe('OHLCVService:barUpdated', barListener); + + capturedCallback({ + data: { + timestamp: 200, + open: 10, + high: 20, + low: 5, + close: 15, + volume: 1000, + }, + timestamp: Date.now(), + }); + + expect(barListener).toHaveBeenCalledWith({ + channel: EXPECTED_CHANNEL, + bar: { + timestamp: 200, + open: 10, + high: 20, + low: 5, + close: 15, + volume: 1000, + }, + }); + }, + ); + }); + + it('should deliver bar updates via resubscribed channel callback', async () => { + await withService( + async ({ service, mocks, messenger, rootMessenger }) => { + await service.subscribe(SUB_OPTS); + mocks.subscribe.mockClear(); + mocks.channelHasSubscription.mockReturnValue(false); + + rootMessenger.publish( + 'BackendWebSocketService:connectionStateChanged', + { + ...BASE_CONNECTION_INFO, + state: WebSocketState.CONNECTED, + connectedAt: Date.now(), + reconnectAttempts: 0, + }, + ); + await completeAsyncOperations(); + + const resubscribeCallback = mocks.subscribe.mock.calls[0][0].callback; + const barListener = jest.fn(); + messenger.subscribe('OHLCVService:barUpdated', barListener); + + resubscribeCallback({ + data: { + timestamp: 100, + open: 1, + high: 2, + low: 0.5, + close: 1.5, + volume: 999, + }, + timestamp: Date.now(), + }); + + expect(barListener).toHaveBeenCalledWith({ + channel: EXPECTED_CHANNEL, + bar: { + timestamp: 100, + open: 1, + high: 2, + low: 0.5, + close: 1.5, + volume: 999, + }, + }); + }, + ); + }); + + it('should publish chainStatusChanged down on DISCONNECTED', async () => { + await withService(async ({ mocks, messenger, rootMessenger }) => { + const statusListener = jest.fn(); + messenger.subscribe('OHLCVService:chainStatusChanged', statusListener); + + // Simulate a system notification marking a chain as up + const systemCallback = getSystemNotificationCallback(mocks); + systemCallback({ + event: 'system-notification', + channel: 'system-notifications.v1.market-data.v1', + data: { chainIds: ['eip155:8453'], status: 'up' }, + timestamp: Date.now(), + } as ServerNotificationMessage); + + statusListener.mockClear(); + + rootMessenger.publish( + 'BackendWebSocketService:connectionStateChanged', + { + ...BASE_CONNECTION_INFO, + state: WebSocketState.DISCONNECTED, + connectedAt: undefined, + reconnectAttempts: 0, + }, + ); + await completeAsyncOperations(); + + expect(statusListener).toHaveBeenCalledWith( + expect.objectContaining({ + chainIds: ['eip155:8453'], + status: 'down', + }), + ); + }); + }); + }); + + // =========================================================================== + // System Notifications + // =========================================================================== + + describe('system notifications', () => { + it('should forward chain-down notifications via chainStatusChanged event', async () => { + await withService(async ({ mocks, messenger }) => { + const statusListener = jest.fn(); + messenger.subscribe('OHLCVService:chainStatusChanged', statusListener); + + const systemCallback = getSystemNotificationCallback(mocks); + systemCallback({ + event: 'system-notification', + channel: 'system-notifications.v1.market-data.v1', + data: { chainIds: ['eip155:8453'], status: 'down' }, + timestamp: 1776364071003, + } as ServerNotificationMessage); + + expect(statusListener).toHaveBeenCalledWith({ + chainIds: ['eip155:8453'], + status: 'down', + timestamp: 1776364071003, + }); + }); + }); + + it('should forward chain-up notifications', async () => { + await withService(async ({ mocks, messenger }) => { + const statusListener = jest.fn(); + messenger.subscribe('OHLCVService:chainStatusChanged', statusListener); + + const systemCallback = getSystemNotificationCallback(mocks); + systemCallback({ + event: 'system-notification', + channel: 'system-notifications.v1.market-data.v1', + data: { chainIds: ['eip155:1', 'eip155:137'], status: 'up' }, + timestamp: 1776364071003, + } as ServerNotificationMessage); + + expect(statusListener).toHaveBeenCalledWith({ + chainIds: ['eip155:1', 'eip155:137'], + status: 'up', + timestamp: 1776364071003, + }); + }); + }); + + it('should throw on invalid system notification data', async () => { + await withService(async ({ mocks }) => { + const systemCallback = getSystemNotificationCallback(mocks); + + expect(() => + systemCallback({ + event: 'system-notification', + channel: 'system-notifications.v1.market-data.v1', + data: { invalid: true }, + timestamp: Date.now(), + } as unknown as ServerNotificationMessage), + ).toThrow('Invalid system notification data'); + }); + }); + }); + + // =========================================================================== + // Error Paths + // =========================================================================== + + describe('error paths', () => { + it('should publish subscriptionError and force reconnection when unsubscribe fails', async () => { + await withService(async ({ service, mocks, messenger }) => { + mocks.getSubscriptionsByChannel.mockImplementation(() => { + throw new Error('ws gone'); + }); + + const errorListener = jest.fn(); + messenger.subscribe('OHLCVService:subscriptionError', errorListener); + + await service.subscribe(SUB_OPTS); + await service.unsubscribe(SUB_OPTS); + + jest.advanceTimersByTime(3000); + await completeAsyncOperations(); + + expect(errorListener).toHaveBeenCalledWith({ + channel: EXPECTED_CHANNEL, + error: expect.stringContaining('ws gone'), + operation: 'unsubscribe', + }); + expect(mocks.forceReconnection).toHaveBeenCalled(); + }); + }); + + it('should not produce unhandled rejection when forceReconnection throws during grace-period unsubscribe', async () => { + await withService(async ({ service, mocks, messenger }) => { + mocks.getSubscriptionsByChannel.mockImplementation(() => { + throw new Error('ws gone'); + }); + mocks.forceReconnection.mockRejectedValue( + new Error('reconnect also failed'), + ); + + const errorListener = jest.fn(); + messenger.subscribe('OHLCVService:subscriptionError', errorListener); + + await service.subscribe(SUB_OPTS); + await service.unsubscribe(SUB_OPTS); + + jest.advanceTimersByTime(3000); + await completeAsyncOperations(); + + expect(errorListener).toHaveBeenCalledWith({ + channel: EXPECTED_CHANNEL, + error: expect.stringContaining('ws gone'), + operation: 'unsubscribe', + }); + expect(mocks.forceReconnection).toHaveBeenCalled(); + }); + }); + + it('should log and continue when resubscription fails for a channel', async () => { + await withService(async ({ service, mocks, rootMessenger }) => { + await service.subscribe(SUB_OPTS); + mocks.subscribe.mockClear(); + mocks.channelHasSubscription.mockReturnValue(false); + mocks.subscribe.mockRejectedValueOnce(new Error('resubscribe fail')); + + rootMessenger.publish( + 'BackendWebSocketService:connectionStateChanged', + { + ...BASE_CONNECTION_INFO, + state: WebSocketState.CONNECTED, + connectedAt: Date.now(), + reconnectAttempts: 1, + }, + ); + await completeAsyncOperations(); + + // Should have attempted but failed silently + expect(mocks.subscribe).toHaveBeenCalledTimes(1); + }); + }); + }); + + // =========================================================================== + // Reconnect + Concurrent Mutation Safety + // =========================================================================== + + describe('resubscribe holds mutex to prevent concurrent mutation', () => { + it('should block unsubscribe until resubscription completes, preventing orphaned WS subscriptions', async () => { + await withService(async ({ service, mocks, rootMessenger }) => { + await service.subscribe(SUB_OPTS); + mocks.subscribe.mockClear(); + mocks.channelHasSubscription.mockReturnValue(false); + + // Make the WS subscribe during reconnect take time so we can + // attempt a concurrent unsubscribe while it's in progress. + let resubResolve!: () => void; + mocks.subscribe.mockImplementation( + () => + new Promise((resolve) => { + resubResolve = resolve; + }), + ); + + // Trigger reconnect — this calls #resubscribeActiveChannels which + // now holds the mutex across the entire loop. + rootMessenger.publish( + 'BackendWebSocketService:connectionStateChanged', + { + ...BASE_CONNECTION_INFO, + state: WebSocketState.CONNECTED, + connectedAt: Date.now(), + reconnectAttempts: 1, + }, + ); + await flushPromises(); + + // Concurrent unsubscribe — must queue behind the mutex. + const unsubPromise = service.unsubscribe(SUB_OPTS); + + // The unsubscribe hasn't run yet because the mutex is held. + // Complete the WS resubscription. + resubResolve(); + await flushPromises(); + await unsubPromise; + + // refCount was 1 at reconnect time; after resubscribe completes + // the queued unsubscribe drops it to 0 and starts the grace timer. + const mockUnsub = jest.fn(); + mocks.getSubscriptionsByChannel.mockReturnValue([ + { unsubscribe: mockUnsub }, + ]); + + jest.advanceTimersByTime(3000); + await completeAsyncOperations(); + + // The grace-period unsubscribe fires cleanly — no orphaned subscription. + expect(mockUnsub).toHaveBeenCalledTimes(1); + }); + }); + }); + + // =========================================================================== + // Destroy + // =========================================================================== + + describe('destroy', () => { + it('should clear grace-period timers and remove channel callback', async () => { + await withService(async ({ service, mocks }) => { + const mockUnsub = jest.fn(); + mocks.getSubscriptionsByChannel.mockReturnValue([ + { unsubscribe: mockUnsub }, + ]); + + await service.subscribe(SUB_OPTS); + await service.unsubscribe(SUB_OPTS); + + // Grace timer is running — destroy should clear it + service.destroy(); + + jest.advanceTimersByTime(5000); + await completeAsyncOperations(); + + // Timer was cleared so the actual unsubscribe should NOT have fired + expect(mockUnsub).not.toHaveBeenCalled(); + + expect(mocks.removeChannelCallback).toHaveBeenCalledWith( + 'system-notifications.v1.market-data.v1', + ); + }); + }); + }); +}); diff --git a/packages/core-backend/src/ws/ohlcv/OHLCVService.ts b/packages/core-backend/src/ws/ohlcv/OHLCVService.ts new file mode 100644 index 0000000000..aca5183306 --- /dev/null +++ b/packages/core-backend/src/ws/ohlcv/OHLCVService.ts @@ -0,0 +1,565 @@ +/** + * OHLCV Service for real-time candlestick data streaming via WebSocket. + * + * Wraps {@link BackendWebSocketService} through the messenger pattern to + * provide subscribe/unsubscribe semantics for OHLCV market-data channels. + * Includes reference counting, grace-period unsubscribe, idempotency checks, + * chain-status forwarding, and automatic resubscription on reconnect. + */ + +import type { + TraceCallback, + TraceContext, + TraceRequest, +} from '@metamask/controller-utils'; +import type { Messenger } from '@metamask/messenger'; +import { Mutex } from 'async-mutex'; + +import { projectLogger, createModuleLogger } from '../../logger'; +import type { + WebSocketConnectionInfo, + BackendWebSocketServiceConnectionStateChangedEvent, + ServerNotificationMessage, +} from '../BackendWebSocketService'; +import { WebSocketState } from '../BackendWebSocketService'; +import type { BackendWebSocketServiceMethodActions } from '../BackendWebSocketService-method-action-types'; +import type { OHLCVServiceMethodActions } from './OHLCVService-method-action-types'; +import type { OHLCVBar, OHLCVSubscriptionOptions } from './types'; + +// ============================================================================= +// Constants +// ============================================================================= + +const SERVICE_NAME = 'OHLCVService'; + +const log = createModuleLogger(projectLogger, SERVICE_NAME); + +const MESSENGER_EXPOSED_METHODS = ['subscribe', 'unsubscribe'] as const; + +const SUBSCRIPTION_NAMESPACE = 'market-data.v1'; + +const SYSTEM_NOTIFICATIONS_CHANNEL = `system-notifications.v1.${SUBSCRIPTION_NAMESPACE}`; + +/** Delay before actually unsubscribing from a channel after refCount reaches 0. */ +const GRACE_PERIOD_MS = 3_000; + +// ============================================================================= +// Types — Channel Tracking +// ============================================================================= + +type ChannelEntry = { + refCount: number; + gracePeriodTimer?: ReturnType; +}; + +// ============================================================================= +// Types — System Notifications +// ============================================================================= + +/** + * System notification data for chain status updates on market-data channels. + */ +export type OHLCVSystemNotificationData = { + chainIds: string[]; + status: 'down' | 'up'; + timestamp?: number; +}; + +// ============================================================================= +// Types — Service Options +// ============================================================================= + +/** + * Configuration options for the OHLCV service. + */ +export type OHLCVServiceOptions = { + /** Optional callback to trace performance of OHLCV operations (default: no-op) */ + traceFn?: TraceCallback; +}; + +// ============================================================================= +// Action and Event Types +// ============================================================================= + +export type OHLCVServiceActions = OHLCVServiceMethodActions; + +export const OHLCV_SERVICE_ALLOWED_ACTIONS = [ + 'BackendWebSocketService:connect', + 'BackendWebSocketService:forceReconnection', + 'BackendWebSocketService:subscribe', + 'BackendWebSocketService:getConnectionInfo', + 'BackendWebSocketService:channelHasSubscription', + 'BackendWebSocketService:getSubscriptionsByChannel', + 'BackendWebSocketService:findSubscriptionsByChannelPrefix', + 'BackendWebSocketService:addChannelCallback', + 'BackendWebSocketService:removeChannelCallback', +] as const; + +export const OHLCV_SERVICE_ALLOWED_EVENTS = [ + 'BackendWebSocketService:connectionStateChanged', +] as const; + +export type AllowedActions = BackendWebSocketServiceMethodActions; + +// Events published by OHLCVService + +export type OHLCVServiceBarUpdatedEvent = { + type: `OHLCVService:barUpdated`; + payload: [{ channel: string; bar: OHLCVBar }]; +}; + +export type OHLCVServiceChainStatusChangedEvent = { + type: `OHLCVService:chainStatusChanged`; + payload: [{ chainIds: string[]; status: 'up' | 'down'; timestamp?: number }]; +}; + +export type OHLCVServiceSubscriptionErrorEvent = { + type: `OHLCVService:subscriptionError`; + payload: [{ channel: string; error: string; operation: string }]; +}; + +export type OHLCVServiceEvents = + | OHLCVServiceBarUpdatedEvent + | OHLCVServiceChainStatusChangedEvent + | OHLCVServiceSubscriptionErrorEvent; + +export type AllowedEvents = BackendWebSocketServiceConnectionStateChangedEvent; + +export type OHLCVServiceMessenger = Messenger< + typeof SERVICE_NAME, + OHLCVServiceActions | AllowedActions, + OHLCVServiceEvents | AllowedEvents +>; + +// ============================================================================= +// Main Service Class +// ============================================================================= + +/** + * Service for real-time OHLCV candlestick streaming via the backend WebSocket + * gateway. Communicates with {@link BackendWebSocketService} exclusively + * through the messenger — no direct import of the class. + * + * Features: + * - Reference counting: multiple UI consumers share one WebSocket subscription + * - Grace-period unsubscribe: avoids rapid unsub/resub during navigation + * - Idempotency: duplicate subscribe calls for the same channel are no-ops + * - Reconnect resilience: resubscribes all active channels on reconnect + * - Chain-status forwarding: listens to system-notifications for chain up/down + * + */ +export class OHLCVService { + readonly name = SERVICE_NAME; + + readonly #messenger: OHLCVServiceMessenger; + + readonly #trace: TraceCallback; + + readonly #channels = new Map(); + + readonly #mutex = new Mutex(); + + readonly #chainsUp = new Set(); + + // ============================================================================= + // Constructor + // ============================================================================= + + constructor( + options: OHLCVServiceOptions & { messenger: OHLCVServiceMessenger }, + ) { + this.#messenger = options.messenger; + + this.#trace = + options.traceFn ?? + ((( + _request: TraceRequest, + fn?: (context?: TraceContext) => Result, + ) => fn?.()) as TraceCallback); + + this.#messenger.registerMethodActionHandlers( + this, + MESSENGER_EXPOSED_METHODS, + ); + + this.#messenger.subscribe( + 'BackendWebSocketService:connectionStateChanged', + // eslint-disable-next-line @typescript-eslint/no-misused-promises + (connectionInfo: WebSocketConnectionInfo) => + this.#handleWebSocketStateChange(connectionInfo), + ); + } + + /** + * Register the system-notifications channel callback. + */ + init(): void { + log('OHLCV-WS: Initializing — registering system-notifications callback'); + this.#messenger.call('BackendWebSocketService:addChannelCallback', { + channelName: SYSTEM_NOTIFICATIONS_CHANNEL, + callback: (notification: ServerNotificationMessage) => + this.#handleSystemNotification(notification), + }); + } + + // ============================================================================= + // Public — Subscribe / Unsubscribe + // ============================================================================= + + /** + * Subscribe to an OHLCV channel. If this is the first subscriber for the + * given asset/interval/currency combination a WebSocket subscription is + * created. Additional calls for the same combination only bump the reference + * count. + * + * @param options - The subscription parameters. + * @returns A promise that resolves once the subscription is established. + */ + async subscribe(options: OHLCVSubscriptionOptions): Promise { + const channel = this.#buildChannel(options); + const releaseLock = await this.#mutex.acquire(); + try { + await this.#subscribeInner(channel); + } finally { + releaseLock(); + } + } + + async #subscribeInner(channel: string): Promise { + const entry = this.#channels.get(channel); + + if (entry?.gracePeriodTimer) { + clearTimeout(entry.gracePeriodTimer); + entry.gracePeriodTimer = undefined; + log('OHLCV-WS: Cancelled grace-period unsubscribe', { + channel, + }); + + if ( + this.#messenger.call( + 'BackendWebSocketService:channelHasSubscription', + channel, + ) + ) { + entry.refCount += 1; + log('OHLCV-WS: WS subscription still alive, bumped refCount', { + channel, + refCount: entry.refCount, + }); + return; + } + // WS subscription was lost (e.g. after disconnect/reconnect) — fall + // through to recreate it. refCount is bumped only after success below. + } else if (entry && entry.refCount > 0) { + entry.refCount += 1; + return; + } + + try { + await this.#messenger.call('BackendWebSocketService:connect'); + + if ( + this.#messenger.call( + 'BackendWebSocketService:channelHasSubscription', + channel, + ) + ) { + log( + 'OHLCV-WS: Channel already has WS subscription (idempotency), skipping', + { + channel, + }, + ); + this.#channels.set(channel, { refCount: 1 }); + return; + } + + await this.#messenger.call('BackendWebSocketService:subscribe', { + channels: [channel], + channelType: SUBSCRIPTION_NAMESPACE, + callback: (notification: ServerNotificationMessage) => { + this.#handleBarUpdate(channel, notification); + }, + }); + + this.#channels.set(channel, { refCount: 1 }); + log('OHLCV-WS: Subscribe succeeded — new WS subscription created', { + channel, + }); + } catch (error) { + log('OHLCV-WS: Subscription failed, forcing reconnection', { + channel, + error, + }); + this.#channels.delete(channel); + this.#messenger.publish('OHLCVService:subscriptionError', { + channel, + error: String(error), + operation: 'subscribe', + }); + await this.#forceReconnection(); + } + } + + /** + * Unsubscribe from an OHLCV channel. Decrements the reference count and, + * when it reaches zero, starts a grace-period timer before actually + * unsubscribing from the WebSocket to absorb rapid navigation patterns. + * + * @param options - The subscription parameters to unsubscribe from. + * @returns A promise that resolves once the unsubscription is processed. + */ + async unsubscribe(options: OHLCVSubscriptionOptions): Promise { + const channel = this.#buildChannel(options); + const releaseLock = await this.#mutex.acquire(); + try { + await this.#unsubscribeInner(channel); + } finally { + releaseLock(); + } + } + + async #unsubscribeInner(channel: string): Promise { + const entry = this.#channels.get(channel); + + if (!entry || entry.refCount <= 0) { + return; + } + + entry.refCount -= 1; + + if (entry.refCount > 0) { + return; + } + + entry.gracePeriodTimer = setTimeout(() => { + entry.gracePeriodTimer = undefined; + this.#performUnsubscribe(channel).catch(() => { + // no-op + }); + }, GRACE_PERIOD_MS); + } + + // ============================================================================= + // Private — WebSocket Subscription Helpers + // ============================================================================= + + async #performUnsubscribe(channel: string): Promise { + const releaseLock = await this.#mutex.acquire(); + try { + const entry = this.#channels.get(channel); + if (entry && entry.refCount > 0) { + log( + 'OHLCV-WS: Skipping unsubscribe — new subscriber arrived while queued', + { channel, refCount: entry.refCount }, + ); + return; + } + + log('OHLCV-WS: Grace period expired — performing actual WS unsubscribe', { + channel, + }); + this.#channels.delete(channel); + + try { + const subscriptions = this.#messenger.call( + 'BackendWebSocketService:getSubscriptionsByChannel', + channel, + ); + + for (const sub of subscriptions) { + await sub.unsubscribe(); + } + log('OHLCV-WS: WS unsubscribe completed', { channel }); + } catch (error) { + log('OHLCV-WS: Unsubscription failed, forcing reconnection', { + channel, + error, + }); + this.#messenger.publish('OHLCVService:subscriptionError', { + channel, + error: String(error), + operation: 'unsubscribe', + }); + await this.#forceReconnection().catch(() => { + // no-op + }); + } + } finally { + releaseLock(); + } + } + + /** + * Resubscribe all channels that were active before a disconnect. + * Called when WebSocket transitions to CONNECTED. + */ + async #resubscribeActiveChannels(): Promise { + const releaseLock = await this.#mutex.acquire(); + try { + const channelCount = this.#channels.size; + log('OHLCV-WS: Resubscribing active channels after reconnect', { + count: channelCount, + }); + + for (const [channel, entry] of this.#channels.entries()) { + if (entry.refCount === 0) { + continue; + } + + try { + if ( + this.#messenger.call( + 'BackendWebSocketService:channelHasSubscription', + channel, + ) + ) { + log( + 'OHLCV-WS: Channel already subscribed on server, skipping resubscribe', + { + channel, + }, + ); + continue; + } + + await this.#messenger.call('BackendWebSocketService:subscribe', { + channels: [channel], + channelType: SUBSCRIPTION_NAMESPACE, + callback: (notification: ServerNotificationMessage) => { + this.#handleBarUpdate(channel, notification); + }, + }); + log('OHLCV-WS: Resubscription succeeded', { channel }); + } catch (error) { + log('OHLCV-WS: Resubscription failed for channel', { + channel, + error, + }); + } + } + } finally { + releaseLock(); + } + } + + // ============================================================================= + // Private — Message Handlers + // ============================================================================= + + #handleBarUpdate( + channel: string, + notification: ServerNotificationMessage, + ): void { + const bar = notification.data as OHLCVBar; + + // eslint-disable-next-line @typescript-eslint/no-floating-promises + this.#trace( + { + name: `${SERVICE_NAME} Bar Update`, + data: { channel, timestamp: bar.timestamp }, + tags: { service: SERVICE_NAME }, + }, + () => { + this.#messenger.publish('OHLCVService:barUpdated', { channel, bar }); + }, + ); + } + + #handleSystemNotification(notification: ServerNotificationMessage): void { + const data = notification.data as OHLCVSystemNotificationData; + const { timestamp } = notification; + + if (!data.chainIds || !Array.isArray(data.chainIds) || !data.status) { + throw new Error( + 'Invalid system notification data: missing chainIds or status', + ); + } + + if (data.status === 'up') { + for (const chainId of data.chainIds) { + this.#chainsUp.add(chainId); + } + } else { + for (const chainId of data.chainIds) { + this.#chainsUp.delete(chainId); + } + } + + this.#messenger.publish('OHLCVService:chainStatusChanged', { + chainIds: data.chainIds, + status: data.status, + timestamp, + }); + + log(`OHLCV-WS: Chain status change: ${data.status}`, { + chains: data.chainIds, + status: data.status, + }); + } + + async #handleWebSocketStateChange( + connectionInfo: WebSocketConnectionInfo, + ): Promise { + const { state } = connectionInfo; + + if (state === WebSocketState.CONNECTED) { + await this.#resubscribeActiveChannels(); + } else if (state === WebSocketState.DISCONNECTED) { + const chainsToMarkDown = Array.from(this.#chainsUp); + + if (chainsToMarkDown.length > 0) { + this.#messenger.publish('OHLCVService:chainStatusChanged', { + chainIds: chainsToMarkDown, + status: 'down', + timestamp: Date.now(), + }); + + log( + 'OHLCV-WS: WebSocket disconnection — marked tracked chains as down', + { + count: chainsToMarkDown.length, + chains: chainsToMarkDown, + }, + ); + + this.#chainsUp.clear(); + } + } + } + + // ============================================================================= + // Private — Utility + // ============================================================================= + + #buildChannel(options: OHLCVSubscriptionOptions): string { + return `${SUBSCRIPTION_NAMESPACE}.${options.assetId}.${options.interval}.${options.currency}`; + } + + async #forceReconnection(): Promise { + log('OHLCV-WS: Forcing WebSocket reconnection'); + await this.#messenger.call('BackendWebSocketService:forceReconnection'); + } + + // ============================================================================= + // Public — Cleanup + // ============================================================================= + + /** + * Destroy the service and clean up all resources. + */ + destroy(): void { + for (const entry of this.#channels.values()) { + if (entry.gracePeriodTimer) { + clearTimeout(entry.gracePeriodTimer); + } + } + this.#channels.clear(); + this.#chainsUp.clear(); + + this.#messenger.call( + 'BackendWebSocketService:removeChannelCallback', + SYSTEM_NOTIFICATIONS_CHANNEL, + ); + } +} diff --git a/packages/core-backend/src/ws/ohlcv/index.ts b/packages/core-backend/src/ws/ohlcv/index.ts new file mode 100644 index 0000000000..e40d24d8c3 --- /dev/null +++ b/packages/core-backend/src/ws/ohlcv/index.ts @@ -0,0 +1,18 @@ +export { OHLCVService } from './OHLCVService'; +export { + OHLCV_SERVICE_ALLOWED_ACTIONS, + OHLCV_SERVICE_ALLOWED_EVENTS, +} from './OHLCVService'; +export type { + OHLCVSystemNotificationData, + OHLCVServiceOptions, + OHLCVServiceActions, + AllowedActions as OHLCVServiceAllowedActions, + OHLCVServiceBarUpdatedEvent, + OHLCVServiceChainStatusChangedEvent, + OHLCVServiceSubscriptionErrorEvent, + OHLCVServiceEvents, + AllowedEvents as OHLCVServiceAllowedEvents, + OHLCVServiceMessenger, +} from './OHLCVService'; +export type { OHLCVBar, OHLCVSubscriptionOptions } from './types'; diff --git a/packages/core-backend/src/ws/ohlcv/types.ts b/packages/core-backend/src/ws/ohlcv/types.ts new file mode 100644 index 0000000000..b8e64caf33 --- /dev/null +++ b/packages/core-backend/src/ws/ohlcv/types.ts @@ -0,0 +1,33 @@ +/** + * OHLCV WebSocket streaming types for real-time candlestick data. + */ + +/** + * A single OHLCV candlestick bar received from the market-data WebSocket stream. + */ +export type OHLCVBar = { + /** Unix timestamp (seconds) of the candle open */ + timestamp: number; + /** Opening price */ + open: number; + /** Highest price during the candle period */ + high: number; + /** Lowest price during the candle period */ + low: number; + /** Closing price (latest) */ + close: number; + /** Trading volume during the candle period */ + volume: number; +}; + +/** + * Options for subscribing to an OHLCV channel. + */ +export type OHLCVSubscriptionOptions = { + /** CAIP-19 asset identifier, e.g. "eip155:8453/erc20:0x833589fCD6eDb6E08f4c7C32D4f71b54bdA02913" */ + assetId: string; + /** Candle interval, e.g. "1m", "5m", "15m", "1h", "4h", "1d" */ + interval: string; + /** Fiat currency code, e.g. "usd", "eur" */ + currency: string; +}; diff --git a/yarn.lock b/yarn.lock index 5cbd27294f..38f2d2120e 100644 --- a/yarn.lock +++ b/yarn.lock @@ -3382,6 +3382,7 @@ __metadata: "@tanstack/query-core": "npm:^5.62.16" "@ts-bridge/cli": "npm:^0.6.4" "@types/jest": "npm:^29.5.14" + async-mutex: "npm:^0.5.0" deepmerge: "npm:^4.2.2" jest: "npm:^29.7.0" jest-environment-jsdom: "npm:^29.7.0"