Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ internal class CrossRegionHedgingAvailabilityStrategy : AvailabilityStrategyInte
{
private const string HedgeContext = "Hedge Context";
private const string HedgeConfig = "Hedge Config";
private const string ResponseRegion = "Response Region";

/// <summary>
/// Latency threshold which activates the first region hedging
Expand Down Expand Up @@ -206,6 +207,10 @@ internal override async Task<ResponseMessage> ExecuteAvailabilityStrategyAsync(
((CosmosTraceDiagnostics)hedgeResponse.ResponseMessage.Diagnostics).Value.AddOrUpdateDatum(
HedgeContext,
hedgeRegions.Take(requestNumber + 1));
// Note that the target region can be seperate than the actual region that serviced the request depending on the scenario
((CosmosTraceDiagnostics)hedgeResponse.ResponseMessage.Diagnostics).Value.AddOrUpdateDatum(
ResponseRegion,
hedgeResponse.TargetRegionName);
return hedgeResponse.ResponseMessage;
}
}
Expand Down Expand Up @@ -234,6 +239,9 @@ internal override async Task<ResponseMessage> ExecuteAvailabilityStrategyAsync(
((CosmosTraceDiagnostics)hedgeResponse.ResponseMessage.Diagnostics).Value.AddOrUpdateDatum(
HedgeContext,
hedgeRegions);
((CosmosTraceDiagnostics)hedgeResponse.ResponseMessage.Diagnostics).Value.AddOrUpdateDatum(
ResponseRegion,
hedgeResponse.TargetRegionName);
return hedgeResponse.ResponseMessage;
}
}
Expand Down Expand Up @@ -278,6 +286,7 @@ private async Task<HedgingResponse> CloneAndSendAsync(
return await this.RequestSenderAndResultCheckAsync(
sender,
clonedRequest,
hedgeRegions.ElementAt(requestNumber),
cancellationToken,
cancellationTokenSource,
trace);
Expand All @@ -287,6 +296,7 @@ private async Task<HedgingResponse> CloneAndSendAsync(
private async Task<HedgingResponse> RequestSenderAndResultCheckAsync(
Func<RequestMessage, CancellationToken, Task<ResponseMessage>> sender,
RequestMessage request,
string targetRegionName,
CancellationToken cancellationToken,
CancellationTokenSource cancellationTokenSource,
ITrace trace)
Expand All @@ -301,10 +311,10 @@ private async Task<HedgingResponse> RequestSenderAndResultCheckAsync(
cancellationTokenSource.Cancel();
}

return new HedgingResponse(true, response);
return new HedgingResponse(true, response, targetRegionName);
}

return new HedgingResponse(false, response);
return new HedgingResponse(false, response, targetRegionName);
}
catch (OperationCanceledException oce) when (cancellationTokenSource.IsCancellationRequested)
{
Expand Down Expand Up @@ -346,11 +356,13 @@ private sealed class HedgingResponse
{
public readonly bool IsNonTransient;
public readonly ResponseMessage ResponseMessage;
public readonly string TargetRegionName;

public HedgingResponse(bool isNonTransient, ResponseMessage responseMessage)
public HedgingResponse(bool isNonTransient, ResponseMessage responseMessage, string targetRegionName)
{
this.IsNonTransient = isNonTransient;
this.ResponseMessage = responseMessage;
this.TargetRegionName = targetRegionName;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,71 @@ public async Task AvailabilityStrategyRequestOptionsTriggerTest(bool isPreferred
}
}

[TestMethod]
[DataRow(false, DisplayName = "ValidateAvailabilityStrategyNoTriggerTest with preferred regions.")]
[DataRow(true, DisplayName = "ValidateAvailabilityStrategyNoTriggerTest w/o preferred regions.")]
[TestCategory("MultiRegion")]
public async Task AvailabilityStrategyResponseRegionDiagnosticsTest(bool isPreferredLocationsEmpty)
{
FaultInjectionRule responseDelay = new FaultInjectionRuleBuilder(
id: "responseDely",
condition:
new FaultInjectionConditionBuilder()
.WithRegion(region1)
.WithOperationType(FaultInjectionOperationType.ReadItem)
.Build(),
result:
FaultInjectionResultBuilder.GetResultBuilder(FaultInjectionServerErrorType.ResponseDelay)
.WithDelay(TimeSpan.FromMilliseconds(4000))
.Build())
.WithDuration(TimeSpan.FromMinutes(90))
.Build();

List<FaultInjectionRule> rules = new List<FaultInjectionRule>() { responseDelay };
FaultInjector faultInjector = new FaultInjector(rules);

responseDelay.Disable();

CosmosClientOptions clientOptions = new CosmosClientOptions()
{
ConnectionMode = ConnectionMode.Direct,
ApplicationPreferredRegions = isPreferredLocationsEmpty ? new List<string>() : new List<string>() { region1, region2 },
Serializer = this.cosmosSystemTextJsonSerializer
};

using (CosmosClient faultInjectionClient = new CosmosClient(
connectionString: this.connectionString,
clientOptions: faultInjector.GetFaultInjectionClientOptions(clientOptions)))
{
Database database = faultInjectionClient.GetDatabase(MultiRegionSetupHelpers.dbName);
Container container = database.GetContainer(MultiRegionSetupHelpers.containerName);

//warm up connections read
ItemResponse<CosmosIntegrationTestObject> _ = await container.ReadItemAsync<CosmosIntegrationTestObject>("testId", new PartitionKey("pk"));

responseDelay.Enable();

ItemRequestOptions requestOptions = new ItemRequestOptions
{
AvailabilityStrategy = new CrossRegionHedgingAvailabilityStrategy(
threshold: TimeSpan.FromMilliseconds(100),
thresholdStep: TimeSpan.FromMilliseconds(50))
};
ItemResponse<CosmosIntegrationTestObject> ir = await container.ReadItemAsync<CosmosIntegrationTestObject>(
"testId",
new PartitionKey("pk"),
requestOptions);

CosmosTraceDiagnostics traceDiagnostic = ir.Diagnostics as CosmosTraceDiagnostics;
Assert.IsNotNull(traceDiagnostic);
Assert.IsTrue(traceDiagnostic.ToString()
.Contains($"\"Hedge Context\":[\"{region1}\",\"{region2}\""));
traceDiagnostic.Value.Data.TryGetValue("Response Region", out object responseRegionObj);
Assert.IsNotNull(responseRegionObj);
Assert.AreEqual(region2, responseRegionObj as string);
}
}

[TestMethod]
[DataRow(false, DisplayName = "ValidateAvailabilityStrategyNoTriggerTest with preferred regions.")]
[DataRow(true, DisplayName = "ValidateAvailabilityStrategyNoTriggerTest w/o preferred regions.")]
Expand Down
Loading