Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions test/integration/autonomous_upstream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const char AutonomousStream::RESPONSE_SIZE_BYTES[] = "response_size_bytes";
const char AutonomousStream::RESPONSE_DATA_BLOCKS[] = "response_data_blocks";
const char AutonomousStream::EXPECT_REQUEST_SIZE_BYTES[] = "expect_request_size_bytes";
const char AutonomousStream::RESET_AFTER_REQUEST[] = "reset_after_request";
const char AutonomousStream::CLOSE_AFTER_RESPONSE[] = "close_after_response";
const char AutonomousStream::NO_TRAILERS[] = "no_trailers";
const char AutonomousStream::NO_END_STREAM[] = "no_end_stream";

Expand Down Expand Up @@ -84,6 +85,11 @@ void AutonomousStream::sendResponse() {
encodeTrailers(upstream_.responseTrailers());
}
}
if (!headers.get_(CLOSE_AFTER_RESPONSE).empty()) {
parent_.connection().dispatcher().post(
[this]() -> void { parent_.connection().close(Network::ConnectionCloseType::FlushWrite); });
return;
}
}

AutonomousHttpConnection::AutonomousHttpConnection(AutonomousUpstream& autonomous_upstream,
Expand Down
2 changes: 2 additions & 0 deletions test/integration/autonomous_upstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ class AutonomousStream : public FakeStream {
static const char NO_TRAILERS[];
// Prevents upstream from finishing response.
static const char NO_END_STREAM[];
// Closes the underlying connection after a given response is sent.
static const char CLOSE_AFTER_RESPONSE[];

AutonomousStream(FakeHttpConnection& parent, Http::ResponseEncoder& encoder,
AutonomousUpstream& upstream, bool allow_incomplete_streams);
Expand Down
2 changes: 1 addition & 1 deletion test/integration/fake_upstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -222,9 +222,9 @@ class FakeStream : public Http::RequestDecoder,
absl::Mutex lock_;
Http::RequestHeaderMapPtr headers_ ABSL_GUARDED_BY(lock_);
Buffer::OwnedImpl body_ ABSL_GUARDED_BY(lock_);
FakeHttpConnection& parent_;

private:
FakeHttpConnection& parent_;
Http::ResponseEncoder& encoder_;
Http::RequestTrailerMapPtr trailers_ ABSL_GUARDED_BY(lock_);
bool end_stream_ ABSL_GUARDED_BY(lock_){};
Expand Down
5 changes: 5 additions & 0 deletions test/integration/integration_tcp_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,12 @@ IntegrationTcpClient::IntegrationTcpClient(
client_write_buffer_ =
new NiceMock<MockWatermarkBuffer>(below_low, above_high, above_overflow);
return client_write_buffer_;
}))
.WillRepeatedly(Invoke([](std::function<void()> below_low, std::function<void()> above_high,
std::function<void()> above_overflow) -> Buffer::Instance* {
return new Buffer::WatermarkBuffer(below_low, above_high, above_overflow);
}));
;

connection_ = dispatcher.createClientConnection(
Network::Utility::resolveUrl(
Expand Down
81 changes: 81 additions & 0 deletions test/integration/tcp_proxy_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

using testing::_;
using testing::AtLeast;
using testing::HasSubstr;
using testing::Invoke;
using testing::MatchesRegex;
using testing::NiceMock;
Expand Down Expand Up @@ -150,6 +151,86 @@ TEST_P(TcpProxyIntegrationTest, TcpProxyDownstreamDisconnect) {
tcp_client->waitForDisconnect();
}

TEST_P(TcpProxyIntegrationTest, TcpProxyManyConnections) {
autonomous_upstream_ = true;
initialize();
const int num_connections = 50;
std::vector<IntegrationTcpClientPtr> clients(num_connections);

for (int i = 0; i < num_connections; ++i) {
clients[i] = makeTcpConnection(lookupPort("tcp_proxy"));
}
test_server_->waitForCounterGe("cluster.cluster_0.upstream_cx_total", num_connections);
for (int i = 0; i < num_connections; ++i) {
IntegrationTcpClientPtr& tcp_client = clients[i];
// The autonomous upstream is an HTTP upstream, so send raw HTTP.
// This particular request will result in the upstream sending a response,
// and flush-closing due to the 'close_after_response' header.
ASSERT_TRUE(tcp_client->write(
"GET / HTTP/1.1\r\nHost: foo\r\nclose_after_response: yes\r\ncontent-length: 0\r\n\r\n",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it seems unexpected that we are using HTTP manually in a TCP test. Do you want to comment on what the testing strategy is here?

false));
tcp_client->waitForHalfClose();
tcp_client->close();
EXPECT_THAT(tcp_client->data(), HasSubstr("aaaaaaaaaa"));
}
}

TEST_P(TcpProxyIntegrationTest, TcpProxyRandomBehavior) {
autonomous_upstream_ = true;
initialize();
std::list<IntegrationTcpClientPtr> clients;

// The autonomous upstream parses HTTP, and HTTP headers and sends responses
// when full requests are received. basic_request will result in
// bidirectional data. request_with_close will result in bidirectional data,
// but also the upstream closing the connection.
const char* basic_request = "GET / HTTP/1.1\r\nHost: foo\r\ncontent-length: 0\r\n\r\n";
const char* request_with_close =
"GET / HTTP/1.1\r\nHost: foo\r\nclose_after_response: yes\r\ncontent-length: 0\r\n\r\n";
TestRandomGenerator rand;

// Seed some initial clients
for (int i = 0; i < 5; ++i) {
clients.push_back(makeTcpConnection(lookupPort("tcp_proxy")));
}

// Now randomly write / add more connections / close.
for (int i = 0; i < 50; ++i) {
int action = rand.random() % 3;

if (action == 0) {
// Add a new connection.
clients.push_back(makeTcpConnection(lookupPort("tcp_proxy")));
}
if (clients.empty()) {
break;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like 2/3 of the time this test will do nothing; is that intended?

Do you want to seed the list with a few clients first?

}
IntegrationTcpClientPtr& tcp_client = clients.front();
if (action == 1) {
// Write to the first connection.
ASSERT_TRUE(tcp_client->write(basic_request, false));
tcp_client->waitForData("\r\n\r\n", false);
tcp_client->clearData(tcp_client->data().size());
} else if (action == 2) {
// Close the first connection.
ASSERT_TRUE(tcp_client->write(request_with_close, false));
tcp_client->waitForData("\r\n\r\n", false);
tcp_client->waitForHalfClose();
tcp_client->close();
clients.pop_front();
}
}

while (!clients.empty()) {
IntegrationTcpClientPtr& tcp_client = clients.front();
ASSERT_TRUE(tcp_client->write(request_with_close, false));
tcp_client->waitForData("\r\n\r\n", false);
tcp_client->waitForHalfClose();
tcp_client->close();
clients.pop_front();
}
}

TEST_P(TcpProxyIntegrationTest, NoUpstream) {
// Set the first upstream to have an invalid port, so connection will fail,
// but it won't fail synchronously (as it would if there were simply no
Expand Down