diff --git a/Microsoft.Azure.Cosmos/src/Routing/AvailabilityStrategy/CrossRegionHedgingAvailabilityStrategy.cs b/Microsoft.Azure.Cosmos/src/Routing/AvailabilityStrategy/CrossRegionHedgingAvailabilityStrategy.cs
index 01d7a513fa..6a1d7c0910 100644
--- a/Microsoft.Azure.Cosmos/src/Routing/AvailabilityStrategy/CrossRegionHedgingAvailabilityStrategy.cs
+++ b/Microsoft.Azure.Cosmos/src/Routing/AvailabilityStrategy/CrossRegionHedgingAvailabilityStrategy.cs
@@ -24,7 +24,7 @@ namespace Microsoft.Azure.Cosmos
internal class CrossRegionHedgingAvailabilityStrategy : AvailabilityStrategyInternal
{
private const string HedgeContext = "Hedge Context";
- private const string ResponseRegion = "Response Region";
+ private const string HedgeConfig = "Hedge Config";
///
/// Latency threshold which activates the first region hedging
@@ -44,6 +44,8 @@ internal class CrossRegionHedgingAvailabilityStrategy : AvailabilityStrategyInte
///
public bool EnableMultiWriteRegionHedge { get; private set; }
+ private readonly string HedgeConfigText;
+
///
/// Constructor for hedging availability strategy
///
@@ -68,6 +70,8 @@ public CrossRegionHedgingAvailabilityStrategy(
this.Threshold = threshold;
this.ThresholdStep = thresholdStep ?? TimeSpan.FromMilliseconds(-1);
this.EnableMultiWriteRegionHedge = enableMultiWriteRegionHedge;
+
+ this.HedgeConfigText = $"t:{this.Threshold.TotalMilliseconds}ms, s:{this.ThresholdStep.TotalMilliseconds}ms, w:{this.EnableMultiWriteRegionHedge}";
}
///
@@ -134,13 +138,12 @@ internal override async Task ExecuteAvailabilityStrategyAsync(
: await StreamExtension.AsClonableStreamAsync(request.Content)))
{
IReadOnlyCollection hedgeRegions = client.DocumentClient.GlobalEndpointManager
- .GetApplicableRegions(
- request.RequestOptions?.ExcludeRegions,
- OperationTypeExtensions.IsReadOperation(request.OperationType));
+ .GetApplicableRegions(
+ request.RequestOptions?.ExcludeRegions,
+ OperationTypeExtensions.IsReadOperation(request.OperationType));
List requestTasks = new List(hedgeRegions.Count + 1);
- Task primaryRequest = null;
HedgingResponse hedgeResponse = null;
//Send out hedged requests
@@ -153,33 +156,17 @@ internal override async Task ExecuteAvailabilityStrategyAsync(
CancellationToken timerToken = timerTokenSource.Token;
using (Task hedgeTimer = Task.Delay(awaitTime, timerToken))
{
- if (requestNumber == 0)
- {
- primaryRequest = this.RequestSenderAndResultCheckAsync(
- sender,
- request,
- hedgeRegions.ElementAt(requestNumber),
- cancellationToken,
- cancellationTokenSource,
- trace);
-
- requestTasks.Add(primaryRequest);
- }
- else
- {
- Task requestTask = this.CloneAndSendAsync(
- sender: sender,
- request: request,
- clonedBody: clonedBody,
- hedgeRegions: hedgeRegions,
- requestNumber: requestNumber,
- trace: trace,
- cancellationToken: cancellationToken,
- cancellationTokenSource: cancellationTokenSource);
-
- requestTasks.Add(requestTask);
- }
-
+ Task requestTask = this.CloneAndSendAsync(
+ sender: sender,
+ request: request,
+ clonedBody: clonedBody,
+ hedgeRegions: hedgeRegions,
+ requestNumber: requestNumber,
+ trace: trace,
+ cancellationToken: cancellationToken,
+ cancellationTokenSource: cancellationTokenSource);
+
+ requestTasks.Add(requestTask);
requestTasks.Add(hedgeTimer);
Task completedTask = await Task.WhenAny(requestTasks);
@@ -202,13 +189,14 @@ internal override async Task ExecuteAvailabilityStrategyAsync(
if (hedgeResponse.IsNonTransient)
{
cancellationTokenSource.Cancel();
+
+ ((CosmosTraceDiagnostics)hedgeResponse.ResponseMessage.Diagnostics).Value.AddOrUpdateDatum(
+ HedgeConfig,
+ this.HedgeConfigText);
//Take is not inclusive, so we need to add 1 to the request number which starts at 0
((CosmosTraceDiagnostics)hedgeResponse.ResponseMessage.Diagnostics).Value.AddOrUpdateDatum(
HedgeContext,
hedgeRegions.Take(requestNumber + 1));
- ((CosmosTraceDiagnostics)hedgeResponse.ResponseMessage.Diagnostics).Value.AddOrUpdateDatum(
- ResponseRegion,
- hedgeResponse.ResponseRegion);
return hedgeResponse.ResponseMessage;
}
}
@@ -231,12 +219,12 @@ internal override async Task ExecuteAvailabilityStrategyAsync(
if (hedgeResponse.IsNonTransient || requestTasks.Count == 0)
{
cancellationTokenSource.Cancel();
+ ((CosmosTraceDiagnostics)hedgeResponse.ResponseMessage.Diagnostics).Value.AddOrUpdateDatum(
+ HedgeConfig,
+ this.HedgeConfigText);
((CosmosTraceDiagnostics)hedgeResponse.ResponseMessage.Diagnostics).Value.AddOrUpdateDatum(
HedgeContext,
hedgeRegions);
- ((CosmosTraceDiagnostics)hedgeResponse.ResponseMessage.Diagnostics).Value.AddOrUpdateDatum(
- ResponseRegion,
- hedgeResponse.ResponseRegion);
return hedgeResponse.ResponseMessage;
}
}
@@ -270,15 +258,17 @@ private async Task CloneAndSendAsync(
{
clonedRequest.RequestOptions ??= new RequestOptions();
- List excludeRegions = new List(hedgeRegions);
- string region = excludeRegions[requestNumber];
- excludeRegions.RemoveAt(requestNumber);
- clonedRequest.RequestOptions.ExcludeRegions = excludeRegions;
+ //we do not want to exclude any regions for the primary request
+ if (requestNumber > 0)
+ {
+ List excludeRegions = new List(hedgeRegions);
+ excludeRegions.RemoveAt(requestNumber);
+ clonedRequest.RequestOptions.ExcludeRegions = excludeRegions;
+ }
return await this.RequestSenderAndResultCheckAsync(
sender,
clonedRequest,
- region,
cancellationToken,
cancellationTokenSource,
trace);
@@ -288,7 +278,6 @@ private async Task CloneAndSendAsync(
private async Task RequestSenderAndResultCheckAsync(
Func> sender,
RequestMessage request,
- string hedgedRegion,
CancellationToken cancellationToken,
CancellationTokenSource cancellationTokenSource,
ITrace trace)
@@ -303,12 +292,12 @@ private async Task RequestSenderAndResultCheckAsync(
cancellationTokenSource.Cancel();
}
- return new HedgingResponse(true, response, hedgedRegion);
+ return new HedgingResponse(true, response);
}
- return new HedgingResponse(false, response, hedgedRegion);
+ return new HedgingResponse(false, response);
}
- catch (OperationCanceledException oce ) when (cancellationTokenSource.IsCancellationRequested)
+ catch (OperationCanceledException oce) when (cancellationTokenSource.IsCancellationRequested)
{
throw new CosmosOperationCanceledException(oce, trace);
}
@@ -348,13 +337,11 @@ private sealed class HedgingResponse
{
public readonly bool IsNonTransient;
public readonly ResponseMessage ResponseMessage;
- public readonly string ResponseRegion;
- public HedgingResponse(bool isNonTransient, ResponseMessage responseMessage, string responseRegion)
+ public HedgingResponse(bool isNonTransient, ResponseMessage responseMessage)
{
this.IsNonTransient = isNonTransient;
this.ResponseMessage = responseMessage;
- this.ResponseRegion = responseRegion;
}
}
}
diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosAvailabilityStrategyTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosAvailabilityStrategyTests.cs
index c95073aa69..b3b6b7aaaa 100644
--- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosAvailabilityStrategyTests.cs
+++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosAvailabilityStrategyTests.cs
@@ -255,35 +255,27 @@ public async Task AvailabilityStrategyNoTriggerTest(bool isPreferredLocationsEmp
Database database = faultInjectionClient.GetDatabase(MultiRegionSetupHelpers.dbName);
Container container = database.GetContainer(MultiRegionSetupHelpers.containerName);
+ //warm up connections read
+ ItemResponse _ = await container.ReadItemAsync("testId", new PartitionKey("pk"));
+
responseDelay.Enable();
ItemResponse ir = await container.ReadItemAsync("testId", new PartitionKey("pk"));
CosmosTraceDiagnostics traceDiagnostic = ir.Diagnostics as CosmosTraceDiagnostics;
Assert.IsNotNull(traceDiagnostic);
- traceDiagnostic.Value.Data.TryGetValue("Response Region", out object responseRegion);
- Assert.IsNotNull(responseRegion);
- Assert.AreEqual(region1, (string)responseRegion);
-
- //Should send out hedge request but original should be returned
- traceDiagnostic.Value.Data.TryGetValue("Hedge Context", out object hedgeContext);
- Assert.IsNotNull(hedgeContext);
- IReadOnlyCollection hedgeContextList;
- hedgeContextList = hedgeContext as IReadOnlyCollection;
if (isPreferredLocationsEmpty)
{
- Assert.AreEqual(3, hedgeContextList.Count);
- Assert.IsTrue(hedgeContextList.Contains(region1));
- Assert.IsTrue(hedgeContextList.Contains(region2));
- Assert.IsTrue(hedgeContextList.Contains(region3));
+ Assert.IsTrue(traceDiagnostic.ToString()
+ .Contains($"\"Hedge Context\":[\"{region1}\",\"{region2}\",\"{region3}\"]"));
}
else
{
- Assert.AreEqual(2, hedgeContextList.Count);
- Assert.IsTrue(hedgeContextList.Contains(region1));
- Assert.IsTrue(hedgeContextList.Contains(region2));
+ Assert.IsTrue(traceDiagnostic.ToString()
+ .Contains($"\"Hedge Context\":[\"{region1}\",\"{region2}\"]"));
}
- };
+ }
+ ;
}
[TestMethod]
@@ -325,6 +317,9 @@ public async Task AvailabilityStrategyRequestOptionsTriggerTest(bool isPreferred
Database database = faultInjectionClient.GetDatabase(MultiRegionSetupHelpers.dbName);
Container container = database.GetContainer(MultiRegionSetupHelpers.containerName);
+ //warm up connections read
+ ItemResponse _ = await container.ReadItemAsync("testId", new PartitionKey("pk"));
+
responseDelay.Enable();
ItemRequestOptions requestOptions = new ItemRequestOptions
@@ -340,9 +335,8 @@ public async Task AvailabilityStrategyRequestOptionsTriggerTest(bool isPreferred
CosmosTraceDiagnostics traceDiagnostic = ir.Diagnostics as CosmosTraceDiagnostics;
Assert.IsNotNull(traceDiagnostic);
- traceDiagnostic.Value.Data.TryGetValue("Response Region", out object hedgeContext);
- Assert.IsNotNull(hedgeContext);
- Assert.AreEqual(region2, (string)hedgeContext);
+ Assert.IsTrue(traceDiagnostic.ToString()
+ .Contains($"\"Hedge Context\":[\"{region1}\",\"{region2}\""));
}
}
@@ -389,6 +383,9 @@ public async Task AvailabilityStrategyDisableOverideTest(bool isPreferredLocatio
Database database = faultInjectionClient.GetDatabase(MultiRegionSetupHelpers.dbName);
Container container = database.GetContainer(MultiRegionSetupHelpers.containerName);
+ //warm up connections read
+ ItemResponse _ = await container.ReadItemAsync("testId", new PartitionKey("pk"));
+
responseDelay.Enable();
ItemRequestOptions requestOptions = new ItemRequestOptions
{
@@ -403,7 +400,7 @@ public async Task AvailabilityStrategyDisableOverideTest(bool isPreferredLocatio
CosmosTraceDiagnostics traceDiagnostic = ir.Diagnostics as CosmosTraceDiagnostics;
Assert.IsNotNull(traceDiagnostic);
- Assert.IsFalse(traceDiagnostic.Value.Data.TryGetValue("Hedge Context", out _));
+ Assert.IsFalse(traceDiagnostic.Value.Data.TryGetValue("Hedge Context", out object _));
}
}
@@ -533,8 +530,10 @@ public async Task AvailabilityStrategyAllFaultsTests(string operation, string co
Database database = faultInjectionClient.GetDatabase(MultiRegionSetupHelpers.dbName);
Container container = database.GetContainer(MultiRegionSetupHelpers.containerName);
+ //warm up connections read
+ ItemResponse _ = await container.ReadItemAsync("testId", new PartitionKey("pk"));
+
CosmosTraceDiagnostics traceDiagnostic;
- object hedgeContext;
switch (operation)
{
@@ -556,9 +555,9 @@ public async Task AvailabilityStrategyAllFaultsTests(string operation, string co
Assert.IsTrue(rule.GetHitCount() > 0);
traceDiagnostic = ir.Diagnostics as CosmosTraceDiagnostics;
Assert.IsNotNull(traceDiagnostic);
- traceDiagnostic.Value.Data.TryGetValue("Response Region", out hedgeContext);
- Assert.IsNotNull(hedgeContext);
- Assert.AreEqual(region2, (string)hedgeContext);
+ Assert.IsTrue(traceDiagnostic.ToString()
+ .Contains($"\"Hedge Context\":[\"{region1}\",\"{region2}\"]"));
+
break;
@@ -588,9 +587,8 @@ public async Task AvailabilityStrategyAllFaultsTests(string operation, string co
Assert.IsTrue(rule.GetHitCount() > 0);
traceDiagnostic = feedResponse.Diagnostics as CosmosTraceDiagnostics;
Assert.IsNotNull(traceDiagnostic);
- traceDiagnostic.Value.Data.TryGetValue("Response Region", out hedgeContext);
- Assert.IsNotNull(hedgeContext);
- Assert.AreEqual(region2, (string)hedgeContext);
+ Assert.IsTrue(traceDiagnostic.ToString()
+ .Contains($"\"Hedge Context\":[\"{region1}\",\"{region2}\"]"));
}
break;
@@ -619,9 +617,8 @@ public async Task AvailabilityStrategyAllFaultsTests(string operation, string co
Assert.IsTrue(rule.GetHitCount() > 0);
traceDiagnostic = feedResponse.Diagnostics as CosmosTraceDiagnostics;
Assert.IsNotNull(traceDiagnostic);
- traceDiagnostic.Value.Data.TryGetValue("Response Region", out hedgeContext);
- Assert.IsNotNull(hedgeContext);
- Assert.AreEqual(region2, (string)hedgeContext);
+ Assert.IsTrue(traceDiagnostic.ToString()
+ .Contains($"\"Hedge Context\":[\"{region1}\",\"{region2}\"]"));
}
break;
@@ -649,9 +646,8 @@ public async Task AvailabilityStrategyAllFaultsTests(string operation, string co
Assert.IsTrue(rule.GetHitCount() > 0);
traceDiagnostic = readManyResponse.Diagnostics as CosmosTraceDiagnostics;
Assert.IsNotNull(traceDiagnostic);
- traceDiagnostic.Value.Data.TryGetValue("Response Region", out hedgeContext);
- Assert.IsNotNull(hedgeContext);
- Assert.AreEqual(region2, (string)hedgeContext);
+ Assert.IsTrue(traceDiagnostic.ToString()
+ .Contains($"\"Hedge Context\":[\"{region1}\",\"{region2}\"]"));
break;
@@ -750,8 +746,10 @@ public async Task AvailabilityStrategyStepTests(string operation, string condito
Database database = faultInjectionClient.GetDatabase(MultiRegionSetupHelpers.dbName);
Container container = database.GetContainer(MultiRegionSetupHelpers.containerName);
+ //warm up connections read
+ ItemResponse _ = await container.ReadItemAsync("testId", new PartitionKey("pk"));
+
CosmosTraceDiagnostics traceDiagnostic;
- object hedgeContext;
switch (operation)
{
@@ -765,9 +763,8 @@ public async Task AvailabilityStrategyStepTests(string operation, string condito
traceDiagnostic = ir.Diagnostics as CosmosTraceDiagnostics;
Assert.IsNotNull(traceDiagnostic);
- traceDiagnostic.Value.Data.TryGetValue("Response Region", out hedgeContext);
- Assert.IsNotNull(hedgeContext);
- Assert.AreEqual(region3, (string)hedgeContext);
+ Assert.IsTrue(traceDiagnostic.ToString()
+ .Contains($"\"Hedge Context\":[\"{region1}\",\"{region2}\",\"{region3}\"]"));
break;
@@ -792,9 +789,8 @@ public async Task AvailabilityStrategyStepTests(string operation, string condito
traceDiagnostic = feedResponse.Diagnostics as CosmosTraceDiagnostics;
Assert.IsNotNull(traceDiagnostic);
- traceDiagnostic.Value.Data.TryGetValue("Response Region", out hedgeContext);
- Assert.IsNotNull(hedgeContext);
- Assert.AreEqual(region3, (string)hedgeContext);
+ Assert.IsTrue(traceDiagnostic.ToString()
+ .Contains($"\"Hedge Context\":[\"{region1}\",\"{region2}\",\"{region3}\"]"));
}
break;
@@ -813,9 +809,8 @@ public async Task AvailabilityStrategyStepTests(string operation, string condito
traceDiagnostic = feedResponse.Diagnostics as CosmosTraceDiagnostics;
Assert.IsNotNull(traceDiagnostic);
- traceDiagnostic.Value.Data.TryGetValue("Response Region", out hedgeContext);
- Assert.IsNotNull(hedgeContext);
- Assert.AreEqual(region3, (string)hedgeContext);
+ Assert.IsTrue(traceDiagnostic.ToString()
+ .Contains($"\"Hedge Context\":[\"{region1}\",\"{region2}\",\"{region3}\"]"));
}
break;
@@ -835,9 +830,8 @@ public async Task AvailabilityStrategyStepTests(string operation, string condito
traceDiagnostic = readManyResponse.Diagnostics as CosmosTraceDiagnostics;
Assert.IsNotNull(traceDiagnostic);
- traceDiagnostic.Value.Data.TryGetValue("Response Region", out hedgeContext);
- Assert.IsNotNull(hedgeContext);
- Assert.AreEqual(region3, (string)hedgeContext);
+ Assert.IsTrue(traceDiagnostic.ToString()
+ .Contains($"\"Hedge Context\":[\"{region1}\",\"{region2}\",\"{region3}\"]"));
break;
@@ -919,6 +913,9 @@ public async Task AvailabilityStrategyMultiMasterWriteBeforeTest()
Database database = faultInjectionClient.GetDatabase(MultiRegionSetupHelpers.dbName);
Container container = database.GetContainer(MultiRegionSetupHelpers.containerName);
+ //warm up connections read
+ ItemResponse _ = await container.ReadItemAsync("testId", new PartitionKey("pk"));
+
sendDelay.Enable();
ItemRequestOptions requestOptions = new ItemRequestOptions
@@ -944,9 +941,8 @@ public async Task AvailabilityStrategyMultiMasterWriteBeforeTest()
CosmosTraceDiagnostics traceDiagnostic = ir.Diagnostics as CosmosTraceDiagnostics;
Assert.IsNotNull(traceDiagnostic);
- traceDiagnostic.Value.Data.TryGetValue("Response Region", out object hedgeContext);
- Assert.IsNotNull(hedgeContext);
- Assert.AreEqual(region2, (string)hedgeContext);
+ Assert.IsTrue(traceDiagnostic.ToString()
+ .Contains($"\"Hedge Context\":[\"{region1}\",\"{region2}\"]"));
}
}
@@ -987,6 +983,9 @@ public async Task AvailabilityStrategyMultiMasterWriteAfterTest()
Database database = faultInjectionClient.GetDatabase(MultiRegionSetupHelpers.dbName);
Container container = database.GetContainer(MultiRegionSetupHelpers.containerName);
+ //warm up connections read
+ ItemResponse _ = await container.ReadItemAsync("testId", new PartitionKey("pk"));
+
responseDelay.Enable();
ItemRequestOptions requestOptions = new ItemRequestOptions
@@ -1016,9 +1015,8 @@ public async Task AvailabilityStrategyMultiMasterWriteAfterTest()
CosmosTraceDiagnostics traceDiagnostic = ex.Diagnostics as CosmosTraceDiagnostics;
Assert.IsNotNull(traceDiagnostic);
- traceDiagnostic.Value.Data.TryGetValue("Response Region", out object hedgeContext);
- Assert.IsNotNull(hedgeContext);
- Assert.AreEqual(region2, (string)hedgeContext);
+ Assert.IsTrue(traceDiagnostic.ToString()
+ .Contains($"\"Hedge Context\":[\"{region1}\",\"{region2}\"]"));
}
finally
{
@@ -1079,7 +1077,8 @@ public async Task AvailabilityStrategyMultiMasterWriteBeforeStepTest()
Database database = faultInjectionClient.GetDatabase(MultiRegionSetupHelpers.dbName);
Container container = database.GetContainer(MultiRegionSetupHelpers.containerName);
-
+ //warm up connections read
+ ItemResponse _ = await container.ReadItemAsync("testId", new PartitionKey("pk"));
ItemRequestOptions requestOptions = new ItemRequestOptions
{
@@ -1119,9 +1118,8 @@ await this.container.DeleteItemAsync(
CosmosTraceDiagnostics traceDiagnostic = ir.Diagnostics as CosmosTraceDiagnostics;
Assert.IsNotNull(traceDiagnostic);
- traceDiagnostic.Value.Data.TryGetValue("Response Region", out object hedgeContext);
- Assert.IsNotNull(hedgeContext);
- Assert.AreEqual(region3, (string)hedgeContext);
+ Assert.IsTrue(traceDiagnostic.ToString()
+ .Contains($"\"Hedge Context\":[\"{region1}\",\"{region2}\",\"{region3}\"]"));
}
}
@@ -1177,6 +1175,9 @@ public async Task AvailabilityStrategyMultiMasterWriteAfterStepTest()
Database database = faultInjectionClient.GetDatabase(MultiRegionSetupHelpers.dbName);
Container container = database.GetContainer(MultiRegionSetupHelpers.containerName);
+ //warm up connections read
+ ItemResponse _ = await container.ReadItemAsync("testId", new PartitionKey("pk"));
+
ItemRequestOptions requestOptions = new ItemRequestOptions
{
AvailabilityStrategy = new CrossRegionHedgingAvailabilityStrategy(
@@ -1218,9 +1219,8 @@ await this.container.DeleteItemAsync(
CosmosTraceDiagnostics traceDiagnostic = ex.Diagnostics as CosmosTraceDiagnostics;
Assert.IsNotNull(traceDiagnostic);
- traceDiagnostic.Value.Data.TryGetValue("Response Region", out object hedgeContext);
- Assert.IsNotNull(hedgeContext);
- Assert.AreEqual(region3, (string)hedgeContext);
+ Assert.IsTrue(traceDiagnostic.ToString()
+ .Contains($"\"Hedge Context\":[\"{region1}\",\"{region2}\",\"{region3}\"]"));
}
finally
{
@@ -1274,14 +1274,113 @@ public async Task AvailabilityStrategyWithCancellationTokenThrowsExceptionTest()
Database database = faultInjectionClient.GetDatabase(MultiRegionSetupHelpers.dbName);
Container container = database.GetContainer(MultiRegionSetupHelpers.containerName);
+ //warm up connections read
+ ItemResponse _ = await container.ReadItemAsync("testId", new PartitionKey("pk"));
+
CosmosOperationCanceledException cancelledException = await Assert.ThrowsExceptionAsync(() =>
container.ReadItemAsync(
"testId",
new PartitionKey("pk"), cancellationToken: cts.Token
));
+ }
+ }
+
+ [TestMethod]
+ [TestCategory("MultiMaster")]
+ public async Task HedgingCancellationTokenHandling()
+ {
+ List feedRanges = (List)await this.container.GetFeedRangesAsync();
+ Assert.IsTrue(feedRanges.Any());
+ try
+ {
+ await this.container.DeleteItemAsync("deleteMe", new PartitionKey("MMWrite"));
}
+ catch (Exception) { }
+
+
+ FaultInjectionRule sendDelay = new FaultInjectionRuleBuilder(
+ id: "sendDelay",
+ condition:
+ new FaultInjectionConditionBuilder()
+ .WithRegion(region1)
+ .WithConnectionType(FaultInjectionConnectionType.Gateway)
+ .WithEndpoint(
+ new FaultInjectionEndpointBuilder(
+ MultiRegionSetupHelpers.dbName,
+ MultiRegionSetupHelpers.containerName,
+ feedRanges[0])
+ .WithIncludePrimary(true)
+ .WithReplicaCount(4)
+ .Build())
+ .Build(),
+ result:
+ FaultInjectionResultBuilder.GetResultBuilder(FaultInjectionServerErrorType.SendDelay)
+ .WithDelay(TimeSpan.FromMilliseconds(8000))
+ .Build())
+ .WithDuration(TimeSpan.FromMinutes(90))
+ .Build();
+
+ List rules = new List() { sendDelay };
+ FaultInjector faultInjector = new FaultInjector(rules);
+
+ sendDelay.Disable();
+
+ CosmosClientOptions clientOptions = new CosmosClientOptions()
+ {
+ ConnectionMode = ConnectionMode.Direct,
+ ApplicationPreferredRegions = new List() { region1, region2 },
+ Serializer = this.cosmosSystemTextJsonSerializer,
+ RequestTimeout = TimeSpan.FromMilliseconds(5000)
+ };
+
+ using (CosmosClient faultInjectionClient = new CosmosClient(
+ connectionString: this.connectionString,
+ clientOptions: faultInjector.GetFaultInjectionClientOptions(clientOptions)))
+ {
+ Database database = faultInjectionClient.GetDatabase(MultiRegionSetupHelpers.dbName);
+ Container container = database.GetContainer(MultiRegionSetupHelpers.containerName);
+
+ sendDelay.Enable();
+
+ CancellationTokenSource cts = new CancellationTokenSource();
+ cts.CancelAfter(TimeSpan.FromSeconds(5)); // Cancellation token expiry time is 5 seconds.
+
+ ItemRequestOptions requestOptions = new ItemRequestOptions
+ {
+ AvailabilityStrategy = new CrossRegionHedgingAvailabilityStrategy(
+ threshold: TimeSpan.FromMilliseconds(100),
+ thresholdStep: TimeSpan.FromMilliseconds(50),
+ enableMultiWriteRegionHedge: true)
+ };
+
+ CosmosIntegrationTestObject CosmosIntegrationTestObject = new CosmosIntegrationTestObject
+ {
+ Id = "deleteMe",
+ Pk = "MMWrite",
+ Other = "test"
+ };
+
+ try
+ {
+ ItemResponse ir = await container.CreateItemAsync(
+ CosmosIntegrationTestObject,
+ requestOptions: requestOptions,
+ cancellationToken: cts.Token);
+ CosmosTraceDiagnostics traceDiagnostic = ir.Diagnostics as CosmosTraceDiagnostics;
+ Assert.IsNotNull(traceDiagnostic);
+ Assert.IsTrue(traceDiagnostic.ToString()
+ .Contains($"\"Hedge Context\":[\"{region1}\",\"{region2}\"]"));
+ }
+ catch (CosmosException ex)
+ {
+ Assert.Fail(ex.Message);
+ }
+
+
+ sendDelay.Disable();
+ }
}
private static async Task HandleChangesAsync(
@@ -1296,9 +1395,8 @@ private static async Task HandleChangesAsync(
CosmosTraceDiagnostics traceDiagnostic = context.Diagnostics as CosmosTraceDiagnostics;
Assert.IsNotNull(traceDiagnostic);
- traceDiagnostic.Value.Data.TryGetValue("Response Region", out object hedgeContext);
- Assert.IsNotNull(hedgeContext);
- Assert.AreNotEqual(region1, (string)hedgeContext);
+ Assert.IsTrue(traceDiagnostic.ToString()
+ .Contains($"\"Hedge Context\":[\"{region1}\",\"{region2}\"]"));
await Task.Delay(1);
}
@@ -1314,10 +1412,8 @@ private static async Task HandleChangesStepAsync(
CosmosTraceDiagnostics traceDiagnostic = context.Diagnostics as CosmosTraceDiagnostics;
Assert.IsNotNull(traceDiagnostic);
- traceDiagnostic.Value.Data.TryGetValue("Response Region", out object hedgeContext);
- Assert.IsNotNull(hedgeContext);
- Assert.AreNotEqual(region1, (string)hedgeContext);
- Assert.AreNotEqual(region2, (string)hedgeContext);
+ Assert.IsTrue(traceDiagnostic.ToString()
+ .Contains($"\"Hedge Context\":[\"{region1}\",\"{region2}\",\"{region3}\"]"));
await Task.Delay(1);
}
}