diff --git a/test/config/integration/BUILD b/test/config/integration/BUILD index bf7e32ef71112..8f56835c4a788 100644 --- a/test/config/integration/BUILD +++ b/test/config/integration/BUILD @@ -17,6 +17,7 @@ exports_files([ "server_ssl.json", "server_uds.json", "server_xfcc.json", + "tcp_proxy.json", ]) filegroup( diff --git a/test/config/integration/tcp_proxy.json b/test/config/integration/tcp_proxy.json new file mode 100644 index 0000000000000..ee0311f9ec672 --- /dev/null +++ b/test/config/integration/tcp_proxy.json @@ -0,0 +1,74 @@ +{ + "listeners": [ + { + "address": "tcp://{{ ip_loopback_address }}:0", + "filters": [ + { "type": "read", "name": + "tcp_proxy", + "config": { + "stat_prefix": "test_tcp", + "route_config": { + "routes": [ + { + "cluster": "cluster_1" + } + ] + } + } + } + ] + }, + { + "address": "tcp://{{ ip_loopback_address }}:0", + "ssl_context": { + "ca_cert_file": "{{ test_rundir }}/test/config/integration/certs/cacert.pem", + "cert_chain_file": "{{ test_rundir }}/test/config/integration/certs/servercert.pem", + "private_key_file": "{{ test_rundir }}/test/config/integration/certs/serverkey.pem", + "alpn_protocols": "h2,http/1.1", + "alt_alpn_protocols": "http/1.1" + }, + "filters": [ + { "type": "read", "name": "tcp_proxy", + "config": { + "stat_prefix": "test_tcp_sans_tls", + "route_config": { + "routes": [ + { + "cluster": "cluster_1" + } + ] + } + } + }, + { "type": "read", "name": "client_ssl_auth", + "config": { + "auth_api_cluster": "ssl_auth", + "stat_prefix": "ssl_stats", + "refresh_delay_ms": 600000, + "ip_white_list": [ "127.0.0.1/32", "::1/64"] + } + } + ] + }], + "admin": { "access_log_path": "/dev/null", "address": "tcp://{{ ip_loopback_address }}:0" }, + "statsd_udp_ip_address": "{{ ip_loopback_address }}:8125", + + "cluster_manager": { + "clusters": [ + { + "name": "cluster_1", + "connect_timeout_ms": 5000, + "type": "static", + "lb_type": "round_robin", + "hosts": [{"url": "tcp://{{ ip_loopback_address }}:{{ upstream_0 }}"}] + }, + { + "name": "ssl_auth", + "connect_timeout_ms": 5000, + "type": "strict_dns", + "lb_type": "round_robin", + "dns_lookup_family": "{{ dns_lookup_family }}", + "hosts": [{"url": "tcp://localhost:{{ upstream_1 }}"}] + }] + } +} diff --git a/test/integration/BUILD b/test/integration/BUILD index 4f292ebce63df..1852d85e3c0c1 100644 --- a/test/integration/BUILD +++ b/test/integration/BUILD @@ -102,12 +102,14 @@ envoy_cc_test_library( "fake_upstream.cc", "integration.cc", "server.cc", + "ssl_utility.cc", "utility.cc", ], hdrs = [ "fake_upstream.h", "integration.h", "server.h", + "ssl_utility.h", "utility.h", ], data = ["//test/common/runtime:filesystem_test_data"], @@ -234,14 +236,14 @@ envoy_cc_test( ) envoy_cc_test( - name = "xfcc_integration_test", + name = "tcp_proxy_integration_test", srcs = [ - "ssl_integration_test.h", - "xfcc_integration_test.cc", - "xfcc_integration_test.h", + "tcp_proxy_integration_test.cc", + "tcp_proxy_integration_test.h", ], data = [ - "//test/config/integration:server_xfcc.json", + "//test/config/integration:server_config_files", + "//test/config/integration:tcp_proxy.json", "//test/config/integration/certs", ], deps = [ @@ -255,6 +257,24 @@ envoy_cc_test( ], ) +envoy_cc_test( + name = "xfcc_integration_test", + srcs = [ + "ssl_integration_test.h", + "xfcc_integration_test.cc", + "xfcc_integration_test.h", + ], + data = [ + "//test/config/integration:server_xfcc.json", + "//test/config/integration/certs", + ], + deps = [ + ":integration_lib", + "//source/common/http:header_map_lib", + "//test/test_common:utility_lib", + ], +) + envoy_cc_test( name = "uds_integration_test", srcs = [ diff --git a/test/integration/integration.cc b/test/integration/integration.cc index 0ab71cc134977..e414f5b0cf0c2 100644 --- a/test/integration/integration.cc +++ b/test/integration/integration.cc @@ -178,22 +178,23 @@ void IntegrationCodecClient::ConnectionCallbacks::onEvent(uint32_t events) { IntegrationTcpClient::IntegrationTcpClient(Event::Dispatcher& dispatcher, uint32_t port, Network::Address::IpVersion version) - : callbacks_(new ConnectionCallbacks(*this)) { + : payload_reader_(new WaitForPayloadReader(dispatcher)), + callbacks_(new ConnectionCallbacks(*this)) { connection_ = dispatcher.createClientConnection(Network::Utility::resolveUrl( fmt::format("tcp://{}:{}", Network::Test::getLoopbackAddressUrlString(version), port))); connection_->addConnectionCallbacks(*callbacks_); - connection_->addReadFilter(callbacks_); + connection_->addReadFilter(payload_reader_); connection_->connect(); } void IntegrationTcpClient::close() { connection_->close(Network::ConnectionCloseType::NoFlush); } void IntegrationTcpClient::waitForData(const std::string& data) { - if (data_.find(data) == 0) { + if (payload_reader_->data().find(data) == 0) { return; } - data_to_wait_for_ = data; + payload_reader_->set_data_to_wait_for(data); connection_->dispatcher().run(Event::Dispatcher::RunType::Block); } @@ -209,17 +210,6 @@ void IntegrationTcpClient::write(const std::string& data) { // NOTE: We should run blocking until all the body data is flushed. } -Network::FilterStatus IntegrationTcpClient::ConnectionCallbacks::onData(Buffer::Instance& data) { - parent_.data_.append(TestUtility::bufferToString(data)); - data.drain(data.length()); - if (!parent_.data_to_wait_for_.empty() && parent_.data_.find(parent_.data_to_wait_for_) == 0) { - parent_.data_to_wait_for_.clear(); - parent_.connection_->dispatcher().exit(); - } - - return Network::FilterStatus::StopIteration; -} - void IntegrationTcpClient::ConnectionCallbacks::onEvent(uint32_t events) { if (events == Network::ConnectionEvent::RemoteClose) { parent_.disconnected_ = true; diff --git a/test/integration/integration.h b/test/integration/integration.h index 778372318dad8..46f8b7e69b132 100644 --- a/test/integration/integration.h +++ b/test/integration/integration.h @@ -13,6 +13,7 @@ #include "test/integration/fake_upstream.h" #include "test/integration/server.h" +#include "test/integration/utility.h" #include "test/test_common/environment.h" #include "test/test_common/printers.h" @@ -119,30 +120,25 @@ class IntegrationTcpClient { Network::Address::IpVersion version); void close(); - const std::string& data() { return data_; } void waitForData(const std::string& data); void waitForDisconnect(); void write(const std::string& data); + const std::string& data() { return payload_reader_->data(); } private: - struct ConnectionCallbacks : public Network::ConnectionCallbacks, - public Network::ReadFilterBaseImpl { + struct ConnectionCallbacks : public Network::ConnectionCallbacks { ConnectionCallbacks(IntegrationTcpClient& parent) : parent_(parent) {} // Network::ConnectionCallbacks void onEvent(uint32_t events) override; - // Network::ReadFilter - Network::FilterStatus onData(Buffer::Instance& data) override; - IntegrationTcpClient& parent_; }; + std::shared_ptr payload_reader_; std::shared_ptr callbacks_; Network::ClientConnectionPtr connection_; bool disconnected_{}; - std::string data_; - std::string data_to_wait_for_; }; typedef std::unique_ptr IntegrationTcpClientPtr; diff --git a/test/integration/server.h b/test/integration/server.h index ab0b011cd7950..feec1ab3037af 100644 --- a/test/integration/server.h +++ b/test/integration/server.h @@ -78,6 +78,45 @@ class TestComponentFactory : public ComponentFactory { namespace Stats { +/** + * This is a wrapper for Scopes for the TestIsolatedStoreImpl to ensure new scopes do + * not interact with the store without grabbing the lock from TestIsolatedStoreImpl. + */ +class TestScopeWrapper : public Scope { +public: + TestScopeWrapper(std::mutex& lock, ScopePtr wrapped_scope) + : lock_(lock), wrapped_scope_(std::move(wrapped_scope)) {} + + void deliverHistogramToSinks(const std::string& name, uint64_t value) override { + std::unique_lock lock(lock_); + wrapped_scope_->deliverHistogramToSinks(name, value); + } + + void deliverTimingToSinks(const std::string& name, std::chrono::milliseconds ms) override { + std::unique_lock lock(lock_); + wrapped_scope_->deliverTimingToSinks(name, ms); + } + + Counter& counter(const std::string& name) override { + std::unique_lock lock(lock_); + return wrapped_scope_->counter(name); + } + + Gauge& gauge(const std::string& name) override { + std::unique_lock lock(lock_); + return wrapped_scope_->gauge(name); + } + + Timer& timer(const std::string& name) override { + std::unique_lock lock(lock_); + return wrapped_scope_->timer(name); + } + +private: + std::mutex& lock_; + ScopePtr wrapped_scope_; +}; + /** * This is a variant of the isolated store that has locking across all operations so that it can * be used during the integration tests. @@ -111,7 +150,7 @@ class TestIsolatedStoreImpl : public StoreRoot { } ScopePtr createScope(const std::string& name) override { std::unique_lock lock(lock_); - return store_.createScope(name); + return ScopePtr{new TestScopeWrapper(lock_, store_.createScope(name))}; } // Stats::StoreRoot diff --git a/test/integration/ssl_integration_test.cc b/test/integration/ssl_integration_test.cc index b4316502d2f0f..85f5fc905841e 100644 --- a/test/integration/ssl_integration_test.cc +++ b/test/integration/ssl_integration_test.cc @@ -8,6 +8,7 @@ #include "common/ssl/context_config_impl.h" #include "common/ssl/context_manager_impl.h" +#include "test/integration/ssl_utility.h" #include "test/test_common/network_utility.h" #include "gmock/gmock.h" @@ -36,10 +37,10 @@ void SslIntegrationTest::SetUp() { version_), version_); registerTestServerPorts({"http"}); - client_ssl_ctx_plain_ = createClientSslContext(false, false); - client_ssl_ctx_alpn_ = createClientSslContext(true, false); - client_ssl_ctx_san_ = createClientSslContext(false, true); - client_ssl_ctx_alpn_san_ = createClientSslContext(true, true); + client_ssl_ctx_plain_ = createClientSslContext(false, false, *context_manager_); + client_ssl_ctx_alpn_ = createClientSslContext(true, false, *context_manager_); + client_ssl_ctx_san_ = createClientSslContext(false, true, *context_manager_); + client_ssl_ctx_alpn_san_ = createClientSslContext(true, true, *context_manager_); } void SslIntegrationTest::TearDown() { @@ -68,59 +69,8 @@ ServerContextPtr SslIntegrationTest::createUpstreamSslContext() { return context_manager_->createSslServerContext(*upstream_stats_store, cfg); } -ClientContextPtr SslIntegrationTest::createClientSslContext(bool alpn, bool san) { - std::string json_plain = R"EOF( -{ - "ca_cert_file": "{{ test_rundir }}/test/config/integration/certs/cacert.pem", - "cert_chain_file": "{{ test_rundir }}/test/config/integration/certs/clientcert.pem", - "private_key_file": "{{ test_rundir }}/test/config/integration/certs/clientkey.pem" -} -)EOF"; - - std::string json_alpn = R"EOF( -{ - "ca_cert_file": "{{ test_rundir }}/test/config/integration/certs/cacert.pem", - "cert_chain_file": "{{ test_rundir }}/test/config/integration/certs/clientcert.pem", - "private_key_file": "{{ test_rundir }}/test/config/integration/certs/clientkey.pem", - "alpn_protocols": "h2,http/1.1" -} -)EOF"; - - std::string json_san = R"EOF( -{ - "ca_cert_file": "{{ test_rundir }}/test/config/integration/certs/cacert.pem", - "cert_chain_file": "{{ test_rundir }}/test/config/integration/certs/clientcert.pem", - "private_key_file": "{{ test_rundir }}/test/config/integration/certs/clientkey.pem", - "verify_subject_alt_name": [ "spiffe://lyft.com/backend-team" ] -} -)EOF"; - - std::string json_alpn_san = R"EOF( -{ - "ca_cert_file": "{{ test_rundir }}/test/config/integration/certs/cacert.pem", - "cert_chain_file": "{{ test_rundir }}/test/config/integration/certs/clientcert.pem", - "private_key_file": "{{ test_rundir }}/test/config/integration/certs/clientkey.pem", - "alpn_protocols": "h2,http/1.1", - "verify_subject_alt_name": [ "spiffe://lyft.com/backend-team" ] -} -)EOF"; - - std::string target; - if (alpn) { - target = san ? json_alpn_san : json_alpn; - } else { - target = san ? json_san : json_plain; - } - Json::ObjectSharedPtr loader = TestEnvironment::jsonLoadFromString(target); - ContextConfigImpl cfg(*loader); - static auto* client_stats_store = new Stats::TestIsolatedStoreImpl(); - return context_manager_->createSslClientContext(*client_stats_store, cfg); -} - Network::ClientConnectionPtr SslIntegrationTest::makeSslClientConnection(bool alpn, bool san) { - Network::Address::InstanceConstSharedPtr address = - Network::Utility::resolveUrl("tcp://" + Network::Test::getLoopbackAddressUrlString(version_) + - ":" + std::to_string(lookupPort("http"))); + Network::Address::InstanceConstSharedPtr address = getSslAddress(version_, lookupPort("http")); if (alpn) { return dispatcher_->createSslClientConnection( san ? *client_ssl_ctx_alpn_san_ : *client_ssl_ctx_alpn_, address); diff --git a/test/integration/ssl_integration_test.h b/test/integration/ssl_integration_test.h index a9a0fd61b07c7..0f295ce24ff9c 100644 --- a/test/integration/ssl_integration_test.h +++ b/test/integration/ssl_integration_test.h @@ -53,7 +53,6 @@ class SslIntegrationTest : public BaseIntegrationTest, Network::ClientConnectionPtr makeSslClientConnection(bool alpn, bool san); ServerContextPtr createUpstreamSslContext(); - ClientContextPtr createClientSslContext(bool alpn, bool san); void checkStats(); private: diff --git a/test/integration/ssl_utility.cc b/test/integration/ssl_utility.cc new file mode 100644 index 0000000000000..a59ed24c3e292 --- /dev/null +++ b/test/integration/ssl_utility.cc @@ -0,0 +1,72 @@ +#include "test/integration/ssl_utility.h" + +#include "common/json/json_loader.h" +#include "common/network/utility.h" +#include "common/ssl/context_config_impl.h" +#include "common/ssl/context_manager_impl.h" + +#include "test/integration/server.h" +#include "test/test_common/environment.h" +#include "test/test_common/network_utility.h" + +namespace Envoy { +namespace Ssl { + +ClientContextPtr createClientSslContext(bool alpn, bool san, ContextManager& context_manager) { + const std::string json_plain = R"EOF( +{ + "ca_cert_file": "{{ test_rundir }}/test/config/integration/certs/cacert.pem", + "cert_chain_file": "{{ test_rundir }}/test/config/integration/certs/clientcert.pem", + "private_key_file": "{{ test_rundir }}/test/config/integration/certs/clientkey.pem" +} +)EOF"; + + const std::string json_alpn = R"EOF( +{ + "ca_cert_file": "{{ test_rundir }}/test/config/integration/certs/cacert.pem", + "cert_chain_file": "{{ test_rundir }}/test/config/integration/certs/clientcert.pem", + "private_key_file": "{{ test_rundir }}/test/config/integration/certs/clientkey.pem", + "alpn_protocols": "h2,http/1.1" +} +)EOF"; + + const std::string json_san = R"EOF( +{ + "ca_cert_file": "{{ test_rundir }}/test/config/integration/certs/cacert.pem", + "cert_chain_file": "{{ test_rundir }}/test/config/integration/certs/clientcert.pem", + "private_key_file": "{{ test_rundir }}/test/config/integration/certs/clientkey.pem", + "verify_subject_alt_name": [ "spiffe://lyft.com/backend-team" ] +} +)EOF"; + + const std::string json_alpn_san = R"EOF( +{ + "ca_cert_file": "{{ test_rundir }}/test/config/integration/certs/cacert.pem", + "cert_chain_file": "{{ test_rundir }}/test/config/integration/certs/clientcert.pem", + "private_key_file": "{{ test_rundir }}/test/config/integration/certs/clientkey.pem", + "alpn_protocols": "h2,http/1.1", + "verify_subject_alt_name": [ "spiffe://lyft.com/backend-team" ] +} +)EOF"; + + std::string target; + if (alpn) { + target = san ? json_alpn_san : json_alpn; + } else { + target = san ? json_san : json_plain; + } + Json::ObjectSharedPtr loader = TestEnvironment::jsonLoadFromString(target); + ContextConfigImpl cfg(*loader); + static auto* client_stats_store = new Stats::TestIsolatedStoreImpl(); + return context_manager.createSslClientContext(*client_stats_store, cfg); +} + +Network::Address::InstanceConstSharedPtr getSslAddress(const Network::Address::IpVersion& version, + int port) { + std::string url = + "tcp://" + Network::Test::getLoopbackAddressUrlString(version) + ":" + std::to_string(port); + return Network::Utility::resolveUrl(url); +} + +} // Ssl +} // Envoy diff --git a/test/integration/ssl_utility.h b/test/integration/ssl_utility.h new file mode 100644 index 0000000000000..490aba3bf7eb6 --- /dev/null +++ b/test/integration/ssl_utility.h @@ -0,0 +1,15 @@ +#pragma once + +#include "envoy/network/address.h" +#include "envoy/ssl/context_manager.h" + +namespace Envoy { +namespace Ssl { + +ClientContextPtr createClientSslContext(bool alpn, bool san, ContextManager& context_manager); + +Network::Address::InstanceConstSharedPtr getSslAddress(const Network::Address::IpVersion& version, + int port); + +} // Ssl +} // Envoy diff --git a/test/integration/tcp_proxy_integration_test.cc b/test/integration/tcp_proxy_integration_test.cc new file mode 100644 index 0000000000000..01ee4b7c24c16 --- /dev/null +++ b/test/integration/tcp_proxy_integration_test.cc @@ -0,0 +1,133 @@ +#include "test/integration/tcp_proxy_integration_test.h" + +#include "common/network/utility.h" +#include "common/ssl/context_manager_impl.h" + +#include "test/integration/ssl_utility.h" +#include "test/integration/utility.h" +#include "test/mocks/runtime/mocks.h" + +#include "gtest/gtest.h" + +namespace Envoy { +namespace { + +INSTANTIATE_TEST_CASE_P(IpVersions, TcpProxyIntegrationTest, + testing::ValuesIn(TestEnvironment::getIpVersionsForTest())); + +// Test proxying data in both directions, and that all data is flushed properly +// when there is an upstream disconnect. +TEST_P(TcpProxyIntegrationTest, TcpProxyUpstreamDisconnect) { + IntegrationTcpClientPtr tcp_client; + FakeRawConnectionPtr fake_upstream_connection; + FakeRawConnectionPtr fake_rest_connection; + executeActions({ + [&]() -> void { tcp_client = makeTcpConnection(lookupPort("tcp_proxy")); }, + [&]() -> void { tcp_client->write("hello"); }, + [&]() -> void { fake_upstream_connection = fake_upstreams_[0]->waitForRawConnection(); }, + [&]() -> void { fake_upstream_connection->waitForData(5); }, + [&]() -> void { fake_upstream_connection->write("world"); }, + [&]() -> void { fake_upstream_connection->close(); }, + [&]() -> void { fake_upstream_connection->waitForDisconnect(); }, + [&]() -> void { tcp_client->waitForDisconnect(); }, + + // Clean up unused client_ssl_auth + [&]() -> void { fake_rest_connection = fake_upstreams_[1]->waitForRawConnection(); }, + [&]() -> void { fake_rest_connection->close(); }, + [&]() -> void { fake_rest_connection->waitForDisconnect(); }, + }); + + EXPECT_EQ("world", tcp_client->data()); +} + +// Test proxying data in both directions, and that all data is flushed properly +// when the client disconnects. +TEST_P(TcpProxyIntegrationTest, TcpProxyDownstreamDisconnect) { + IntegrationTcpClientPtr tcp_client; + FakeRawConnectionPtr fake_upstream_connection; + FakeRawConnectionPtr fake_rest_connection; + executeActions({ + [&]() -> void { tcp_client = makeTcpConnection(lookupPort("tcp_proxy")); }, + [&]() -> void { tcp_client->write("hello"); }, + [&]() -> void { fake_upstream_connection = fake_upstreams_[0]->waitForRawConnection(); }, + [&]() -> void { fake_upstream_connection->waitForData(5); }, + [&]() -> void { fake_upstream_connection->write("world"); }, + [&]() -> void { tcp_client->waitForData("world"); }, + [&]() -> void { tcp_client->write("hello"); }, [&]() -> void { tcp_client->close(); }, + [&]() -> void { fake_upstream_connection->waitForData(10); }, + [&]() -> void { fake_upstream_connection->waitForDisconnect(); }, + + // Clean up unused client_ssl_auth + [&]() -> void { fake_rest_connection = fake_upstreams_[1]->waitForRawConnection(); }, + [&]() -> void { fake_rest_connection->close(); }, + [&]() -> void { fake_rest_connection->waitForDisconnect(); }, + }); +} + +// Test proxying data in both directions with envoy doing TCP and TLS +// termination. +TEST_P(TcpProxyIntegrationTest, SendTlsToTlsListener) { + Network::ClientConnectionPtr ssl_client; + FakeRawConnectionPtr fake_upstream_connection; + FakeHttpConnectionPtr fake_rest_connection; + testing::NiceMock runtime; + std::unique_ptr context_manager(new Ssl::ContextManagerImpl(runtime)); + FakeStreamPtr request; + Ssl::ClientContextPtr context; + ConnectionStatusCallbacks connect_callbacks; + executeActions({ + // Set up the SSl client. + [&]() -> void { + Network::Address::InstanceConstSharedPtr address = + Ssl::getSslAddress(version_, lookupPort("tcp_proxy_with_tls_termination")); + context = Ssl::createClientSslContext(false, false, *context_manager); + ssl_client = dispatcher_->createSslClientConnection(*context, address); + }, + // Set up the initial REST response for the ssl_auth filter to avoid the async client doing + // reconnects. Loopback is whitelisted by default so no response payload is necessary. + [&]() -> void { + fake_rest_connection = fake_upstreams_[1]->waitForHttpConnection(*dispatcher_); + request = fake_rest_connection->waitForNewStream(); + request->waitForEndStream(*dispatcher_); + request->encodeHeaders(Http::TestHeaderMapImpl{{":status", "200"}}, false); + request->encodeData(0, true); + }, + // Perform the SSL handshake. Loopback is whitelisted in tcp_proxy.json for the ssl_auth + // filter so there will be no pause waiting on auth data. + [&]() -> void { + ssl_client->connect(); + ssl_client->addConnectionCallbacks(connect_callbacks); + ssl_client->connect(); + while (!connect_callbacks.connected()) { + dispatcher_->run(Event::Dispatcher::RunType::NonBlock); + } + }, + // Ship some data upstream. + [&]() -> void { + Buffer::OwnedImpl buffer("hello"); + ssl_client->write(buffer); + dispatcher_->run(Event::Dispatcher::RunType::NonBlock); + }, + // Make sure the data makes it upstream. + [&]() -> void { fake_upstream_connection = fake_upstreams_[0]->waitForRawConnection(); }, + [&]() -> void { fake_upstream_connection->waitForData(5); }, + // Now send data downstream and make sure it arrives. + [&]() -> void { + std::shared_ptr payload_reader( + new WaitForPayloadReader(*dispatcher_)); + ssl_client->addReadFilter(payload_reader); + fake_upstream_connection->write("world"); + payload_reader->set_data_to_wait_for("world"); + ssl_client->dispatcher().run(Event::Dispatcher::RunType::Block); + }, + // Clean up. + [&]() -> void { ssl_client->close(Network::ConnectionCloseType::NoFlush); }, + [&]() -> void { fake_upstream_connection->close(); }, + [&]() -> void { fake_upstream_connection->waitForDisconnect(); }, + [&]() -> void { fake_rest_connection->close(); }, + [&]() -> void { fake_rest_connection->waitForDisconnect(); }, + }); +} + +} // namespace +} // Envoy diff --git a/test/integration/tcp_proxy_integration_test.h b/test/integration/tcp_proxy_integration_test.h new file mode 100644 index 0000000000000..99b99dcebf722 --- /dev/null +++ b/test/integration/tcp_proxy_integration_test.h @@ -0,0 +1,30 @@ +#pragma once + +#include +#include + +#include "test/integration/integration.h" + +#include "gtest/gtest.h" + +namespace Envoy { +class TcpProxyIntegrationTest : public BaseIntegrationTest, + public testing::TestWithParam { +public: + TcpProxyIntegrationTest() : BaseIntegrationTest(GetParam()) {} + + void SetUp() override { + fake_upstreams_.emplace_back(new FakeUpstream(0, FakeHttpConnection::Type::HTTP1, version_)); + registerPort("upstream_0", fake_upstreams_.back()->localAddress()->ip()->port()); + fake_upstreams_.emplace_back(new FakeUpstream(0, FakeHttpConnection::Type::HTTP1, version_)); + registerPort("upstream_1", fake_upstreams_.back()->localAddress()->ip()->port()); + createTestServer("test/config/integration/tcp_proxy.json", + {"tcp_proxy", "tcp_proxy_with_tls_termination"}); + } + + void TearDown() override { + test_server_.reset(); + fake_upstreams_.clear(); + } +}; +} // Envoy diff --git a/test/integration/utility.cc b/test/integration/utility.cc index 1fd61136fa123..d4f609586d2ee 100644 --- a/test/integration/utility.cc +++ b/test/integration/utility.cc @@ -102,4 +102,19 @@ RawConnectionDriver::~RawConnectionDriver() {} void RawConnectionDriver::run() { dispatcher_->run(Event::Dispatcher::RunType::Block); } void RawConnectionDriver::close() { client_->close(Network::ConnectionCloseType::FlushWrite); } + +WaitForPayloadReader::WaitForPayloadReader(Event::Dispatcher& dispatcher) + : dispatcher_(dispatcher) {} + +Network::FilterStatus WaitForPayloadReader::onData(Buffer::Instance& data) { + data_.append(TestUtility::bufferToString(data)); + data.drain(data.length()); + if (!data_to_wait_for_.empty() && data_.find(data_to_wait_for_) == 0) { + data_to_wait_for_.clear(); + dispatcher_.exit(); + } + + return Network::FilterStatus::StopIteration; +} + } // Envoy diff --git a/test/integration/utility.h b/test/integration/utility.h index 8b2349208f0ff..b48c10f4a2a7a 100644 --- a/test/integration/utility.h +++ b/test/integration/utility.h @@ -102,4 +102,40 @@ class IntegrationUtil { const std::string& body, Http::CodecClient::Type type, Network::Address::IpVersion version, const std::string& host = "host"); }; + +// A set of connection callbacks which tracks connection state. +class ConnectionStatusCallbacks : public Network::ConnectionCallbacks { +public: + bool connected() const { return connected_; } + bool closed() const { return closed_; } + + // Network::ConnectionCallbacks + void onEvent(uint32_t events) { + closed_ |= (events & Network::ConnectionEvent::RemoteClose || + events & Network::ConnectionEvent::LocalClose); + connected_ |= events & Network::ConnectionEvent::Connected; + } + +private: + bool connected_{false}; + bool closed_{false}; +}; + +// A read filter which waits for a given data then stops the dispatcher loop. +class WaitForPayloadReader : public Network::ReadFilterBaseImpl { +public: + WaitForPayloadReader(Event::Dispatcher& dispatcher); + + // Network::ReadFilter + Network::FilterStatus onData(Buffer::Instance& data) override; + + void set_data_to_wait_for(const std::string& data) { data_to_wait_for_ = data; } + const std::string& data() { return data_; } + +private: + Event::Dispatcher& dispatcher_; + std::string data_to_wait_for_; + std::string data_; +}; + } // Envoy