Merged
8 changes: 6 additions & 2 deletions source/common/conn_pool/conn_pool_base.cc
@@ -492,7 +492,9 @@ void ConnPoolImplBase::onConnectionEvent(ActiveClient& client, absl::string_view
client.conn_connect_ms_->complete();
client.conn_connect_ms_.reset();
ASSERT(client.state() == ActiveClient::State::CONNECTING);
transitionActiveClientState(client, ActiveClient::State::READY);
bool streams_available = client.currentUnusedCapacity() > 0;
transitionActiveClientState(client, streams_available ? ActiveClient::State::READY
: ActiveClient::State::BUSY);

// Now that the active client is ready, set up a timer for max connection duration.
const absl::optional<std::chrono::milliseconds> max_connection_duration =
@@ -506,7 +508,9 @@ void ConnPoolImplBase::onConnectionEvent(ActiveClient& client, absl::string_view
// At this point, for the mixed ALPN pool, the client may be deleted. Do not
// refer to client after this point.
onConnected(client);
onUpstreamReady();
if (streams_available) {
onUpstreamReady();
}
checkForIdleAndCloseIdleConnsIfDraining();
}
}
3 changes: 3 additions & 0 deletions source/common/http/http3/conn_pool.cc
@@ -36,6 +36,9 @@ void ActiveClient::onMaxStreamsChanged(uint32_t num_streams) {
parent_.transitionActiveClientState(*this, ActiveClient::State::READY);
// If there's waiting streams, make sure the pool will now serve them.
parent_.onUpstreamReady();
} else if (currentUnusedCapacity() == 0 && state() == ActiveClient::State::READY) {
Contributor: With HTTP/3, MAX_STREAMS can only increase. I'm not sure this is possible. Is this code reached in an integration test?

Contributor: Oh wait, looking at how this is called, is the num_streams argument the new MAX_STREAMS value (which must always increase) or is it the current stream capacity? I think maybe the latter. Mind adding a comment to this method? (You'd think I could remember from the previous PR, but apparently not.)

Contributor Author: We don't need this for today, but didn't you say this can happen for a rejected 0-RTT handshake? If we start out assuming some number of streams and create some number, then the 0-RTT handshake is rejected and we fail over to a real handshake with fewer streams, I assume we can lower the functional capacity?

Contributor: Oh, hah! Yes, I did say that... and subsequently forgot it. You're totally right. Would it be worth adding a comment that says something like:

// With HTTP/3 this can only happen during a rejected 0-RTT handshake.

// With HTTP/3 this can only happen during a rejected 0-RTT handshake.
parent_.transitionActiveClientState(*this, ActiveClient::State::BUSY);
}
}

2 changes: 2 additions & 0 deletions source/common/quic/codec_impl.h
@@ -53,6 +53,8 @@ class QuicHttpServerConnectionImpl : public QuicHttpConnectionImplBase,
void onUnderlyingConnectionAboveWriteBufferHighWatermark() override;
void onUnderlyingConnectionBelowWriteBufferLowWatermark() override;

EnvoyQuicServerSession& quicServerSession() { return quic_server_session_; }

private:
EnvoyQuicServerSession& quic_server_session_;
};
15 changes: 6 additions & 9 deletions source/common/quic/envoy_quic_client_session.cc
@@ -101,9 +101,7 @@ void EnvoyQuicClientSession::OnCanCreateNewOutgoingStream(bool unidirectional) {
return;
}
uint32_t streams_available = streamsAvailable();
if (streams_available > 0) {
http_connection_callbacks_->onMaxStreamsChanged(streams_available);
}
http_connection_callbacks_->onMaxStreamsChanged(streams_available);
Contributor: I should have thought about this in the previous PR, but I think we might want to tweak the name/documentation for this method. (I'd be happy to do this in a followup if that works for you.)

The argument to this method is the number of currently available streams. But the "max streams" value in HTTP/3 is the largest stream id that can be opened (well, divided by four, but whatever :>). In other words, the terminology here is slightly misaligned. I think I would be inclined to rename this onStreamsAvailableChanged() or some such. WDYT?

Member: I agree with the suggested name change. Also, be aware that there are two existing limits on pools: max concurrent requests, and max requests over the lifetime of the connection. For any of these cases, probably best to use a verbose name so it's clear which limit is being referenced.

Contributor Author (@alyssawilk, Oct 28, 2021): I'm wary of this name. I think if we change it to onStreamsAvailableChanged it weakens the clarity where the current APIs are named after the protocol frames. We have onMaxStreamsChanged (MAX_STREAMS frame for HTTP/3) and onSettings (SETTINGS affecting number of streams, among other things, for HTTP/2); both change the streams available, but in very different ways.

Edit: would be happy to make the protocol more clear. onHttp3MaxStreamsChanged?

}

std::unique_ptr<quic::QuicSpdyClientStream> EnvoyQuicClientSession::CreateClientStream() {
@@ -144,12 +142,11 @@ uint64_t EnvoyQuicClientSession::streamsAvailable() {
void EnvoyQuicClientSession::OnTlsHandshakeComplete() {
quic::QuicSpdyClientSession::OnTlsHandshakeComplete();

// TODO(alyssawilk) support the case where a connection starts with 0 max streams.
ASSERT(streamsAvailable());
if (streamsAvailable() > 0) {
OnCanCreateNewOutgoingStream(false);
raiseConnectionEvent(Network::ConnectionEvent::Connected);
}
// Fake this to make sure we set the connection pool stream limit correctly
// before use. This may result in OnCanCreateNewOutgoingStream with zero
// available streams.
OnCanCreateNewOutgoingStream(false);
Contributor: If I'm reading EnvoyQuicClientSession::OnCanCreateNewOutgoingStream() correctly, if streamsAvailable() is 0 it will be a no-op. Are we sure we need to call it here when there are no streams available?

Contributor Author: Why is it a no-op? We'll call onMaxStreamsChanged(0), which will update that initial 100 streams to 0, no?

Contributor: Man, I hate the GitHub UI. I missed that this method is changed in this PR specifically to make it not a no-op.

Lordy. Major reviewer fail on my part!

raiseConnectionEvent(Network::ConnectionEvent::Connected);
}

std::unique_ptr<quic::QuicCryptoClientStreamBase> EnvoyQuicClientSession::CreateQuicCryptoStream() {
27 changes: 27 additions & 0 deletions test/common/conn_pool/conn_pool_base_test.cc
@@ -35,8 +35,15 @@ class TestActiveClient : public ActiveClient {
ASSERT_TRUE(testClient != nullptr);
testClient->active_streams_++;
}
int64_t currentUnusedCapacity() const override {
if (capacity_override_.has_value()) {
return capacity_override_.value();
}
return ActiveClient::currentUnusedCapacity();
}

uint32_t active_streams_{};
absl::optional<uint64_t> capacity_override_;
};

class TestPendingStream : public PendingStream {
@@ -417,6 +424,26 @@ TEST_F(ConnPoolImplDispatcherBaseTest, MaxConnectionDurationCallbackWhileConnect
pool_.destructAllConnections();
}

// Test the behavior of a client created with zero streams available.
TEST_F(ConnPoolImplDispatcherBaseTest, NoAvailableStreams) {
// Start with a stream limit of 1, but force the client's unused capacity to 0.
stream_limit_ = 1;
newConnectingClient();
clients_.back()->capacity_override_ = 0;
pool_.decrClusterStreamCapacity(stream_limit_);

// Make sure that when the connected event is raised, there is no call to
// onPoolReady, and the client is marked as busy.
Member: If a connection is set up with an initial capacity of zero, do we assume it will become non-zero soon?

If so, is there any timer/timeout currently watching for this to take too long? I assume the connect_timeout is done already, because we're connected.

If not, should the pool create another connection? If we don't, won't the request be stuck here with no way to make forward progress?

Contributor Author: I'm wary of creating new connections in this case - I'd assume new connections would also start with capacity 0. In general, if we hit connection and stream limits, we queue without a connection timeout (#18748), so I think this is comparable.

Member: When in this state, the downstream-request-timeout is still in place, correct? So if nothing happens, eventually we'll give up (assuming there is a configured timeout)?

Another thought on this: should we not count the connection as established, from the pool's perspective, until it has non-zero capacity? Maybe the existing connect_timeout should cover this condition, because it's logically not fully established yet.

Contributor Author: I think if we change this to time out, we'd want to also change the way we time out HTTP/1 and HTTP/2 streams when we hit the connection + stream limits, and all of that would be outside the scope of this PR. @mattklein123 for thoughts and additional visibility.

Member:
> I think if we change this to time out, we'd want to also change the way we time out HTTP/1 and HTTP/2 streams when we hit the connection + stream limits, and all of that would be outside the scope of this PR.

I think I'm missing a lot of context here, but AIUI for H3 we can have a connected connection which cannot accept any streams? Is that correct? To me this seems fundamentally different from H1 and H2, in which a new connection should be able to accept at least 1 stream. Without thinking about it too much, I tend to agree with @ggreenway that a connection that doesn't ever have at least 1 available stream isn't really connected and should time out. I think this is different from H1 or H2? Either way, I agree this can be done as a separate PR, whatever we decide.

Member: Do we handle the above corner case correctly for H2? If it's possible there, I agree it's the same. Either way, I suggest we file an issue to track this and then follow up later?

Contributor Author: Looking, I think we'll black-hole the traffic, like we would for HTTP/3 before this series of fixes. Filed #18880.

Member: I think it is different for H2, because in H2 it is allowed to start sending frames (new stream, etc.) before receiving a SETTINGS frame from the peer, which is what could set the max-streams to zero.

Also, if we get a SETTINGS frame with a stream limit of zero, we move the connection to draining and never use it again.

Member: Also, this function doesn't make sense to me. It seems like we should use min(peer value, envoy limit), not only adjust downward.

// Adjust the concurrent stream limit if the negotiated concurrent stream limit
// is lower than the local max configured streams.
//
// Note: if multiple streams are assigned to a connection before the settings
// are received, they may still be reset by the peer. This could be avoided by
// not considering http/2 connections connected until the SETTINGS frame is
// received, but that would result in a latency penalty instead.
void MultiplexedActiveClientBase::onSettings(ReceivedSettings& settings) {
  if (settings.maxConcurrentStreams().has_value() &&
      settings.maxConcurrentStreams().value() < concurrent_stream_limit_) {
    int64_t old_unused_capacity = currentUnusedCapacity();
    // Given config limits old_unused_capacity should never exceed int32_t.
    // TODO(alyssawilk) move remaining_streams_, concurrent_stream_limit_ and
    // currentUnusedCapacity() to be explicit int32_t
    ASSERT(std::numeric_limits<int32_t>::max() >= old_unused_capacity);
    concurrent_stream_limit_ = settings.maxConcurrentStreams().value();
    int64_t delta = old_unused_capacity - currentUnusedCapacity();
    parent_.decrClusterStreamCapacity(delta);
    ENVOY_CONN_LOG(trace, "Decreasing stream capacity by {}", *codec_client_, delta);
    negative_capacity_ += delta;
  }
  // As we don't increase stream limits when maxConcurrentStreams goes up, treat
  // a stream limit of 0 as a GOAWAY.
  if (concurrent_stream_limit_ == 0) {
    parent_.transitionActiveClientState(*this, ActiveClient::State::DRAINING);
  }
}

Contributor Author: You're correct, that's why I filed a tracking bug for it :-)

EXPECT_CALL(pool_, onPoolReady).Times(0);
clients_.back()->onEvent(Network::ConnectionEvent::Connected);
EXPECT_EQ(ActiveClient::State::BUSY, clients_.back()->state());

// Clean up.
EXPECT_CALL(pool_, instantiateActiveClient);
EXPECT_CALL(pool_, onPoolFailure);
pool_.destructAllConnections();
}

// Remote close simulates the peer closing the connection.
TEST_F(ConnPoolImplBaseTest, PoolIdleCallbackTriggeredRemoteClose) {
EXPECT_CALL(dispatcher_, createTimer_(_)).Times(AnyNumber());
1 change: 1 addition & 0 deletions test/integration/BUILD
@@ -742,6 +742,7 @@ envoy_cc_test_library(
] + envoy_select_enable_http3([
"//source/common/quic:active_quic_listener_lib",
"//source/common/quic:quic_factory_lib",
"@com_github_google_quiche//:quic_test_tools_session_peer_lib",
]),
)

16 changes: 16 additions & 0 deletions test/integration/fake_upstream.cc
@@ -17,6 +17,7 @@

#ifdef ENVOY_ENABLE_QUIC
#include "source/common/quic/codec_impl.h"
#include "quiche/quic/test_tools/quic_session_peer.h"
#endif

#include "source/server/connection_handler_impl.h"
@@ -396,6 +397,21 @@ void FakeHttpConnection::encodeGoAway() {
postToConnectionThread([this]() { codec_->goAway(); });
}

void FakeHttpConnection::updateConcurrentStreams(uint64_t max_streams) {
ASSERT(type_ >= Http::CodecType::HTTP3);

#ifdef ENVOY_ENABLE_QUIC
postToConnectionThread([this, max_streams]() {
auto codec = dynamic_cast<Quic::QuicHttpServerConnectionImpl*>(codec_.get());
quic::test::QuicSessionPeer::SetMaxOpenIncomingBidirectionalStreams(&codec->quicServerSession(),
max_streams);
codec->quicServerSession().SendMaxStreams(1, false);
});
#else
UNREFERENCED_PARAMETER(max_streams);
#endif
}

void FakeHttpConnection::encodeProtocolError() {
ASSERT(type_ >= Http::CodecType::HTTP2);

4 changes: 4 additions & 0 deletions test/integration/fake_upstream.h
@@ -462,6 +462,10 @@ class FakeHttpConnection : public Http::ServerConnectionCallbacks, public FakeCo
// Should only be called for HTTP2 or above, sends a GOAWAY frame with ENHANCE_YOUR_CALM.
void encodeProtocolError();

// Update the maximum number of concurrent streams. This is currently only
// supported for HTTP/3.
void updateConcurrentStreams(uint64_t max_streams);

private:
struct ReadFilter : public Network::ReadFilterBaseImpl {
ReadFilter(FakeHttpConnection& parent) : parent_(parent) {}
56 changes: 56 additions & 0 deletions test/integration/quic_http_integration_test.cc
@@ -787,6 +787,62 @@ TEST_P(QuicHttpIntegrationTest, Http3DownstreamKeepalive) {
ASSERT_TRUE(response->complete());
}

TEST_P(QuicHttpIntegrationTest, NoInitialStreams) {
// Set the fake upstream to start with 0 streams available.
setUpstreamProtocol(Http::CodecType::HTTP3);
envoy::config::listener::v3::QuicProtocolOptions options;
options.mutable_quic_protocol_options()->mutable_max_concurrent_streams()->set_value(0);
mergeOptions(options);
initialize();

// Create the client connection and send a request.
codec_client_ = makeRawHttpConnection(makeClientConnection(lookupPort("http")), absl::nullopt);
IntegrationStreamDecoderPtr response =
codec_client_->makeHeaderOnlyRequest(default_request_headers_);

// There should now be an upstream connection, but no upstream stream.
ASSERT_TRUE(fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, fake_upstream_connection_));
ASSERT_FALSE(fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_,
std::chrono::milliseconds(100)));

// Update the upstream to have 1 stream available. Now Envoy should ship the
// original request upstream.
fake_upstream_connection_->updateConcurrentStreams(1);
ASSERT_TRUE(fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_));

// Make sure the standard request/response pipeline works as expected.
upstream_request_->encodeHeaders(default_response_headers_, true);
ASSERT_TRUE(response->waitForEndStream());
EXPECT_TRUE(response->complete());
EXPECT_EQ("200", response->headers().getStatusValue());
}

TEST_P(QuicHttpIntegrationTest, NoStreams) {
// Tighten the stream idle timeout, as it defaults to 5m.
config_helper_.addConfigModifier(
[&](envoy::extensions::filters::network::http_connection_manager::v3::HttpConnectionManager&
hcm) -> void {
hcm.mutable_stream_idle_timeout()->set_seconds(0);
hcm.mutable_stream_idle_timeout()->set_nanos(400 * 1000 * 1000);
});

// Set the fake upstream to start with 0 streams available.
setUpstreamProtocol(Http::CodecType::HTTP3);
envoy::config::listener::v3::QuicProtocolOptions options;
options.mutable_quic_protocol_options()->mutable_max_concurrent_streams()->set_value(0);
mergeOptions(options);
initialize();

// Create the client connection and send a request.
codec_client_ = makeRawHttpConnection(makeClientConnection(lookupPort("http")), absl::nullopt);
IntegrationStreamDecoderPtr response =
codec_client_->makeHeaderOnlyRequest(default_request_headers_);

// Make sure the time out closes the stream.
ASSERT_TRUE(response->waitForEndStream());
EXPECT_TRUE(response->complete());
}

class QuicInplaceLdsIntegrationTest : public QuicHttpIntegrationTest {
public:
void inplaceInitialize(bool add_default_filter_chain = false) {