From 69b2f05de2127662f5bc4894dda0409ec119c7f8 Mon Sep 17 00:00:00 2001 From: Andrew Henry Date: Tue, 9 Jan 2024 15:26:25 -0800 Subject: [PATCH 01/20] Support subscription batching from API, Tables, and Plots --- src/api/telemetry/TelemetryAPI.js | 52 +++++++++++++++++--- src/api/telemetry/TelemetryCollection.js | 5 +- src/plugins/plot/configuration/PlotSeries.js | 47 +++++++++++------- 3 files changed, 78 insertions(+), 26 deletions(-) diff --git a/src/api/telemetry/TelemetryAPI.js b/src/api/telemetry/TelemetryAPI.js index 24677b6b3d0..1f898277330 100644 --- a/src/api/telemetry/TelemetryAPI.js +++ b/src/api/telemetry/TelemetryAPI.js @@ -54,6 +54,23 @@ import TelemetryValueFormatter from './TelemetryValueFormatter.js'; * @memberof module:openmct.TelemetryAPI~ */ +/** + * Describes and bounds requests for telemetry data. + * + * @typedef TelemetrySubscriptionOptions + * @property {String} [strategy] symbolic identifier directing providers on how + * to handle telemetry subscriptions. The default behavior is 'latest' which will + * always return a single telemetry value with each callback, and in the event + * of throttling will always prioritize the latest data, meaning intermediate + * data will be skipped. Alternatively, the `batch` strategy can be used, which + * will return all telemetry values since the last callback. This strategy is + * useful for cases where intermediate data is important, such as when + * rendering a telemetry plot or table. If `batch` is specified, the subscription + * callback will be invoked with an Array. + * + * @memberof module:openmct.TelemetryAPI~ + */ + /** * Utilities for telemetry * @interface TelemetryAPI @@ -378,7 +395,7 @@ export default class TelemetryAPI { * @memberof module:openmct.TelemetryAPI~TelemetryProvider# * @param {module:openmct.DomainObject} domainObject the object * which has associated telemetry - * @param {TelemetryRequestOptions} options configuration items for subscription + * @param {TelemetrySubscriptionOptions} options configuration items for subscription * @param {Function} callback the callback to invoke with new data, as * it becomes available * @returns {Function} a function which may be called to terminate @@ -389,7 +406,10 @@ export default class TelemetryAPI { return () => {}; } - const provider = this.findSubscriptionProvider(domainObject); + // Default behavior is to use the latest strategy, as opposed to the new "batch" strategy + options.strategy = options.strategy || 'latest'; + + const provider = this.findSubscriptionProvider(domainObject, options); if (!this.subscribeCache) { this.subscribeCache = {}; @@ -405,11 +425,9 @@ export default class TelemetryAPI { if (provider) { subscriber.unsubscribe = provider.subscribe( domainObject, - function (value) { - subscriber.callbacks.forEach(function (cb) { - cb(value); - }); - }, + options.strategy === 'batch' + ? subscriptionCallbackForArray + : subscriptionCallbackForSingleValue, options ); } else { @@ -419,6 +437,26 @@ export default class TelemetryAPI { subscriber.callbacks.push(callback); } + function subscriptionCallbackForArray(value) { + if (!Array.isArray(value)) { + value = [value]; + } + + subscriber.callbacks.forEach(function (cb) { + cb(value); + }); + } + + function subscriptionCallbackForSingleValue(value) { + if (Array.isArray(value)) { + value = value[value.length - 1]; + } + + subscriber.callbacks.forEach(function (cb) { + cb(value); + }); + } + return function unsubscribe() { subscriber.callbacks = subscriber.callbacks.filter(function (cb) { return cb !== callback; diff --git a/src/api/telemetry/TelemetryCollection.js b/src/api/telemetry/TelemetryCollection.js index b53e98e0873..12ccf7edae7 100644 --- a/src/api/telemetry/TelemetryCollection.js +++ b/src/api/telemetry/TelemetryCollection.js @@ -180,11 +180,14 @@ export default class TelemetryCollection extends EventEmitter { if (this.unsubscribe) { this.unsubscribe(); } + const options = { ...this.options }; + //We always want to receive all available values in telemetry tables. + options.strategy = 'batch'; this.unsubscribe = this.openmct.telemetry.subscribe( this.domainObject, (datum) => this._processNewTelemetry(datum), - this.options + options ); } diff --git a/src/plugins/plot/configuration/PlotSeries.js b/src/plugins/plot/configuration/PlotSeries.js index 929d2d34763..abacea3911e 100644 --- a/src/plugins/plot/configuration/PlotSeries.js +++ b/src/plugins/plot/configuration/PlotSeries.js @@ -211,9 +211,16 @@ export default class PlotSeries extends Model { ); if (!this.unsubscribe) { - this.unsubscribe = this.openmct.telemetry.subscribe(this.domainObject, this.add.bind(this), { - filters: this.filters - }); + this.unsubscribe = this.openmct.telemetry.subscribe( + this.domainObject, + (data) => { + this.addAll(data, true); + }, + { + filters: this.filters, + strategy: 'batch' + } + ); } try { @@ -302,9 +309,7 @@ export default class PlotSeries extends Model { this.resetStats(); this.emit('reset'); if (newData) { - newData.forEach(function (point) { - this.add(point, true); - }, this); + this.addAll(newData, true); } } /** @@ -416,14 +421,14 @@ export default class PlotSeries extends Model { * when adding an array of points that are already properly sorted. * * @private - * @param {Object} point a telemetry datum. - * @param {Boolean} [appendOnly] default false, if true will append + * @param {Object} newData a telemetry datum. + * @param {Boolean} [sorted] default false, if true will append * a point to the end without dupe checking. */ - add(point, appendOnly) { + add(newData, sorted = false) { let data = this.getSeriesData(); let insertIndex = data.length; - const currentYVal = this.getYVal(point); + const currentYVal = this.getYVal(newData); const lastYVal = this.getYVal(data[insertIndex - 1]); if (this.isValueInvalid(currentYVal) && this.isValueInvalid(lastYVal)) { @@ -432,22 +437,28 @@ export default class PlotSeries extends Model { return; } - if (!appendOnly) { - insertIndex = this.sortedIndex(point); - if (this.getXVal(data[insertIndex]) === this.getXVal(point)) { + if (!sorted) { + insertIndex = this.sortedIndex(newData); + if (this.getXVal(data[insertIndex]) === this.getXVal(newData)) { return; } - if (this.getXVal(data[insertIndex - 1]) === this.getXVal(point)) { + if (this.getXVal(data[insertIndex - 1]) === this.getXVal(newData)) { return; } } - this.updateStats(point); - point.mctLimitState = this.evaluate(point); - data.splice(insertIndex, 0, point); + this.updateStats(newData); + newData.mctLimitState = this.evaluate(newData); + data.splice(insertIndex, 0, newData); this.updateSeriesData(data); - this.emit('add', point, insertIndex, this); + this.emit('add', newData, insertIndex, this); + } + + addAll(points, sorted = false) { + for (let i = 0; i < points.length; i++) { + this.add(points[i], sorted); + } } /** From a87ffee2649f4d72419417e7266b0996345d9fa1 Mon Sep 17 00:00:00 2001 From: Andrew Henry Date: Thu, 11 Jan 2024 09:52:26 -0800 Subject: [PATCH 02/20] Added batching worker --- .../telemetry/BatchingWebSocketProvider.js | 60 ++++ src/api/telemetry/TelemetryAPI.js | 2 + src/api/telemetry/WebSocketWorker.js | 295 ++++++++++++++++++ 3 files changed, 357 insertions(+) create mode 100644 src/api/telemetry/BatchingWebSocketProvider.js create mode 100644 src/api/telemetry/WebSocketWorker.js diff --git a/src/api/telemetry/BatchingWebSocketProvider.js b/src/api/telemetry/BatchingWebSocketProvider.js new file mode 100644 index 00000000000..20154cfe50f --- /dev/null +++ b/src/api/telemetry/BatchingWebSocketProvider.js @@ -0,0 +1,60 @@ +import installWorker from './WebSocketWorker.js'; + +class BatchingWebSocketProvider extends EventTarget { + #worker; + + constructor() { + super(); + // Install worker, register listeners etc. + const workerFunction = `(${installWorker.toString()})()`; + const workerBlob = new Blob([workerFunction]); + const workerUrl = URL.createObjectURL(workerBlob, { type: 'application/javascript' }); + this.#worker = new Worker(workerUrl); + + this.routeMessageToHandler = this.routeMessageToHandler.bind(this); + this.#worker.addEventListener('message', this.routeMessageToHandler); + } + + connect(url) { + this.#worker.postMessage({ + type: 'connect', + url + }); + } + + disconnect() { + this.#worker.postMessage({ type: 'disconnect' }); + } + + sendMessage(message) { + this.#worker.postMessage({ + type: 'message', + message + }); + } + + setBatchingStrategy(strategy) { + const serializedStrategy = { + shouldBatchMessage: strategy.shouldBatchMessage.toString(), + getBatchIdFromMessage: strategy.getBatchIdFromMessage.toString() + }; + + this.#worker.postMessage({ + type: 'setBatchingStrategy', + serializedStrategy + }); + } + + routeMessageToHandler(message) { + // Batch message would need to be handle differently here + if (message.data.type === 'batch') { + this.dispatchEvent(new MessageEvent('batch', { data: message.data })); + } else if (message.data.type === 'message') { + this.dispatchEvent(new MessageEvent('message', { data: message.data })); + } else { + throw new Error(`Unknown message type: ${message.data.type}`); + } + } +} + +export default BatchingWebSocketProvider; diff --git a/src/api/telemetry/TelemetryAPI.js b/src/api/telemetry/TelemetryAPI.js index 1f898277330..f1290b0f846 100644 --- a/src/api/telemetry/TelemetryAPI.js +++ b/src/api/telemetry/TelemetryAPI.js @@ -23,6 +23,7 @@ import objectUtils from 'objectUtils'; import CustomStringFormatter from '../../plugins/displayLayout/CustomStringFormatter.js'; +import BatchingWebSocketProvider from './BatchingWebSocketProvider.js'; import DefaultMetadataProvider from './DefaultMetadataProvider.js'; import TelemetryCollection from './TelemetryCollection.js'; import TelemetryMetadataManager from './TelemetryMetadataManager.js'; @@ -95,6 +96,7 @@ export default class TelemetryAPI { this.valueFormatterCache = new WeakMap(); this.requestInterceptorRegistry = new TelemetryRequestInterceptorRegistry(); this.#isGreedyLAD = true; + this.BatchingWebSocketProvider = BatchingWebSocketProvider; } abortAllRequests() { diff --git a/src/api/telemetry/WebSocketWorker.js b/src/api/telemetry/WebSocketWorker.js new file mode 100644 index 00000000000..f198cfd5c99 --- /dev/null +++ b/src/api/telemetry/WebSocketWorker.js @@ -0,0 +1,295 @@ +/* eslint-disable max-classes-per-file */ +export default function installWorker() { + const FALLBACK_AND_WAIT_MS = [1000, 5000, 5000, 10000, 10000, 30000]; + + class ResilientWebSocket extends EventTarget { + #webSocket; + #isConnected = false; + #isConnecting = false; + #messageQueue = []; + #reconnectTimeoutHandle; + #currentWaitIndex = 0; + #messageCallbacks = []; + #wsUrl; + + connect(url) { + this.#wsUrl = url; + if (this.#isConnected) { + throw new Error('WebSocket already connected'); + } + + if (this.#isConnecting) { + throw new Error('WebSocket connection in progress'); + } + + this.#isConnecting = true; + + this.#webSocket = new WebSocket(url); + + const boundConnected = this.#connected.bind(this); + this.#webSocket.addEventListener('open', boundConnected); + + const boundCleanUpAndReconnect = this.#cleanUpAndReconnect.bind(this); + this.#webSocket.addEventListener('error', boundCleanUpAndReconnect); + + const boundDisconnect = this.disconnect.bind(this); + this.#webSocket.addEventListener('close', boundCleanUpAndReconnect); + + const boundMessage = this.#message.bind(this); + this.#webSocket.addEventListener('message', boundMessage); + + this.addEventListener( + 'disconnected', + () => { + this.#webSocket.removeEventListener('open', boundConnected); + this.#webSocket.removeEventListener('error', boundCleanUpAndReconnect); + this.#webSocket.removeEventListener('close', boundDisconnect); + }, + { once: true } + ); + } + + //Do not use Event dispatching for this. Unnecessary overhead. + registerMessageCallback(callback) { + this.#messageCallbacks.push(callback); + + return () => { + this.#messageCallbacks = this.#messageCallbacks.filter((cb) => cb !== callback); + }; + } + + #connected() { + console.debug('Websocket connected.'); + this.#isConnected = true; + this.#isConnecting = false; + this.#currentWaitIndex = 0; + + this.dispatchEvent(new Event('connected')); + + this.#flushQueue(); + } + + #cleanUpAndReconnect() { + console.warn('Websocket closed. Attempting to reconnect...'); + this.disconnect(); + this.#reconnect(); + } + + #message(event) { + this.#messageCallbacks.forEach((callback) => callback(event.data)); + } + disconnect() { + this.#isConnected = false; + this.#isConnecting = false; + + if (this.#webSocket.readyState === WebSocket.OPEN) { + this.#webSocket.close(); + } + + this.dispatchEvent(new Event('disconnected')); + this.#webSocket = undefined; + } + + #reconnect() { + if (this.#reconnectTimeoutHandle) { + return; + } + + this.#reconnectTimeoutHandle = setTimeout(() => { + this.connect(this.#wsUrl); + + this.#reconnectTimeoutHandle = undefined; + }, FALLBACK_AND_WAIT_MS[this.#currentWaitIndex]); + + if (this.#currentWaitIndex < FALLBACK_AND_WAIT_MS.length - 1) { + this.#currentWaitIndex++; + } + } + + enqueueMessage(message) { + this.#messageQueue.push(message); + this.#flushQueueIfReady(); + } + + #flushQueueIfReady() { + if (this.#isConnected) { + this.#flushQueue(); + } + } + + #flushQueue() { + while (this.#messageQueue.length > 0) { + if (!this.#isConnected) { + break; + } + + const message = this.#messageQueue.shift(); + this.#webSocket.send(message); + } + } + } + + class WorkerToWebSocketMessageBroker { + #websocket; + #messageBatcher; + + constructor(websocket, messageBatcher) { + this.#websocket = websocket; + this.#messageBatcher = messageBatcher; + } + + routeMessageToHandler(message) { + const { type } = message.data; + switch (type) { + case 'connect': + this.connect(message); + break; + case 'disconnect': + this.disconnect(message); + break; + case 'message': + this.#websocket.enqueueMessage(message.data.message); + break; + case 'setBatchingStrategy': + this.setBatchingStrategy(message); + break; + case 'setRate': + this.setRate(message); + break; + default: + throw new Error(`Unknown message type: ${type}`); + } + } + connect(message) { + const { url } = message.data; + this.#websocket.connect(url); + } + disconnect() { + this.#websocket.disconnect(); + } + message(message) { + const { subscribeMessage } = message.data; + this.#websocket.enqueueMessage(subscribeMessage); + } + setBatchingStrategy(message) { + const { serializedStrategy } = message.data; + const batchingStrategy = { + // eslint-disable-next-line no-new-func + shouldBatchMessage: new Function(serializedStrategy.shouldBatchMessage), + // eslint-disable-next-line no-new-func + getBatchIdFromMessage: new Function(serializedStrategy.getBatchIdFromMessage) + // Will also include maximum batch length here + }; + this.#messageBatcher.setBatchingStrategy(batchingStrategy); + } + setRate(message) { + const { rate } = message.data; + this.#throttledTelemetryEmitter.setRate(rate); + } + } + + class WebSocketToWorkerMessageBroker { + #websocket; + #worker; + #messageBatcher; + + constructor(websocket, messageBatcher, worker) { + this.#websocket = websocket; + this.#messageBatcher = messageBatcher; + this.#worker = worker; + } + + routeMessageToHandler(data) { + //Implement batching here + if (this.#messageBatcher.shouldBatchMessage(data)) { + this.#messageBatcher.addMessageToBatch(data); + } else { + this.#worker.postMessage(data); + } + } + } + + class MessageBatcher { + #batch; + #batchingStrategy; + + constructor() { + this.resetBatch(); + } + resetBatch() { + this.#batch = {}; + } + setBatchingStrategy(strategy) { + this.#batchingStrategy = strategy; + } + shouldBatchMessage(message) { + return ( + this.#batchingStrategy.shouldBatchMessage && + this.#batchingStrategy.shouldBatchMessage(message) + ); + } + addMessageToBatch(message) { + const batchId = this.#batchingStrategy.getBatchIdFromMessage(message); + + if (this.#batch[batchId] === undefined) { + this.#batch[batchId] = [message]; + } else { + this.#batch[batchId].push(message); + } + } + nextBatch() { + const batch = this.#batch; + this.resetBatch(); + + return batch; + } + } + + class ThrottledTelemetryEmitter { + #rate; + #messageBatcher; + #worker; + #intervalHandle; + + constructor(messageBatcher, worker) { + this.#messageBatcher = messageBatcher; + this.#worker = worker; + } + + setRate(rate) { + this.#rate = rate; + this.#stop(); + this.#start(); + } + + #start() { + if (this.#intervalHandle) { + return; + } + + this.#intervalHandle = setInterval(() => { + const batch = this.#messageBatcher.nextBatch(); + this.#worker.postMessage(batch); + }, this.#rate); + } + + #stop() { + if (this.#intervalHandle) { + clearInterval(this.#intervalHandle); + this.#intervalHandle = undefined; + } + } + } + + const websocket = new ResilientWebSocket(); + const messageBatcher = new MessageBatcher(); + const workerBroker = new WorkerToWebSocketMessageBroker(websocket, messageBatcher); + const websocketBroker = new WebSocketToWorkerMessageBroker(websocket, messageBatcher, self); + + self.addEventListener('message', (message) => { + workerBroker.routeMessageToHandler(message); + }); + websocket.registerMessageCallback((data) => { + websocketBroker.routeMessageToHandler(data); + }); +} From 2f2af0bac509397f4887b7abe967d0768bf5e1b2 Mon Sep 17 00:00:00 2001 From: Andrew Henry Date: Fri, 12 Jan 2024 17:00:03 -0800 Subject: [PATCH 03/20] Added configurable batch size and throttling rate --- .../telemetry/BatchingWebSocketProvider.js | 18 ++++++- src/api/telemetry/WebSocketWorker.js | 54 +++++++++++++++---- 2 files changed, 59 insertions(+), 13 deletions(-) diff --git a/src/api/telemetry/BatchingWebSocketProvider.js b/src/api/telemetry/BatchingWebSocketProvider.js index 20154cfe50f..69a2cfa51a0 100644 --- a/src/api/telemetry/BatchingWebSocketProvider.js +++ b/src/api/telemetry/BatchingWebSocketProvider.js @@ -45,12 +45,26 @@ class BatchingWebSocketProvider extends EventTarget { }); } + setRate(rate) { + this.#worker.postMessage({ + type: 'setRate', + rate + }); + } + + setMaxBatchSize(maxBatchLength) { + this.#worker.postMessage({ + type: 'setMaxBatchSize', + maxBatchLength + }); + } + routeMessageToHandler(message) { // Batch message would need to be handle differently here if (message.data.type === 'batch') { - this.dispatchEvent(new MessageEvent('batch', { data: message.data })); + this.dispatchEvent(new CustomEvent('batch', { detail: message.data.batch })); } else if (message.data.type === 'message') { - this.dispatchEvent(new MessageEvent('message', { data: message.data })); + this.dispatchEvent(new CustomEvent('message', { detail: message.data.message })); } else { throw new Error(`Unknown message type: ${message.data.type}`); } diff --git a/src/api/telemetry/WebSocketWorker.js b/src/api/telemetry/WebSocketWorker.js index f198cfd5c99..d37c98a0b4f 100644 --- a/src/api/telemetry/WebSocketWorker.js +++ b/src/api/telemetry/WebSocketWorker.js @@ -132,10 +132,12 @@ export default function installWorker() { class WorkerToWebSocketMessageBroker { #websocket; #messageBatcher; + #throttledTelemetryEmitter; - constructor(websocket, messageBatcher) { + constructor(websocket, messageBatcher, throttledTelemetryEmitter) { this.#websocket = websocket; this.#messageBatcher = messageBatcher; + this.#throttledTelemetryEmitter = throttledTelemetryEmitter; } routeMessageToHandler(message) { @@ -156,6 +158,9 @@ export default function installWorker() { case 'setRate': this.setRate(message); break; + case 'setMaxBatchSize': + this.#messageBatcher.setMaxBatchSize(message); + break; default: throw new Error(`Unknown message type: ${type}`); } @@ -175,9 +180,9 @@ export default function installWorker() { const { serializedStrategy } = message.data; const batchingStrategy = { // eslint-disable-next-line no-new-func - shouldBatchMessage: new Function(serializedStrategy.shouldBatchMessage), + shouldBatchMessage: new Function(`return ${serializedStrategy.shouldBatchMessage}`)(), // eslint-disable-next-line no-new-func - getBatchIdFromMessage: new Function(serializedStrategy.getBatchIdFromMessage) + getBatchIdFromMessage: new Function(`return ${serializedStrategy.getBatchIdFromMessage}`)() // Will also include maximum batch length here }; this.#messageBatcher.setBatchingStrategy(batchingStrategy); @@ -189,12 +194,10 @@ export default function installWorker() { } class WebSocketToWorkerMessageBroker { - #websocket; #worker; #messageBatcher; - constructor(websocket, messageBatcher, worker) { - this.#websocket = websocket; + constructor(messageBatcher, worker) { this.#messageBatcher = messageBatcher; this.#worker = worker; } @@ -204,7 +207,10 @@ export default function installWorker() { if (this.#messageBatcher.shouldBatchMessage(data)) { this.#messageBatcher.addMessageToBatch(data); } else { - this.#worker.postMessage(data); + this.#worker.postMessage({ + type: 'message', + message: data + }); } } } @@ -212,12 +218,15 @@ export default function installWorker() { class MessageBatcher { #batch; #batchingStrategy; + #hasBatch = false; + #maxBatchSize = 10; constructor() { this.resetBatch(); } resetBatch() { this.#batch = {}; + this.#hasBatch = false; } setBatchingStrategy(strategy) { this.#batchingStrategy = strategy; @@ -236,6 +245,16 @@ export default function installWorker() { } else { this.#batch[batchId].push(message); } + if (this.#batch[batchId].length > this.#maxBatchSize) { + this.#batch[batchId].shift(); + console.warn('Dropping message from batch. Batch size exceeded.'); + } + if (!this.#hasBatch) { + this.#hasBatch = true; + } + } + setMaxBatchSize(maxBatchSize) { + this.#maxBatchSize = maxBatchSize; } nextBatch() { const batch = this.#batch; @@ -243,6 +262,9 @@ export default function installWorker() { return batch; } + hasBatch() { + return this.#hasBatch; + } } class ThrottledTelemetryEmitter { @@ -268,8 +290,13 @@ export default function installWorker() { } this.#intervalHandle = setInterval(() => { - const batch = this.#messageBatcher.nextBatch(); - this.#worker.postMessage(batch); + if (this.#messageBatcher.hasBatch()) { + const batch = this.#messageBatcher.nextBatch(); + this.#worker.postMessage({ + type: 'batch', + batch + }); + } }, this.#rate); } @@ -283,8 +310,13 @@ export default function installWorker() { const websocket = new ResilientWebSocket(); const messageBatcher = new MessageBatcher(); - const workerBroker = new WorkerToWebSocketMessageBroker(websocket, messageBatcher); - const websocketBroker = new WebSocketToWorkerMessageBroker(websocket, messageBatcher, self); + const throttledTelemetryEmitter = new ThrottledTelemetryEmitter(messageBatcher, self); + const workerBroker = new WorkerToWebSocketMessageBroker( + websocket, + messageBatcher, + throttledTelemetryEmitter + ); + const websocketBroker = new WebSocketToWorkerMessageBroker(messageBatcher, self); self.addEventListener('message', (message) => { workerBroker.routeMessageToHandler(message); From 0061d162e10233e1467fdc7ef810877385a24beb Mon Sep 17 00:00:00 2001 From: Andrew Henry Date: Tue, 16 Jan 2024 14:59:23 -0800 Subject: [PATCH 04/20] Support batch size based throttling --- .../telemetry/BatchingWebSocketProvider.js | 21 ++++++++++++++++--- src/api/telemetry/WebSocketWorker.js | 16 +++++++------- 2 files changed, 26 insertions(+), 11 deletions(-) diff --git a/src/api/telemetry/BatchingWebSocketProvider.js b/src/api/telemetry/BatchingWebSocketProvider.js index 69a2cfa51a0..5b97c9d21e9 100644 --- a/src/api/telemetry/BatchingWebSocketProvider.js +++ b/src/api/telemetry/BatchingWebSocketProvider.js @@ -2,14 +2,18 @@ import installWorker from './WebSocketWorker.js'; class BatchingWebSocketProvider extends EventTarget { #worker; + #openmct; + #showingRateLimitNotification; - constructor() { + constructor(openmct) { super(); // Install worker, register listeners etc. const workerFunction = `(${installWorker.toString()})()`; const workerBlob = new Blob([workerFunction]); const workerUrl = URL.createObjectURL(workerBlob, { type: 'application/javascript' }); this.#worker = new Worker(workerUrl); + this.#openmct = openmct; + this.#showingRateLimitNotification = false; this.routeMessageToHandler = this.routeMessageToHandler.bind(this); this.#worker.addEventListener('message', this.routeMessageToHandler); @@ -52,16 +56,27 @@ class BatchingWebSocketProvider extends EventTarget { }); } - setMaxBatchSize(maxBatchLength) { + setMaxBatchSize(maxBatchSize) { this.#worker.postMessage({ type: 'setMaxBatchSize', - maxBatchLength + maxBatchSize }); } routeMessageToHandler(message) { // Batch message would need to be handle differently here if (message.data.type === 'batch') { + if (message.data.batch.dropped === true && !this.#showingRateLimitNotification) { + const notification = this.#openmct.notifications.alert( + 'Telemetry dropped due to client rate limiting.', + { hint: 'Refresh individual telemetry views to retrieve dropped telemetry if needed.' } + ); + this.#showingRateLimitNotification = true; + notification.once('minimized', () => { + this.#showingRateLimitNotification = false; + }); + return; + } this.dispatchEvent(new CustomEvent('batch', { detail: message.data.batch })); } else if (message.data.type === 'message') { this.dispatchEvent(new CustomEvent('message', { detail: message.data.message })); diff --git a/src/api/telemetry/WebSocketWorker.js b/src/api/telemetry/WebSocketWorker.js index d37c98a0b4f..eec154291e9 100644 --- a/src/api/telemetry/WebSocketWorker.js +++ b/src/api/telemetry/WebSocketWorker.js @@ -159,7 +159,7 @@ export default function installWorker() { this.setRate(message); break; case 'setMaxBatchSize': - this.#messageBatcher.setMaxBatchSize(message); + this.#messageBatcher.setMaxBatchSize(message.data.maxBatchSize); break; default: throw new Error(`Unknown message type: ${type}`); @@ -239,15 +239,15 @@ export default function installWorker() { } addMessageToBatch(message) { const batchId = this.#batchingStrategy.getBatchIdFromMessage(message); - - if (this.#batch[batchId] === undefined) { - this.#batch[batchId] = [message]; + let batch = this.#batch[batchId]; + if (batch === undefined) { + batch = this.#batch[batchId] = [message]; } else { - this.#batch[batchId].push(message); + batch.push(message); } - if (this.#batch[batchId].length > this.#maxBatchSize) { - this.#batch[batchId].shift(); - console.warn('Dropping message from batch. Batch size exceeded.'); + if (batch.length > this.#maxBatchSize) { + batch.shift(); + this.#batch.dropped = this.#batch.dropped || true; } if (!this.#hasBatch) { this.#hasBatch = true; From 74c1cdf468c259570a059f3f866af1722e926a13 Mon Sep 17 00:00:00 2001 From: Andrew Henry Date: Tue, 16 Jan 2024 15:41:03 -0800 Subject: [PATCH 05/20] Default to latest strategy --- src/api/telemetry/TelemetryAPI.js | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/api/telemetry/TelemetryAPI.js b/src/api/telemetry/TelemetryAPI.js index f1290b0f846..f1b07d3b3b0 100644 --- a/src/api/telemetry/TelemetryAPI.js +++ b/src/api/telemetry/TelemetryAPI.js @@ -403,13 +403,15 @@ export default class TelemetryAPI { * @returns {Function} a function which may be called to terminate * the subscription */ - subscribe(domainObject, callback, options) { + subscribe(domainObject, callback, options = { strategy: 'latest' }) { if (domainObject.type === 'unknown') { return () => {}; } // Default behavior is to use the latest strategy, as opposed to the new "batch" strategy - options.strategy = options.strategy || 'latest'; + if (options.strategy === undefined) { + options.strategy = 'latest'; + } const provider = this.findSubscriptionProvider(domainObject, options); From c28ced5c29e6319485a9b4d48f3cd885e123c268 Mon Sep 17 00:00:00 2001 From: Andrew Henry Date: Thu, 18 Jan 2024 08:53:07 -0800 Subject: [PATCH 06/20] Don't hide original error --- src/plugins/remoteClock/RemoteClock.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/plugins/remoteClock/RemoteClock.js b/src/plugins/remoteClock/RemoteClock.js index 6bffea15130..e2b3049e60a 100644 --- a/src/plugins/remoteClock/RemoteClock.js +++ b/src/plugins/remoteClock/RemoteClock.js @@ -77,7 +77,7 @@ export default class RemoteClock extends DefaultClock { this._subscribe(); }) .catch((error) => { - throw new Error(error); + throw error; }); } From 947810b5d799898d7e19b2f5c4202a074fbf48e3 Mon Sep 17 00:00:00 2001 From: Andrew Henry Date: Thu, 18 Jan 2024 17:43:10 -0800 Subject: [PATCH 07/20] Added copyright statement --- .../telemetry/BatchingWebSocketProvider.js | 22 +++++++++++++++++++ src/api/telemetry/WebSocketWorker.js | 21 ++++++++++++++++++ 2 files changed, 43 insertions(+) diff --git a/src/api/telemetry/BatchingWebSocketProvider.js b/src/api/telemetry/BatchingWebSocketProvider.js index 5b97c9d21e9..bb2bf6196b5 100644 --- a/src/api/telemetry/BatchingWebSocketProvider.js +++ b/src/api/telemetry/BatchingWebSocketProvider.js @@ -1,3 +1,25 @@ +/***************************************************************************** + * Open MCT Web, Copyright (c) 2014-2024, United States Government + * as represented by the Administrator of the National Aeronautics and Space + * Administration. All rights reserved. + * + * Open MCT Web is licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + * + * Open MCT Web includes source code licensed under additional open source + * licenses. See the Open Source Licenses file (LICENSES.md) included with + * this source code distribution or the Licensing information page available + * at runtime from the About dialog for additional information. + *****************************************************************************/ + import installWorker from './WebSocketWorker.js'; class BatchingWebSocketProvider extends EventTarget { diff --git a/src/api/telemetry/WebSocketWorker.js b/src/api/telemetry/WebSocketWorker.js index eec154291e9..927943f27d3 100644 --- a/src/api/telemetry/WebSocketWorker.js +++ b/src/api/telemetry/WebSocketWorker.js @@ -1,3 +1,24 @@ +/***************************************************************************** + * Open MCT Web, Copyright (c) 2014-2024, United States Government + * as represented by the Administrator of the National Aeronautics and Space + * Administration. All rights reserved. + * + * Open MCT Web is licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + * + * Open MCT Web includes source code licensed under additional open source + * licenses. See the Open Source Licenses file (LICENSES.md) included with + * this source code distribution or the Licensing information page available + * at runtime from the About dialog for additional information. + *****************************************************************************/ /* eslint-disable max-classes-per-file */ export default function installWorker() { const FALLBACK_AND_WAIT_MS = [1000, 5000, 5000, 10000, 10000, 30000]; From f079c3a3b999e8b7498241da5eb321c6fb7db809 Mon Sep 17 00:00:00 2001 From: Andrew Henry Date: Fri, 19 Jan 2024 13:15:30 -0800 Subject: [PATCH 08/20] Renamed BatchingWebSocketProvider to BatchingWebSocket --- src/api/telemetry/BatchingWebSocketProvider.js | 5 ++--- src/api/telemetry/TelemetryAPI.js | 4 ++-- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/src/api/telemetry/BatchingWebSocketProvider.js b/src/api/telemetry/BatchingWebSocketProvider.js index bb2bf6196b5..95d281288d3 100644 --- a/src/api/telemetry/BatchingWebSocketProvider.js +++ b/src/api/telemetry/BatchingWebSocketProvider.js @@ -19,10 +19,9 @@ * this source code distribution or the Licensing information page available * at runtime from the About dialog for additional information. *****************************************************************************/ - import installWorker from './WebSocketWorker.js'; -class BatchingWebSocketProvider extends EventTarget { +class BatchingWebSocket extends EventTarget { #worker; #openmct; #showingRateLimitNotification; @@ -108,4 +107,4 @@ class BatchingWebSocketProvider extends EventTarget { } } -export default BatchingWebSocketProvider; +export default BatchingWebSocket; diff --git a/src/api/telemetry/TelemetryAPI.js b/src/api/telemetry/TelemetryAPI.js index 6f34cabeaaa..e944932443f 100644 --- a/src/api/telemetry/TelemetryAPI.js +++ b/src/api/telemetry/TelemetryAPI.js @@ -23,7 +23,7 @@ import objectUtils from 'objectUtils'; import CustomStringFormatter from '../../plugins/displayLayout/CustomStringFormatter.js'; -import BatchingWebSocketProvider from './BatchingWebSocketProvider.js'; +import BatchingWebSocket from './BatchingWebSocketProvider.js'; import DefaultMetadataProvider from './DefaultMetadataProvider.js'; import TelemetryCollection from './TelemetryCollection.js'; import TelemetryMetadataManager from './TelemetryMetadataManager.js'; @@ -96,7 +96,7 @@ export default class TelemetryAPI { this.valueFormatterCache = new WeakMap(); this.requestInterceptorRegistry = new TelemetryRequestInterceptorRegistry(); this.#isGreedyLAD = true; - this.BatchingWebSocketProvider = BatchingWebSocketProvider; + this.BatchingWebSocket = BatchingWebSocket; } abortAllRequests() { From 61edd0f810f419798821a73eec422d33a0fbbd15 Mon Sep 17 00:00:00 2001 From: Andrew Henry Date: Fri, 19 Jan 2024 15:44:27 -0800 Subject: [PATCH 09/20] Adding docs --- .../telemetry/BatchingWebSocketProvider.js | 73 +++++++++++++++++-- 1 file changed, 66 insertions(+), 7 deletions(-) diff --git a/src/api/telemetry/BatchingWebSocketProvider.js b/src/api/telemetry/BatchingWebSocketProvider.js index 95d281288d3..74621697965 100644 --- a/src/api/telemetry/BatchingWebSocketProvider.js +++ b/src/api/telemetry/BatchingWebSocketProvider.js @@ -21,6 +21,37 @@ *****************************************************************************/ import installWorker from './WebSocketWorker.js'; +/** + * Describes the strategy to be used when batching WebSocket messages + * + * @typedef BatchingStrategy + * @property {Function} shouldBatchMessage a function that accepts a single + * argument - the raw message received from the websocket. Every message + * received will be evaluated against this function so it should be performant. + * Note also that this function is executed in a worker, so it must be + * completely self-contained with no external dependencies. The function + * should return `true` if the message should be batched, and `false` if not. + * @property {Function} getBatchIdFromMessage a function that accepts a + * single argument - the raw message received from the websocket. Only messages + * where `shouldBatchMessage` has evaluated to true will be passed into this + * function. The function should return a unique value on which to batch the + * messages. For example a telemetry, channel, or parameter identifier. + */ + +/** + * Provides a reliable and convenient WebSocket abstraction layer that handles + * a lot of boilerplate common to managing WebSocket connections such as: + * - Establishing a WebSocket connection to a server + * - Reconnecting on error, with a fallback strategy + * - Queuing messages so that clients can send messages without concern for the current + * connection state of the WebSocket. + * + * The WebSocket that it manages is based in a dedicated worker so that network + * concerns are not handled on the main event loop. This allows for performant receipt + * and batching of messages without blocking either the UI or server. + * + * @memberof module:openmct.telemetry + */ class BatchingWebSocket extends EventTarget { #worker; #openmct; @@ -36,10 +67,14 @@ class BatchingWebSocket extends EventTarget { this.#openmct = openmct; this.#showingRateLimitNotification = false; - this.routeMessageToHandler = this.routeMessageToHandler.bind(this); - this.#worker.addEventListener('message', this.routeMessageToHandler); + const routeMessageToHandler = this.#routeMessageToHandler.bind(this); + this.#worker.addEventListener('message', routeMessageToHandler); } + /** + * Will establish a WebSocket connection to the provided url + * @param {string} url The URL to connect to + */ connect(url) { this.#worker.postMessage({ type: 'connect', @@ -47,10 +82,11 @@ class BatchingWebSocket extends EventTarget { }); } - disconnect() { - this.#worker.postMessage({ type: 'disconnect' }); - } - + /** + * Send a message to the WebSocket. + * @param {any} message The message to send. Can be any type supported by WebSockets. + * See https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/send#data + */ sendMessage(message) { this.#worker.postMessage({ type: 'message', @@ -58,6 +94,12 @@ class BatchingWebSocket extends EventTarget { }); } + /** + * Set the strategy used to both decide which raw messages to batch, and how to group + * them. + * @param {BatchingStrategy} strategy The batching strategy to use when evaluating + * raw messages from the WebSocket. + */ setBatchingStrategy(strategy) { const serializedStrategy = { shouldBatchMessage: strategy.shouldBatchMessage.toString(), @@ -70,6 +112,10 @@ class BatchingWebSocket extends EventTarget { }); } + /** + * When using batching, sets the rate at which batches of messages are released. + * @param {Number} rate the amount of time to wait, in ms, between batches. + */ setRate(rate) { this.#worker.postMessage({ type: 'setRate', @@ -77,6 +123,19 @@ class BatchingWebSocket extends EventTarget { }); } + /** + * @param {Number} maxBatchSize the maximum length of a batch of messages. For example, + * the maximum number of telemetry values to batch before dropping them + * Note that this is a fail-safe that is only invoked if performance drops to the + * point where Open MCT cannot keep up with the amount of telemetry it is receiving. + * In this event it will sacrifice the oldest telemetry in the batch in favor of the + * most recent telemetry. The user will be informed that telemetry has been dropped. + * + * This should be specced appropriately for the expected data rate. eg. If telemetry + * is received at 10Hz for each telemetry point, then a minimal combination of batch + * size and rate is 10 and 1000 respectively. Ideally you would add some margin, so + * 15 would probably be a better batch size. + */ setMaxBatchSize(maxBatchSize) { this.#worker.postMessage({ type: 'setMaxBatchSize', @@ -84,7 +143,7 @@ class BatchingWebSocket extends EventTarget { }); } - routeMessageToHandler(message) { + #routeMessageToHandler(message) { // Batch message would need to be handle differently here if (message.data.type === 'batch') { if (message.data.batch.dropped === true && !this.#showingRateLimitNotification) { From 2e3dc4da9af402623973a33af89682640940ed3f Mon Sep 17 00:00:00 2001 From: Andrew Henry Date: Sat, 20 Jan 2024 17:54:33 -0800 Subject: [PATCH 10/20] renamed class. changed throttling strategy to be driven by the main thread --- ...SocketProvider.js => BatchingWebSocket.js} | 21 ++-- src/api/telemetry/WebSocketWorker.js | 101 ++++++------------ 2 files changed, 45 insertions(+), 77 deletions(-) rename src/api/telemetry/{BatchingWebSocketProvider.js => BatchingWebSocket.js} (96%) diff --git a/src/api/telemetry/BatchingWebSocketProvider.js b/src/api/telemetry/BatchingWebSocket.js similarity index 96% rename from src/api/telemetry/BatchingWebSocketProvider.js rename to src/api/telemetry/BatchingWebSocket.js index 74621697965..745f2566574 100644 --- a/src/api/telemetry/BatchingWebSocketProvider.js +++ b/src/api/telemetry/BatchingWebSocket.js @@ -20,7 +20,7 @@ * at runtime from the About dialog for additional information. *****************************************************************************/ import installWorker from './WebSocketWorker.js'; - +const DEFAULT_RATE_MS = 1000; /** * Describes the strategy to be used when batching WebSocket messages * @@ -37,7 +37,6 @@ import installWorker from './WebSocketWorker.js'; * function. The function should return a unique value on which to batch the * messages. For example a telemetry, channel, or parameter identifier. */ - /** * Provides a reliable and convenient WebSocket abstraction layer that handles * a lot of boilerplate common to managing WebSocket connections such as: @@ -56,6 +55,7 @@ class BatchingWebSocket extends EventTarget { #worker; #openmct; #showingRateLimitNotification; + #rate; constructor(openmct) { super(); @@ -66,6 +66,7 @@ class BatchingWebSocket extends EventTarget { this.#worker = new Worker(workerUrl); this.#openmct = openmct; this.#showingRateLimitNotification = false; + this.#rate = DEFAULT_RATE_MS; const routeMessageToHandler = this.#routeMessageToHandler.bind(this); this.#worker.addEventListener('message', routeMessageToHandler); @@ -80,6 +81,14 @@ class BatchingWebSocket extends EventTarget { type: 'connect', url }); + + this.#readyForNextBatch(); + } + + #readyForNextBatch() { + this.#worker.postMessage({ + type: 'readyForNextBatch' + }); } /** @@ -117,10 +126,7 @@ class BatchingWebSocket extends EventTarget { * @param {Number} rate the amount of time to wait, in ms, between batches. */ setRate(rate) { - this.#worker.postMessage({ - type: 'setRate', - rate - }); + this.#rate = rate; } /** @@ -158,6 +164,9 @@ class BatchingWebSocket extends EventTarget { return; } this.dispatchEvent(new CustomEvent('batch', { detail: message.data.batch })); + setTimeout(() => { + this.#readyForNextBatch(); + }, this.#rate); } else if (message.data.type === 'message') { this.dispatchEvent(new CustomEvent('message', { detail: message.data.message })); } else { diff --git a/src/api/telemetry/WebSocketWorker.js b/src/api/telemetry/WebSocketWorker.js index 927943f27d3..70a074d6208 100644 --- a/src/api/telemetry/WebSocketWorker.js +++ b/src/api/telemetry/WebSocketWorker.js @@ -153,12 +153,10 @@ export default function installWorker() { class WorkerToWebSocketMessageBroker { #websocket; #messageBatcher; - #throttledTelemetryEmitter; - constructor(websocket, messageBatcher, throttledTelemetryEmitter) { + constructor(websocket, messageBatcher) { this.#websocket = websocket; this.#messageBatcher = messageBatcher; - this.#throttledTelemetryEmitter = throttledTelemetryEmitter; } routeMessageToHandler(message) { @@ -176,8 +174,8 @@ export default function installWorker() { case 'setBatchingStrategy': this.setBatchingStrategy(message); break; - case 'setRate': - this.setRate(message); + case 'readyForNextBatch': + this.#messageBatcher.readyForNextBatch(); break; case 'setMaxBatchSize': this.#messageBatcher.setMaxBatchSize(message.data.maxBatchSize); @@ -193,10 +191,6 @@ export default function installWorker() { disconnect() { this.#websocket.disconnect(); } - message(message) { - const { subscribeMessage } = message.data; - this.#websocket.enqueueMessage(subscribeMessage); - } setBatchingStrategy(message) { const { serializedStrategy } = message.data; const batchingStrategy = { @@ -208,10 +202,6 @@ export default function installWorker() { }; this.#messageBatcher.setBatchingStrategy(batchingStrategy); } - setRate(message) { - const { rate } = message.data; - this.#throttledTelemetryEmitter.setRate(rate); - } } class WebSocketToWorkerMessageBroker { @@ -240,12 +230,17 @@ export default function installWorker() { #batch; #batchingStrategy; #hasBatch = false; - #maxBatchSize = 10; + #maxBatchSize; + #readyForNextBatch; + #worker; - constructor() { - this.resetBatch(); + constructor(worker) { + this.#maxBatchSize = 10; + this.#readyForNextBatch = false; + this.#worker = worker; + this.#resetBatch(); } - resetBatch() { + #resetBatch() { this.#batch = {}; this.#hasBatch = false; } @@ -270,73 +265,37 @@ export default function installWorker() { batch.shift(); this.#batch.dropped = this.#batch.dropped || true; } - if (!this.#hasBatch) { + if (this.#readyForNextBatch) { + this.#sendNextBatch(); + } else { this.#hasBatch = true; } } setMaxBatchSize(maxBatchSize) { this.#maxBatchSize = maxBatchSize; } - nextBatch() { - const batch = this.#batch; - this.resetBatch(); - - return batch; - } - hasBatch() { - return this.#hasBatch; - } - } - - class ThrottledTelemetryEmitter { - #rate; - #messageBatcher; - #worker; - #intervalHandle; - - constructor(messageBatcher, worker) { - this.#messageBatcher = messageBatcher; - this.#worker = worker; - } - - setRate(rate) { - this.#rate = rate; - this.#stop(); - this.#start(); - } - - #start() { - if (this.#intervalHandle) { - return; + readyForNextBatch() { + if (this.#hasBatch) { + this.#sendNextBatch(); + } else { + this.#readyForNextBatch = true; } - - this.#intervalHandle = setInterval(() => { - if (this.#messageBatcher.hasBatch()) { - const batch = this.#messageBatcher.nextBatch(); - this.#worker.postMessage({ - type: 'batch', - batch - }); - } - }, this.#rate); } - - #stop() { - if (this.#intervalHandle) { - clearInterval(this.#intervalHandle); - this.#intervalHandle = undefined; - } + #sendNextBatch() { + const batch = this.#batch; + this.#resetBatch(); + this.#worker.postMessage({ + type: 'batch', + batch + }); + this.#readyForNextBatch = false; + this.#hasBatch = false; } } const websocket = new ResilientWebSocket(); const messageBatcher = new MessageBatcher(); - const throttledTelemetryEmitter = new ThrottledTelemetryEmitter(messageBatcher, self); - const workerBroker = new WorkerToWebSocketMessageBroker( - websocket, - messageBatcher, - throttledTelemetryEmitter - ); + const workerBroker = new WorkerToWebSocketMessageBroker(websocket, messageBatcher); const websocketBroker = new WebSocketToWorkerMessageBroker(messageBatcher, self); self.addEventListener('message', (message) => { From 0bdd5efcba0ab6090fa4831d87c187bf9c7af9d9 Mon Sep 17 00:00:00 2001 From: Andrew Henry Date: Sat, 20 Jan 2024 19:18:15 -0800 Subject: [PATCH 11/20] Renamed classes --- src/api/telemetry/BatchingWebSocket.js | 1 - src/api/telemetry/TelemetryAPI.js | 2 +- src/api/telemetry/WebSocketWorker.js | 2 +- 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/src/api/telemetry/BatchingWebSocket.js b/src/api/telemetry/BatchingWebSocket.js index 745f2566574..39fd704c432 100644 --- a/src/api/telemetry/BatchingWebSocket.js +++ b/src/api/telemetry/BatchingWebSocket.js @@ -161,7 +161,6 @@ class BatchingWebSocket extends EventTarget { notification.once('minimized', () => { this.#showingRateLimitNotification = false; }); - return; } this.dispatchEvent(new CustomEvent('batch', { detail: message.data.batch })); setTimeout(() => { diff --git a/src/api/telemetry/TelemetryAPI.js b/src/api/telemetry/TelemetryAPI.js index e944932443f..a951ebb94d6 100644 --- a/src/api/telemetry/TelemetryAPI.js +++ b/src/api/telemetry/TelemetryAPI.js @@ -23,7 +23,7 @@ import objectUtils from 'objectUtils'; import CustomStringFormatter from '../../plugins/displayLayout/CustomStringFormatter.js'; -import BatchingWebSocket from './BatchingWebSocketProvider.js'; +import BatchingWebSocket from './BatchingWebSocket.js'; import DefaultMetadataProvider from './DefaultMetadataProvider.js'; import TelemetryCollection from './TelemetryCollection.js'; import TelemetryMetadataManager from './TelemetryMetadataManager.js'; diff --git a/src/api/telemetry/WebSocketWorker.js b/src/api/telemetry/WebSocketWorker.js index 70a074d6208..01228fb4fc2 100644 --- a/src/api/telemetry/WebSocketWorker.js +++ b/src/api/telemetry/WebSocketWorker.js @@ -294,7 +294,7 @@ export default function installWorker() { } const websocket = new ResilientWebSocket(); - const messageBatcher = new MessageBatcher(); + const messageBatcher = new MessageBatcher(self); const workerBroker = new WorkerToWebSocketMessageBroker(websocket, messageBatcher); const websocketBroker = new WebSocketToWorkerMessageBroker(messageBatcher, self); From ad02ba7ffed5859321b5baabd8b8b3d0e3de5fed Mon Sep 17 00:00:00 2001 From: Andrew Henry Date: Mon, 22 Jan 2024 07:20:13 -0800 Subject: [PATCH 12/20] Added more documentation --- src/api/telemetry/BatchingWebSocket.js | 19 +++++++- src/api/telemetry/WebSocketWorker.js | 61 +++++++++++++++++++++++++- 2 files changed, 78 insertions(+), 2 deletions(-) diff --git a/src/api/telemetry/BatchingWebSocket.js b/src/api/telemetry/BatchingWebSocket.js index 39fd704c432..3c646e30a7f 100644 --- a/src/api/telemetry/BatchingWebSocket.js +++ b/src/api/telemetry/BatchingWebSocket.js @@ -70,6 +70,13 @@ class BatchingWebSocket extends EventTarget { const routeMessageToHandler = this.#routeMessageToHandler.bind(this); this.#worker.addEventListener('message', routeMessageToHandler); + openmct.on( + 'destroy', + () => { + this.disconnect(); + }, + { once: true } + ); } /** @@ -106,7 +113,7 @@ class BatchingWebSocket extends EventTarget { /** * Set the strategy used to both decide which raw messages to batch, and how to group * them. - * @param {BatchingStrategy} strategy The batching strategy to use when evaluating + * @param {BatchingStrategy} strategy The batching strategy to use when evaluating * raw messages from the WebSocket. */ setBatchingStrategy(strategy) { @@ -149,6 +156,16 @@ class BatchingWebSocket extends EventTarget { }); } + /** + * Disconnect the associated WebSocket. Generally speaking there is no need to call + * this manually. + */ + disconnect() { + this.#worker.postMessage({ + type: 'disconnect' + }); + } + #routeMessageToHandler(message) { // Batch message would need to be handle differently here if (message.data.type === 'batch') { diff --git a/src/api/telemetry/WebSocketWorker.js b/src/api/telemetry/WebSocketWorker.js index 01228fb4fc2..346bc01827d 100644 --- a/src/api/telemetry/WebSocketWorker.js +++ b/src/api/telemetry/WebSocketWorker.js @@ -23,6 +23,18 @@ export default function installWorker() { const FALLBACK_AND_WAIT_MS = [1000, 5000, 5000, 10000, 10000, 30000]; + /** + * @typedef {import('./BatchingWebSocket').BatchingStrategy} BatchingStrategy + */ + + /** + * Provides a WebSocket connection that is resilient to errors and dropouts. + * On an error or dropout, will automatically reconnect. + * + * Additionally, messages will be queued and sent only when WebSocket is + * connected meaning that client code does not need to check the state of + * the socket before sending. + */ class ResilientWebSocket extends EventTarget { #webSocket; #isConnected = false; @@ -33,6 +45,10 @@ export default function installWorker() { #messageCallbacks = []; #wsUrl; + /** + * Establish a new WebSocket connection to the given URL + * @param {String} url + */ connect(url) { this.#wsUrl = url; if (this.#isConnected) { @@ -70,7 +86,13 @@ export default function installWorker() { ); } - //Do not use Event dispatching for this. Unnecessary overhead. + /** + * Register a callback to be invoked when a message is received on the WebSocket. + * This paradigm is used instead of the standard EventTarget or EventEmitter approach + * for performance reasons. + * @param {Function} callback The function to be invoked when a message is received + * @returns an unregister function + */ registerMessageCallback(callback) { this.#messageCallbacks.push(callback); @@ -99,6 +121,7 @@ export default function installWorker() { #message(event) { this.#messageCallbacks.forEach((callback) => callback(event.data)); } + disconnect() { this.#isConnected = false; this.#isConnecting = false; @@ -150,6 +173,10 @@ export default function installWorker() { } } + /** + * Handles messages over the worker interface, and + * sends corresponding WebSocket messages. + */ class WorkerToWebSocketMessageBroker { #websocket; #messageBatcher; @@ -204,6 +231,10 @@ export default function installWorker() { } } + /** + * Received messages from the WebSocket, and passes them along to the + * Worker interface and back to the main thread. + */ class WebSocketToWorkerMessageBroker { #worker; #messageBatcher; @@ -226,6 +257,9 @@ export default function installWorker() { } } + /** + * Responsible for batching messages according to the defined batching strategy. + */ class MessageBatcher { #batch; #batchingStrategy; @@ -244,15 +278,34 @@ export default function installWorker() { this.#batch = {}; this.#hasBatch = false; } + /** + * @param {BatchingStrategy} strategy + */ setBatchingStrategy(strategy) { this.#batchingStrategy = strategy; } + /** + * Applies the `shouldBatchMessage` function from the supplied batching strategy + * to each message to determine if it should be added to a batch. If not batched, + * the message is immediately sent over the worker to the main thread. + * @param {any} message the message received from the WebSocket. See the WebSocket + * documentation for more details - + * https://developer.mozilla.org/en-US/docs/Web/API/MessageEvent/data + * @returns + */ shouldBatchMessage(message) { return ( this.#batchingStrategy.shouldBatchMessage && this.#batchingStrategy.shouldBatchMessage(message) ); } + /** + * Adds the given message to a batch. The batch group that the message is added + * to will be determined by the value returned by `getBatchIdFromMessage`. + * @param {any} message the message received from the WebSocket. See the WebSocket + * documentation for more details - + * https://developer.mozilla.org/en-US/docs/Web/API/MessageEvent/data + */ addMessageToBatch(message) { const batchId = this.#batchingStrategy.getBatchIdFromMessage(message); let batch = this.#batch[batchId]; @@ -274,6 +327,12 @@ export default function installWorker() { setMaxBatchSize(maxBatchSize) { this.#maxBatchSize = maxBatchSize; } + /** + * Indicates that client code is ready to receive the next batch of + * messages. If a batch is available, it will be immediately sent. + * Otherwise a flag will be set to send the next batch as soon as + * any new data is available. + */ readyForNextBatch() { if (this.#hasBatch) { this.#sendNextBatch(); From c4a3ace027a41b7344d72d83077e9eb0e8258b60 Mon Sep 17 00:00:00 2001 From: Andrew Henry Date: Mon, 22 Jan 2024 18:35:15 -0800 Subject: [PATCH 13/20] Fixed broken tests --- src/api/telemetry/TelemetryAPISpec.js | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/api/telemetry/TelemetryAPISpec.js b/src/api/telemetry/TelemetryAPISpec.js index 59ff14b3bf6..ea0c9d169c5 100644 --- a/src/api/telemetry/TelemetryAPISpec.js +++ b/src/api/telemetry/TelemetryAPISpec.js @@ -90,7 +90,9 @@ describe('Telemetry API', () => { const callback = jasmine.createSpy('callback'); const unsubscribe = telemetryAPI.subscribe(domainObject, callback); - expect(telemetryProvider.supportsSubscribe).toHaveBeenCalledWith(domainObject); + expect(telemetryProvider.supportsSubscribe).toHaveBeenCalledWith(domainObject, { + strategy: 'latest' + }); expect(telemetryProvider.subscribe).not.toHaveBeenCalled(); expect(unsubscribe).toEqual(jasmine.any(Function)); @@ -111,12 +113,16 @@ describe('Telemetry API', () => { const callback = jasmine.createSpy('callback'); const unsubscribe = telemetryAPI.subscribe(domainObject, callback); expect(telemetryProvider.supportsSubscribe.calls.count()).toBe(1); - expect(telemetryProvider.supportsSubscribe).toHaveBeenCalledWith(domainObject); + expect(telemetryProvider.supportsSubscribe).toHaveBeenCalledWith(domainObject, { + strategy: 'latest' + }); expect(telemetryProvider.subscribe.calls.count()).toBe(1); expect(telemetryProvider.subscribe).toHaveBeenCalledWith( domainObject, jasmine.any(Function), - undefined + { + strategy: 'latest' + } ); const notify = telemetryProvider.subscribe.calls.mostRecent().args[1]; From e135498401263523453c697201eb0233a72a19c1 Mon Sep 17 00:00:00 2001 From: Andrew Henry Date: Wed, 24 Jan 2024 07:54:10 -0800 Subject: [PATCH 14/20] Addressed review comments --- src/api/telemetry/BatchingWebSocket.js | 2 +- src/api/telemetry/TelemetryAPI.js | 29 +++++++++++++--- src/api/telemetry/TelemetryCollection.js | 2 +- src/api/telemetry/WebSocketWorker.js | 4 +-- src/plugins/plot/configuration/Model.js | 4 ++- src/plugins/plot/configuration/PlotSeries.js | 2 +- src/plugins/remoteClock/RemoteClock.js | 35 +++++++++----------- 7 files changed, 47 insertions(+), 31 deletions(-) diff --git a/src/api/telemetry/BatchingWebSocket.js b/src/api/telemetry/BatchingWebSocket.js index 3c646e30a7f..df2e7f98f07 100644 --- a/src/api/telemetry/BatchingWebSocket.js +++ b/src/api/telemetry/BatchingWebSocket.js @@ -74,6 +74,7 @@ class BatchingWebSocket extends EventTarget { 'destroy', () => { this.disconnect(); + URL.revokeObjectURL(workerUrl); }, { once: true } ); @@ -167,7 +168,6 @@ class BatchingWebSocket extends EventTarget { } #routeMessageToHandler(message) { - // Batch message would need to be handle differently here if (message.data.type === 'batch') { if (message.data.batch.dropped === true && !this.#showingRateLimitNotification) { const notification = this.#openmct.notifications.alert( diff --git a/src/api/telemetry/TelemetryAPI.js b/src/api/telemetry/TelemetryAPI.js index a951ebb94d6..2cb3557c640 100644 --- a/src/api/telemetry/TelemetryAPI.js +++ b/src/api/telemetry/TelemetryAPI.js @@ -72,6 +72,11 @@ import TelemetryValueFormatter from './TelemetryValueFormatter.js'; * @memberof module:openmct.TelemetryAPI~ */ +const SUBSCRIBE_STRATEGY = { + LATEST: 'latest', + BATCH: 'batch' +}; + /** * Utilities for telemetry * @interface TelemetryAPI @@ -80,6 +85,10 @@ import TelemetryValueFormatter from './TelemetryValueFormatter.js'; export default class TelemetryAPI { #isGreedyLAD; + get SUBSCRIBE_STRATEGY() { + return SUBSCRIBE_STRATEGY; + } + constructor(openmct) { this.openmct = openmct; @@ -403,14 +412,14 @@ export default class TelemetryAPI { * @returns {Function} a function which may be called to terminate * the subscription */ - subscribe(domainObject, callback, options = { strategy: 'latest' }) { + subscribe(domainObject, callback, options = { strategy: SUBSCRIBE_STRATEGY.LATEST }) { if (domainObject.type === 'unknown') { return () => {}; } // Default behavior is to use the latest strategy, as opposed to the new "batch" strategy - if (options.strategy === undefined) { - options.strategy = 'latest'; + if (options.strategy === undefined || options.strategy === null) { + options.strategy = SUBSCRIBE_STRATEGY.LATEST; } const provider = this.findSubscriptionProvider(domainObject, options); @@ -429,7 +438,7 @@ export default class TelemetryAPI { if (provider) { subscriber.unsubscribe = provider.subscribe( domainObject, - options.strategy === 'batch' + options.strategy === SUBSCRIBE_STRATEGY.BATCH ? subscriptionCallbackForArray : subscriptionCallbackForSingleValue, options @@ -442,6 +451,12 @@ export default class TelemetryAPI { } function subscriptionCallbackForArray(value) { + if (value === undefined || value === null || value.length === 0) { + throw new Error( + 'Attempt to invoke telemetry subscription callback with no telemetry datum' + ); + } + if (!Array.isArray(value)) { value = [value]; } @@ -456,6 +471,12 @@ export default class TelemetryAPI { value = value[value.length - 1]; } + if (value === undefined || value === null) { + throw new Error( + 'Attempt to invoke telemetry subscription callback with no telemetry datum' + ); + } + subscriber.callbacks.forEach(function (cb) { cb(value); }); diff --git a/src/api/telemetry/TelemetryCollection.js b/src/api/telemetry/TelemetryCollection.js index 2b0a06e4f6c..d5ddfa61d10 100644 --- a/src/api/telemetry/TelemetryCollection.js +++ b/src/api/telemetry/TelemetryCollection.js @@ -182,7 +182,7 @@ export default class TelemetryCollection extends EventEmitter { } const options = { ...this.options }; //We always want to receive all available values in telemetry tables. - options.strategy = 'batch'; + options.strategy = this.openmct.telemetry.SUBSCRIBE_STRATEGY.BATCH; this.unsubscribe = this.openmct.telemetry.subscribe( this.domainObject, diff --git a/src/api/telemetry/WebSocketWorker.js b/src/api/telemetry/WebSocketWorker.js index 346bc01827d..ece932e347b 100644 --- a/src/api/telemetry/WebSocketWorker.js +++ b/src/api/telemetry/WebSocketWorker.js @@ -126,9 +126,7 @@ export default function installWorker() { this.#isConnected = false; this.#isConnecting = false; - if (this.#webSocket.readyState === WebSocket.OPEN) { - this.#webSocket.close(); - } + this.#webSocket.close(); this.dispatchEvent(new Event('disconnected')); this.#webSocket = undefined; diff --git a/src/plugins/plot/configuration/Model.js b/src/plugins/plot/configuration/Model.js index 390c0432280..2a698c4c646 100644 --- a/src/plugins/plot/configuration/Model.js +++ b/src/plugins/plot/configuration/Model.js @@ -139,7 +139,9 @@ export default class Model extends EventEmitter { /** @typedef {any} TODO */ -/** @typedef {TODO} OpenMCT */ +/** + * @typedef {import('../../../../openmct.js').OpenMCT} OpenMCT + */ /** @template {object} T diff --git a/src/plugins/plot/configuration/PlotSeries.js b/src/plugins/plot/configuration/PlotSeries.js index 74d46974cd6..fdc26d277e4 100644 --- a/src/plugins/plot/configuration/PlotSeries.js +++ b/src/plugins/plot/configuration/PlotSeries.js @@ -218,7 +218,7 @@ export default class PlotSeries extends Model { }, { filters: this.filters, - strategy: 'batch' + strategy: this.openmct.telemetry.SUBSCRIBE_STRATEGY.BATCH } ); } diff --git a/src/plugins/remoteClock/RemoteClock.js b/src/plugins/remoteClock/RemoteClock.js index e2b3049e60a..db50e4b2e23 100644 --- a/src/plugins/remoteClock/RemoteClock.js +++ b/src/plugins/remoteClock/RemoteClock.js @@ -59,26 +59,21 @@ export default class RemoteClock extends DefaultClock { } start() { - this.openmct.objects - .get(this.identifier) - .then((domainObject) => { - // The start method is called when at least one listener registers with the clock. - // When the clock is changed, listeners are unregistered from the clock and the stop method is called. - // Sometimes, the objects.get call above does not resolve before the stop method is called. - // So when we proceed with the clock subscription below, we first need to ensure that there is at least one listener for our clock. - if (this.eventNames().length === 0) { - return; - } - this.openmct.time.on('timeSystem', this._timeSystemChange); - this.timeTelemetryObject = domainObject; - this.metadata = this.openmct.telemetry.getMetadata(domainObject); - this._timeSystemChange(); - this._requestLatest(); - this._subscribe(); - }) - .catch((error) => { - throw error; - }); + this.openmct.objects.get(this.identifier).then((domainObject) => { + // The start method is called when at least one listener registers with the clock. + // When the clock is changed, listeners are unregistered from the clock and the stop method is called. + // Sometimes, the objects.get call above does not resolve before the stop method is called. + // So when we proceed with the clock subscription below, we first need to ensure that there is at least one listener for our clock. + if (this.eventNames().length === 0) { + return; + } + this.openmct.time.on('timeSystem', this._timeSystemChange); + this.timeTelemetryObject = domainObject; + this.metadata = this.openmct.telemetry.getMetadata(domainObject); + this._timeSystemChange(); + this._requestLatest(); + this._subscribe(); + }); } stop() { From 12b921a0066adea43fbf3c954abceab1ee2e37d1 Mon Sep 17 00:00:00 2001 From: Andrew Henry Date: Wed, 24 Jan 2024 08:24:26 -0800 Subject: [PATCH 15/20] Clean up and reconnect on websocket close --- src/api/telemetry/WebSocketWorker.js | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/api/telemetry/WebSocketWorker.js b/src/api/telemetry/WebSocketWorker.js index ece932e347b..8ddefc50569 100644 --- a/src/api/telemetry/WebSocketWorker.js +++ b/src/api/telemetry/WebSocketWorker.js @@ -68,8 +68,6 @@ export default function installWorker() { const boundCleanUpAndReconnect = this.#cleanUpAndReconnect.bind(this); this.#webSocket.addEventListener('error', boundCleanUpAndReconnect); - - const boundDisconnect = this.disconnect.bind(this); this.#webSocket.addEventListener('close', boundCleanUpAndReconnect); const boundMessage = this.#message.bind(this); @@ -80,7 +78,7 @@ export default function installWorker() { () => { this.#webSocket.removeEventListener('open', boundConnected); this.#webSocket.removeEventListener('error', boundCleanUpAndReconnect); - this.#webSocket.removeEventListener('close', boundDisconnect); + this.#webSocket.removeEventListener('close', boundCleanUpAndReconnect); }, { once: true } ); @@ -126,7 +124,11 @@ export default function installWorker() { this.#isConnected = false; this.#isConnecting = false; - this.#webSocket.close(); + // On WebSocket error, both error callback and close callback are invoked, resulting in + // this function being called twice, and websocket being destroyed and deallocated. + if (this.#webSocket !== undefined && this.#webSocket !== null) { + this.#webSocket.close(); + } this.dispatchEvent(new Event('disconnected')); this.#webSocket = undefined; From 3f446139b8bc426fd01fa4d245750524e549b640 Mon Sep 17 00:00:00 2001 From: Andrew Henry Date: Sun, 28 Jan 2024 15:13:39 -0800 Subject: [PATCH 16/20] Better management of subscription strategies --- src/api/telemetry/TelemetryAPI.js | 29 +++++++++++++++++++---------- 1 file changed, 19 insertions(+), 10 deletions(-) diff --git a/src/api/telemetry/TelemetryAPI.js b/src/api/telemetry/TelemetryAPI.js index 2cb3557c640..60ad56c64c6 100644 --- a/src/api/telemetry/TelemetryAPI.js +++ b/src/api/telemetry/TelemetryAPI.js @@ -413,35 +413,44 @@ export default class TelemetryAPI { * the subscription */ subscribe(domainObject, callback, options = { strategy: SUBSCRIBE_STRATEGY.LATEST }) { + const requestedStrategy = options.strategy || SUBSCRIBE_STRATEGY.LATEST; + if (domainObject.type === 'unknown') { return () => {}; } - // Default behavior is to use the latest strategy, as opposed to the new "batch" strategy - if (options.strategy === undefined || options.strategy === null) { - options.strategy = SUBSCRIBE_STRATEGY.LATEST; - } - const provider = this.findSubscriptionProvider(domainObject, options); + const supportsBatching = + Boolean(provider?.supportsBatching) && provider?.supportsBatching(domainObject, options); if (!this.subscribeCache) { this.subscribeCache = {}; } const keyString = objectUtils.makeKeyString(domainObject.identifier); - let subscriber = this.subscribeCache[keyString]; + const supportedStrategy = supportsBatching ? requestedStrategy : SUBSCRIBE_STRATEGY.LATEST; + // Override the requested strategy with the strategy supported by the provider + const optionsWithSupportedStrategy = { + ...options, + strategy: supportedStrategy + }; + // If batching is supported, we need to cache a subscription for each strategy - + // latest and batched. + const cacheKey = `${keyString}:${supportedStrategy}`; + let subscriber = this.subscribeCache[cacheKey]; if (!subscriber) { - subscriber = this.subscribeCache[keyString] = { + subscriber = this.subscribeCache[cacheKey] = { callbacks: [callback] }; if (provider) { subscriber.unsubscribe = provider.subscribe( domainObject, - options.strategy === SUBSCRIBE_STRATEGY.BATCH + //Format the data based on strategy requested by view + requestedStrategy === SUBSCRIBE_STRATEGY.BATCH ? subscriptionCallbackForArray : subscriptionCallbackForSingleValue, - options + optionsWithSupportedStrategy ); } else { subscriber.unsubscribe = function () {}; @@ -488,7 +497,7 @@ export default class TelemetryAPI { }); if (subscriber.callbacks.length === 0) { subscriber.unsubscribe(); - delete this.subscribeCache[keyString]; + delete this.subscribeCache[cacheKey]; } }.bind(this); } From c4f83c13987ff64d6a5be8fbd799649d24f14c67 Mon Sep 17 00:00:00 2001 From: Andrew Henry Date: Sun, 28 Jan 2024 15:14:02 -0800 Subject: [PATCH 17/20] Add tests to catch edge cases where two subscribers request different strategies --- src/api/telemetry/TelemetryAPISpec.js | 113 ++++++++++++++++++++++++++ 1 file changed, 113 insertions(+) diff --git a/src/api/telemetry/TelemetryAPISpec.js b/src/api/telemetry/TelemetryAPISpec.js index ea0c9d169c5..5f6d726d8ac 100644 --- a/src/api/telemetry/TelemetryAPISpec.js +++ b/src/api/telemetry/TelemetryAPISpec.js @@ -327,6 +327,119 @@ describe('Telemetry API', () => { signal }); }); + describe('telemetry batching support', () => { + let callbacks; + let unsubFunc; + + beforeEach(() => { + callbacks = []; + unsubFunc = jasmine.createSpy('unsubscribe'); + telemetryProvider.supportsBatching = jasmine.createSpy('supportsBatching'); + telemetryProvider.supportsBatching.and.returnValue(true); + telemetryProvider.supportsSubscribe.and.returnValue(true); + + telemetryProvider.subscribe.and.callFake(function (obj, cb, options) { + callbacks.push(cb); + + return unsubFunc; + }); + + telemetryAPI.addProvider(telemetryProvider); + }); + + it('caches subscriptions for batched and latest telemetry subscriptions', () => { + const latestCallback1 = jasmine.createSpy('latestCallback1'); + const unsubscribeFromLatest1 = telemetryAPI.subscribe(domainObject, latestCallback1, { + strategy: 'latest' + }); + const latestCallback2 = jasmine.createSpy('latestCallback2'); + const unsubscribeFromLatest2 = telemetryAPI.subscribe(domainObject, latestCallback2, { + strategy: 'latest' + }); + + //Expect a single cached subscription for latest telemetry + expect(telemetryProvider.subscribe.calls.count()).toBe(1); + + const batchedCallback1 = jasmine.createSpy('batchedCallback1'); + const unsubscribeFromBatched1 = telemetryAPI.subscribe(domainObject, batchedCallback1, { + strategy: 'batch' + }); + + const batchedCallback2 = jasmine.createSpy('batchedCallback2'); + const unsubscribeFromBatched2 = telemetryAPI.subscribe(domainObject, batchedCallback2, { + strategy: 'batch' + }); + + //Expect a single cached subscription for each strategy telemetry + expect(telemetryProvider.subscribe.calls.count()).toBe(2); + + unsubscribeFromLatest1(); + unsubscribeFromLatest2(); + unsubscribeFromBatched1(); + unsubscribeFromBatched2(); + + expect(unsubFunc).toHaveBeenCalledTimes(2); + }); + it('subscriptions with the latest strategy are always invoked with a single value', () => { + const latestCallback = jasmine.createSpy('latestCallback1'); + telemetryAPI.subscribe(domainObject, latestCallback, { + strategy: 'latest' + }); + + const batchedValues = [1, 2, 3]; + callbacks.forEach((cb) => { + cb(batchedValues); + }); + + expect(latestCallback).toHaveBeenCalledWith(3); + + const singleValue = 1; + callbacks.forEach((cb) => { + cb(singleValue); + }); + + expect(latestCallback).toHaveBeenCalledWith(1); + }); + + it('subscriptions with the batch strategy are always invoked with an array', () => { + const batchedCallback1 = jasmine.createSpy('batchedCallback1'); + telemetryAPI.subscribe(domainObject, batchedCallback1, { + strategy: 'batch' + }); + + const batchedValues = [1, 2, 3]; + callbacks.forEach((cb) => { + cb(batchedValues); + }); + + // Callbacks for the 'batch' strategy are always called with an array of values + expect(batchedCallback1).toHaveBeenCalledWith(batchedValues); + + // Callbacks for the 'batch' strategy are always called with an array of values, even if there is only one value + callbacks.forEach((cb) => { + cb(1); + }); + + expect(batchedCallback1).toHaveBeenCalledWith([1]); + }); + + it('legacy providers are left unchanged, with a single subscription', () => { + delete telemetryProvider.supportsBatching; + + const batchCallback = jasmine.createSpy('batchCallback'); + telemetryAPI.subscribe(domainObject, batchCallback, { + strategy: 'batch' + }); + + const latestCallback = jasmine.createSpy('latestCallback'); + telemetryAPI.subscribe(domainObject, latestCallback, { + strategy: 'latest' + }); + + expect(telemetryProvider.subscribe).toHaveBeenCalledTimes(1); + expect(telemetryProvider.subscribe.calls.mostRecent().args[2].strategy).toBe('latest'); + }); + }); }); describe('metadata', () => { From 2cfd7295b886d1ca0be36760e77fb310a736c0b5 Mon Sep 17 00:00:00 2001 From: Andrew Henry Date: Mon, 29 Jan 2024 12:49:43 -0800 Subject: [PATCH 18/20] Ensure callbacks are invoked with telemetry in the requested format --- src/api/telemetry/TelemetryAPI.js | 66 +++++++++++++++++---------- src/api/telemetry/TelemetryAPISpec.js | 21 ++++++--- 2 files changed, 55 insertions(+), 32 deletions(-) diff --git a/src/api/telemetry/TelemetryAPI.js b/src/api/telemetry/TelemetryAPI.js index 60ad56c64c6..3a5585c031a 100644 --- a/src/api/telemetry/TelemetryAPI.js +++ b/src/api/telemetry/TelemetryAPI.js @@ -84,6 +84,7 @@ const SUBSCRIBE_STRATEGY = { */ export default class TelemetryAPI { #isGreedyLAD; + #subscribeCache; get SUBSCRIBE_STRATEGY() { return SUBSCRIBE_STRATEGY; @@ -106,6 +107,7 @@ export default class TelemetryAPI { this.requestInterceptorRegistry = new TelemetryRequestInterceptorRegistry(); this.#isGreedyLAD = true; this.BatchingWebSocket = BatchingWebSocket; + this.#subscribeCache = {}; } abortAllRequests() { @@ -423,8 +425,8 @@ export default class TelemetryAPI { const supportsBatching = Boolean(provider?.supportsBatching) && provider?.supportsBatching(domainObject, options); - if (!this.subscribeCache) { - this.subscribeCache = {}; + if (!this.#subscribeCache) { + this.#subscribeCache = {}; } const keyString = objectUtils.makeKeyString(domainObject.identifier); @@ -437,67 +439,81 @@ export default class TelemetryAPI { // If batching is supported, we need to cache a subscription for each strategy - // latest and batched. const cacheKey = `${keyString}:${supportedStrategy}`; - let subscriber = this.subscribeCache[cacheKey]; + let subscriber = this.#subscribeCache[cacheKey]; if (!subscriber) { - subscriber = this.subscribeCache[cacheKey] = { - callbacks: [callback] + subscriber = this.#subscribeCache[cacheKey] = { + latestCallbacks: [], + batchCallbacks: [] }; if (provider) { subscriber.unsubscribe = provider.subscribe( domainObject, - //Format the data based on strategy requested by view - requestedStrategy === SUBSCRIBE_STRATEGY.BATCH - ? subscriptionCallbackForArray - : subscriptionCallbackForSingleValue, + invokeCallbackWithRequestedStrategy, optionsWithSupportedStrategy ); } else { subscriber.unsubscribe = function () {}; } + } + + if (requestedStrategy === SUBSCRIBE_STRATEGY.BATCH) { + subscriber.batchCallbacks.push(callback); } else { - subscriber.callbacks.push(callback); + subscriber.latestCallbacks.push(callback); + } + + // Guarantees that view receive telemetry in the expected form + function invokeCallbackWithRequestedStrategy(data) { + invokeCallbacksWithArray(data, subscriber.batchCallbacks); + invokeCallbacksWithSingleValue(data, subscriber.latestCallbacks); } - function subscriptionCallbackForArray(value) { - if (value === undefined || value === null || value.length === 0) { + function invokeCallbacksWithArray(data, batchCallbacks) { + // + if (data === undefined || data === null || data.length === 0) { throw new Error( 'Attempt to invoke telemetry subscription callback with no telemetry datum' ); } - if (!Array.isArray(value)) { - value = [value]; + if (!Array.isArray(data)) { + data = [data]; } - subscriber.callbacks.forEach(function (cb) { - cb(value); + batchCallbacks.forEach((cb) => { + cb(data); }); } - function subscriptionCallbackForSingleValue(value) { - if (Array.isArray(value)) { - value = value[value.length - 1]; + function invokeCallbacksWithSingleValue(data, latestCallbacks) { + if (Array.isArray(data)) { + data = data[data.length - 1]; } - if (value === undefined || value === null) { + if (data === undefined || data === null) { throw new Error( 'Attempt to invoke telemetry subscription callback with no telemetry datum' ); } - subscriber.callbacks.forEach(function (cb) { - cb(value); + latestCallbacks.forEach((cb) => { + cb(data); }); } return function unsubscribe() { - subscriber.callbacks = subscriber.callbacks.filter(function (cb) { + console.error(subscriber); + subscriber.latestCallbacks = subscriber.latestCallbacks.filter(function (cb) { return cb !== callback; }); - if (subscriber.callbacks.length === 0) { + subscriber.batchCallbacks = subscriber.batchCallbacks.filter(function (cb) { + return cb !== callback; + }); + + if (subscriber.latestCallbacks.length === 0 && subscriber.batchCallbacks.length === 0) { subscriber.unsubscribe(); - delete this.subscribeCache[cacheKey]; + delete this.#subscribeCache[cacheKey]; } }.bind(this); } diff --git a/src/api/telemetry/TelemetryAPISpec.js b/src/api/telemetry/TelemetryAPISpec.js index 5f6d726d8ac..5cf221de516 100644 --- a/src/api/telemetry/TelemetryAPISpec.js +++ b/src/api/telemetry/TelemetryAPISpec.js @@ -402,10 +402,14 @@ describe('Telemetry API', () => { }); it('subscriptions with the batch strategy are always invoked with an array', () => { - const batchedCallback1 = jasmine.createSpy('batchedCallback1'); - telemetryAPI.subscribe(domainObject, batchedCallback1, { + const batchedCallback = jasmine.createSpy('batchedCallback1'); + const latestCallback = jasmine.createSpy('latestCallback1'); + telemetryAPI.subscribe(domainObject, batchedCallback, { strategy: 'batch' }); + telemetryAPI.subscribe(domainObject, latestCallback, { + strategy: 'latest' + }); const batchedValues = [1, 2, 3]; callbacks.forEach((cb) => { @@ -413,14 +417,17 @@ describe('Telemetry API', () => { }); // Callbacks for the 'batch' strategy are always called with an array of values - expect(batchedCallback1).toHaveBeenCalledWith(batchedValues); + expect(batchedCallback).toHaveBeenCalledWith(batchedValues); + // Callbacks for the 'latest' strategy are always called with a single value + expect(latestCallback).toHaveBeenCalledWith(3); - // Callbacks for the 'batch' strategy are always called with an array of values, even if there is only one value callbacks.forEach((cb) => { cb(1); }); - - expect(batchedCallback1).toHaveBeenCalledWith([1]); + // Callbacks for the 'batch' strategy are always called with an array of values, even if there is only one value + expect(batchedCallback).toHaveBeenCalledWith([1]); + // Callbacks for the 'latest' strategy are always called with a single value + expect(latestCallback).toHaveBeenCalledWith(1); }); it('legacy providers are left unchanged, with a single subscription', () => { @@ -430,13 +437,13 @@ describe('Telemetry API', () => { telemetryAPI.subscribe(domainObject, batchCallback, { strategy: 'batch' }); + expect(telemetryProvider.subscribe.calls.mostRecent().args[2].strategy).toBe('latest'); const latestCallback = jasmine.createSpy('latestCallback'); telemetryAPI.subscribe(domainObject, latestCallback, { strategy: 'latest' }); - expect(telemetryProvider.subscribe).toHaveBeenCalledTimes(1); expect(telemetryProvider.subscribe.calls.mostRecent().args[2].strategy).toBe('latest'); }); }); From b93647b23b03b7a8d0f3dac0f138e01b342e1874 Mon Sep 17 00:00:00 2001 From: Andrew Henry Date: Mon, 29 Jan 2024 13:10:11 -0800 Subject: [PATCH 19/20] Remove console out. Oops --- src/api/telemetry/TelemetryAPI.js | 1 - 1 file changed, 1 deletion(-) diff --git a/src/api/telemetry/TelemetryAPI.js b/src/api/telemetry/TelemetryAPI.js index 3a5585c031a..e6aa50bda68 100644 --- a/src/api/telemetry/TelemetryAPI.js +++ b/src/api/telemetry/TelemetryAPI.js @@ -503,7 +503,6 @@ export default class TelemetryAPI { } return function unsubscribe() { - console.error(subscriber); subscriber.latestCallbacks = subscriber.latestCallbacks.filter(function (cb) { return cb !== callback; }); From cdf549b024d8360c88e24abeffdc6daa90ccd57f Mon Sep 17 00:00:00 2001 From: Andrew Henry Date: Mon, 29 Jan 2024 13:44:06 -0800 Subject: [PATCH 20/20] Fix linting errors --- .cspell.json | 3 ++- src/api/telemetry/BatchingWebSocket.js | 2 +- src/api/telemetry/WebSocketWorker.js | 2 +- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/.cspell.json b/.cspell.json index 82e4d7df12e..31df39a88e2 100644 --- a/.cspell.json +++ b/.cspell.json @@ -493,7 +493,8 @@ "WCAG", "stackedplot", "Andale", - "checksnapshots" + "checksnapshots", + "specced" ], "dictionaries": ["npm", "softwareTerms", "node", "html", "css", "bash", "en_US"], "ignorePaths": [ diff --git a/src/api/telemetry/BatchingWebSocket.js b/src/api/telemetry/BatchingWebSocket.js index df2e7f98f07..204e6e9e707 100644 --- a/src/api/telemetry/BatchingWebSocket.js +++ b/src/api/telemetry/BatchingWebSocket.js @@ -145,7 +145,7 @@ class BatchingWebSocket extends EventTarget { * In this event it will sacrifice the oldest telemetry in the batch in favor of the * most recent telemetry. The user will be informed that telemetry has been dropped. * - * This should be specced appropriately for the expected data rate. eg. If telemetry + * This should be set appropriately for the expected data rate. eg. If telemetry * is received at 10Hz for each telemetry point, then a minimal combination of batch * size and rate is 10 and 1000 respectively. Ideally you would add some margin, so * 15 would probably be a better batch size. diff --git a/src/api/telemetry/WebSocketWorker.js b/src/api/telemetry/WebSocketWorker.js index 8ddefc50569..f09e7d12f7f 100644 --- a/src/api/telemetry/WebSocketWorker.js +++ b/src/api/telemetry/WebSocketWorker.js @@ -32,7 +32,7 @@ export default function installWorker() { * On an error or dropout, will automatically reconnect. * * Additionally, messages will be queued and sent only when WebSocket is - * connected meaning that client code does not need to check the state of + * connected meaning that client code does not need to check the state of * the socket before sending. */ class ResilientWebSocket extends EventTarget {