diff --git a/docs/root/configuration/listeners/network_filters/tcp_proxy_filter.rst b/docs/root/configuration/listeners/network_filters/tcp_proxy_filter.rst index 1822e08715c9d..7102a47cc6a78 100644 --- a/docs/root/configuration/listeners/network_filters/tcp_proxy_filter.rst +++ b/docs/root/configuration/listeners/network_filters/tcp_proxy_filter.rst @@ -34,6 +34,9 @@ To define metadata that a suitable upstream host must match, use one of the foll and :ref:`ClusterWeight.metadata_match` to define required metadata for a weighted upstream cluster (metadata from the latter will be merged on top of the former). +In addition, dynamic metadata can be set by earlier network filters on the `StreamInfo`. Setting the dynamic metadata +must happen before `onNewConnection()` is called on the `TcpProxy` filter to affect load balancing. + .. _config_network_filters_tcp_proxy_stats: Statistics diff --git a/docs/root/version_history/current.rst b/docs/root/version_history/current.rst index f1d42573eabf2..ad4e70ba4af99 100644 --- a/docs/root/version_history/current.rst +++ b/docs/root/version_history/current.rst @@ -77,6 +77,7 @@ New Features * stats: allow configuring histogram buckets for stats sinks and admin endpoints that support it. * tap: added :ref:`generic body matcher` to scan http requests and responses for text or hex patterns. * tcp: switched the TCP connection pool to the new "shared" connection pool, sharing a common code base with HTTP and HTTP/2. Any unexpected behavioral changes can be temporarily reverted by setting `envoy.reloadable_features.new_tcp_connection_pool` to false. +* tcp_proxy: allow earlier network filters to set metadataMatchCriteria on the connection StreamInfo to influence load balancing. * watchdog: support randomizing the watchdog's kill timeout to prevent synchronized kills via a maximium jitter parameter :ref:`max_kill_timeout_jitter`. * watchdog: supports an extension point where actions can be registered to fire on watchdog events such as miss, megamiss, kill and multikill. See ref:`watchdog actions`. * xds: added :ref:`extension config discovery` support for HTTP filters. diff --git a/source/common/tcp_proxy/tcp_proxy.cc b/source/common/tcp_proxy/tcp_proxy.cc index 92dd68e4be4ab..5aa4b9d64cc49 100644 --- a/source/common/tcp_proxy/tcp_proxy.cc +++ b/source/common/tcp_proxy/tcp_proxy.cc @@ -538,6 +538,25 @@ void Filter::onPoolReady(Http::RequestEncoder& request_encoder, info.downstreamSslConnection()); } +const Router::MetadataMatchCriteria* Filter::metadataMatchCriteria() { + const Router::MetadataMatchCriteria* route_criteria = + (route_ != nullptr) ? route_->metadataMatchCriteria() : nullptr; + + const auto& request_metadata = getStreamInfo().dynamicMetadata().filter_metadata(); + const auto filter_it = request_metadata.find(Envoy::Config::MetadataFilters::get().ENVOY_LB); + + if (filter_it != request_metadata.end() && route_criteria != nullptr) { + metadata_match_criteria_ = route_criteria->mergeMatchCriteria(filter_it->second); + return metadata_match_criteria_.get(); + } else if (filter_it != request_metadata.end()) { + metadata_match_criteria_ = + std::make_unique(filter_it->second); + return metadata_match_criteria_.get(); + } else { + return route_criteria; + } +} + void Filter::onConnectTimeout() { ENVOY_CONN_LOG(debug, "connect timeout", read_callbacks_->connection()); read_callbacks_->upstreamHost()->outlierDetector().putResult( diff --git a/source/common/tcp_proxy/tcp_proxy.h b/source/common/tcp_proxy/tcp_proxy.h index 8a402e8a4cd20..1bb73ddd377f8 100644 --- a/source/common/tcp_proxy/tcp_proxy.h +++ b/source/common/tcp_proxy/tcp_proxy.h @@ -265,14 +265,7 @@ class Filter : public Network::ReadFilter, Ssl::ConnectionInfoConstSharedPtr ssl_info); // Upstream::LoadBalancerContext - const Router::MetadataMatchCriteria* metadataMatchCriteria() override { - if (route_) { - return route_->metadataMatchCriteria(); - } - return nullptr; - } - - // Upstream::LoadBalancerContext + const Router::MetadataMatchCriteria* metadataMatchCriteria() override; absl::optional computeHashKey() override { auto hash_policy = config_->hashPolicy(); if (hash_policy) { @@ -376,6 +369,7 @@ class Filter : public Network::ReadFilter, // read filter. std::unique_ptr upstream_; RouteConstSharedPtr route_; + Router::MetadataMatchCriteriaConstPtr metadata_match_criteria_; Network::TransportSocketOptionsSharedPtr transport_socket_options_; uint32_t connect_attempts_{}; bool connecting_{}; diff --git a/test/common/tcp_proxy/tcp_proxy_test.cc b/test/common/tcp_proxy/tcp_proxy_test.cc index 3658360dcad07..6ca861a28799f 100644 --- a/test/common/tcp_proxy/tcp_proxy_test.cc +++ b/test/common/tcp_proxy/tcp_proxy_test.cc @@ -1361,6 +1361,100 @@ TEST_F(TcpProxyTest, WeightedClusterWithMetadataMatch) { } } +// Test that metadata match criteria provided on the StreamInfo is used. +TEST_F(TcpProxyTest, StreamInfoDynamicMetadata) { + configure(defaultConfig()); + + ProtobufWkt::Value val; + val.set_string_value("val"); + + envoy::config::core::v3::Metadata metadata; + ProtobufWkt::Struct& map = + (*metadata.mutable_filter_metadata())[Envoy::Config::MetadataFilters::get().ENVOY_LB]; + (*map.mutable_fields())["test"] = val; + EXPECT_CALL(filter_callbacks_.connection_.stream_info_, dynamicMetadata()) + .WillOnce(ReturnRef(metadata)); + + filter_ = std::make_unique(config_, factory_context_.cluster_manager_); + filter_->initializeReadFilterCallbacks(filter_callbacks_); + + Upstream::LoadBalancerContext* context; + + EXPECT_CALL(factory_context_.cluster_manager_, tcpConnPoolForCluster(_, _, _)) + .WillOnce(DoAll(SaveArg<2>(&context), Return(nullptr))); + EXPECT_EQ(Network::FilterStatus::StopIteration, filter_->onNewConnection()); + + EXPECT_NE(nullptr, context); + + const auto effective_criteria = context->metadataMatchCriteria(); + EXPECT_NE(nullptr, effective_criteria); + + const auto& effective_criterions = effective_criteria->metadataMatchCriteria(); + EXPECT_EQ(1, effective_criterions.size()); + + EXPECT_EQ("test", effective_criterions[0]->name()); + EXPECT_EQ(HashedValue(val), effective_criterions[0]->value()); +} + +// Test that if both streamInfo and configuration add metadata match criteria, they +// are merged. +TEST_F(TcpProxyTest, StreamInfoDynamicMetadataAndConfigMerged) { + const std::string yaml = R"EOF( + stat_prefix: name + weighted_clusters: + clusters: + - name: cluster1 + weight: 1 + metadata_match: + filter_metadata: + envoy.lb: + k0: v0 + k1: from_config +)EOF"; + + config_ = std::make_shared(constructConfigFromYaml(yaml, factory_context_)); + + ProtobufWkt::Value v0, v1, v2; + v0.set_string_value("v0"); + v1.set_string_value("from_streaminfo"); // 'v1' is overridden with this value by streamInfo. + v2.set_string_value("v2"); + HashedValue hv0(v0), hv1(v1), hv2(v2); + + envoy::config::core::v3::Metadata metadata; + ProtobufWkt::Struct& map = + (*metadata.mutable_filter_metadata())[Envoy::Config::MetadataFilters::get().ENVOY_LB]; + (*map.mutable_fields())["k1"] = v1; + (*map.mutable_fields())["k2"] = v2; + EXPECT_CALL(filter_callbacks_.connection_.stream_info_, dynamicMetadata()) + .WillOnce(ReturnRef(metadata)); + + filter_ = std::make_unique(config_, factory_context_.cluster_manager_); + filter_->initializeReadFilterCallbacks(filter_callbacks_); + + Upstream::LoadBalancerContext* context; + + EXPECT_CALL(factory_context_.cluster_manager_, tcpConnPoolForCluster(_, _, _)) + .WillOnce(DoAll(SaveArg<2>(&context), Return(nullptr))); + EXPECT_EQ(Network::FilterStatus::StopIteration, filter_->onNewConnection()); + + EXPECT_NE(nullptr, context); + + const auto effective_criteria = context->metadataMatchCriteria(); + EXPECT_NE(nullptr, effective_criteria); + + const auto& effective_criterions = effective_criteria->metadataMatchCriteria(); + EXPECT_EQ(3, effective_criterions.size()); + + EXPECT_EQ("k0", effective_criterions[0]->name()); + EXPECT_EQ(hv0, effective_criterions[0]->value()); + + EXPECT_EQ("k1", effective_criterions[1]->name()); + EXPECT_EQ(hv1, effective_criterions[1]->value()); + + EXPECT_EQ("k2", effective_criterions[2]->name()); + EXPECT_EQ(hv2, effective_criterions[2]->value()); +} + TEST_F(TcpProxyTest, DEPRECATED_FEATURE_TEST(DisconnectBeforeData)) { configure(defaultConfig()); filter_ = std::make_unique(config_, factory_context_.cluster_manager_); diff --git a/test/integration/BUILD b/test/integration/BUILD index c789417f3e518..c871206720a39 100644 --- a/test/integration/BUILD +++ b/test/integration/BUILD @@ -978,6 +978,11 @@ envoy_cc_test( ], ) +envoy_proto_library( + name = "tcp_proxy_integration_proto", + srcs = [":tcp_proxy_integration_test.proto"], +) + envoy_cc_test( name = "tcp_proxy_integration_test", srcs = [ @@ -991,17 +996,20 @@ envoy_cc_test( tags = ["fails_on_windows"], deps = [ ":integration_lib", + ":tcp_proxy_integration_proto_cc_proto", "//source/common/config:api_version_lib", "//source/common/event:dispatcher_includes", "//source/common/event:dispatcher_lib", "//source/common/network:utility_lib", "//source/extensions/access_loggers/file:config", + "//source/extensions/filters/network/common:factory_base_lib", "//source/extensions/filters/network/tcp_proxy:config", "//source/extensions/transport_sockets/tls:config", "//source/extensions/transport_sockets/tls:context_config_lib", "//source/extensions/transport_sockets/tls:context_lib", "//test/mocks/runtime:runtime_mocks", "//test/mocks/secret:secret_mocks", + "//test/test_common:registry_lib", "//test/test_common:utility_lib", "@envoy_api//envoy/config/bootstrap/v3:pkg_cc_proto", "@envoy_api//envoy/config/cluster/v3:pkg_cc_proto", diff --git a/test/integration/fake_upstream.cc b/test/integration/fake_upstream.cc index ee2d25ce0b4fc..c66f3fe056d50 100644 --- a/test/integration/fake_upstream.cc +++ b/test/integration/fake_upstream.cc @@ -623,7 +623,8 @@ AssertionResult FakeRawConnection::waitForData(uint64_t num_bytes, std::string* }; ENVOY_LOG(debug, "waiting for {} bytes of data", num_bytes); if (!time_system_.waitFor(lock_, absl::Condition(&reached), timeout)) { - return AssertionFailure() << "Timed out waiting for data."; + return AssertionFailure() << fmt::format( + "Timed out waiting for data. Got '{}', waiting for {} bytes.", data_, num_bytes); } if (data != nullptr) { *data = data_; diff --git a/test/integration/tcp_proxy_integration_test.cc b/test/integration/tcp_proxy_integration_test.cc index 5533308b5dc4b..5314dbae87d26 100644 --- a/test/integration/tcp_proxy_integration_test.cc +++ b/test/integration/tcp_proxy_integration_test.cc @@ -12,10 +12,14 @@ #include "common/config/api_version.h" #include "common/network/utility.h" +#include "extensions/filters/network/common/factory_base.h" #include "extensions/transport_sockets/tls/context_manager_impl.h" #include "test/integration/ssl_utility.h" +#include "test/integration/tcp_proxy_integration_test.pb.h" +#include "test/integration/tcp_proxy_integration_test.pb.validate.h" #include "test/integration/utility.h" +#include "test/test_common/registry.h" #include "gtest/gtest.h" @@ -557,15 +561,19 @@ TEST_P(TcpProxyIntegrationTest, TestCloseOnHealthFailure) { class TcpProxyMetadataMatchIntegrationTest : public TcpProxyIntegrationTest { public: + TcpProxyMetadataMatchIntegrationTest(uint32_t tcp_proxy_filter_index = 0) + : tcp_proxy_filter_index_(tcp_proxy_filter_index) {} void initialize() override; - void expectEndpointToMatchRoute(); - void expectEndpointNotToMatchRoute(); + void expectEndpointToMatchRoute( + std::function initial_data_cb = nullptr); + void expectEndpointNotToMatchRoute(const std::string& write_data = "hello"); envoy::config::core::v3::Metadata lbMetadata(std::map values); envoy::extensions::filters::network::tcp_proxy::v3::TcpProxy tcp_proxy_; envoy::config::core::v3::Metadata endpoint_metadata_; + const uint32_t tcp_proxy_filter_index_; }; envoy::config::core::v3::Metadata @@ -594,7 +602,7 @@ void TcpProxyMetadataMatchIntegrationTest::initialize() { ASSERT(static_resources->listeners_size() == 1); static_resources->mutable_listeners(0) ->mutable_filter_chains(0) - ->mutable_filters(0) + ->mutable_filters(tcp_proxy_filter_index_) ->mutable_typed_config() ->PackFrom(tcp_proxy_); @@ -624,16 +632,23 @@ void TcpProxyMetadataMatchIntegrationTest::initialize() { } // Verifies successful connection. -void TcpProxyMetadataMatchIntegrationTest::expectEndpointToMatchRoute() { +void TcpProxyMetadataMatchIntegrationTest::expectEndpointToMatchRoute( + std::function initial_data_cb) { IntegrationTcpClientPtr tcp_client = makeTcpConnection(lookupPort("tcp_proxy")); - ASSERT_TRUE(tcp_client->write("hello")); + std::string expected_upstream_data; + if (initial_data_cb) { + expected_upstream_data = initial_data_cb(*tcp_client); + } else { + expected_upstream_data = "hello"; + ASSERT_TRUE(tcp_client->write(expected_upstream_data)); + } FakeRawConnectionPtr fake_upstream_connection; ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(fake_upstream_connection)); - ASSERT_TRUE(fake_upstream_connection->waitForData(5)); + ASSERT_TRUE(fake_upstream_connection->waitForData(expected_upstream_data.length())); ASSERT_TRUE(fake_upstream_connection->write("world")); tcp_client->waitForData("world"); ASSERT_TRUE(tcp_client->write("hello", true)); - ASSERT_TRUE(fake_upstream_connection->waitForData(10)); + ASSERT_TRUE(fake_upstream_connection->waitForData(5 + expected_upstream_data.length())); ASSERT_TRUE(fake_upstream_connection->waitForHalfClose()); ASSERT_TRUE(fake_upstream_connection->write("", true)); ASSERT_TRUE(fake_upstream_connection->waitForDisconnect()); @@ -643,9 +658,10 @@ void TcpProxyMetadataMatchIntegrationTest::expectEndpointToMatchRoute() { } // Verifies connection failure. -void TcpProxyMetadataMatchIntegrationTest::expectEndpointNotToMatchRoute() { +void TcpProxyMetadataMatchIntegrationTest::expectEndpointNotToMatchRoute( + const std::string& write_data) { IntegrationTcpClientPtr tcp_client = makeTcpConnection(lookupPort("tcp_proxy")); - ASSERT_TRUE(tcp_client->write("hello", false, false)); + ASSERT_TRUE(tcp_client->write(write_data, false, false)); // TODO(yskopets): 'tcp_client->waitForDisconnect();' gets stuck indefinitely on Linux builds, // e.g. on 'envoy-linux (bazel compile_time_options)' and 'envoy-linux (bazel release)' @@ -843,6 +859,134 @@ TEST_P(TcpProxyMetadataMatchIntegrationTest, expectEndpointNotToMatchRoute(); } +class InjectDynamicMetadata : public Network::ReadFilter { +public: + explicit InjectDynamicMetadata(const std::string& key) : key_(key) {} + + Network::FilterStatus onData(Buffer::Instance& data, bool) override { + if (!metadata_set_) { + // To allow testing a write that returns `StopIteration`, only proceed + // when more than 1 byte is received. + if (data.length() < 2) { + ASSERT(data.length() == 1); + + // Echo data back to test can verify it was received. + Buffer::OwnedImpl copy(data); + read_callbacks_->connection().write(copy, false); + return Network::FilterStatus::StopIteration; + } + + ProtobufWkt::Value val; + val.set_string_value(data.toString()); + + ProtobufWkt::Struct& map = + (*read_callbacks_->connection() + .streamInfo() + .dynamicMetadata() + .mutable_filter_metadata())[Envoy::Config::MetadataFilters::get().ENVOY_LB]; + (*map.mutable_fields())[key_] = val; + + // Put this back in the state that TcpProxy expects. + read_callbacks_->connection().readDisable(true); + + metadata_set_ = true; + } + return Network::FilterStatus::Continue; + } + + Network::FilterStatus onNewConnection() override { + // TcpProxy disables read; must re-enable so we can read headers. + read_callbacks_->connection().readDisable(false); + + // Stop until we read the value and can set the metadata for TcpProxy. + return Network::FilterStatus::StopIteration; + } + + void initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callbacks) override { + read_callbacks_ = &callbacks; + } + + const std::string key_; + Network::ReadFilterCallbacks* read_callbacks_{}; + bool metadata_set_{false}; +}; + +class InjectDynamicMetadataFactory : public Extensions::NetworkFilters::Common::FactoryBase< + test::integration::tcp_proxy::InjectDynamicMetadata> { +public: + InjectDynamicMetadataFactory() : FactoryBase("test.inject_dynamic_metadata") {} + +private: + Network::FilterFactoryCb + createFilterFactoryFromProtoTyped(const test::integration::tcp_proxy::InjectDynamicMetadata& cfg, + Server::Configuration::FactoryContext&) override { + std::string key = cfg.key(); + return [key = std::move(key)](Network::FilterManager& filter_manager) -> void { + filter_manager.addReadFilter(std::make_shared(key)); + }; + } +}; + +class TcpProxyDynamicMetadataMatchIntegrationTest : public TcpProxyMetadataMatchIntegrationTest { +public: + TcpProxyDynamicMetadataMatchIntegrationTest() : TcpProxyMetadataMatchIntegrationTest(1) { + config_helper_.addNetworkFilter(R"EOF( + name: test.inject_dynamic_metadata + typed_config: + "@type": type.googleapis.com/test.integration.tcp_proxy.InjectDynamicMetadata + key: role +)EOF"); + } + + InjectDynamicMetadataFactory factory_; + Registry::InjectFactory register_factory_{ + factory_}; +}; + +INSTANTIATE_TEST_SUITE_P(TcpProxyIntegrationTestParams, TcpProxyDynamicMetadataMatchIntegrationTest, + testing::ValuesIn(getProtocolTestParams()), protocolTestParamsToString); + +TEST_P(TcpProxyDynamicMetadataMatchIntegrationTest, DynamicMetadataMatch) { + tcp_proxy_.set_stat_prefix("tcp_stats"); + + // Note: role isn't set here; it will be set in the dynamic metadata. + tcp_proxy_.mutable_metadata_match()->MergeFrom( + lbMetadata({{"version", "v1"}, {"stage", "prod"}})); + auto* cluster_0 = tcp_proxy_.mutable_weighted_clusters()->add_clusters(); + cluster_0->set_name("cluster_0"); + cluster_0->set_weight(1); + endpoint_metadata_ = lbMetadata({{"role", "primary"}, {"version", "v1"}, {"stage", "prod"}}); + + initialize(); + + expectEndpointToMatchRoute([](IntegrationTcpClient& tcp_client) -> std::string { + // Break the write into two; validate that the first is received before sending the second. This + // validates that a downstream filter can use this functionality, even if it can't make a + // decision after the first `onData()`. + EXPECT_TRUE(tcp_client.write("p", false)); + tcp_client.waitForData("p"); + tcp_client.clearData(); + EXPECT_TRUE(tcp_client.write("rimary", false)); + return "primary"; + }); +} + +TEST_P(TcpProxyDynamicMetadataMatchIntegrationTest, DynamicMetadataNonMatch) { + tcp_proxy_.set_stat_prefix("tcp_stats"); + + // Note: role isn't set here; it will be set in the dynamic metadata. + tcp_proxy_.mutable_metadata_match()->MergeFrom( + lbMetadata({{"version", "v1"}, {"stage", "prod"}})); + auto* cluster_0 = tcp_proxy_.mutable_weighted_clusters()->add_clusters(); + cluster_0->set_name("cluster_0"); + cluster_0->set_weight(1); + endpoint_metadata_ = lbMetadata({{"role", "primary"}, {"version", "v1"}, {"stage", "prod"}}); + + initialize(); + + expectEndpointNotToMatchRoute("does_not_match_role_primary"); +} + INSTANTIATE_TEST_SUITE_P(TcpProxyIntegrationTestParams, TcpProxySslIntegrationTest, testing::ValuesIn(getProtocolTestParams()), protocolTestParamsToString); diff --git a/test/integration/tcp_proxy_integration_test.proto b/test/integration/tcp_proxy_integration_test.proto new file mode 100644 index 0000000000000..6167ab80c903a --- /dev/null +++ b/test/integration/tcp_proxy_integration_test.proto @@ -0,0 +1,7 @@ +syntax = "proto3"; + +package test.integration.tcp_proxy; + +message InjectDynamicMetadata { + string key = 1; +}