Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -259,8 +259,7 @@ private async Task InjectNewHttp11ConnectionAsync(RequestQueue<HttpConnection>.Q
HttpConnection? connection = null; HttpConnection? connection = null;
Exception? connectionException = null; Exception? connectionException = null;


CancellationTokenSource cts = GetConnectTimeoutCancellationTokenSource(); CancellationTokenSource cts = GetConnectTimeoutCancellationTokenSource(waiter);
waiter.ConnectionCancellationTokenSource = cts;
try try
{ {
connection = await CreateHttp11ConnectionAsync(queueItem.Request, true, cts.Token).ConfigureAwait(false); connection = await CreateHttp11ConnectionAsync(queueItem.Request, true, cts.Token).ConfigureAwait(false);
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -181,8 +181,7 @@ private async Task InjectNewHttp2ConnectionAsync(RequestQueue<Http2Connection?>.
Exception? connectionException = null; Exception? connectionException = null;
HttpConnectionWaiter<Http2Connection?> waiter = queueItem.Waiter; HttpConnectionWaiter<Http2Connection?> waiter = queueItem.Waiter;


CancellationTokenSource cts = GetConnectTimeoutCancellationTokenSource(); CancellationTokenSource cts = GetConnectTimeoutCancellationTokenSource(waiter);
waiter.ConnectionCancellationTokenSource = cts;
try try
{ {
(Stream stream, TransportContext? transportContext, Activity? activity, IPEndPoint? remoteEndPoint) = await ConnectAsync(queueItem.Request, true, cts.Token).ConfigureAwait(false); (Stream stream, TransportContext? transportContext, Activity? activity, IPEndPoint? remoteEndPoint) = await ConnectAsync(queueItem.Request, true, cts.Token).ConfigureAwait(false);
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ internal sealed partial class HttpConnectionPool


// Loop in case we get a 421 and need to send the request to a different authority. // Loop in case we get a 421 and need to send the request to a different authority.
while (true) while (true)
{
HttpConnectionWaiter<Http3Connection?>? http3ConnectionWaiter = null;
try
{ {
if (!TryGetHttp3Authority(request, out HttpAuthority? authority, out Exception? reasonException)) if (!TryGetHttp3Authority(request, out HttpAuthority? authority, out Exception? reasonException))
{ {
Expand All @@ -87,7 +90,7 @@ internal sealed partial class HttpConnectionPool
long queueStartingTimestamp = HttpTelemetry.Log.IsEnabled() || Settings._metrics!.RequestsQueueDuration.Enabled ? Stopwatch.GetTimestamp() : 0; long queueStartingTimestamp = HttpTelemetry.Log.IsEnabled() || Settings._metrics!.RequestsQueueDuration.Enabled ? Stopwatch.GetTimestamp() : 0;
Activity? waitForConnectionActivity = ConnectionSetupDistributedTracing.StartWaitForConnectionActivity(authority); Activity? waitForConnectionActivity = ConnectionSetupDistributedTracing.StartWaitForConnectionActivity(authority);


if (!TryGetPooledHttp3Connection(request, out Http3Connection? connection, out HttpConnectionWaiter<Http3Connection?>? http3ConnectionWaiter)) if (!TryGetPooledHttp3Connection(request, out Http3Connection? connection, out http3ConnectionWaiter))
{ {
try try
{ {
Expand Down Expand Up @@ -122,6 +125,11 @@ internal sealed partial class HttpConnectionPool


return response; return response;
} }
finally
{
http3ConnectionWaiter?.SetTimeoutToPendingConnectionAttempt(this, cancellationToken.IsCancellationRequested);
}
}
} }


[SupportedOSPlatform("windows")] [SupportedOSPlatform("windows")]
Expand Down Expand Up @@ -253,8 +261,7 @@ private async Task InjectNewHttp3ConnectionAsync(RequestQueue<Http3Connection?>.
HttpAuthority? authority = null; HttpAuthority? authority = null;
HttpConnectionWaiter<Http3Connection?> waiter = queueItem.Waiter; HttpConnectionWaiter<Http3Connection?> waiter = queueItem.Waiter;


CancellationTokenSource cts = GetConnectTimeoutCancellationTokenSource(); CancellationTokenSource cts = GetConnectTimeoutCancellationTokenSource(waiter);
waiter.ConnectionCancellationTokenSource = cts;
Activity? connectionSetupActivity = null; Activity? connectionSetupActivity = null;
try try
{ {
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -559,8 +559,8 @@ public async ValueTask<HttpResponseMessage> SendWithVersionDetectionAndRetryAsyn
// We never cancel both attempts at the same time. When downgrade happens, it's possible that both waiters are non-null, // We never cancel both attempts at the same time. When downgrade happens, it's possible that both waiters are non-null,
// but in that case http2ConnectionWaiter.ConnectionCancellationTokenSource shall be null. // but in that case http2ConnectionWaiter.ConnectionCancellationTokenSource shall be null.
Debug.Assert(http11ConnectionWaiter is null || http2ConnectionWaiter?.ConnectionCancellationTokenSource is null); Debug.Assert(http11ConnectionWaiter is null || http2ConnectionWaiter?.ConnectionCancellationTokenSource is null);
http11ConnectionWaiter?.CancelIfNecessary(this, cancellationToken.IsCancellationRequested); http11ConnectionWaiter?.SetTimeoutToPendingConnectionAttempt(this, cancellationToken.IsCancellationRequested);
http2ConnectionWaiter?.CancelIfNecessary(this, cancellationToken.IsCancellationRequested); http2ConnectionWaiter?.SetTimeoutToPendingConnectionAttempt(this, cancellationToken.IsCancellationRequested);
} }
} }
} }
Expand Down Expand Up @@ -827,7 +827,31 @@ private async ValueTask<Stream> EstablishSocksTunnel(HttpRequestMessage request,
return stream; return stream;
} }


private CancellationTokenSource GetConnectTimeoutCancellationTokenSource() => new CancellationTokenSource(Settings._connectTimeout); private CancellationTokenSource GetConnectTimeoutCancellationTokenSource<T>(HttpConnectionWaiter<T> waiter)
where T : HttpConnectionBase?
{
var cts = new CancellationTokenSource(Settings._connectTimeout);

lock (waiter)
{
// After a request completes (or is canceled), it will call into SetTimeoutToPendingConnectionAttempt,
// which will no-op if ConnectionCancellationTokenSource is not set, assuming that the connection attempt is done.
// As the initiating request for this connection attempt may complete concurrently at any time,
// there is a race condition where the first call to SetTimeoutToPendingConnectionAttempt may happen
// before we were able to set the CTS, so no timeout will be applied even though the request is already done.
waiter.ConnectionCancellationTokenSource = cts;

// To fix that, we check whether the waiter already completed now that we're holding a lock.
// If it had, call SetTimeoutToPendingConnectionAttempt again now that the CTS is set.
if (waiter.Task.IsCompleted)
{
waiter.SetTimeoutToPendingConnectionAttempt(this, requestCancelled: waiter.Task.IsCanceled);
waiter.ConnectionCancellationTokenSource = null;
}
}

return cts;
}


private static Exception CreateConnectTimeoutException(OperationCanceledException oce) private static Exception CreateConnectTimeoutException(OperationCanceledException oce)
{ {
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public bool TrySignal(T connection)
} }
} }


public void CancelIfNecessary(HttpConnectionPool pool, bool requestCancelled) public void SetTimeoutToPendingConnectionAttempt(HttpConnectionPool pool, bool requestCancelled)
{ {
int timeout = GlobalHttpSettings.SocketsHttpHandler.PendingConnectionTimeoutOnRequestCompletion; int timeout = GlobalHttpSettings.SocketsHttpHandler.PendingConnectionTimeoutOnRequestCompletion;
if (ConnectionCancellationTokenSource is null || if (ConnectionCancellationTokenSource is null ||
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -393,6 +393,68 @@ await RemoteExecutor.Invoke(static async (versionString, timoutStr) =>
}, UseVersion.ToString(), timeout.ToString()).DisposeAsync(); }, UseVersion.ToString(), timeout.ToString()).DisposeAsync();
} }


[OuterLoop("We wait for PendingConnectionTimeout which defaults to 5 seconds.")]
[Fact]
public async Task PendingConnectionTimeout_SignalsAllConnectionAttempts()
{
if (UseVersion == HttpVersion.Version30)
{
// HTTP3 does not support ConnectCallback
return;
}

int pendingConnectionAttempts = 0;
bool connectionAttemptTimedOut = false;

using var handler = new SocketsHttpHandler
{
ConnectCallback = async (context, cancellation) =>
{
Interlocked.Increment(ref pendingConnectionAttempts);
try
{
await Assert.ThrowsAsync<TaskCanceledException>(() => Task.Delay(-1, cancellation)).WaitAsync(TestHelper.PassingTestTimeout);
cancellation.ThrowIfCancellationRequested();
throw new UnreachableException();
}
catch (TimeoutException)
{
connectionAttemptTimedOut = true;
throw;
}
finally
{
Interlocked.Decrement(ref pendingConnectionAttempts);
}
}
};

using HttpClient client = CreateHttpClient(handler);
client.Timeout = TimeSpan.FromSeconds(2);

// Many of these requests should trigger new connection attempts, and all of those should eventually be cleaned up.
await Parallel.ForAsync(0, 100, async (_, _) =>
{
await Assert.ThrowsAnyAsync<TaskCanceledException>(() => client.GetAsync("https://dummy"));
});

Stopwatch stopwatch = Stopwatch.StartNew();

while (Volatile.Read(ref pendingConnectionAttempts) > 0)
{
Assert.False(connectionAttemptTimedOut);

if (stopwatch.Elapsed > 2 * TestHelper.PassingTestTimeout)
{
Assert.Fail("Connection attempts took too long to get cleaned up");
}

await Task.Delay(100);
}

Assert.False(connectionAttemptTimedOut);
}

private sealed class SetTcsContent : StreamContent private sealed class SetTcsContent : StreamContent
{ {
private readonly TaskCompletionSource<bool> _tcs; private readonly TaskCompletionSource<bool> _tcs;
Expand Down
Loading