diff --git a/.changeset/ice-lvp-support.md b/.changeset/ice-lvp-support.md new file mode 100644 index 0000000000..16fca6f1e2 --- /dev/null +++ b/.changeset/ice-lvp-support.md @@ -0,0 +1,7 @@ +--- +'@chainlink/ice-adapter': minor +--- + +Add Last Value Persistence (LVP) support for off-market hours. The adapter now listens for NetDania's internal heartbeat events (fired every ~180 seconds when the connection to the data provider is confirmed alive) and uses these to extend cache TTLs for active subscriptions. This ensures cached prices remain available during off-market hours while also confirming the data provider connection is healthy. The heartbeat events stop automatically if the connection is lost, allowing stale prices to expire naturally. + +The default `CACHE_MAX_AGE` has been increased to 300 seconds (5 minutes) to exceed the NetDania heartbeat interval, ensuring LVP functions correctly out of the box. diff --git a/packages/sources/ice/src/config/index.ts b/packages/sources/ice/src/config/index.ts index 87125d2ba5..9bdb7fa4ee 100644 --- a/packages/sources/ice/src/config/index.ts +++ b/packages/sources/ice/src/config/index.ts @@ -1,48 +1,57 @@ import { AdapterConfig } from '@chainlink/external-adapter-framework/config' -export const config = new AdapterConfig({ - USER_GROUP: { - description: 'User group for ICE', - type: 'string', - default: 'chain.link', - }, - NETDANIA_PASSWORD: { - description: 'Password for ICE', - type: 'string', - required: true, - sensitive: true, - }, - API_ENDPOINT: { - description: 'streaming server endpoint for ICE', - type: 'string', - required: true, - }, - API_ENDPOINT_FAILOVER_1: { - description: 'failover endpoint for ICE', - type: 'string', - default: '', - required: false, - }, - API_ENDPOINT_FAILOVER_2: { - description: 'failover endpoint for ICE', - type: 'string', - default: '', - required: false, - }, - API_ENDPOINT_FAILOVER_3: { - description: 'failover endpoint for ICE', - type: 'string', - default: '', - required: false, - }, - POLLING_INTERVAL: { - description: 'Polling interval for ICE', - type: 'number', - default: 2000, - }, - CONNECTING_TIMEOUT_MS: { - description: 'Connecting timeout in milliseconds for ICE', - type: 'number', - default: 4000, - }, -}) +export const config = new AdapterConfig( + { + USER_GROUP: { + description: 'User group for ICE', + type: 'string', + default: 'chain.link', + }, + NETDANIA_PASSWORD: { + description: 'Password for ICE', + type: 'string', + required: true, + sensitive: true, + }, + API_ENDPOINT: { + description: 'streaming server endpoint for ICE', + type: 'string', + required: true, + }, + API_ENDPOINT_FAILOVER_1: { + description: 'failover endpoint for ICE', + type: 'string', + default: '', + required: false, + }, + API_ENDPOINT_FAILOVER_2: { + description: 'failover endpoint for ICE', + type: 'string', + default: '', + required: false, + }, + API_ENDPOINT_FAILOVER_3: { + description: 'failover endpoint for ICE', + type: 'string', + default: '', + required: false, + }, + POLLING_INTERVAL: { + description: 'Polling interval for ICE', + type: 'number', + default: 2000, + }, + CONNECTING_TIMEOUT_MS: { + description: 'Connecting timeout in milliseconds for ICE', + type: 'number', + default: 4000, + }, + }, + { + // LVP (Last Value Persistence): Increase CACHE_MAX_AGE to exceed NetDania heartbeat interval (~180s) + // This ensures cached prices persist during off-market hours while the connection remains healthy + envDefaultOverrides: { + CACHE_MAX_AGE: 300_000, // 5 minutes + }, + }, +) diff --git a/packages/sources/ice/src/transport/netdania/index.ts b/packages/sources/ice/src/transport/netdania/index.ts index 8332906573..1e11a74388 100644 --- a/packages/sources/ice/src/transport/netdania/index.ts +++ b/packages/sources/ice/src/transport/netdania/index.ts @@ -58,6 +58,8 @@ enum Events { ONRECONNECTED = 'OnReconnect', ONPRICEUPDATE = 'OnPriceUpdate', ONERROR = 'OnError', + // Custom event added for Chainlink EA - fired when NetDania internal heartbeat succeeds + ONHEARTBEAT = 'OnHeartbeat', } // and others, from JsApi.Events export class Utils { @@ -147,6 +149,10 @@ export type InstrumentPartialUpdate = { * It tries to keep the connection alive. * * Any change to subscriptions will not take effect until refresh(). + * + * LVP (Last Value Persistence): Emits 'heartbeat' events when NetDania's internal heartbeat + * succeeds, confirming the connection to the data provider is alive. This allows the transport + * to extend cache TTLs during off-market hours when no price updates are received. */ export class StreamingClient extends EventEmitter { // eslint-disable-next-line @typescript-eslint/ban-ts-comment @@ -189,6 +195,12 @@ export class StreamingClient extends EventEmitter { this.connection.addListener(Events.ONERROR, this.onErrorHandler) this.connection.addListener(Events.ONPRICEUPDATE, this.onPriceUpdateHandler) + + // Listen for NetDania's internal heartbeat success - this confirms the DP connection is alive + this.connection.addListener(Events.ONHEARTBEAT, (timestamp: number) => { + logger.debug(`NetDania heartbeat received at ${timestamp}, emitting heartbeat event for LVP`) + this.emit('heartbeat') + }) } /** diff --git a/packages/sources/ice/src/transport/netdania/jsApi/jsapi-nodejs.ts b/packages/sources/ice/src/transport/netdania/jsApi/jsapi-nodejs.ts index 974c9d520c..cc0c9b39ed 100644 --- a/packages/sources/ice/src/transport/netdania/jsApi/jsapi-nodejs.ts +++ b/packages/sources/ice/src/transport/netdania/jsApi/jsapi-nodejs.ts @@ -976,6 +976,8 @@ void 0 === window.location?.host && ONIPLOCATIONRESPONSE: 'OnIPLocationResponse', ONWORKSPACEDATA: 'OnWorkspaceData', ONHISTORICALCHARTDATA: 'OnHistoricalChartData', + // Custom event added for Chainlink EA - fired when internal heartbeat succeeds + ONHEARTBEAT: 'OnHeartbeat', }) })(void 0 !== window ? window : global), (function (e) { @@ -1881,6 +1883,8 @@ void 0 === window.location?.host && (e.onprogress = function () {}), (e.onload = function () { this.responseText, + // Fire ONHEARTBEAT event to signal successful heartbeat (Chainlink EA extension) + i.fireEvent(NetDania.JsApi.Events.ONHEARTBEAT, [Date.now()]), i.ensureHeartBeatTimerStarted(), (e.onerror = null), (e.ontimeout = null), diff --git a/packages/sources/ice/src/transport/price.ts b/packages/sources/ice/src/transport/price.ts index afb5279960..b38a871ab6 100644 --- a/packages/sources/ice/src/transport/price.ts +++ b/packages/sources/ice/src/transport/price.ts @@ -19,6 +19,7 @@ export type FullPriceUpdate = Required & { export class NetDaniaStreamingTransport extends StreamingTransport { private client!: StreamingClient private localCache: LocalPriceCache = new LocalPriceCache() + private currentSubscriptions: { base: string; quote: string }[] = [] async initialize( dependencies: TransportDependencies, @@ -28,6 +29,8 @@ export class NetDaniaStreamingTransport extends StreamingTransport { // get base and quote from the instrument name in the requestIdToInstrument map const base = update.instrument.substring(0, 3) @@ -61,6 +64,29 @@ export class NetDaniaStreamingTransport extends StreamingTransport { + await this.updateTTL(adapterSettings.CACHE_MAX_AGE) + }) + } + + /** + * LVP (Last Value Persistence): Extends TTL for all currently subscribed pairs. + * This ensures cached prices remain available during off-market hours when + * no price updates are being received from the data provider. + */ + private async updateTTL(ttl: number): Promise { + if (this.currentSubscriptions.length === 0) { + logger.debug('LVP updateTTL: No active subscriptions to refresh') + return + } + + logger.debug( + `LVP updateTTL: Extending TTL for ${this.currentSubscriptions.length} subscriptions by ${ttl}ms`, + ) + + this.responseCache.writeTTL(this.name, this.currentSubscriptions, ttl) } override getSubscriptionTtlFromConfig(adapterSettings: BaseEndpointTypes['Settings']): number { @@ -78,6 +104,9 @@ export class NetDaniaStreamingTransport extends StreamingTransport logger, +} + +LoggerFactoryProvider.set(loggerFactory) + +// Store listeners registered with the mock connection +const mockListeners: Record void)[]> = {} + +// Mock the JsApi for StreamingClient tests +jest.mock('../../src/transport/netdania/jsApi/jsapi-nodejs', () => ({ + window: { + NetDania: { + JsApi: { + Fields: { + QUOTE_BID: 10, + QUOTE_ASK: 11, + QUOTE_MID_PRICE: 9, + QUOTE_TIME_STAMP: 152, + QUOTE_TIME_ZONE: 3015, + }, + JSONConnection: jest.fn().mockImplementation(() => ({ + addListener: jest.fn((event: string, callback: (...args: unknown[]) => void) => { + if (!mockListeners[event]) { + mockListeners[event] = [] + } + mockListeners[event].push(callback) + }), + Flush: jest.fn(), + GetRequestList: jest.fn().mockReturnValue([]), + addRequests: jest.fn(), + RemoveRequests: jest.fn(), + disconnect: jest.fn(), + reconnect: jest.fn(), + _tryReconnect: true, + })), + Request: { + getReqObjPrice: jest.fn().mockImplementation((instrument, provider, flag) => ({ + t: 1, + i: Math.floor(Math.random() * 10000), + m: flag, + s: instrument, + p: provider, + })), + }, + }, + }, + }, +})) + +import { BaseEndpointTypes } from '../../src/endpoint/price' +import { StreamingClient } from '../../src/transport/netdania' + describe('PartialPriceUpdate', () => { it('must parse all the correct updates correctly', async () => { const rawUpdates = (await fs.promises.readFile(__dirname + '/raw-price-updates.jsonl', 'utf-8')) @@ -83,3 +149,44 @@ describe('PartialPriceUpdate', () => { expect(Utils.sanitize(urlWithoutH)).toBe(urlWithoutH) }) }) + +describe('LVP (Last Value Persistence)', () => { + const mockSettings: BaseEndpointTypes['Settings'] = { + API_ENDPOINT: 'https://test.example.com', + API_ENDPOINT_FAILOVER_1: '', + API_ENDPOINT_FAILOVER_2: '', + API_ENDPOINT_FAILOVER_3: '', + NETDANIA_PASSWORD: 'test-password', + USER_GROUP: 'test.group', + POLLING_INTERVAL: 2000, + CONNECTING_TIMEOUT_MS: 4000, + CACHE_MAX_AGE: 300000, // 5 minutes - exceeds NetDania heartbeat interval for LVP + } as BaseEndpointTypes['Settings'] + + beforeEach(() => { + jest.clearAllMocks() + Object.keys(mockListeners).forEach((key) => delete mockListeners[key]) + }) + + describe('StreamingClient heartbeat', () => { + it('should emit heartbeat event when NetDania ONHEARTBEAT fires', () => { + const client = new StreamingClient(mockSettings) + const heartbeatHandler = jest.fn() + client.on('heartbeat', heartbeatHandler) + + const onHeartbeatCallbacks = mockListeners['OnHeartbeat'] || [] + expect(onHeartbeatCallbacks.length).toBeGreaterThan(0) + + onHeartbeatCallbacks.forEach((cb) => cb(Date.now())) + + expect(heartbeatHandler).toHaveBeenCalledTimes(1) + }) + + it('should register listener for NetDania ONHEARTBEAT event', () => { + new StreamingClient(mockSettings) + + expect(mockListeners['OnHeartbeat']).toBeDefined() + expect(mockListeners['OnHeartbeat'].length).toBe(1) + }) + }) +})