Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
3c56d2f
ChangeFeedProcessor: Fixes lease de-duplication for /partitionKey-par…
kirankumarkolli Apr 22, 2026
f8d3a3e
ChangeFeedProcessor: Fixes deterministic lease partition key dedup be…
Copilot Apr 22, 2026
1404cc1
ChangeFeed tests: Adds full env-var coverage for both lease create ov…
Copilot Apr 22, 2026
537a24c
ChangeFeed tests: Pins deterministic mode for default-path assertions
Copilot Apr 22, 2026
84553ec
ChangeFeed tests: Isolates env-var state with test initialize cleanup
Copilot Apr 22, 2026
9bd4c8e
ChangeFeed tests: Moves deterministic assertions to default-path tests
Copilot Apr 22, 2026
c105570
ChangeFeed: Adds review feedback - helper extraction, dedup/backcompa…
kirankumarkolli Apr 23, 2026
829afe6
ChangeFeed tests: Adds two-call 409 dedup tests and parameterized Gre…
kirankumarkolli Apr 23, 2026
19922d2
ChangeFeed tests: Adds DoNotParallelize to env-var-dependent test cla…
kirankumarkolli Apr 23, 2026
bd8965e
Tests: Refactors 409 dedup tests to use (PK, id) uniqueness simulation
kirankumarkolli Apr 23, 2026
e2100bf
Tests: Refactors lease dedup tests to eliminate duplication
kirankumarkolli Apr 23, 2026
30144e4
Merge branch 'master' into users/kirankk/changefeed-lease-dedup-fix
kirankumarkolli Apr 23, 2026
278aedd
ChangeFeedProcessor: Refactors per review feedback
kirankumarkolli Apr 23, 2026
239a381
ChangeFeedProcessor: Refactors to static field initialization
kirankumarkolli Apr 23, 2026
cb93af1
ChangeFeedProcessor: Removes redundant parameter from GetLeasePartiti…
kirankumarkolli Apr 23, 2026
edcf4ab
Tests: Fixes GremlinSmokeTests to set static field directly
kirankumarkolli Apr 23, 2026
eeef59e
Tests: Refactors to use ConfigurationManager for default initialization
kirankumarkolli Apr 23, 2026
2d2b308
Tests: Removes env-sensitive deterministic assertions from legacy tests
Copilot Apr 23, 2026
5619f7d
Merge master and resolve ConfigurationManager conflict
Copilot Apr 23, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ internal sealed class DocumentServiceLeaseManagerCosmos : DocumentServiceLeaseMa
private readonly AsyncLazy<TryCatch<string>> lazyContainerRid;
private PartitionKeyRangeCache partitionKeyRangeCache;

internal static bool IsChangeFeedLeaseIdAsPartitionKeyEnabled = ConfigurationManager.IsChangeFeedLeaseIdAsPartitionKeyEnabled();
Comment thread
xinlian12 marked this conversation as resolved.

public DocumentServiceLeaseManagerCosmos(
ContainerInternal monitoredContainer,
ContainerInternal leaseContainer,
Expand Down Expand Up @@ -128,7 +130,9 @@ public override Task<DocumentServiceLease> CreateLeaseIfNotExistAsync(
Mode = this.GetChangeFeedMode()
};

this.requestOptionsFactory.AddPartitionKeyIfNeeded((string pk) => documentServiceLease.LeasePartitionKey = pk, Guid.NewGuid().ToString());
this.requestOptionsFactory.AddPartitionKeyIfNeeded(
(string pk) => documentServiceLease.LeasePartitionKey = pk,
DocumentServiceLeaseManagerCosmos.GetLeasePartitionKeyValue(documentServiceLease.LeaseId));

return this.TryCreateDocumentServiceLeaseAsync(documentServiceLease);
}
Expand All @@ -153,11 +157,20 @@ public override Task<DocumentServiceLease> CreateLeaseIfNotExistAsync(
Mode = this.GetChangeFeedMode()
};

this.requestOptionsFactory.AddPartitionKeyIfNeeded((string pk) => documentServiceLease.LeasePartitionKey = pk, Guid.NewGuid().ToString());
this.requestOptionsFactory.AddPartitionKeyIfNeeded(
(string pk) => documentServiceLease.LeasePartitionKey = pk,
DocumentServiceLeaseManagerCosmos.GetLeasePartitionKeyValue(documentServiceLease.LeaseId));

return this.TryCreateDocumentServiceLeaseAsync(documentServiceLease);
}

private static string GetLeasePartitionKeyValue(string leaseId)
{
return DocumentServiceLeaseManagerCosmos.IsChangeFeedLeaseIdAsPartitionKeyEnabled
? leaseId
: Guid.NewGuid().ToString();
}

private string GetChangeFeedMode()
{
return this.options.Mode == ChangeFeedMode.AllVersionsAndDeletes
Expand Down
19 changes: 18 additions & 1 deletion Microsoft.Azure.Cosmos/src/Util/ConfigurationManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,11 @@ internal static class ConfigurationManager
/// </summary>
internal static readonly string Http2KeepAlivePingTimeoutInSeconds = "AZURE_COSMOS_HTTP2_KEEPALIVE_PING_TIMEOUT_IN_SECONDS";

/// <summary>
/// Environment variable name to enable deterministic lease-id partition key values for Change Feed lease creation.
/// </summary>
internal static readonly string ChangeFeedLeaseIdAsPartitionKeyEnabled = "AZURE_COSMOS_CHANGE_FEED_LEASE_ID_AS_PARTITION_KEY_ENABLED";

public static T GetEnvironmentVariable<T>(string variable, T defaultValue)
{
string value = Environment.GetEnvironmentVariable(variable);
Expand Down Expand Up @@ -212,7 +217,19 @@ public static bool IsThinClientEnabled(
.GetEnvironmentVariable(
variable: ConfigurationManager.ThinClientModeEnabled,
defaultValue: defaultValue);
}
}

/// <summary>
/// Gets the boolean value indicating whether Change Feed lease creation should use lease id as the partition key value.
/// </summary>
/// <returns>A boolean flag indicating if deterministic lease-id partition key behavior is enabled.</returns>
public static bool IsChangeFeedLeaseIdAsPartitionKeyEnabled()
{
return ConfigurationManager
.GetEnvironmentVariable(
variable: ConfigurationManager.ChangeFeedLeaseIdAsPartitionKeyEnabled,
defaultValue: true);
}

/// <summary>
/// Gets the AAD scope value to override.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests.ChangeFeed
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement;
using Microsoft.VisualStudio.TestTools.UnitTesting;

[TestClass]
[TestCategory("ChangeFeed")]
[DoNotParallelize]
public class GremlinSmokeTests : BaseChangeFeedClientHelper
{
private Container Container;
Expand Down Expand Up @@ -75,39 +77,64 @@ public async Task WritesTriggerDelegate_WithLeaseContainer()

}

[TestMethod]
public async Task Schema_DefaultsToHavingPartitionKey()
[DataTestMethod]
[DataRow(true, DisplayName = "Default (deterministic PK): partition key equals lease id")]
[DataRow(false, DisplayName = "Opt-out (legacy GUID PK): partition key differs from lease id")]
public async Task Schema_DefaultsToHavingPartitionKey(bool isChangeFeedLeaseIdAsPartitionKeyEnabled)
{
ChangeFeedProcessor processor = this.Container
.GetChangeFeedProcessorBuilder("test", (IReadOnlyCollection<TestClass> docs, CancellationToken token) => Task.CompletedTask)
.WithInstanceName("random")
.WithLeaseContainer(this.LeaseContainer).Build();

await processor.StartAsync();
// Letting processor initialize
await Task.Delay(BaseChangeFeedClientHelper.ChangeFeedSetupTime);
DocumentServiceLeaseManagerCosmos.IsChangeFeedLeaseIdAsPartitionKeyEnabled = isChangeFeedLeaseIdAsPartitionKeyEnabled;

// Verify that leases have the partitionKey attribute
using FeedIterator<dynamic> iterator = this.LeaseContainer.GetItemQueryIterator<dynamic>();
while (iterator.HasMoreResults)
try
{
FeedResponse<dynamic> page = await iterator.ReadNextAsync();
foreach (dynamic lease in page)
ChangeFeedProcessor processor = this.Container
.GetChangeFeedProcessorBuilder("test", (IReadOnlyCollection<TestClass> docs, CancellationToken token) => Task.CompletedTask)
.WithInstanceName("random")
.WithLeaseContainer(this.LeaseContainer).Build();

await processor.StartAsync();
// Letting processor initialize
await Task.Delay(BaseChangeFeedClientHelper.ChangeFeedSetupTime);

// Verify that leases have the partitionKey attribute
using FeedIterator<dynamic> iterator = this.LeaseContainer.GetItemQueryIterator<dynamic>();
while (iterator.HasMoreResults)
{
string leaseId = lease.id;
Assert.IsNotNull(lease.partitionKey);
if (leaseId.Contains(".info") || leaseId.Contains(".lock"))
FeedResponse<dynamic> page = await iterator.ReadNextAsync();
foreach (dynamic lease in page)
{
// These are the store initialization marks
continue;
string leaseId = lease.id;
Assert.IsNotNull(lease.partitionKey);
if (leaseId.Contains(".info") || leaseId.Contains(".lock"))
{
// These are the store initialization marks
continue;
}

if (isChangeFeedLeaseIdAsPartitionKeyEnabled)
{
Assert.AreEqual((string)lease.id, (string)lease.partitionKey,
"Deterministic mode: lease partition key must equal lease id for dedup invariant.");
}
else
{
Assert.AreNotEqual((string)lease.id, (string)lease.partitionKey,
"Legacy mode: lease partition key should be a random GUID, not the lease id.");
Assert.IsTrue(System.Guid.TryParse((string)lease.partitionKey, out _),
"Legacy mode: partition key should be a valid GUID.");
}

Assert.IsNotNull(lease.LeaseToken);
Assert.IsNull(lease.PartitionId);
}

Assert.IsNotNull(lease.LeaseToken);
Assert.IsNull(lease.PartitionId);
}
}

await processor.StopAsync();
await processor.StopAsync();
Comment thread
kirankumarkolli marked this conversation as resolved.
}
finally
{
DocumentServiceLeaseManagerCosmos.IsChangeFeedLeaseIdAsPartitionKeyEnabled =
ConfigurationManager.IsChangeFeedLeaseIdAsPartitionKeyEnabled();
}
}

public class TestClass
Expand Down
Loading
Loading