fix: missing events not being processed properly #212
WalkthroughExposes Changes
Sequence Diagram(s)
sequenceDiagram
autonumber
participant Caller as Missing/Room Service
participant EventSvc as EventService
participant Validator as Signature Validator
participant Staging as StagingAreaService
participant Lock as Per-Room Lock
participant Queue as Room Queue
Caller->>EventSvc: processIncomingPDUs(origin, [PDUs])
EventSvc->>EventSvc: group PDUs by room_id
loop each room
EventSvc->>Validator: validateEvent(event, origin)
alt origin missing signature, local signed
Validator->>Validator: use local server as originToValidateSignatures
end
Validator-->>EventSvc: validation result
alt valid & not seen
EventSvc->>EventSvc: generate eventId & stage
EventSvc->>Lock: try acquire per-room lock
alt lock acquired
EventSvc->>Queue: enqueue room for staging processing
else
EventSvc->>EventSvc: skip enqueue (lock unavailable)
end
else invalid/seen
EventSvc->>EventSvc: skip event
end
end
sequenceDiagram
autonumber
participant Missing as MissingEventService
participant Room as RoomService
participant EventSvc as EventService
Note over Missing,Room: Replaces direct persist with unified processing
Missing->>EventSvc: processIncomingPDUs(origin, [fetchedEvent])
Room->>EventSvc: processIncomingPDUs(residentServer||origin, [joinEventFinal])
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
Pre-merge checks and finishing touches: ✅ Passed checks (3 passed)
Codecov Report
✅ All modified and coverable lines are covered by tests.
Additional details and impacted files
    @@            Coverage Diff             @@
    ##             main     #212      +/-   ##
    ==========================================
    - Coverage   80.99%   80.94%   -0.06%
    ==========================================
      Files          63       63
      Lines        4693     4696       +3
    ==========================================
      Hits         3801     3801
    - Misses        892      895       +3
☔ View full report in Codecov by Sentry.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
packages/federation-sdk/src/services/event.service.ts (1)
1-1: Pipeline formatting issue needs to be addressed.
The pipeline failure indicates a Biome formatting issue. This should be resolved before the PR can be merged.
Run the following script to verify and fix the formatting issue:
    #!/bin/bash
    # Check and fix formatting issues with Biome
    bunx @biomejs/biome check --apply packages/federation-sdk/src/services/event.service.ts
    # Show what changed
    git diff packages/federation-sdk/src/services/event.service.ts
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Jira integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (3)
- packages/federation-sdk/src/services/event.service.ts (2 hunks)
- packages/federation-sdk/src/services/missing-event.service.ts (1 hunk)
- packages/federation-sdk/src/services/room.service.ts (1 hunk)
🧰 Additional context used
🧬 Code graph analysis (2)
packages/federation-sdk/src/services/event.service.ts (3)
- packages/room/src/manager/event-wrapper.ts (2): origin (85-91), event (105-114)
- packages/room/src/types/v3-11.ts (1): Pdu (729-729)
- packages/core/src/utils/checkSignAndHashes.ts (1): checkSignAndHashes (12-48)
packages/federation-sdk/src/services/missing-event.service.ts (1)
- packages/room/src/manager/event-wrapper.ts (2): origin (85-91), event (105-114)
🪛 GitHub Actions: my-workflow
packages/federation-sdk/src/services/event.service.ts
[error] 1-1: File content differs from formatting output. bunx @biomejs/biome ci --diagnostic-level=error reported formatting differences for this file.
packages/federation-sdk/src/services/room.service.ts
[error] 1-1: File content differs from formatting output. bunx @biomejs/biome ci --diagnostic-level=error reported formatting differences for this file.
🔇 Additional comments (5)
packages/federation-sdk/src/services/room.service.ts (1)
990-990: LGTM! Proper integration with centralized PDU processing.
This change routes the remote join event through the centralized processIncomingPDUs method instead of persisting it directly. The approach ensures:
- Consistent validation and processing across all PDU pathways
- Per-room locking and concurrency management
- Origin-aware signature validation for join events
- Integration with the updated event processing pipeline
The use of residentServer as the origin parameter is appropriate as it represents the authority server for the room.
packages/federation-sdk/src/services/missing-event.service.ts (1)
53-53: LGTM! Proper integration with centralized PDU processing.
This change routes fetched missing events through the centralized processIncomingPDUs method instead of persisting them directly. This ensures:
- Consistent validation and processing for all event sources
- Per-room locking and concurrency management
- Origin-aware signature validation
- Integration with the unified event processing pipeline
The origin parameter correctly uses the fetching server's origin.
packages/federation-sdk/src/services/event.service.ts (3)
157-157: LGTM! Appropriate visibility change for centralized PDU processing.
Making processIncomingPDUs public is the correct approach to enable other services (RoomService, MissingEventService) to use the centralized PDU processing pipeline. This follows TypeScript best practices: "start with a default public access modifier for methods that form the class's core interface. Then, use private and protected access modifiers to manage internal details".
The method now serves as part of the EventService's public API for processing incoming PDUs.
157-217: Enhanced concurrent processing with proper room grouping and validation.
The updated logic provides significant improvements:
- Room-based grouping: Events are organized by room ID for parallel processing
- Per-room locking: Prevents race conditions while enabling cross-room concurrency
- Comprehensive validation: Each event goes through schema and signature validation
- Duplicate detection: Prevents reprocessing of existing events
- Staging mechanism: Events are staged before processing to ensure atomicity
The concurrent processing approach (Promise.all) is well-designed for handling multiple rooms simultaneously while maintaining per-room consistency.
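The room-based grouping step described above can be sketched in isolation. This is a minimal illustration, not the code from event.service.ts: the MinimalPdu shape and the groupByRoom helper are hypothetical names, and the real Pdu type carries many more fields.

```typescript
// Hypothetical, reduced PDU shape used only for this sketch.
interface MinimalPdu {
  room_id: string;
  type: string;
}

// Group events by room_id, preserving arrival order within each room,
// so that each room's batch can later be processed independently.
function groupByRoom<T extends { room_id: string }>(
  pdus: readonly T[],
): Map<string, T[]> {
  const byRoom = new Map<string, T[]>();
  for (const pdu of pdus) {
    const bucket = byRoom.get(pdu.room_id);
    if (bucket) {
      bucket.push(pdu);
    } else {
      byRoom.set(pdu.room_id, [pdu]);
    }
  }
  return byRoom;
}

const incoming: MinimalPdu[] = [
  { room_id: '!a:example.org', type: 'm.room.message' },
  { room_id: '!b:example.org', type: 'm.room.member' },
  { room_id: '!a:example.org', type: 'm.room.member' },
];
const grouped = groupByRoom(incoming);
```

Each map entry can then be handed to a per-room worker, which is where the locking and staging steps from the review would apply.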
265-275: Smart origin-aware signature validation for join events.
This logic handles the join event signature validation scenario where:
- The event origin differs from the actual signing server (common in federation joins)
- Local server signs the join event but the origin remains the remote server
- Validation needs to use the correct signing server's key
The fallback mechanism correctly identifies when to use the local server name for signature validation while preserving the original event origin.
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
packages/federation-sdk/src/services/event.service.ts (3)
176-181: Sanitize logs to avoid leaking event content (PII) on validation failures
Logging the full event can expose message content and other PII. Log minimal metadata instead (room_id, type, sender, optionally event_id) and the error.
Apply this diff:
    - this.logger.error({
    -   msg: 'Event validation failed',
    -   event,
    -   error: err,
    - });
    + const safeMeta = {
    +   room_id: event.room_id,
    +   type: event.type,
    +   sender: event.sender,
    + };
    + let event_id: string | undefined;
    + try {
    +   event_id = generateId(event);
    + } catch (_) {
    +   /* ignore */
    + }
    + this.logger.error({
    +   msg: 'Event validation failed',
    +   ...safeMeta,
    +   ...(event_id ? { event_id } : {}),
    +   error: err,
    + });
196-214: Handle duplicate-key races when staging events to prevent hard failures
Two instances can race to stage the same event. A duplicate-key error (e.g., Mongo code 11000) will currently reject the whole room processing and potentially the entire Promise.all. Catch and ignore duplicates; only error on unexpected failures.
Apply this diff:
    - // save the event as staged to be processed
    - await this.eventStagingRepository.create(eventId, origin, event);
    -
    - // acquire a lock for processing the event
    - const lock = await this.lockRepository.getLock(
    -   roomId,
    -   this.configService.instanceId,
    - );
    - if (!lock) {
    -   this.logger.debug(`Couldn't acquire a lock for room ${roomId}`);
    -   continue;
    - }
    -
    - // if we have a lock, we can process the event
    - // void this.stagingAreaService.processEventForRoom(roomId);
    -
    - // TODO change this to call stagingAreaService directly (line above)
    - this.stagingAreaQueue.enqueue(roomId);
    + // save the event as staged to be processed
    + try {
    +   await this.eventStagingRepository.create(eventId, origin, event);
    + } catch (err: any) {
    +   // ignore duplicate-key (already staged by us or another instance)
    +   if (err?.code === 11000) {
    +     this.logger.debug(
    +       `Event ${eventId} already staged for room ${roomId}`,
    +     );
    +   } else {
    +     this.logger.error({
    +       msg: 'Failed to stage event',
    +       event_id: eventId,
    +       room_id: roomId,
    +       error: err,
    +     });
    +     continue;
    +   }
    + }
    +
    + // acquire a lock for processing the event
    + const lock = await this.lockRepository.getLock(
    +   roomId,
    +   this.configService.instanceId,
    + );
    + if (!lock) {
    +   this.logger.debug(`Couldn't acquire a lock for room ${roomId}`);
    +   continue;
    + }
    +
    + // if we have a lock, we can process the event
    + this.stagingAreaQueue.enqueue(roomId);
261-283: Require both signatures and hashes before verification
checkSignAndHashes destructures pdu.hashes.sha256. The current && guard only rejects events that are missing both fields, so an event missing just one of them passes the guard and can throw later. Require both.
Apply this diff:
    - if (!event.hashes && !event.signatures) {
    -   throw new Error('M_MISSING_SIGNATURES_OR_HASHES');
    - }
    + if (!event.hashes || !event.signatures) {
    +   throw new Error('M_MISSING_SIGNATURES_OR_HASHES');
    + }
Also, make the signer-origin fallback more robust by ensuring the signature sets are non-empty:
    - if (
    -   !event.signatures[origin] &&
    -   event.signatures[this.configService.serverName]
    - ) {
    -   originToValidateSignatures = this.configService.serverName;
    - }
    + const hasOriginSig =
    +   !!event.signatures[origin] &&
    +   Object.keys(event.signatures[origin]!).length > 0;
    + const hasLocalSig =
    +   !!event.signatures[this.configService.serverName] &&
    +   Object.keys(event.signatures[this.configService.serverName]!).length > 0;
    + if (!hasOriginSig && hasLocalSig) {
    +   originToValidateSignatures = this.configService.serverName;
    + }
🧹 Nitpick comments (2)
packages/federation-sdk/src/services/event.service.ts (2)
199-207: Acquire room lock once per room, not per event
Locking per event adds contention and cost. Stage all events for the room first, then try acquiring the room lock once and enqueue if at least one event was newly staged.
Proposed approach:
- Track a boolean stagedAny inside the room loop.
- After processing all events for the room, attempt getLock(roomId, instanceId) once; enqueue if the lock is acquired and stagedAny is true.
- This also reduces duplicate queue enqueues.
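The proposed approach above could look roughly like the following. This is a sketch under stated assumptions: RoomLock, RoomQueue, and stageThenEnqueueOnce are hypothetical stand-ins for the repository, queue, and loop body in event.service.ts, and getLock is simplified here to resolve to a boolean.

```typescript
// Hypothetical minimal interfaces standing in for the real lock repository and queue.
interface RoomLock {
  getLock(roomId: string, instanceId: string): Promise<boolean>;
}
interface RoomQueue {
  enqueue(roomId: string): void;
}

// Stage every event for a room first, then take the room lock once and
// enqueue the room once. Resolves to true only when the room was enqueued.
async function stageThenEnqueueOnce<E>(
  roomId: string,
  events: readonly E[],
  stage: (event: E) => Promise<boolean>, // assumed to resolve true when newly staged
  lock: RoomLock,
  queue: RoomQueue,
  instanceId: string,
): Promise<boolean> {
  let stagedAny = false;
  for (const event of events) {
    if (await stage(event)) {
      stagedAny = true;
    }
  }
  if (!stagedAny) {
    return false; // nothing new to process, so do not touch the lock
  }
  const acquired = await lock.getLock(roomId, instanceId);
  if (!acquired) {
    return false; // another instance holds the lock and will drain the staging area
  }
  queue.enqueue(roomId);
  return true;
}
```

With three newly staged events for one room, this makes one getLock call and one enqueue call instead of three of each.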
168-216: Bound cross-room parallelism
Promise.all over all rooms can create unbounded concurrency. Limit with a small pool (e.g., 8–16), configurable via ConfigService.
If adding a tiny dep is acceptable, use p-limit:
    - // process each room's events in parallel
    - // TODO implement a concurrency limit
    - await Promise.all(
    -   Array.from(eventsByRoomId.entries()).map(async ([roomId, events]) => {
    + // process each room's events with a concurrency limit
    + const limit = this.configService.eventProcessingConcurrency ?? 12;
    + const entries = Array.from(eventsByRoomId.entries());
    + await Promise.all(entries.map(([roomId, events]) =>
    +   // wrap in a simple semaphore (or use p-limit if available)
    +   this.stagingAreaQueue.withConcurrency(limit, async () => {
          for await (const event of events) {
            // ...
          }
    -   }),
    - );
    +   }),
    + ));
If withConcurrency is not available, consider introducing a small semaphore utility or using p-limit.
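For the "small semaphore utility" option, one dependency-free possibility is a fixed pool of workers pulling from a shared index. The helper name mapWithConcurrency is hypothetical and not part of the codebase or of p-limit; treat it as a sketch of the technique rather than the intended implementation.

```typescript
// Run `worker` over `items` with at most `limit` calls in flight at once.
// Results are returned in input order. If any worker rejects, the returned
// promise rejects, but calls that already started are not cancelled.
async function mapWithConcurrency<T, R>(
  items: readonly T[],
  limit: number,
  worker: (item: T, index: number) => Promise<R>,
): Promise<R[]> {
  if (!Number.isInteger(limit) || limit < 1) {
    throw new RangeError('limit must be a positive integer');
  }
  const results = new Array<R>(items.length);
  let next = 0;

  // Each pool member keeps claiming the next unprocessed index until none remain.
  const runWorker = async (): Promise<void> => {
    while (next < items.length) {
      const index = next++;
      results[index] = await worker(items[index] as T, index);
    }
  };

  const poolSize = Math.min(limit, items.length);
  await Promise.all(Array.from({ length: poolSize }, () => runWorker()));
  return results;
}
```

In the room-processing loop, items would be the entries of eventsByRoomId and limit would come from configuration; the property name eventProcessingConcurrency in the diff above is itself a suggestion from the review, not an existing setting.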
📜 Review details
📒 Files selected for processing (2)
- packages/federation-sdk/src/services/event.service.ts (2 hunks)
- packages/federation-sdk/src/services/room.service.ts (1 hunk)
🚧 Files skipped from review as they are similar to previous changes (1)
- packages/federation-sdk/src/services/room.service.ts
🧰 Additional context used
🧬 Code graph analysis (1)
packages/federation-sdk/src/services/event.service.ts (3)
- packages/room/src/manager/event-wrapper.ts (2): origin (85-91), event (105-114)
- packages/room/src/types/v3-11.ts (1): Pdu (729-729)
- packages/core/src/utils/checkSignAndHashes.ts (1): checkSignAndHashes (12-48)
🔇 Additional comments (1)
packages/federation-sdk/src/services/event.service.ts (1)
271-276: Confirm tests covering signature fallback scenarios
- Event with only local signature (sendJoin echo) passes validation
- Event with neither origin nor local signature fails with M_MISSING_SIGNATURES_OR_HASHES
- Event with an empty signature object for the origin is treated as “no signature”
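The origin-selection part of these scenarios can be exercised as a pure function, separate from cryptographic verification. The sketch below assumes a Matrix-style signatures map (server name to key ID to signature); resolveSigningOrigin and hasSignatureFrom are hypothetical helper names, not functions from event.service.ts.

```typescript
// Server name -> key ID -> base64 signature.
type Signatures = Record<string, Record<string, string>>;

// An absent entry and an empty signature object both count as "no signature".
function hasSignatureFrom(signatures: Signatures, server: string): boolean {
  const entry = signatures[server];
  return entry !== undefined && Object.keys(entry).length > 0;
}

// Pick which server's keys to verify against: fall back to the local server
// only when the origin has not signed and the local server has.
function resolveSigningOrigin(
  signatures: Signatures,
  origin: string,
  localServerName: string,
): string {
  if (
    !hasSignatureFrom(signatures, origin) &&
    hasSignatureFrom(signatures, localServerName)
  ) {
    return localServerName;
  }
  return origin;
}

const onlyLocal: Signatures = { 'local.example': { 'ed25519:1': 'sig' } };
const emptyOrigin: Signatures = {
  'remote.example': {},
  'local.example': { 'ed25519:1': 'sig' },
};
const both: Signatures = {
  'remote.example': { 'ed25519:a': 'sig' },
  'local.example': { 'ed25519:1': 'sig' },
};

const fromOnlyLocal = resolveSigningOrigin(onlyLocal, 'remote.example', 'local.example');
const fromEmptyOrigin = resolveSigningOrigin(emptyOrigin, 'remote.example', 'local.example');
const fromBoth = resolveSigningOrigin(both, 'remote.example', 'local.example');
const fromNeither = resolveSigningOrigin({}, 'remote.example', 'local.example');
```

When neither server has signed, the function returns the origin unchanged, leaving the rejection to the verification step that follows.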
Actionable comments posted: 1
🧹 Nitpick comments (2)
packages/federation-sdk/src/repositories/event-staging.repository.ts (1)
43-48: Consider adding an index for the sort fields to optimize query performance.
The method sorts by event.depth and createdAt, but the existing index only covers roomId and createdAt. Consider adding a compound index that includes the depth field for better query performance.
Add this index in the constructor:
      constructor(
        @inject('EventStagingCollection')
        private readonly collection: Collection<EventStagingStore>,
      ) {
        this.collection.createIndex({ roomId: 1, createdAt: 1 });
    +   this.collection.createIndex({ roomId: 1, 'event.depth': 1, createdAt: 1 });
      }
packages/federation-sdk/src/services/staging-area.service.ts (1)
113-117: Consider validating that both auth and prev events are arrays.
While extractEventsFromIncomingPDU handles missing arrays with defaults, consider adding defensive checks here as well.
      const [authEvents, prevEvents] = this.extractEventsFromIncomingPDU(
        event.event,
      );
    +
    + // Defensive validation
    + if (!Array.isArray(authEvents) || !Array.isArray(prevEvents)) {
    +   this.logger.error({
    +     msg: 'Invalid event structure: auth_events or prev_events not an array',
    +     eventId,
    +     authEvents: Array.isArray(authEvents),
    +     prevEvents: Array.isArray(prevEvents)
    +   });
    +   throw new Error('Invalid event structure');
    + }
📜 Review details
📒 Files selected for processing (3)
- packages/federation-sdk/src/repositories/event-staging.repository.ts (1 hunk)
- packages/federation-sdk/src/services/event.service.ts (3 hunks)
- packages/federation-sdk/src/services/staging-area.service.ts (5 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- packages/federation-sdk/src/services/event.service.ts
🧰 Additional context used
🧬 Code graph analysis (2)
packages/federation-sdk/src/repositories/event-staging.repository.ts (2)
- packages/room/src/manager/event-wrapper.ts (1): roomId (75-77)
- packages/core/src/models/event.model.ts (1): EventStagingStore (27-29)
packages/federation-sdk/src/services/staging-area.service.ts (1)
- packages/core/src/models/event.model.ts (1): EventStagingStore (27-29)
🔇 Additional comments (8)
packages/federation-sdk/src/repositories/event-staging.repository.ts (1)
43-48: LGTM! The new method enables proper depth-based event processing.
The getLeastDepthEventForRoom method correctly prioritizes events by depth first (lower depth = earlier in the event graph), with creation time as a tiebreaker. This aligns with Matrix's event ordering semantics.
packages/federation-sdk/src/services/staging-area.service.ts (7)
24-29: LGTM! Proper custom error for authorization event handling.
The MissingAuthorizationEventsError class provides clear semantic distinction between missing authorization events (which require postponement) and other errors.
45-49: Good refactor: Returning a tuple provides better separation of concerns.
Returning [authEvents, prevEvents] as a tuple instead of concatenating them allows downstream code to handle authorization events and previous events differently, which is essential for the new authorization flow.
74-86: Good error handling differentiation.
The code properly distinguishes between MissingAuthorizationEventsError (logged as info with postponement) and other errors (logged as error). This prevents flooding error logs with expected postponement cases.
113-117: LGTM! Proper destructuring of auth and prev events.
The code correctly destructures the tuple returned by extractEventsFromIncomingPDU and combines them for dependency checking.
144-148: Good optimization: Early exit for missing authorization events.
Throwing MissingAuthorizationEventsError when authorization events are missing prevents unnecessary processing that would fail anyway. The use of Array.some() with includes is efficient.
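A minimal sketch of the early-exit check, assuming the caller already knows which dependency IDs are missing. The class body and the assertAuthEventsAvailable helper are illustrative and not copied from staging-area.service.ts. This variant swaps the some()/includes() pairing for a Set lookup, which avoids rescanning the missing list for every auth event; for the handful of auth events a PDU normally carries, either form is fine.

```typescript
// Distinct error type so callers can postpone instead of treating this as a failure.
class MissingAuthorizationEventsError extends Error {
  constructor(message: string) {
    super(message);
    this.name = 'MissingAuthorizationEventsError';
    // Keeps instanceof working when compiled to older targets.
    Object.setPrototypeOf(this, new.target.prototype);
  }
}

// Throw early when any authorization event is among the missing dependencies.
function assertAuthEventsAvailable(
  authEventIds: readonly string[],
  missingEventIds: readonly string[],
): void {
  const missing = new Set(missingEventIds);
  const missingAuth = authEventIds.filter((id) => missing.has(id));
  if (missingAuth.length > 0) {
    throw new MissingAuthorizationEventsError(
      `Missing authorization events: ${missingAuth.join(', ')}`,
    );
  }
}

// A missing prev event alone does not trigger the error; a missing auth event does.
let postponed = false;
try {
  assertAuthEventsAvailable(['$create', '$power'], ['$power', '$prev1']);
} catch (err) {
  postponed = err instanceof MissingAuthorizationEventsError;
}
```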
154-154: Authorization event type usage is correct
Verified that this call to getAuthEventIds uses event.event.type consistently with other invocations; no changes needed.
51-52: No creation-order assumptions detected; depth-based processing is safe
Staging-area now uses getLeastDepthEventForRoom (sorted by event.depth, then createdAt); no other code relies on pure creation-time ordering.
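The ordering described here (depth first, creation time as tiebreaker) can be expressed as an in-memory comparator. This is only an illustration of the sort semantics: the StagedEvent shape is a hypothetical reduction of EventStagingStore, and the real repository performs the sort in the database query.

```typescript
// Hypothetical, reduced shape of a staged event record.
interface StagedEvent {
  eventId: string;
  roomId: string;
  createdAt: Date;
  event: { depth: number };
}

// Lower depth first; for equal depth, the earlier createdAt wins.
function compareStaged(a: StagedEvent, b: StagedEvent): number {
  return (
    a.event.depth - b.event.depth ||
    a.createdAt.getTime() - b.createdAt.getTime()
  );
}

// In-memory analogue of picking the next event to process for a room.
function pickLeastDepth(
  staged: readonly StagedEvent[],
  roomId: string,
): StagedEvent | undefined {
  return staged.filter((s) => s.roomId === roomId).sort(compareStaged)[0];
}

const stagedEvents: StagedEvent[] = [
  { eventId: '$c', roomId: '!r', createdAt: new Date(1000), event: { depth: 7 } },
  { eventId: '$a', roomId: '!r', createdAt: new Date(3000), event: { depth: 5 } },
  { eventId: '$b', roomId: '!r', createdAt: new Date(2000), event: { depth: 5 } },
  { eventId: '$x', roomId: '!other', createdAt: new Date(500), event: { depth: 1 } },
];
const nextEvent = pickLeastDepth(stagedEvents, '!r');
```

Here $b is selected: it shares the lowest depth in the room with $a but was staged earlier, while $c arrived first yet sits deeper in the graph.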
    // TODO: what should we do to avoid infinite loops in case the next event is always the same event
Critical: Address the TODO about infinite loops.
The TODO comment raises a valid concern about potential infinite loops if the same event keeps being returned. This could happen if an event permanently lacks its authorization events.
Consider implementing one of these strategies:
- Track retry attempts per event and skip after a threshold
- Implement exponential backoff for events that repeatedly fail
- Move permanently failing events to a dead letter queue
Would you like me to open an issue to track implementing a retry limit mechanism?
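As one possible shape for the first and third strategies, the sketch below counts postponements per event and dead-letters an event once a threshold is reached. RetryTracker is a hypothetical in-memory helper; in a multi-instance deployment the counters would more plausibly be stored alongside the staged event, which this sketch does not attempt.

```typescript
// Tracks how often each staged event has been postponed and gives up
// after a fixed number of attempts.
class RetryTracker {
  private readonly maxAttempts: number;
  private readonly attempts = new Map<string, number>();
  private readonly deadLetters = new Set<string>();

  constructor(maxAttempts: number) {
    if (!Number.isInteger(maxAttempts) || maxAttempts < 1) {
      throw new RangeError('maxAttempts must be a positive integer');
    }
    this.maxAttempts = maxAttempts;
  }

  // Record one postponement. Returns true if the event may be retried,
  // false once it has been moved to the dead-letter set.
  recordPostponement(eventId: string): boolean {
    const count = (this.attempts.get(eventId) ?? 0) + 1;
    if (count >= this.maxAttempts) {
      this.attempts.delete(eventId);
      this.deadLetters.add(eventId);
      return false;
    }
    this.attempts.set(eventId, count);
    return true;
  }

  // Forget an event after it was processed successfully.
  markProcessed(eventId: string): void {
    this.attempts.delete(eventId);
  }

  isDeadLettered(eventId: string): boolean {
    return this.deadLetters.has(eventId);
  }
}

const tracker = new RetryTracker(3);
const firstTry = tracker.recordPostponement('$stuck');
const secondTry = tracker.recordPostponement('$stuck');
const thirdTry = tracker.recordPostponement('$stuck');
```

The processing loop would skip any event for which isDeadLettered returns true, so the same event cannot be selected indefinitely. Exponential backoff could be layered on by storing a next-attempt timestamp next to the counter.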
Summary by CodeRabbit
Bug Fixes
Refactor