Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/ice-lvp-support.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
'@chainlink/ice-adapter': minor
---

Add Last Value Persistence (LVP) support for off-market hours. The adapter now listens for NetDania's internal heartbeat events (fired every ~180 seconds when the connection to the data provider is confirmed alive) and uses these to extend cache TTLs for active subscriptions. This ensures cached prices remain available during off-market hours while also confirming the data provider connection is healthy. The heartbeat events stop automatically if the connection is lost, allowing stale prices to expire naturally.

The default `CACHE_MAX_AGE` has been increased to 300 seconds (5 minutes) to exceed the NetDania heartbeat interval, ensuring LVP functions correctly out of the box.
101 changes: 55 additions & 46 deletions packages/sources/ice/src/config/index.ts
Original file line number Diff line number Diff line change
@@ -1,48 +1,57 @@
import { AdapterConfig } from '@chainlink/external-adapter-framework/config'

export const config = new AdapterConfig({
USER_GROUP: {
description: 'User group for ICE',
type: 'string',
default: 'chain.link',
},
NETDANIA_PASSWORD: {
description: 'Password for ICE',
type: 'string',
required: true,
sensitive: true,
},
API_ENDPOINT: {
description: 'streaming server endpoint for ICE',
type: 'string',
required: true,
},
API_ENDPOINT_FAILOVER_1: {
description: 'failover endpoint for ICE',
type: 'string',
default: '',
required: false,
},
API_ENDPOINT_FAILOVER_2: {
description: 'failover endpoint for ICE',
type: 'string',
default: '',
required: false,
},
API_ENDPOINT_FAILOVER_3: {
description: 'failover endpoint for ICE',
type: 'string',
default: '',
required: false,
},
POLLING_INTERVAL: {
description: 'Polling interval for ICE',
type: 'number',
default: 2000,
},
CONNECTING_TIMEOUT_MS: {
description: 'Connecting timeout in milliseconds for ICE',
type: 'number',
default: 4000,
},
})
export const config = new AdapterConfig(
{
USER_GROUP: {
description: 'User group for ICE',
type: 'string',
default: 'chain.link',
},
NETDANIA_PASSWORD: {
description: 'Password for ICE',
type: 'string',
required: true,
sensitive: true,
},
API_ENDPOINT: {
description: 'streaming server endpoint for ICE',
type: 'string',
required: true,
},
API_ENDPOINT_FAILOVER_1: {
description: 'failover endpoint for ICE',
type: 'string',
default: '',
required: false,
},
API_ENDPOINT_FAILOVER_2: {
description: 'failover endpoint for ICE',
type: 'string',
default: '',
required: false,
},
API_ENDPOINT_FAILOVER_3: {
description: 'failover endpoint for ICE',
type: 'string',
default: '',
required: false,
},
POLLING_INTERVAL: {
description: 'Polling interval for ICE',
type: 'number',
default: 2000,
},
CONNECTING_TIMEOUT_MS: {
description: 'Connecting timeout in milliseconds for ICE',
type: 'number',
default: 4000,
},
},
{
// LVP (Last Value Persistence): Increase CACHE_MAX_AGE to exceed NetDania heartbeat interval (~180s)
// This ensures cached prices persist during off-market hours while the connection remains healthy
envDefaultOverrides: {
CACHE_MAX_AGE: 300_000, // 5 minutes
},
},
)
12 changes: 12 additions & 0 deletions packages/sources/ice/src/transport/netdania/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ enum Events {
ONRECONNECTED = 'OnReconnect',
ONPRICEUPDATE = 'OnPriceUpdate',
ONERROR = 'OnError',
// Custom event added for Chainlink EA - fired when NetDania internal heartbeat succeeds
ONHEARTBEAT = 'OnHeartbeat',
} // and others, from JsApi.Events

export class Utils {
Expand Down Expand Up @@ -147,6 +149,10 @@ export type InstrumentPartialUpdate = {
* It tries to keep the connection alive.
*
* Any change to subscriptions will not take effect until refresh().
*
* LVP (Last Value Persistence): Emits 'heartbeat' events when NetDania's internal heartbeat
* succeeds, confirming the connection to the data provider is alive. This allows the transport
* to extend cache TTLs during off-market hours when no price updates are received.
*/
export class StreamingClient extends EventEmitter {
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
Expand Down Expand Up @@ -189,6 +195,12 @@ export class StreamingClient extends EventEmitter {

this.connection.addListener(Events.ONERROR, this.onErrorHandler)
this.connection.addListener(Events.ONPRICEUPDATE, this.onPriceUpdateHandler)

// Listen for NetDania's internal heartbeat success - this confirms the DP connection is alive
this.connection.addListener(Events.ONHEARTBEAT, (timestamp: number) => {
logger.debug(`NetDania heartbeat received at ${timestamp}, emitting heartbeat event for LVP`)
this.emit('heartbeat')
})
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -976,6 +976,8 @@ void 0 === window.location?.host &&
ONIPLOCATIONRESPONSE: 'OnIPLocationResponse',
ONWORKSPACEDATA: 'OnWorkspaceData',
ONHISTORICALCHARTDATA: 'OnHistoricalChartData',
// Custom event added for Chainlink EA - fired when internal heartbeat succeeds
ONHEARTBEAT: 'OnHeartbeat',
})
})(void 0 !== window ? window : global),
(function (e) {
Expand Down Expand Up @@ -1881,6 +1883,8 @@ void 0 === window.location?.host &&
(e.onprogress = function () {}),
(e.onload = function () {
this.responseText,
// Fire ONHEARTBEAT event to signal successful heartbeat (Chainlink EA extension)
i.fireEvent(NetDania.JsApi.Events.ONHEARTBEAT, [Date.now()]),
i.ensureHeartBeatTimerStarted(),
(e.onerror = null),
(e.ontimeout = null),
Expand Down
29 changes: 29 additions & 0 deletions packages/sources/ice/src/transport/price.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ export type FullPriceUpdate = Required<PartialPriceUpdate> & {
export class NetDaniaStreamingTransport extends StreamingTransport<BaseEndpointTypes> {
private client!: StreamingClient
private localCache: LocalPriceCache = new LocalPriceCache()
private currentSubscriptions: { base: string; quote: string }[] = []

async initialize(
dependencies: TransportDependencies<BaseEndpointTypes>,
Expand All @@ -28,6 +29,8 @@ export class NetDaniaStreamingTransport extends StreamingTransport<BaseEndpointT
) {
await super.initialize(dependencies, adapterSettings, endpointName, transportName)
this.client = new StreamingClient(adapterSettings)

// Listen for price updates
this.client.on('price', async (update: InstrumentPartialUpdate) => {
// get base and quote from the instrument name in the requestIdToInstrument map
const base = update.instrument.substring(0, 3)
Expand Down Expand Up @@ -61,6 +64,29 @@ export class NetDaniaStreamingTransport extends StreamingTransport<BaseEndpointT
},
])
})

// LVP: Listen for heartbeat events to extend cache TTLs during off-market hours
this.client.on('heartbeat', async () => {
await this.updateTTL(adapterSettings.CACHE_MAX_AGE)
})
}

/**
* LVP (Last Value Persistence): Extends TTL for all currently subscribed pairs.
* This ensures cached prices remain available during off-market hours when
* no price updates are being received from the data provider.
*/
private async updateTTL(ttl: number): Promise<void> {
if (this.currentSubscriptions.length === 0) {
logger.debug('LVP updateTTL: No active subscriptions to refresh')
return
}

logger.debug(
`LVP updateTTL: Extending TTL for ${this.currentSubscriptions.length} subscriptions by ${ttl}ms`,
)

this.responseCache.writeTTL(this.name, this.currentSubscriptions, ttl)
}

override getSubscriptionTtlFromConfig(adapterSettings: BaseEndpointTypes['Settings']): number {
Expand All @@ -78,6 +104,9 @@ export class NetDaniaStreamingTransport extends StreamingTransport<BaseEndpointT
// reconcile: should be redundant
this.ensureDesired(subscriptions.desired)

// LVP: Track current subscriptions for TTL refresh during off-market hours
this.currentSubscriptions = subscriptions.desired

await sleep(context.adapterSettings.BACKGROUND_EXECUTE_MS_HTTP)
}

Expand Down
107 changes: 107 additions & 0 deletions packages/sources/ice/test/unit/transport-netdania-utils.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,72 @@
import { LoggerFactory, LoggerFactoryProvider } from '@chainlink/external-adapter-framework/util'
import fs from 'fs'
import { MonitorPriceResponse, Utils } from '../../src/transport/netdania'

// Mock logger for StreamingClient tests
const log = jest.fn()
const logger = {
fatal: log,
error: log,
warn: log,
info: log,
debug: log,
trace: log,
msgPrefix: 'mock-logger',
}

const loggerFactory: LoggerFactory = {
child: () => logger,
}

LoggerFactoryProvider.set(loggerFactory)

// Store listeners registered with the mock connection
const mockListeners: Record<string, ((...args: unknown[]) => void)[]> = {}

// Mock the JsApi for StreamingClient tests
jest.mock('../../src/transport/netdania/jsApi/jsapi-nodejs', () => ({
window: {
NetDania: {
JsApi: {
Fields: {
QUOTE_BID: 10,
QUOTE_ASK: 11,
QUOTE_MID_PRICE: 9,
QUOTE_TIME_STAMP: 152,
QUOTE_TIME_ZONE: 3015,
},
JSONConnection: jest.fn().mockImplementation(() => ({
addListener: jest.fn((event: string, callback: (...args: unknown[]) => void) => {
if (!mockListeners[event]) {
mockListeners[event] = []
}
mockListeners[event].push(callback)
}),
Flush: jest.fn(),
GetRequestList: jest.fn().mockReturnValue([]),
addRequests: jest.fn(),
RemoveRequests: jest.fn(),
disconnect: jest.fn(),
reconnect: jest.fn(),
_tryReconnect: true,
})),
Request: {
getReqObjPrice: jest.fn().mockImplementation((instrument, provider, flag) => ({
t: 1,
i: Math.floor(Math.random() * 10000),
m: flag,
s: instrument,
p: provider,
})),
},
},
},
},
}))

import { BaseEndpointTypes } from '../../src/endpoint/price'
import { StreamingClient } from '../../src/transport/netdania'

describe('PartialPriceUpdate', () => {
it('must parse all the correct updates correctly', async () => {
const rawUpdates = (await fs.promises.readFile(__dirname + '/raw-price-updates.jsonl', 'utf-8'))
Expand Down Expand Up @@ -83,3 +149,44 @@ describe('PartialPriceUpdate', () => {
expect(Utils.sanitize(urlWithoutH)).toBe(urlWithoutH)
})
})

describe('LVP (Last Value Persistence)', () => {
const mockSettings: BaseEndpointTypes['Settings'] = {
API_ENDPOINT: 'https://test.example.com',
API_ENDPOINT_FAILOVER_1: '',
API_ENDPOINT_FAILOVER_2: '',
API_ENDPOINT_FAILOVER_3: '',
NETDANIA_PASSWORD: 'test-password',
USER_GROUP: 'test.group',
POLLING_INTERVAL: 2000,
CONNECTING_TIMEOUT_MS: 4000,
CACHE_MAX_AGE: 300000, // 5 minutes - exceeds NetDania heartbeat interval for LVP
} as BaseEndpointTypes['Settings']

beforeEach(() => {
jest.clearAllMocks()
Object.keys(mockListeners).forEach((key) => delete mockListeners[key])
})

describe('StreamingClient heartbeat', () => {
it('should emit heartbeat event when NetDania ONHEARTBEAT fires', () => {
const client = new StreamingClient(mockSettings)
const heartbeatHandler = jest.fn()
client.on('heartbeat', heartbeatHandler)

const onHeartbeatCallbacks = mockListeners['OnHeartbeat'] || []
expect(onHeartbeatCallbacks.length).toBeGreaterThan(0)

onHeartbeatCallbacks.forEach((cb) => cb(Date.now()))

expect(heartbeatHandler).toHaveBeenCalledTimes(1)
})

it('should register listener for NetDania ONHEARTBEAT event', () => {
new StreamingClient(mockSettings)

expect(mockListeners['OnHeartbeat']).toBeDefined()
expect(mockListeners['OnHeartbeat'].length).toBe(1)
})
})
})
Loading