Skip to content

Commit

Permalink
Code changes to fix concurrency issues.
Browse files Browse the repository at this point in the history
  • Loading branch information
kundadebdatta committed Aug 8, 2023
1 parent 5545f81 commit bf1b812
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 30 deletions.
1 change: 0 additions & 1 deletion Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1006,7 +1006,6 @@ private IEnumerable<TransportAddressUri> GetAddressesNeededToValidateStatus(
.Where(address => address
.GetCurrentHealthState()
.GetHealthStatus() is
TransportAddressHealthState.HealthStatus.Unknown or
TransportAddressHealthState.HealthStatus.UnhealthyPending);
}

Expand Down
18 changes: 3 additions & 15 deletions Microsoft.Azure.Cosmos/src/direct/AddressEnumerator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -243,22 +243,18 @@ private static IEnumerable<TransportAddressUri> ReorderAddressesWhenReplicaValid
IEnumerable<TransportAddressUri> addresses,
HashSet<TransportAddressUri> failedReplicasPerRequest)
{
List<TransportAddressUri> unknownReplicas = null, failedReplicas = null, pendingReplicas = null;
List<TransportAddressUri> failedReplicas = null, pendingReplicas = null;
foreach (TransportAddressUri transportAddressUri in addresses)
{
TransportAddressHealthState.HealthStatus status = AddressEnumerator.GetEffectiveStatus(
addressUri: transportAddressUri,
failedEndpoints: failedReplicasPerRequest);

if (status == TransportAddressHealthState.HealthStatus.Connected)
if (status == TransportAddressHealthState.HealthStatus.Connected
|| status == TransportAddressHealthState.HealthStatus.Unknown)
{
yield return transportAddressUri;
}
else if (status == TransportAddressHealthState.HealthStatus.Unknown)
{
unknownReplicas ??= new ();
unknownReplicas.Add(transportAddressUri);
}
else if (status == TransportAddressHealthState.HealthStatus.UnhealthyPending)
{
pendingReplicas ??= new ();
Expand All @@ -271,14 +267,6 @@ private static IEnumerable<TransportAddressUri> ReorderAddressesWhenReplicaValid
}
}

if (unknownReplicas != null)
{
foreach (TransportAddressUri transportAddressUri in unknownReplicas)
{
yield return transportAddressUri;
}
}

if (pendingReplicas != null)
{
foreach (TransportAddressUri transportAddressUri in pendingReplicas)
Expand Down
3 changes: 2 additions & 1 deletion Microsoft.Azure.Cosmos/src/direct/Channel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -533,7 +533,8 @@ private static void HandleTaskTimeout(Task runawayTask, Guid activityId, Guid co
public void SetHealthState(
bool isHealthy)
{
// Do Nothing.
// No implementation is required since the channel health state is managed
// using the State enumeration.
}

private enum State
Expand Down
22 changes: 18 additions & 4 deletions Microsoft.Azure.Cosmos/src/direct/ChannelDictionary.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,30 @@ namespace Microsoft.Azure.Documents.Rntbd
// one load-balanced channel per back-end server.
internal sealed class ChannelDictionary : IDisposable
{
private readonly LoadBalancingPartition singleLoadBalancedPartitionForTest;
private readonly ChannelProperties channelProperties;
private bool disposed = false;

private ConcurrentDictionary<ServerKey, IChannel> channels =
new ConcurrentDictionary<ServerKey, IChannel>();

internal ChannelDictionary(
LoadBalancingPartition singleLoadBalancedPartitionForTest,
ChannelProperties channelProperties) : this(channelProperties)
{
this.singleLoadBalancedPartitionForTest = singleLoadBalancedPartitionForTest;
}

public ChannelDictionary(ChannelProperties channelProperties)
{
Debug.Assert(channelProperties != null);
this.channelProperties = channelProperties;
}

public IChannel GetChannel(Uri requestUri, bool localRegionRequest)
public IChannel GetChannel(
Uri requestUri,
bool localRegionRequest,
bool validationRequired = false)
{
this.ThrowIfDisposed();
ServerKey key = new ServerKey(requestUri);
Expand All @@ -37,7 +48,9 @@ public IChannel GetChannel(Uri requestUri, bool localRegionRequest)
value = new LoadBalancingChannel(
new Uri(requestUri.GetLeftPart(UriPartial.Authority)),
this.channelProperties,
localRegionRequest);
localRegionRequest,
validationRequired,
this.singleLoadBalancedPartitionForTest);
if (this.channels.TryAdd(key, value))
{
return value;
Expand All @@ -62,8 +75,9 @@ public Task OpenChannelAsync(
{
this.ThrowIfDisposed();
IChannel channel = this.GetChannel(
physicalAddress,
localRegionRequest);
requestUri: physicalAddress,
localRegionRequest: localRegionRequest,
validationRequired: true);

return channel.Healthy
? Task.FromResult(0)
Expand Down
42 changes: 35 additions & 7 deletions Microsoft.Azure.Cosmos/src/direct/LoadBalancingChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ namespace Microsoft.Azure.Documents.Rntbd
{
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

// LoadBalancingChannel encapsulates the management of channels that connect to a single
Expand All @@ -22,12 +23,19 @@ internal sealed class LoadBalancingChannel : IChannel, IDisposable

private bool disposed = false;

private readonly ReaderWriterLockSlim healthStateLock = new(LockRecursionPolicy.NoRecursion);

private volatile bool healthy;

public LoadBalancingChannel(Uri serverUri, ChannelProperties channelProperties, bool localRegionRequest)
public LoadBalancingChannel(
Uri serverUri,
ChannelProperties channelProperties,
bool localRegionRequest,
bool validationRequired = false,
LoadBalancingPartition singleLoadBalancedPartitionForTest = null)
{
this.serverUri = serverUri;
this.healthy = false;
this.SetHealthState(!validationRequired);

if ((channelProperties.PartitionCount < 1) ||
(channelProperties.PartitionCount > 8))
Expand Down Expand Up @@ -75,7 +83,7 @@ public LoadBalancingChannel(Uri serverUri, ChannelProperties channelProperties,
else
{
Debug.Assert(channelProperties.PartitionCount == 1);
this.singlePartition = new LoadBalancingPartition(
this.singlePartition = singleLoadBalancedPartitionForTest ?? new LoadBalancingPartition(
serverUri, channelProperties, localRegionRequest);
}
}
Expand All @@ -85,15 +93,31 @@ public bool Healthy
get
{
this.ThrowIfDisposed();
return this.healthy;
this.healthStateLock.EnterReadLock();
try
{
return this.healthy;
}
finally
{
this.healthStateLock.ExitReadLock();
}
}
}

/// <inheritdoc/>
public void SetHealthState(
bool isHealthy)
{
this.healthy = isHealthy;
this.healthStateLock.EnterWriteLock();
try
{
this.healthy = isHealthy;
}
finally
{
this.healthStateLock.ExitWriteLock();
}
}

public Task<StoreResponse> RequestAsync(
Expand Down Expand Up @@ -162,11 +186,13 @@ private async Task OpenChannelToPartitionAsync(
try
{
await partition.OpenChannelAsync(activityId);
this.healthy = true;
this.SetHealthState(
isHealthy: true);
}
catch (Exception)
{
this.healthy = false;
this.SetHealthState(
isHealthy: false);
throw;
}
}
Expand Down Expand Up @@ -208,6 +234,8 @@ void IDisposable.Dispose()
this.partitions[i].Dispose();
}
}

this.healthStateLock.Dispose();
}

private void ThrowIfDisposed()
Expand Down
2 changes: 0 additions & 2 deletions Microsoft.Azure.Cosmos/src/direct/rntbd2/TransportClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,6 @@ internal override async Task<StoreResponse> InvokeStoreAsync(
resourceOperation, activityId, transportRequestStats);
transportRequestStats.RecordState(TransportRequestStats.RequestStage.Completed);
storeResponse.TransportRequestStats = transportRequestStats;
channel?.SetHealthState(
isHealthy: true);
}
catch (TransportException ex)
{
Expand Down

0 comments on commit bf1b812

Please sign in to comment.