Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
8711b51
Adds: Lease container export support
yash2710 Jan 27, 2026
2e0faa4
Simplify implementation
yash2710 Feb 9, 2026
fd8686f
Only support EPK range based leases for import/export
yash2710 Feb 27, 2026
269b195
Update implementation
yash2710 Mar 2, 2026
6dfcd7e
Add inmemory lease container initialization document support
yash2710 Mar 23, 2026
7cb829c
Update attributes
yash2710 Mar 26, 2026
b567bf4
Update contract
yash2710 Mar 26, 2026
735d256
Update implementation
yash2710 Apr 2, 2026
0299cc0
Cleanup
yash2710 Apr 2, 2026
5956597
Limit to FeedRangeEPK based leases
yash2710 Apr 6, 2026
448d89a
update STJ to Newtonsoft
yash2710 Apr 7, 2026
3d55713
Cleanup
yash2710 Apr 7, 2026
cc9c7a9
Address comments
yash2710 Apr 9, 2026
16d7f92
Update contract
yash2710 Apr 13, 2026
946defc
Update contract
yash2710 Apr 14, 2026
0f8d16e
Merge branch 'master' into users/trivediyash/leaseExport
kirankumarkolli Apr 21, 2026
37b450d
Address comments
yash2710 Apr 22, 2026
5526e12
Merge branch 'master' into users/trivediyash/leaseExport
NaluTripician Apr 23, 2026
20d52c4
Merge branch 'master' into users/trivediyash/leaseExport
NaluTripician Apr 23, 2026
516000b
ChangeFeedProcessor: Fixes blocking review comments for lease export
kirankumarkolli Apr 23, 2026
a95f582
ChangeFeedProcessor: Refactors ShutdownAsync from LeaseContainer to S…
kirankumarkolli Apr 23, 2026
db11ce1
ChangeFeedProcessor: Refactors to co-locate lease serialization/deser…
kirankumarkolli Apr 23, 2026
a21cfcd
Tests: Fixes non-resizable stream tests and adds validation test
kirankumarkolli Apr 23, 2026
1b2c508
ChangeFeedProcessor: Refactors ShutdownAsync to abstract in base class
kirankumarkolli Apr 23, 2026
40551c6
ChangeFeedProcessor: Removes eager stream expandability validation
kirankumarkolli Apr 23, 2026
f92c683
ChangeFeedProcessor: Refactors in-memory lease shutdown for correctne…
kirankumarkolli Apr 24, 2026
54d9c7b
Tests: Refactors in-memory lease container tests
kirankumarkolli Apr 24, 2026
0d45b61
ChangeFeedProcessor: Refactors lease state rehydrate doc example
kirankumarkolli Apr 24, 2026
b124707
Merge branch 'master' into users/trivediyash/leaseExport
kirankumarkolli Apr 24, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace Microsoft.Azure.Cosmos
{
using System;
using System.IO;
using Microsoft.Azure.Cosmos.ChangeFeed.Configuration;
using Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement;
using static Microsoft.Azure.Cosmos.Container;
Expand Down Expand Up @@ -219,22 +220,73 @@ public ChangeFeedProcessorBuilder WithLeaseContainer(Container leaseContainer)
/// <returns>The instance of <see cref="ChangeFeedProcessorBuilder"/> to use.</returns>
public virtual ChangeFeedProcessorBuilder WithInMemoryLeaseContainer()
{
if (this.leaseContainer != null)
this.ValidateNoLeaseContainerConfigured();

if (string.IsNullOrEmpty(this.InstanceName))
{
throw new InvalidOperationException("The builder already defined a lease container.");
this.InstanceName = ChangeFeedProcessorBuilder.InMemoryDefaultHostName;
}

if (this.LeaseStoreManager != null)
this.LeaseStoreManager = new DocumentServiceLeaseStoreManagerInMemory();
return this;
}

/// <summary>
/// Uses an in-memory container to maintain state of the leases, optionally initialized from a <see cref="MemoryStream"/>
/// containing previously persisted lease state.
///
/// When the processor is stopped via <see cref="ChangeFeedProcessor.StopAsync"/>, the current lease state
/// is automatically written back to the same <paramref name="leaseState"/> stream, allowing the state to be
/// restored when creating a new processor instance.
///
/// Using an in-memory container restricts the scaling capability to just the instance running the current processor.
/// </summary>
/// <remarks>
/// <para>
/// <see cref="ChangeFeedProcessor.StopAsync"/> must not be invoked concurrently from multiple threads; the
/// in-memory container expects a single shutdown call and does not synchronize concurrent writers to
/// <paramref name="leaseState"/>.
/// </para>
/// </remarks>
/// <param name="leaseState">
/// A <see cref="MemoryStream"/> that serves as both input and output for lease state.
/// If the stream contains data, leases are deserialized and used to initialize the container.
/// When the processor stops, the current lease state is serialized back into this stream.
/// The stream must be writable and expandable (for example, created via <c>new MemoryStream()</c>).
/// A fixed-size stream such as <c>new MemoryStream(byte[])</c> will fail at shutdown if the
/// serialized lease state exceeds the original buffer capacity.
/// A <see cref="MemoryStream"/> is required (rather than the base <see cref="System.IO.Stream"/> type) so that
/// the lease state can be trimmed via <see cref="MemoryStream.SetLength(long)"/> when a new snapshot is smaller
/// than the previously persisted one. To integrate with <see cref="System.IO.Stream"/>-based persistence
/// (e.g., a file or blob), call <see cref="MemoryStream.ToArray"/> after <see cref="ChangeFeedProcessor.StopAsync"/>
/// to obtain the persisted bytes; create an expandable
/// <see cref="MemoryStream"/> (<c>new MemoryStream()</c>), write the bytes into it, set
/// <see cref="System.IO.Stream.Position"/> back to 0, and pass it to this method.
/// </param>
/// <returns>The instance of <see cref="ChangeFeedProcessorBuilder"/> to use.</returns>
/// <exception cref="ArgumentNullException">Thrown when <paramref name="leaseState"/> is null.</exception>
public virtual ChangeFeedProcessorBuilder WithInMemoryLeaseContainer(MemoryStream leaseState)
Comment thread
yash2710 marked this conversation as resolved.
Comment thread
kirankumarkolli marked this conversation as resolved.
Comment thread
kirankumarkolli marked this conversation as resolved.
Comment thread
kirankumarkolli marked this conversation as resolved.
{
if (leaseState == null)
{
throw new InvalidOperationException("The builder already defined an in-memory lease container instance.");
throw new ArgumentNullException(nameof(leaseState));
}

this.ValidateNoLeaseContainerConfigured();

if (!leaseState.CanWrite)
{
throw new ArgumentException("The lease state stream must be writable so that state can be persisted on shutdown.", nameof(leaseState));
}

if (string.IsNullOrEmpty(this.InstanceName))
{
this.InstanceName = ChangeFeedProcessorBuilder.InMemoryDefaultHostName;
}

this.LeaseStoreManager = new DocumentServiceLeaseStoreManagerInMemory();
// Deserialization of lease state (if any) is handled inside the manager
Comment thread
kirankumarkolli marked this conversation as resolved.
// so that serialization and deserialization are co-located in the same layer.
this.LeaseStoreManager = new DocumentServiceLeaseStoreManagerInMemory(leaseState);
return this;
}

Expand Down Expand Up @@ -317,5 +369,18 @@ public ChangeFeedProcessor Build()
this.isBuilt = true;
return this.changeFeedProcessor;
}

private void ValidateNoLeaseContainerConfigured()
{
if (this.leaseContainer != null)
{
throw new InvalidOperationException("The builder already defined a lease container.");
}

if (this.LeaseStoreManager != null)
{
throw new InvalidOperationException("The builder already defined an in-memory lease container instance.");
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
namespace Microsoft.Azure.Cosmos.ChangeFeed
{
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.ChangeFeed.Bootstrapping;
using Microsoft.Azure.Cosmos.ChangeFeed.Configuration;
Expand Down Expand Up @@ -79,7 +78,14 @@ public override async Task StartAsync()
public override async Task StopAsync()
{
DefaultTrace.TraceInformation("Stopping processor...");

// Persist in-memory lease state before stopping the partition manager so that
// a subsequent partition-manager shutdown failure cannot prevent recovery of the
// lease snapshot. No-op for Cosmos-backed leases.
await this.documentServiceLeaseStoreManager.ShutdownAsync().ConfigureAwait(false);

await this.partitionManager.StopAsync().ConfigureAwait(false);

DefaultTrace.TraceInformation("Processor stopped.");
Comment thread
kirankumarkolli marked this conversation as resolved.
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,32 @@
//------------------------------------------------------------
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------

namespace Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement
{
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;

internal sealed class DocumentServiceLeaseContainerInMemory : DocumentServiceLeaseContainer
{
private readonly ConcurrentDictionary<string, DocumentServiceLease> container;
private readonly MemoryStream leaseStateStream;

public DocumentServiceLeaseContainerInMemory(ConcurrentDictionary<string, DocumentServiceLease> container)
: this(container, leaseStateStream: null)
{
}

public DocumentServiceLeaseContainerInMemory(
ConcurrentDictionary<string, DocumentServiceLease> container,
MemoryStream leaseStateStream)
Comment thread
kirankumarkolli marked this conversation as resolved.
{
this.container = container;
this.leaseStateStream = leaseStateStream;
}

public override Task<IReadOnlyList<DocumentServiceLease>> GetAllLeasesAsync()
Expand All @@ -27,5 +38,46 @@ public override Task<IEnumerable<DocumentServiceLease>> GetOwnedLeasesAsync()
{
return Task.FromResult<IEnumerable<DocumentServiceLease>>(this.container.Values.AsEnumerable());
}

/// <summary>
/// Persists the current in-memory lease state into the user-supplied <see cref="MemoryStream"/>.
/// </summary>
/// <remarks>
/// Must only be invoked from the single <c>ChangeFeedProcessor.StopAsync</c> call path;
/// concurrent invocation is not supported and may corrupt the stream.
/// </remarks>
/// <returns>A completed task once the stream has been populated, or a no-op if no stream was supplied.</returns>
public Task ShutdownAsync()
{
if (this.leaseStateStream == null)
{
return Task.CompletedTask;
}

byte[] serializedBytes = InMemoryLeaseJsonFormat.Serialize(this.container.Values.ToList());

// Resize the target stream BEFORE writing. If the stream is not expandable and
// cannot hold the new payload, SetLength throws NotSupportedException and the
// user's stream is left untouched (no partial-write corruption). If SetLength
// succeeds, the subsequent Write is guaranteed to fit.
try
{
this.leaseStateStream.SetLength(serializedBytes.Length);
}
catch (NotSupportedException ex)
{
throw new InvalidOperationException(
"Failed to persist lease state because the MemoryStream is not expandable and the serialized "
+ "state exceeds its capacity. Use 'new MemoryStream()' or a MemoryStream with sufficient "
+ "capacity instead of 'new MemoryStream(byte[])' to create a resizable stream.",
ex);
}

this.leaseStateStream.Position = 0;
this.leaseStateStream.Write(serializedBytes, 0, serializedBytes.Length);
this.leaseStateStream.Position = 0;

return Task.CompletedTask;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

namespace Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement
{
using System.Threading.Tasks;

/// <summary>
/// The DocumentServiceLeaseStoreManager defines a way to perform operations with <see cref="DocumentServiceLease"/>.
/// </summary>
Expand All @@ -29,5 +31,13 @@ internal abstract class DocumentServiceLeaseStoreManager
/// for particular monitoring collection and lease container prefix.
/// </summary>
public abstract DocumentServiceLeaseStore LeaseStore { get; }

/// <summary>
/// Called when the processor is stopping. Implementations may override to perform
/// cleanup or state persistence. The default implementation (for Cosmos-backed
/// lease stores) is a no-op. Exceptions thrown from this method propagate to the
/// caller of <see cref="ChangeFeedProcessor.StopAsync"/>.
/// </summary>
public abstract Task ShutdownAsync();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement
{
using System;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;

/// <summary>
Expand Down Expand Up @@ -80,5 +81,10 @@ internal DocumentServiceLeaseStoreManagerCosmos(
public override DocumentServiceLeaseCheckpointer LeaseCheckpointer => this.leaseCheckpointer;

public override DocumentServiceLeaseContainer LeaseContainer => this.leaseContainer;

public override Task ShutdownAsync()
{
return Task.CompletedTask;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement
{
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;
using Newtonsoft.Json;

/// <summary>
/// Lease manager that is using In-Memory as lease storage.
Expand All @@ -15,15 +19,33 @@ internal sealed class DocumentServiceLeaseStoreManagerInMemory : DocumentService
private readonly DocumentServiceLeaseStore leaseStore;
private readonly DocumentServiceLeaseManager leaseManager;
private readonly DocumentServiceLeaseCheckpointer leaseCheckpointer;
private readonly DocumentServiceLeaseContainer leaseContainer;
private readonly DocumentServiceLeaseContainerInMemory leaseContainer;

public DocumentServiceLeaseStoreManagerInMemory()
: this(new ConcurrentDictionary<string, DocumentServiceLease>())
{
}

/// <summary>
/// Initializes a new instance from a <see cref="MemoryStream"/> containing
/// previously persisted lease state. Deserialization is co-located here so
/// that the manager owns the lease JSON format for both read (restore) and
/// write (ShutdownAsync → persist).
/// </summary>
internal DocumentServiceLeaseStoreManagerInMemory(MemoryStream leaseStateStream)
: this(DocumentServiceLeaseStoreManagerInMemory.DeserializeLeaseState(leaseStateStream), leaseStateStream)
{
}

internal DocumentServiceLeaseStoreManagerInMemory(ConcurrentDictionary<string, DocumentServiceLease> container)
: this(new DocumentServiceLeaseUpdaterInMemory(container), container)
: this(new DocumentServiceLeaseUpdaterInMemory(container), container, leaseStateStream: null)
{
}

internal DocumentServiceLeaseStoreManagerInMemory(
ConcurrentDictionary<string, DocumentServiceLease> container,
MemoryStream leaseStateStream)
: this(new DocumentServiceLeaseUpdaterInMemory(container), container, leaseStateStream)
{
}

Expand All @@ -35,7 +57,8 @@ internal DocumentServiceLeaseStoreManagerInMemory(ConcurrentDictionary<string, D
/// </remarks>
internal DocumentServiceLeaseStoreManagerInMemory(
DocumentServiceLeaseUpdater leaseUpdater,
ConcurrentDictionary<string, DocumentServiceLease> container) // For testing purposes only.
ConcurrentDictionary<string, DocumentServiceLease> container,
MemoryStream leaseStateStream = null)
{
if (leaseUpdater == null) throw new ArgumentException(nameof(leaseUpdater));

Expand All @@ -47,7 +70,7 @@ internal DocumentServiceLeaseStoreManagerInMemory(
leaseUpdater,
new PartitionedByIdCollectionRequestOptionsFactory());

this.leaseContainer = new DocumentServiceLeaseContainerInMemory(container);
this.leaseContainer = new DocumentServiceLeaseContainerInMemory(container, leaseStateStream);
}

public override DocumentServiceLeaseStore LeaseStore => this.leaseStore;
Expand All @@ -57,5 +80,61 @@ internal DocumentServiceLeaseStoreManagerInMemory(
public override DocumentServiceLeaseCheckpointer LeaseCheckpointer => this.leaseCheckpointer;

public override DocumentServiceLeaseContainer LeaseContainer => this.leaseContainer;

public override Task ShutdownAsync()
{
return this.leaseContainer.ShutdownAsync();
}

/// <summary>
/// Deserializes lease state from a <see cref="MemoryStream"/> into a dictionary.
/// This is the counterpart of the serialization in
/// <see cref="DocumentServiceLeaseContainerInMemory.ShutdownAsync"/>.
/// </summary>
private static ConcurrentDictionary<string, DocumentServiceLease> DeserializeLeaseState(
MemoryStream leaseStateStream)
{
ConcurrentDictionary<string, DocumentServiceLease> container =
new ConcurrentDictionary<string, DocumentServiceLease>();

if (leaseStateStream == null || leaseStateStream.Length == 0)
{
return container;
}

List<DocumentServiceLease> leases;
try
{
leases = InMemoryLeaseJsonFormat.Deserialize(leaseStateStream);
}
catch (JsonException ex)
{
throw new InvalidOperationException(
"Failed to deserialize lease state from the provided MemoryStream. "
+ "Ensure the stream contains valid lease state JSON previously persisted by the ChangeFeedProcessor.",
ex);
}

foreach (DocumentServiceLease lease in leases)
{
if (string.IsNullOrEmpty(lease?.Id))
{
throw new InvalidOperationException("Lease state contains a null or invalid lease entry.");
}

if (!container.TryAdd(lease.Id, lease))
{
throw new InvalidOperationException(
$"Lease state contains duplicate lease id '{lease.Id}'. The persisted stream is corrupt.");
}
}

// Leave the caller's stream positioned at the start so it is symmetric with
// the state produced by ShutdownAsync and the stream remains immediately
// re-readable by the caller (e.g., to persist it elsewhere).
leaseStateStream.Position = 0;

return container;
}
}
}
Loading
Loading