HttpConnection roots user-provided write buffer in AwaitableSocketAsyncEventArgs #67096
Tagging subscribers to this area: @dotnet/ncl
Issue Details
Description
My scenario:
Code to reproduce:
Simple consuming REST endpoint (this can be any endpoint that accepts the payload) (Use deno run --allow-net consume-server.js to run):
import { Application } from "https://deno.land/x/oak/mod.ts";
const app = new Application();
app.use(async (ctx) => {
// Accept and add some latency to simulate a remote network
if (ctx.request.method === 'POST' && ctx.request.url.pathname === '/api/bytes') {
const result = ctx.request.body({ type: 'bytes' });
await new Promise(res => setTimeout(res, 50));
const val = await result.value;
ctx.response.body = `${val.length} bytes received`;
} else {
ctx.response.body = "Hello World!";
}
});
await app.listen({ port: 1777 });
Uploading code:
using System.Net;
using System.Net.Http.Headers;
HttpClient client = new HttpClient {
BaseAddress = new Uri("http://localhost:1777"),
DefaultRequestVersion = HttpVersion.Version20,
};
// Used to throttle the upload rate (not overload the destination server & avoid port exhaustion)
SemaphoreSlim uploadLock = new(25);
ServicePointManager.DefaultConnectionLimit = 25;
async Task DoUpload(MemoryStream item)
{
await uploadLock.WaitAsync().ConfigureAwait(false);
var buffer = item.GetBuffer();
using var content = new ByteArrayContent(buffer, 0, (int)item.Length);
content.Headers.ContentType = new MediaTypeHeaderValue("application/octet-stream");
content.Headers.ContentLength = item.Length;
using var request = new HttpRequestMessage(HttpMethod.Post, "/api/bytes")
{
Content = content,
Version = new Version(2, 0)
};
// ResponseHeadersRead because we don't care about the response body - literally just the status code
var response = await client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead).ConfigureAwait(false);
if (response.IsSuccessStatusCode)
{
Console.WriteLine("Success!");
}
else
{
Console.WriteLine("Something went wrong!");
}
response.Dispose();
await item.DisposeAsync().ConfigureAwait(false);
uploadLock.Release();
}
async Task DoDummyUpload(int sizeKb) {
var sizeBytes = 1024 * sizeKb;
var item = new MemoryStream();
await item.WriteAsync(new byte[sizeBytes], 0, sizeBytes).ConfigureAwait(false);
await DoUpload(item);
}
async Task<int> Main(string[] args) {
var producer = new DummyProducer();
producer.Received += async (_, sizeKb) => {
// Upload from within a delegate - source of the leak
// If this were run outside of a delegate it would not leak
await DoDummyUpload(sizeKb);
};
// Simple prompt for testing - enter "upload 1000 64" to see a spike in memory that won't go down
Console.WriteLine("Waiting for input...");
while (true)
{
var line = await Console.In.ReadLineAsync().ConfigureAwait(false);
var cmd = line.Split(' ')[0];
if (cmd == "exit") break;
if (cmd == "upload") {
(int numRequests, int payloadSizeKb) = (int.Parse(line.Split(' ')[1]), int.Parse(line.Split(' ')[2]));
Console.WriteLine("Uploading...");
foreach (var i in Enumerable.Range(0, numRequests))
{
producer.emit(payloadSizeKb);
}
}
}
return 0;
}
await Main(args).ConfigureAwait(false);
class DummyProducer
{
public event EventHandler<int> Received = delegate { };
public void emit(int sizeKb) {
Received.Invoke(this, sizeKb);
}
}
Configuration
Regression?
Unsure if regression.
Other information
|
How are you determining that a leak is happening? Can you share what objects are kept in memory when a leak does occur?
To my knowledge, there is no usage of |
With a brief search in https://source.dot.net/#System.Collections.Concurrent/System/Collections/Concurrent/ConcurrentStack.cs,de91cdef3f389289,references , Your memory snapshot indicates things more than the minimal repro ( |
I tried it locally and on my machine, it tops at around 200MB. However, I notice that there is a small problem when e.g. you don't start the server and the call to var response = await client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead).ConfigureAwait(false); throws an exception, then no new tasks are allowed to run (no calls to
|
I believe the issue you are seeing is that the write buffer is not cleared after we write to the socket. The That means the huge In your example, if you follow the In general, you should try to avoid allocating such buffers, and instead streaming the data via
HttpClient client = new HttpClient(new HttpClientHandler
{
MaxConnectionsPerServer = 25
}); |
The example is allocating all the buffers at the same time at the beginning of the upload, holding them in the memory, until it can process them 25 at a time. |
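One way to avoid holding every payload in memory at once is to acquire the semaphore slot before the buffer is allocated. The following is only a minimal sketch, assuming the payload can be produced on demand. `ThrottledUploader`, `UploadThrottledAsync` and the `createPayload` callback are hypothetical names that do not appear in the repro, and the relative `/api/bytes` path assumes the client has a `BaseAddress` set as in the original code.

```csharp
using System;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;

static class ThrottledUploader
{
    // Hypothetical helper: at most 25 payloads exist at any time,
    // because each buffer is created only after a slot is acquired.
    private static readonly SemaphoreSlim s_slots = new(25);

    public static async Task<bool> UploadThrottledAsync(
        HttpClient client, Func<byte[]> createPayload)
    {
        await s_slots.WaitAsync().ConfigureAwait(false);
        try
        {
            // Allocated inside the throttled region, not up front.
            byte[] payload = createPayload();
            using var content = new ByteArrayContent(payload);
            using HttpResponseMessage response = await client
                .PostAsync("/api/bytes", content)
                .ConfigureAwait(false);
            return response.IsSuccessStatusCode;
        }
        finally
        {
            // Released even if the request throws.
            s_slots.Release();
        }
    }
}
```

With this shape, queued uploads wait on the semaphore without owning a buffer, so the peak number of live payloads should be bounded by the semaphore count rather than by the number of queued requests.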
A simpler repro:
static async Task<WeakReference<byte[]>> SendRequestAsync(HttpClient client)
{
var bytes = new byte[1024 * 1024];
using HttpResponseMessage response = await client.PostAsync("http://localhost:5159", new ByteArrayContent(bytes));
await response.Content.CopyToAsync(Stream.Null);
return new WeakReference<byte[]>(bytes);
}
using var client = new HttpClient();
WeakReference<byte[]> bytesReference = await SendRequestAsync(client);
GC.Collect();
GC.WaitForPendingFinalizers();
GC.Collect();
Console.WriteLine("Alive: " + bytesReference.TryGetTarget(out _)); |
Thank you for looking into this everyone! To address some of the recent responses...
|
The Anyway, it seems that MihaZupan identified a possible root cause that would need to be fixed on our side |
Whatever you set to
Are you able to share the modified code? The way the content is copied is different with
To be clear, I do not recommend you manually force GCs, ever. My example was meant only to demonstrate the existence of an underlying issue. If what I described above is the issue you are hitting here, there is a way to mitigate it.
using var client = new HttpClient(new SocketsHttpHandler
{
PooledConnectionIdleTimeout = TimeSpan.FromSeconds(10)
}); |
You can get port exhaustion if you are not reusing |
Hi All, sorry for the slow response, hard to rip myself away from meetings & I wanted to test this out before I bring it back to you. I've tried the same reproduction code but with
(Side note - I'm happy to test out StreamContent, but unfortunately it looks like I can't use it with our application - we're limited by what our 3rd party vendor supports for uploads - ByteArrayContent)
Maybe I'm missing something from staring at this for too long - but here it is:
Updated reproduction code (ran with: upload 1000 500):
using System.Net;
using System.Net.Http.Headers;
HttpClient client = new HttpClient {
BaseAddress = new Uri("http://localhost:1777"),
DefaultRequestVersion = HttpVersion.Version20,
};
// Used to throttle the upload rate (not overload the destination server & avoid port exhaustion)
SemaphoreSlim uploadLock = new(25);
ServicePointManager.DefaultConnectionLimit = 25;
async Task DoUpload(MemoryStream item)
{
await uploadLock.WaitAsync().ConfigureAwait(false);
//var buffer = item.GetBuffer();
//using var content = new ByteArrayContent(buffer, 0, (int)item.Length);
using var content = new StreamContent(item); // New!
content.Headers.ContentType = new MediaTypeHeaderValue("application/octet-stream");
content.Headers.ContentLength = item.Length;
using var request = new HttpRequestMessage(HttpMethod.Post, "/api/bytes")
{
Content = content,
Version = new Version(2, 0)
};
// ResponseHeadersRead because we don't care about the response body - literally just the status code
var response = await client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead).ConfigureAwait(false);
if (response.IsSuccessStatusCode)
{
Console.WriteLine("Success!");
}
else
{
Console.WriteLine("Something went wrong!");
}
response.Dispose();
await item.DisposeAsync().ConfigureAwait(false);
uploadLock.Release();
}
async Task DoDummyUpload(int sizeKb) {
var sizeBytes = 1024 * sizeKb;
var item = new MemoryStream();
await item.WriteAsync(new byte[sizeBytes], 0, sizeBytes).ConfigureAwait(false);
await DoUpload(item);
}
async Task<int> Main(string[] args) {
// Simple prompt for testing - enter "upload 1000 64" to see a spike in memory that won't go down
Console.WriteLine("Waiting for input...");
while (true)
{
var line = await Console.In.ReadLineAsync().ConfigureAwait(false);
var cmd = line.Split(' ')[0];
if (cmd == "exit") break;
if (cmd == "upload") {
(int numRequests, int payloadSizeKb) = (int.Parse(line.Split(' ')[1]), int.Parse(line.Split(' ')[2]));
Console.WriteLine("Uploading...");
foreach (var i in Enumerable.Range(0, numRequests))
{
// Fire and forget Task so that we can do multiple concurrent uploads as the application runs
Task.Run(async () => await DoDummyUpload(payloadSizeKb));
}
}
}
return 0;
}
await Main(args).ConfigureAwait(false);
Updated "server" code to handle the stream:
import { Application } from "https://deno.land/x/oak/mod.ts";
const app = new Application();
app.use(async (ctx) => {
// Accept and add some latency to simulate a remote network
if (ctx.request.method === 'POST' && ctx.request.url.pathname === '/api/bytes') {
//const result = ctx.request.body({ type: 'bytes' });
const result = ctx.request.body({ type: 'stream' });
await new Promise(res => setTimeout(res, 50));
const val = await result.value;
ctx.response.body = `${val.length} bytes received`;
} else {
ctx.response.body = "Hello World!";
}
});
await app.listen({ port: 1777 }); |
@MihaZupan I've tried your recommendation with:
HttpClient client = new HttpClient(new SocketsHttpHandler
{
PooledConnectionIdleTimeout = TimeSpan.FromSeconds(10)
}) {
BaseAddress = new Uri("http://localhost:1777"),
DefaultRequestVersion = HttpVersion.Version20,
};
Unfortunately, it still retains the memory (I tested this by adding |
Please apply the change Radek recommended above and move the
await uploadLock.WaitAsync().ConfigureAwait(false);
try
{
// Do the upload
}
finally
{
uploadLock.Release();
}
You are limited in what the server will accept, or does the 3rd party control the APIs around |
To answer your question @MihaZupan - I'm limited to what the server will accept - their endpoint won't accept stream content (it returns an error code right away when I try to do so). I made the recommended change and re-ran it for much longer - I thought the "upload" was completed (I'm testing this against my dummy local endpoint still). With
With
My test code (I just comment out / uncomment the
using System.Net;
using System.Net.Http.Headers;
HttpClient client = new HttpClient(new SocketsHttpHandler
{
PooledConnectionIdleTimeout = TimeSpan.FromSeconds(10)
}) {
BaseAddress = new Uri("http://localhost:1777"),
DefaultRequestVersion = HttpVersion.Version20,
};
// Used to throttle the upload rate (not overload the destination server & avoid port exhaustion)
SemaphoreSlim uploadLock = new(25);
async Task DoUpload(MemoryStream item)
{
await uploadLock.WaitAsync().ConfigureAwait(false);
try
{
using var content = new ByteArrayContent(item.GetBuffer(), 0, (int)item.Length);
//using var content = new StreamContent(item);
content.Headers.ContentType = new MediaTypeHeaderValue("application/octet-stream");
content.Headers.ContentLength = item.Length;
using var request = new HttpRequestMessage(HttpMethod.Post, "/api/bytes")
{
Content = content,
Version = new Version(2, 0)
};
// ResponseHeadersRead because we don't care about the response body - literally just the status code
using var response = await client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead).ConfigureAwait(false);
Console.WriteLine(response.IsSuccessStatusCode ? "Success!" : "Something went wrong!");
} finally
{
await item.DisposeAsync().ConfigureAwait(false);
uploadLock.Release();
}
}
async Task DoDummyUpload(int sizeKb) {
var sizeBytes = 1024 * sizeKb;
var item = new MemoryStream();
await item.WriteAsync(new byte[sizeBytes], 0, sizeBytes).ConfigureAwait(false);
await DoUpload(item);
}
async Task<int> Main(string[] args) {
// Simple prompt for testing - enter "upload 1000 64" to see a spike in memory that won't go down
Console.WriteLine("Waiting for input...");
while (true)
{
var line = await Console.In.ReadLineAsync().ConfigureAwait(false);
var cmd = line.Split(' ')[0];
if (cmd == "exit") break;
if (cmd == "upload") {
(int numRequests, int payloadSizeKb) = (int.Parse(line.Split(' ')[1]), int.Parse(line.Split(' ')[2]));
Console.WriteLine("Uploading...");
foreach (var i in Enumerable.Range(0, numRequests))
{
// Fire and forget Task so that we can do multiple concurrent uploads as the application runs
Task.Run(async () => await DoDummyUpload(payloadSizeKb));
}
}
}
return 0;
}
await Main(args).ConfigureAwait(false);
Please let me know if there's anything else you'd like me to try. Trying to resolve this, or find a viable workaround is my priority now. |
The GC generally won't do work unless it has to. If the system has a ton of memory to spare and the process is sitting idle, it has no need to do anything. Sitting at 400 MB isn't a memory leak in itself. To get more interesting numbers about how much memory is actually being kept alive, try adding this at the start of your test:
_ = Task.Run(async () =>
{
while (true)
{
Console.Title = $"{GC.GetTotalMemory(forceFullCollection: true) / 1024f / 1024f:N1} MB";
await Task.Delay(1000);
}
});
Looking into it a bit more, The aim is to never allocate the
Your
async Task DoDummyUpload(int sizeKb) {
var sizeBytes = 1024 * sizeKb;
var item = new MemoryStream(new byte[sizeBytes]);
await DoUpload(item);
}
If you have to buffer everything for some reason, try using a CustomStreamContent.cs instead. |
If you have a method like
protected override async Task SerializeToStreamAsync(Stream stream, TransportContext? context, CancellationToken cancellationToken)
{
await _yourSourceObject.SaveAsync(stream, cancellationToken);
}
If what you were trying out is different than the above, feel free to share the code. This approach being more expensive than pre-buffering everything into a
I don't know what your approach was with "just an await from main", but my guess is that you were running all uploads serially, so you were only allocating one buffer at a time, and only sending one request at a time. |
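The `SerializeToStreamAsync` fragment above can be fleshed out into a complete content type. This is only a sketch under stated assumptions: `CallbackContent` and its `writeAsync` delegate are hypothetical names, not the `CustomStreamContent.cs` referenced earlier in the thread, and the optional `length` parameter is an illustration for servers that might require a `Content-Length` header.

```csharp
using System;
using System.IO;
using System.Net;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical content type: writes the payload straight to the request
// stream instead of buffering it into a byte[] first.
sealed class CallbackContent : HttpContent
{
    private readonly Func<Stream, CancellationToken, Task> _writeAsync;
    private readonly long? _length;

    public CallbackContent(
        Func<Stream, CancellationToken, Task> writeAsync, long? length = null)
    {
        _writeAsync = writeAsync;
        _length = length;
    }

    // Required abstract overload (no cancellation support).
    protected override Task SerializeToStreamAsync(
        Stream stream, TransportContext? context)
        => _writeAsync(stream, CancellationToken.None);

    // Cancellable overload, available on .NET 5 and later.
    protected override Task SerializeToStreamAsync(
        Stream stream, TransportContext? context, CancellationToken cancellationToken)
        => _writeAsync(stream, cancellationToken);

    // Reports a length only when the caller supplied one; otherwise the
    // handler has to send the body without a known Content-Length.
    protected override bool TryComputeLength(out long length)
    {
        length = _length ?? 0;
        return _length.HasValue;
    }
}
```

A caller would pass something like `new CallbackContent((stream, ct) => source.SaveAsync(stream, ct))`, where `source` stands in for whatever object produces the data. Whether the third-party endpoint accepts such a request is not something this sketch can confirm.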
Triage:
|
I'm not sure why this issue was closed. A few additional notes I'd like to point out:
My concern here is:
Are there any good alternatives here? Is it possible to null the buffer manually from my code? @MihaZupan I'll try to get back to you about saving to the network stream directly afterwards if you're willing to take a quick look. I'd like any effort right now to go into the above... |
So if you force a full GC, effectively all of the memory is collected.
I think the example here is a bit contrived when looking at memory usage. Buffering 15 GB worth of content in the process before sending it is not a good use of resources and is something you should avoid in production code.
I think this is the important discussion here. What does the memory usage look like in your real app if you avoid allocating the buffers and instead stream the content directly? Does it retain unreasonable amounts of memory without forcing a full GC?
Please take a look at #50902 which is discussing how you may tell the GC to prefer low memory usage over collection times.
My guess is that the memory is being retained because you are using huge buffers that immediately go to the LOH (large object heap), and won't be collected unless there is a good reason to do so (e.g. running out of memory). That is, your scenario is aggressively allocating short-lived buffers that exceed the LOH threshold. This is a performance anti-pattern and it won't be fast.
If you for some reason can't avoid allocating such buffers (you most likely can), then you can look at tweaking GC settings like the LOH threshold or changing the conserve-memory flag. You can also file a new issue to discuss the performance of such scenarios and get input from people with more expertise in the area (label
If your application is not otherwise under high CPU load and all you want is to minimize memory usage, you could also consider inducing a full GC yourself. This is not something we would generally recommend, but may be effective in your scenario.
Task.Run(async () =>
{
while (true)
{
// Hypothetical threshold of 500 MB; pick a value that is reasonable for your scenario.
if (Environment.WorkingSet > 500L * 1024 * 1024)
{
GC.Collect();
}
await Task.Delay(TimeSpan.FromMinutes(1));
}
}); |
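To see why the repro's buffers behave differently from small allocations, it can help to check which generation the runtime reports for them. A minimal sketch, assuming default GC settings (an LOH threshold of 85,000 bytes); the 500 KB size mirrors the "upload 1000 500" run and the variable names are illustrative only.

```csharp
using System;

// Arrays at or above the LOH threshold are allocated on the large object
// heap, which the runtime reports as the highest generation.
byte[] small = new byte[1024];
byte[] large = new byte[500 * 1024];

Console.WriteLine($"small buffer generation: {GC.GetGeneration(small)}");
Console.WriteLine($"large buffer generation: {GC.GetGeneration(large)}");
Console.WriteLine($"max generation: {GC.MaxGeneration}");
```

Under default settings the large buffer is expected to report the maximum generation right after allocation, which means it is only reclaimed by a full (gen 2) collection. That matches the observation in the thread that memory drops once a full GC is forced.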
Description
My scenario:
Task.Run in the delegate, but this causes a leak as well
Code to reproduce:
Simple consuming REST endpoint (this can be any endpoint that accepts the payload) (Use deno run --allow-net consume-server.js to run)
Uploading code:
Configuration
Regression?
Unsure if regression.
Other information