Skip to content

Commit 77cd69b

Browse files
committed
Implement createEmptyCheckpoints filter for mongodb storage.
1 parent 8a55aa1 commit 77cd69b

File tree

2 files changed

+70
-43
lines changed

2 files changed

+70
-43
lines changed

modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts

Lines changed: 68 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -680,27 +680,57 @@ export class MongoBucketBatch
680680
}
681681
);
682682

683-
const updateResult = await this.db.sync_rules.findOneAndUpdate(
684-
{
685-
_id: this.group_id
686-
},
683+
const can_checkpoint = {
684+
$and: [
685+
{ $eq: ['$snapshot_done', true] },
686+
{
687+
$or: [{ $eq: ['$last_checkpoint_lsn', null] }, { $lte: ['$last_checkpoint_lsn', { $literal: lsn }] }]
688+
},
689+
{
690+
$or: [{ $eq: ['$no_checkpoint_before', null] }, { $lte: ['$no_checkpoint_before', { $literal: lsn }] }]
691+
}
692+
]
693+
};
694+
695+
const new_keepalive_op = {
696+
$cond: [
697+
can_checkpoint,
698+
{ $literal: null },
699+
{
700+
$toString: {
701+
$max: [{ $toLong: '$keepalive_op' }, { $literal: this.persisted_op }]
702+
}
703+
}
704+
]
705+
};
706+
707+
const new_last_checkpoint = {
708+
$cond: [
709+
can_checkpoint,
710+
{
711+
$max: ['$last_checkpoint', { $literal: this.persisted_op }, { $toLong: '$keepalive_op' }]
712+
},
713+
'$last_checkpoint'
714+
]
715+
};
716+
717+
let filter: mongo.Filter<SyncRuleDocument> = { _id: this.group_id };
718+
if (!createEmptyCheckpoints) {
719+
// Only create checkpoint if we have new data
720+
filter = {
721+
_id: this.group_id,
722+
$expr: {
723+
$or: [{ $ne: ['$keepalive_op', new_keepalive_op] }, { $ne: ['$last_checkpoint', new_last_checkpoint] }]
724+
}
725+
};
726+
}
727+
728+
let updateResult = await this.db.sync_rules.findOneAndUpdate(
729+
filter,
687730
[
688731
{
689732
$set: {
690-
_can_checkpoint: {
691-
$and: [
692-
{ $eq: ['$snapshot_done', true] },
693-
{
694-
$or: [{ $eq: ['$last_checkpoint_lsn', null] }, { $lte: ['$last_checkpoint_lsn', { $literal: lsn }] }]
695-
},
696-
{
697-
$or: [
698-
{ $eq: ['$no_checkpoint_before', null] },
699-
{ $lte: ['$no_checkpoint_before', { $literal: lsn }] }
700-
]
701-
}
702-
]
703-
}
733+
_can_checkpoint: can_checkpoint
704734
}
705735
},
706736
{
@@ -715,28 +745,8 @@ export class MongoBucketBatch
715745
last_fatal_error: {
716746
$cond: ['$_can_checkpoint', { $literal: null }, '$last_fatal_error']
717747
},
718-
keepalive_op: {
719-
$cond: [
720-
'$_can_checkpoint',
721-
// Checkpoint: set to null
722-
{ $literal: null },
723-
// Not checkpoint: update to max of existing keepalive_op and persisted_op
724-
{
725-
$toString: {
726-
$max: [{ $toLong: '$keepalive_op' }, { $literal: this.persisted_op }]
727-
}
728-
}
729-
]
730-
},
731-
last_checkpoint: {
732-
$cond: [
733-
'$_can_checkpoint',
734-
{
735-
$max: ['$last_checkpoint', { $literal: this.persisted_op }, { $toLong: '$keepalive_op' }]
736-
},
737-
'$last_checkpoint'
738-
]
739-
},
748+
keepalive_op: new_keepalive_op,
749+
last_checkpoint: new_last_checkpoint,
740750
// Unset snapshot_lsn on checkpoint
741751
snapshot_lsn: {
742752
$cond: ['$_can_checkpoint', { $literal: null }, '$snapshot_lsn']
@@ -759,7 +769,24 @@ export class MongoBucketBatch
759769
}
760770
);
761771
if (updateResult == null) {
762-
throw new ReplicationAssertionError('Failed to load sync_rules document during checkpoint update');
772+
const existing = await this.db.sync_rules.findOne(
773+
{ _id: this.group_id },
774+
{
775+
session: this.session,
776+
projection: {
777+
snapshot_done: 1,
778+
last_checkpoint_lsn: 1,
779+
no_checkpoint_before: 1,
780+
keepalive_op: 1
781+
}
782+
}
783+
);
784+
if (existing == null) {
785+
throw new ReplicationAssertionError('Failed to load sync_rules document during checkpoint update');
786+
}
787+
// No-op update - reuse existing document for downstream logic.
788+
// This can happen when last_checkpoint and keepalive_op would remain unchanged.
789+
updateResult = existing;
763790
}
764791
const checkpointCreated = updateResult.snapshot_done === true && updateResult.last_checkpoint_lsn === lsn;
765792

packages/service-core-tests/src/tests/register-data-storage-data-tests.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1180,7 +1180,7 @@ bucket_definitions:
11801180

11811181
testChecksumBatching(generateStorageFactory);
11821182

1183-
test.only('empty checkpoints (1)', async () => {
1183+
test('empty checkpoints (1)', async () => {
11841184
await using factory = await generateStorageFactory();
11851185
const syncRules = await factory.updateSyncRules({
11861186
content: `
@@ -1215,7 +1215,7 @@ bucket_definitions:
12151215
});
12161216
});
12171217

1218-
test.only('empty checkpoints (2)', async () => {
1218+
test('empty checkpoints (2)', async () => {
12191219
await using factory = await generateStorageFactory();
12201220
const syncRules = await factory.updateSyncRules({
12211221
content: `

0 commit comments

Comments
 (0)