Refresh membership before directory RPC routing #10088
Pull request overview
Ensures directory RPC routing/ownership decisions are made using up-to-date cluster membership by refreshing/applying membership snapshots when incoming directory operations reference newer membership versions than the local directory has applied.
Changes:
- Add version-aware membership refresh/apply step before processing directory register/unregister operations which carry `GrainAddress` membership versions.
- Switch `LocalGrainDirectory` membership tracking from `ISiloStatusOracle` event subscription to `IClusterMembershipService` snapshot/updates processing.
- Move "deactivate activations on terminating silo" logic out of `Catalog` and into `LocalGrainDirectory`'s membership processing path.
| File | Description |
|---|---|
| test/Orleans.Core.Tests/Directory/CachedGrainLocatorTests.cs | Updates test wiring to provide a cluster membership service dependency. |
| src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs | Applies and refreshes membership snapshots, and refreshes membership prior to register/unregister routing decisions. |
| src/Orleans.Runtime/GrainDirectory/GrainDirectoryHandoffManager.cs | Adjusts handoff processing queue behavior and filters incoming registrations based on silo status. |
| src/Orleans.Runtime/Catalog/Catalog.cs | Removes silo-status-change/deactivation logic now handled by the directory. |
Copilot's findings
- Files reviewed: 4/4 changed files
- Comments generated: 3
    private async Task ExecutePendingOperations()
    {
        using (await executorLock.LockAsync())
        {
            var dequeueCount = 0;
            while (true)
            {
                // Get the next operation, or exit if there are none.
                (string Name, object State, Func<GrainDirectoryHandoffManager, object, Task> Action) op;
                lock (this)
                {
                    if (this.pendingOperations.Count == 0) break;

                    op = this.pendingOperations.Peek();
                }

                dequeueCount++;

                try
                {
                    await op.Action(this, op.State);
                    // Success, reset the dequeue count
                    dequeueCount = 0;
                }
                catch (Exception exception)
                {
                    if (dequeueCount < MAX_OPERATION_DEQUEUE)
                    {
                        LogWarningOperationFailedRetry(logger, exception, op.Name);
                        await Task.Delay(RetryDelay);
                    }
                    else
                        lock (this)
                        {
                            LogWarningOperationFailedNoRetry(logger, exception, op.Name);
                            this.pendingOperations.Dequeue();
                        }
                }
                if (dequeueCount == 0 || dequeueCount >= MAX_OPERATION_DEQUEUE)
                catch (Exception exception)
                {
                    lock (this)
                    if (!this.localDirectory.Running)
                    {
                        // Remove the operation from the queue if it was a success
                        // or if we tried too many times
                        this.pendingOperations.Dequeue();
                        return;
                    }

                    LogWarningOperationFailedRetry(logger, exception, op.Name);
                    await Task.Delay(RetryDelay);
                }
Fixed in the processing-cleanup branch (a4cc38a56e): failed handoff work is retried but moved to the back of the queue, so later queued operations can make progress.
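A minimal sketch of the "retry but move to the back of the queue" behavior described in this reply. It is not the code from a4cc38a56e: `RequeueOnFailureExecutor`, `DrainAsync`, and the `isRunning` callback are hypothetical names, and the real handoff manager carries per-operation state and logging that are omitted here.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for the handoff manager's pending-operation queue.
public sealed class RequeueOnFailureExecutor
{
    private readonly Queue<(string Name, Func<Task> Action)> _pending = new();
    private readonly object _gate = new();
    private readonly TimeSpan _retryDelay;

    public RequeueOnFailureExecutor(TimeSpan retryDelay) => _retryDelay = retryDelay;

    public void Enqueue(string name, Func<Task> action)
    {
        lock (_gate) _pending.Enqueue((name, action));
    }

    // Drains the queue while isRunning() returns true. A failed operation has
    // already been removed from the head; it is re-enqueued at the tail so the
    // operations queued behind it get a turn before it is retried.
    public async Task DrainAsync(Func<bool> isRunning)
    {
        while (isRunning())
        {
            (string Name, Func<Task> Action) op;
            lock (_gate)
            {
                if (_pending.Count == 0) return;
                op = _pending.Dequeue();
            }

            try
            {
                await op.Action();
            }
            catch (Exception)
            {
                // Assumption: every failure is retried; a real implementation
                // would log here and might cap the number of attempts.
                lock (_gate) _pending.Enqueue(op);
                await Task.Delay(_retryDelay);
            }
        }
    }
}
```

With two queued operations where the first fails once, the second completes before the first is retried, which is the progress property the reply refers to.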
    private void EnqueueOperation(string name, object state, Func<GrainDirectoryHandoffManager, object, Task> action)
    {
        lock (this)
        {
            this.pendingOperations.Enqueue((name, state, action));
            if (this.pendingOperations.Count <= 2)
            {
                this.localDirectory.RemoteGrainDirectory.WorkItemGroup.QueueTask(ExecutePendingOperations, localDirectory.RemoteGrainDirectory);
            }
            this.localDirectory.RemoteGrainDirectory.WorkItemGroup.QueueTask(ExecutePendingOperations, localDirectory.RemoteGrainDirectory);
        }
This was fixed by the merged base PR #10086 and is present after rebasing on upstream/main: enqueueing is gated again instead of scheduling one executor per operation.
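To illustrate what "gated" scheduling means here, the sketch below starts an executor only when none is already scheduled, rather than once per enqueued operation. It swaps in an explicit flag for the `Count <= 2` check shown in the diff, and `GatedOperationQueue`, the `schedule` callback, and `ExecutorsStarted` are hypothetical names introduced for illustration only.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical queue that schedules at most one executor at a time.
public sealed class GatedOperationQueue
{
    private readonly Queue<Func<Task>> _pending = new();
    private readonly object _gate = new();
    private readonly Action<Func<Task>> _schedule;
    private bool _executorScheduled;

    // Counts how many executors were handed to the scheduler (for inspection).
    public int ExecutorsStarted { get; private set; }

    public GatedOperationQueue(Action<Func<Task>> schedule) => _schedule = schedule;

    public void Enqueue(Func<Task> action)
    {
        bool start;
        lock (_gate)
        {
            _pending.Enqueue(action);
            // Gate: only the enqueue that finds no executor scheduled starts one.
            start = !_executorScheduled;
            if (start)
            {
                _executorScheduled = true;
                ExecutorsStarted++;
            }
        }

        if (start) _schedule(ExecuteAsync);
    }

    private async Task ExecuteAsync()
    {
        while (true)
        {
            Func<Task> next;
            lock (_gate)
            {
                if (_pending.Count == 0)
                {
                    // Release the gate so the next enqueue schedules a new executor.
                    _executorScheduled = false;
                    return;
                }
                next = _pending.Dequeue();
            }

            try { await next(); }
            catch (Exception) { /* Sketch only: a failed operation is dropped. */ }
        }
    }
}
```

Clearing the flag under the same lock that checks for an empty queue avoids the window in which an operation is enqueued after the executor decided to exit but before the gate reopens.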
            DirectoryInstruments.RegistrationsSingleActIssued.Add(1);
        }

        await RefreshMembershipIfNewer(address, previousAddress);

        // see if the owner is somewhere else (returns null if we are owner)
        var forwardAddress = this.CheckIfShouldForward(address.GrainId, hopCount, "RegisterAsync");
Fixed in 5586e593fe: added a targeted test covering a register request whose address carries a newer membership version, ensuring membership is applied before forwarding and no extra refresh is requested when the current snapshot is already new enough.
Force-pushed from 6e548f3 to 65cc3fc.
Evict LocalGrainDirectory directory and cache entries for terminating silos immediately, and only evict unknown-silo entries when the address was registered before the applied membership snapshot. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
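The eviction rule in the first commit message above can be sketched as a single predicate. This is an illustration under assumptions, not the `LocalGrainDirectory` implementation: `SiloState`, `EvictionPolicy`, string silo names, and plain `long` versions are hypothetical simplifications, and "registered before the applied membership snapshot" is read here as a strictly older version.

```csharp
using System.Collections.Generic;

// Hypothetical, reduced view of a silo's status in a membership snapshot.
public enum SiloState { Active, Terminating }

public static class EvictionPolicy
{
    // Decides whether a directory or cache entry pointing at `silo` should be evicted.
    //   snapshot        : silos present in the applied membership snapshot
    //   snapshotVersion : version of that applied snapshot
    //   addressVersion  : membership version recorded on the entry's address
    public static bool ShouldEvict(
        string silo,
        long addressVersion,
        IReadOnlyDictionary<string, SiloState> snapshot,
        long snapshotVersion)
    {
        if (snapshot.TryGetValue(silo, out var state))
        {
            // Known silo: evict immediately if it is terminating, otherwise keep.
            return state == SiloState.Terminating;
        }

        // Unknown silo: the entry may come from a newer membership view than the
        // one applied locally, so evict only if it is older than the snapshot.
        return addressVersion < snapshotVersion;
    }
}
```

The second branch is what keeps an entry for a silo that joined after the local snapshot was taken from being discarded prematurely.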
Refresh and apply cluster membership snapshots when grain directory RPCs receive GrainAddress values from a newer membership version before making ownership decisions. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Force-pushed from 65cc3fc to 5586e59.
Part 3 of 7 split from #10085.
Problem:
Directory RPCs can receive GrainAddress values which were observed at a newer membership version than the local directory has applied, causing routing and ownership decisions to be made with stale membership data.
Solution:
Refresh and apply cluster membership before processing directory operations when incoming addresses reference a newer membership version.
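A rough sketch of the version check this implies, assuming membership versions can be compared as plain integers. The body of `RefreshMembershipIfNewer` is not shown in this conversation, so `MembershipGate`, its delegates, and `AppliedVersion` are hypothetical stand-ins rather than the PR's code.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper that tracks which membership version has been applied locally.
public sealed class MembershipGate
{
    private readonly Func<long> _currentSnapshotVersion; // latest snapshot already available locally
    private readonly Func<long, Task> _refresh;          // asks for a snapshot of at least the given version

    public long AppliedVersion { get; private set; }

    public MembershipGate(Func<long> currentSnapshotVersion, Func<long, Task> refresh)
    {
        _currentSnapshotVersion = currentSnapshotVersion;
        _refresh = refresh;
        AppliedVersion = currentSnapshotVersion();
    }

    // Call before routing: pass the membership versions carried by incoming addresses
    // (null for an absent address, such as a missing previous address).
    public async ValueTask RefreshIfNewerAsync(params long?[] addressVersions)
    {
        long needed = AppliedVersion;
        foreach (var version in addressVersions)
        {
            if (version is long value && value > needed) needed = value;
        }

        // Nothing references a newer version: keep routing with what is applied.
        if (needed <= AppliedVersion) return;

        // Request a refresh only if the locally available snapshot is still too old.
        if (_currentSnapshotVersion() < needed)
        {
            await _refresh(needed);
        }

        // Apply whatever snapshot is now available, never moving backwards.
        AppliedVersion = Math.Max(AppliedVersion, _currentSnapshotVersion());
    }
}
```

A register path would await `RefreshIfNewerAsync` with the versions of the new and previous addresses before deciding whether to forward, which matches the ordering shown in the `RegisterAsync` diff.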
Stack:
Merge after #10087. This branch is stacked on split/pr10085-02-stale-version-cleanup; until earlier PRs merge, GitHub may show earlier stack changes. Incremental compare: ReubenBond/orleans@split/pr10085-02-stale-version-cleanup...split/pr10085-03-refresh-newer-membership
Review focus:
Refresh timing before register/unregister paths and avoiding unnecessary refreshes when the local snapshot is already current.