-
Notifications
You must be signed in to change notification settings - Fork 19
chore: limit staging area processing by 10 tries #278
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
WalkthroughAdds a per-event numeric Changes
Sequence Diagram(s)sequenceDiagram
autonumber
participant SA as StagingAreaService
participant Repo as EventStagingRepository
participant Lock as LockManager
participant Proc as EventProcessor
participant Store as EventStore
rect rgba(230,240,255,0.5)
note over SA,Repo: Acquire room lock
SA->>Lock: acquire(roomId)
end
loop Until no more events
SA->>Repo: findOneAndUpdate(roomId)<br/>sort: got, depth, createdAt<br/>inc got by 1
Repo-->>SA: event (pre-update) or null
alt No event
SA->>Lock: release(roomId)
note over SA: Exit loop
else Event found
SA->>Lock: touch(roomId) %% refresh lock timestamp
alt event.got > MAX_EVENT_RETRY
SA->>Store: unstage(event)
note over SA: Skip over-retried event
else Within retry limit
SA->>Proc: resolve deps / auth / convert PDU / notify
alt Success
SA->>Store: unstage(event)
else Missing deps/auth/partial state
SA->>Store: postpone(event)
else Other error
SA->>Store: postpone(event)
end
end
end
end
SA->>Lock: release(roomId)
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Possibly related PRs
Suggested reviewers
Poem
Pre-merge checks and finishing touches❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## main #278 +/- ##
=======================================
Coverage 60.46% 60.46%
=======================================
Files 67 67
Lines 6673 6673
=======================================
Hits 4035 4035
Misses 2638 2638 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
61084b6 to
b8f2baf
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Nitpick comments (2)
packages/federation-sdk/src/server-discovery/discovery.ts (1)
40-40: Significant cache duration reduction.The default cache age has been reduced from 1 hour (3,600,000 ms) to 5 minutes (300,000 ms), a 12x decrease. This will result in:
- More frequent DNS lookups and
.well-knownendpoint requests- Fresher server discovery data
- Potentially increased load on discovery infrastructure
Ensure this change aligns with your operational requirements and that the infrastructure can handle the increased request rate.
packages/federation-sdk/src/services/staging-area.service.ts (1)
151-151: Minor: Consider consistency in error messages.The error messages at Lines 151 and 156 use slightly different phrasing ("Added missing events, postponing" vs "Error processing event, postponing"). While both are clear, consider using a consistent format across all postponement scenarios for easier log parsing.
Also applies to: 156-156
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Jira integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (4)
packages/core/src/models/event.model.ts(1 hunks)packages/federation-sdk/src/repositories/event-staging.repository.ts(3 hunks)packages/federation-sdk/src/server-discovery/discovery.ts(1 hunks)packages/federation-sdk/src/services/staging-area.service.ts(3 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
packages/federation-sdk/src/services/staging-area.service.ts (1)
packages/core/src/models/event.model.ts (1)
EventStagingStore(40-44)
🔇 Additional comments (7)
packages/core/src/models/event.model.ts (1)
42-42: LGTM!The addition of the
gotfield to track retry attempts is straightforward and aligns with the PR objective to limit staging area processing.packages/federation-sdk/src/repositories/event-staging.repository.ts (3)
13-18: LGTM!The updated index correctly prioritizes events by retry count (
got), then depth, then creation time, supporting the new retry limit logic.
38-38: LGTM!Initializing
gotto 0 on insert is correct, establishing the baseline for retry counting.
52-64: LGTM!The atomic increment using
findOneAndUpdatewithreturnDocument: 'before'is thread-safe and correctly returns the document before the increment, which is essential for the retry limit check in the service layer.packages/federation-sdk/src/services/staging-area.service.ts (3)
29-39: LGTM!The
MAX_EVENT_RETRYconfiguration follows the same pattern as other environment variable parsing in the codebase, with proper validation and a sensible default of 10.
93-100: LGTM!The refactored do-while pattern with explicit null checking improves code clarity and correctly handles the case when no staged events remain.
113-117: Excellent addition for batch processing reliability.Updating the lock timestamp before processing each event prevents lock timeouts during long-running batch operations, ensuring that the lock isn't prematurely released while actively processing events.
| if (event.got > MAX_EVENT_RETRY) { | ||
| this.logger.warn( | ||
| `Event ${event._id} has been tried more than 10 times, removing from staging area`, | ||
| ); | ||
| await this.eventService.markEventAsUnstaged(event); | ||
| continue; | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Potential off-by-one error in retry limit.
With got starting at 0 and findOneAndUpdate returning the document before the increment (due to returnDocument: 'before'), the current comparison event.got > MAX_EVENT_RETRY allows 11 processing attempts (when got is 0 through 10) instead of 10.
If the intent is to limit processing to exactly MAX_EVENT_RETRY attempts, change the comparison to >=:
- if (event.got > MAX_EVENT_RETRY) {
+ if (event.got >= MAX_EVENT_RETRY) {Additionally, the log message hardcodes "10" and should use the variable:
this.logger.warn(
- `Event ${event._id} has been tried more than 10 times, removing from staging area`,
+ `Event ${event._id} has been tried ${MAX_EVENT_RETRY} times, removing from staging area`,
);📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| if (event.got > MAX_EVENT_RETRY) { | |
| this.logger.warn( | |
| `Event ${event._id} has been tried more than 10 times, removing from staging area`, | |
| ); | |
| await this.eventService.markEventAsUnstaged(event); | |
| continue; | |
| } | |
| if (event.got >= MAX_EVENT_RETRY) { | |
| this.logger.warn( | |
| `Event ${event._id} has been tried ${MAX_EVENT_RETRY} times, removing from staging area`, | |
| ); | |
| await this.eventService.markEventAsUnstaged(event); | |
| continue; | |
| } |
b8f2baf to
7887e2f
Compare
7887e2f to
5e24d2d
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
♻️ Duplicate comments (1)
packages/federation-sdk/src/services/staging-area.service.ts (1)
102-108: Fix the off-by-one error in retry limit.With
gotstarting at 0 andfindOneAndUpdatereturning the document before the increment (due toreturnDocument: 'before'), the current comparisonevent.got > MAX_EVENT_RETRYallows 11 processing attempts instead of 10:
- 1st attempt: got=0, check 0 > 10 (false), process, increment to 1
- 2nd attempt: got=1, check 1 > 10 (false), process, increment to 2
- ...
- 11th attempt: got=10, check 10 > 10 (false), process, increment to 11
- 12th attempt: got=11, check 11 > 10 (true), skip
Apply this diff to limit processing to exactly MAX_EVENT_RETRY attempts:
- if (event.got > MAX_EVENT_RETRY) { + if (event.got >= MAX_EVENT_RETRY) { this.logger.warn( - `Event ${event._id} has been tried ${MAX_EVENT_RETRY} times, removing from staging area`, + `Event ${event._id} has been tried ${event.got} times (max: ${MAX_EVENT_RETRY}), removing from staging area`, );Additionally, updated the log message to use the actual value and the variable for clarity.
🧹 Nitpick comments (1)
packages/core/src/models/event.model.ts (1)
42-42: Consider a more descriptive field name.The field name
gotis not immediately clear in its purpose. Consider renaming it to something more descriptive likeretryCount,attemptCount, orprocessingAttemptsto improve code readability.Apply this diff to improve clarity:
- got: number; + retryCount: number;Note: This would require updates in the repository and service layers where this field is referenced.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Jira integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (4)
packages/core/src/models/event.model.ts(1 hunks)packages/federation-sdk/src/repositories/event-staging.repository.ts(3 hunks)packages/federation-sdk/src/server-discovery/discovery.ts(1 hunks)packages/federation-sdk/src/services/staging-area.service.ts(3 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
packages/federation-sdk/src/services/staging-area.service.ts (1)
packages/core/src/models/event.model.ts (1)
EventStagingStore(40-44)
🔇 Additional comments (8)
packages/federation-sdk/src/server-discovery/discovery.ts (1)
40-40: Verify the impact of reducing cache duration by 12x.The default cache duration has been reduced from 1 hour to 5 minutes. While this may help with faster error recovery and fresher server discovery data, it will significantly increase the frequency of DNS lookups and well-known requests.
Consider the following:
- Will this 12x increase in discovery requests cause excessive load on external servers or DNS resolvers?
- Have you load-tested this change in a production-like environment?
- Is there a specific issue this addresses, or is it a precautionary measure?
You may want to profile the discovery request rate before and after this change to ensure it doesn't introduce performance issues.
packages/federation-sdk/src/repositories/event-staging.repository.ts (2)
13-18: Index change aligns with new retry logic.The updated index now includes
gotas the first field afterroomId, which correctly prioritizes events with fewer processing attempts. This supports the new retry-gating mechanism introduced in the staging area service.
52-64: Verify the atomicity and concurrency safety of the retrieval logic.The
findOneAndUpdatewithreturnDocument: 'before'ensures atomic increment of thegotcounter while retrieving the event. This prevents race conditions where multiple instances might process the same event simultaneously.However, verify that the sorting strategy
{ got: 1, 'event.depth': 1, createdAt: 1 }aligns with your intended processing order. Events with lower retry counts are prioritized, which makes sense, but ensure this doesn't inadvertently starve deeper events in the DAG.packages/federation-sdk/src/services/staging-area.service.ts (5)
29-39: LGTM! Environment variable parsing is robust.The MAX_EVENT_RETRY parsing logic correctly validates the environment variable and provides a sensible default of 10. The error handling ensures that invalid values are caught early.
93-100: Loop structure is clear and handles empty cases correctly.The do-while pattern with an explicit break on no events is a clear approach. The debug logging helps with observability.
114-117: Excellent addition of lock refresh logic.Updating the lock timestamp before processing each event prevents the lock from timing out during batch processing. This is crucial for long-running batches and prevents other instances from acquiring the lock prematurely.
151-160: Error handling improvements are consistent.The updated error messages now consistently use "postponing event processing" across all error cases, which improves log clarity. The change from unconditional unstaging to postponing for unexpected errors (lines 155-160) is more conservative and safer, allowing potentially transient failures to be retried.
162-162: Do-while loop termination is correct.The loop continues while
eventis truthy, which correctly terminates when no more events are found or after processing all available events.
Summary by CodeRabbit
New Features
Improvements