Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ To define metadata that a suitable upstream host must match, use one of the foll
and :ref:`ClusterWeight.metadata_match<envoy_v3_api_field_extensions.filters.network.tcp_proxy.v3.TcpProxy.WeightedCluster.ClusterWeight.metadata_match>`
to define required metadata for a weighted upstream cluster (metadata from the latter will be merged on top of the former).

In addition, dynamic metadata can be set by earlier network filters on the `StreamInfo`. Setting the dynamic metadata
must happen before `onNewConnection()` is called on the `TcpProxy` filter to affect load balancing.

.. _config_network_filters_tcp_proxy_stats:

Statistics
Expand Down
1 change: 1 addition & 0 deletions docs/root/version_history/current.rst
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ New Features
* stats: allow configuring histogram buckets for stats sinks and admin endpoints that support it.
* tap: added :ref:`generic body matcher<envoy_v3_api_msg_config.tap.v3.HttpGenericBodyMatch>` to scan http requests and responses for text or hex patterns.
* tcp: switched the TCP connection pool to the new "shared" connection pool, sharing a common code base with HTTP and HTTP/2. Any unexpected behavioral changes can be temporarily reverted by setting `envoy.reloadable_features.new_tcp_connection_pool` to false.
* tcp_proxy: allow earlier network filters to set metadataMatchCriteria on the connection StreamInfo to influence load balancing.
* watchdog: support randomizing the watchdog's kill timeout to prevent synchronized kills via a maximium jitter parameter :ref:`max_kill_timeout_jitter<envoy_v3_api_field_config.bootstrap.v3.Watchdog.max_kill_timeout_jitter>`.
* watchdog: supports an extension point where actions can be registered to fire on watchdog events such as miss, megamiss, kill and multikill. See ref:`watchdog actions<envoy_v3_api_field_config.bootstrap.v3.Watchdog.actions>`.
* xds: added :ref:`extension config discovery<envoy_v3_api_msg_config.core.v3.ExtensionConfigSource>` support for HTTP filters.
Expand Down
19 changes: 19 additions & 0 deletions source/common/tcp_proxy/tcp_proxy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,25 @@ void Filter::onPoolReady(Http::RequestEncoder& request_encoder,
info.downstreamSslConnection());
}

const Router::MetadataMatchCriteria* Filter::metadataMatchCriteria() {
const Router::MetadataMatchCriteria* route_criteria =
(route_ != nullptr) ? route_->metadataMatchCriteria() : nullptr;

const auto& request_metadata = getStreamInfo().dynamicMetadata().filter_metadata();
const auto filter_it = request_metadata.find(Envoy::Config::MetadataFilters::get().ENVOY_LB);

if (filter_it != request_metadata.end() && route_criteria != nullptr) {
metadata_match_criteria_ = route_criteria->mergeMatchCriteria(filter_it->second);
return metadata_match_criteria_.get();
} else if (filter_it != request_metadata.end()) {
metadata_match_criteria_ =
std::make_unique<Router::MetadataMatchCriteriaImpl>(filter_it->second);
return metadata_match_criteria_.get();
} else {
return route_criteria;
}
}

void Filter::onConnectTimeout() {
ENVOY_CONN_LOG(debug, "connect timeout", read_callbacks_->connection());
read_callbacks_->upstreamHost()->outlierDetector().putResult(
Expand Down
10 changes: 2 additions & 8 deletions source/common/tcp_proxy/tcp_proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -265,14 +265,7 @@ class Filter : public Network::ReadFilter,
Ssl::ConnectionInfoConstSharedPtr ssl_info);

// Upstream::LoadBalancerContext
const Router::MetadataMatchCriteria* metadataMatchCriteria() override {
if (route_) {
return route_->metadataMatchCriteria();
}
return nullptr;
}

// Upstream::LoadBalancerContext
const Router::MetadataMatchCriteria* metadataMatchCriteria() override;
absl::optional<uint64_t> computeHashKey() override {
auto hash_policy = config_->hashPolicy();
if (hash_policy) {
Expand Down Expand Up @@ -376,6 +369,7 @@ class Filter : public Network::ReadFilter,
// read filter.
std::unique_ptr<GenericUpstream> upstream_;
RouteConstSharedPtr route_;
Router::MetadataMatchCriteriaConstPtr metadata_match_criteria_;
Network::TransportSocketOptionsSharedPtr transport_socket_options_;
uint32_t connect_attempts_{};
bool connecting_{};
Expand Down
94 changes: 94 additions & 0 deletions test/common/tcp_proxy/tcp_proxy_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1361,6 +1361,100 @@ TEST_F(TcpProxyTest, WeightedClusterWithMetadataMatch) {
}
}

// Test that metadata match criteria provided on the StreamInfo is used.
TEST_F(TcpProxyTest, StreamInfoDynamicMetadata) {
configure(defaultConfig());

ProtobufWkt::Value val;
val.set_string_value("val");

envoy::config::core::v3::Metadata metadata;
ProtobufWkt::Struct& map =
(*metadata.mutable_filter_metadata())[Envoy::Config::MetadataFilters::get().ENVOY_LB];
(*map.mutable_fields())["test"] = val;
EXPECT_CALL(filter_callbacks_.connection_.stream_info_, dynamicMetadata())
.WillOnce(ReturnRef(metadata));

filter_ = std::make_unique<Filter>(config_, factory_context_.cluster_manager_);
filter_->initializeReadFilterCallbacks(filter_callbacks_);

Upstream::LoadBalancerContext* context;

EXPECT_CALL(factory_context_.cluster_manager_, tcpConnPoolForCluster(_, _, _))
.WillOnce(DoAll(SaveArg<2>(&context), Return(nullptr)));
EXPECT_EQ(Network::FilterStatus::StopIteration, filter_->onNewConnection());

EXPECT_NE(nullptr, context);

const auto effective_criteria = context->metadataMatchCriteria();
EXPECT_NE(nullptr, effective_criteria);

const auto& effective_criterions = effective_criteria->metadataMatchCriteria();
EXPECT_EQ(1, effective_criterions.size());

EXPECT_EQ("test", effective_criterions[0]->name());
EXPECT_EQ(HashedValue(val), effective_criterions[0]->value());
}

// Test that if both streamInfo and configuration add metadata match criteria, they
// are merged.
TEST_F(TcpProxyTest, StreamInfoDynamicMetadataAndConfigMerged) {
const std::string yaml = R"EOF(
stat_prefix: name
weighted_clusters:
clusters:
- name: cluster1
weight: 1
metadata_match:
filter_metadata:
envoy.lb:
k0: v0
k1: from_config
)EOF";

config_ = std::make_shared<Config>(constructConfigFromYaml(yaml, factory_context_));

ProtobufWkt::Value v0, v1, v2;
v0.set_string_value("v0");
v1.set_string_value("from_streaminfo"); // 'v1' is overridden with this value by streamInfo.
v2.set_string_value("v2");
HashedValue hv0(v0), hv1(v1), hv2(v2);

envoy::config::core::v3::Metadata metadata;
ProtobufWkt::Struct& map =
(*metadata.mutable_filter_metadata())[Envoy::Config::MetadataFilters::get().ENVOY_LB];
(*map.mutable_fields())["k1"] = v1;
(*map.mutable_fields())["k2"] = v2;
EXPECT_CALL(filter_callbacks_.connection_.stream_info_, dynamicMetadata())
.WillOnce(ReturnRef(metadata));

filter_ = std::make_unique<Filter>(config_, factory_context_.cluster_manager_);
filter_->initializeReadFilterCallbacks(filter_callbacks_);

Upstream::LoadBalancerContext* context;

EXPECT_CALL(factory_context_.cluster_manager_, tcpConnPoolForCluster(_, _, _))
.WillOnce(DoAll(SaveArg<2>(&context), Return(nullptr)));
EXPECT_EQ(Network::FilterStatus::StopIteration, filter_->onNewConnection());

EXPECT_NE(nullptr, context);

const auto effective_criteria = context->metadataMatchCriteria();
EXPECT_NE(nullptr, effective_criteria);

const auto& effective_criterions = effective_criteria->metadataMatchCriteria();
EXPECT_EQ(3, effective_criterions.size());

EXPECT_EQ("k0", effective_criterions[0]->name());
EXPECT_EQ(hv0, effective_criterions[0]->value());

EXPECT_EQ("k1", effective_criterions[1]->name());
EXPECT_EQ(hv1, effective_criterions[1]->value());

EXPECT_EQ("k2", effective_criterions[2]->name());
EXPECT_EQ(hv2, effective_criterions[2]->value());
}

TEST_F(TcpProxyTest, DEPRECATED_FEATURE_TEST(DisconnectBeforeData)) {
configure(defaultConfig());
filter_ = std::make_unique<Filter>(config_, factory_context_.cluster_manager_);
Expand Down
8 changes: 8 additions & 0 deletions test/integration/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -978,6 +978,11 @@ envoy_cc_test(
],
)

envoy_proto_library(
name = "tcp_proxy_integration_proto",
srcs = [":tcp_proxy_integration_test.proto"],
)

envoy_cc_test(
name = "tcp_proxy_integration_test",
srcs = [
Expand All @@ -991,17 +996,20 @@ envoy_cc_test(
tags = ["fails_on_windows"],
deps = [
":integration_lib",
":tcp_proxy_integration_proto_cc_proto",
"//source/common/config:api_version_lib",
"//source/common/event:dispatcher_includes",
"//source/common/event:dispatcher_lib",
"//source/common/network:utility_lib",
"//source/extensions/access_loggers/file:config",
"//source/extensions/filters/network/common:factory_base_lib",
"//source/extensions/filters/network/tcp_proxy:config",
"//source/extensions/transport_sockets/tls:config",
"//source/extensions/transport_sockets/tls:context_config_lib",
"//source/extensions/transport_sockets/tls:context_lib",
"//test/mocks/runtime:runtime_mocks",
"//test/mocks/secret:secret_mocks",
"//test/test_common:registry_lib",
"//test/test_common:utility_lib",
"@envoy_api//envoy/config/bootstrap/v3:pkg_cc_proto",
"@envoy_api//envoy/config/cluster/v3:pkg_cc_proto",
Expand Down
3 changes: 2 additions & 1 deletion test/integration/fake_upstream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -623,7 +623,8 @@ AssertionResult FakeRawConnection::waitForData(uint64_t num_bytes, std::string*
};
ENVOY_LOG(debug, "waiting for {} bytes of data", num_bytes);
if (!time_system_.waitFor(lock_, absl::Condition(&reached), timeout)) {
return AssertionFailure() << "Timed out waiting for data.";
return AssertionFailure() << fmt::format(
"Timed out waiting for data. Got '{}', waiting for {} bytes.", data_, num_bytes);
}
if (data != nullptr) {
*data = data_;
Expand Down
Loading