diff --git a/include/envoy/http/async_client.h b/include/envoy/http/async_client.h index 5121a3c7b969f..b38eabc2d76c0 100644 --- a/include/envoy/http/async_client.h +++ b/include/envoy/http/async_client.h @@ -205,6 +205,13 @@ class AsyncClient { parent_context = v; return *this; } + // Set dynamic metadata of async stream. If a metadata record with filter name 'envoy.lb' is + // provided, metadata match criteria of async stream route will be overridden by the metadata + // and then used by the subset load balancer. + StreamOptions& setMetadata(const envoy::config::core::v3::Metadata& m) { + metadata = m; + return *this; + } // For gmock test bool operator==(const StreamOptions& src) const { @@ -230,6 +237,8 @@ class AsyncClient { // Provides parent context. Currently, this holds stream info from the caller. ParentContext parent_context; + + envoy::config::core::v3::Metadata metadata; }; /** @@ -261,6 +270,10 @@ class AsyncClient { StreamOptions::setParentContext(v); return *this; } + RequestOptions& setMetadata(const envoy::config::core::v3::Metadata& m) { + StreamOptions::setMetadata(m); + return *this; + } RequestOptions& setParentSpan(Tracing::Span& parent_span) { parent_span_ = &parent_span; return *this; diff --git a/source/common/http/async_client_impl.cc b/source/common/http/async_client_impl.cc index ffe65b5ed62f5..044bf497b4c42 100644 --- a/source/common/http/async_client_impl.cc +++ b/source/common/http/async_client_impl.cc @@ -87,6 +87,9 @@ AsyncStreamImpl::AsyncStreamImpl(AsyncClientImpl& parent, AsyncClient::StreamCal route_(std::make_shared(parent_.cluster_->name(), options.timeout, options.hash_policy)), send_xff_(options.send_xff) { + + stream_info_.dynamicMetadata().MergeFrom(options.metadata); + if (options.buffer_body_for_retry) { buffered_body_ = std::make_unique(); } diff --git a/test/common/http/async_client_impl_test.cc b/test/common/http/async_client_impl_test.cc index b76107d395e26..d94600281661a 100644 --- a/test/common/http/async_client_impl_test.cc +++ b/test/common/http/async_client_impl_test.cc @@ -341,6 +341,97 @@ TEST_F(AsyncClientImplTest, BasicHashPolicy) { response_decoder_->decodeData(data, true); } +TEST_F(AsyncClientImplTest, WithoutMetadata) { + message_->body().add("test body"); + Buffer::Instance& data = message_->body(); + + EXPECT_CALL(cm_.thread_local_cluster_.conn_pool_, newStream(_, _)) + .WillOnce(Invoke([&](ResponseDecoder& decoder, + ConnectionPool::Callbacks& callbacks) -> ConnectionPool::Cancellable* { + callbacks.onPoolReady(stream_encoder_, cm_.thread_local_cluster_.conn_pool_.host_, + stream_info_, {}); + response_decoder_ = &decoder; + return nullptr; + })); + + EXPECT_CALL(cm_.thread_local_cluster_, httpConnPool(_, _, _)) + .WillOnce( + Invoke([&](Upstream::ResourcePriority, absl::optional, + Upstream::LoadBalancerContext* context) -> Http::ConnectionPool::Instance* { + EXPECT_EQ(context->metadataMatchCriteria(), nullptr); + return &cm_.thread_local_cluster_.conn_pool_; + })); + + TestRequestHeaderMapImpl copy(message_->headers()); + copy.addCopy("x-envoy-internal", "true"); + copy.addCopy("x-forwarded-for", "127.0.0.1"); + copy.addCopy(":scheme", "http"); + + EXPECT_CALL(stream_encoder_, encodeHeaders(HeaderMapEqualRef(©), false)); + + AsyncClient::RequestOptions options; + envoy::config::core::v3::Metadata metadata; + metadata.mutable_filter_metadata()->insert( + {"fake_test_domain", MessageUtil::keyValueStruct("fake_test_key", "fake_test_value")}); + options.setMetadata(metadata); + + auto* request = client_.send(std::move(message_), callbacks_, options); + EXPECT_NE(request, nullptr); + + expectSuccess(request, 200); + + ResponseHeaderMapPtr response_headers(new TestResponseHeaderMapImpl{{":status", "200"}}); + response_decoder_->decodeHeaders(std::move(response_headers), false); + response_decoder_->decodeData(data, true); +} + +TEST_F(AsyncClientImplTest, WithMetadata) { + message_->body().add("test body"); + Buffer::Instance& data = message_->body(); + + EXPECT_CALL(cm_.thread_local_cluster_.conn_pool_, newStream(_, _)) + .WillOnce(Invoke([&](ResponseDecoder& decoder, + ConnectionPool::Callbacks& callbacks) -> ConnectionPool::Cancellable* { + callbacks.onPoolReady(stream_encoder_, cm_.thread_local_cluster_.conn_pool_.host_, + stream_info_, {}); + response_decoder_ = &decoder; + return nullptr; + })); + + EXPECT_CALL(cm_.thread_local_cluster_, httpConnPool(_, _, _)) + .WillOnce( + Invoke([&](Upstream::ResourcePriority, absl::optional, + Upstream::LoadBalancerContext* context) -> Http::ConnectionPool::Instance* { + EXPECT_NE(context->metadataMatchCriteria(), nullptr); + EXPECT_EQ(context->metadataMatchCriteria()->metadataMatchCriteria().at(0)->name(), + "fake_test_key"); + return &cm_.thread_local_cluster_.conn_pool_; + })); + + TestRequestHeaderMapImpl copy(message_->headers()); + copy.addCopy("x-envoy-internal", "true"); + copy.addCopy("x-forwarded-for", "127.0.0.1"); + copy.addCopy(":scheme", "http"); + + EXPECT_CALL(stream_encoder_, encodeHeaders(HeaderMapEqualRef(©), false)); + + AsyncClient::RequestOptions options; + envoy::config::core::v3::Metadata metadata; + metadata.mutable_filter_metadata()->insert( + {Envoy::Config::MetadataFilters::get().ENVOY_LB, + MessageUtil::keyValueStruct("fake_test_key", "fake_test_value")}); + options.setMetadata(metadata); + + auto* request = client_.send(std::move(message_), callbacks_, options); + EXPECT_NE(request, nullptr); + + expectSuccess(request, 200); + + ResponseHeaderMapPtr response_headers(new TestResponseHeaderMapImpl{{":status", "200"}}); + response_decoder_->decodeHeaders(std::move(response_headers), false); + response_decoder_->decodeData(data, true); +} + TEST_F(AsyncClientImplTest, Retry) { ON_CALL(runtime_.snapshot_, featureEnabled("upstream.use_retry", 100)) .WillByDefault(Return(true));