Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions include/envoy/http/async_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,13 @@ class AsyncClient {
parent_context = v;
return *this;
}
// Set dynamic metadata of async stream. If a metadata record with filter name 'envoy.lb' is
// provided, metadata match criteria of async stream route will be overridden by the metadata
// and then used by the subset load balancer.
StreamOptions& setMetadata(const envoy::config::core::v3::Metadata& m) {
metadata = m;
return *this;
}

// For gmock test
bool operator==(const StreamOptions& src) const {
Expand All @@ -230,6 +237,8 @@ class AsyncClient {

// Provides parent context. Currently, this holds stream info from the caller.
ParentContext parent_context;

envoy::config::core::v3::Metadata metadata;
};

/**
Expand Down Expand Up @@ -261,6 +270,10 @@ class AsyncClient {
StreamOptions::setParentContext(v);
return *this;
}
RequestOptions& setMetadata(const envoy::config::core::v3::Metadata& m) {
StreamOptions::setMetadata(m);
return *this;
}
RequestOptions& setParentSpan(Tracing::Span& parent_span) {
parent_span_ = &parent_span;
return *this;
Expand Down
3 changes: 3 additions & 0 deletions source/common/http/async_client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ AsyncStreamImpl::AsyncStreamImpl(AsyncClientImpl& parent, AsyncClient::StreamCal
route_(std::make_shared<RouteImpl>(parent_.cluster_->name(), options.timeout,
options.hash_policy)),
send_xff_(options.send_xff) {

stream_info_.dynamicMetadata().MergeFrom(options.metadata);

if (options.buffer_body_for_retry) {
buffered_body_ = std::make_unique<Buffer::OwnedImpl>();
}
Expand Down
91 changes: 91 additions & 0 deletions test/common/http/async_client_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,97 @@ TEST_F(AsyncClientImplTest, BasicHashPolicy) {
response_decoder_->decodeData(data, true);
}

TEST_F(AsyncClientImplTest, WithoutMetadata) {
message_->body().add("test body");
Buffer::Instance& data = message_->body();

EXPECT_CALL(cm_.thread_local_cluster_.conn_pool_, newStream(_, _))
.WillOnce(Invoke([&](ResponseDecoder& decoder,
ConnectionPool::Callbacks& callbacks) -> ConnectionPool::Cancellable* {
callbacks.onPoolReady(stream_encoder_, cm_.thread_local_cluster_.conn_pool_.host_,
stream_info_, {});
response_decoder_ = &decoder;
return nullptr;
}));

EXPECT_CALL(cm_.thread_local_cluster_, httpConnPool(_, _, _))
.WillOnce(
Invoke([&](Upstream::ResourcePriority, absl::optional<Http::Protocol>,
Upstream::LoadBalancerContext* context) -> Http::ConnectionPool::Instance* {
EXPECT_EQ(context->metadataMatchCriteria(), nullptr);
return &cm_.thread_local_cluster_.conn_pool_;
}));

TestRequestHeaderMapImpl copy(message_->headers());
copy.addCopy("x-envoy-internal", "true");
copy.addCopy("x-forwarded-for", "127.0.0.1");
copy.addCopy(":scheme", "http");

EXPECT_CALL(stream_encoder_, encodeHeaders(HeaderMapEqualRef(&copy), false));

AsyncClient::RequestOptions options;
envoy::config::core::v3::Metadata metadata;
metadata.mutable_filter_metadata()->insert(
{"fake_test_domain", MessageUtil::keyValueStruct("fake_test_key", "fake_test_value")});
options.setMetadata(metadata);

auto* request = client_.send(std::move(message_), callbacks_, options);
EXPECT_NE(request, nullptr);

expectSuccess(request, 200);

ResponseHeaderMapPtr response_headers(new TestResponseHeaderMapImpl{{":status", "200"}});
response_decoder_->decodeHeaders(std::move(response_headers), false);
response_decoder_->decodeData(data, true);
}

TEST_F(AsyncClientImplTest, WithMetadata) {
message_->body().add("test body");
Buffer::Instance& data = message_->body();

EXPECT_CALL(cm_.thread_local_cluster_.conn_pool_, newStream(_, _))
.WillOnce(Invoke([&](ResponseDecoder& decoder,
ConnectionPool::Callbacks& callbacks) -> ConnectionPool::Cancellable* {
callbacks.onPoolReady(stream_encoder_, cm_.thread_local_cluster_.conn_pool_.host_,
stream_info_, {});
response_decoder_ = &decoder;
return nullptr;
}));

EXPECT_CALL(cm_.thread_local_cluster_, httpConnPool(_, _, _))
.WillOnce(
Invoke([&](Upstream::ResourcePriority, absl::optional<Http::Protocol>,
Upstream::LoadBalancerContext* context) -> Http::ConnectionPool::Instance* {
EXPECT_NE(context->metadataMatchCriteria(), nullptr);
EXPECT_EQ(context->metadataMatchCriteria()->metadataMatchCriteria().at(0)->name(),
"fake_test_key");
return &cm_.thread_local_cluster_.conn_pool_;
}));

TestRequestHeaderMapImpl copy(message_->headers());
copy.addCopy("x-envoy-internal", "true");
copy.addCopy("x-forwarded-for", "127.0.0.1");
copy.addCopy(":scheme", "http");

EXPECT_CALL(stream_encoder_, encodeHeaders(HeaderMapEqualRef(&copy), false));

AsyncClient::RequestOptions options;
envoy::config::core::v3::Metadata metadata;
metadata.mutable_filter_metadata()->insert(
{Envoy::Config::MetadataFilters::get().ENVOY_LB,
MessageUtil::keyValueStruct("fake_test_key", "fake_test_value")});
options.setMetadata(metadata);

auto* request = client_.send(std::move(message_), callbacks_, options);
EXPECT_NE(request, nullptr);

expectSuccess(request, 200);

ResponseHeaderMapPtr response_headers(new TestResponseHeaderMapImpl{{":status", "200"}});
response_decoder_->decodeHeaders(std::move(response_headers), false);
response_decoder_->decodeData(data, true);
}

TEST_F(AsyncClientImplTest, Retry) {
ON_CALL(runtime_.snapshot_, featureEnabled("upstream.use_retry", 100))
.WillByDefault(Return(true));
Expand Down