Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: clear stale queues #1952

Draft
wants to merge 1 commit into
base: beta/events-queue-fix-opt-1
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -724,40 +724,6 @@ class RetryQueue implements IQueue<QueueItemData> {
}

checkReclaim() {
const createReclaimStartTask = (store: IStore) => () => {
if (store.get(QueueStatuses.RECLAIM_END) !== this.id) {
return;
}

if (store.get(QueueStatuses.RECLAIM_START) !== this.id) {
return;
}

this.reclaim(store.id);
};
const createReclaimEndTask = (store: IStore) => () => {
if (store.get(QueueStatuses.RECLAIM_START) !== this.id) {
return;
}

store.set(QueueStatuses.RECLAIM_END, this.id);

this.schedule.run(
createReclaimStartTask(store),
this.timeouts.reclaimWait,
ScheduleModes.ABANDON,
);
};
const tryReclaim = (store: IStore) => {
store.set(QueueStatuses.RECLAIM_START, this.id);
store.set(QueueStatuses.ACK, this.schedule.now());

this.schedule.run(
createReclaimEndTask(store),
this.timeouts.reclaimWait,
ScheduleModes.ABANDON,
);
};
const findOtherQueues = (name: string): IStore[] => {
const res: IStore[] = [];
const storageEngine = this.store.getOriginalEngine();
Expand Down Expand Up @@ -798,15 +764,8 @@ class RetryQueue implements IQueue<QueueItemData> {
return res;
};

findOtherQueues(this.name).forEach(store => {
if (this.schedule.now() - store.get(QueueStatuses.ACK) < this.timeouts.reclaimTimeout) {
return;
}

tryReclaim(store);
});

this.schedule.run(this.checkReclaim, this.timeouts.reclaimTimer, ScheduleModes.RESCHEDULE);
// Instead of reclaiming stale queues, clear them
findOtherQueues(this.name).forEach(store => this.clearQueueEntries(store, 0));
Comment on lines +767 to +768
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Potential data loss by clearing stale queues instead of reclaiming

Changing the checkReclaim method to clear stale queues (clearQueueEntries) rather than reclaiming them could lead to loss of unprocessed queue items, resulting in possible data loss.

Consider evaluating whether it's acceptable to discard these items. If not, you might need to implement a mechanism to reclaim unprocessed items from stale queues to ensure data integrity.

}

clear() {
Expand Down
Loading