Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 78 additions & 8 deletions Microsoft.Azure.Cosmos/src/direct/Channel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ namespace Microsoft.Azure.Documents.Rntbd
#endif

// The RNTBD RPC channel. Supports multiple parallel requests and timeouts.
internal sealed class Channel : IChannel, IDisposable
internal sealed class Channel : IChannel, IDisposable, IAsyncDisposable
{
private readonly Dispatcher dispatcher;
private readonly TimerPool timerPool;
private readonly int requestTimeoutSeconds;
private readonly Uri serverUri;
private readonly bool localRegionRequest;
private bool disposed = false;
private int disposed;

private readonly ReaderWriterLockSlim stateLock =
new ReaderWriterLockSlim(LockRecursionPolicy.NoRecursion);
Expand Down Expand Up @@ -79,7 +79,7 @@ public Channel(

public void InjectFaultInjectionConnectionError(TransportException transportException)
{
if (!this.disposed)
if (this.disposed == 0)
{
this.dispatcher.InjectFaultInjectionConnectionError(transportException);
}
Expand Down Expand Up @@ -293,11 +293,18 @@ public void Close()
((IDisposable) this).Dispose();
}

public Task CloseAsync() => this.DisposeAsync().AsTask();

// Keep in sync with DisposeAsync().
Comment thread
NaluTripician marked this conversation as resolved.
void IDisposable.Dispose()
{
if (Interlocked.CompareExchange(ref this.disposed, 1, 0) != 0)
{
return;
}

GC.SuppressFinalize(this);
this.chaosInterceptor?.OnChannelDispose(this.ConnectionCorrelationId);
this.ThrowIfDisposed();
this.disposed = true;
DefaultTrace.TraceInformation("[RNTBD Channel {0}] Disposing RNTBD Channel {1}", this.ConnectionCorrelationId, this);

Task initTask = null;
Expand Down Expand Up @@ -335,8 +342,71 @@ void IDisposable.Dispose()
}
}
Debug.Assert(this.dispatcher != null);
this.dispatcher.Dispose();
this.stateLock.Dispose();
try
{
this.dispatcher.Dispose();
}
finally
{
this.stateLock.Dispose();
}
}

// Keep in sync with Dispose().
public async ValueTask DisposeAsync()
{
if (Interlocked.CompareExchange(ref this.disposed, 1, 0) != 0)
{
return;
}

GC.SuppressFinalize(this);
this.chaosInterceptor?.OnChannelDispose(this.ConnectionCorrelationId);
DefaultTrace.TraceInformation("[RNTBD Channel {0}] Async disposing RNTBD Channel {1}", this.ConnectionCorrelationId, this);

Task initTask = null;
this.stateLock.EnterWriteLock();
try
{
if (this.state != State.Closed)
{
initTask = this.initializationTask;
}
this.state = State.Closed;
}
finally
{
this.stateLock.ExitWriteLock();
}
if (initTask != null)
{
try
{
await initTask.ConfigureAwait(false);
}
catch (Exception e)
{
DefaultTrace.TraceWarning(
"[RNTBD Channel {0}] {1} initialization failed. Consuming the task " +
"exception in {2}. Server URI: {3}. Exception: {4}",
this.ConnectionCorrelationId,
nameof(Channel),
nameof(DisposeAsync),
this.serverUri,
e.Message);
// Intentionally swallowing the exception. The caller can't
// do anything useful with it.
}
}
Debug.Assert(this.dispatcher != null);
try
{
await this.dispatcher.DisposeAsync().ConfigureAwait(false);
}
finally
{
this.stateLock.Dispose();
}
}

#region Test hook.
Expand Down Expand Up @@ -364,7 +434,7 @@ internal bool TestIsIdle

private void ThrowIfDisposed()
{
if (this.disposed)
if (this.disposed != 0)
{
throw new ObjectDisposedException(nameof(Channel));
}
Expand Down
49 changes: 44 additions & 5 deletions Microsoft.Azure.Cosmos/src/direct/ChannelDictionary.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,19 @@ namespace Microsoft.Azure.Documents.Rntbd
{
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Core.Trace;
using Microsoft.Azure.Documents.FaultInjection;

// ChannelDictionary maps server keys to load-balanced channels. There is
// one load-balanced channel per back-end server.
internal sealed class ChannelDictionary : IChannelDictionary, IDisposable
internal sealed class ChannelDictionary : IChannelDictionary, IDisposable, IAsyncDisposable
{
private readonly ChannelProperties channelProperties;
private bool disposed = false;
private int disposed;

private ConcurrentDictionary<ServerKey, IChannel> channels =
new ConcurrentDictionary<ServerKey, IChannel>();
Expand Down Expand Up @@ -70,17 +74,52 @@ public bool TryGetChannel(Uri requestUri, out IChannel channel)

public void Dispose()
{
this.ThrowIfDisposed();
this.disposed = true;
if (Interlocked.CompareExchange(ref this.disposed, 1, 0) != 0)
{
return;
}

GC.SuppressFinalize(this);
foreach (IChannel channel in this.channels.Values)
{
channel.Close();
}
}

public async ValueTask DisposeAsync()
{
if (Interlocked.CompareExchange(ref this.disposed, 1, 0) != 0)
{
return;
}

GC.SuppressFinalize(this);

List<Task> closeTasks = new List<Task>(this.channels.Count);
foreach (IChannel channel in this.channels.Values)
{
closeTasks.Add(channel.CloseAsync());
}

Task whenAllTask = Task.WhenAll(closeTasks);
try
{
await whenAllTask.ConfigureAwait(false);
}
catch (Exception)
{
foreach (Exception inner in whenAllTask.Exception.Flatten().InnerExceptions)
{
DefaultTrace.TraceWarning(
"[RNTBD ChannelDictionary] Async dispose encountered error during channel closure: {0}",
inner.Message);
}
}
}

private void ThrowIfDisposed()
{
if (this.disposed)
if (this.disposed != 0)
{
throw new ObjectDisposedException(nameof(ChannelDictionary));
}
Expand Down
88 changes: 79 additions & 9 deletions Microsoft.Azure.Cosmos/src/direct/Dispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ namespace Microsoft.Azure.Documents.Rntbd

// Dispatcher encapsulates the state and logic needed to dispatch multiple requests through
// a single connection.
internal sealed class Dispatcher : IDisposable
internal sealed class Dispatcher : IDisposable, IAsyncDisposable
{
// Connection is thread-safe for sending.
// Receiving is done only from the receive loop.
Expand All @@ -43,7 +43,7 @@ internal sealed class Dispatcher : IDisposable
private readonly bool enableChannelMultiplexing;
private readonly Action<string, Exception> clientCertificateFailureHandler;

private bool disposed = false;
private int disposed;

private ServerProperties serverProperties = null;

Expand Down Expand Up @@ -264,7 +264,7 @@ async delegate

public void InjectFaultInjectionConnectionError(TransportException transportException)
{
if (!this.disposed)
if (this.disposed == 0)
{
this.isFaultInjectionedConnectionError = true;
this.faultInjectionTransportException = transportException;
Expand Down Expand Up @@ -451,10 +451,15 @@ public override string ToString()
return this.connection.ToString();
}

// Keep in sync with DisposeAsync().
public void Dispose()
{
this.ThrowIfDisposed();
this.disposed = true;
if (Interlocked.CompareExchange(ref this.disposed, 1, 0) != 0)
{
return;
}

GC.SuppressFinalize(this);

DefaultTrace.TraceInformation("[RNTBD Dispatcher {0}] Disposing RNTBD Dispatcher {1}", this.ConnectionCorrelationId, this);

Expand Down Expand Up @@ -483,6 +488,43 @@ public void Dispose()
DefaultTrace.TraceInformation("[RNTBD Dispatcher {0}] RNTBD Dispatcher {1} is disposed", this.ConnectionCorrelationId, this);
}

// Keep in sync with Dispose().
public async ValueTask DisposeAsync()
Comment thread
NaluTripician marked this conversation as resolved.
{
if (Interlocked.CompareExchange(ref this.disposed, 1, 0) != 0)
{
return;
}

GC.SuppressFinalize(this);

DefaultTrace.TraceInformation("[RNTBD Dispatcher {0}] Async disposing RNTBD Dispatcher {1}", this.ConnectionCorrelationId, this);
Comment thread
NaluTripician marked this conversation as resolved.

Task idleTimerTaskCopy = null;
Debug.Assert(!Monitor.IsEntered(this.connectionLock));
lock (this.connectionLock)
{
this.StartConnectionShutdown();
idleTimerTaskCopy = this.StopIdleTimer();
}

await this.WaitTaskAsync(idleTimerTaskCopy, "idle timer").ConfigureAwait(false);

Task receiveTaskCopy = null;
Debug.Assert(!Monitor.IsEntered(this.connectionLock));
lock (this.connectionLock)
{
Debug.Assert(this.idleTimer == null);
Debug.Assert(this.idleTimerTask == null);

receiveTaskCopy = this.CloseConnection();
}

await this.WaitTaskAsync(receiveTaskCopy, "receive loop").ConfigureAwait(false);

DefaultTrace.TraceInformation("[RNTBD Dispatcher {0}] RNTBD Dispatcher {1} is disposed", this.ConnectionCorrelationId, this);
}

private void StartIdleTimer()
{
DefaultTrace.TraceInformation("[RNTBD Dispatcher {0}] RNTBD idle connection monitor: Timer is starting...", this.ConnectionCorrelationId);
Expand Down Expand Up @@ -522,7 +564,7 @@ private void StartIdleTimer()
}
}

private void OnIdleTimer(Task precedentTask)
private async Task OnIdleTimerAsync(Task precedentTask)
Comment thread
NaluTripician marked this conversation as resolved.
{
Task receiveTaskCopy = null;

Expand Down Expand Up @@ -572,15 +614,21 @@ private void OnIdleTimer(Task precedentTask)
receiveTaskCopy = this.CloseConnection();
}

this.WaitTask(receiveTaskCopy, "receive loop");
await this.WaitTaskAsync(receiveTaskCopy, "receive loop").ConfigureAwait(false);
}

// this.connectionLock must be held.
private void ScheduleIdleTimer(TimeSpan timeToIdle)
{
Debug.Assert(Monitor.IsEntered(this.connectionLock));
this.idleTimer = this.idleTimerPool.GetPooledTimer((int)timeToIdle.TotalSeconds);
this.idleTimerTask = this.idleTimer.StartTimerAsync().ContinueWith(this.OnIdleTimer, TaskContinuationOptions.OnlyOnRanToCompletion);

// IMPORTANT: .Unwrap() is essential here. Without it, idleTimerTask would be Task<Task>
// and would complete when OnIdleTimerAsync STARTS (returns its inner Task), not when it
// FINISHES. StopIdleTimer() and Dispose/DisposeAsync wait on idleTimerTask — if it
// completes early, disposal proceeds while OnIdleTimerAsync is still running, causing
// use-after-dispose on the connection. Do not remove .Unwrap().
this.idleTimerTask = this.idleTimer.StartTimerAsync().ContinueWith(this.OnIdleTimerAsync, TaskContinuationOptions.OnlyOnRanToCompletion).Unwrap();
Comment thread
NaluTripician marked this conversation as resolved.
Comment thread
NaluTripician marked this conversation as resolved.
Comment thread
NaluTripician marked this conversation as resolved.
this.idleTimerTask.ContinueWith(
failedTask =>
{
Expand Down Expand Up @@ -681,9 +729,31 @@ private void WaitTask(Task t, string description)
}
}

private async Task WaitTaskAsync(Task t, string description)
{
if (t == null)
{
return;
}
try
{
Debug.Assert(!Monitor.IsEntered(this.callLock));
Debug.Assert(!Monitor.IsEntered(this.connectionLock));
await t.ConfigureAwait(false);
}
catch (Exception e)
{
DefaultTrace.TraceWarning(
"[RNTBD Dispatcher {0}][{1}] Parallel task failed: {2}. Exception: {3}: {4}",
this.ConnectionCorrelationId, this, description, e.GetType().Name, e.Message);
// Intentionally swallowing the exception. The caller can't
Comment thread
NaluTripician marked this conversation as resolved.
// do anything useful with it.
}
}

private void ThrowIfDisposed()
{
if (this.disposed)
if (this.disposed != 0)
{
Debug.Assert(this.serverUri != null);
throw new ObjectDisposedException(
Expand Down
2 changes: 2 additions & 0 deletions Microsoft.Azure.Cosmos/src/direct/IChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,7 @@ public Task OpenChannelAsync(
bool Healthy { get; }

void Close();

Task CloseAsync();
Comment thread
NaluTripician marked this conversation as resolved.
}
}
Loading
Loading