Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Microsoft.Azure.Cosmos/src/DocumentClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1064,7 +1064,9 @@ private async Task<bool> GetInitializationTaskAsync(IStoreClientFactory storeCli
(Cosmos.ConsistencyLevel)this.accountServiceConfiguration.DefaultConsistencyLevel,
this.eventSource,
this.serializerSettings,
this.httpClient);
this.httpClient,
this.PartitionKeyRangeLocation,
isPartitionLevelFailoverEnabled: this.ConnectionPolicy.EnablePartitionLevelFailover || this.ConnectionPolicy.EnablePartitionLevelCircuitBreaker);

this.GatewayStoreModel = gatewayStoreModel;

Expand Down
9 changes: 7 additions & 2 deletions Microsoft.Azure.Cosmos/src/GatewayStoreClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ namespace Microsoft.Azure.Cosmos

internal class GatewayStoreClient : TransportClient
{
private readonly bool isPartitionLevelFailoverEnabled;
private readonly ICommunicationEventSource eventSource;
protected readonly CosmosHttpClient httpClient;
protected readonly JsonSerializerSettings SerializerSettings;
Expand All @@ -31,11 +32,13 @@ internal class GatewayStoreClient : TransportClient
public GatewayStoreClient(
CosmosHttpClient httpClient,
ICommunicationEventSource eventSource,
JsonSerializerSettings serializerSettings = null)
JsonSerializerSettings serializerSettings = null,
bool isPartitionLevelFailoverEnabled = false)
{
this.httpClient = httpClient;
this.SerializerSettings = serializerSettings;
this.eventSource = eventSource;
this.isPartitionLevelFailoverEnabled = isPartitionLevelFailoverEnabled;
}

public async Task<DocumentServiceResponse> InvokeAsync(
Expand Down Expand Up @@ -375,7 +378,9 @@ private Task<HttpResponseMessage> InvokeClientAsync(
return this.httpClient.SendHttpAsync(
() => this.PrepareRequestMessageAsync(request, physicalAddress),
resourceType,
HttpTimeoutPolicy.GetTimeoutPolicy(request),
HttpTimeoutPolicy.GetTimeoutPolicy(
request,
this.isPartitionLevelFailoverEnabled),
request.RequestContext.ClientRequestStatistics,
cancellationToken,
request);
Expand Down
58 changes: 54 additions & 4 deletions Microsoft.Azure.Cosmos/src/GatewayStoreModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ namespace Microsoft.Azure.Cosmos
// Marking it as non-sealed in order to unit test it using Moq framework
internal class GatewayStoreModel : IStoreModelExtension, IDisposable
{
private readonly bool isPartitionLevelFailoverEnabled;
private static readonly string sessionConsistencyAsString = ConsistencyLevel.Session.ToString();
private readonly GlobalPartitionEndpointManager globalPartitionEndpointManager;

internal readonly GlobalEndpointManager endpointManager;
private readonly DocumentClientEventSource eventSource;
Expand All @@ -43,17 +45,24 @@ public GatewayStoreModel(
ConsistencyLevel defaultConsistencyLevel,
DocumentClientEventSource eventSource,
JsonSerializerSettings serializerSettings,
CosmosHttpClient httpClient)
CosmosHttpClient httpClient,
GlobalPartitionEndpointManager globalPartitionEndpointManager,
bool isPartitionLevelFailoverEnabled = false)
{
this.isPartitionLevelFailoverEnabled = isPartitionLevelFailoverEnabled;
this.endpointManager = endpointManager;
this.sessionContainer = sessionContainer;
this.defaultConsistencyLevel = defaultConsistencyLevel;
this.eventSource = eventSource;

this.globalPartitionEndpointManager = globalPartitionEndpointManager;
this.gatewayStoreClient = new GatewayStoreClient(
httpClient,
this.eventSource,
serializerSettings);
serializerSettings,
isPartitionLevelFailoverEnabled);

this.globalPartitionEndpointManager.SetBackgroundConnectionPeriodicRefreshTask(
this.MarkEndpointsToHealthyAsync);
}

public virtual async Task<DocumentServiceResponse> ProcessMessageAsync(DocumentServiceRequest request, CancellationToken cancellationToken = default)
Expand All @@ -69,12 +78,29 @@ await GatewayStoreModel.ApplySessionTokenAsync(
DocumentServiceResponse response;
try
{
Uri physicalAddress = GatewayStoreClient.IsFeedRequest(request.OperationType) ? this.GetFeedUri(request) : this.GetEntityUri(request);
// Collect region name only for document resources
if (request.ResourceType.Equals(ResourceType.Document) && this.endpointManager.TryGetLocationForGatewayDiagnostics(request.RequestContext.LocationEndpointToRoute, out string regionName))
{
request.RequestContext.RegionName = regionName;
}

// This is applicable for both per partition automatic failover and per partition circuit breaker.
if (this.isPartitionLevelFailoverEnabled
&& !ReplicatedResourceClient.IsMasterResource(request.ResourceType)
&& request.ResourceType.IsPartitioned())
{
(bool isSuccess, PartitionKeyRange partitionKeyRange) = await TryResolvePartitionKeyRangeAsync(
request: request,
sessionContainer: this.sessionContainer,
partitionKeyRangeCache: this.partitionKeyRangeCache,
clientCollectionCache: this.clientCollectionCache,
refreshCache: false);

request.RequestContext.ResolvedPartitionKeyRange = partitionKeyRange;
this.globalPartitionEndpointManager.TryAddPartitionLevelLocationOverride(request);
}

Uri physicalAddress = GatewayStoreClient.IsFeedRequest(request.OperationType) ? this.GetFeedUri(request) : this.GetEntityUri(request);
response = await this.gatewayStoreClient.InvokeAsync(request, request.ResourceType, physicalAddress, cancellationToken);
}
catch (DocumentClientException exception)
Expand Down Expand Up @@ -422,6 +448,30 @@ private static async Task<Tuple<bool, PartitionKeyRange>> TryResolvePartitionKey
return new Tuple<bool, PartitionKeyRange>(true, partitonKeyRange);
}

/// <summary>
Comment thread
kundadebdatta marked this conversation as resolved.
/// Attempts to mark the unhealthy endpoints for a faulty partition to healthy state, un-deterministically. This is done
/// specifically for the gateway mode to get the faulty partition failed back to the original location.
/// </summary>
/// <param name="pkRangeUriMappings">A dictionary mapping partition key ranges to their corresponding collection resource ID, original failed location, and health status.</param>
public Task MarkEndpointsToHealthyAsync(
Dictionary<PartitionKeyRange, Tuple<string, Uri, TransportAddressHealthState.HealthStatus>> pkRangeUriMappings)
{
foreach (PartitionKeyRange pkRange in pkRangeUriMappings?.Keys)
{
string collectionRid = pkRangeUriMappings[pkRange].Item1;
Uri originalFailedLocation = pkRangeUriMappings[pkRange].Item2;

DefaultTrace.TraceVerbose("Un-deterministically marking the original failed endpoint: {0}, for the PkRange: {1}, collectionRid: {2} back to healthy.",
originalFailedLocation,
pkRange.Id,
collectionRid);

pkRangeUriMappings[pkRange] = new Tuple<string, Uri, TransportAddressHealthState.HealthStatus>(collectionRid, originalFailedLocation, TransportAddressHealthState.HealthStatus.Connected);
}

return Task.CompletedTask;
}

// DEVNOTE: This can be replace with ReplicatedResourceClient.IsMasterOperation on next Direct sync
internal static bool IsMasterOperation(
ResourceType resourceType,
Expand Down
7 changes: 5 additions & 2 deletions Microsoft.Azure.Cosmos/src/HttpClient/HttpTimeoutPolicy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ internal abstract class HttpTimeoutPolicy
public virtual bool ShouldThrow503OnTimeout => false;

public static HttpTimeoutPolicy GetTimeoutPolicy(
DocumentServiceRequest documentServiceRequest)
DocumentServiceRequest documentServiceRequest,
bool isPartitionLevelFailoverEnabled = false)
{
//Query Plan Requests
if (documentServiceRequest.ResourceType == ResourceType.Document
Expand All @@ -45,7 +46,9 @@ public static HttpTimeoutPolicy GetTimeoutPolicy(
//Data Plane Read
if (!HttpTimeoutPolicy.IsMetaData(documentServiceRequest) && documentServiceRequest.IsReadOnlyRequest)
{
return HttpTimeoutPolicyDefault.InstanceShouldThrow503OnTimeout;
return isPartitionLevelFailoverEnabled
? HttpTimeoutPolicyForPartitionFailover.InstanceShouldThrow503OnTimeout
: HttpTimeoutPolicyDefault.InstanceShouldThrow503OnTimeout;
}

//Meta Data Read
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------
namespace Microsoft.Azure.Cosmos
{
using System;
using System.Collections.Generic;
using System.Net.Http;

internal sealed class HttpTimeoutPolicyForPartitionFailover : HttpTimeoutPolicy
{
public static readonly HttpTimeoutPolicy Instance = new HttpTimeoutPolicyForPartitionFailover(false);
public static readonly HttpTimeoutPolicy InstanceShouldThrow503OnTimeout = new HttpTimeoutPolicyForPartitionFailover(true);
public bool shouldThrow503OnTimeout;
private static readonly string Name = nameof(HttpTimeoutPolicyDefault);

private HttpTimeoutPolicyForPartitionFailover(bool shouldThrow503OnTimeout)
{
this.shouldThrow503OnTimeout = shouldThrow503OnTimeout;
}

private readonly IReadOnlyList<(TimeSpan requestTimeout, TimeSpan delayForNextRequest)> TimeoutsAndDelays = new List<(TimeSpan requestTimeout, TimeSpan delayForNextRequest)>()
{
(TimeSpan.FromSeconds(.5), TimeSpan.Zero),
(TimeSpan.FromSeconds(.5), TimeSpan.Zero),
(TimeSpan.FromSeconds(1), TimeSpan.Zero),
};

public override string TimeoutPolicyName => HttpTimeoutPolicyForPartitionFailover.Name;

public override int TotalRetryCount => this.TimeoutsAndDelays.Count;

public override IEnumerator<(TimeSpan requestTimeout, TimeSpan delayForNextRequest)> GetTimeoutEnumerator()
{
return this.TimeoutsAndDelays.GetEnumerator();
}

// Assume that it is not safe to retry unless it is a get method.
// Create and other operations could have succeeded even though a timeout occurred.
public override bool IsSafeToRetry(HttpMethod httpMethod)
{
return httpMethod == HttpMethod.Get;
}

public override bool ShouldRetryBasedOnResponse(HttpMethod requestHttpMethod, HttpResponseMessage responseMessage)
{
return false;
}

public override bool ShouldThrow503OnTimeout => this.shouldThrow503OnTimeout;
}
}
4 changes: 3 additions & 1 deletion Microsoft.Azure.Cosmos/src/ThinClientStoreModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ internal class ThinClientStoreModel : GatewayStoreModel

public ThinClientStoreModel(
GlobalEndpointManager endpointManager,
GlobalPartitionEndpointManager globalPartitionEndpointManager,
ISessionContainer sessionContainer,
ConsistencyLevel defaultConsistencyLevel,
DocumentClientEventSource eventSource,
Expand All @@ -33,7 +34,8 @@ public ThinClientStoreModel(
defaultConsistencyLevel,
eventSource,
serializerSettings,
httpClient)
httpClient,
globalPartitionEndpointManager)
{
this.thinClientStoreClient = new ThinClientStoreClient(
httpClient,
Expand Down
Loading