Skip to content

Commit

Permalink
Socket.SendFileAsync based on SendPacketsAsync (#52208)
Browse files Browse the repository at this point in the history
* Socket.SendFileAsync layered on top of SendPacketsAsync

* Tests

* Cleanup

* Set SendPacketsFlags

* Check if the socket is connection orientated

Cf. #52208 (comment)

* Try to re-use the SendPacketsElement-array

Cf. #52208 (comment)

* Fixed test

Cf. #52208 (comment)

* Update src/libraries/System.Net.Sockets/src/System/Net/Sockets/Socket.Tasks.cs
  • Loading branch information
gfoidl authored May 6, 2021
1 parent 222cef5 commit 3e6de7e
Show file tree
Hide file tree
Showing 12 changed files with 137 additions and 215 deletions.
2 changes: 2 additions & 0 deletions src/libraries/System.Net.Sockets/ref/System.Net.Sockets.cs
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,8 @@ public static void Select(System.Collections.IList? checkRead, System.Collection
public void SendFile(string? fileName) { }
public void SendFile(string? fileName, byte[]? preBuffer, byte[]? postBuffer, System.Net.Sockets.TransmitFileOptions flags) { }
public void SendFile(string? fileName, System.ReadOnlySpan<byte> preBuffer, System.ReadOnlySpan<byte> postBuffer, System.Net.Sockets.TransmitFileOptions flags) { }
public System.Threading.Tasks.ValueTask SendFileAsync(string? fileName, System.ReadOnlyMemory<byte> preBuffer, System.ReadOnlyMemory<byte> postBuffer, System.Net.Sockets.TransmitFileOptions flags, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public System.Threading.Tasks.ValueTask SendFileAsync(string? fileName, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public bool SendPacketsAsync(System.Net.Sockets.SocketAsyncEventArgs e) { throw null; }
public int SendTo(byte[] buffer, int offset, int size, System.Net.Sockets.SocketFlags socketFlags, System.Net.EndPoint remoteEP) { throw null; }
public int SendTo(byte[] buffer, int size, System.Net.Sockets.SocketFlags socketFlags, System.Net.EndPoint remoteEP) { throw null; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@
<Compile Include="System\Net\Sockets\SocketAsyncEventArgs.Windows.cs" />
<Compile Include="System\Net\Sockets\IOControlKeepAlive.Windows.cs" />
<Compile Include="System\Net\Sockets\SocketPal.Windows.cs" />
<Compile Include="System\Net\Sockets\TransmitFileAsyncResult.Windows.cs" />
<Compile Include="System\Net\Sockets\UnixDomainSocketEndPoint.Windows.cs" />
<Compile Include="$(CommonPath)System\Net\ContextAwareResult.Windows.cs"
Link="Common\System\Net\ContextAwareResult.Windows.cs" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public SendPacketsElement(byte[] buffer, int offset, int count, bool endOfPacket
throw new ArgumentOutOfRangeException(nameof(count));
}

Initialize(null, null, buffer, new ReadOnlyMemory<byte>(buffer, offset, count), offset, count, endOfPacket);
Initialize(null, null, buffer, buffer.AsMemory(offset, count), offset, count, endOfPacket);
}

public SendPacketsElement(ReadOnlyMemory<byte> buffer) :
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,95 @@ public ValueTask<int> SendToAsync(ReadOnlyMemory<byte> buffer, SocketFlags socke
return saea.SendToAsync(this, cancellationToken);
}

/// <summary>
/// Sends the file <paramref name="fileName"/> to a connected <see cref="Socket"/> object.
/// </summary>
/// <param name="fileName">A <see cref="string"/> that contains the path and name of the file to be sent. This parameter can be <see langword="null"/>.</param>
/// <param name="cancellationToken">A cancellation token that can be used to cancel the asynchronous operation.</param>
/// <exception cref="ObjectDisposedException">The <see cref="Socket"/> object has been closed.</exception>
/// <exception cref="NotSupportedException">The <see cref="Socket"/> object is not connected to a remote host.</exception>
/// <exception cref="FileNotFoundException">The file <paramref name="fileName"/> was not found.</exception>
/// <exception cref="SocketException">An error occurred when attempting to access the socket.</exception>
public ValueTask SendFileAsync(string? fileName, CancellationToken cancellationToken = default)
{
return SendFileAsync(fileName, default, default, TransmitFileOptions.UseDefaultWorkerThread, cancellationToken);
}

/// <summary>
/// Sends the file <paramref name="fileName"/> and buffers of data to a connected <see cref="Socket"/> object
/// using the specified <see cref="TransmitFileOptions"/> value.
/// </summary>
/// <param name="fileName">A <see cref="string"/> that contains the path and name of the file to be sent. This parameter can be <see langword="null"/>.</param>
/// <param name="preBuffer">A <see cref="byte"/> array that contains data to be sent before the file is sent. This parameter can be <see langword="null"/>.</param>
/// <param name="postBuffer">A <see cref="byte"/> array that contains data to be sent after the file is sent. This parameter can be <see langword="null"/>.</param>
/// <param name="flags">One or more of <see cref="TransmitFileOptions"/> values.</param>
/// <param name="cancellationToken">A cancellation token that can be used to cancel the asynchronous operation.</param>
/// <exception cref="ObjectDisposedException">The <see cref="Socket"/> object has been closed.</exception>
/// <exception cref="NotSupportedException">The <see cref="Socket"/> object is not connected to a remote host.</exception>
/// <exception cref="FileNotFoundException">The file <paramref name="fileName"/> was not found.</exception>
/// <exception cref="SocketException">An error occurred when attempting to access the socket.</exception>
public ValueTask SendFileAsync(string? fileName, ReadOnlyMemory<byte> preBuffer, ReadOnlyMemory<byte> postBuffer, TransmitFileOptions flags, CancellationToken cancellationToken = default)
{
if (cancellationToken.IsCancellationRequested)
{
return ValueTask.FromCanceled(cancellationToken);
}

if (!IsConnectionOriented)
{
var soex = new SocketException((int)SocketError.NotConnected);
return ValueTask.FromException(soex);
}

int packetsCount = 0;

if (fileName is not null)
{
packetsCount++;
}

if (!preBuffer.IsEmpty)
{
packetsCount++;
}

if (!postBuffer.IsEmpty)
{
packetsCount++;
}

AwaitableSocketAsyncEventArgs saea =
Interlocked.Exchange(ref _singleBufferSendEventArgs, null) ??
new AwaitableSocketAsyncEventArgs(this, isReceiveForCaching: false);

SendPacketsElement[] sendPacketsElements = saea.SendPacketsElements?.Length == packetsCount
? saea.SendPacketsElements
: new SendPacketsElement[packetsCount];

int index = 0;
if (!preBuffer.IsEmpty)
{
sendPacketsElements[index++] = new SendPacketsElement(preBuffer, endOfPacket: index == packetsCount);
}

if (fileName is not null)
{
sendPacketsElements[index++] = new SendPacketsElement(fileName, 0, 0, endOfPacket: index == packetsCount);
}

if (!postBuffer.IsEmpty)
{
sendPacketsElements[index++] = new SendPacketsElement(postBuffer, endOfPacket: index == packetsCount);
}

Debug.Assert(index == packetsCount);

saea.SendPacketsFlags = flags;
saea.SendPacketsElements = sendPacketsElements;
saea.WrapExceptionsForNetworkStream = false;
return saea.SendPacketsAsync(this, cancellationToken);
}

private static void ValidateBufferArguments(byte[] buffer, int offset, int size)
{
if (buffer == null)
Expand Down Expand Up @@ -1005,6 +1094,26 @@ public ValueTask SendAsyncForNetworkStream(Socket socket, CancellationToken canc
ValueTask.FromException(CreateException(error));
}

public ValueTask SendPacketsAsync(Socket socket, CancellationToken cancellationToken)
{
Debug.Assert(Volatile.Read(ref _continuation) == null, "Expected null continuation to indicate reserved for use");

// TODO: Support cancellation by passing cancellationToken down through SendPacketsAsync, etc.
if (socket.SendPacketsAsync(this))
{
_cancellationToken = cancellationToken;
return new ValueTask(this, _token);
}

SocketError error = SocketError;

Release();

return error == SocketError.Success ?
default :
ValueTask.FromException(CreateException(error));
}

public ValueTask<int> SendToAsync(Socket socket, CancellationToken cancellationToken)
{
Debug.Assert(Volatile.Read(ref _continuation) == null, "Expected null continuation to indicate reserved for use");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,61 +226,5 @@ private void SendFileInternal(string? fileName, ReadOnlySpan<byte> preBuffer, Re
Send(postBuffer);
}
}

private async Task SendFileInternalAsync(FileStream? fileStream, byte[]? preBuffer, byte[]? postBuffer)
{
SocketError errorCode = SocketError.Success;
using (fileStream)
{
// Send the preBuffer, if any
// This will throw on error
if (preBuffer != null && preBuffer.Length > 0)
{
// Using "this." makes the extension method kick in
await this.SendAsync(new ArraySegment<byte>(preBuffer), SocketFlags.None).ConfigureAwait(false);
}

// Send the file, if any
if (fileStream != null)
{
var tcs = new TaskCompletionSource<SocketError>();
errorCode = SocketPal.SendFileAsync(_handle, fileStream, (_, socketError) => tcs.SetResult(socketError));
if (errorCode == SocketError.IOPending)
{
errorCode = await tcs.Task.ConfigureAwait(false);
}
}
}

if (errorCode != SocketError.Success)
{
UpdateSendSocketErrorForDisposed(ref errorCode);
UpdateStatusAfterSocketErrorAndThrowException(errorCode);
}

// Send the postBuffer, if any
// This will throw on error
if (postBuffer != null && postBuffer.Length > 0)
{
// Using "this." makes the extension method kick in
await this.SendAsync(new ArraySegment<byte>(postBuffer), SocketFlags.None).ConfigureAwait(false);
}
}

private IAsyncResult BeginSendFileInternal(string? fileName, byte[]? preBuffer, byte[]? postBuffer, TransmitFileOptions flags, AsyncCallback? callback, object? state)
{
CheckTransmitFileOptions(flags);

// Open the file, if any
// Open it before we send the preBuffer so that any exception happens first
FileStream? fileStream = OpenFile(fileName);

return TaskToApm.Begin(SendFileInternalAsync(fileStream, preBuffer, postBuffer), callback, state);
}

private void EndSendFileInternal(IAsyncResult asyncResult)
{
TaskToApm.End(asyncResult);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -400,59 +400,6 @@ private void SendFileInternal(string? fileName, ReadOnlySpan<byte> preBuffer, Re
}
}

private IAsyncResult BeginSendFileInternal(string? fileName, byte[]? preBuffer, byte[]? postBuffer, TransmitFileOptions flags, AsyncCallback? callback, object? state)
{
FileStream? fileStream = OpenFile(fileName);

TransmitFileAsyncResult asyncResult = new TransmitFileAsyncResult(this, state, callback);
asyncResult.StartPostingAsyncOp(false);

SocketError errorCode = SocketPal.SendFileAsync(_handle, fileStream, preBuffer, postBuffer, flags, asyncResult);

// Check for synchronous exception
if (!CheckErrorAndUpdateStatus(errorCode))
{
UpdateSendSocketErrorForDisposed(ref errorCode);
throw new SocketException((int)errorCode);
}

asyncResult.FinishPostingAsyncOp();

return asyncResult;
}

private void EndSendFileInternal(IAsyncResult asyncResult)
{
TransmitFileAsyncResult? castedAsyncResult = asyncResult as TransmitFileAsyncResult;
if (castedAsyncResult == null || castedAsyncResult.AsyncObject != this)
{
throw new ArgumentException(SR.net_io_invalidasyncresult, nameof(asyncResult));
}

if (castedAsyncResult.EndCalled)
{
throw new InvalidOperationException(SR.Format(SR.net_io_invalidendcall, "EndSendFile"));
}

castedAsyncResult.InternalWaitForCompletion();
castedAsyncResult.EndCalled = true;

// If the user passed the Disconnect and/or ReuseSocket flags, then TransmitFile disconnected the socket.
// Update our state to reflect this.
if (castedAsyncResult.DoDisconnect)
{
SetToDisconnected();
_remoteEndPoint = null;
}

SocketError errorCode = (SocketError)castedAsyncResult.ErrorCode;
if (errorCode != SocketError.Success)
{
UpdateSendSocketErrorForDisposed(ref errorCode);
UpdateStatusAfterSocketErrorAndThrowException(errorCode);
}
}

internal ThreadPoolBoundHandle GetOrAllocateThreadPoolBoundHandle() =>
_handle.GetThreadPoolBoundHandle() ??
GetOrAllocateThreadPoolBoundHandleSlow();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2216,7 +2216,7 @@ public IAsyncResult BeginSendFile(string? fileName, byte[]? preBuffer, byte[]? p

if (NetEventSource.Log.IsEnabled()) NetEventSource.Info(this, $"::DoBeginSendFile() SRC:{LocalEndPoint} DST:{RemoteEndPoint} fileName:{fileName}");

return BeginSendFileInternal(fileName, preBuffer, postBuffer, flags, callback, state);
return TaskToApm.Begin(SendFileAsync(fileName, preBuffer, postBuffer, flags).AsTask(), callback, state);
}

public void EndSendFile(IAsyncResult asyncResult)
Expand All @@ -2228,7 +2228,7 @@ public void EndSendFile(IAsyncResult asyncResult)
throw new ArgumentNullException(nameof(asyncResult));
}

EndSendFileInternal(asyncResult);
TaskToApm.End(asyncResult);
}

public IAsyncResult BeginSendTo(byte[] buffer, int offset, int size, SocketFlags socketFlags, EndPoint remoteEP, AsyncCallback? callback, object? state)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1881,9 +1881,6 @@ public static SocketError Shutdown(SafeSocketHandle handle, bool isConnected, bo
return GetSocketErrorForErrorCode(err);
}

public static SocketError SendFileAsync(SafeSocketHandle handle, FileStream fileStream, Action<long, SocketError> callback) =>
SendFileAsync(handle, fileStream, 0, fileStream.Length, callback);

private static SocketError SendFileAsync(SafeSocketHandle handle, FileStream fileStream, long offset, long count, Action<long, SocketError> callback)
{
long bytesSent;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1074,30 +1074,6 @@ private static unsafe bool TransmitFileHelper(
}
}

public static unsafe SocketError SendFileAsync(SafeSocketHandle handle, FileStream? fileStream, byte[]? preBuffer, byte[]? postBuffer, TransmitFileOptions flags, TransmitFileAsyncResult asyncResult)
{
asyncResult.SetUnmanagedStructures(fileStream, preBuffer, postBuffer, (flags & (TransmitFileOptions.Disconnect | TransmitFileOptions.ReuseSocket)) != 0);
try
{
bool success = TransmitFileHelper(
handle,
fileStream?.SafeFileHandle,
asyncResult.DangerousOverlappedPointer, // SafeHandle was just created in SetUnmanagedStructures
preBuffer is not null ? Marshal.UnsafeAddrOfPinnedArrayElement(preBuffer, 0) : IntPtr.Zero,
preBuffer?.Length ?? 0,
postBuffer is not null ? Marshal.UnsafeAddrOfPinnedArrayElement(postBuffer, 0) : IntPtr.Zero,
postBuffer?.Length ?? 0,
flags);

return asyncResult.ProcessOverlappedResult(success, 0);
}
catch
{
asyncResult.ReleaseUnmanagedStructures();
throw;
}
}

public static void CheckDualModeReceiveSupport(Socket socket)
{
// Dual-mode sockets support received packet info on Windows.
Expand Down

This file was deleted.

Loading

0 comments on commit 3e6de7e

Please sign in to comment.