Skip to content
Merged
1 change: 1 addition & 0 deletions source/extensions/upstreams/http/http/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ envoy_cc_library(
hdrs = [
"upstream_request.h",
],
visibility = ["//visibility:public"],
Comment thread
alyssawilk marked this conversation as resolved.
deps = [
"//include/envoy/http:codes_interface",
"//include/envoy/http:conn_pool_interface",
Expand Down
1 change: 1 addition & 0 deletions source/extensions/upstreams/http/tcp/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ envoy_cc_library(
hdrs = [
"upstream_request.h",
],
visibility = ["//visibility:public"],
deps = [
"//include/envoy/http:codes_interface",
"//include/envoy/http:filter_interface",
Expand Down
16 changes: 16 additions & 0 deletions test/integration/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -1581,3 +1581,19 @@ envoy_cc_test(
"@envoy_api//envoy/config/core/v3:pkg_cc_proto",
],
)

envoy_cc_test(
name = "cluster_upstream_extension_integration_test",
srcs = [
"cluster_upstream_extension_integration_test.cc",
],
deps = [
":http_integration_lib",
"//source/common/config:api_version_lib",
"//source/common/protobuf",
"//test/integration/upstreams:per_host_upstream_config",
"//test/test_common:utility_lib",
"@envoy_api//envoy/config/bootstrap/v3:pkg_cc_proto",
"@envoy_api//envoy/extensions/filters/network/http_connection_manager/v3:pkg_cc_proto",
],
)
97 changes: 97 additions & 0 deletions test/integration/cluster_upstream_extension_integration_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
#include "envoy/config/bootstrap/v3/bootstrap.pb.h"
#include "envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.pb.h"
#include "envoy/registry/registry.h"
#include "envoy/router/router.h"

#include "common/buffer/buffer_impl.h"

#include "test/integration/http_integration.h"
#include "test/integration/upstreams/per_host_upstream_config.h"
#include "test/test_common/registry.h"

#include "fake_upstream.h"
Comment thread
lambdai marked this conversation as resolved.
Outdated
#include "gtest/gtest.h"

namespace Envoy {

namespace {
class ClusterUpstreamExtensionIntegrationTest
: public testing::TestWithParam<Network::Address::IpVersion>,
public HttpIntegrationTest {
public:
ClusterUpstreamExtensionIntegrationTest()
: HttpIntegrationTest(Http::CodecClient::Type::HTTP1, GetParam()) {}

void SetUp() override {
Comment thread
lambdai marked this conversation as resolved.
Outdated
setDownstreamProtocol(Http::CodecClient::Type::HTTP1);
setUpstreamProtocol(FakeHttpConnection::Type::HTTP1);
}

void populateMetadataTestData(envoy::config::core::v3::Metadata& metadata, const std::string& k1,
Comment thread
lambdai marked this conversation as resolved.
Outdated
const std::string& k2, const std::string& value) {

ProtobufWkt::Struct struct_obj;
(*struct_obj.mutable_fields())[k2] = ValueUtil::stringValue(value);

(*metadata.mutable_filter_metadata())[k1] = struct_obj;
}

void initialize() override {
config_helper_.addConfigModifier([this](envoy::config::bootstrap::v3::Bootstrap& bootstrap) {
auto* cluster = bootstrap.mutable_static_resources()->mutable_clusters(0);
cluster->mutable_upstream_config()->set_name("envoy.filters.connection_pools.http.per_host");
cluster->mutable_upstream_config()->mutable_typed_config();
populateMetadataTestData(*cluster->mutable_metadata(), "foo", "bar", "cluster-value");
populateMetadataTestData(*cluster->mutable_load_assignment()
->mutable_endpoints(0)
->mutable_lb_endpoints(0)
->mutable_metadata(),
"foo", "bar", "host-value");
});
HttpIntegrationTest::initialize();
}
PerHostGenericConnPoolFactory per_host_upstream_factory_;
};

INSTANTIATE_TEST_SUITE_P(IpVersions, ClusterUpstreamExtensionIntegrationTest,
testing::ValuesIn(TestEnvironment::getIpVersionsForTest()),
TestUtility::ipTestParamsToString);

TEST_P(ClusterUpstreamExtensionIntegrationTest,
VerifyRequestHeadersAreRewrittenByClusterAndHostMetadata) {
Comment thread
lambdai marked this conversation as resolved.
initialize();
Registry::InjectFactory<Router::GenericConnPoolFactory> registration(per_host_upstream_factory_);

codec_client_ = makeHttpConnection(lookupPort("http"));
auto response = codec_client_->makeHeaderOnlyRequest(default_request_headers_);
Comment thread
lambdai marked this conversation as resolved.
Outdated

ASSERT_TRUE(fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, fake_upstream_connection_));
ASSERT_TRUE(fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_));
ASSERT_TRUE(upstream_request_->waitForEndStream(*dispatcher_));
upstream_request_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, true);
EXPECT_TRUE(upstream_request_->complete());

{
const auto header_values = upstream_request_->headers().get(Http::LowerCaseString("X-foo"));
ASSERT_EQ(1, header_values.size());
EXPECT_EQ("foo-common", header_values[0]->value().getStringView());
}
{
const auto cluster_header_values =
upstream_request_->headers().get(Http::LowerCaseString("X-cluster-foo"));
ASSERT_EQ(1, cluster_header_values.size());
EXPECT_EQ("cluster-value", cluster_header_values[0]->value().getStringView());
}
{
const auto host_header_values =
upstream_request_->headers().get(Http::LowerCaseString("X-host-foo"));
ASSERT_EQ(1, host_header_values.size());
EXPECT_EQ("host-value", host_header_values[0]->value().getStringView());
}

response->waitForEndStream();
ASSERT_TRUE(response->complete());
EXPECT_EQ("200", response->headers().getStatusValue());
}
} // namespace
} // namespace Envoy
52 changes: 52 additions & 0 deletions test/integration/upstreams/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
load(
"//bazel:envoy_build_system.bzl",
"envoy_cc_extension",
"envoy_cc_library",
"envoy_extension_package",
)

licenses(["notice"]) # Apache 2

#envoy_package()
Comment thread
lambdai marked this conversation as resolved.
Outdated

envoy_extension_package()

envoy_cc_extension(
name = "per_host_upstream_config",
srcs = [
"per_host_upstream_config.cc",
],
hdrs = [
"per_host_upstream_config.h",
],
security_posture = "robust_to_untrusted_downstream",
visibility = ["//visibility:public"],
deps = [
":per_host_upstream_request_lib",
"//source/extensions/upstreams/http/http:upstream_request_lib",
"//source/extensions/upstreams/http/tcp:upstream_request_lib",
],
)

envoy_cc_library(
name = "per_host_upstream_request_lib",
srcs = [
"per_host_upstream_request.cc",
],
hdrs = [
"per_host_upstream_request.h",
],
deps = [
"//include/envoy/http:codes_interface",
"//include/envoy/http:conn_pool_interface",
"//include/envoy/http:filter_interface",
"//include/envoy/upstream:cluster_manager_interface",
"//include/envoy/upstream:upstream_interface",
"//source/common/common:assert_lib",
"//source/common/http:header_map_lib",
"//source/common/http:headers_lib",
"//source/common/router:router_lib",
"//source/common/upstream:load_balancer_lib",
"//source/extensions/upstreams/http/http:upstream_request_lib",
],
)
19 changes: 19 additions & 0 deletions test/integration/upstreams/per_host_upstream_config.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#include "test/integration/upstreams/per_host_upstream_config.h"

#include "test/integration/upstreams/per_host_upstream_request.h"

namespace Envoy {

Router::GenericConnPoolPtr PerHostGenericConnPoolFactory::createGenericConnPool(
Comment thread
lambdai marked this conversation as resolved.
Outdated
Upstream::ClusterManager& cm, bool is_connect, const Router::RouteEntry& route_entry,
absl::optional<Envoy::Http::Protocol> downstream_protocol,
Upstream::LoadBalancerContext* ctx) const {
if (is_connect) {
return nullptr;
}
auto ret =
Comment thread
lambdai marked this conversation as resolved.
Outdated
std::make_unique<PerHostHttpConnPool>(cm, is_connect, route_entry, downstream_protocol, ctx);
return (ret->valid() ? std::move(ret) : nullptr);
}

} // namespace Envoy
24 changes: 24 additions & 0 deletions test/integration/upstreams/per_host_upstream_config.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#pragma once

#include "envoy/router/router.h"

namespace Envoy {

/**
* Config registration for the HttpConnPool. @see Router::GenericConnPoolFactory
*/
class PerHostGenericConnPoolFactory : public Router::GenericConnPoolFactory {
public:
std::string name() const override { return "envoy.filters.connection_pools.http.per_host"; }
std::string category() const override { return "envoy.upstreams"; }
Router::GenericConnPoolPtr
createGenericConnPool(Upstream::ClusterManager& cm, bool is_connect,
const Router::RouteEntry& route_entry,
absl::optional<Envoy::Http::Protocol> downstream_protocol,
Upstream::LoadBalancerContext* ctx) const override;

ProtobufTypes::MessagePtr createEmptyConfigProto() override {
return std::make_unique<ProtobufWkt::Struct>();
}
};
} // namespace Envoy
50 changes: 50 additions & 0 deletions test/integration/upstreams/per_host_upstream_request.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
#include "test/integration/upstreams/per_host_upstream_request.h"

#include "common/router/router.h"

using Envoy::Router::GenericConnectionPoolCallbacks;

namespace Envoy {

void PerHostHttpConnPool::newStream(GenericConnectionPoolCallbacks* callbacks) {
callbacks_ = callbacks;
// It's possible for a reset to happen inline within the newStream() call. In this case, we
// might get deleted inline as well. Only write the returned handle out if it is not nullptr to
// deal with this case.
Comment thread
lambdai marked this conversation as resolved.
Outdated
Envoy::Http::ConnectionPool::Cancellable* handle =
conn_pool_->newStream(callbacks->upstreamToDownstream(), *this);
if (handle) {
conn_pool_stream_handle_ = handle;
}
Comment thread
lambdai marked this conversation as resolved.
Outdated
}

bool PerHostHttpConnPool::cancelAnyPendingStream() {
if (conn_pool_stream_handle_) {
conn_pool_stream_handle_->cancel(ConnectionPool::CancelPolicy::Default);
conn_pool_stream_handle_ = nullptr;
return true;
}
return false;
}

absl::optional<Envoy::Http::Protocol> PerHostHttpConnPool::protocol() const {
return conn_pool_->protocol();
}

void PerHostHttpConnPool::onPoolFailure(ConnectionPool::PoolFailureReason reason,
absl::string_view transport_failure_reason,
Upstream::HostDescriptionConstSharedPtr host) {
callbacks_->onPoolFailure(reason, transport_failure_reason, host);
Comment thread
lambdai marked this conversation as resolved.
Outdated
}

void PerHostHttpConnPool::onPoolReady(Envoy::Http::RequestEncoder& request_encoder,
Upstream::HostDescriptionConstSharedPtr host,
const StreamInfo::StreamInfo& info) {
conn_pool_stream_handle_ = nullptr;
auto upstream = std::make_unique<PerHostHttpUpstream>(callbacks_->upstreamToDownstream(),
&request_encoder, host);
callbacks_->onPoolReady(std::move(upstream), host,
request_encoder.getStream().connectionLocalAddress(), info);
}

} // namespace Envoy
110 changes: 110 additions & 0 deletions test/integration/upstreams/per_host_upstream_request.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
#pragma once

#include <memory>

#include "envoy/http/conn_pool.h"
#include "envoy/http/metadata_interface.h"
#include "envoy/http/protocol.h"
#include "envoy/upstream/cluster_manager.h"
#include "envoy/upstream/host_description.h"

#include "common/http/header_map_impl.h"
#include "common/router/upstream_request.h"

#include "extensions/upstreams/http/http/upstream_request.h"

namespace Envoy {

class PerHostHttpConnPool : public Router::GenericConnPool,
Comment thread
lambdai marked this conversation as resolved.
Outdated
public Envoy::Http::ConnectionPool::Callbacks {
public:
// GenericConnPool
PerHostHttpConnPool(Upstream::ClusterManager& cm, bool is_connect,
const Router::RouteEntry& route_entry,
absl::optional<Envoy::Http::Protocol> downstream_protocol,
Upstream::LoadBalancerContext* ctx) {
ASSERT(!is_connect);
conn_pool_ = cm.httpConnPoolForCluster(route_entry.clusterName(), route_entry.priority(),
downstream_protocol, ctx);
}
void newStream(Router::GenericConnectionPoolCallbacks* callbacks) override;
bool cancelAnyPendingStream() override;
absl::optional<Envoy::Http::Protocol> protocol() const override;

// Http::ConnectionPool::Callbacks
void onPoolFailure(ConnectionPool::PoolFailureReason reason,
absl::string_view transport_failure_reason,
Upstream::HostDescriptionConstSharedPtr host) override;
void onPoolReady(Envoy::Http::RequestEncoder& callbacks_encoder,
Upstream::HostDescriptionConstSharedPtr host,
const StreamInfo::StreamInfo& info) override;
Upstream::HostDescriptionConstSharedPtr host() const override { return conn_pool_->host(); }

bool valid() { return conn_pool_ != nullptr; }

private:
// Points to the actual connection pool to create streams from.
Envoy::Http::ConnectionPool::Instance* conn_pool_{};
Envoy::Http::ConnectionPool::Cancellable* conn_pool_stream_handle_{};
Router::GenericConnectionPoolCallbacks* callbacks_{};
};

class PerHostHttpUpstream : public Router::GenericUpstream {
public:
PerHostHttpUpstream(Router::UpstreamToDownstream& upstream_request,
Envoy::Http::RequestEncoder* encoder,
Upstream::HostDescriptionConstSharedPtr host)
: sub_upstream_(upstream_request, encoder), host_(host) {}

// GenericUpstream
void encodeData(Buffer::Instance& data, bool end_stream) override {
sub_upstream_.encodeData(data, end_stream);
}

void encodeMetadata(const Envoy::Http::MetadataMapVector& metadata_map_vector) override {
sub_upstream_.encodeMetadata(metadata_map_vector);
}

void encodeHeaders(const Envoy::Http::RequestHeaderMap& headers, bool end_stream) override {
auto dup = Envoy::Http::RequestHeaderMapImpl::create();
Envoy::Http::HeaderMapImpl::copyFrom(*dup, headers);
dup->setCopy(Envoy::Http::LowerCaseString("X-foo"), "foo-common");
if (auto filter_metadata = host_->cluster().metadata().filter_metadata().find("foo");
filter_metadata != host_->cluster().metadata().filter_metadata().end()) {
const ProtobufWkt::Struct& data_struct = filter_metadata->second;
const auto& fields = data_struct.fields();
if (auto iter = fields.find("bar"); iter != fields.end()) {
if (iter->second.kind_case() == ProtobufWkt::Value::kStringValue) {
dup->setCopy(Envoy::Http::LowerCaseString("X-cluster-foo"), iter->second.string_value());
}
}
}
if (host_->metadata() != nullptr) {
if (auto filter_metadata = host_->metadata()->filter_metadata().find("foo");
filter_metadata != host_->metadata()->filter_metadata().end()) {
const ProtobufWkt::Struct& data_struct = filter_metadata->second;
const auto& fields = data_struct.fields();
if (auto iter = fields.find("bar"); iter != fields.end()) {
if (iter->second.kind_case() == ProtobufWkt::Value::kStringValue) {
dup->setCopy(Envoy::Http::LowerCaseString("X-host-foo"), iter->second.string_value());
}
}
}
}
sub_upstream_.encodeHeaders(*dup, end_stream);
}
Comment thread
lambdai marked this conversation as resolved.
Outdated

void encodeTrailers(const Envoy::Http::RequestTrailerMap& trailers) override {
sub_upstream_.encodeTrailers(trailers);
}

void readDisable(bool disable) override { sub_upstream_.readDisable(disable); }

void resetStream() override { sub_upstream_.resetStream(); }

private:
Extensions::Upstreams::Http::Http::HttpUpstream sub_upstream_;
Upstream::HostDescriptionConstSharedPtr host_{};
Comment thread
lambdai marked this conversation as resolved.
Outdated
};

} // namespace Envoy