2 changes: 1 addition & 1 deletion Directory.Build.props
@@ -3,7 +3,7 @@
<ClientOfficialVersion>3.47.0</ClientOfficialVersion>
<ClientPreviewVersion>3.48.0</ClientPreviewVersion>
<ClientPreviewSuffixVersion>preview.0</ClientPreviewSuffixVersion>
<DirectVersion>3.37.9</DirectVersion>
<DirectVersion>3.37.10</DirectVersion>
<FaultInjectionVersion>1.0.0</FaultInjectionVersion>
<FaultInjectionSuffixVersion>beta.0</FaultInjectionSuffixVersion>
<EncryptionOfficialVersion>2.0.4</EncryptionOfficialVersion>
@@ -5,6 +5,7 @@
namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests
{
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
@@ -19,6 +20,7 @@ namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests
using System.Threading;
using System.Threading.Tasks;
using global::Azure;
using Microsoft.Azure.Cosmos.FaultInjection;
using Microsoft.Azure.Cosmos.Query.Core;
using Microsoft.Azure.Cosmos.Services.Management.Tests.LinqProviderTests;
using Microsoft.Azure.Cosmos.Telemetry;
@@ -936,7 +938,136 @@ public async Task MultiRegionAccountTest()
AccountProperties properties = await cosmosClient.ReadAccountAsync();
Assert.IsNotNull(properties);
}


[TestMethod]
[Owner("amudumba")]
public async Task CreateItemDuringTimeoutTest()
{
// Prepare.
// Enable aggressive timeout detection, which lets the connection health checker mark a channel/connection as "unhealthy" after a run of consecutive timeouts.
Environment.SetEnvironmentVariable("AZURE_COSMOS_AGGRESSIVE_TIMEOUT_DETECTION_ENABLED", "True");
Environment.SetEnvironmentVariable("AZURE_COSMOS_TIMEOUT_DETECTION_TIME_LIMIT_IN_SECONDS", "1");

// Enable a fault injection rule to simulate a timeout scenario.
string timeoutRuleId = "timeoutRule-" + Guid.NewGuid().ToString();
FaultInjectionRule timeoutRule = new FaultInjectionRuleBuilder(
id: timeoutRuleId,
condition:
new FaultInjectionConditionBuilder()
.WithOperationType(FaultInjectionOperationType.CreateItem)
.Build(),
result:
FaultInjectionResultBuilder.GetResultBuilder(FaultInjectionServerErrorType.SendDelay)
.WithDelay(TimeSpan.FromSeconds(100))
.Build())
.Build();
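// The injected 100-second send delay far exceeds the 2-second RequestTimeout configured on the client below, so each CreateItem call is expected to fail client-side with a 408 (RequestTimeout).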

List<FaultInjectionRule> rules = new List<FaultInjectionRule> { timeoutRule };
FaultInjector faultInjector = new FaultInjector(rules);


CosmosClientOptions cosmosClientOptions = new CosmosClientOptions()
{
ConsistencyLevel = Cosmos.ConsistencyLevel.Session,
FaultInjector = faultInjector,
RequestTimeout = TimeSpan.FromSeconds(2)
};

Cosmos.Database db = null;
try
{
CosmosClient cosmosClient = TestCommon.CreateCosmosClient(clientOptions: cosmosClientOptions);

db = await cosmosClient.CreateDatabaseIfNotExistsAsync("TimeoutFaultTest");
Container container = await db.CreateContainerIfNotExistsAsync("TimeoutFaultContainer", "/pk");

// Act.
// Simulate an aggressive timeout scenario by performing 3 writes, all of which will time out due to the fault injection rule.
for (int i = 0; i < 3; i++)
{
try
{
ToDoActivity testItem = ToDoActivity.CreateRandomToDoActivity();
await container.CreateItemAsync<ToDoActivity>(testItem);
}
catch (CosmosException exx)
{
Assert.AreEqual(HttpStatusCode.RequestTimeout, exx.StatusCode);
}
}
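// Three consecutive timeouts on the same channel should now have triggered the aggressive timeout detection enabled above.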

// Assert that the old channel has been marked unhealthy by the timeouts and that a new healthy channel is available for subsequent requests.


// Get all the channels that are under TransportClient -> ChannelDictionary -> Channels.
IStoreClientFactory factory = (IStoreClientFactory)cosmosClient.DocumentClient.GetType()
.GetField("storeClientFactory", BindingFlags.NonPublic | BindingFlags.Instance)
.GetValue(cosmosClient.DocumentClient);
StoreClientFactory storeClientFactory = (StoreClientFactory)factory;

TransportClient client = (TransportClient)storeClientFactory.GetType()
.GetField("transportClient", BindingFlags.NonPublic | BindingFlags.Instance)
.GetValue(storeClientFactory);
Documents.Rntbd.TransportClient transportClient = (Documents.Rntbd.TransportClient)client;

Documents.Rntbd.ChannelDictionary channelDict = (Documents.Rntbd.ChannelDictionary)transportClient.GetType()
.GetField("channelDictionary", BindingFlags.NonPublic | BindingFlags.Instance)
.GetValue(transportClient);
ConcurrentDictionary<Documents.Rntbd.ServerKey, Documents.Rntbd.IChannel> allChannels = (ConcurrentDictionary<Documents.Rntbd.ServerKey, Documents.Rntbd.IChannel>)channelDict.GetType()
.GetField("channels", BindingFlags.NonPublic | BindingFlags.Instance)
.GetValue(channelDict);
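// The channel state is held in non-public fields, so reflection is used to walk TransportClient -> ChannelDictionary -> channels, a dictionary keyed by server endpoint (ServerKey).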

// Assert that the old channel is now marked unhealthy by the timeouts.
// Get the channel via channelDict -> LoadBalancingChannel -> LoadBalancingPartition -> LbChannelState -> IChannel.
Documents.Rntbd.LoadBalancingChannel loadBalancingUnhealthyChannel = (Documents.Rntbd.LoadBalancingChannel)allChannels[allChannels.Keys.ElementAt(1)];
Documents.Rntbd.LoadBalancingPartition loadBalancingPartitionUnHealthy = (Documents.Rntbd.LoadBalancingPartition)loadBalancingUnhealthyChannel.GetType()
.GetField("singlePartition", BindingFlags.NonPublic | BindingFlags.Instance)
.GetValue(loadBalancingUnhealthyChannel);

Assert.IsNotNull(loadBalancingPartitionUnHealthy);

List<Documents.Rntbd.LbChannelState> openChannelsUnhealthy = (List<Documents.Rntbd.LbChannelState>)loadBalancingPartitionUnHealthy.GetType()
.GetField("openChannels", BindingFlags.NonPublic | BindingFlags.Instance)
.GetValue(loadBalancingPartitionUnHealthy);
Assert.AreEqual(1, openChannelsUnhealthy.Count);

foreach (Documents.Rntbd.LbChannelState channelState in openChannelsUnhealthy)
{
Documents.Rntbd.IChannel channel = (Documents.Rntbd.IChannel)channelState.GetType()
.GetField("channel", BindingFlags.NonPublic | BindingFlags.Instance)
.GetValue(channelState);
Assert.IsFalse(channel.Healthy);
}

// Assert that the new channel is healthy. The first channel in the allChannels dictionary is picked as the new channel.
Documents.Rntbd.LoadBalancingChannel loadBalancingChannel = (Documents.Rntbd.LoadBalancingChannel)allChannels[allChannels.Keys.First()];
Documents.Rntbd.LoadBalancingPartition loadBalancingPartition = (Documents.Rntbd.LoadBalancingPartition)loadBalancingChannel.GetType()
.GetField("singlePartition", BindingFlags.NonPublic | BindingFlags.Instance)
.GetValue(loadBalancingChannel);

Assert.IsNotNull(loadBalancingPartition);

List<Documents.Rntbd.LbChannelState> openChannels = (List<Documents.Rntbd.LbChannelState>)loadBalancingPartition.GetType()
.GetField("openChannels", BindingFlags.NonPublic | BindingFlags.Instance)
.GetValue(loadBalancingPartition);
Assert.AreEqual(1, openChannels.Count);

foreach (Documents.Rntbd.LbChannelState channelState in openChannels)
{
Documents.Rntbd.IChannel channel = (Documents.Rntbd.IChannel)channelState.GetType()
.GetField("channel", BindingFlags.NonPublic | BindingFlags.Instance)
.GetValue(channelState);
Assert.IsTrue(channel.Healthy);
}
}
finally
{
Environment.SetEnvironmentVariable("AZURE_COSMOS_AGGRESSIVE_TIMEOUT_DETECTION_ENABLED", null);
Environment.SetEnvironmentVariable("AZURE_COSMOS_TIMEOUT_DETECTION_TIME_LIMIT_IN_SECONDS", null);
if (db != null) await db.DeleteAsync();
}
}
public static IReadOnlyList<string> GetActiveConnections()
{
string testPid = Process.GetCurrentProcess().Id.ToString();