diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorBuilder.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorBuilder.cs index 53ac46b969..9ef00449c5 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorBuilder.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorBuilder.cs @@ -5,6 +5,7 @@ namespace Microsoft.Azure.Cosmos { using System; + using System.IO; using Microsoft.Azure.Cosmos.ChangeFeed.Configuration; using Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement; using static Microsoft.Azure.Cosmos.Container; @@ -219,14 +220,63 @@ public ChangeFeedProcessorBuilder WithLeaseContainer(Container leaseContainer) /// The instance of to use. public virtual ChangeFeedProcessorBuilder WithInMemoryLeaseContainer() { - if (this.leaseContainer != null) + this.ValidateNoLeaseContainerConfigured(); + + if (string.IsNullOrEmpty(this.InstanceName)) { - throw new InvalidOperationException("The builder already defined a lease container."); + this.InstanceName = ChangeFeedProcessorBuilder.InMemoryDefaultHostName; } - if (this.LeaseStoreManager != null) + this.LeaseStoreManager = new DocumentServiceLeaseStoreManagerInMemory(); + return this; + } + + /// + /// Uses an in-memory container to maintain state of the leases, optionally initialized from a + /// containing previously persisted lease state. + /// + /// When the processor is stopped via , the current lease state + /// is automatically written back to the same stream, allowing the state to be + /// restored when creating a new processor instance. + /// + /// Using an in-memory container restricts the scaling capability to just the instance running the current processor. + /// + /// + /// + /// must not be invoked concurrently from multiple threads; the + /// in-memory container expects a single shutdown call and does not synchronize concurrent writers to + /// . + /// + /// + /// + /// A that serves as both input and output for lease state. + /// If the stream contains data, leases are deserialized and used to initialize the container. + /// When the processor stops, the current lease state is serialized back into this stream. + /// The stream must be writable and expandable (for example, created via new MemoryStream()). + /// A fixed-size stream such as new MemoryStream(byte[]) will fail at shutdown if the + /// serialized lease state exceeds the original buffer capacity. + /// A is required (rather than the base type) so that + /// the lease state can be trimmed via when a new snapshot is smaller + /// than the previously persisted one. To integrate with -based persistence + /// (e.g., a file or blob), call after + /// to obtain the persisted bytes; create an expandable + /// (new MemoryStream()), write the bytes into it, set + /// back to 0, and pass it to this method. + /// + /// The instance of to use. + /// Thrown when is null. + public virtual ChangeFeedProcessorBuilder WithInMemoryLeaseContainer(MemoryStream leaseState) + { + if (leaseState == null) { - throw new InvalidOperationException("The builder already defined an in-memory lease container instance."); + throw new ArgumentNullException(nameof(leaseState)); + } + + this.ValidateNoLeaseContainerConfigured(); + + if (!leaseState.CanWrite) + { + throw new ArgumentException("The lease state stream must be writable so that state can be persisted on shutdown.", nameof(leaseState)); } if (string.IsNullOrEmpty(this.InstanceName)) @@ -234,7 +284,9 @@ public virtual ChangeFeedProcessorBuilder WithInMemoryLeaseContainer() this.InstanceName = ChangeFeedProcessorBuilder.InMemoryDefaultHostName; } - this.LeaseStoreManager = new DocumentServiceLeaseStoreManagerInMemory(); + // Deserialization of lease state (if any) is handled inside the manager + // so that serialization and deserialization are co-located in the same layer. + this.LeaseStoreManager = new DocumentServiceLeaseStoreManagerInMemory(leaseState); return this; } @@ -317,5 +369,18 @@ public ChangeFeedProcessor Build() this.isBuilt = true; return this.changeFeedProcessor; } + + private void ValidateNoLeaseContainerConfigured() + { + if (this.leaseContainer != null) + { + throw new InvalidOperationException("The builder already defined a lease container."); + } + + if (this.LeaseStoreManager != null) + { + throw new InvalidOperationException("The builder already defined an in-memory lease container instance."); + } + } } } diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorCore.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorCore.cs index 60d2dd2c9f..1926575301 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorCore.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/ChangeFeedProcessorCore.cs @@ -5,7 +5,6 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed { using System; - using System.Collections.Generic; using System.Threading.Tasks; using Microsoft.Azure.Cosmos.ChangeFeed.Bootstrapping; using Microsoft.Azure.Cosmos.ChangeFeed.Configuration; @@ -79,7 +78,14 @@ public override async Task StartAsync() public override async Task StopAsync() { DefaultTrace.TraceInformation("Stopping processor..."); + + // Persist in-memory lease state before stopping the partition manager so that + // a subsequent partition-manager shutdown failure cannot prevent recovery of the + // lease snapshot. No-op for Cosmos-backed leases. + await this.documentServiceLeaseStoreManager.ShutdownAsync().ConfigureAwait(false); + await this.partitionManager.StopAsync().ConfigureAwait(false); + DefaultTrace.TraceInformation("Processor stopped."); } diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseContainerInMemory.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseContainerInMemory.cs index 867b37da51..01f6c3106c 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseContainerInMemory.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseContainerInMemory.cs @@ -1,21 +1,32 @@ -//------------------------------------------------------------ +//------------------------------------------------------------ // Copyright (c) Microsoft Corporation. All rights reserved. //------------------------------------------------------------ namespace Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement { + using System; using System.Collections.Concurrent; using System.Collections.Generic; + using System.IO; using System.Linq; using System.Threading.Tasks; internal sealed class DocumentServiceLeaseContainerInMemory : DocumentServiceLeaseContainer { private readonly ConcurrentDictionary container; + private readonly MemoryStream leaseStateStream; public DocumentServiceLeaseContainerInMemory(ConcurrentDictionary container) + : this(container, leaseStateStream: null) + { + } + + public DocumentServiceLeaseContainerInMemory( + ConcurrentDictionary container, + MemoryStream leaseStateStream) { this.container = container; + this.leaseStateStream = leaseStateStream; } public override Task> GetAllLeasesAsync() @@ -27,5 +38,46 @@ public override Task> GetOwnedLeasesAsync() { return Task.FromResult>(this.container.Values.AsEnumerable()); } + + /// + /// Persists the current in-memory lease state into the user-supplied . + /// + /// + /// Must only be invoked from the single ChangeFeedProcessor.StopAsync call path; + /// concurrent invocation is not supported and may corrupt the stream. + /// + /// A completed task once the stream has been populated, or a no-op if no stream was supplied. + public Task ShutdownAsync() + { + if (this.leaseStateStream == null) + { + return Task.CompletedTask; + } + + byte[] serializedBytes = InMemoryLeaseJsonFormat.Serialize(this.container.Values.ToList()); + + // Resize the target stream BEFORE writing. If the stream is not expandable and + // cannot hold the new payload, SetLength throws NotSupportedException and the + // user's stream is left untouched (no partial-write corruption). If SetLength + // succeeds, the subsequent Write is guaranteed to fit. + try + { + this.leaseStateStream.SetLength(serializedBytes.Length); + } + catch (NotSupportedException ex) + { + throw new InvalidOperationException( + "Failed to persist lease state because the MemoryStream is not expandable and the serialized " + + "state exceeds its capacity. Use 'new MemoryStream()' or a MemoryStream with sufficient " + + "capacity instead of 'new MemoryStream(byte[])' to create a resizable stream.", + ex); + } + + this.leaseStateStream.Position = 0; + this.leaseStateStream.Write(serializedBytes, 0, serializedBytes.Length); + this.leaseStateStream.Position = 0; + + return Task.CompletedTask; + } } } diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseStoreManager.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseStoreManager.cs index 3379e19af1..f8e7597271 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseStoreManager.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseStoreManager.cs @@ -4,6 +4,8 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement { + using System.Threading.Tasks; + /// /// The DocumentServiceLeaseStoreManager defines a way to perform operations with . /// @@ -29,5 +31,13 @@ internal abstract class DocumentServiceLeaseStoreManager /// for particular monitoring collection and lease container prefix. /// public abstract DocumentServiceLeaseStore LeaseStore { get; } + + /// + /// Called when the processor is stopping. Implementations may override to perform + /// cleanup or state persistence. The default implementation (for Cosmos-backed + /// lease stores) is a no-op. Exceptions thrown from this method propagate to the + /// caller of . + /// + public abstract Task ShutdownAsync(); } } diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseStoreManagerCosmos.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseStoreManagerCosmos.cs index 14efed91fe..0a3cca437a 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseStoreManagerCosmos.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseStoreManagerCosmos.cs @@ -5,6 +5,7 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement { using System; + using System.Threading.Tasks; using Microsoft.Azure.Cosmos; /// @@ -80,5 +81,10 @@ internal DocumentServiceLeaseStoreManagerCosmos( public override DocumentServiceLeaseCheckpointer LeaseCheckpointer => this.leaseCheckpointer; public override DocumentServiceLeaseContainer LeaseContainer => this.leaseContainer; + + public override Task ShutdownAsync() + { + return Task.CompletedTask; + } } } \ No newline at end of file diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseStoreManagerInMemory.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseStoreManagerInMemory.cs index cd8441af5d..d99703057a 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseStoreManagerInMemory.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/DocumentServiceLeaseStoreManagerInMemory.cs @@ -6,6 +6,10 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement { using System; using System.Collections.Concurrent; + using System.Collections.Generic; + using System.IO; + using System.Threading.Tasks; + using Newtonsoft.Json; /// /// Lease manager that is using In-Memory as lease storage. @@ -15,15 +19,33 @@ internal sealed class DocumentServiceLeaseStoreManagerInMemory : DocumentService private readonly DocumentServiceLeaseStore leaseStore; private readonly DocumentServiceLeaseManager leaseManager; private readonly DocumentServiceLeaseCheckpointer leaseCheckpointer; - private readonly DocumentServiceLeaseContainer leaseContainer; + private readonly DocumentServiceLeaseContainerInMemory leaseContainer; public DocumentServiceLeaseStoreManagerInMemory() : this(new ConcurrentDictionary()) { } + /// + /// Initializes a new instance from a containing + /// previously persisted lease state. Deserialization is co-located here so + /// that the manager owns the lease JSON format for both read (restore) and + /// write (ShutdownAsync → persist). + /// + internal DocumentServiceLeaseStoreManagerInMemory(MemoryStream leaseStateStream) + : this(DocumentServiceLeaseStoreManagerInMemory.DeserializeLeaseState(leaseStateStream), leaseStateStream) + { + } + internal DocumentServiceLeaseStoreManagerInMemory(ConcurrentDictionary container) - : this(new DocumentServiceLeaseUpdaterInMemory(container), container) + : this(new DocumentServiceLeaseUpdaterInMemory(container), container, leaseStateStream: null) + { + } + + internal DocumentServiceLeaseStoreManagerInMemory( + ConcurrentDictionary container, + MemoryStream leaseStateStream) + : this(new DocumentServiceLeaseUpdaterInMemory(container), container, leaseStateStream) { } @@ -35,7 +57,8 @@ internal DocumentServiceLeaseStoreManagerInMemory(ConcurrentDictionary internal DocumentServiceLeaseStoreManagerInMemory( DocumentServiceLeaseUpdater leaseUpdater, - ConcurrentDictionary container) // For testing purposes only. + ConcurrentDictionary container, + MemoryStream leaseStateStream = null) { if (leaseUpdater == null) throw new ArgumentException(nameof(leaseUpdater)); @@ -47,7 +70,7 @@ internal DocumentServiceLeaseStoreManagerInMemory( leaseUpdater, new PartitionedByIdCollectionRequestOptionsFactory()); - this.leaseContainer = new DocumentServiceLeaseContainerInMemory(container); + this.leaseContainer = new DocumentServiceLeaseContainerInMemory(container, leaseStateStream); } public override DocumentServiceLeaseStore LeaseStore => this.leaseStore; @@ -57,5 +80,61 @@ internal DocumentServiceLeaseStoreManagerInMemory( public override DocumentServiceLeaseCheckpointer LeaseCheckpointer => this.leaseCheckpointer; public override DocumentServiceLeaseContainer LeaseContainer => this.leaseContainer; + + public override Task ShutdownAsync() + { + return this.leaseContainer.ShutdownAsync(); + } + + /// + /// Deserializes lease state from a into a dictionary. + /// This is the counterpart of the serialization in + /// . + /// + private static ConcurrentDictionary DeserializeLeaseState( + MemoryStream leaseStateStream) + { + ConcurrentDictionary container = + new ConcurrentDictionary(); + + if (leaseStateStream == null || leaseStateStream.Length == 0) + { + return container; + } + + List leases; + try + { + leases = InMemoryLeaseJsonFormat.Deserialize(leaseStateStream); + } + catch (JsonException ex) + { + throw new InvalidOperationException( + "Failed to deserialize lease state from the provided MemoryStream. " + + "Ensure the stream contains valid lease state JSON previously persisted by the ChangeFeedProcessor.", + ex); + } + + foreach (DocumentServiceLease lease in leases) + { + if (string.IsNullOrEmpty(lease?.Id)) + { + throw new InvalidOperationException("Lease state contains a null or invalid lease entry."); + } + + if (!container.TryAdd(lease.Id, lease)) + { + throw new InvalidOperationException( + $"Lease state contains duplicate lease id '{lease.Id}'. The persisted stream is corrupt."); + } + } + + // Leave the caller's stream positioned at the start so it is symmetric with + // the state produced by ShutdownAsync and the stream remains immediately + // re-readable by the caller (e.g., to persist it elsewhere). + leaseStateStream.Position = 0; + + return container; + } } } \ No newline at end of file diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/InMemoryLeaseJsonFormat.cs b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/InMemoryLeaseJsonFormat.cs new file mode 100644 index 0000000000..da1c99a63f --- /dev/null +++ b/Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/LeaseManagement/InMemoryLeaseJsonFormat.cs @@ -0,0 +1,87 @@ +//------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. All rights reserved. +//------------------------------------------------------------ + +namespace Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement +{ + using System.Collections.Generic; + using System.IO; + using System.Text; + using Newtonsoft.Json; + + /// + /// Centralized JSON format used to serialize and deserialize in-memory lease state + /// to/from a . Co-locating the format here prevents silent + /// drift between the writer () + /// and the reader (). + /// + /// + /// Each carries a Timestamp field that is serialized verbatim. + /// If the elapsed time between stopping a processor and starting the next one exceeds the configured + /// lease expiration interval (default 60 seconds), restored leases will initially appear expired and + /// the current host will re-acquire each one on the first balancing cycle after StartAsync. This is + /// a one-time self-healing event per restored lease; it does not cause data loss and, because the + /// in-memory container is single-host by design, it does not cause ownership flapping. The only + /// observable effect is a burst of lease-acquire trace messages at startup. + /// + internal static class InMemoryLeaseJsonFormat + { + /// + /// StreamReader/Writer default buffer size. Exposed as a constant so the read and + /// write paths cannot drift apart. + /// + private const int BufferSize = 1024; + + /// + /// UTF-8 without BOM. Matches the default StreamWriter encoding for portability. + /// + private static readonly Encoding SerializationEncoding = new UTF8Encoding(encoderShouldEmitUTF8Identifier: false); + + /// + /// Serializes to a byte array using the in-memory lease JSON format. + /// + public static byte[] Serialize(IReadOnlyCollection leases) + { + using (MemoryStream temp = new MemoryStream()) + { + using (StreamWriter writer = new StreamWriter(temp, encoding: SerializationEncoding, bufferSize: BufferSize, leaveOpen: true)) + using (JsonTextWriter jsonWriter = new JsonTextWriter(writer)) + { + JsonSerializer serializer = JsonSerializer.Create(); + serializer.Serialize(jsonWriter, leases); + } + + return temp.ToArray(); + } + } + + /// + /// Deserializes an array of previously produced by + /// . Returns an empty list when is empty + /// or null. Throws when the stream content is not valid JSON + /// in the expected shape; callers are responsible for wrapping that into an implementation + /// specific exception. + /// + public static List Deserialize(Stream source) + { + if (source == null || source.Length == 0) + { + return new List(); + } + + source.Position = 0; + + using (StreamReader sr = new StreamReader( + source, + encoding: SerializationEncoding, + detectEncodingFromByteOrderMarks: true, + bufferSize: BufferSize, + leaveOpen: true)) + using (JsonTextReader jsonReader = new JsonTextReader(sr)) + { + JsonSerializer serializer = JsonSerializer.Create(); + return serializer.Deserialize>(jsonReader) ?? new List(); + } + } + } +} diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/ChangeFeedProcessorBuilderTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/ChangeFeedProcessorBuilderTests.cs index 45af2b8d5c..5b5633b8c0 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/ChangeFeedProcessorBuilderTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/ChangeFeedProcessorBuilderTests.cs @@ -5,11 +5,18 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.Tests { using System; + using System.Collections.Concurrent; + using System.Collections.Generic; + using System.IO; + using System.Linq; + using System.Threading.Tasks; using Microsoft.Azure.Cosmos.ChangeFeed.Configuration; using Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement; using Microsoft.Azure.Cosmos.Tests; + using Microsoft.Azure.Documents.Routing; using Microsoft.VisualStudio.TestTools.UnitTesting; using Moq; + using Newtonsoft.Json; [TestClass] [TestCategory("ChangeFeed")] @@ -246,6 +253,306 @@ public void ConvertsToUTC() Assert.IsInstanceOfType(builder.Build(), typeof(ChangeFeedProcessor)); } + #region WithInMemoryLeaseContainer(MemoryStream) Tests + + [TestMethod] + public async Task WithInMemoryLeaseContainerWithStreamInitializesStoreCorrectly() + { + // Build a MemoryStream with lease data + DocumentServiceLeaseCoreEpk lease = new DocumentServiceLeaseCoreEpk + { + LeaseId = "stream-lease", + LeaseToken = "0", + ContinuationToken = "stream-continuation", + Owner = "stream-owner", + FeedRange = new FeedRangeEpk(new Range("", "FF", true, false)) + }; + + ConcurrentDictionary sourceContainer = new ConcurrentDictionary(); + sourceContainer.TryAdd(lease.Id, lease); + MemoryStream leaseState = new MemoryStream(); + DocumentServiceLeaseContainerInMemory source = new DocumentServiceLeaseContainerInMemory(sourceContainer, leaseState); + await source.ShutdownAsync(); + + DocumentServiceLeaseStoreManager capturedManager = null; + + Action verifier = (DocumentServiceLeaseStoreManager leaseStoreManager, + Container leaseContainer, + string instanceName, + ChangeFeedLeaseOptions changeFeedLeaseOptions, + ChangeFeedProcessorOptions changeFeedProcessorOptions, + Container monitoredContainer) => + { + capturedManager = leaseStoreManager; + Assert.IsInstanceOfType(leaseStoreManager, typeof(DocumentServiceLeaseStoreManagerInMemory)); + }; + + ChangeFeedProcessorBuilder builder = new ChangeFeedProcessorBuilder("workflowName", + ChangeFeedProcessorBuilderTests.GetMockedContainer(), + ChangeFeedProcessorBuilderTests.GetMockedProcessor(), + verifier); + + builder.WithInMemoryLeaseContainer(leaseState); + builder.Build(); + + Assert.IsNotNull(capturedManager); + IReadOnlyList allLeases = await capturedManager.LeaseContainer.GetAllLeasesAsync(); + Assert.AreEqual(1, allLeases.Count); + Assert.AreEqual("0", allLeases[0].CurrentLeaseToken); + Assert.AreEqual("stream-continuation", allLeases[0].ContinuationToken); + } + + [TestMethod] + public async Task WithInMemoryLeaseContainerWithEmptyStreamInitializesEmptyStore() + { + MemoryStream leaseState = new MemoryStream(); + + DocumentServiceLeaseStoreManager capturedManager = null; + + Action verifier = (DocumentServiceLeaseStoreManager leaseStoreManager, + Container leaseContainer, + string instanceName, + ChangeFeedLeaseOptions changeFeedLeaseOptions, + ChangeFeedProcessorOptions changeFeedProcessorOptions, + Container monitoredContainer) => capturedManager = leaseStoreManager; + + ChangeFeedProcessorBuilder builder = new ChangeFeedProcessorBuilder("workflowName", + ChangeFeedProcessorBuilderTests.GetMockedContainer(), + ChangeFeedProcessorBuilderTests.GetMockedProcessor(), + verifier); + + builder.WithInMemoryLeaseContainer(leaseState); + builder.Build(); + + Assert.IsNotNull(capturedManager); + IReadOnlyList allLeases = await capturedManager.LeaseContainer.GetAllLeasesAsync(); + Assert.AreEqual(0, allLeases.Count); + } + + [TestMethod] + [ExpectedException(typeof(ArgumentNullException))] + public void WithInMemoryLeaseContainerWithNullStreamThrows() + { + ChangeFeedProcessorBuilder builder = new ChangeFeedProcessorBuilder("workflowName", + ChangeFeedProcessorBuilderTests.GetMockedContainer(), + ChangeFeedProcessorBuilderTests.GetMockedProcessor(), + ChangeFeedProcessorBuilderTests.GetEmptyInitialization()); + + builder.WithInMemoryLeaseContainer((MemoryStream)null); + } + + [TestMethod] + public async Task WithInMemoryLeaseContainer_FullLifecycle_RestoreProcessStopPersist() + { + // Arrange — create initial lease state in a stream + DocumentServiceLeaseCoreEpk originalLease = new DocumentServiceLeaseCoreEpk + { + LeaseId = "lifecycle-lease", + LeaseToken = "0", + ContinuationToken = "original-continuation", + Owner = "original-owner", + FeedRange = new FeedRangeEpk(new Range("", "FF", true, false)) + }; + + ConcurrentDictionary seedContainer = new ConcurrentDictionary(); + seedContainer.TryAdd(originalLease.Id, originalLease); + MemoryStream leaseState = new MemoryStream(); + DocumentServiceLeaseContainerInMemory seed = new DocumentServiceLeaseContainerInMemory(seedContainer, leaseState); + await seed.ShutdownAsync(); + + // Act — build with the populated stream, capturing the store manager + DocumentServiceLeaseStoreManager capturedManager = null; + + Action verifier = (DocumentServiceLeaseStoreManager leaseStoreManager, + Container leaseContainer, + string instanceName, + ChangeFeedLeaseOptions changeFeedLeaseOptions, + ChangeFeedProcessorOptions changeFeedProcessorOptions, + Container monitoredContainer) => + { + capturedManager = leaseStoreManager; + }; + + ChangeFeedProcessorBuilder builder = new ChangeFeedProcessorBuilder("workflowName", + ChangeFeedProcessorBuilderTests.GetMockedContainer(), + ChangeFeedProcessorBuilderTests.GetMockedProcessor(), + verifier); + + builder.WithInMemoryLeaseContainer(leaseState); + builder.Build(); + + // Verify leases were restored + Assert.IsNotNull(capturedManager); + IReadOnlyList restoredLeases = await capturedManager.LeaseContainer.GetAllLeasesAsync(); + Assert.AreEqual(1, restoredLeases.Count); + Assert.AreEqual("original-continuation", restoredLeases[0].ContinuationToken); + + // Simulate stop — persist state back to the same stream + await capturedManager.ShutdownAsync(); + + // Assert — stream is still usable and contains valid serialized state + Assert.IsTrue(leaseState.CanRead, "Stream should still be readable after ShutdownAsync"); + Assert.IsTrue(leaseState.Length > 0, "Stream should contain serialized lease data"); + + // Verify the persisted data round-trips correctly + leaseState.Position = 0; + using (StreamReader sr = new StreamReader(leaseState, leaveOpen: true)) + using (JsonTextReader jsonReader = new JsonTextReader(sr)) + { + List persisted = JsonSerializer.Create().Deserialize>(jsonReader); + + Assert.AreEqual(1, persisted.Count); + Assert.AreEqual("lifecycle-lease", persisted[0].Id); + Assert.AreEqual("original-continuation", persisted[0].ContinuationToken); + Assert.IsNotNull(persisted[0].FeedRange); + } + } + + #endregion + + #region Edge Case Tests + + [TestMethod] + [ExpectedException(typeof(InvalidOperationException))] + public void WithInMemoryLeaseContainerWithCorruptedStreamThrows() + { + byte[] garbage = System.Text.Encoding.UTF8.GetBytes("not valid json {{{"); + MemoryStream corruptedStream = new MemoryStream(); + corruptedStream.Write(garbage, 0, garbage.Length); + corruptedStream.Position = 0; + + ChangeFeedProcessorBuilder builder = new ChangeFeedProcessorBuilder("workflowName", + ChangeFeedProcessorBuilderTests.GetMockedContainer(), + ChangeFeedProcessorBuilderTests.GetMockedProcessor(), + ChangeFeedProcessorBuilderTests.GetEmptyInitialization()); + + builder.WithInMemoryLeaseContainer(corruptedStream); + } + + [TestMethod] + public async Task WithInMemoryLeaseContainerWithEmptyArrayStreamInitializesEmptyStore() + { + byte[] emptyArray = System.Text.Encoding.UTF8.GetBytes("[]"); + MemoryStream stream = new MemoryStream(); + stream.Write(emptyArray, 0, emptyArray.Length); + stream.Position = 0; + + DocumentServiceLeaseStoreManager capturedManager = null; + + Action verifier = (DocumentServiceLeaseStoreManager leaseStoreManager, + Container leaseContainer, + string instanceName, + ChangeFeedLeaseOptions changeFeedLeaseOptions, + ChangeFeedProcessorOptions changeFeedProcessorOptions, + Container monitoredContainer) => + { + capturedManager = leaseStoreManager; + }; + + ChangeFeedProcessorBuilder builder = new ChangeFeedProcessorBuilder("workflowName", + ChangeFeedProcessorBuilderTests.GetMockedContainer(), + ChangeFeedProcessorBuilderTests.GetMockedProcessor(), + verifier); + + builder.WithInMemoryLeaseContainer(stream); + builder.Build(); + + Assert.IsNotNull(capturedManager); + IReadOnlyList allLeases = await capturedManager.LeaseContainer.GetAllLeasesAsync(); + Assert.AreEqual(0, allLeases.Count); + } + + [TestMethod] + [ExpectedException(typeof(InvalidOperationException))] + public void WithInMemoryLeaseContainerWithNullLeaseEntryThrows() + { + byte[] nullEntry = System.Text.Encoding.UTF8.GetBytes("[null]"); + MemoryStream stream = new MemoryStream(); + stream.Write(nullEntry, 0, nullEntry.Length); + stream.Position = 0; + + ChangeFeedProcessorBuilder builder = new ChangeFeedProcessorBuilder("workflowName", + ChangeFeedProcessorBuilderTests.GetMockedContainer(), + ChangeFeedProcessorBuilderTests.GetMockedProcessor(), + ChangeFeedProcessorBuilderTests.GetEmptyInitialization()); + + builder.WithInMemoryLeaseContainer(stream); + } + + [TestMethod] + [ExpectedException(typeof(InvalidOperationException))] + public void WithInMemoryLeaseContainerWithEmptyLeaseIdThrows() + { + byte[] emptyId = System.Text.Encoding.UTF8.GetBytes("[{\"id\":\"\"}]"); + MemoryStream stream = new MemoryStream(); + stream.Write(emptyId, 0, emptyId.Length); + stream.Position = 0; + + ChangeFeedProcessorBuilder builder = new ChangeFeedProcessorBuilder("workflowName", + ChangeFeedProcessorBuilderTests.GetMockedContainer(), + ChangeFeedProcessorBuilderTests.GetMockedProcessor(), + ChangeFeedProcessorBuilderTests.GetEmptyInitialization()); + + builder.WithInMemoryLeaseContainer(stream); + } + + [TestMethod] + [ExpectedException(typeof(ArgumentException))] + public void WithInMemoryLeaseContainerWithReadOnlyStreamThrows() + { + byte[] data = System.Text.Encoding.UTF8.GetBytes("[]"); + MemoryStream readOnlyStream = new MemoryStream(data, writable: false); + + ChangeFeedProcessorBuilder builder = new ChangeFeedProcessorBuilder("workflowName", + ChangeFeedProcessorBuilderTests.GetMockedContainer(), + ChangeFeedProcessorBuilderTests.GetMockedProcessor(), + ChangeFeedProcessorBuilderTests.GetEmptyInitialization()); + + builder.WithInMemoryLeaseContainer(readOnlyStream); + } + + [TestMethod] + public void WithInMemoryLeaseContainerWithCorruptedStreamThrowsInvalidOperation() + { + byte[] corruptedData = System.Text.Encoding.UTF8.GetBytes("this is not valid JSON{{{"); + MemoryStream corruptedStream = new MemoryStream(); + corruptedStream.Write(corruptedData, 0, corruptedData.Length); + corruptedStream.Position = 0; + + ChangeFeedProcessorBuilder builder = new ChangeFeedProcessorBuilder("workflowName", + ChangeFeedProcessorBuilderTests.GetMockedContainer(), + ChangeFeedProcessorBuilderTests.GetMockedProcessor(), + ChangeFeedProcessorBuilderTests.GetEmptyInitialization()); + + InvalidOperationException ex = Assert.ThrowsException( + () => builder.WithInMemoryLeaseContainer(corruptedStream)); + + Assert.IsTrue(ex.Message.Contains("Failed to deserialize lease state")); + Assert.IsNotNull(ex.InnerException); + } + + #endregion + private static ContainerInternal GetMockedContainer(string containerName = null) { Mock mockedContainer = MockCosmosUtil.CreateMockContainer(containerName: containerName); diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/ChangeFeedProcessorCoreTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/ChangeFeedProcessorCoreTests.cs index c439950182..4a444b03dd 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/ChangeFeedProcessorCoreTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/ChangeFeedProcessorCoreTests.cs @@ -5,7 +5,9 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.Tests { using System; + using System.Collections.Concurrent; using System.Collections.Generic; + using System.IO; using System.Linq; using System.Threading; using System.Threading.Tasks; @@ -351,6 +353,128 @@ public async Task StopAsync() .Verify(store => store.GetAllLeasesAsync(), Times.Exactly(2)); } + [TestMethod] + public async Task StopAsync_CallsShutdownAsync() + { + Mock leaseStore = new Mock(); + leaseStore.Setup(l => l.IsInitializedAsync()).ReturnsAsync(true); + + Mock leaseContainer = new Mock(); + leaseContainer.Setup(l => l.GetOwnedLeasesAsync()).Returns(Task.FromResult(Enumerable.Empty())); + leaseContainer.Setup(l => l.GetAllLeasesAsync()).ReturnsAsync(new List()); + + Mock leaseStoreManager = new Mock(); + leaseStoreManager.Setup(l => l.LeaseContainer).Returns(leaseContainer.Object); + leaseStoreManager.Setup(l => l.LeaseManager).Returns(Mock.Of); + leaseStoreManager.Setup(l => l.LeaseStore).Returns(leaseStore.Object); + leaseStoreManager.Setup(l => l.LeaseCheckpointer).Returns(Mock.Of); + leaseStoreManager.Setup(l => l.ShutdownAsync()).Returns(Task.CompletedTask); + ChangeFeedProcessorCore processor = ChangeFeedProcessorCoreTests.CreateProcessor(out Mock factory, out Mock observer); + processor.ApplyBuildConfiguration( + leaseStoreManager.Object, + null, + "instanceName", + new ChangeFeedLeaseOptions(), + new ChangeFeedProcessorOptions(), + ChangeFeedProcessorCoreTests.GetMockedContainer("monitored")); + + await processor.StartAsync(); + await processor.StopAsync(); + + leaseStoreManager + .Verify(store => store.ShutdownAsync(), Times.Once); + } + + [TestMethod] + public async Task StopAsync_WithInMemoryLeases_PersistsStateToStream() + { + // Arrange — real in-memory store with a real MemoryStream + DocumentServiceLeaseCoreEpk lease = new DocumentServiceLeaseCoreEpk + { + LeaseId = "e2e-lease", + LeaseToken = "0", + ContinuationToken = "e2e-continuation", + Owner = "e2e-owner", + FeedRange = new FeedRangeEpk(new Documents.Routing.Range("", "FF", true, false)) + }; + + ConcurrentDictionary container = new ConcurrentDictionary(); + container.TryAdd(lease.Id, lease); + + MemoryStream leaseState = new MemoryStream(); + DocumentServiceLeaseStoreManagerInMemory storeManager = new DocumentServiceLeaseStoreManagerInMemory(container, leaseState); + + ChangeFeedProcessorCore processor = ChangeFeedProcessorCoreTests.CreateProcessor(out _, out _); + processor.ApplyBuildConfiguration( + storeManager, + null, + "instanceName", + new ChangeFeedLeaseOptions(), + new ChangeFeedProcessorOptions(), + ChangeFeedProcessorCoreTests.GetMockedContainer("monitored")); + + // Act — full lifecycle: start → stop (which triggers ShutdownAsync → persist) + await processor.StartAsync(); + await processor.StopAsync(); + + // Assert — stream is usable and contains valid serialized lease state + Assert.IsTrue(leaseState.CanRead, "Stream should still be readable after StopAsync"); + Assert.IsTrue(leaseState.Length > 0, "Stream should contain serialized lease data"); + + // Verify the persisted data deserializes correctly + leaseState.Position = 0; + using (StreamReader sr = new StreamReader(leaseState, leaveOpen: true)) + using (Newtonsoft.Json.JsonTextReader jsonReader = new Newtonsoft.Json.JsonTextReader(sr)) + { + List persisted = Newtonsoft.Json.JsonSerializer.Create() + .Deserialize>(jsonReader); + + Assert.AreEqual(1, persisted.Count); + Assert.AreEqual("e2e-lease", persisted[0].Id); + Assert.AreEqual("e2e-continuation", persisted[0].ContinuationToken); + Assert.IsNotNull(persisted[0].FeedRange); + Assert.IsInstanceOfType(persisted[0].FeedRange, typeof(FeedRangeEpk)); + } + } + + [TestMethod] + public async Task StopAsync_WhenShutdownAsyncThrows_ExceptionPropagates() + { + // Arrange — set up a processor where ShutdownAsync throws + Mock leaseStore = new Mock(); + leaseStore.Setup(l => l.IsInitializedAsync()).ReturnsAsync(true); + + Mock leaseContainer = new Mock(); + leaseContainer.Setup(l => l.GetOwnedLeasesAsync()).Returns(Task.FromResult(Enumerable.Empty())); + leaseContainer.Setup(l => l.GetAllLeasesAsync()).ReturnsAsync(new List()); + + Mock leaseStoreManager = new Mock(); + leaseStoreManager.Setup(l => l.LeaseContainer).Returns(leaseContainer.Object); + leaseStoreManager.Setup(l => l.LeaseManager).Returns(Mock.Of); + leaseStoreManager.Setup(l => l.LeaseStore).Returns(leaseStore.Object); + leaseStoreManager.Setup(l => l.LeaseCheckpointer).Returns(Mock.Of); + leaseStoreManager.Setup(l => l.ShutdownAsync()).ThrowsAsync(new InvalidOperationException("Shutdown failed")); + + ChangeFeedProcessorCore processor = ChangeFeedProcessorCoreTests.CreateProcessor(out _, out _); + processor.ApplyBuildConfiguration( + leaseStoreManager.Object, + null, + "instanceName", + new ChangeFeedLeaseOptions(), + new ChangeFeedProcessorOptions(), + ChangeFeedProcessorCoreTests.GetMockedContainer("monitored")); + + await processor.StartAsync(); + + // Act & Assert — StopAsync propagates ShutdownAsync exceptions so callers + // know persistence failed. + InvalidOperationException ex = await Assert.ThrowsExceptionAsync( + () => processor.StopAsync()); + Assert.AreEqual("Shutdown failed", ex.Message); + + // Assert — ShutdownAsync was still invoked + leaseStoreManager.Verify(l => l.ShutdownAsync(), Times.Once); + } private static ChangeFeedProcessorCore CreateProcessor( out Mock factory, diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/DocumentServiceLeaseContainerCosmosTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/DocumentServiceLeaseContainerCosmosTests.cs index 19f078d852..9e71e79e59 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/DocumentServiceLeaseContainerCosmosTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/DocumentServiceLeaseContainerCosmosTests.cs @@ -4,15 +4,11 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.Tests { - using System; using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; using Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement; - using Microsoft.Azure.Cosmos.Fluent; - using Microsoft.Azure.Cosmos.Tests; - using Microsoft.Azure.Documents; using Microsoft.VisualStudio.TestTools.UnitTesting; using Moq; @@ -73,13 +69,8 @@ public async Task GetOwnedLeasesAsync_ReturnsOnlyMatched() } - private static Container GetMockedContainer(string containerName = "myColl") + private static Container GetMockedContainer() { - Headers headers = new Headers - { - ContinuationToken = string.Empty - }; - MockFeedResponse cosmosFeedResponse = new MockFeedResponse() { Documents = DocumentServiceLeaseContainerCosmosTests.allLeases diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/DocumentServiceLeaseContainerInMemoryTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/DocumentServiceLeaseContainerInMemoryTests.cs index 8c6258b27c..cb06473a25 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/DocumentServiceLeaseContainerInMemoryTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/DocumentServiceLeaseContainerInMemoryTests.cs @@ -4,13 +4,17 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.Tests { + using System; using System.Collections.Concurrent; using System.Collections.Generic; + using System.IO; using System.Linq; using System.Threading.Tasks; using Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement; + using Microsoft.Azure.Documents.Routing; using Microsoft.VisualStudio.TestTools.UnitTesting; using Moq; + using Newtonsoft.Json; [TestClass] [TestCategory("ChangeFeed")] @@ -31,5 +35,336 @@ public async Task AllLeasesAreOwnedLeases() CollectionAssert.AreEqual(expectedLeases, ownedLeases.ToList()); CollectionAssert.AreEqual(allLeases.ToList(), ownedLeases.ToList()); } + + #region ShutdownAsync Tests + + [TestMethod] + public async Task ShutdownAsync_WithNoStream_IsNoOp() + { + // Arrange — container without a stream + ConcurrentDictionary container = new ConcurrentDictionary(); + container.TryAdd("lease0", new DocumentServiceLeaseCore { LeaseId = "lease0", LeaseToken = "0" }); + DocumentServiceLeaseContainerInMemory inMemoryContainer = new DocumentServiceLeaseContainerInMemory(container); + + // Act — should not throw + await inMemoryContainer.ShutdownAsync(); + } + + [TestMethod] + [DataRow(0, DisplayName = "Empty container persists empty array")] + [DataRow(2, DisplayName = "Container with two leases persists both")] + public async Task ShutdownAsync_WritesExpectedCount(int leaseCount) + { + // Arrange + ConcurrentDictionary container = new ConcurrentDictionary(); + for (int i = 0; i < leaseCount; i++) + { + DocumentServiceLeaseCoreEpk lease = new DocumentServiceLeaseCoreEpk + { + LeaseId = $"lease{i}", + LeaseToken = i.ToString(), + Owner = $"instance{i}", + FeedRange = new FeedRangeEpk(new Range("", "FF", true, false)) + }; + container.TryAdd(lease.Id, lease); + } + + MemoryStream stream = new MemoryStream(); + DocumentServiceLeaseContainerInMemory inMemoryContainer = new DocumentServiceLeaseContainerInMemory(container, stream); + + // Act + await inMemoryContainer.ShutdownAsync(); + + // Assert + Assert.IsTrue(stream.Length > 0, "Stream should contain data even for an empty lease list (serialized as [])."); + stream.Position = 0; + List deserialized = DeserializeLeasesFromStream(stream); + Assert.AreEqual(leaseCount, deserialized.Count); + } + + [TestMethod] + public async Task ShutdownAsync_StreamPositionResetToZero() + { + // Arrange + ConcurrentDictionary container = new ConcurrentDictionary(); + container.TryAdd("lease0", new DocumentServiceLeaseCoreEpk { LeaseId = "lease0", LeaseToken = "0", FeedRange = new FeedRangeEpk(new Range("", "FF", true, false)) }); + + MemoryStream stream = new MemoryStream(); + DocumentServiceLeaseContainerInMemory inMemoryContainer = new DocumentServiceLeaseContainerInMemory(container, stream); + + // Act + await inMemoryContainer.ShutdownAsync(); + + // Assert — stream position should be 0 for the next reader + Assert.AreEqual(0, stream.Position); + } + + #endregion + + #region RoundTrip Tests + + [TestMethod] + public async Task PersistThenDeserialize_RoundTrip_PreservesData() + { + // Arrange + DateTime originalTimestamp = new DateTime(2023, 6, 15, 12, 34, 56, DateTimeKind.Utc); + DocumentServiceLeaseCoreEpk originalLease = new DocumentServiceLeaseCoreEpk + { + LeaseId = "roundtrip-lease", + LeaseToken = "0", + Owner = "original-owner", + ContinuationToken = "original-token", + Mode = "IncrementalFeed", + Properties = new Dictionary + { + { "custom", "value" }, + { "unicode", "日本語" }, + }, + FeedRange = new FeedRangeEpk(new Range("AA", "BB", true, false)), + Timestamp = originalTimestamp, + }; + + ConcurrentDictionary sourceContainer = new ConcurrentDictionary(); + sourceContainer.TryAdd(originalLease.Id, originalLease); + + MemoryStream stream = new MemoryStream(); + DocumentServiceLeaseContainerInMemory source = new DocumentServiceLeaseContainerInMemory(sourceContainer, stream); + + // Act — persist then deserialize through the StoreManager so we exercise the + // same code path that customers hit via WithInMemoryLeaseContainer(stream). + await source.ShutdownAsync(); + + DocumentServiceLeaseStoreManagerInMemory restoredManager = new DocumentServiceLeaseStoreManagerInMemory(stream); + IReadOnlyList restored = await restoredManager.LeaseContainer.GetAllLeasesAsync(); + Assert.AreEqual(1, restored.Count); + + DocumentServiceLease importedLease = restored[0]; + + // Assert — scalar fields are preserved verbatim. + Assert.IsNotNull(importedLease); + Assert.AreEqual("roundtrip-lease", importedLease.Id); + Assert.AreEqual("0", importedLease.CurrentLeaseToken); + Assert.AreEqual("original-token", importedLease.ContinuationToken); + Assert.AreEqual("IncrementalFeed", importedLease.Mode); + Assert.AreEqual("original-owner", importedLease.Owner); + + // Properties (including non-ASCII values) round-trip. + Assert.IsNotNull(importedLease.Properties); + Assert.AreEqual(2, importedLease.Properties.Count); + Assert.AreEqual("value", importedLease.Properties["custom"]); + Assert.AreEqual("日本語", importedLease.Properties["unicode"]); + + // FeedRange shape and values round-trip. + Assert.IsInstanceOfType(importedLease.FeedRange, typeof(FeedRangeEpk)); + FeedRangeEpk importedFeedRange = (FeedRangeEpk)importedLease.FeedRange; + Assert.AreEqual("AA", importedFeedRange.Range.Min); + Assert.AreEqual("BB", importedFeedRange.Range.Max); + Assert.IsTrue(importedFeedRange.Range.IsMinInclusive); + Assert.IsFalse(importedFeedRange.Range.IsMaxInclusive); + + // Timestamp is preserved verbatim (confirms H3 no-mutation behavior). + Assert.AreEqual(originalTimestamp, importedLease.Timestamp.ToUniversalTime()); + + // After restore, the stream is rewound to 0 so callers can re-read it. + Assert.AreEqual(0, stream.Position); + } + + [TestMethod] + public async Task PersistOverwritesPreviousStreamContent() + { + // Arrange + ConcurrentDictionary container = new ConcurrentDictionary(); + container.TryAdd("lease0", new DocumentServiceLeaseCoreEpk { LeaseId = "lease0", LeaseToken = "0", Owner = "first", FeedRange = new FeedRangeEpk(new Range("", "FF", true, false)) }); + + MemoryStream stream = new MemoryStream(); + DocumentServiceLeaseContainerInMemory inMemoryContainer = new DocumentServiceLeaseContainerInMemory(container, stream); + + // First persist + await inMemoryContainer.ShutdownAsync(); + + // Now change the lease data + container.Clear(); + container.TryAdd("lease1", new DocumentServiceLeaseCoreEpk { LeaseId = "lease1", LeaseToken = "1", Owner = "second", FeedRange = new FeedRangeEpk(new Range("", "FF", true, false)) }); + + // Second persist + await inMemoryContainer.ShutdownAsync(); + + // Assert — stream should contain only the new data + stream.Position = 0; + List deserialized = DeserializeLeasesFromStream(stream); + Assert.AreEqual(1, deserialized.Count); + Assert.AreEqual("lease1", deserialized[0].Id); + Assert.AreEqual("second", deserialized[0].Owner); + } + + #endregion + + private static List DeserializeLeasesFromStream(Stream stream) + { + using (StreamReader sr = new StreamReader(stream, leaveOpen: true)) + using (JsonTextReader jsonReader = new JsonTextReader(sr)) + { + return JsonSerializer.Create().Deserialize>(jsonReader); + } + } + + #region Deserialize Tests + + [TestMethod] + public void Deserialize_DuplicateIds_Throws() + { + // Arrange — hand-crafted JSON with two leases sharing the same id. + string duplicateJson = + "[" + + "{\"id\":\"dup\",\"Owner\":\"o1\",\"LeaseToken\":\"0\",\"ContinuationToken\":\"c1\"}," + + "{\"id\":\"dup\",\"Owner\":\"o2\",\"LeaseToken\":\"1\",\"ContinuationToken\":\"c2\"}" + + "]"; + + MemoryStream stream = new MemoryStream(System.Text.Encoding.UTF8.GetBytes(duplicateJson)); + + // Act & Assert — restore should fail fast, not silently overwrite. + InvalidOperationException ex = Assert.ThrowsException( + () => new DocumentServiceLeaseStoreManagerInMemory(stream)); + + Assert.IsTrue(ex.Message.Contains("duplicate lease id"), $"Unexpected message: {ex.Message}"); + Assert.IsTrue(ex.Message.Contains("dup"), $"Unexpected message: {ex.Message}"); + } + + [TestMethod] + public async Task Deserialize_LeavesStreamPositionAtZero() + { + // Arrange — serialize some leases first. + ConcurrentDictionary container = new ConcurrentDictionary(); + container.TryAdd("l0", new DocumentServiceLeaseCoreEpk { LeaseId = "l0", LeaseToken = "0", FeedRange = new FeedRangeEpk(new Range("", "FF", true, false)) }); + + MemoryStream stream = new MemoryStream(); + await new DocumentServiceLeaseContainerInMemory(container, stream).ShutdownAsync(); + + // Seek to end to simulate a stream reused across multiple operations. + stream.Position = stream.Length; + + // Act — deserialization should rewind the stream. + _ = new DocumentServiceLeaseStoreManagerInMemory(stream); + + // Assert — stream is at position 0, ready to be re-read by the caller. + Assert.AreEqual(0, stream.Position); + } + + #endregion + + #region Edge Case Tests + + [TestMethod] + public async Task ShutdownAsync_WithNonEpkLease_StillSerializes() + { + // Arrange — non-EPK lease (no FeedRange) + ConcurrentDictionary container = new ConcurrentDictionary(); + container.TryAdd("core-lease", new DocumentServiceLeaseCore { LeaseId = "core-lease", LeaseToken = "0", Owner = "owner" }); + + MemoryStream stream = new MemoryStream(); + DocumentServiceLeaseContainerInMemory inMemoryContainer = new DocumentServiceLeaseContainerInMemory(container, stream); + + // Act + await inMemoryContainer.ShutdownAsync(); + + // Assert — lease is serialized + Assert.IsTrue(stream.Length > 0); + stream.Position = 0; + List deserialized = DeserializeLeasesFromStream(stream); + Assert.AreEqual(1, deserialized.Count); + Assert.AreEqual("core-lease", deserialized[0].Id); + } + + [TestMethod] + public async Task ShutdownAsync_WithDisposedStream_Throws() + { + ConcurrentDictionary container = new ConcurrentDictionary(); + container.TryAdd("lease0", new DocumentServiceLeaseCoreEpk { LeaseId = "lease0", LeaseToken = "0", FeedRange = new FeedRangeEpk(new Range("", "FF", true, false)) }); + + MemoryStream stream = new MemoryStream(); + DocumentServiceLeaseContainerInMemory inMemoryContainer = new DocumentServiceLeaseContainerInMemory(container, stream); + + stream.Dispose(); + + // Disposed MemoryStream reports CanWrite=false, so SetLength throws NotSupportedException + // which the container surfaces as a descriptive InvalidOperationException (same shape + // as the non-resizable-stream-too-small path). + await Assert.ThrowsExceptionAsync( + () => inMemoryContainer.ShutdownAsync()); + } + + [TestMethod] + public async Task ShutdownAsync_WithNonResizableStream_SameSizeData_WritesSuccessfully() + { + // Arrange — simulate the documented restore pattern: new MemoryStream(byte[]) + // which creates a non-expandable stream. + DocumentServiceLeaseCoreEpk lease = new DocumentServiceLeaseCoreEpk + { + LeaseId = "0", + LeaseToken = "0", + ContinuationToken = "1", + Owner = "owner", + FeedRange = new FeedRangeEpk(new Range("", "FF", true, false)) + }; + + ConcurrentDictionary container = new ConcurrentDictionary(); + container.TryAdd(lease.Id, lease); + + // First, serialize to get realistic bytes (non-expandable stream needs sufficient capacity) + MemoryStream temp = new MemoryStream(); + DocumentServiceLeaseContainerInMemory seeder = new DocumentServiceLeaseContainerInMemory(container, temp); + await seeder.ShutdownAsync(); + + // Create a non-expandable stream from the byte array (matches user pattern: new MemoryStream(File.ReadAllBytes(...))) + byte[] savedState = temp.ToArray(); + MemoryStream nonResizable = new MemoryStream(savedState); + + DocumentServiceLeaseContainerInMemory inMemoryContainer = new DocumentServiceLeaseContainerInMemory(container, nonResizable); + + // Act — should not throw NotSupportedException + await inMemoryContainer.ShutdownAsync(); + + // Assert + Assert.AreEqual(0, nonResizable.Position); + Assert.IsTrue(nonResizable.Length > 0); + List deserialized = DeserializeLeasesFromStream(nonResizable); + Assert.AreEqual(1, deserialized.Count); + Assert.AreEqual("0", deserialized[0].Id); + Assert.AreEqual("1", deserialized[0].ContinuationToken); + } + + [TestMethod] + public async Task ShutdownAsync_WithNonResizableStream_LargerData_ThrowsInvalidOperation() + { + // Arrange — start with a small non-expandable stream, then add more leases + // so serialized output exceeds the original buffer capacity. + byte[] smallBuffer = System.Text.Encoding.UTF8.GetBytes("[]"); + MemoryStream nonResizable = new MemoryStream(smallBuffer); + + ConcurrentDictionary container = new ConcurrentDictionary(); + for (int i = 0; i < 10; i++) + { + DocumentServiceLeaseCoreEpk lease = new DocumentServiceLeaseCoreEpk + { + LeaseId = i.ToString(), + LeaseToken = i.ToString(), + ContinuationToken = $"continuation-{i}", + Owner = "owner", + FeedRange = new FeedRangeEpk(new Range("", "FF", true, false)) + }; + container.TryAdd(lease.Id, lease); + } + + DocumentServiceLeaseContainerInMemory inMemoryContainer = new DocumentServiceLeaseContainerInMemory(container, nonResizable); + + // Act & Assert — should throw InvalidOperationException with helpful message + InvalidOperationException ex = await Assert.ThrowsExceptionAsync( + () => inMemoryContainer.ShutdownAsync()); + + Assert.IsTrue(ex.Message.Contains("not expandable")); + Assert.IsInstanceOfType(ex.InnerException, typeof(NotSupportedException)); + } + + #endregion } } \ No newline at end of file diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Contracts/DotNetSDKAPI.net6.json b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Contracts/DotNetSDKAPI.net6.json index 0f3dbfe49e..0328f86c40 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Contracts/DotNetSDKAPI.net6.json +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Contracts/DotNetSDKAPI.net6.json @@ -375,6 +375,11 @@ "Attributes": [], "MethodInfo": "Microsoft.Azure.Cosmos.ChangeFeedProcessorBuilder WithInMemoryLeaseContainer();IsAbstract:False;IsStatic:False;IsVirtual:True;IsGenericMethod:False;IsConstructor:False;IsFinal:False;" }, + "Microsoft.Azure.Cosmos.ChangeFeedProcessorBuilder WithInMemoryLeaseContainer(System.IO.MemoryStream)": { + "Type": "Method", + "Attributes": [], + "MethodInfo": "Microsoft.Azure.Cosmos.ChangeFeedProcessorBuilder WithInMemoryLeaseContainer(System.IO.MemoryStream);IsAbstract:False;IsStatic:False;IsVirtual:True;IsGenericMethod:False;IsConstructor:False;IsFinal:False;" + }, "Microsoft.Azure.Cosmos.ChangeFeedProcessorBuilder WithInstanceName(System.String)": { "Type": "Method", "Attributes": [],