Skip to content
Merged
Show file tree
Hide file tree
Changes from 34 commits
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
4ca3c63
Add support for TCP and Unix Domain sockets.
lateralusX Mar 1, 2021
6f016b1
Add Tcp support to IpcServerTransport.
lateralusX Mar 2, 2021
0003639
Add support to bind any interfaces (ipv4,ipv6) and set address reuse.
lateralusX Mar 3, 2021
27a0e56
Socket.Bind PAL already sets SO_REUSEADDR when needed.
lateralusX Mar 3, 2021
1970a1b
Some renaming.
lateralusX Mar 4, 2021
1e2f13c
Restructure/Align code.
lateralusX Mar 4, 2021
6c7b7b0
Disable TcpIp for all existing tools.
lateralusX Mar 5, 2021
3f27fca
Add dotnet-dsproxy tool.
lateralusX Mar 9, 2021
5de89ab
Error handling and logging.
lateralusX Mar 10, 2021
607dbf2
Wait on completion of running proxy task in parallell with accept.
lateralusX Mar 10, 2021
4892d56
Improve detecting runtime shutdown.
lateralusX Mar 10, 2021
3a4e111
Align build flag with native runtime TCP PAL build flag.
lateralusX Mar 10, 2021
d1af66b
Reflect advertise package size in proxy stats.
lateralusX Mar 10, 2021
d76a587
Logging, commandline and code restructure.
lateralusX Mar 11, 2021
5a7fd92
Remove TCP/IP suport for tooling (only available in dotnet-dsproxy).
lateralusX Mar 11, 2021
8569672
Fix some naming.
lateralusX Mar 11, 2021
b86805c
Fix netcore2.1 build.
lateralusX Mar 11, 2021
6596c2f
Reduce proxy TCP delay and add additional debug logging.
lateralusX Mar 12, 2021
a1945c7
Adding additional explicit Dispose calls.
lateralusX Mar 12, 2021
941bc5b
Switch into using instead of explicit dispose.
lateralusX Mar 15, 2021
72b7f35
Use proxy process ID in advertise message.
lateralusX Mar 16, 2021
8c12bc6
Adjustments after rebase.
lateralusX Mar 16, 2021
4172948
Add ability to retry client connection after timeout.
lateralusX Mar 17, 2021
c6c918d
Restructure code for better reuse implementing server<->server proxy.
lateralusX Mar 17, 2021
fa35020
Rename file.
lateralusX Mar 17, 2021
cbce1e6
Adjust shutdown logging.
lateralusX Mar 17, 2021
861e29a
Add server-server proxy mode.
lateralusX Mar 17, 2021
35bbcc5
Parallel accept of ipc and tcp connections, re-worked retry loop.
lateralusX Mar 18, 2021
c73d88d
Restructure ipc client retry logic.
lateralusX Mar 18, 2021
a24742c
Only re-create transport socket/pipe when not explicitly canceled.
lateralusX Mar 18, 2021
dd78b21
Logging additional warnings.
lateralusX Mar 18, 2021
f924217
Delay client ipc retries on socket failures.
lateralusX Mar 19, 2021
d5d68cf
Move DiagnosticServerProxy classes into NETCore.Client library.
lateralusX Mar 19, 2021
7e56eba
Adjusting some names.
lateralusX Mar 19, 2021
a71d695
Adjusted logging capabilities.
lateralusX Mar 22, 2021
f16f8a5
Rename proxy -> router.
lateralusX Mar 22, 2021
e2f6ead
Added dotnet-dsrouter to solution file and set IsShipping to false.
lateralusX Mar 24, 2021
26545d1
Review feedback.
lateralusX Apr 6, 2021
2c97d99
Renamed a couple of files to match rename of classes.
lateralusX Apr 6, 2021
5a3cffa
Move EndPoint into bind/connect/connectasync calls, drop IpcTcpSocket.
lateralusX Apr 6, 2021
3e2c6de
File rename/split due to class refactoring.
lateralusX Apr 6, 2021
101242d
Move CreateNewSocketServer call away from base class.
lateralusX Apr 6, 2021
7f9bbbd
Switch to ILogger.
lateralusX Apr 6, 2021
019e022
Extend transport callback with EndPoint, used to bind port 0 in dsrou…
lateralusX Apr 7, 2021
4d7822a
Add ability to launch process through dsrouter relaying TcpServerAddr…
lateralusX Apr 7, 2021
9bc0e1d
Suspend child process for both server-server and client-server modes.
lateralusX Apr 7, 2021
d20f0b5
Unix Domain Socket needs address family on create.
lateralusX Apr 8, 2021
5a722e2
Set DOTNET_DiagnosticPorts unconditionally.
lateralusX Apr 8, 2021
73373a1
Review feedback.
lateralusX Apr 12, 2021
184ca11
Fix parsing of *:0 in ParseTcpIpEndPoint.
lateralusX Apr 12, 2021
c3b1564
Adjust timeouts + detect disconnected ipc client while waiting for tc…
lateralusX Apr 14, 2021
fc326c8
Detect both read/write state in IsStreamConnected.
lateralusX Apr 15, 2021
ff7d474
Split debug logging into debug and trace log levels.
lateralusX Apr 15, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ private IpcAdvertise(byte[] magic, Guid cookie, UInt64 pid, UInt16 future)
RuntimeInstanceCookie = cookie;
}

public static int V1SizeInBytes { get; } = IpcAdvertiseV1SizeInBytes;

public static async Task<IpcAdvertise> ParseAsync(Stream stream, CancellationToken token)
{
byte[] buffer = new byte[IpcAdvertiseV1SizeInBytes];
Expand Down Expand Up @@ -78,6 +80,28 @@ public static async Task<IpcAdvertise> ParseAsync(Stream stream, CancellationTok
return new IpcAdvertise(magic, cookie, pid, future);
}

public static async Task SerializeAsync(Stream stream, Guid runtimeInstanceCookie, ulong processId, CancellationToken token)
{
int index = 0;
byte[] buffer = new byte[IpcAdvertiseV1SizeInBytes];

Array.Copy(Magic_V1, buffer, Magic_V1.Length);
index += Magic_V1.Length;

byte[] cookieBuffer = runtimeInstanceCookie.ToByteArray();
Array.Copy(cookieBuffer, 0, buffer, index, cookieBuffer.Length);
index += cookieBuffer.Length;

Array.Copy(BitConverter.GetBytes(processId), 0, buffer, index, sizeof(ulong));
index += sizeof(ulong);

short future = 0;
Array.Copy(BitConverter.GetBytes(future), 0, buffer, index, sizeof(short));
index += sizeof(short);

await stream.WriteAsync(buffer, 0, index).ConfigureAwait(false);
}

public override string ToString()
{
return $"{{ Magic={Magic}; ClrInstanceId={RuntimeInstanceCookie}; ProcessId={ProcessId}; Future={Future} }}";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,22 @@ internal abstract class IpcServerTransport : IDisposable
private IIpcServerTransportCallbackInternal _callback;
private bool _disposed;

public static IpcServerTransport Create(string transportPath, int maxConnections)
public static IpcServerTransport Create(string address, int maxConnections, bool enableTcpIpProtocol)
{
if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
if (!enableTcpIpProtocol || !IpcTcpSocketTransport.TryParseIPAddress(address))
{
return new WindowsPipeServerTransport(transportPath, maxConnections);
if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
{
return new IpcWindowsNamedPipeServerTransport(address, maxConnections);
}
else
{
return new IpcUnixDomainSocketServerTransport(address, maxConnections);
}
}
else
{
return new UnixDomainSocketServerTransport(transportPath, maxConnections);
return new IpcTcpSocketServerTransport(address, maxConnections);
}
}

Expand All @@ -45,18 +52,10 @@ protected virtual void Dispose(bool disposing)

public abstract Task<Stream> AcceptAsync(CancellationToken token);

public static int MaxAllowedConnections
{
public static int MaxAllowedConnections {
get
{
if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
{
return NamedPipeServerStream.MaxAllowedServerInstances;
}
else
{
return (int)SocketOptionName.MaxConnections;
}
return -1;
}
}

Expand All @@ -79,7 +78,7 @@ protected void OnCreateNewServer()
}
}

internal sealed class WindowsPipeServerTransport : IpcServerTransport
internal sealed class IpcWindowsNamedPipeServerTransport : IpcServerTransport
{
private const string PipePrefix = @"\\.\pipe\";

Expand All @@ -89,11 +88,11 @@ internal sealed class WindowsPipeServerTransport : IpcServerTransport
private readonly string _pipeName;
private readonly int _maxInstances;

public WindowsPipeServerTransport(string pipeName, int maxInstances)
public IpcWindowsNamedPipeServerTransport(string pipeName, int maxInstances)
{
_maxInstances = maxInstances;
_maxInstances = maxInstances != MaxAllowedConnections ? maxInstances : NamedPipeServerStream.MaxAllowedServerInstances;
_pipeName = pipeName.StartsWith(PipePrefix) ? pipeName.Substring(PipePrefix.Length) : pipeName;
CreateNewPipeServer();
_stream = CreateNewNamedPipeServer(_pipeName, _maxInstances);
}

protected override void Dispose(bool disposing)
Expand All @@ -113,50 +112,56 @@ public override async Task<Stream> AcceptAsync(CancellationToken token)
VerifyNotDisposed();

using var linkedSource = CancellationTokenSource.CreateLinkedTokenSource(token, _cancellation.Token);

NamedPipeServerStream connectedStream;
try
{
// Connect client to named pipe server stream.
await _stream.WaitForConnectionAsync(linkedSource.Token).ConfigureAwait(false);

connectedStream = _stream;
// Transfer ownership of connected named pipe.
var connectedStream = _stream;

// Setup new named pipe server stream used in upcomming accept calls.
_stream = CreateNewNamedPipeServer(_pipeName, _maxInstances);

return connectedStream;
}
finally
catch (Exception)
{
if (!_cancellation.IsCancellationRequested)
// Keep named pipe server stream when getting any kind of cancel request.
// Cancel happens when complete transport is about to disposed or caller
// cancels out specific accept call, no need to recycle named pipe server stream.
// In all other exception scenarios named pipe server stream will be re-created.
if (!linkedSource.IsCancellationRequested)
{
CreateNewPipeServer();
_stream.Dispose();
_stream = CreateNewNamedPipeServer(_pipeName, _maxInstances);
}
throw;
}
return connectedStream;
}

private void CreateNewPipeServer()
private NamedPipeServerStream CreateNewNamedPipeServer(string pipeName, int maxInstances)
{
_stream = new NamedPipeServerStream(
_pipeName,
PipeDirection.InOut,
_maxInstances,
PipeTransmissionMode.Byte,
PipeOptions.Asynchronous);
var stream = new NamedPipeServerStream(pipeName, PipeDirection.InOut, maxInstances, PipeTransmissionMode.Byte, PipeOptions.Asynchronous);
OnCreateNewServer();
return stream;
}
}

internal sealed class UnixDomainSocketServerTransport : IpcServerTransport
internal abstract class IpcSocketServerTransport : IpcServerTransport
{
private readonly CancellationTokenSource _cancellation = new CancellationTokenSource();
private readonly int _backlog;
private readonly string _path;
private readonly string _address;

private UnixDomainSocket _socket;
private IpcSocketTransport _socket;

public UnixDomainSocketServerTransport(string path, int backlog)
public IpcSocketServerTransport(string address, int backlog)
{
_backlog = backlog;
_path = path;
_address = address;

CreateNewSocketServer();
_socket = CreateNewSocketServer(_address, _backlog);
}

protected override void Dispose(bool disposing)
Expand Down Expand Up @@ -187,28 +192,83 @@ public override async Task<Stream> AcceptAsync(CancellationToken token)
using var linkedSource = CancellationTokenSource.CreateLinkedTokenSource(token, _cancellation.Token);
try
{
Socket socket = await _socket.AcceptAsync(linkedSource.Token).ConfigureAwait(false);
// Accept next client socket.
var socket = await _socket.AcceptAsync(linkedSource.Token).ConfigureAwait(false);

// Configure client socket based on transport type.
OnAccept(socket);

return new ExposedSocketNetworkStream(socket, ownsSocket: true);
}
catch (Exception)
{
// Recreate socket if transport is not disposed.
if (!_cancellation.IsCancellationRequested)
// Keep server socket when getting any kind of cancel request.
// Cancel happens when complete transport is about to disposed or caller
// cancels out specific accept call, no need to recycle server socket.
// In all other exception scenarios server socket will be re-created.
if (!linkedSource.IsCancellationRequested)
{
CreateNewSocketServer();
_socket = CreateNewSocketServer(_address, _backlog);
}
throw;
}
}

private void CreateNewSocketServer()
internal abstract bool OnAccept(Socket socket);

internal abstract IpcSocketTransport CreateNewSocketServer(string address, int backlog);
}

internal sealed class IpcTcpSocketServerTransport : IpcSocketServerTransport
{
public IpcTcpSocketServerTransport(string address, int backlog)
: base (address, backlog != MaxAllowedConnections ? backlog : 100)
{
}

internal override bool OnAccept(Socket socket)
{
socket.NoDelay = true;
return true;
}

internal override IpcSocketTransport CreateNewSocketServer(string address, int backlog)
{
string hostAddress;
int hostPort;

if (!IpcTcpSocketTransport.TryParseIPAddress(address, out hostAddress, out hostPort))
throw new ArgumentException(string.Format("Could not parse {0} into host, port", address));

var socket = IpcTcpSocketTransport.Create(hostAddress, hostPort);
socket.Bind();
socket.Listen(backlog);
socket.LingerState.Enabled = false;
OnCreateNewServer();
return socket;
}
}

internal sealed class IpcUnixDomainSocketServerTransport : IpcSocketServerTransport
{
public IpcUnixDomainSocketServerTransport(string path, int backlog)
: base (path, backlog != MaxAllowedConnections ? backlog : (int)SocketOptionName.MaxConnections)
{
}

internal override bool OnAccept(Socket socket)
{
return true;
}

internal override IpcSocketTransport CreateNewSocketServer(string address, int backlog)
{
_socket = new UnixDomainSocket();
_socket.Bind(_path);
_socket.Listen(_backlog);
_socket.LingerState.Enabled = false;
var socket = IpcUnixDomainSocketTransport.Create(address);
socket.Bind();
socket.Listen(backlog);
socket.LingerState.Enabled = false;
OnCreateNewServer();
return socket;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,21 @@

using System;
using System.Diagnostics;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;

namespace Microsoft.Diagnostics.NETCore.Client
{
internal sealed class UnixDomainSocket : Socket
internal class IpcSocketTransport : Socket
{
private bool _ownsSocketFile;
private string _path;
EndPoint _address;

public UnixDomainSocket() :
base(AddressFamily.Unix, SocketType.Stream, ProtocolType.Unspecified)
public IpcSocketTransport(EndPoint address, SocketType type, ProtocolType protocol)
: base(address.AddressFamily, type, protocol)
{
_address = address;
}

public async Task<Socket> AcceptAsync(CancellationToken token)
Expand All @@ -44,24 +43,18 @@ public async Task<Socket> AcceptAsync(CancellationToken token)
}
}

public void Bind(string path)
public virtual void Bind()
{
Bind(CreateUnixDomainSocketEndPoint(path));

_ownsSocketFile = true;
_path = path;
Bind(_address);
}

public void Connect(string path, TimeSpan timeout)
public virtual void Connect(TimeSpan timeout)
{
IAsyncResult result = BeginConnect(CreateUnixDomainSocketEndPoint(path), null, null);
IAsyncResult result = BeginConnect(_address, null, null);

if (result.AsyncWaitHandle.WaitOne(timeout))
{
EndConnect(result);

_ownsSocketFile = false;
_path = path;
}
else
{
Expand All @@ -70,15 +63,15 @@ public void Connect(string path, TimeSpan timeout)
}
}

public async Task ConnectAsync(string path, CancellationToken token)
public async Task ConnectAsync(CancellationToken token)
{
using (token.Register(() => Close(0)))
{
try
{
Func<AsyncCallback, object, IAsyncResult> beginConnect = (callback, state) =>
{
return BeginConnect(CreateUnixDomainSocketEndPoint(path), callback, state);
return BeginConnect(_address, callback, state);
};
await Task.Factory.FromAsync(beginConnect, EndConnect, this).ConfigureAwait(false);
}
Expand All @@ -93,33 +86,5 @@ public async Task ConnectAsync(string path, CancellationToken token)
}
}

protected override void Dispose(bool disposing)
{
if (disposing)
{
if (_ownsSocketFile && !string.IsNullOrEmpty(_path) && File.Exists(_path))
{
File.Delete(_path);
}
}
base.Dispose(disposing);
}

private static EndPoint CreateUnixDomainSocketEndPoint(string path)
{
#if NETCOREAPP
return new UnixDomainSocketEndPoint(path);
#elif NETSTANDARD2_0
// UnixDomainSocketEndPoint is not part of .NET Standard 2.0
var type = typeof(Socket).Assembly.GetType("System.Net.Sockets.UnixDomainSocketEndPoint")
?? Type.GetType("System.Net.Sockets.UnixDomainSocketEndPoint, System.Core");
if (type == null)
{
throw new PlatformNotSupportedException("Current process is not running a compatible .NET runtime.");
}
var ctor = type.GetConstructor(new[] { typeof(string) });
return (EndPoint)ctor.Invoke(new object[] { path });
#endif
}
}
}
Loading