Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support telemetry batching and move WebSocket handling to worker #7391

Merged
merged 26 commits into from
Jan 29, 2024
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
69b2f05
Support subscription batching from API, Tables, and Plots
akhenry Jan 9, 2024
a87ffee
Added batching worker
akhenry Jan 11, 2024
2f2af0b
Added configurable batch size and throttling rate
akhenry Jan 13, 2024
0061d16
Support batch size based throttling
akhenry Jan 16, 2024
74c1cdf
Default to latest strategy
akhenry Jan 16, 2024
e530fd8
Merge branch 'master' into subscriptions-support-array-callbacks
akhenry Jan 17, 2024
c28ced5
Don't hide original error
akhenry Jan 18, 2024
947810b
Added copyright statement
akhenry Jan 19, 2024
f079c3a
Renamed BatchingWebSocketProvider to BatchingWebSocket
akhenry Jan 19, 2024
4c86b66
Merge branch 'master' into subscriptions-support-array-callbacks
akhenry Jan 19, 2024
61edd0f
Adding docs
akhenry Jan 19, 2024
2e3dc4d
renamed class. changed throttling strategy to be driven by the main t…
akhenry Jan 21, 2024
0bdd5ef
Renamed classes
akhenry Jan 21, 2024
ad02ba7
Added more documentation
akhenry Jan 22, 2024
c4a3ace
Fixed broken tests
akhenry Jan 23, 2024
e135498
Addressed review comments
akhenry Jan 24, 2024
34b36d5
Merge branch 'master' into subscriptions-support-array-callbacks
akhenry Jan 24, 2024
12b921a
Clean up and reconnect on websocket close
akhenry Jan 24, 2024
a6c4a45
Merge branch 'master' into subscriptions-support-array-callbacks
akhenry Jan 26, 2024
6a04597
Merge branch 'master' into subscriptions-support-array-callbacks
akhenry Jan 26, 2024
3f44613
Better management of subscription strategies
akhenry Jan 28, 2024
c4f83c1
Add tests to catch edge cases where two subscribers request different…
akhenry Jan 28, 2024
2cfd729
Ensure callbacks are invoked with telemetry in the requested format
akhenry Jan 29, 2024
a6bb0da
Merge branch 'master' into subscriptions-support-array-callbacks
akhenry Jan 29, 2024
b93647b
Remove console out. Oops
akhenry Jan 29, 2024
cdf549b
Fix linting errors
akhenry Jan 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
194 changes: 194 additions & 0 deletions src/api/telemetry/BatchingWebSocket.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
/*****************************************************************************
* Open MCT Web, Copyright (c) 2014-2024, United States Government
* as represented by the Administrator of the National Aeronautics and Space
* Administration. All rights reserved.
*
* Open MCT Web is licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
* Open MCT Web includes source code licensed under additional open source
* licenses. See the Open Source Licenses file (LICENSES.md) included with
* this source code distribution or the Licensing information page available
* at runtime from the About dialog for additional information.
*****************************************************************************/
import installWorker from './WebSocketWorker.js';
const DEFAULT_RATE_MS = 1000;
/**
* Describes the strategy to be used when batching WebSocket messages
*
* @typedef BatchingStrategy
* @property {Function} shouldBatchMessage a function that accepts a single
* argument - the raw message received from the websocket. Every message
* received will be evaluated against this function so it should be performant.
* Note also that this function is executed in a worker, so it must be
* completely self-contained with no external dependencies. The function
* should return `true` if the message should be batched, and `false` if not.
* @property {Function} getBatchIdFromMessage a function that accepts a
* single argument - the raw message received from the websocket. Only messages
* where `shouldBatchMessage` has evaluated to true will be passed into this
* function. The function should return a unique value on which to batch the
* messages. For example a telemetry, channel, or parameter identifier.
*/
/**
* Provides a reliable and convenient WebSocket abstraction layer that handles
* a lot of boilerplate common to managing WebSocket connections such as:
* - Establishing a WebSocket connection to a server
* - Reconnecting on error, with a fallback strategy
* - Queuing messages so that clients can send messages without concern for the current
* connection state of the WebSocket.
*
* The WebSocket that it manages is based in a dedicated worker so that network
* concerns are not handled on the main event loop. This allows for performant receipt
* and batching of messages without blocking either the UI or server.
*
* @memberof module:openmct.telemetry
*/
class BatchingWebSocket extends EventTarget {
#worker;
#openmct;
#showingRateLimitNotification;
#rate;

constructor(openmct) {
super();

Check warning on line 61 in src/api/telemetry/BatchingWebSocket.js

View check run for this annotation

Codecov / codecov/patch

src/api/telemetry/BatchingWebSocket.js#L61

Added line #L61 was not covered by tests
// Install worker, register listeners etc.
const workerFunction = `(${installWorker.toString()})()`;
const workerBlob = new Blob([workerFunction]);
const workerUrl = URL.createObjectURL(workerBlob, { type: 'application/javascript' });
this.#worker = new Worker(workerUrl);
this.#openmct = openmct;
this.#showingRateLimitNotification = false;
this.#rate = DEFAULT_RATE_MS;

Check warning on line 69 in src/api/telemetry/BatchingWebSocket.js

View check run for this annotation

Codecov / codecov/patch

src/api/telemetry/BatchingWebSocket.js#L63-L69

Added lines #L63 - L69 were not covered by tests

const routeMessageToHandler = this.#routeMessageToHandler.bind(this);
this.#worker.addEventListener('message', routeMessageToHandler);
openmct.on(

Check warning on line 73 in src/api/telemetry/BatchingWebSocket.js

View check run for this annotation

Codecov / codecov/patch

src/api/telemetry/BatchingWebSocket.js#L71-L73

Added lines #L71 - L73 were not covered by tests
'destroy',
() => {
this.disconnect();

Check warning on line 76 in src/api/telemetry/BatchingWebSocket.js

View check run for this annotation

Codecov / codecov/patch

src/api/telemetry/BatchingWebSocket.js#L76

Added line #L76 was not covered by tests
scottbell marked this conversation as resolved.
Show resolved Hide resolved
},
{ once: true }
);
}

/**
* Will establish a WebSocket connection to the provided url
* @param {string} url The URL to connect to
*/
connect(url) {
this.#worker.postMessage({

Check warning on line 87 in src/api/telemetry/BatchingWebSocket.js

View check run for this annotation

Codecov / codecov/patch

src/api/telemetry/BatchingWebSocket.js#L87

Added line #L87 was not covered by tests
type: 'connect',
url
});

this.#readyForNextBatch();

Check warning on line 92 in src/api/telemetry/BatchingWebSocket.js

View check run for this annotation

Codecov / codecov/patch

src/api/telemetry/BatchingWebSocket.js#L92

Added line #L92 was not covered by tests
}

#readyForNextBatch() {
this.#worker.postMessage({

Check warning on line 96 in src/api/telemetry/BatchingWebSocket.js

View check run for this annotation

Codecov / codecov/patch

src/api/telemetry/BatchingWebSocket.js#L96

Added line #L96 was not covered by tests
type: 'readyForNextBatch'
});
}

/**
* Send a message to the WebSocket.
* @param {any} message The message to send. Can be any type supported by WebSockets.
* See https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/send#data
*/
sendMessage(message) {
this.#worker.postMessage({

Check warning on line 107 in src/api/telemetry/BatchingWebSocket.js

View check run for this annotation

Codecov / codecov/patch

src/api/telemetry/BatchingWebSocket.js#L107

Added line #L107 was not covered by tests
type: 'message',
message
});
}

/**
* Set the strategy used to both decide which raw messages to batch, and how to group
* them.
* @param {BatchingStrategy} strategy The batching strategy to use when evaluating
* raw messages from the WebSocket.
*/
setBatchingStrategy(strategy) {
const serializedStrategy = {

Check warning on line 120 in src/api/telemetry/BatchingWebSocket.js

View check run for this annotation

Codecov / codecov/patch

src/api/telemetry/BatchingWebSocket.js#L120

Added line #L120 was not covered by tests
shouldBatchMessage: strategy.shouldBatchMessage.toString(),
getBatchIdFromMessage: strategy.getBatchIdFromMessage.toString()
};

this.#worker.postMessage({

Check warning on line 125 in src/api/telemetry/BatchingWebSocket.js

View check run for this annotation

Codecov / codecov/patch

src/api/telemetry/BatchingWebSocket.js#L125

Added line #L125 was not covered by tests
type: 'setBatchingStrategy',
serializedStrategy
});
}

/**
* When using batching, sets the rate at which batches of messages are released.
* @param {Number} rate the amount of time to wait, in ms, between batches.
*/
setRate(rate) {
this.#rate = rate;

Check warning on line 136 in src/api/telemetry/BatchingWebSocket.js

View check run for this annotation

Codecov / codecov/patch

src/api/telemetry/BatchingWebSocket.js#L136

Added line #L136 was not covered by tests
}

/**
* @param {Number} maxBatchSize the maximum length of a batch of messages. For example,
* the maximum number of telemetry values to batch before dropping them
* Note that this is a fail-safe that is only invoked if performance drops to the
* point where Open MCT cannot keep up with the amount of telemetry it is receiving.
* In this event it will sacrifice the oldest telemetry in the batch in favor of the
* most recent telemetry. The user will be informed that telemetry has been dropped.
*
* This should be specced appropriately for the expected data rate. eg. If telemetry
* is received at 10Hz for each telemetry point, then a minimal combination of batch
* size and rate is 10 and 1000 respectively. Ideally you would add some margin, so
* 15 would probably be a better batch size.
*/
setMaxBatchSize(maxBatchSize) {
this.#worker.postMessage({

Check warning on line 153 in src/api/telemetry/BatchingWebSocket.js

View check run for this annotation

Codecov / codecov/patch

src/api/telemetry/BatchingWebSocket.js#L153

Added line #L153 was not covered by tests
type: 'setMaxBatchSize',
maxBatchSize
});
}

/**
* Disconnect the associated WebSocket. Generally speaking there is no need to call
* this manually.
*/
disconnect() {
this.#worker.postMessage({

Check warning on line 164 in src/api/telemetry/BatchingWebSocket.js

View check run for this annotation

Codecov / codecov/patch

src/api/telemetry/BatchingWebSocket.js#L164

Added line #L164 was not covered by tests
type: 'disconnect'
});
}

#routeMessageToHandler(message) {
// Batch message would need to be handle differently here
scottbell marked this conversation as resolved.
Show resolved Hide resolved
if (message.data.type === 'batch') {
if (message.data.batch.dropped === true && !this.#showingRateLimitNotification) {
const notification = this.#openmct.notifications.alert(

Check warning on line 173 in src/api/telemetry/BatchingWebSocket.js

View check run for this annotation

Codecov / codecov/patch

src/api/telemetry/BatchingWebSocket.js#L171-L173

Added lines #L171 - L173 were not covered by tests
scottbell marked this conversation as resolved.
Show resolved Hide resolved
'Telemetry dropped due to client rate limiting.',
{ hint: 'Refresh individual telemetry views to retrieve dropped telemetry if needed.' }
);
this.#showingRateLimitNotification = true;
notification.once('minimized', () => {
this.#showingRateLimitNotification = false;

Check warning on line 179 in src/api/telemetry/BatchingWebSocket.js

View check run for this annotation

Codecov / codecov/patch

src/api/telemetry/BatchingWebSocket.js#L177-L179

Added lines #L177 - L179 were not covered by tests
});
}
this.dispatchEvent(new CustomEvent('batch', { detail: message.data.batch }));
setTimeout(() => {
this.#readyForNextBatch();

Check warning on line 184 in src/api/telemetry/BatchingWebSocket.js

View check run for this annotation

Codecov / codecov/patch

src/api/telemetry/BatchingWebSocket.js#L182-L184

Added lines #L182 - L184 were not covered by tests
}, this.#rate);
} else if (message.data.type === 'message') {
this.dispatchEvent(new CustomEvent('message', { detail: message.data.message }));

Check warning on line 187 in src/api/telemetry/BatchingWebSocket.js

View check run for this annotation

Codecov / codecov/patch

src/api/telemetry/BatchingWebSocket.js#L186-L187

Added lines #L186 - L187 were not covered by tests
} else {
throw new Error(`Unknown message type: ${message.data.type}`);

Check warning on line 189 in src/api/telemetry/BatchingWebSocket.js

View check run for this annotation

Codecov / codecov/patch

src/api/telemetry/BatchingWebSocket.js#L189

Added line #L189 was not covered by tests
}
}
}

export default BatchingWebSocket;
58 changes: 50 additions & 8 deletions src/api/telemetry/TelemetryAPI.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import objectUtils from 'objectUtils';

import CustomStringFormatter from '../../plugins/displayLayout/CustomStringFormatter.js';
import BatchingWebSocket from './BatchingWebSocket.js';
import DefaultMetadataProvider from './DefaultMetadataProvider.js';
import TelemetryCollection from './TelemetryCollection.js';
import TelemetryMetadataManager from './TelemetryMetadataManager.js';
Expand Down Expand Up @@ -54,6 +55,23 @@
* @memberof module:openmct.TelemetryAPI~
*/

/**
* Describes and bounds requests for telemetry data.
*
* @typedef TelemetrySubscriptionOptions
* @property {String} [strategy] symbolic identifier directing providers on how
scottbell marked this conversation as resolved.
Show resolved Hide resolved
* to handle telemetry subscriptions. The default behavior is 'latest' which will
* always return a single telemetry value with each callback, and in the event
* of throttling will always prioritize the latest data, meaning intermediate
* data will be skipped. Alternatively, the `batch` strategy can be used, which
* will return all telemetry values since the last callback. This strategy is
* useful for cases where intermediate data is important, such as when
* rendering a telemetry plot or table. If `batch` is specified, the subscription
* callback will be invoked with an Array.
*
* @memberof module:openmct.TelemetryAPI~
*/

/**
* Utilities for telemetry
* @interface TelemetryAPI
Expand All @@ -78,6 +96,7 @@
this.valueFormatterCache = new WeakMap();
this.requestInterceptorRegistry = new TelemetryRequestInterceptorRegistry();
this.#isGreedyLAD = true;
this.BatchingWebSocket = BatchingWebSocket;
}

abortAllRequests() {
Expand Down Expand Up @@ -378,18 +397,23 @@
* @memberof module:openmct.TelemetryAPI~TelemetryProvider#
* @param {module:openmct.DomainObject} domainObject the object
* which has associated telemetry
* @param {TelemetryRequestOptions} options configuration items for subscription
* @param {TelemetrySubscriptionOptions} options configuration items for subscription
* @param {Function} callback the callback to invoke with new data, as
* it becomes available
* @returns {Function} a function which may be called to terminate
* the subscription
*/
subscribe(domainObject, callback, options) {
subscribe(domainObject, callback, options = { strategy: 'latest' }) {
if (domainObject.type === 'unknown') {
return () => {};
}

const provider = this.findSubscriptionProvider(domainObject);
// Default behavior is to use the latest strategy, as opposed to the new "batch" strategy
if (options.strategy === undefined) {
scottbell marked this conversation as resolved.
Show resolved Hide resolved
options.strategy = 'latest';
}

const provider = this.findSubscriptionProvider(domainObject, options);

if (!this.subscribeCache) {
this.subscribeCache = {};
Expand All @@ -405,11 +429,9 @@
if (provider) {
subscriber.unsubscribe = provider.subscribe(
domainObject,
function (value) {
subscriber.callbacks.forEach(function (cb) {
cb(value);
});
},
options.strategy === 'batch'
? subscriptionCallbackForArray
: subscriptionCallbackForSingleValue,
options
);
} else {
Expand All @@ -419,6 +441,26 @@
subscriber.callbacks.push(callback);
}

function subscriptionCallbackForArray(value) {
if (!Array.isArray(value)) {
value = [value];
}

subscriber.callbacks.forEach(function (cb) {
cb(value);
});
}

function subscriptionCallbackForSingleValue(value) {
if (Array.isArray(value)) {
value = value[value.length - 1];

Check warning on line 456 in src/api/telemetry/TelemetryAPI.js

View check run for this annotation

Codecov / codecov/patch

src/api/telemetry/TelemetryAPI.js#L456

Added line #L456 was not covered by tests
scottbell marked this conversation as resolved.
Show resolved Hide resolved
}

subscriber.callbacks.forEach(function (cb) {
cb(value);
});
}

return function unsubscribe() {
subscriber.callbacks = subscriber.callbacks.filter(function (cb) {
return cb !== callback;
Expand Down
12 changes: 9 additions & 3 deletions src/api/telemetry/TelemetryAPISpec.js
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,9 @@ describe('Telemetry API', () => {

const callback = jasmine.createSpy('callback');
const unsubscribe = telemetryAPI.subscribe(domainObject, callback);
expect(telemetryProvider.supportsSubscribe).toHaveBeenCalledWith(domainObject);
expect(telemetryProvider.supportsSubscribe).toHaveBeenCalledWith(domainObject, {
strategy: 'latest'
});
expect(telemetryProvider.subscribe).not.toHaveBeenCalled();
expect(unsubscribe).toEqual(jasmine.any(Function));

Expand All @@ -111,12 +113,16 @@ describe('Telemetry API', () => {
const callback = jasmine.createSpy('callback');
const unsubscribe = telemetryAPI.subscribe(domainObject, callback);
expect(telemetryProvider.supportsSubscribe.calls.count()).toBe(1);
expect(telemetryProvider.supportsSubscribe).toHaveBeenCalledWith(domainObject);
expect(telemetryProvider.supportsSubscribe).toHaveBeenCalledWith(domainObject, {
strategy: 'latest'
});
expect(telemetryProvider.subscribe.calls.count()).toBe(1);
expect(telemetryProvider.subscribe).toHaveBeenCalledWith(
domainObject,
jasmine.any(Function),
undefined
{
strategy: 'latest'
}
);

const notify = telemetryProvider.subscribe.calls.mostRecent().args[1];
Expand Down
5 changes: 4 additions & 1 deletion src/api/telemetry/TelemetryCollection.js
Original file line number Diff line number Diff line change
Expand Up @@ -180,11 +180,14 @@ export default class TelemetryCollection extends EventEmitter {
if (this.unsubscribe) {
this.unsubscribe();
}
const options = { ...this.options };
//We always want to receive all available values in telemetry tables.
options.strategy = 'batch';

this.unsubscribe = this.openmct.telemetry.subscribe(
this.domainObject,
(datum) => this._processNewTelemetry(datum),
this.options
options
);
}

Expand Down
Loading
Loading