diff --git a/test/integration/autonomous_upstream.cc b/test/integration/autonomous_upstream.cc index e2185bbbf2a6..16d49cb50e60 100644 --- a/test/integration/autonomous_upstream.cc +++ b/test/integration/autonomous_upstream.cc @@ -21,6 +21,7 @@ const char AutonomousStream::RESPONSE_SIZE_BYTES[] = "response_size_bytes"; const char AutonomousStream::RESPONSE_DATA_BLOCKS[] = "response_data_blocks"; const char AutonomousStream::EXPECT_REQUEST_SIZE_BYTES[] = "expect_request_size_bytes"; const char AutonomousStream::RESET_AFTER_REQUEST[] = "reset_after_request"; +const char AutonomousStream::CLOSE_AFTER_RESPONSE[] = "close_after_response"; const char AutonomousStream::NO_TRAILERS[] = "no_trailers"; const char AutonomousStream::NO_END_STREAM[] = "no_end_stream"; @@ -84,6 +85,11 @@ void AutonomousStream::sendResponse() { encodeTrailers(upstream_.responseTrailers()); } } + if (!headers.get_(CLOSE_AFTER_RESPONSE).empty()) { + parent_.connection().dispatcher().post( + [this]() -> void { parent_.connection().close(Network::ConnectionCloseType::FlushWrite); }); + return; + } } AutonomousHttpConnection::AutonomousHttpConnection(AutonomousUpstream& autonomous_upstream, diff --git a/test/integration/autonomous_upstream.h b/test/integration/autonomous_upstream.h index 3a5acd2443a5..d5ae283fec56 100644 --- a/test/integration/autonomous_upstream.h +++ b/test/integration/autonomous_upstream.h @@ -25,6 +25,8 @@ class AutonomousStream : public FakeStream { static const char NO_TRAILERS[]; // Prevents upstream from finishing response. static const char NO_END_STREAM[]; + // Closes the underlying connection after a given response is sent. + static const char CLOSE_AFTER_RESPONSE[]; AutonomousStream(FakeHttpConnection& parent, Http::ResponseEncoder& encoder, AutonomousUpstream& upstream, bool allow_incomplete_streams); diff --git a/test/integration/fake_upstream.h b/test/integration/fake_upstream.h index d648d7117dec..d796d27bcff2 100644 --- a/test/integration/fake_upstream.h +++ b/test/integration/fake_upstream.h @@ -222,9 +222,9 @@ class FakeStream : public Http::RequestDecoder, absl::Mutex lock_; Http::RequestHeaderMapPtr headers_ ABSL_GUARDED_BY(lock_); Buffer::OwnedImpl body_ ABSL_GUARDED_BY(lock_); + FakeHttpConnection& parent_; private: - FakeHttpConnection& parent_; Http::ResponseEncoder& encoder_; Http::RequestTrailerMapPtr trailers_ ABSL_GUARDED_BY(lock_); bool end_stream_ ABSL_GUARDED_BY(lock_){}; diff --git a/test/integration/integration_tcp_client.cc b/test/integration/integration_tcp_client.cc index 500d26d42aec..aabf2a957d95 100644 --- a/test/integration/integration_tcp_client.cc +++ b/test/integration/integration_tcp_client.cc @@ -48,7 +48,12 @@ IntegrationTcpClient::IntegrationTcpClient( client_write_buffer_ = new NiceMock(below_low, above_high, above_overflow); return client_write_buffer_; + })) + .WillRepeatedly(Invoke([](std::function below_low, std::function above_high, + std::function above_overflow) -> Buffer::Instance* { + return new Buffer::WatermarkBuffer(below_low, above_high, above_overflow); })); + ; connection_ = dispatcher.createClientConnection( Network::Utility::resolveUrl( diff --git a/test/integration/tcp_proxy_integration_test.cc b/test/integration/tcp_proxy_integration_test.cc index 9682480725ba..21a4f8408b2b 100644 --- a/test/integration/tcp_proxy_integration_test.cc +++ b/test/integration/tcp_proxy_integration_test.cc @@ -25,6 +25,7 @@ using testing::_; using testing::AtLeast; +using testing::HasSubstr; using testing::Invoke; using testing::MatchesRegex; using testing::NiceMock; @@ -150,6 +151,86 @@ TEST_P(TcpProxyIntegrationTest, TcpProxyDownstreamDisconnect) { tcp_client->waitForDisconnect(); } +TEST_P(TcpProxyIntegrationTest, TcpProxyManyConnections) { + autonomous_upstream_ = true; + initialize(); + const int num_connections = 50; + std::vector clients(num_connections); + + for (int i = 0; i < num_connections; ++i) { + clients[i] = makeTcpConnection(lookupPort("tcp_proxy")); + } + test_server_->waitForCounterGe("cluster.cluster_0.upstream_cx_total", num_connections); + for (int i = 0; i < num_connections; ++i) { + IntegrationTcpClientPtr& tcp_client = clients[i]; + // The autonomous upstream is an HTTP upstream, so send raw HTTP. + // This particular request will result in the upstream sending a response, + // and flush-closing due to the 'close_after_response' header. + ASSERT_TRUE(tcp_client->write( + "GET / HTTP/1.1\r\nHost: foo\r\nclose_after_response: yes\r\ncontent-length: 0\r\n\r\n", + false)); + tcp_client->waitForHalfClose(); + tcp_client->close(); + EXPECT_THAT(tcp_client->data(), HasSubstr("aaaaaaaaaa")); + } +} + +TEST_P(TcpProxyIntegrationTest, TcpProxyRandomBehavior) { + autonomous_upstream_ = true; + initialize(); + std::list clients; + + // The autonomous upstream parses HTTP, and HTTP headers and sends responses + // when full requests are received. basic_request will result in + // bidirectional data. request_with_close will result in bidirectional data, + // but also the upstream closing the connection. + const char* basic_request = "GET / HTTP/1.1\r\nHost: foo\r\ncontent-length: 0\r\n\r\n"; + const char* request_with_close = + "GET / HTTP/1.1\r\nHost: foo\r\nclose_after_response: yes\r\ncontent-length: 0\r\n\r\n"; + TestRandomGenerator rand; + + // Seed some initial clients + for (int i = 0; i < 5; ++i) { + clients.push_back(makeTcpConnection(lookupPort("tcp_proxy"))); + } + + // Now randomly write / add more connections / close. + for (int i = 0; i < 50; ++i) { + int action = rand.random() % 3; + + if (action == 0) { + // Add a new connection. + clients.push_back(makeTcpConnection(lookupPort("tcp_proxy"))); + } + if (clients.empty()) { + break; + } + IntegrationTcpClientPtr& tcp_client = clients.front(); + if (action == 1) { + // Write to the first connection. + ASSERT_TRUE(tcp_client->write(basic_request, false)); + tcp_client->waitForData("\r\n\r\n", false); + tcp_client->clearData(tcp_client->data().size()); + } else if (action == 2) { + // Close the first connection. + ASSERT_TRUE(tcp_client->write(request_with_close, false)); + tcp_client->waitForData("\r\n\r\n", false); + tcp_client->waitForHalfClose(); + tcp_client->close(); + clients.pop_front(); + } + } + + while (!clients.empty()) { + IntegrationTcpClientPtr& tcp_client = clients.front(); + ASSERT_TRUE(tcp_client->write(request_with_close, false)); + tcp_client->waitForData("\r\n\r\n", false); + tcp_client->waitForHalfClose(); + tcp_client->close(); + clients.pop_front(); + } +} + TEST_P(TcpProxyIntegrationTest, NoUpstream) { // Set the first upstream to have an invalid port, so connection will fail, // but it won't fail synchronously (as it would if there were simply no