diff --git a/test/integration/fake_upstream.cc b/test/integration/fake_upstream.cc index 6aa74011e4f1d..29f2289a5ce98 100644 --- a/test/integration/fake_upstream.cc +++ b/test/integration/fake_upstream.cc @@ -78,6 +78,13 @@ void FakeStream::encodeHeaders(const Http::HeaderMapImpl& headers, bool end_stre }); } +void FakeStream::encodeData(absl::string_view data, bool end_stream) { + parent_.connection().dispatcher().post([this, data, end_stream]() -> void { + Buffer::OwnedImpl fake_data(data.data(), data.size()); + encoder_.encodeData(fake_data, end_stream); + }); +} + void FakeStream::encodeData(uint64_t size, bool end_stream) { parent_.connection().dispatcher().post([this, size, end_stream]() -> void { Buffer::OwnedImpl data(std::string(size, 'a')); @@ -138,6 +145,18 @@ AssertionResult FakeStream::waitForData(Event::Dispatcher& client_dispatcher, ui return AssertionSuccess(); } +AssertionResult FakeStream::waitForData(Event::Dispatcher& client_dispatcher, + absl::string_view data, milliseconds timeout) { + auto succeeded = waitForData(client_dispatcher, data.length(), timeout); + if (succeeded) { + Buffer::OwnedImpl buffer(data.data(), data.length()); + if (!TestUtility::buffersEqual(body(), buffer)) { + return AssertionFailure() << body().toString() << " not equal to " << data; + } + } + return succeeded; +} + AssertionResult FakeStream::waitForEndStream(Event::Dispatcher& client_dispatcher, milliseconds timeout) { Thread::LockGuard lock(lock_); diff --git a/test/integration/fake_upstream.h b/test/integration/fake_upstream.h index 574a075874529..052ac04e60df3 100644 --- a/test/integration/fake_upstream.h +++ b/test/integration/fake_upstream.h @@ -52,6 +52,7 @@ class FakeStream : public Http::StreamDecoder, void encodeHeaders(const Http::HeaderMapImpl& headers, bool end_stream); void encodeData(uint64_t size, bool end_stream); void encodeData(Buffer::Instance& data, bool end_stream); + void encodeData(absl::string_view data, bool end_stream); void encodeTrailers(const Http::HeaderMapImpl& trailers); void encodeResetStream(); const Http::HeaderMap& headers() { return *headers_; } @@ -67,6 +68,11 @@ class FakeStream : public Http::StreamDecoder, waitForData(Event::Dispatcher& client_dispatcher, uint64_t body_length, std::chrono::milliseconds timeout = TestUtility::DefaultTimeout); + ABSL_MUST_USE_RESULT + testing::AssertionResult + waitForData(Event::Dispatcher& client_dispatcher, absl::string_view body, + std::chrono::milliseconds timeout = TestUtility::DefaultTimeout); + ABSL_MUST_USE_RESULT testing::AssertionResult waitForEndStream(Event::Dispatcher& client_dispatcher, diff --git a/test/integration/http_integration.cc b/test/integration/http_integration.cc index f8ab7a672bd80..b5d750cd8bf86 100644 --- a/test/integration/http_integration.cc +++ b/test/integration/http_integration.cc @@ -109,6 +109,13 @@ IntegrationCodecClient::makeRequestWithBody(const Http::HeaderMap& headers, uint return response; } +void IntegrationCodecClient::sendData(Http::StreamEncoder& encoder, absl::string_view data, + bool end_stream) { + Buffer::OwnedImpl buffer_data(data.data(), data.size()); + encoder.encodeData(buffer_data, end_stream); + flushWrite(); +} + void IntegrationCodecClient::sendData(Http::StreamEncoder& encoder, Buffer::Instance& data, bool end_stream) { encoder.encodeData(data, end_stream); diff --git a/test/integration/http_integration.h b/test/integration/http_integration.h index 1b6c3b760157c..ca20dc013539e 100644 --- a/test/integration/http_integration.h +++ b/test/integration/http_integration.h @@ -26,6 +26,7 @@ class IntegrationCodecClient : public Http::CodecClientProd { IntegrationStreamDecoderPtr makeRequestWithBody(const Http::HeaderMap& headers, uint64_t body_size); bool sawGoAway() { return saw_goaway_; } + void sendData(Http::StreamEncoder& encoder, absl::string_view data, bool end_stream); void sendData(Http::StreamEncoder& encoder, Buffer::Instance& data, bool end_stream); void sendData(Http::StreamEncoder& encoder, uint64_t size, bool end_stream); void sendTrailers(Http::StreamEncoder& encoder, const Http::HeaderMap& trailers); diff --git a/test/integration/websocket_integration_test.cc b/test/integration/websocket_integration_test.cc index 675faad05d03e..14de3e9ebbc6c 100644 --- a/test/integration/websocket_integration_test.cc +++ b/test/integration/websocket_integration_test.cc @@ -21,7 +21,20 @@ using testing::MatchesRegex; namespace Envoy { namespace { -bool headersRead(const std::string& data) { return data.find("\r\n\r\n") != std::string::npos; } +Http::TestHeaderMapImpl upgradeRequestHeaders(const char* upgrade_type = "websocket", + uint32_t content_length = 0) { + return Http::TestHeaderMapImpl{ + {":authority", "host"}, {"content-length", fmt::format("{}", content_length)}, + {":path", "/websocket/test"}, {":method", "GET"}, + {"upgrade", upgrade_type}, {"connection", "keep-alive, Upgrade"}}; +} + +Http::TestHeaderMapImpl upgradeResponseHeaders(const char* upgrade_type = "websocket") { + return Http::TestHeaderMapImpl{{":status", "101"}, + {"connection", "Upgrade"}, + {"upgrade", upgrade_type}, + {"content-length", "0"}}; +} static std::string websocketTestParamsToString( const testing::TestParamInfo> params) { @@ -32,6 +45,63 @@ static std::string websocketTestParamsToString( } // namespace +void WebsocketIntegrationTest::validateUpgradeRequestHeaders( + const Http::HeaderMap& original_proxied_request_headers, + const Http::HeaderMap& original_request_headers) { + Http::TestHeaderMapImpl proxied_request_headers(original_proxied_request_headers); + if (proxied_request_headers.ForwardedProto()) { + ASSERT_STREQ(proxied_request_headers.ForwardedProto()->value().c_str(), "http"); + proxied_request_headers.removeForwardedProto(); + } + if (proxied_request_headers.Scheme()) { + ASSERT_STREQ(proxied_request_headers.Scheme()->value().c_str(), "http"); + proxied_request_headers.removeScheme(); + } + + if (!old_style_websockets_) { + // Check for and remove headers added by default for HTTP requests. + ASSERT_TRUE(proxied_request_headers.RequestId() != nullptr); + ASSERT_TRUE(proxied_request_headers.EnvoyExpectedRequestTimeoutMs() != nullptr); + proxied_request_headers.removeEnvoyExpectedRequestTimeoutMs(); + } else { + // Check for and undo the path rewrite. + ASSERT_STREQ(proxied_request_headers.Path()->value().c_str(), "/websocket"); + proxied_request_headers.Path()->value().append("/test", 5); + ASSERT_STREQ(proxied_request_headers.EnvoyOriginalPath()->value().c_str(), "/websocket/test"); + proxied_request_headers.removeEnvoyOriginalPath(); + } + commonValidate(proxied_request_headers, original_request_headers); + proxied_request_headers.removeRequestId(); + + EXPECT_EQ(proxied_request_headers, original_request_headers); +} + +void WebsocketIntegrationTest::validateUpgradeResponseHeaders( + const Http::HeaderMap& original_proxied_response_headers, + const Http::HeaderMap& original_response_headers) { + Http::TestHeaderMapImpl proxied_response_headers(original_proxied_response_headers); + if (!old_style_websockets_) { + // Check for and remove headers added by default for HTTP responses. + ASSERT_TRUE(proxied_response_headers.Date() != nullptr); + ASSERT_TRUE(proxied_response_headers.Server() != nullptr); + ASSERT_STREQ(proxied_response_headers.Server()->value().c_str(), "envoy"); + proxied_response_headers.removeDate(); + proxied_response_headers.removeServer(); + } + commonValidate(proxied_response_headers, original_response_headers); + + EXPECT_EQ(proxied_response_headers, original_response_headers); +} + +void WebsocketIntegrationTest::commonValidate(Http::HeaderMap& proxied_headers, + const Http::HeaderMap& original_headers) { + // If no content length is specified, the HTTP codec should add a chunked encoding header. + if (original_headers.ContentLength() == nullptr) { + ASSERT_STREQ(proxied_headers.TransferEncoding()->value().c_str(), "chunked"); + proxied_headers.removeTransferEncoding(); + } +} + INSTANTIATE_TEST_CASE_P(IpVersions, WebsocketIntegrationTest, testing::Combine(testing::ValuesIn(TestEnvironment::getIpVersionsForTest()), testing::Bool()), @@ -67,177 +137,122 @@ void WebsocketIntegrationTest::initialize() { HttpIntegrationTest::initialize(); } -void WebsocketIntegrationTest::validateInitialUpstreamData(const std::string& received_data) { - if (old_style_websockets_) { - // The request path gets rewritten from /websocket/test to /websocket. - // The size of headers received by the destination is 228 bytes. - EXPECT_EQ(received_data.size(), 228); - } - // In HTTP1, the transfer-length is defined by use of the "chunked" transfer-coding, even if - // content-length header is present. No body websocket upgrade request send to upstream has - // content-length header and has no transfer-encoding header. - EXPECT_NE(received_data.find("content-length:"), std::string::npos); - EXPECT_EQ(received_data.find("transfer-encoding:"), std::string::npos); -} - -void WebsocketIntegrationTest::validateInitialDownstreamData(const std::string& received_data, - const std::string& expected_data) { - if (old_style_websockets_) { - ASSERT_EQ(expected_data, received_data); - } else { - // Strip out the date header since we're not going to generate an exact match. - std::regex extra_request_headers("date:.*\r\nserver: envoy\r\n"); - std::string stripped_data = std::regex_replace(received_data, extra_request_headers, ""); - EXPECT_EQ(expected_data, stripped_data); - } -} - -void WebsocketIntegrationTest::validateFinalDownstreamData(const std::string& received_data, - const std::string& expected_data) { - if (old_style_websockets_) { - EXPECT_EQ(received_data, expected_data); - } else { - // Strip out the date header since we're not going to generate an exact match. - std::regex extra_request_headers("date:.*\r\nserver: envoy\r\n"); - std::string stripped_data = std::regex_replace(received_data, extra_request_headers, ""); - EXPECT_EQ(expected_data, stripped_data); - } -} - -void WebsocketIntegrationTest::validateFinalUpstreamData(const std::string& received_data, - const std::string& expected_data) { - std::regex extra_response_headers("x-request-id:.*\r\n"); - std::string stripped_data = std::regex_replace(received_data, extra_response_headers, ""); - EXPECT_EQ(expected_data, stripped_data); -} - -void WebsocketIntegrationTest::performWebSocketUpgrade(const std::string& upgrade_req_string, - const std::string& upgrade_resp_string) { +void WebsocketIntegrationTest::performUpgrade( + const Http::TestHeaderMapImpl& upgrade_request_headers, + const Http::TestHeaderMapImpl& upgrade_response_headers) { // Establish the initial connection. - tcp_client_ = makeTcpConnection(lookupPort("http")); + codec_client_ = makeHttpConnection(lookupPort("http")); - // Send the websocket upgrade request. - tcp_client_->write(upgrade_req_string); + // Send websocket upgrade request + auto encoder_decoder = codec_client_->startRequest(upgrade_request_headers); + request_encoder_ = &encoder_decoder.first; + response_ = std::move(encoder_decoder.second); if (old_style_websockets_) { test_server_->waitForCounterGe("tcp.websocket.downstream_cx_total", 1); } // Verify the upgrade was received upstream. - ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(fake_tcp_upstream_connection_)); - std::string data; - ASSERT_TRUE(fake_tcp_upstream_connection_->waitForData(&headersRead, &data)); - validateInitialUpstreamData(data); + ASSERT_TRUE(fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, fake_upstream_connection_)); + ASSERT_TRUE(fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_)); + ASSERT_TRUE(upstream_request_->waitForHeadersComplete()); + validateUpgradeRequestHeaders(upstream_request_->headers(), upgrade_request_headers); // Send the upgrade response - ASSERT_TRUE(fake_tcp_upstream_connection_->write(upgrade_resp_string)); + upstream_request_->encodeHeaders(upgrade_response_headers, false); // Verify the upgrade response was received downstream. - tcp_client_->waitForData("\r\n\r\n", false); - validateInitialDownstreamData(tcp_client_->data(), downstreamRespStr()); + response_->waitForHeaders(); + validateUpgradeResponseHeaders(response_->headers(), upgrade_response_headers); } void WebsocketIntegrationTest::sendBidirectionalData() { // Verify that the client can still send data upstream, and that upstream // receives it. - tcp_client_->write("hello"); - ASSERT_TRUE( - fake_tcp_upstream_connection_->waitForData(FakeRawConnection::waitForInexactMatch("hello"))); + codec_client_->sendData(*request_encoder_, "hello", false); + ASSERT_TRUE(upstream_request_->waitForData(*dispatcher_, "hello")); // Verify the upstream can send data to the client and that the client // receives it. - ASSERT_TRUE(fake_tcp_upstream_connection_->write("world")); - tcp_client_->waitForData("world", false); + upstream_request_->encodeData("world", false); + response_->waitForBodyData(5); + EXPECT_EQ("world", response_->body()); } TEST_P(WebsocketIntegrationTest, WebSocketConnectionDownstreamDisconnect) { config_helper_.addConfigModifier(setRouteUsingWebsocket(nullptr, old_style_websockets_)); initialize(); - performWebSocketUpgrade(upgrade_req_str_, upgrade_resp_str_); + performUpgrade(upgradeRequestHeaders(), upgradeResponseHeaders()); sendBidirectionalData(); // Send some final data from the client, and disconnect. - tcp_client_->write("bye!"); - tcp_client_->close(); + codec_client_->sendData(*request_encoder_, "bye!", false); + codec_client_->close(); // Verify the final data was received and that the connection is torn down. - std::string final_data; - ASSERT_TRUE(fake_tcp_upstream_connection_->waitForData( - FakeRawConnection::waitForInexactMatch("bye"), &final_data)); - ASSERT_TRUE(fake_tcp_upstream_connection_->waitForDisconnect()); - - validateFinalDownstreamData(tcp_client_->data(), downstreamRespStr() + "world"); - - if (old_style_websockets_) { - return; - } - - const std::string upstream_payload = "GET /websocket/test HTTP/1.1\r\n" - "host: host\r\n" - "connection: keep-alive, Upgrade\r\n" - "upgrade: websocket\r\n" - "content-length: 0\r\n" - "x-forwarded-proto: http\r\n" - "x-envoy-expected-rq-timeout-ms: 15000\r\n\r\n" - "hellobye!"; - validateFinalUpstreamData(final_data, upstream_payload); + ASSERT_TRUE(upstream_request_->waitForData(*dispatcher_, "hellobye!")); + ASSERT_TRUE(fake_upstream_connection_->waitForDisconnect()); } TEST_P(WebsocketIntegrationTest, WebSocketConnectionUpstreamDisconnect) { config_helper_.addConfigModifier(setRouteUsingWebsocket(nullptr, old_style_websockets_)); initialize(); - performWebSocketUpgrade(upgrade_req_str_, upgrade_resp_str_); + performUpgrade(upgradeRequestHeaders(), upgradeResponseHeaders()); // Standard TCP proxy semantics post upgrade - tcp_client_->write("hello"); - ASSERT_TRUE( - fake_tcp_upstream_connection_->waitForData(FakeRawConnection::waitForInexactMatch("hello"))); + codec_client_->sendData(*request_encoder_, "hello", false); + ASSERT_TRUE(upstream_request_->waitForData(*dispatcher_, "hello")); // Send data downstream and disconnect immediately. - ASSERT_TRUE(fake_tcp_upstream_connection_->write("world")); - ASSERT_TRUE(fake_tcp_upstream_connection_->close()); - ASSERT_TRUE(fake_tcp_upstream_connection_->waitForDisconnect()); + upstream_request_->encodeData("world", false); + ASSERT_TRUE(fake_upstream_connection_->close()); + ASSERT_TRUE(fake_upstream_connection_->waitForDisconnect()); // Verify both the data and the disconnect went through. - tcp_client_->waitForData("world", false); - tcp_client_->waitForDisconnect(); - ASSERT(!fake_tcp_upstream_connection_->connected()); - - validateFinalDownstreamData(tcp_client_->data(), downstreamRespStr() + "world"); + response_->waitForBodyData(5); + EXPECT_EQ("world", response_->body()); + codec_client_->waitForDisconnect(); + ASSERT(!fake_upstream_connection_->connected()); } TEST_P(WebsocketIntegrationTest, EarlyData) { config_helper_.addConfigModifier(setRouteUsingWebsocket(nullptr, old_style_websockets_)); initialize(); - // WebSocket upgrade with early data (HTTP body) + // Establish the initial connection. + codec_client_ = makeHttpConnection(lookupPort("http")); + const std::string early_data_req_str = "hello"; const std::string early_data_resp_str = "world"; - const std::string upgrade_req_str = - createUpgradeRequest("websocket", early_data_req_str.length()); - tcp_client_ = makeTcpConnection(lookupPort("http")); - // Send early data alongside websocket upgrade request - tcp_client_->write(upgrade_req_str + early_data_req_str); - ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(fake_tcp_upstream_connection_)); + + // Send websocket upgrade request with early data. + auto encoder_decoder = + codec_client_->startRequest(upgradeRequestHeaders("websocket", early_data_req_str.size())); + request_encoder_ = &encoder_decoder.first; + response_ = std::move(encoder_decoder.second); + codec_client_->sendData(*request_encoder_, early_data_req_str, false); // Wait for both the upgrade, and the early data. - std::string data; - ASSERT_TRUE(fake_tcp_upstream_connection_->waitForData( - FakeRawConnection::waitForInexactMatch(early_data_req_str.c_str()), &data)); - // We expect to find the early data on the upstream side - EXPECT_TRUE(StringUtil::endsWith(data, early_data_req_str)); + ASSERT_TRUE(fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, fake_upstream_connection_)); + ASSERT_TRUE(fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_)); + ASSERT_TRUE(upstream_request_->waitForHeadersComplete()); + ASSERT_TRUE(upstream_request_->waitForData(*dispatcher_, "hello")); + // Accept websocket upgrade request - ASSERT_TRUE(fake_tcp_upstream_connection_->write(upgrade_resp_str_)); + upstream_request_->encodeHeaders(upgradeResponseHeaders(), false); // Reply also with early data - ASSERT_TRUE(fake_tcp_upstream_connection_->write(early_data_resp_str)); + upstream_request_->encodeData(early_data_resp_str, false); // upstream disconnect - ASSERT_TRUE(fake_tcp_upstream_connection_->close()); - ASSERT_TRUE(fake_tcp_upstream_connection_->waitForDisconnect()); - tcp_client_->waitForData(early_data_resp_str, false); - tcp_client_->waitForDisconnect(); - - validateFinalDownstreamData(tcp_client_->data(), downstreamRespStr() + "world"); + ASSERT_TRUE(fake_upstream_connection_->close()); + ASSERT_TRUE(fake_upstream_connection_->waitForDisconnect()); + + response_->waitForHeaders(); + auto upgrade_response_headers(upgradeResponseHeaders()); + validateUpgradeResponseHeaders(response_->headers(), upgrade_response_headers); + response_->waitForBodyData(5); + codec_client_->waitForDisconnect(); + EXPECT_EQ("world", response_->body()); } TEST_P(WebsocketIntegrationTest, WebSocketConnectionIdleTimeout) { @@ -259,7 +274,7 @@ TEST_P(WebsocketIntegrationTest, WebSocketConnectionIdleTimeout) { initialize(); // WebSocket upgrade, send some data and disconnect downstream - performWebSocketUpgrade(upgrade_req_str_, upgrade_resp_str_); + performUpgrade(upgradeRequestHeaders(), upgradeResponseHeaders()); sendBidirectionalData(); if (old_style_websockets_) { @@ -267,8 +282,8 @@ TEST_P(WebsocketIntegrationTest, WebSocketConnectionIdleTimeout) { } else { test_server_->waitForCounterGe("http.config_test.downstream_rq_idle_timeout", 1); } - tcp_client_->waitForDisconnect(); - ASSERT_TRUE(fake_tcp_upstream_connection_->waitForDisconnect()); + codec_client_->waitForDisconnect(); + ASSERT_TRUE(fake_upstream_connection_->waitForDisconnect()); } TEST_P(WebsocketIntegrationTest, WebSocketLogging) { @@ -310,11 +325,11 @@ TEST_P(WebsocketIntegrationTest, WebSocketLogging) { initialize(); - performWebSocketUpgrade(upgrade_req_str_, upgrade_resp_str_); + performUpgrade(upgradeRequestHeaders(), upgradeResponseHeaders()); sendBidirectionalData(); - tcp_client_->close(); - ASSERT_TRUE(fake_tcp_upstream_connection_->waitForDisconnect()); + codec_client_->close(); + ASSERT_TRUE(fake_upstream_connection_->waitForDisconnect()); std::string log_result; do { @@ -326,8 +341,8 @@ TEST_P(WebsocketIntegrationTest, WebSocketLogging) { : R"EOF(\[::1\]:[0-9]+)EOF"; EXPECT_THAT(log_result, MatchesRegex(fmt::format(expected_log_template, - 82, // response length - 5, // hello length + 101, // response length + 5, // hello length ip_port_regex, ip_port_regex, ip_port_regex))); } @@ -347,56 +362,17 @@ TEST_P(WebsocketIntegrationTest, NonWebsocketUpgrade) { config_helper_.addConfigModifier(setRouteUsingWebsocket(nullptr, old_style_websockets_)); initialize(); - const std::string upgrade_req_str = createUpgradeRequest("foo"); - const std::string upgrade_resp_str = createUpgradeResponse("foo"); - - // Upgrade, send some data and disconnect downstream - - tcp_client_ = makeTcpConnection(lookupPort("http")); - // Send websocket upgrade request - // The size of headers received by the destination is 228 bytes. - tcp_client_->write(upgrade_req_str); - if (old_style_websockets_) { - test_server_->waitForCounterGe("tcp.websocket.downstream_cx_total", 1); - } - ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(fake_tcp_upstream_connection_)); - std::string data; - ASSERT_TRUE(fake_tcp_upstream_connection_->waitForData(&headersRead, &data)); - validateInitialUpstreamData(data); - - // Accept websocket upgrade request - ASSERT_TRUE(fake_tcp_upstream_connection_->write(upgrade_resp_str)); - tcp_client_->waitForData("\r\n\r\n", false); - if (old_style_websockets_) { - ASSERT_EQ(tcp_client_->data(), upgrade_resp_str); - } - + performUpgrade(upgradeRequestHeaders("foo", 0), upgradeResponseHeaders("foo")); sendBidirectionalData(); // downstream disconnect - tcp_client_->write("bye!"); - tcp_client_->close(); - std::string final_data; - ASSERT_TRUE(fake_tcp_upstream_connection_->waitForData( - FakeRawConnection::waitForInexactMatch("bye"), &final_data)); - ASSERT_TRUE(fake_tcp_upstream_connection_->waitForDisconnect()); - - const std::string modified_upgrade_resp_str = "HTTP/1.1 101 Switching Protocols\r\nconnection: " - "Upgrade\r\nupgrade: foo\r\ncontent-length: " - "0\r\n\r\n"; - validateFinalDownstreamData(tcp_client_->data(), modified_upgrade_resp_str + "world"); - const std::string upstream_payload = "GET /websocket/test HTTP/1.1\r\n" - "host: host\r\n" - "connection: keep-alive, Upgrade\r\n" - "upgrade: foo\r\n" - "content-length: 0\r\n" - "x-forwarded-proto: http\r\n" - "x-envoy-expected-rq-timeout-ms: 15000\r\n\r\n" - "hellobye!"; - - std::regex extra_response_headers("x-request-id:.*\r\n"); - std::string stripped_data = std::regex_replace(final_data, extra_response_headers, ""); - EXPECT_EQ(upstream_payload, stripped_data); + codec_client_->sendData(*request_encoder_, "bye!", false); + codec_client_->close(); + ASSERT_TRUE(upstream_request_->waitForData(*dispatcher_, "hellobye!")); + ASSERT_TRUE(fake_upstream_connection_->waitForDisconnect()); + + auto upgrade_response_headers(upgradeResponseHeaders("foo")); + validateUpgradeResponseHeaders(response_->headers(), upgrade_response_headers); } TEST_P(WebsocketIntegrationTest, WebsocketCustomFilterChain) { @@ -425,109 +401,79 @@ TEST_P(WebsocketIntegrationTest, WebsocketCustomFilterChain) { // Websocket upgrades are configured to disallow large payload. const std::string early_data_req_str(2048, 'a'); { - const std::string upgrade_req_str = - createUpgradeRequest("websocket", early_data_req_str.length()); - IntegrationTcpClientPtr tcp_client_ = makeTcpConnection(lookupPort("http")); - tcp_client_->write(upgrade_req_str + early_data_req_str); - tcp_client_->waitForData("\r\n\r\n", false); - EXPECT_NE(tcp_client_->data().find("413"), std::string::npos); - tcp_client_->waitForDisconnect(true); + codec_client_ = makeHttpConnection(lookupPort("http")); + auto encoder_decoder = codec_client_->startRequest( + upgradeRequestHeaders("websocket", early_data_req_str.length())); + response_ = std::move(encoder_decoder.second); + codec_client_->sendData(encoder_decoder.first, early_data_req_str, false); + response_->waitForEndStream(); + EXPECT_STREQ("413", response_->headers().Status()->value().c_str()); + codec_client_->waitForDisconnect(); } // HTTP requests are configured to disallow large bodies. { - const std::string upgrade_req_str = fmt::format("GET / HTTP/1.1\r\nHost: host\r\nConnection: " - "keep-alive\r\nContent-Length: {}\r\n\r\n", - early_data_req_str.length()); - IntegrationTcpClientPtr tcp_client_ = makeTcpConnection(lookupPort("http")); - tcp_client_->write(upgrade_req_str + early_data_req_str); - tcp_client_->waitForData("\r\n\r\n", false); - EXPECT_NE(tcp_client_->data().find("413"), std::string::npos); - tcp_client_->waitForDisconnect(true); + Http::TestHeaderMapImpl request_headers{ + {":method", "GET"}, {":path", "/"}, {"content-length", "2048"}, {":authority", "host"}}; + codec_client_ = makeHttpConnection(lookupPort("http")); + auto encoder_decoder = codec_client_->startRequest(request_headers); + response_ = std::move(encoder_decoder.second); + codec_client_->sendData(encoder_decoder.first, early_data_req_str, false); + response_->waitForEndStream(); + EXPECT_STREQ("413", response_->headers().Status()->value().c_str()); + codec_client_->waitForDisconnect(); } // Foo upgrades are configured without the buffer filter, so should explicitly // allow large payload. { - const std::string upgrade_req_str = createUpgradeRequest("foo", early_data_req_str.length()); - IntegrationTcpClientPtr tcp_client_ = makeTcpConnection(lookupPort("http")); - tcp_client_->write(upgrade_req_str + early_data_req_str); - FakeRawConnectionPtr fake_tcp_upstream_connection_; - ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(fake_tcp_upstream_connection_)); - // Make sure the full payload arrives. - ASSERT_TRUE(fake_tcp_upstream_connection_->waitForData( - FakeRawConnection::waitForInexactMatch(early_data_req_str.c_str()))); + performUpgrade(upgradeRequestHeaders("foo", early_data_req_str.length()), + upgradeResponseHeaders("foo")); + codec_client_->sendData(*request_encoder_, early_data_req_str, false); + ASSERT_TRUE(upstream_request_->waitForData(*dispatcher_, early_data_req_str)); + // Tear down all the connections cleanly. - tcp_client_->close(); - ASSERT_TRUE(fake_tcp_upstream_connection_->waitForDisconnect()); + codec_client_->close(); + ASSERT_TRUE(fake_upstream_connection_->waitForDisconnect()); } } TEST_P(WebsocketIntegrationTest, BidirectionalChunkedData) { config_helper_.addConfigModifier(setRouteUsingWebsocket(nullptr, old_style_websockets_)); initialize(); - const std::string upgrade_req_str = "GET /websocket/test HTTP/1.1\r\nHost: host\r\nconnection: " - "keep-alive, Upgrade\r\nupgrade: Websocket\r\n" - "transfer-encoding: chunked\r\n\r\n" - "3\r\n123\r\n0\r\n\r\n" - "SomeWebSocketPayload"; - - // Upgrade, send initial data and wait for it to be received. - IntegrationTcpClientPtr tcp_client_ = makeTcpConnection(lookupPort("http")); - tcp_client_->write(upgrade_req_str); - FakeRawConnectionPtr fake_tcp_upstream_connection_; - ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(fake_tcp_upstream_connection_)); - ASSERT_TRUE(fake_tcp_upstream_connection_->waitForData( - FakeRawConnection::waitForInexactMatch("SomeWebSocketPayload"))); - - // Finish the upgrade. - const std::string upgrade_resp_str = - "HTTP/1.1 101 Switching Protocols\r\nconnection: Upgrade\r\nupgrade: Websocket\r\n" - "transfer-encoding: chunked\r\n\r\n" - "4\r\nabcd\r\n0\r\n\r\n" - "SomeWebsocketResponsePayload"; - ASSERT_TRUE(fake_tcp_upstream_connection_->write(upgrade_resp_str)); - tcp_client_->waitForData("SomeWebsocketResponsePayload", false); - - // Verify bidirectional data still works. - tcp_client_->write("FinalClientPayload"); - std::string final_data; - ASSERT_TRUE(fake_tcp_upstream_connection_->waitForData( - FakeRawConnection::waitForInexactMatch("FinalClientPayload"), &final_data)); - ASSERT_TRUE(fake_tcp_upstream_connection_->write("FinalServerPayload")); - tcp_client_->waitForData("FinalServerPayload", false); + + auto request_headers = upgradeRequestHeaders(); + request_headers.removeContentLength(); + auto response_headers = upgradeResponseHeaders(); + response_headers.removeContentLength(); + performUpgrade(request_headers, response_headers); + + // With content-length not present, the HTTP codec will send the request with + // transfer-encoding: chunked. + ASSERT_TRUE(upstream_request_->headers().TransferEncoding() != nullptr); + ASSERT_TRUE(response_->headers().TransferEncoding() != nullptr); + + // Send both a chunked request body and "websocket" payload. + std::string request_payload = "3\r\n123\r\n0\r\n\r\nSomeWebsocketRequestPayload"; + codec_client_->sendData(*request_encoder_, request_payload, false); + ASSERT_TRUE(upstream_request_->waitForData(*dispatcher_, request_payload)); + + // Send both a chunked response body and "websocket" payload. + std::string response_payload = "4\r\nabcd\r\n0\r\n\r\nSomeWebsocketResponsePayload"; + upstream_request_->encodeData(response_payload, false); + response_->waitForBodyData(response_payload.size()); + EXPECT_EQ(response_payload, response_->body()); + + // Verify follow-up bidirectional data still works. + codec_client_->sendData(*request_encoder_, "FinalClientPayload", false); + ASSERT_TRUE(upstream_request_->waitForData(*dispatcher_, request_payload + "FinalClientPayload")); + upstream_request_->encodeData("FinalServerPayload", false); + response_->waitForBodyData(5); + EXPECT_EQ(response_payload + "FinalServerPayload", response_->body()); // Clean up. - tcp_client_->close(); - ASSERT_TRUE(fake_tcp_upstream_connection_->waitForDisconnect()); - - const std::string modified_upstream_payload = - "GET /websocket/test HTTP/1.1\r\n" - "host: host\r\n" - "connection: keep-alive, Upgrade\r\n" - "upgrade: Websocket\r\n" - "x-forwarded-proto: http\r\n" - "x-envoy-expected-rq-timeout-ms: 15000\r\n" - "transfer-encoding: chunked\r\n\r\n" - "3\r\n123\r\n0\r\n\r\nSomeWebSocketPayloadFinalClientPayload"; - const std::string old_style_modified_payload = - "GET /websocket HTTP/1.1\r\n" - "host: host\r\n" - "connection: keep-alive, Upgrade\r\n" - "upgrade: Websocket\r\n" - "x-forwarded-proto: http\r\n" - "x-envoy-original-path: /websocket/test\r\n" - "transfer-encoding: chunked\r\n\r\n" - "3\r\n123\r\n0\r\n\r\nSomeWebSocketPayloadFinalClientPayload"; - validateFinalUpstreamData(final_data, old_style_websockets_ ? old_style_modified_payload - : modified_upstream_payload); - - const std::string modified_downstream_payload = - "HTTP/1.1 101 Switching Protocols\r\nconnection: Upgrade\r\nupgrade: Websocket\r\n" - "transfer-encoding: chunked\r\n\r\n" - "4\r\nabcd\r\n0\r\n\r\n" - "SomeWebsocketResponsePayloadFinalServerPayload"; - validateFinalDownstreamData(tcp_client_->data(), modified_downstream_payload); + codec_client_->close(); + ASSERT_TRUE(fake_upstream_connection_->waitForDisconnect()); } } // namespace Envoy diff --git a/test/integration/websocket_integration_test.h b/test/integration/websocket_integration_test.h index a728a7c87caec..d37476d67b955 100644 --- a/test/integration/websocket_integration_test.h +++ b/test/integration/websocket_integration_test.h @@ -13,54 +13,26 @@ class WebsocketIntegrationTest void initialize() override; WebsocketIntegrationTest() : HttpIntegrationTest(Http::CodecClient::Type::HTTP1, std::get<0>(GetParam())) {} - bool old_style_websockets_{std::get<1>(GetParam())}; - void TearDown() override { - fake_tcp_upstream_connection_.reset(); - tcp_client_.reset(); - } protected: - void performWebSocketUpgrade(const std::string& upgrade_req_string, - const std::string& upgrade_resp_string); + void performUpgrade(const Http::TestHeaderMapImpl& upgrade_request_headers, + const Http::TestHeaderMapImpl& upgrade_response_headers); void sendBidirectionalData(); - void validateInitialUpstreamData(const std::string& received_data); - void validateInitialDownstreamData(const std::string& received_data, - const std::string& expected_data); - void validateFinalDownstreamData(const std::string& received_data, - const std::string& expected_data); - void validateFinalUpstreamData(const std::string& received_data, - const std::string& expected_data); - - const std::string& downstreamRespStr() { - return old_style_websockets_ ? upgrade_resp_str_ : modified_upgrade_resp_str_; - } - - static std::string createUpgradeRequest(absl::string_view upgrade_type, - absl::optional content_length = absl::nullopt) { - std::string content_length_string = - content_length.has_value() ? fmt::format("Content-Length: {}\r\n", content_length.value()) - : ""; - return fmt::format("GET /websocket/test HTTP/1.1\r\nHost: host\r\nConnection: " - "keep-alive, Upgrade\r\nUpgrade: {}\r\n{}\r\n", - upgrade_type, content_length_string); - } + void validateUpgradeRequestHeaders(const Http::HeaderMap& proxied_request_headers, + const Http::HeaderMap& original_request_headers); - static std::string createUpgradeResponse(absl::string_view upgrade_type) { - return fmt::format( - "HTTP/1.1 101 Switching Protocols\r\nConnection: Upgrade\r\nUpgrade: {}\r\n\r\n", - upgrade_type); - } + void validateUpgradeResponseHeaders(const Http::HeaderMap& proxied_response_headers, + const Http::HeaderMap& original_response_headers); - const std::string upgrade_req_str_ = createUpgradeRequest("websocket"); - const std::string upgrade_resp_str_ = createUpgradeResponse("websocket"); + void commonValidate(Http::HeaderMap& proxied_headers, const Http::HeaderMap& original_headers); - const std::string modified_upgrade_resp_str_ = "HTTP/1.1 101 Switching Protocols\r\nconnection: " - "Upgrade\r\nupgrade: websocket\r\ncontent-length: " - "0\r\n\r\n"; - - FakeRawConnectionPtr fake_tcp_upstream_connection_; - IntegrationTcpClientPtr tcp_client_; + // True if the test uses "old style" TCP proxy websockets. False to use the + // new style "HTTP filter chain" websockets. + // See + // https://github.com/envoyproxy/envoy/blob/master/docs/root/intro/arch_overview/websocket.rst + bool old_style_websockets_{std::get<1>(GetParam())}; + IntegrationStreamDecoderPtr response_; }; } // namespace Envoy