Skip to content
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
a229099
test to prove no exception, no documents when change feed more is cha…
philipthomas-MSFT Feb 6, 2024
59e8811
renaming
philipthomas-MSFT Feb 6, 2024
a6519b9
Merge branch 'master' into users/philipthomas-MSFT/4308-change-feed-p…
philipthomas-MSFT Feb 27, 2024
465143d
throwing exception when changeFeedMode change on CFP with same lease …
philipthomas-MSFT Feb 29, 2024
b8001a6
removed PREVIEW tags under advisement. Well create a PR later
philipthomas-MSFT Mar 1, 2024
3af61fd
Merge branch 'master' into users/philipthomas-MSFT/4308-change-feed-p…
philipthomas-MSFT Mar 13, 2024
706453f
removed usings
philipthomas-MSFT Mar 13, 2024
c621e17
revert to old summary
philipthomas-MSFT Mar 13, 2024
5fa3dcd
defaulting change feed mode to latestversion
philipthomas-MSFT Mar 13, 2024
f21b8ed
change feed mode exception on switching and tests based on last recom…
philipthomas-MSFT Mar 20, 2024
15c0833
Merge branch 'master' into users/philipthomas-MSFT/4308-change-feed-p…
philipthomas-MSFT Mar 20, 2024
ffba025
more changes based on discussions
philipthomas-MSFT Mar 20, 2024
14aa99f
some refactoring
philipthomas-MSFT Mar 20, 2024
420385c
fixed some tests
philipthomas-MSFT Mar 20, 2024
d718bef
more changes based on recommendations
philipthomas-MSFT Mar 20, 2024
3a29c23
another refactor
philipthomas-MSFT Mar 20, 2024
e204c1b
change from CosmosException to ArgumentException
philipthomas-MSFT Mar 20, 2024
bf6c2f5
removed CosmosException from comments
philipthomas-MSFT Mar 20, 2024
d47e3ed
Merge branch 'master' into users/philipthomas-MSFT/4308-change-feed-p…
philipthomas-MSFT Mar 20, 2024
6fe813d
fixin tests to account for extra GetAllLeasesAsync call
philipthomas-MSFT Mar 21, 2024
2a03e73
some recommendation changes
philipthomas-MSFT Mar 21, 2024
63d7b40
Merge branch 'master' into users/philipthomas-MSFT/4308-change-feed-p…
philipthomas-MSFT Mar 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,8 @@ private async Task InitializeLeaseStoreAsync(ITrace trace, CancellationToken can
monitoredContainer: this.monitoredContainer,
leaseContainer: this.leaseContainer,
leaseContainerPrefix: leasePrefix,
instanceName: ChangeFeedEstimatorIterator.EstimatorDefaultHostName);
instanceName: ChangeFeedEstimatorIterator.EstimatorDefaultHostName,
changeFeedMode: ChangeFeedMode.LatestVersion);

this.documentServiceLeaseContainer = documentServiceLeaseStoreManager.LeaseContainer;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,11 +139,14 @@ private async Task InitializeLeaseStoreAsync()
{
string monitoredContainerAndDatabaseRid = await this.monitoredContainer.GetMonitoredDatabaseAndContainerRidAsync(default);
string leasePrefix = this.monitoredContainer.GetLeasePrefix(this.changeFeedLeaseOptions.LeasePrefix, monitoredContainerAndDatabaseRid);
DocumentServiceLeaseStoreManager documentServiceLeaseStoreManager = await DocumentServiceLeaseStoreManagerBuilder.InitializeAsync(
monitoredContainer: this.monitoredContainer,
leaseContainer: this.leaseContainer,
leaseContainerPrefix: leasePrefix,
instanceName: ChangeFeedEstimatorRunner.EstimatorDefaultHostName);
DocumentServiceLeaseStoreManager documentServiceLeaseStoreManager = await DocumentServiceLeaseStoreManagerBuilder
.InitializeAsync(
monitoredContainer: this.monitoredContainer,
leaseContainer: this.leaseContainer,
leaseContainerPrefix: leasePrefix,
instanceName: ChangeFeedEstimatorRunner.EstimatorDefaultHostName,
changeFeedMode: ChangeFeedMode.LatestVersion)
.ConfigureAwait(false);

this.documentServiceLeaseContainer = documentServiceLeaseStoreManager.LeaseContainer;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,15 @@ public ChangeFeedProcessorBuilder WithInstanceName(string instanceName)
}

/// <summary>
/// Sets the mode for the change freed processor.
/// Sets the mode for the change feed processor.
/// </summary>
/// <param name="changeFeedMode"></param>
/// <returns>The instance of <see cref="ChangeFeedProcessorBuilder"/> to use.</returns>
internal ChangeFeedProcessorBuilder WithChangeFeedMode(ChangeFeedMode changeFeedMode)
{
this.changeFeedProcessorOptions.Mode = changeFeedMode;
this.changeFeedLeaseOptions.Mode = changeFeedMode;

return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,20 @@
namespace Microsoft.Azure.Cosmos.ChangeFeed
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using global::Azure;
using Microsoft.Azure.Cosmos.ChangeFeed.Bootstrapping;
using Microsoft.Azure.Cosmos.ChangeFeed.Configuration;
using Microsoft.Azure.Cosmos.ChangeFeed.FeedManagement;
using Microsoft.Azure.Cosmos.ChangeFeed.FeedProcessing;
using Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement;
using Microsoft.Azure.Cosmos.ChangeFeed.Utils;
using Microsoft.Azure.Cosmos.Core.Trace;
using Microsoft.Azure.Cosmos.Resource.CosmosExceptions;
using Microsoft.Azure.Cosmos.Tracing;
using Microsoft.Azure.Documents;

internal sealed class ChangeFeedProcessorCore : ChangeFeedProcessor
{
Expand Down Expand Up @@ -75,23 +80,99 @@ public override async Task StopAsync()
private async Task InitializeAsync()
{
string containerRid = await this.monitoredContainer.GetCachedRIDAsync(
forceRefresh: false,
NoOpTrace.Singleton,
forceRefresh: false,
NoOpTrace.Singleton,
default);

string monitoredDatabaseAndContainerRid = await this.monitoredContainer.GetMonitoredDatabaseAndContainerRidAsync();
string leaseContainerPrefix = this.monitoredContainer.GetLeasePrefix(this.changeFeedLeaseOptions.LeasePrefix, monitoredDatabaseAndContainerRid);
Routing.PartitionKeyRangeCache partitionKeyRangeCache = await this.monitoredContainer.ClientContext.DocumentClient.GetPartitionKeyRangeCacheAsync(NoOpTrace.Singleton);
if (this.documentServiceLeaseStoreManager == null)
{
this.documentServiceLeaseStoreManager = await DocumentServiceLeaseStoreManagerBuilder.InitializeAsync(this.monitoredContainer, this.leaseContainer, leaseContainerPrefix, this.instanceName).ConfigureAwait(false);
this.documentServiceLeaseStoreManager = await DocumentServiceLeaseStoreManagerBuilder
.InitializeAsync(
this.monitoredContainer,
this.leaseContainer,
leaseContainerPrefix,
this.instanceName,
changeFeedMode: this.changeFeedProcessorOptions.Mode)
.ConfigureAwait(false);
}

await this.ChangeFeedModeSwitchingCheckAsync().ConfigureAwait(false);

this.partitionManager = this.BuildPartitionManager(
containerRid,
containerRid,
partitionKeyRangeCache);
this.initialized = true;
}

/// <summary>
/// If the lease container's lease document is found, this method checks for lease
/// document's ChangeFeedMode and if the new ChangeFeedMode is different
/// from the current ChangeFeedMode, a CosmosException is thrown.
/// This is based on an issue located at <see href="https://github.com/Azure/azure-cosmos-dotnet-v3/issues/4308"/>.
/// </summary>
private async Task ChangeFeedModeSwitchingCheckAsync()
{
IReadOnlyList<DocumentServiceLease> documentServiceLeases = await this.documentServiceLeaseStoreManager
.LeaseContainer
.GetAllLeasesAsync()
.ConfigureAwait(false);

// No lease documents. Return.

if (documentServiceLeases.Count == 0)
{
return;
}

DocumentServiceLease documentServiceLease = documentServiceLeases.FirstOrDefault();

// No lease documents that match the Id.

if (documentServiceLease == default)
{
return;
}

// Mode attribute exists on lease document, but it is not set. legacy is always LatestVersion because
// AllVersionsAndDeletes does not exist. There should not be any legacy lease documents that are
// AllVersionsAndDeletes. If the ChangeFeedProcessor's mode is not legacy, a CosmosException should thrown.
// If the ChangeFeedProcessor mode is not the mode in the lease document, a CosmosException should be thrown.

bool shouldThrowException = string.IsNullOrEmpty(documentServiceLease.Mode)
? this.VerifyChangeFeedProcessorMode(
changeFeedMode: ChangeFeedMode.LatestVersion,
leaseChangeFeedMode: documentServiceLease.Mode,
normalizedProcessorChangeFeedMode: out string normalizedProcessorChangeFeedMode)
: this.VerifyChangeFeedProcessorMode(
changeFeedMode: this.changeFeedLeaseOptions.Mode,
leaseChangeFeedMode: documentServiceLease.Mode,
normalizedProcessorChangeFeedMode: out normalizedProcessorChangeFeedMode);

// If shouldThrowException is true, throw the CosmosException.

if (shouldThrowException)
{
throw CosmosExceptionFactory.CreateBadRequestException(
message: $"Switching {nameof(ChangeFeedMode)} {documentServiceLease.Mode} to {normalizedProcessorChangeFeedMode} is not allowed.",
headers: default);
}
}

private bool VerifyChangeFeedProcessorMode(
ChangeFeedMode changeFeedMode,
string leaseChangeFeedMode,
out string normalizedProcessorChangeFeedMode)
{
normalizedProcessorChangeFeedMode = changeFeedMode == ChangeFeedMode.AllVersionsAndDeletes
? HttpConstants.A_IMHeaderValues.FullFidelityFeed
: HttpConstants.A_IMHeaderValues.IncrementalFeed;

return string.Compare(leaseChangeFeedMode, normalizedProcessorChangeFeedMode, StringComparison.OrdinalIgnoreCase) != 0;
}

private PartitionManager BuildPartitionManager(
string containerRid,
Routing.PartitionKeyRangeCache partitionKeyRangeCache)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,5 +44,10 @@ public ChangeFeedLeaseOptions()
/// instances pointing at the same feed while using the same auxiliary collection.
/// </summary>
public string LeasePrefix { get; set; }

/// <summary>
/// Gets or sets the <see cref="ChangeFeedMode"/>.
/// </summary>
public ChangeFeedMode Mode { get; set; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,5 +70,10 @@ internal abstract class DocumentServiceLease
/// Gets or sets custom lease properties which can be managed from <see cref="LoadBalancingStrategy"/>.
/// </summary>
public abstract Dictionary<string, string> Properties { get; set; }

/// <summary>
/// Gets or sets the ChangeFeedMode.
/// </summary>
public abstract string Mode { get; set; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ public override DateTime Timestamp
[JsonProperty("_ts")]
private long TS { get; set; }

[JsonProperty("Mode", NullValueHandling = NullValueHandling.Ignore)]
public override string Mode { get; set; }

public override string ToString()
{
return string.Format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ public override DateTime Timestamp
[JsonProperty("properties")]
public override Dictionary<string, string> Properties { get; set; } = new Dictionary<string, string>();

[JsonProperty("Mode", NullValueHandling = NullValueHandling.Ignore)]
public override string Mode { get; set; }

[JsonProperty("timestamp")]
private DateTime? ExplicitTimestamp { get; set; }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ public override Task<DocumentServiceLease> CreateLeaseIfNotExistAsync(
LeaseId = leaseDocId,
LeaseToken = leaseToken,
ContinuationToken = continuationToken,
FeedRange = new FeedRangeEpk(partitionKeyRange.ToRange())
FeedRange = new FeedRangeEpk(partitionKeyRange.ToRange()),
Mode = this.GetChangeFeedMode()
};

this.requestOptionsFactory.AddPartitionKeyIfNeeded((string pk) => documentServiceLease.LeasePartitionKey = pk, Guid.NewGuid().ToString());
Expand All @@ -148,14 +149,22 @@ public override Task<DocumentServiceLease> CreateLeaseIfNotExistAsync(
LeaseId = leaseDocId,
LeaseToken = leaseToken,
ContinuationToken = continuationToken,
FeedRange = feedRange
FeedRange = feedRange,
Mode = this.GetChangeFeedMode()
};

this.requestOptionsFactory.AddPartitionKeyIfNeeded((string pk) => documentServiceLease.LeasePartitionKey = pk, Guid.NewGuid().ToString());

return this.TryCreateDocumentServiceLeaseAsync(documentServiceLease);
}

private string GetChangeFeedMode()
{
return this.options.Mode == ChangeFeedMode.AllVersionsAndDeletes
? HttpConstants.A_IMHeaderValues.FullFidelityFeed
: HttpConstants.A_IMHeaderValues.IncrementalFeed;
}

public override async Task ReleaseAsync(DocumentServiceLease lease)
{
if (lease == null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ public static async Task<DocumentServiceLeaseStoreManager> InitializeAsync(
ContainerInternal monitoredContainer,
ContainerInternal leaseContainer,
string leaseContainerPrefix,
string instanceName)
string instanceName,
ChangeFeedMode changeFeedMode)
{
ContainerProperties containerProperties = await leaseContainer.GetCachedContainerPropertiesAsync(forceRefresh: false, NoOpTrace.Singleton, cancellationToken: default);

Expand Down Expand Up @@ -58,7 +59,8 @@ public static async Task<DocumentServiceLeaseStoreManager> InitializeAsync(
.WithMonitoredContainer(monitoredContainer)
.WithLeaseContainer(leaseContainer)
.WithRequestOptionsFactory(requestOptionsFactory)
.WithHostName(instanceName);
.WithHostName(instanceName)
.WithChangeFeedMode(changeFeedMode);

return leaseStoreManagerBuilder.Build();
}
Expand All @@ -70,7 +72,7 @@ public static async Task<DocumentServiceLeaseStoreManager> InitializeAsync(

private DocumentServiceLeaseStoreManagerBuilder WithMonitoredContainer(ContainerInternal monitoredContainer)
{
this.monitoredContainer = monitoredContainer ?? throw new ArgumentNullException(nameof(leaseContainer));
this.monitoredContainer = monitoredContainer ?? throw new ArgumentNullException(nameof(monitoredContainer));
return this;
}

Expand Down Expand Up @@ -98,6 +100,12 @@ private DocumentServiceLeaseStoreManagerBuilder WithHostName(string hostName)
return this;
}

private DocumentServiceLeaseStoreManagerBuilder WithChangeFeedMode(ChangeFeedMode changeFeedMode)
{
this.options.Mode = changeFeedMode ?? throw new ArgumentNullException(nameof(changeFeedMode));
return this;
}

private DocumentServiceLeaseStoreManager Build()
{
if (this.monitoredContainer == null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,7 @@ internal string GetPartitionLeasePrefix()
{
return this.ContainerNamePrefix + PartitionLeasePrefixSeparator;
}

internal ChangeFeedMode Mode { get; set; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1681,7 +1681,6 @@ public abstract Task<ResponseMessage> DeleteAllItemsByPartitionKeyStreamAsync(
public abstract Task<IEnumerable<string>> GetPartitionKeyRangesAsync(
FeedRange feedRange,
CancellationToken cancellationToken = default);

#endif
}
}
Loading