Partition some pools by core #40476

Closed · wants to merge 1 commit
@@ -10,19 +10,28 @@ internal class SocketSenderPool : IDisposable
 {
     private const int MaxQueueSize = 1024; // REVIEW: Is this good enough?

-    private readonly ConcurrentQueue<SocketSender> _queue = new();
+    private readonly ConcurrentQueue<SocketSender>[] _queues;
Review comment (Member): Would it be possible to just disable SocketSender pooling for machines with a large number of CPU cores?
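A minimal sketch of that alternative, under assumptions: the pool shape and the `MaxPooledCores` cutoff below are illustrative, not Kestrel's actual code.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical sketch only: bypass pooling entirely above a core-count
// threshold, so many-core machines pay an allocation instead of contending
// on (or partitioning) a shared queue. MaxPooledCores is illustrative.
internal sealed class CoreGatedPool<T> where T : class, new()
{
    private const int MaxPooledCores = 32; // illustrative cutoff, not tuned

    // A null queue means pooling is disabled on this machine.
    private readonly ConcurrentQueue<T>? _queue =
        Environment.ProcessorCount <= MaxPooledCores ? new ConcurrentQueue<T>() : null;

    public T Rent() =>
        _queue is { } q && q.TryDequeue(out var item) ? item : new T();

    public void Return(T item) => _queue?.Enqueue(item);
}
```

The trade-off is that machines above the cutoff allocate on every rent, so this only makes sense if the pooled object is cheap relative to the cross-core contention it causes.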

     private int _count;
     private readonly PipeScheduler _scheduler;
     private bool _disposed;

     public SocketSenderPool(PipeScheduler scheduler)
     {
         _scheduler = scheduler;
+
+        _queues = new ConcurrentQueue<SocketSender>[Environment.ProcessorCount];
+
+        for (var i = 0; i < _queues.Length; i++)
+        {
+            _queues[i] = new ConcurrentQueue<SocketSender>();
+        }
     }

     public SocketSender Rent()
     {
-        if (_queue.TryDequeue(out var sender))
+        var partition = Thread.GetCurrentProcessorId() % _queues.Length;
+
+        if (_queues[partition].TryDequeue(out var sender))
         {
             Interlocked.Decrement(ref _count);
             return sender;
Expand All @@ -40,18 +49,23 @@ public void Return(SocketSender sender)
return;
}

var partition = Thread.GetCurrentProcessorId() % _queues.Length;

sender.Reset();
_queue.Enqueue(sender);
_queues[partition].Enqueue(sender);
}

public void Dispose()
{
if (!_disposed)
{
_disposed = true;
while (_queue.TryDequeue(out var sender))
foreach (var queue in _queues)
{
sender.Dispose();
while (queue.TryDequeue(out var sender))
{
sender.Dispose();
}
}
}
}
src/Shared/Buffers.MemoryPool/PinnedBlockMemoryPool.cs (30 changes: 23 additions & 7 deletions)
@@ -32,7 +32,7 @@ internal sealed class PinnedBlockMemoryPool : MemoryPool<byte>
     /// Thread-safe collection of blocks which are currently in the pool. A slab will pre-allocate all of the block tracking objects
     /// and add them to this collection. When memory is requested it is taken from here first, and when it is returned it is re-added.
     /// </summary>
-    private readonly ConcurrentQueue<MemoryPoolBlock> _blocks = new ConcurrentQueue<MemoryPoolBlock>();
+    private readonly ConcurrentQueue<MemoryPoolBlock>[] _queues;

/// <summary>
/// This is part of implementing the IDisposable pattern.
@@ -46,6 +46,15 @@ internal sealed class PinnedBlockMemoryPool : MemoryPool<byte>
/// </summary>
private const int AnySize = -1;

+    public PinnedBlockMemoryPool()
+    {
+        _queues = new ConcurrentQueue<MemoryPoolBlock>[Environment.ProcessorCount];
+
+        for (var i = 0; i < _queues.Length; i++)
+        {
+            _queues[i] = new ConcurrentQueue<MemoryPoolBlock>();
+        }
+    }
public override IMemoryOwner<byte> Rent(int size = AnySize)
{
if (size > _blockSize)
@@ -58,7 +67,10 @@ public override IMemoryOwner<byte> Rent(int size = AnySize)
MemoryPoolThrowHelper.ThrowObjectDisposedException(MemoryPoolThrowHelper.ExceptionArgument.MemoryPool);
}

-        if (_blocks.TryDequeue(out var block))
+
+        var partition = Thread.GetCurrentProcessorId() % _queues.Length;
+
+        if (_queues[partition].TryDequeue(out var block))
Review comment (Member): What if Thread.GetCurrentProcessorId() changes between the Rent and Return operations and the pool gets starved? Should it stop on the first failure, or try to dequeue from another partition?

https://github.com/dotnet/runtime/blob/07e87bc4cb0358f57e7116e047f7d2017b049cf9/src/libraries/System.Private.CoreLib/src/System/Buffers/TlsOverPerCoreLockedStacksArrayPool.cs#L338
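One way to read that suggestion, sketched under assumptions (the generic pool shape and names are illustrative, not the PR's code): try the local partition first, then scan the remaining partitions before allocating, similar in spirit to the linked TlsOverPerCoreLockedStacksArrayPool, which also searches other per-core stacks before allocating.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Illustrative sketch, not the PR's code: a rent path that falls back to
// the other partitions on a local miss, so a partition drained by core-id
// drift between Rent and Return cannot starve the whole pool.
internal sealed class FallbackPartitionedPool<T> where T : class, new()
{
    private readonly ConcurrentQueue<T>[] _queues;

    public FallbackPartitionedPool(int partitions)
    {
        _queues = new ConcurrentQueue<T>[partitions];
        for (var i = 0; i < _queues.Length; i++)
        {
            _queues[i] = new ConcurrentQueue<T>();
        }
    }

    public T Rent()
    {
        var start = Thread.GetCurrentProcessorId() % _queues.Length;

        // Local partition first, then the rest in order.
        for (var i = 0; i < _queues.Length; i++)
        {
            if (_queues[(start + i) % _queues.Length].TryDequeue(out var item))
            {
                return item;
            }
        }

        return new T();
    }

    public void Return(T item) =>
        _queues[Thread.GetCurrentProcessorId() % _queues.Length].Enqueue(item);
}
```

Scanning every queue on a miss trades a little extra work on the cold path for fewer allocations; it also reintroduces some cross-core traffic, which is exactly what the partitioning was meant to avoid.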

Review comment (@kouvel, Member, May 5, 2022): It would likely be better to assign each pooled item a partition and return it to the partition it was taken from. I had seen this issue before, and recently again when testing thread pool changes along with this change: there is still a fair bit of allocation even with this change. Currently, some threads rent buffers and in many cases other threads return them, so the renting threads eventually run out and have to allocate. Returning buffers (or, similarly, pooled items for SocketSenderPool) to the queue they came from maintains balance between the caches; based on what I've seen, it solves the allocation issues. The issue I saw with this change may not exist or be visible currently, but it's looming, and I don't think there's much reason not to fix it.
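A sketch of that approach under assumptions: the item type and its HomePartition field are illustrative stand-ins for state that SocketSender or MemoryPoolBlock would carry. Each item is stamped with the partition it came from and returned there, regardless of which thread performs the return.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Illustrative sketch, not the PR's code: items remember their home
// partition, so rent-on-thread-A / return-on-thread-B no longer drains
// one per-core queue while overfilling another.
internal sealed class PooledItem
{
    public int HomePartition; // set by the pool when the item is created
}

internal sealed class HomePartitionedPool
{
    private readonly ConcurrentQueue<PooledItem>[] _queues;

    public HomePartitionedPool(int partitions)
    {
        _queues = new ConcurrentQueue<PooledItem>[partitions];
        for (var i = 0; i < _queues.Length; i++)
        {
            _queues[i] = new ConcurrentQueue<PooledItem>();
        }
    }

    public PooledItem Rent()
    {
        var partition = Thread.GetCurrentProcessorId() % _queues.Length;
        if (_queues[partition].TryDequeue(out var item))
        {
            return item;
        }

        // Newly allocated items adopt the partition that allocated them.
        return new PooledItem { HomePartition = partition };
    }

    // Return to the item's home queue, not the caller's current core.
    public void Return(PooledItem item) => _queues[item.HomePartition].Enqueue(item);
}
```

The cost is one extra field per pooled item; in exchange, the per-partition caches stay balanced even when producers and consumers run on different cores.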

{
// block successfully taken from the stack - return it
return block;
@@ -84,7 +96,9 @@ internal void Return(MemoryPoolBlock block)

if (!_isDisposed)
{
-            _blocks.Enqueue(block);
+            var partition = Thread.GetCurrentProcessorId() % _queues.Length;
+
+            _queues[partition].Enqueue(block);
}
}

@@ -101,11 +115,13 @@ protected override void Dispose(bool disposing)

if (disposing)
{
-            // Discard blocks in pool
-            while (_blocks.TryDequeue(out _))
+            foreach (var queue in _queues)
             {
-
+                while (queue.TryDequeue(out var block))
+                {
+                    block.Dispose();
+                }
             }
}
}
}