Change Feed Processor: Adds Lease container export support#5579
Change Feed Processor: Adds Lease container export support#5579
Conversation
dce1b84 to
f6b9083
Compare
|
I am curious if there are multiple version of leases present and if a customer tries to import v1 leases into a lease container where v2 leases are present? Is that a case in .NET SDK? |
If there are existing leases, and the |
makes sense, I am curious about the case when |
9f631f1 to
cc61cc3
Compare
c4bb4f5 to
44167b7
Compare
No there is no conflict resolution, it will just skip that lease |
44167b7 to
8f42cec
Compare
kirankumarkolli
left a comment
There was a problem hiding this comment.
Please check my comments
|
Azure Pipelines: Successfully started running 1 pipeline(s). |
|
@sdkReviewAgent-2 |
- Make ShutdownAsync abstract in DocumentServiceLeaseStoreManager - Add no-op override in DocumentServiceLeaseStoreManagerCosmos - Consolidate ShutdownAsync call to single line in ChangeFeedProcessorCore Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
|
Azure Pipelines: Successfully started running 1 pipeline(s). |
|
@sdkReviewAgent |
PR Deep Review SummaryThanks for iterating on this — a lot of prior feedback has already been addressed (stream disposal via Posting 7 new findings below (1 🔴 Blocking, 4 🟡 Recommendations, 2 🟢/💬). I reviewed all existing comments and skipped items that are already covered by prior reviewers. Top concern: the runtime guard intended to reject non-resizable Details inline. |
Remove SetLength validation from WithInMemoryLeaseContainer. Non-expandable stream errors will propagate naturally from ShutdownAsync when it tries to write. Also reorders validation so state-check runs before stream inspection. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
|
Azure Pipelines: Successfully started running 1 pipeline(s). |
Re-review after commit
|
|
✅ Review complete (44:03) No new comments — existing review coverage is sufficient. Steps: ✓ context, correctness, cross-sdk, design, history, past-prs, synthesis, test-coverage |
…ss and clarity Addresses review feedback on PR #5579: - Persists lease state before stopping partition manager so shutdown ordering is deterministic (no try/finally/rethrow needed). - Extracts serialization into InMemoryLeaseJsonFormat helper so writer and reader cannot drift. - Writes SetLength before Serialize so a non-resizable buffer fails fast with a descriptive exception instead of corrupting stream contents. - Removes the unused SemaphoreSlim gate; ShutdownAsync has a single call site via ChangeFeedProcessor.StopAsync. - Throws on duplicate lease ids during deserialization and leaves the source stream positioned at 0 on success. - Expands XML documentation on WithInMemoryLeaseContainer and the base DocumentServiceLeaseStoreManager.ShutdownAsync. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
- Expands round-trip coverage to verify Properties (with unicode), FeedRange, and Timestamp survive persist/reload. - Adds Deserialize_DuplicateIds_Throws to lock in the dup-id guard. - Adds Deserialize_LeavesStreamPositionAtZero to pin the exit contract. - Updates ShutdownAsync_WithDisposedStream_Throws to expect InvalidOperationException after the SetLength-first reorder. - Removes StopAsync_AfterStop_CanRestart and StopAsync_WhenPartitionManagerThrows_StillCallsShutdownAsync; both premises are obsolete under the persist-first ordering. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
|
Azure Pipelines: Successfully started running 1 pipeline(s). |
The WithInMemoryLeaseContainer XML had a contradiction: the param guard rail warns against fixed-size MemoryStreams (new MemoryStream(byte[])), but the usage example a few lines later told callers to rehydrate by passing a new MemoryStream 'seeded with those bytes' -- exactly the non-resizable constructor the guard rail forbids. Replace the rehydrate sentence with an expandable pattern (new MemoryStream(), Write, Position = 0) so the example is consistent with the guard rail and actually works on restore. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
|
Azure Pipelines: Successfully started running 1 pipeline(s). |
|
Azure Pipelines: Successfully started running 1 pipeline(s). |
## Release 3.59.0 ### Version Changes - ClientOfficialVersion: 3.58.0 → 3.59.0 - ClientPreviewVersion: 3.59.0 → 3.60.0 - ClientPreviewSuffixVersion: preview.0 → preview.0 ### Changelog (3.59.0 GA) #### Added - [5579](#5579) Change Feed Processor: Adds Lease container export support - [5709](#5709) Performance: Adds caching for URL-encoded AAD authorization signature - [5731](#5731) DNS dot-suffix: Adds TCP DNS dot-suffix for Direct mode to avoid Kubernetes ndots latency - [5755](#5755) Exceptionless: Adds enabling exception less 400 status code - [5756](#5756) Exceptionless: Adds enabling exception less 404/1002 status code - [5757](#5757) Exceptionless: Adds enabling exception less 403 - [5779](#5779) Direct: Adds Direct package version bump to 3.42.4 - [5786](#5786) Region Availability: Adds missing regions from Direct 3.42.4 - [5788](#5788) Socket Handler: Adds HTTP/2 PING keep-alive to detect broken connections in pool #### Fixed - [5553](#5553) NativeDLLs: Fixes Conditionally include win-x64 native DLLs based on RuntimeIdentifier - [5588](#5588) LINQ: Fixes memory leak from Expression.Compile() in all call sites - [5617](#5617) ChangeFeedProcessor: Fixes first-change skip during initial startup by anchoring StartTime - [5636](#5636) CosmosClientBuilder: Fixes self-referencing loop in GetSerializedConfiguration with STJ TypeInfoResolver - [5748](#5748) Routing: Fixes GetOverlappingRanges CPU overhead from repeated JSON deserialization - [5807](#5807) ChangeFeedProcessor: Fixes lease de-duplication for /partitionKey-partitioned lease containers ### Changelog (3.60.0-preview.0) #### Added - [5804](#5804) SemanticReranking: Adds Configurable Request Timeout #### Fixed - [5783](#5783) Container: Fixes SemanticRerankAsync TypeLoadException in derived classes ### API Contract Diff (GA) ```diff diff --git "a/Microsoft.Azure.Cosmos\\contracts\\API_3.58.0.txt" "b/Microsoft.Azure.Cosmos\\contracts\\API_3.59.0.txt" index 1b74a69..6fa9352 100644 --- "a/Microsoft.Azure.Cosmos\\contracts\\API_3.58.0.txt" +++ "b/Microsoft.Azure.Cosmos\\contracts\\API_3.59.0.txt" @@ -60,6 +60,7 @@ namespace Microsoft.Azure.Cosmos public ChangeFeedProcessor Build(); public ChangeFeedProcessorBuilder WithErrorNotification(Container.ChangeFeedMonitorErrorDelegate errorDelegate); public virtual ChangeFeedProcessorBuilder WithInMemoryLeaseContainer(); + public virtual ChangeFeedProcessorBuilder WithInMemoryLeaseContainer(MemoryStream leaseState); public ChangeFeedProcessorBuilder WithInstanceName(string instanceName); public ChangeFeedProcessorBuilder WithLeaseAcquireNotification(Container.ChangeFeedMonitorLeaseAcquireDelegate acquireDelegate); public ChangeFeedProcessorBuilder WithLeaseConfiguration(Nullable<TimeSpan> acquireInterval=default(Nullable<TimeSpan>), Nullable<TimeSpan> expirationInterval=default(Nullable<TimeSpan>), Nullable<TimeSpan> renewInterval=default(Nullable<TimeSpan>)); @@ -956,6 +957,7 @@ namespace Microsoft.Azure.Cosmos public const string NorwayWest = "Norway West"; public const string PolandCentral = "Poland Central"; public const string QatarCentral = "Qatar Central"; + public const string SaudiArabiaEast = "Saudi Arabia East"; public const string SingaporeCentral = "Singapore Central"; public const string SingaporeNorth = "Singapore North"; public const string SouthAfricaNorth = "South Africa North"; @@ -963,6 +965,7 @@ namespace Microsoft.Azure.Cosmos public const string SouthCentralUS = "South Central US"; public const string SouthCentralUS2 = "South Central US 2"; public const string SoutheastAsia = "Southeast Asia"; + public const string SoutheastAsia3 = "Southeast Asia 3"; public const string SoutheastUS = "Southeast US"; public const string SoutheastUS3 = "Southeast US 3"; public const string SoutheastUS5 = "Southeast US 5"; @@ -990,6 +993,7 @@ namespace Microsoft.Azure.Cosmos public const string USSecWest = "USSec West"; public const string USSecWestCentral = "USSec West Central"; public const string WestCentralUS = "West Central US"; + public const string WestCentralUSFRE = "West Central US FRE"; public const string WestEurope = "West Europe"; public const string WestIndia = "West India"; public const string WestUS = "West US"; ``` ### API Contract Diff (Preview) ```diff diff --git "a/Microsoft.Azure.Cosmos\\contracts\\API_3.59.0-preview.0.txt" "b/Microsoft.Azure.Cosmos\\contracts\\API_3.60.0-preview.0.txt" index 1ae52c0..58df10f 100644 --- "a/Microsoft.Azure.Cosmos\\contracts\\API_3.59.0-preview.0.txt" +++ "b/Microsoft.Azure.Cosmos\\contracts\\API_3.60.0-preview.0.txt" @@ -91,6 +91,7 @@ namespace Microsoft.Azure.Cosmos public ChangeFeedProcessor Build(); public ChangeFeedProcessorBuilder WithErrorNotification(Container.ChangeFeedMonitorErrorDelegate errorDelegate); public virtual ChangeFeedProcessorBuilder WithInMemoryLeaseContainer(); + public virtual ChangeFeedProcessorBuilder WithInMemoryLeaseContainer(MemoryStream leaseState); public ChangeFeedProcessorBuilder WithInstanceName(string instanceName); public ChangeFeedProcessorBuilder WithLeaseAcquireNotification(Container.ChangeFeedMonitorLeaseAcquireDelegate acquireDelegate); public ChangeFeedProcessorBuilder WithLeaseConfiguration(Nullable<TimeSpan> acquireInterval=default(Nullable<TimeSpan>), Nullable<TimeSpan> expirationInterval=default(Nullable<TimeSpan>), Nullable<TimeSpan> renewInterval=default(Nullable<TimeSpan>)); @@ -302,7 +303,7 @@ namespace Microsoft.Azure.Cosmos public abstract Task<ResponseMessage> ReplaceItemStreamAsync(Stream streamPayload, string id, PartitionKey partitionKey, ItemRequestOptions requestOptions=null, CancellationToken cancellationToken=default(CancellationToken)); public abstract Task<ThroughputResponse> ReplaceThroughputAsync(ThroughputProperties throughputProperties, RequestOptions requestOptions=null, CancellationToken cancellationToken=default(CancellationToken)); public abstract Task<ThroughputResponse> ReplaceThroughputAsync(int throughput, RequestOptions requestOptions=null, CancellationToken cancellationToken=default(CancellationToken)); - public abstract Task<SemanticRerankResult> SemanticRerankAsync(string rerankContext, IEnumerable<string> documents, IDictionary<string, object> options=null, CancellationToken cancellationToken=default(CancellationToken)); + public virtual Task<SemanticRerankResult> SemanticRerankAsync(string rerankContext, IEnumerable<string> documents, IDictionary<string, object> options=null, CancellationToken cancellationToken=default(CancellationToken)); public abstract Task<ItemResponse<T>> UpsertItemAsync<T>(T item, Nullable<PartitionKey> partitionKey=default(Nullable<PartitionKey>), ItemRequestOptions requestOptions=null, CancellationToken cancellationToken=default(CancellationToken)); public abstract Task<ResponseMessage> UpsertItemStreamAsync(Stream streamPayload, PartitionKey partitionKey, ItemRequestOptions requestOptions=null, CancellationToken cancellationToken=default(CancellationToken)); public delegate Task ChangeFeedHandlerWithManualCheckpoint<T>(ChangeFeedProcessorContext context, IReadOnlyCollection<T> changes, Func<Task> checkpointAsync, CancellationToken cancellationToken); @@ -407,6 +408,7 @@ namespace Microsoft.Azure.Cosmos public int GatewayModeMaxConnectionLimit { get; set; } public Func<HttpClient> HttpClientFactory { get; set; } public Nullable<TimeSpan> IdleTcpConnectionTimeout { get; set; } + public TimeSpan InferenceRequestTimeout { get; set; } public bool LimitToEndpoint { get; set; } public Nullable<int> MaxRequestsPerTcpConnection { get; set; } public Nullable<int> MaxRetryAttemptsOnRateLimitedRequests { get; set; } @@ -1092,6 +1094,7 @@ namespace Microsoft.Azure.Cosmos public const string NorwayWest = "Norway West"; public const string PolandCentral = "Poland Central"; public const string QatarCentral = "Qatar Central"; + public const string SaudiArabiaEast = "Saudi Arabia East"; public const string SingaporeCentral = "Singapore Central"; public const string SingaporeNorth = "Singapore North"; public const string SouthAfricaNorth = "South Africa North"; @@ -1099,6 +1102,7 @@ namespace Microsoft.Azure.Cosmos public const string SouthCentralUS = "South Central US"; public const string SouthCentralUS2 = "South Central US 2"; public const string SoutheastAsia = "Southeast Asia"; + public const string SoutheastAsia3 = "Southeast Asia 3"; public const string SoutheastUS = "Southeast US"; public const string SoutheastUS3 = "Southeast US 3"; public const string SoutheastUS5 = "Southeast US 5"; @@ -1126,6 +1130,7 @@ namespace Microsoft.Azure.Cosmos public const string USSecWest = "USSec West"; public const string USSecWestCentral = "USSec West Central"; public const string WestCentralUS = "West Central US"; + public const string WestCentralUSFRE = "West Central US FRE"; public const string WestEurope = "West Europe"; public const string WestIndia = "West India"; public const string WestUS = "West US"; @@ -1504,6 +1509,7 @@ namespace Microsoft.Azure.Cosmos.Fluent public CosmosClientBuilder WithEnableRemoteRegionPreferredForSessionRetry(bool enableRemoteRegionPreferredForSessionRetry); public CosmosClientBuilder WithFaultInjection(IFaultInjector faultInjector); public CosmosClientBuilder WithHttpClientFactory(Func<HttpClient> httpClientFactory); + public CosmosClientBuilder WithInferenceRequestTimeout(TimeSpan inferenceRequestTimeout); public CosmosClientBuilder WithLimitToEndpoint(bool limitToEndpoint); public CosmosClientBuilder WithPriorityLevel(PriorityLevel priorityLevel); public CosmosClientBuilder WithReadConsistencyStrategy(ReadConsistencyStrategy readConsistencyStrategy); ``` ### Checklist - [ ] Changelog entries reviewed by team - [ ] API contract diff reviewed by Kiran and Kirill - [ ] Preview APIs reviewed (email sent to azurecosmossdkdotnet@microsoft.com) - [ ] Kiran sign-off obtained Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Description
This PR adds the ability to persist and restore in-memory ChangeFeed processor lease state via a
MemoryStream. When a processor is built withWithInMemoryLeaseContainer(MemoryStream), the stream serves as both input and output: existing data in the stream initializes the lease container on startup, and the current lease state is automatically written back to the stream when the processor stops. This enables backup, restore, and restart scenarios for in-memory lease containers without requiring a Cosmos DB lease container.API
Design Decisions
Single MemoryStream for Read and Write
MemoryStreamserves dual purpose: if it contains data on init, leases are deserialized from it. OnStopAsync, current lease state is serialized back into the same stream.Persist-First Shutdown Ordering
ChangeFeedProcessorCore.StopAsynccallsstoreManager.ShutdownAsync()beforepartitionManager.StopAsync().Stream Writer Correctness
DocumentServiceLeaseContainerInMemory.ShutdownAsynccallsSetLength(serializedBytes.Length)before writing. If the supplied stream is not expandable and cannot hold the new payload,SetLengththrows and the user's stream is left untouched — no partial-write corruption.InvalidOperationExceptionwith a message pointing callers atnew MemoryStream()instead ofnew MemoryStream(byte[]).Automatic Persistence on Stop
StopAsyncflow, via a virtualShutdownAsync()lifecycle hook on the internalDocumentServiceLeaseContainerbase class, overridden only by the in-memory implementation.Shared Serialization Format
InMemoryLeaseJsonFormat, a single internal helper that owns encoding, buffer size, andJsonSerializersettings. This prevents silent drift between writer and reader.Duplicate Lease Detection
DocumentServiceLeaseStoreManagerInMemory.DeserializeLeaseStatefails fast withInvalidOperationExceptionif the persisted state contains duplicate lease ids, rather than silently overwriting entries.No Changes to Cosmos-Backed Leases
ShutdownAsyncbase class method is a no-op for implementations that manage their own persistence.In-Memory Only
Usage Example
Test Coverage
Builder Tests (
ChangeFeedProcessorBuilderTests)WithInMemoryLeaseContainerWithStreamInitializesStoreCorrectly— Verifies leases are restored from a populated stream.WithInMemoryLeaseContainerWithEmptyStreamInitializesEmptyStore— Empty stream creates an empty container.WithInMemoryLeaseContainerWithEmptyArrayStreamInitializesEmptyStore— Covers the empty-array seed variant.WithInMemoryLeaseContainerWithNullStreamThrows— Validates null argument handling.WithInMemoryLeaseContainerWithStreamCannotCombineWithLeaseContainer— Prevents combining with Cosmos container.WithInMemoryLeaseContainerWithStreamCannotCombineWithExistingInMemory— Prevents double in-memory configuration.WithInMemoryLeaseContainerWithCorruptedStreamThrowsInvalidOperation— Malformed JSON surfaces asInvalidOperationException.WithInMemoryLeaseContainer_FullLifecycle_RestoreProcessStopPersist— End-to-end restore → use → stop → re-persist.In-Memory Container Tests (
DocumentServiceLeaseContainerInMemoryTests)ShutdownAsync_WithNoStream_IsNoOp— No-op when no stream is configured.ShutdownAsync_WritesExpectedCount— Correct lease count serialized (parameterized: 0, 2).ShutdownAsync_StreamPositionResetToZero— Stream position reset for consumers.ShutdownAsync_WithNonEpkLease_StillSerializes— Non-EPK lease types persist correctly.ShutdownAsync_WithDisposedStream_Throws— Disposed stream surfaces asInvalidOperationException.ShutdownAsync_WithNonResizableStream_SameSizeData_WritesSuccessfully— Fixed buffer sized exactly right still works.ShutdownAsync_WithNonResizableStream_LargerData_ThrowsInvalidOperation— Fixed buffer too small fails fast, before any partial write.PersistThenDeserialize_RoundTrip_PreservesData— Full round-trip preservesLeaseToken,ContinuationToken,Owner,Properties(including unicode),FeedRange, andTimestamp.PersistOverwritesPreviousStreamContent— Second persist replaces previous data (no stale trailing bytes).Deserialize_DuplicateIds_Throws— Duplicate lease ids in persisted state fail fast.Deserialize_LeavesStreamPositionAtZero— Reader resets stream position so subsequent writers see a fresh stream.Processor Core Tests (
ChangeFeedProcessorCoreTests)StopAsync_CallsShutdownAsync— VerifiesShutdownAsyncis invoked during stop.StopAsync_WithInMemoryLeases_PersistsStateToStream— Persist-first ordering produces a populated stream.StopAsync_WhenShutdownAsyncThrows_ExceptionPropagates— Persistence failure surfaces to the caller and skips partition shutdown.Type of change
Closing issues
closes #5580