Skip to content

Commit

Permalink
Pool SocketSenders (#30771)
Browse files Browse the repository at this point in the history
- SocketAsyncEventArgs have lots of state on them and as a result are quite big (~350) bytes at runtime. We can pool these since sends are usually very fast and we can reduce the per connection overhead as a result.
- We also allocate one per IOQueue to reduce contention.
- Fixed buffer list management
- Disposed pool when the transport is disposed
- Added project to slnf so running tests in VS was possible
- Clear the buffer and buffer list before returning to the pool
- This cleans up dumps as the pooled senders don't see references to buffers
while pooled in the queue
- Keep track of items in the pool separately from the queue count.
  • Loading branch information
davidfowl authored Mar 16, 2021
1 parent d2a0cbc commit 0a7a620
Show file tree
Hide file tree
Showing 9 changed files with 142 additions and 74 deletions.
3 changes: 2 additions & 1 deletion src/Servers/Kestrel/Kestrel.slnf
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@
"src\\Servers\\Kestrel\\test\\Sockets.BindTests\\Sockets.BindTests.csproj",
"src\\Servers\\Kestrel\\test\\Sockets.FunctionalTests\\Sockets.FunctionalTests.csproj",
"src\\Servers\\Kestrel\\tools\\CodeGenerator\\CodeGenerator.csproj",
"src\\ObjectPool\\src\\Microsoft.Extensions.ObjectPool.csproj"
"src\\ObjectPool\\src\\Microsoft.Extensions.ObjectPool.csproj",
"src\\Testing\\src\\Microsoft.AspNetCore.Testing.csproj"
]
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ internal class SocketConnectionFactory : IConnectionFactory, IAsyncDisposable
private readonly SocketsTrace _trace;
private readonly PipeOptions _inputOptions;
private readonly PipeOptions _outputOptions;
private readonly SocketSenderPool _socketSenderPool;

public SocketConnectionFactory(IOptions<SocketTransportOptions> options, ILoggerFactory loggerFactory)
{
Expand All @@ -46,9 +47,12 @@ public SocketConnectionFactory(IOptions<SocketTransportOptions> options, ILogger
// These are the same, it's either the thread pool or inline
var applicationScheduler = _options.UnsafePreferInlineScheduling ? PipeScheduler.Inline : PipeScheduler.ThreadPool;
var transportScheduler = applicationScheduler;
// https://github.com/aspnet/KestrelHttpServer/issues/2573
var awaiterScheduler = OperatingSystem.IsWindows() ? transportScheduler : PipeScheduler.Inline;

_inputOptions = new PipeOptions(_memoryPool, applicationScheduler, transportScheduler, maxReadBufferSize, maxReadBufferSize / 2, useSynchronizationContext: false);
_outputOptions = new PipeOptions(_memoryPool, transportScheduler, applicationScheduler, maxWriteBufferSize, maxWriteBufferSize / 2, useSynchronizationContext: false);
_socketSenderPool = new SocketSenderPool(awaiterScheduler);
}

public async ValueTask<ConnectionContext> ConnectAsync(EndPoint endpoint, CancellationToken cancellationToken = default)
Expand All @@ -72,6 +76,7 @@ public async ValueTask<ConnectionContext> ConnectAsync(EndPoint endpoint, Cancel
_memoryPool,
_inputOptions.ReaderScheduler, // This is either threadpool or inline
_trace,
_socketSenderPool,
_inputOptions,
_outputOptions,
_options.WaitForDataBeforeAllocatingBuffer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal
{
internal sealed class SocketAwaitableEventArgs : SocketAsyncEventArgs, ICriticalNotifyCompletion
internal class SocketAwaitableEventArgs : SocketAsyncEventArgs, ICriticalNotifyCompletion
{
private static readonly Action _callbackCompleted = () => { };

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ internal sealed class SocketConnection : TransportConnection
private readonly Socket _socket;
private readonly ISocketsTrace _trace;
private readonly SocketReceiver _receiver;
private readonly SocketSender _sender;
private SocketSender? _sender;
private readonly SocketSenderPool _socketSenderPool;
private readonly IDuplexPipe _originalTransport;
private readonly CancellationTokenSource _connectionClosedTokenSource = new CancellationTokenSource();

Expand All @@ -36,6 +37,7 @@ internal SocketConnection(Socket socket,
MemoryPool<byte> memoryPool,
PipeScheduler transportScheduler,
ISocketsTrace trace,
SocketSenderPool socketSenderPool,
PipeOptions inputOptions,
PipeOptions outputOptions,
bool waitForData = true)
Expand All @@ -48,6 +50,7 @@ internal SocketConnection(Socket socket,
MemoryPool = memoryPool;
_trace = trace;
_waitForData = waitForData;
_socketSenderPool = socketSenderPool;

LocalEndPoint = _socket.LocalEndPoint;
RemoteEndPoint = _socket.RemoteEndPoint;
Expand All @@ -59,8 +62,7 @@ internal SocketConnection(Socket socket,
// https://github.com/aspnet/KestrelHttpServer/issues/2573
var awaiterScheduler = OperatingSystem.IsWindows() ? transportScheduler : PipeScheduler.Inline;

_receiver = new SocketReceiver(_socket, awaiterScheduler);
_sender = new SocketSender(_socket, awaiterScheduler);
_receiver = new SocketReceiver(awaiterScheduler);

var pair = DuplexPipe.CreateConnectionPair(inputOptions, outputOptions);

Expand Down Expand Up @@ -93,7 +95,7 @@ private async Task StartAsync()
await sendTask;

_receiver.Dispose();
_sender.Dispose();
_sender?.Dispose();
}
catch (Exception ex)
{
Expand Down Expand Up @@ -183,13 +185,13 @@ private async Task ProcessReceives()
if (_waitForData)
{
// Wait for data before allocating a buffer.
await _receiver.WaitForDataAsync();
await _receiver.WaitForDataAsync(_socket);
}

// Ensure we have some reasonable amount of buffer space
var buffer = input.GetMemory(MinAllocBufferSize);

var bytesReceived = await _receiver.ReceiveAsync(buffer);
var bytesReceived = await _receiver.ReceiveAsync(_socket, buffer);

if (bytesReceived == 0)
{
Expand Down Expand Up @@ -282,7 +284,12 @@ private async Task ProcessSends()
var isCompleted = result.IsCompleted;
if (!buffer.IsEmpty)
{
await _sender.SendAsync(buffer);
_sender = _socketSenderPool.Rent();
await _sender.SendAsync(_socket, buffer);
// We don't return to the pool if there was an exception, and
// we keep the _sender assigned so that we can dispose it in StartAsync.
_socketSenderPool.Return(_sender);
_sender = null;
}

output.AdvanceTo(end);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,34 +7,34 @@

namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal
{
internal sealed class SocketReceiver : SocketSenderReceiverBase
internal sealed class SocketReceiver : SocketAwaitableEventArgs
{
public SocketReceiver(Socket socket, PipeScheduler scheduler) : base(socket, scheduler)
public SocketReceiver(PipeScheduler ioScheduler) : base(ioScheduler)
{
}

public SocketAwaitableEventArgs WaitForDataAsync()
public SocketAwaitableEventArgs WaitForDataAsync(Socket socket)
{
_awaitableEventArgs.SetBuffer(Memory<byte>.Empty);
SetBuffer(Memory<byte>.Empty);

if (!_socket.ReceiveAsync(_awaitableEventArgs))
if (!socket.ReceiveAsync(this))
{
_awaitableEventArgs.Complete();
Complete();
}

return _awaitableEventArgs;
return this;
}

public SocketAwaitableEventArgs ReceiveAsync(Memory<byte> buffer)
public SocketAwaitableEventArgs ReceiveAsync(Socket socket, Memory<byte> buffer)
{
_awaitableEventArgs.SetBuffer(buffer);
SetBuffer(buffer);

if (!_socket.ReceiveAsync(_awaitableEventArgs))
if (!socket.ReceiveAsync(this))
{
_awaitableEventArgs.Complete();
Complete();
}

return _awaitableEventArgs;
return this;
}
}
}
58 changes: 30 additions & 28 deletions src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketSender.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,55 +11,61 @@

namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal
{
internal sealed class SocketSender : SocketSenderReceiverBase
internal sealed class SocketSender : SocketAwaitableEventArgs
{
private List<ArraySegment<byte>>? _bufferList;

public SocketSender(Socket socket, PipeScheduler scheduler) : base(socket, scheduler)
public SocketSender(PipeScheduler scheduler) : base(scheduler)
{
}

public SocketAwaitableEventArgs SendAsync(in ReadOnlySequence<byte> buffers)
public SocketAwaitableEventArgs SendAsync(Socket socket, in ReadOnlySequence<byte> buffers)
{
if (buffers.IsSingleSegment)
{
return SendAsync(buffers.First);
return SendAsync(socket, buffers.First);
}

if (!_awaitableEventArgs.MemoryBuffer.Equals(Memory<byte>.Empty))
{
_awaitableEventArgs.SetBuffer(null, 0, 0);
}

_awaitableEventArgs.BufferList = GetBufferList(buffers);
SetBufferList(buffers);

if (!_socket.SendAsync(_awaitableEventArgs))
if (!socket.SendAsync(this))
{
_awaitableEventArgs.Complete();
Complete();
}

return _awaitableEventArgs;
return this;
}

private SocketAwaitableEventArgs SendAsync(ReadOnlyMemory<byte> memory)
public void Reset()
{
// The BufferList getter is much less expensive then the setter.
if (_awaitableEventArgs.BufferList != null)
// We clear the buffer and buffer list before we put it back into the pool
// it's a small performance hit but it removes the confusion when looking at dumps to see this still
// holds onto the buffer when it's back in the pool
if (BufferList != null)
{
BufferList = null;

_bufferList?.Clear();
}
else
{
_awaitableEventArgs.BufferList = null;
SetBuffer(null, 0, 0);
}
}

_awaitableEventArgs.SetBuffer(MemoryMarshal.AsMemory(memory));
private SocketAwaitableEventArgs SendAsync(Socket socket, ReadOnlyMemory<byte> memory)
{
SetBuffer(MemoryMarshal.AsMemory(memory));

if (!_socket.SendAsync(_awaitableEventArgs))
if (!socket.SendAsync(this))
{
_awaitableEventArgs.Complete();
Complete();
}

return _awaitableEventArgs;
return this;
}

private List<ArraySegment<byte>> GetBufferList(in ReadOnlySequence<byte> buffer)
private void SetBufferList(in ReadOnlySequence<byte> buffer)
{
Debug.Assert(!buffer.IsEmpty);
Debug.Assert(!buffer.IsSingleSegment);
Expand All @@ -68,18 +74,14 @@ private List<ArraySegment<byte>> GetBufferList(in ReadOnlySequence<byte> buffer)
{
_bufferList = new List<ArraySegment<byte>>();
}
else
{
// Buffers are pooled, so it's OK to root them until the next multi-buffer write.
_bufferList.Clear();
}

foreach (var b in buffer)
{
_bufferList.Add(b.GetArray());
}

return _bufferList;
// The act of setting this list, sets the buffers in the internal buffer list
BufferList = _bufferList;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
using System.Collections.Concurrent;
using System.IO.Pipelines;
using System.Threading;

namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal
{
internal class SocketSenderPool : IDisposable
{
private const int MaxQueueSize = 1024; // REVIEW: Is this good enough?

private readonly ConcurrentQueue<SocketSender> _queue = new();
private int _count;
private readonly PipeScheduler _scheduler;
private bool _disposed;

public SocketSenderPool(PipeScheduler scheduler)
{
_scheduler = scheduler;
}

public SocketSender Rent()
{
if (_queue.TryDequeue(out var sender))
{
Interlocked.Decrement(ref _count);
return sender;
}
return new SocketSender(_scheduler);
}

public void Return(SocketSender sender)
{
// This counting isn't accurate, but it's good enough for what we need to avoid using _queue.Count which could be expensive
if (_disposed || Interlocked.Increment(ref _count) > MaxQueueSize)
{
Interlocked.Decrement(ref _count);
sender.Dispose();
return;
}

sender.Reset();
_queue.Enqueue(sender);
}

public void Dispose()
{
if (!_disposed)
{
_disposed = true;
while (_queue.TryDequeue(out var sender))
{
sender.Dispose();
}
}
}
}
}

This file was deleted.

Loading

0 comments on commit 0a7a620

Please sign in to comment.