Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support telemetry batching and move WebSocket handling to worker #7391

Merged
merged 26 commits into from
Jan 29, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
69b2f05
Support subscription batching from API, Tables, and Plots
akhenry Jan 9, 2024
a87ffee
Added batching worker
akhenry Jan 11, 2024
2f2af0b
Added configurable batch size and throttling rate
akhenry Jan 13, 2024
0061d16
Support batch size based throttling
akhenry Jan 16, 2024
74c1cdf
Default to latest strategy
akhenry Jan 16, 2024
e530fd8
Merge branch 'master' into subscriptions-support-array-callbacks
akhenry Jan 17, 2024
c28ced5
Don't hide original error
akhenry Jan 18, 2024
947810b
Added copyright statement
akhenry Jan 19, 2024
f079c3a
Renamed BatchingWebSocketProvider to BatchingWebSocket
akhenry Jan 19, 2024
4c86b66
Merge branch 'master' into subscriptions-support-array-callbacks
akhenry Jan 19, 2024
61edd0f
Adding docs
akhenry Jan 19, 2024
2e3dc4d
renamed class. changed throttling strategy to be driven by the main t…
akhenry Jan 21, 2024
0bdd5ef
Renamed classes
akhenry Jan 21, 2024
ad02ba7
Added more documentation
akhenry Jan 22, 2024
c4a3ace
Fixed broken tests
akhenry Jan 23, 2024
e135498
Addressed review comments
akhenry Jan 24, 2024
34b36d5
Merge branch 'master' into subscriptions-support-array-callbacks
akhenry Jan 24, 2024
12b921a
Clean up and reconnect on websocket close
akhenry Jan 24, 2024
a6c4a45
Merge branch 'master' into subscriptions-support-array-callbacks
akhenry Jan 26, 2024
6a04597
Merge branch 'master' into subscriptions-support-array-callbacks
akhenry Jan 26, 2024
3f44613
Better management of subscription strategies
akhenry Jan 28, 2024
c4f83c1
Add tests to catch edge cases where two subscribers request different…
akhenry Jan 28, 2024
2cfd729
Ensure callbacks are invoked with telemetry in the requested format
akhenry Jan 29, 2024
a6bb0da
Merge branch 'master' into subscriptions-support-array-callbacks
akhenry Jan 29, 2024
b93647b
Remove console out. Oops
akhenry Jan 29, 2024
cdf549b
Fix linting errors
akhenry Jan 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 111 additions & 0 deletions src/api/telemetry/BatchingWebSocketProvider.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*****************************************************************************
* Open MCT Web, Copyright (c) 2014-2024, United States Government
* as represented by the Administrator of the National Aeronautics and Space
* Administration. All rights reserved.
*
* Open MCT Web is licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
* Open MCT Web includes source code licensed under additional open source
* licenses. See the Open Source Licenses file (LICENSES.md) included with
* this source code distribution or the Licensing information page available
* at runtime from the About dialog for additional information.
*****************************************************************************/

import installWorker from './WebSocketWorker.js';

class BatchingWebSocketProvider extends EventTarget {
#worker;
#openmct;
#showingRateLimitNotification;

constructor(openmct) {
super();

Check warning on line 31 in src/api/telemetry/BatchingWebSocketProvider.js

View check run for this annotation

Codecov / codecov/patch

src/api/telemetry/BatchingWebSocketProvider.js#L31

Added line #L31 was not covered by tests
// Install worker, register listeners etc.
const workerFunction = `(${installWorker.toString()})()`;
const workerBlob = new Blob([workerFunction]);
const workerUrl = URL.createObjectURL(workerBlob, { type: 'application/javascript' });
this.#worker = new Worker(workerUrl);
this.#openmct = openmct;
this.#showingRateLimitNotification = false;

Check warning on line 38 in src/api/telemetry/BatchingWebSocketProvider.js

View check run for this annotation

Codecov / codecov/patch

src/api/telemetry/BatchingWebSocketProvider.js#L33-L38

Added lines #L33 - L38 were not covered by tests

this.routeMessageToHandler = this.routeMessageToHandler.bind(this);
this.#worker.addEventListener('message', this.routeMessageToHandler);

Check warning on line 41 in src/api/telemetry/BatchingWebSocketProvider.js

View check run for this annotation

Codecov / codecov/patch

src/api/telemetry/BatchingWebSocketProvider.js#L40-L41

Added lines #L40 - L41 were not covered by tests
}

connect(url) {
this.#worker.postMessage({

Check warning on line 45 in src/api/telemetry/BatchingWebSocketProvider.js

View check run for this annotation

Codecov / codecov/patch

src/api/telemetry/BatchingWebSocketProvider.js#L45

Added line #L45 was not covered by tests
type: 'connect',
url
});
}

disconnect() {
this.#worker.postMessage({ type: 'disconnect' });

Check warning on line 52 in src/api/telemetry/BatchingWebSocketProvider.js

View check run for this annotation

Codecov / codecov/patch

src/api/telemetry/BatchingWebSocketProvider.js#L52

Added line #L52 was not covered by tests
}

sendMessage(message) {
this.#worker.postMessage({

Check warning on line 56 in src/api/telemetry/BatchingWebSocketProvider.js

View check run for this annotation

Codecov / codecov/patch

src/api/telemetry/BatchingWebSocketProvider.js#L56

Added line #L56 was not covered by tests
type: 'message',
message
});
}

setBatchingStrategy(strategy) {
const serializedStrategy = {

Check warning on line 63 in src/api/telemetry/BatchingWebSocketProvider.js

View check run for this annotation

Codecov / codecov/patch

src/api/telemetry/BatchingWebSocketProvider.js#L63

Added line #L63 was not covered by tests
shouldBatchMessage: strategy.shouldBatchMessage.toString(),
getBatchIdFromMessage: strategy.getBatchIdFromMessage.toString()
};

this.#worker.postMessage({

Check warning on line 68 in src/api/telemetry/BatchingWebSocketProvider.js

View check run for this annotation

Codecov / codecov/patch

src/api/telemetry/BatchingWebSocketProvider.js#L68

Added line #L68 was not covered by tests
type: 'setBatchingStrategy',
serializedStrategy
});
}

setRate(rate) {
this.#worker.postMessage({

Check warning on line 75 in src/api/telemetry/BatchingWebSocketProvider.js

View check run for this annotation

Codecov / codecov/patch

src/api/telemetry/BatchingWebSocketProvider.js#L75

Added line #L75 was not covered by tests
type: 'setRate',
rate
});
}

setMaxBatchSize(maxBatchSize) {
this.#worker.postMessage({

Check warning on line 82 in src/api/telemetry/BatchingWebSocketProvider.js

View check run for this annotation

Codecov / codecov/patch

src/api/telemetry/BatchingWebSocketProvider.js#L82

Added line #L82 was not covered by tests
type: 'setMaxBatchSize',
maxBatchSize
});
}

routeMessageToHandler(message) {
// Batch message would need to be handle differently here
if (message.data.type === 'batch') {
if (message.data.batch.dropped === true && !this.#showingRateLimitNotification) {
const notification = this.#openmct.notifications.alert(

Check warning on line 92 in src/api/telemetry/BatchingWebSocketProvider.js

View check run for this annotation

Codecov / codecov/patch

src/api/telemetry/BatchingWebSocketProvider.js#L90-L92

Added lines #L90 - L92 were not covered by tests
'Telemetry dropped due to client rate limiting.',
{ hint: 'Refresh individual telemetry views to retrieve dropped telemetry if needed.' }
);
this.#showingRateLimitNotification = true;
notification.once('minimized', () => {
this.#showingRateLimitNotification = false;

Check warning on line 98 in src/api/telemetry/BatchingWebSocketProvider.js

View check run for this annotation

Codecov / codecov/patch

src/api/telemetry/BatchingWebSocketProvider.js#L96-L98

Added lines #L96 - L98 were not covered by tests
});
return;

Check warning on line 100 in src/api/telemetry/BatchingWebSocketProvider.js

View check run for this annotation

Codecov / codecov/patch

src/api/telemetry/BatchingWebSocketProvider.js#L100

Added line #L100 was not covered by tests
}
this.dispatchEvent(new CustomEvent('batch', { detail: message.data.batch }));
} else if (message.data.type === 'message') {
this.dispatchEvent(new CustomEvent('message', { detail: message.data.message }));

Check warning on line 104 in src/api/telemetry/BatchingWebSocketProvider.js

View check run for this annotation

Codecov / codecov/patch

src/api/telemetry/BatchingWebSocketProvider.js#L102-L104

Added lines #L102 - L104 were not covered by tests
} else {
throw new Error(`Unknown message type: ${message.data.type}`);

Check warning on line 106 in src/api/telemetry/BatchingWebSocketProvider.js

View check run for this annotation

Codecov / codecov/patch

src/api/telemetry/BatchingWebSocketProvider.js#L106

Added line #L106 was not covered by tests
}
}
}

export default BatchingWebSocketProvider;
58 changes: 50 additions & 8 deletions src/api/telemetry/TelemetryAPI.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import objectUtils from 'objectUtils';

import CustomStringFormatter from '../../plugins/displayLayout/CustomStringFormatter.js';
import BatchingWebSocketProvider from './BatchingWebSocketProvider.js';
import DefaultMetadataProvider from './DefaultMetadataProvider.js';
import TelemetryCollection from './TelemetryCollection.js';
import TelemetryMetadataManager from './TelemetryMetadataManager.js';
Expand Down Expand Up @@ -54,6 +55,23 @@
* @memberof module:openmct.TelemetryAPI~
*/

/**
* Describes and bounds requests for telemetry data.
*
* @typedef TelemetrySubscriptionOptions
* @property {String} [strategy] symbolic identifier directing providers on how
scottbell marked this conversation as resolved.
Show resolved Hide resolved
* to handle telemetry subscriptions. The default behavior is 'latest' which will
* always return a single telemetry value with each callback, and in the event
* of throttling will always prioritize the latest data, meaning intermediate
* data will be skipped. Alternatively, the `batch` strategy can be used, which
* will return all telemetry values since the last callback. This strategy is
* useful for cases where intermediate data is important, such as when
* rendering a telemetry plot or table. If `batch` is specified, the subscription
* callback will be invoked with an Array.
*
* @memberof module:openmct.TelemetryAPI~
*/

/**
* Utilities for telemetry
* @interface TelemetryAPI
Expand All @@ -78,6 +96,7 @@
this.valueFormatterCache = new WeakMap();
this.requestInterceptorRegistry = new TelemetryRequestInterceptorRegistry();
this.#isGreedyLAD = true;
this.BatchingWebSocketProvider = BatchingWebSocketProvider;
}

abortAllRequests() {
Expand Down Expand Up @@ -378,18 +397,23 @@
* @memberof module:openmct.TelemetryAPI~TelemetryProvider#
* @param {module:openmct.DomainObject} domainObject the object
* which has associated telemetry
* @param {TelemetryRequestOptions} options configuration items for subscription
* @param {TelemetrySubscriptionOptions} options configuration items for subscription
* @param {Function} callback the callback to invoke with new data, as
* it becomes available
* @returns {Function} a function which may be called to terminate
* the subscription
*/
subscribe(domainObject, callback, options) {
subscribe(domainObject, callback, options = { strategy: 'latest' }) {
if (domainObject.type === 'unknown') {
return () => {};
}

const provider = this.findSubscriptionProvider(domainObject);
// Default behavior is to use the latest strategy, as opposed to the new "batch" strategy
if (options.strategy === undefined) {
scottbell marked this conversation as resolved.
Show resolved Hide resolved
options.strategy = 'latest';
}

const provider = this.findSubscriptionProvider(domainObject, options);

if (!this.subscribeCache) {
this.subscribeCache = {};
Expand All @@ -405,11 +429,9 @@
if (provider) {
subscriber.unsubscribe = provider.subscribe(
domainObject,
function (value) {
subscriber.callbacks.forEach(function (cb) {
cb(value);
});
},
options.strategy === 'batch'
? subscriptionCallbackForArray
: subscriptionCallbackForSingleValue,
options
);
} else {
Expand All @@ -419,6 +441,26 @@
subscriber.callbacks.push(callback);
}

function subscriptionCallbackForArray(value) {
if (!Array.isArray(value)) {
value = [value];
}

subscriber.callbacks.forEach(function (cb) {
cb(value);
});
}

function subscriptionCallbackForSingleValue(value) {
if (Array.isArray(value)) {
value = value[value.length - 1];

Check warning on line 456 in src/api/telemetry/TelemetryAPI.js

View check run for this annotation

Codecov / codecov/patch

src/api/telemetry/TelemetryAPI.js#L456

Added line #L456 was not covered by tests
scottbell marked this conversation as resolved.
Show resolved Hide resolved
}

subscriber.callbacks.forEach(function (cb) {
cb(value);
});
}

return function unsubscribe() {
subscriber.callbacks = subscriber.callbacks.filter(function (cb) {
return cb !== callback;
Expand Down
5 changes: 4 additions & 1 deletion src/api/telemetry/TelemetryCollection.js
Original file line number Diff line number Diff line change
Expand Up @@ -180,11 +180,14 @@ export default class TelemetryCollection extends EventEmitter {
if (this.unsubscribe) {
this.unsubscribe();
}
const options = { ...this.options };
//We always want to receive all available values in telemetry tables.
options.strategy = 'batch';

this.unsubscribe = this.openmct.telemetry.subscribe(
this.domainObject,
(datum) => this._processNewTelemetry(datum),
this.options
options
);
}

Expand Down
Loading
Loading