Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ namespace Microsoft.Diagnostics.NETCore.Client
{
internal abstract class IpcServerTransport : IDisposable
{
private IIpcServerTransportCallbackInternal _callback;
private bool _disposed;

public static IpcServerTransport Create(string transportPath, int maxConnections)
Expand Down Expand Up @@ -66,6 +67,16 @@ protected void VerifyNotDisposed()
throw new ObjectDisposedException(this.GetType().Name);
}
}

internal void SetCallback(IIpcServerTransportCallbackInternal callback)
{
_callback = callback;
}

protected void OnCreateNewServer()
{
_callback?.CreatedNewServer();
}
}

internal sealed class WindowsPipeServerTransport : IpcServerTransport
Expand Down Expand Up @@ -128,6 +139,7 @@ private void CreateNewPipeServer()
_maxInstances,
PipeTransmissionMode.Byte,
PipeOptions.Asynchronous);
OnCreateNewServer();
}
}

Expand Down Expand Up @@ -196,6 +208,12 @@ private void CreateNewSocketServer()
_socket.Bind(_path);
_socket.Listen(_backlog);
_socket.LingerState.Enabled = false;
OnCreateNewServer();
}
}

internal interface IIpcServerTransportCallbackInternal
{
void CreatedNewServer();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,14 @@ private void VerifyIsStarted()
/// <returns>A task that completes when the server is no longer listening at the transport path.</returns>
private async Task ListenAsync(int maxConnections, CancellationToken token)
{
// This disposal shuts down the transport in case an exception is thrown.
using var transport = IpcServerTransport.Create(_transportPath, maxConnections);
// Set transport callback for testing purposes.
transport.SetCallback(TransportCallback);
// This disposal shuts down the transport in case of cancellation; causes the transport
// to not recreate the server stream before the AcceptAsync call observes the cancellation.
using var _ = token.Register(() => transport.Dispose());

while (!token.IsCancellationRequested)
{
Stream stream = null;
Expand Down Expand Up @@ -340,5 +347,7 @@ private static bool TestStream(Stream stream)
private bool IsStarted => null != _listenTask;

public static int MaxAllowedConnections = IpcServerTransport.MaxAllowedConnections;

internal IIpcServerTransportCallbackInternal TransportCallback { get; set; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,52 @@ private async Task ReversedServerSingleTargetExitsClientInviableTestCore(bool us
await VerifyNoNewEndpointInfos(server, useAsync);
}

/// <summary>
/// Validates that the <see cref="ReversedDiagnosticsServer"/> does not create a new server
/// transport during disposal.
/// </summary>
[Fact]
public async Task ReversedServerNoCreateTransportAfterDispose()
{
var transportCallback = new IpcServerTransportCallback();

int transportVersion = 0;
TestRunner runner = null;
try
{
await using var server = CreateReversedServer(out string transportName);
server.TransportCallback = transportCallback;
server.Start();

// Start client pointing to diagnostics server
runner = StartTracee(transportName);

// Get client connection
IpcEndpointInfo info = await AcceptEndpointInfo(server, useAsync: true);

await VerifyEndpointInfo(runner, info, useAsync: true);

// There should not be any new endpoint infos
await VerifyNoNewEndpointInfos(server, useAsync: true);

ResumeRuntime(info);

await VerifyWaitForConnection(info, useAsync: true);

transportVersion = await transportCallback.GetStableTransportVersion();

// Server will be disposed
}
finally
{
_outputHelper.WriteLine("Stopping tracee.");
runner?.Stop();
}

// Check that the reversed server did not create a new server transport upon disposal.
Assert.Equal(transportVersion, await transportCallback.GetStableTransportVersion());
}

private ReversedDiagnosticsServer CreateReversedServer(out string transportName)
{
transportName = ReversedServerHelper.CreateServerTransportName();
Expand Down Expand Up @@ -592,5 +638,84 @@ public async Task WaitForConnection(TimeSpan timeout, bool expectTimeout = false
}
}
}

private class IpcServerTransportCallback : IIpcServerTransportCallbackInternal
{
private static readonly TimeSpan StableTransportSemaphoreTimeout = TimeSpan.FromSeconds(3);
private static readonly TimeSpan StableTransportVersionPeriod = TimeSpan.FromSeconds(3);
private static readonly TimeSpan StableTransportVersionTimeout = TimeSpan.FromSeconds(30);

private readonly Timer _transportVersionTimer;
private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1);

private int _transportVersion = 0;
private TaskCompletionSource<int> _transportVersionSource;

public IpcServerTransportCallback()
{
// Initially set timer to not callback
_transportVersionTimer = new Timer(NotifyStableTransportVersion, this, Timeout.Infinite, 0);
}

public void CreatedNewServer()
{
_semaphore.Wait(StableTransportSemaphoreTimeout);
try
{
_transportVersion++;
// Restart timer with existing settings
_transportVersionTimer.Change(0, 0);
}
finally
{
_semaphore.Release();
}
}

private static void NotifyStableTransportVersion(object state)
{
((IpcServerTransportCallback)state).NotifyStableTransportVersion();
}

private void NotifyStableTransportVersion()
{
_semaphore.Wait(StableTransportSemaphoreTimeout);
try
{
// Disable timer callback
_transportVersionTimer.Change(Timeout.Infinite, 0);
// Notify and clear the completion source
_transportVersionSource?.TrySetResult(_transportVersion);
_transportVersionSource = null;
}
finally
{
_semaphore.Release();
}
}

public async Task<int> GetStableTransportVersion()
{
await _semaphore.WaitAsync(StableTransportSemaphoreTimeout);
try
{
_transportVersionSource = new TaskCompletionSource<int>(TaskCreationOptions.RunContinuationsAsynchronously);
// Set timer to callback a period of time after last transport update
_transportVersionTimer.Change(StableTransportVersionPeriod, Timeout.InfiniteTimeSpan);
}
finally
{
_semaphore.Release();
}

using var cancellation = new CancellationTokenSource(StableTransportVersionTimeout);

CancellationToken token = cancellation.Token;
using var _ = token.Register(() => _transportVersionSource.TrySetCanceled(token));

// Wait for the transport version to stabilize for a certain amount of time.
return await _transportVersionSource.Task;
}
}
}
}