Skip to content

Commit 5eb19a0

Browse files
(op bunching) Added logic to process messages in a bunch in SharedObject (#23836)
This PR adds support for DDSes to process a 'bunch' of messages. Op bunching was added via the following PRs: #22839, #22840 and #22841. Added a `processMessagesCore` function to `SharedObjectCore` that DDSes can override to process a message bunch. This is marked optional today because it's a breaking change. It is similar to the `processCore` function and will replace it eventually. For now, `processCore` has been marked as deprecated. This change updates the relative ordering of the `pre-op` and `op` events emitted by the shared object. Previously, the `pre-op` event would be emitted before a message was processed and `op` event was emitted immediately afterwards. With this change, the ordering of these events w.r.t. to the message being processed is still the same. However, there may be other ops processed and other event fired in between. Note that the only guarantee these events provide is that the `pre-op` event is emitted before a message is procesed and the `op` event is processed after. So, this change doesn't break that guarantee.
1 parent 6d876ba commit 5eb19a0

File tree

4 files changed

+177
-11
lines changed

4 files changed

+177
-11
lines changed

.changeset/itchy-pants-brake.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
---
2+
"@fluidframework/shared-object-base": minor
3+
---
4+
---
5+
"section": other
6+
---
7+
8+
Change when the `pre-op` and `op` events on `ISharedObjectEvents` are emitted
9+
10+
Previous behavior - `pre-op` was emitted immediately before an op was processed. Then the op was processed and `op` was emitted immediately after that.
11+
12+
New behavior - `pre-op` will still be emitted before an op is processed and `op` will still be emitted after an op is processed. However, these won't be immediate and other ops in a batch for the shared object may be processed in between.
13+
14+
Note that these events are for internal use only as mentioned in the @remarks section of their definition.

.changeset/witty-falcons-wonder.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
---
2+
"@fluidframework/shared-object-base": minor
3+
---
4+
---
5+
"section": deprecation
6+
---
7+
8+
Deprecate `processCore` on `SharedObject` and `SharedObjectCore` in favor of `processMessagesCore`
9+
10+
A new function `processMessagesCore` has been added in place of `processCore`, which will be called to process multiple messages instead of a single one on the channel. This is part of a feature called "Op bunching" where contiguous ops in a grouped batch are bunched and processed together by the shared object.
11+
12+
Implementations of `SharedObject` and `SharedObjectCore` must now also implement `processMessagesCore`. A basic implementation could be to iterate over the messages' content and process them one by one as it happens now. Note that some DDS may be able to optimize processing by processing the messages together.

packages/dds/shared-object-base/api-report/shared-object-base.legacy.alpha.api.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,9 @@ export abstract class SharedObjectCore<TEvent extends ISharedObjectEvents = ISha
7878
protected newAckBasedPromise<T>(executor: (resolve: (value: T | PromiseLike<T>) => void, reject: (reason?: unknown) => void) => void): Promise<T>;
7979
protected onConnect(): void;
8080
protected abstract onDisconnect(): void;
81+
// @deprecated
8182
protected abstract processCore(message: ISequencedDocumentMessage, local: boolean, localOpMetadata: unknown): void;
83+
protected processMessagesCore?(messagesCollection: IRuntimeMessageCollection): void;
8284
protected reSubmitCore(content: unknown, localOpMetadata: unknown): void;
8385
protected rollback(content: unknown, localOpMetadata: unknown): void;
8486
// (undocumented)

packages/dds/shared-object-base/src/sharedObject.ts

Lines changed: 149 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import {
3333
blobCountPropertyName,
3434
totalBlobSizePropertyName,
3535
type IRuntimeMessageCollection,
36+
type IRuntimeMessagesContent,
3637
} from "@fluidframework/runtime-definitions/internal";
3738
import {
3839
toDeltaManagerInternal,
@@ -161,6 +162,20 @@ export abstract class SharedObjectCore<
161162
const { opProcessingHelper, callbacksHelper } = this.setUpSampledTelemetryHelpers();
162163
this.opProcessingHelper = opProcessingHelper;
163164
this.callbacksHelper = callbacksHelper;
165+
166+
const processMessagesCore = this.processMessagesCore?.bind(this);
167+
this.processMessagesHelper =
168+
processMessagesCore === undefined
169+
? (messagesCollection: IRuntimeMessageCollection) =>
170+
processHelper(messagesCollection, this.process.bind(this))
171+
: (messagesCollection: IRuntimeMessageCollection) => {
172+
processMessagesCoreHelper(
173+
messagesCollection,
174+
this.opProcessingHelper,
175+
this.emitInternal.bind(this),
176+
processMessagesCore,
177+
);
178+
};
164179
}
165180

166181
/**
@@ -384,13 +399,43 @@ export abstract class SharedObjectCore<
384399
* @param local - True if the shared object is local
385400
* @param localOpMetadata - For local client messages, this is the metadata that was submitted with the message.
386401
* For messages from a remote client, this will be undefined.
402+
*
403+
* @deprecated Replaced by {@link SharedObjectCore.processMessagesCore}.
387404
*/
388405
protected abstract processCore(
389406
message: ISequencedDocumentMessage,
390407
local: boolean,
391408
localOpMetadata: unknown,
392409
): void;
393410

411+
/* eslint-disable jsdoc/check-indentation */
412+
/**
413+
* Process a 'bunch' of messages for this shared object.
414+
*
415+
* @remarks
416+
* A 'bunch' is a group of messages that have the following properties:
417+
* - They are all part of the same grouped batch, which entails:
418+
* - They are contiguous in sequencing order.
419+
* - They are all from the same client.
420+
* - They are all based on the same reference sequence number.
421+
* - They are not interleaved with messages from other clients.
422+
* - They are not interleaved with messages from other DDS in the container.
423+
* Derived classes should override this if they need to do custom processing on a 'bunch' of remote messages.
424+
* @param messageCollection - The collection of messages to process.
425+
*
426+
*/
427+
/* eslint-enable jsdoc/check-indentation */
428+
protected processMessagesCore?(messagesCollection: IRuntimeMessageCollection): void;
429+
430+
/**
431+
* Calls {@link SharedObjectCore.processCore} or {@link SharedObjectCore.processMessagesCore} depending on whether
432+
* processMessagesCore is defined. This helper is used to keep the code cleaner while we have to support both these
433+
* function.
434+
*/
435+
private readonly processMessagesHelper: (
436+
messagesCollection: IRuntimeMessageCollection,
437+
) => void;
438+
394439
/**
395440
* Called when the object has disconnected from the delta stream.
396441
*/
@@ -557,6 +602,8 @@ export abstract class SharedObjectCore<
557602
* @param local - Whether the message originated from the local client
558603
* @param localOpMetadata - For local client messages, this is the metadata that was submitted with the message.
559604
* For messages from a remote client, this will be undefined.
605+
*
606+
* @deprecated Replaced by {@link SharedObjectCore.processMessages}.
560607
*/
561608
private process(
562609
message: ISequencedDocumentMessage,
@@ -582,23 +629,42 @@ export abstract class SharedObjectCore<
582629
this.emitInternal("op", message, local, this);
583630
}
584631

632+
/* eslint-disable jsdoc/check-indentation */
585633
/**
586-
* Process messages for this shared object. The messages here are contiguous messages for this object in a batch.
634+
* Process a bunch of messages for this shared object. A bunch is group of messages that have the following properties:
635+
* - They are all part of the same grouped batch, which entails:
636+
* - They are contiguous in sequencing order.
637+
* - They are all from the same client.
638+
* - They are all based on the same reference sequence number.
639+
* - They are not interleaved with messages from other clients.
640+
* - They are not interleaved with messages from other DDS in the container.
587641
* @param messageCollection - The collection of messages to process.
642+
*
588643
*/
644+
/* eslint-enable jsdoc/check-indentation */
589645
private processMessages(messagesCollection: IRuntimeMessageCollection): void {
590-
const { envelope, messagesContent, local } = messagesCollection;
591-
for (const { contents, localOpMetadata, clientSequenceNumber } of messagesContent) {
592-
this.process(
593-
{
594-
...envelope,
595-
clientSequenceNumber,
596-
contents: parseHandles(contents, this.serializer),
597-
},
598-
local,
646+
this.verifyNotClosed(); // This will result in container closure.
647+
648+
// Decode any handles in the contents before processing the messages.
649+
const decodedMessagesContent: IRuntimeMessagesContent[] = [];
650+
for (const {
651+
contents,
652+
localOpMetadata,
653+
clientSequenceNumber,
654+
} of messagesCollection.messagesContent) {
655+
const decodedMessageContent: IRuntimeMessagesContent = {
656+
contents: parseHandles(contents, this.serializer),
599657
localOpMetadata,
600-
);
658+
clientSequenceNumber,
659+
};
660+
decodedMessagesContent.push(decodedMessageContent);
601661
}
662+
663+
const decodedMessagesCollection: IRuntimeMessageCollection = {
664+
...messagesCollection,
665+
messagesContent: decodedMessagesContent,
666+
};
667+
this.processMessagesHelper(decodedMessagesCollection);
602668
}
603669

604670
/**
@@ -943,3 +1009,75 @@ function isChannel(loadable: IFluidLoadable): loadable is IChannel {
9431009
// This assumes no other IFluidLoadable has an `attributes` field, and thus may not be fully robust.
9441010
return (loadable as IChannel).attributes !== undefined;
9451011
}
1012+
1013+
/**
1014+
* Utility that processes the given messages in the message collection together by calling `processMessagesCore`.
1015+
* This will be called when {@link SharedObjectCore.processMessagesCore} is defined.
1016+
*/
1017+
function processMessagesCoreHelper(
1018+
messagesCollection: IRuntimeMessageCollection,
1019+
opProcessingHelper: SampledTelemetryHelper<void, ProcessTelemetryProperties>,
1020+
emitInternal: (
1021+
event: "pre-op" | "op",
1022+
op: ISequencedDocumentMessage,
1023+
local: boolean,
1024+
) => void,
1025+
processMessagesCore: (messagesCollection: IRuntimeMessageCollection) => void,
1026+
): void {
1027+
const { envelope, local, messagesContent } = messagesCollection;
1028+
1029+
const emitEvents = (
1030+
event: "pre-op" | "op",
1031+
messagesContentForEvent: readonly IRuntimeMessagesContent[],
1032+
): void => {
1033+
for (const { contents, clientSequenceNumber } of messagesContentForEvent) {
1034+
const message: ISequencedDocumentMessage = {
1035+
...envelope,
1036+
contents,
1037+
clientSequenceNumber,
1038+
};
1039+
emitInternal(event, message, local);
1040+
}
1041+
};
1042+
1043+
emitEvents("pre-op", messagesContent);
1044+
opProcessingHelper.measure(
1045+
(): ICustomData<ProcessTelemetryProperties> => {
1046+
processMessagesCore(messagesCollection);
1047+
const telemetryProperties: ProcessTelemetryProperties = {
1048+
sequenceDifference: envelope.sequenceNumber - envelope.referenceSequenceNumber,
1049+
};
1050+
return {
1051+
customData: telemetryProperties,
1052+
};
1053+
},
1054+
local ? "local" : "remote",
1055+
);
1056+
emitEvents("op", messagesContent);
1057+
}
1058+
1059+
/**
1060+
* Utility that processes the given messages in the message collection one by one by calling `process`. This will
1061+
* be called when {@link SharedObjectCore.processMessagesCore} is not defined.
1062+
*/
1063+
function processHelper(
1064+
messagesCollection: IRuntimeMessageCollection,
1065+
process: (
1066+
message: ISequencedDocumentMessage,
1067+
local: boolean,
1068+
localOpMetadata: unknown,
1069+
) => void,
1070+
): void {
1071+
const { envelope, local, messagesContent } = messagesCollection;
1072+
for (const { contents, localOpMetadata, clientSequenceNumber } of messagesContent) {
1073+
process(
1074+
{
1075+
...envelope,
1076+
contents,
1077+
clientSequenceNumber,
1078+
},
1079+
local,
1080+
localOpMetadata,
1081+
);
1082+
}
1083+
}

0 commit comments

Comments
 (0)