diff --git a/CODEOWNERS b/CODEOWNERS index ca989c164e4b0..6f341bef43b95 100644 --- a/CODEOWNERS +++ b/CODEOWNERS @@ -43,6 +43,8 @@ extensions/filters/common/original_src @snowp @klarose /*/extensions/transport_sockets/common @alyssawilk @wez470 # starttls transport socket /*/extensions/transport_sockets/starttls @cpakulski @lizan +# proxy transport socket +/*extensions/transport_sockets/http_11_proxy @alyssawilk @ryantheoptimist # internal upstream transport socket /*/extensions/transport_sockets/internal_upstream @kyessenov @lambdai # sni_cluster extension diff --git a/api/BUILD b/api/BUILD index 7ae40630173c3..a7d3bb72e8662 100644 --- a/api/BUILD +++ b/api/BUILD @@ -255,6 +255,7 @@ proto_library( "//envoy/extensions/stat_sinks/graphite_statsd/v3:pkg", "//envoy/extensions/stat_sinks/wasm/v3:pkg", "//envoy/extensions/transport_sockets/alts/v3:pkg", + "//envoy/extensions/transport_sockets/http_11_proxy/v3:pkg", "//envoy/extensions/transport_sockets/internal_upstream/v3:pkg", "//envoy/extensions/transport_sockets/proxy_protocol/v3:pkg", "//envoy/extensions/transport_sockets/quic/v3:pkg", diff --git a/api/envoy/extensions/transport_sockets/http_11_proxy/v3/BUILD b/api/envoy/extensions/transport_sockets/http_11_proxy/v3/BUILD new file mode 100644 index 0000000000000..1c1a6f6b44235 --- /dev/null +++ b/api/envoy/extensions/transport_sockets/http_11_proxy/v3/BUILD @@ -0,0 +1,12 @@ +# DO NOT EDIT. This file is generated by tools/proto_format/proto_sync.py. + +load("@envoy_api//bazel:api_build_system.bzl", "api_proto_package") + +licenses(["notice"]) # Apache 2 + +api_proto_package( + deps = [ + "//envoy/config/core/v3:pkg", + "@com_github_cncf_udpa//udpa/annotations:pkg", + ], +) diff --git a/api/envoy/extensions/transport_sockets/http_11_proxy/v3/upstream_http_11_connect.proto b/api/envoy/extensions/transport_sockets/http_11_proxy/v3/upstream_http_11_connect.proto new file mode 100644 index 0000000000000..99c2e451047b3 --- /dev/null +++ b/api/envoy/extensions/transport_sockets/http_11_proxy/v3/upstream_http_11_connect.proto @@ -0,0 +1,36 @@ +syntax = "proto3"; + +package envoy.extensions.transport_sockets.http_11_proxy.v3; + +import "envoy/config/core/v3/base.proto"; + +import "udpa/annotations/status.proto"; +import "validate/validate.proto"; + +option java_package = "io.envoyproxy.envoy.extensions.transport_sockets.http_11_proxy.v3"; +option java_outer_classname = "UpstreamHttp11ConnectProto"; +option java_multiple_files = true; +option go_package = "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/http_11_proxy/v3;http_11_proxyv3"; +option (udpa.annotations.file_status).package_version_status = ACTIVE; + +// [#protodoc-title: Upstream HTTP/1.1 Proxy] +// [#extension: envoy.transport_sockets.http_11_proxy] + +// Configuration for HTTP/1.1 proxy transport sockets. +// This is intended for use in Envoy Mobile, though may eventually be extended +// for upstream Envoy use. +// If this transport socket is configured, and an intermediate filter adds the +// stream info necessary for proxying to the stream info (as the test filter +// does :repo:`here `) then +// +// * Upstream connections will be directed to the specified proxy address rather +// than the host's address +// * Upstream TLS connections will have a raw HTTP/1.1 CONNECT header prefaced +// to the payload, and 200 response stripped (if less than 200 bytes) +// * Plaintext HTTP/1.1 connections will be sent with a fully qualified URL. +// +// This transport socket is not compatible with HTTP/3, plaintext HTTP/2, or raw TCP. +message Http11ProxyUpstreamTransport { + // The underlying transport socket being wrapped. + config.core.v3.TransportSocket transport_socket = 1 [(validate.rules).message = {required: true}]; +} diff --git a/api/versioning/BUILD b/api/versioning/BUILD index da23a1e78e646..a873cfc24c737 100644 --- a/api/versioning/BUILD +++ b/api/versioning/BUILD @@ -196,6 +196,7 @@ proto_library( "//envoy/extensions/stat_sinks/graphite_statsd/v3:pkg", "//envoy/extensions/stat_sinks/wasm/v3:pkg", "//envoy/extensions/transport_sockets/alts/v3:pkg", + "//envoy/extensions/transport_sockets/http_11_proxy/v3:pkg", "//envoy/extensions/transport_sockets/internal_upstream/v3:pkg", "//envoy/extensions/transport_sockets/proxy_protocol/v3:pkg", "//envoy/extensions/transport_sockets/quic/v3:pkg", diff --git a/envoy/network/transport_socket.h b/envoy/network/transport_socket.h index d471e7eee88b2..55470c4719f6c 100644 --- a/envoy/network/transport_socket.h +++ b/envoy/network/transport_socket.h @@ -3,6 +3,7 @@ #include #include "envoy/buffer/buffer.h" +#include "envoy/common/optref.h" #include "envoy/common/pure.h" #include "envoy/network/io_handle.h" #include "envoy/network/listen_socket.h" @@ -234,6 +235,22 @@ class TransportSocketOptions { */ virtual absl::optional proxyProtocolOptions() const PURE; + // Information for use by the http_11_proxy transport socket. + struct Http11ProxyInfo { + Http11ProxyInfo(std::string hostname, Network::Address::InstanceConstSharedPtr address) + : hostname(hostname), proxy_address(address) {} + // The hostname of the original request, to be used in CONNECT request if + // the underlying transport is TLS. + std::string hostname; + // The address of the proxy, where connections should be routed to. + Network::Address::InstanceConstSharedPtr proxy_address; + }; + + /** + * @return any proxy information if sending to an intermediate proxy over HTTP/1.1. + */ + virtual OptRef http11ProxyInfo() const PURE; + /** * @return filter state objects from the downstream request or connection * that are marked as shared with the upstream connection. diff --git a/source/common/http/codec_client.cc b/source/common/http/codec_client.cc index ba2f9d87d8b40..10590a94191b7 100644 --- a/source/common/http/codec_client.cc +++ b/source/common/http/codec_client.cc @@ -169,22 +169,30 @@ void CodecClient::onData(Buffer::Instance& data) { CodecClientProd::CodecClientProd(CodecType type, Network::ClientConnectionPtr&& connection, Upstream::HostDescriptionConstSharedPtr host, Event::Dispatcher& dispatcher, - Random::RandomGenerator& random_generator) - : NoConnectCodecClientProd(type, std::move(connection), host, dispatcher, random_generator) { + Random::RandomGenerator& random_generator, + const Network::TransportSocketOptionsConstSharedPtr& options) + : NoConnectCodecClientProd(type, std::move(connection), host, dispatcher, random_generator, + options) { connect(); } -NoConnectCodecClientProd::NoConnectCodecClientProd(CodecType type, - Network::ClientConnectionPtr&& connection, - Upstream::HostDescriptionConstSharedPtr host, - Event::Dispatcher& dispatcher, - Random::RandomGenerator& random_generator) +NoConnectCodecClientProd::NoConnectCodecClientProd( + CodecType type, Network::ClientConnectionPtr&& connection, + Upstream::HostDescriptionConstSharedPtr host, Event::Dispatcher& dispatcher, + Random::RandomGenerator& random_generator, + const Network::TransportSocketOptionsConstSharedPtr& options) : CodecClient(type, std::move(connection), host, dispatcher) { switch (type) { case CodecType::HTTP1: { + // If the transport socket indicates this is being proxied, inform the HTTP/1.1 codec. It will + // send fully qualified URLs iff the underlying transport is plaintext. + bool proxied = false; + if (options && options->http11ProxyInfo().has_value()) { + proxied = true; + } codec_ = std::make_unique( *connection_, host->cluster().http1CodecStats(), *this, host->cluster().http1Settings(), - host->cluster().maxResponseHeadersCount()); + host->cluster().maxResponseHeadersCount(), proxied); break; } case CodecType::HTTP2: { diff --git a/source/common/http/codec_client.h b/source/common/http/codec_client.h index 05e43c22a5dec..7356b51c599a7 100644 --- a/source/common/http/codec_client.h +++ b/source/common/http/codec_client.h @@ -281,8 +281,8 @@ class NoConnectCodecClientProd : public CodecClient { public: NoConnectCodecClientProd(CodecType type, Network::ClientConnectionPtr&& connection, Upstream::HostDescriptionConstSharedPtr host, - Event::Dispatcher& dispatcher, - Random::RandomGenerator& random_generator); + Event::Dispatcher& dispatcher, Random::RandomGenerator& random_generator, + const Network::TransportSocketOptionsConstSharedPtr& options); }; /** @@ -292,7 +292,8 @@ class CodecClientProd : public NoConnectCodecClientProd { public: CodecClientProd(CodecType type, Network::ClientConnectionPtr&& connection, Upstream::HostDescriptionConstSharedPtr host, Event::Dispatcher& dispatcher, - Random::RandomGenerator& random_generator); + Random::RandomGenerator& random_generator, + const Network::TransportSocketOptionsConstSharedPtr& options); }; } // namespace Http diff --git a/source/common/http/http1/codec_impl.cc b/source/common/http/http1/codec_impl.cc index 56a9fdbc60bce..c2f3425c80876 100644 --- a/source/common/http/http1/codec_impl.cc +++ b/source/common/http/http1/codec_impl.cc @@ -1325,13 +1325,15 @@ void ServerConnectionImpl::ActiveRequest::dumpState(std::ostream& os, int indent ClientConnectionImpl::ClientConnectionImpl(Network::Connection& connection, CodecStats& stats, ConnectionCallbacks&, const Http1Settings& settings, - const uint32_t max_response_headers_count) + const uint32_t max_response_headers_count, + bool passing_through_proxy) : ConnectionImpl(connection, stats, settings, MessageType::Response, MAX_RESPONSE_HEADERS_KB, max_response_headers_count), owned_output_buffer_(connection.dispatcher().getWatermarkFactory().createBuffer( [&]() -> void { this->onBelowLowWatermark(); }, [&]() -> void { this->onAboveHighWatermark(); }, - []() -> void { /* TODO(adisuissa): handle overflow watermark */ })) { + []() -> void { /* TODO(adisuissa): handle overflow watermark */ })), + passing_through_proxy_(passing_through_proxy) { owned_output_buffer_->setWatermarks(connection.bufferLimit()); // Inform parent output_buffer_ = owned_output_buffer_.get(); diff --git a/source/common/http/http1/codec_impl.h b/source/common/http/http1/codec_impl.h index 8bbdd3496e32b..e8713b6db9a0e 100644 --- a/source/common/http/http1/codec_impl.h +++ b/source/common/http/http1/codec_impl.h @@ -197,6 +197,7 @@ class ConnectionImpl : public virtual Connection, * @return Network::Connection& the backing network connection. */ Network::Connection& connection() { return connection_; } + const Network::Connection& connection() const { return connection_; } /** * Called when the active encoder has completed encoding the outbound half of the stream. @@ -230,7 +231,7 @@ class ConnectionImpl : public virtual Connection, virtual void maybeAddSentinelBufferFragment(Buffer::Instance&) {} CodecStats& stats() { return stats_; } bool enableTrailers() const { return codec_settings_.enable_trailers_; } - bool sendFullyQualifiedUrl() const { return codec_settings_.send_fully_qualified_url_; } + virtual bool sendFullyQualifiedUrl() const { return codec_settings_.send_fully_qualified_url_; } HeaderKeyFormatterOptConstRef formatter() const { return makeOptRefFromPtr(encode_only_header_key_formatter_.get()); } @@ -563,7 +564,8 @@ class ClientConnectionImpl : public ClientConnection, public ConnectionImpl { public: ClientConnectionImpl(Network::Connection& connection, CodecStats& stats, ConnectionCallbacks& callbacks, const Http1Settings& settings, - const uint32_t max_response_headers_count); + const uint32_t max_response_headers_count, + bool passing_through_proxy = false); // Http::ClientConnection RequestEncoder& newStream(ResponseDecoder& response_decoder) override; @@ -578,6 +580,13 @@ class ClientConnectionImpl : public ClientConnection, public ConnectionImpl { bool cannotHaveBody(); + bool sendFullyQualifiedUrl() const override { + // Send fully qualified URLs either if the parent connection is configured to do so or this + // stream is passing through a proxy and the underlying transport is plaintext. + return ConnectionImpl::sendFullyQualifiedUrl() || + (passing_through_proxy_ && !connection().ssl()); + } + // ParserCallbacks. Status onUrlBase(const char*, size_t) override { return okStatus(); } Status onStatusBase(const char* data, size_t length) override; @@ -649,6 +658,10 @@ class ClientConnectionImpl : public ClientConnection, public ConnectionImpl { // The default limit of 80 KiB is the vanilla http_parser behaviour. static constexpr uint32_t MAX_RESPONSE_HEADERS_KB = 80; + + // True if the upstream connection is pointed at an HTTP/1.1 proxy, and + // plaintext HTTP should be sent with fully qualified URLs. + bool passing_through_proxy_ = false; }; } // namespace Http1 diff --git a/source/common/http/http1/conn_pool.cc b/source/common/http/http1/conn_pool.cc index ed0788f87a4d3..de6ac0a988a24 100644 --- a/source/common/http/http1/conn_pool.cc +++ b/source/common/http/http1/conn_pool.cc @@ -104,9 +104,9 @@ allocateConnPool(Event::Dispatcher& dispatcher, Random::RandomGenerator& random_ return std::make_unique(*pool, absl::nullopt); }, [](Upstream::Host::CreateConnectionData& data, HttpConnPoolImplBase* pool) { - CodecClientPtr codec{new CodecClientProd(CodecType::HTTP1, std::move(data.connection_), - data.host_description_, pool->dispatcher(), - pool->randomGenerator())}; + CodecClientPtr codec{new CodecClientProd( + CodecType::HTTP1, std::move(data.connection_), data.host_description_, + pool->dispatcher(), pool->randomGenerator(), pool->transportSocketOptions())}; return codec; }, std::vector{Protocol::Http11}, absl::nullopt, nullptr); diff --git a/source/common/http/http2/conn_pool.cc b/source/common/http/http2/conn_pool.cc index 722b5a42783d2..8879d75282b6b 100644 --- a/source/common/http/http2/conn_pool.cc +++ b/source/common/http/http2/conn_pool.cc @@ -61,9 +61,9 @@ allocateConnPool(Event::Dispatcher& dispatcher, Random::RandomGenerator& random_ return std::make_unique(*pool, absl::nullopt); }, [](Upstream::Host::CreateConnectionData& data, HttpConnPoolImplBase* pool) { - CodecClientPtr codec{new CodecClientProd(CodecType::HTTP2, std::move(data.connection_), - data.host_description_, pool->dispatcher(), - pool->randomGenerator())}; + CodecClientPtr codec{new CodecClientProd( + CodecType::HTTP2, std::move(data.connection_), data.host_description_, + pool->dispatcher(), pool->randomGenerator(), pool->transportSocketOptions())}; return codec; }, std::vector{Protocol::Http2}, origin, cache); diff --git a/source/common/http/http3/conn_pool.cc b/source/common/http/http3/conn_pool.cc index eae7ee295fb8d..35a1d7ace3f1c 100644 --- a/source/common/http/http3/conn_pool.cc +++ b/source/common/http/http3/conn_pool.cc @@ -191,10 +191,10 @@ allocateConnPool(Event::Dispatcher& dispatcher, Random::RandomGenerator& random_ "envoy.reloadable_features.postpone_h3_client_connect_to_next_loop") ? std::make_unique( CodecType::HTTP3, std::move(data.connection_), data.host_description_, - pool->dispatcher(), pool->randomGenerator()) - : std::make_unique(CodecType::HTTP3, std::move(data.connection_), - data.host_description_, pool->dispatcher(), - pool->randomGenerator()); + pool->dispatcher(), pool->randomGenerator(), pool->transportSocketOptions()) + : std::make_unique( + CodecType::HTTP3, std::move(data.connection_), data.host_description_, + pool->dispatcher(), pool->randomGenerator(), pool->transportSocketOptions()); return codec; }, std::vector{Protocol::Http3}, connect_callback, quic_info); diff --git a/source/common/http/mixed_conn_pool.cc b/source/common/http/mixed_conn_pool.cc index bafca5d1ab4e1..f03c65394056d 100644 --- a/source/common/http/mixed_conn_pool.cc +++ b/source/common/http/mixed_conn_pool.cc @@ -24,7 +24,8 @@ CodecClientPtr HttpConnPoolImplMixed::createCodecClient(Upstream::Host::CreateConnectionData& data) { auto protocol = protocol_ == Protocol::Http11 ? CodecType::HTTP1 : CodecType::HTTP2; CodecClientPtr codec{new CodecClientProd(protocol, std::move(data.connection_), - data.host_description_, dispatcher_, random_generator_)}; + data.host_description_, dispatcher_, random_generator_, + transportSocketOptions())}; return codec; } diff --git a/source/common/network/BUILD b/source/common/network/BUILD index 5fef57791c75f..d7bc7244bbd96 100644 --- a/source/common/network/BUILD +++ b/source/common/network/BUILD @@ -453,6 +453,7 @@ envoy_cc_library( hdrs = ["transport_socket_options_impl.h"], deps = [ ":application_protocol_lib", + ":filter_state_proxy_info_lib", ":proxy_protocol_filter_state_lib", ":upstream_server_name_lib", ":upstream_subject_alt_names_lib", @@ -465,6 +466,17 @@ envoy_cc_library( ], ) +envoy_cc_library( + name = "filter_state_proxy_info_lib", + srcs = ["filter_state_proxy_info.cc"], + hdrs = ["filter_state_proxy_info.h"], + deps = [ + "//envoy/network:address_interface", + "//envoy/stream_info:filter_state_interface", + "//source/common/common:macros", + ], +) + envoy_cc_library( name = "upstream_server_name_lib", srcs = ["upstream_server_name.cc"], diff --git a/source/common/network/filter_state_proxy_info.cc b/source/common/network/filter_state_proxy_info.cc new file mode 100644 index 0000000000000..0f43ef5e17341 --- /dev/null +++ b/source/common/network/filter_state_proxy_info.cc @@ -0,0 +1,11 @@ +#include "source/common/network/filter_state_proxy_info.h" + +namespace Envoy { +namespace Network { + +const std::string& Http11ProxyInfoFilterState::key() { + CONSTRUCT_ON_FIRST_USE(std::string, "envoy.network.transport_socket.http_11_proxy.info"); +} + +} // namespace Network +} // namespace Envoy diff --git a/source/common/network/filter_state_proxy_info.h b/source/common/network/filter_state_proxy_info.h new file mode 100644 index 0000000000000..e06beeccbd488 --- /dev/null +++ b/source/common/network/filter_state_proxy_info.h @@ -0,0 +1,34 @@ +#pragma once + +#include "envoy/network/address.h" +#include "envoy/stream_info/filter_state.h" + +#include "absl/strings/string_view.h" + +namespace Envoy { +namespace Network { + +/** + * Information which filters can add if they detect the stream should go + * upstream through an HTTP/1.1 proxy. + */ +class Http11ProxyInfoFilterState : public StreamInfo::FilterState::Object { +public: + // Returns the key for looking up the Http11ProxyInfoFilterState in the FilterState. + static const std::string& key(); + + Http11ProxyInfoFilterState(absl::string_view hostname, + Network::Address::InstanceConstSharedPtr address) + : hostname_(hostname), address_(address) {} + Network::Address::InstanceConstSharedPtr address() const { return address_; } + const std::string& hostname() const { return hostname_; } + +private: + // The hostname of this individual request. + const std::string hostname_; + // The address of the proxy. + const Network::Address::InstanceConstSharedPtr address_; +}; + +} // namespace Network +} // namespace Envoy diff --git a/source/common/network/transport_socket_options_impl.cc b/source/common/network/transport_socket_options_impl.cc index 7494093ecfa17..7af368b765d10 100644 --- a/source/common/network/transport_socket_options_impl.cc +++ b/source/common/network/transport_socket_options_impl.cc @@ -11,6 +11,7 @@ #include "source/common/common/scalar_to_byte_vector.h" #include "source/common/common/utility.h" #include "source/common/network/application_protocol.h" +#include "source/common/network/filter_state_proxy_info.h" #include "source/common/network/proxy_protocol_filter_state.h" #include "source/common/network/upstream_server_name.h" #include "source/common/network/upstream_subject_alt_names.h" @@ -59,6 +60,7 @@ TransportSocketOptionsUtility::fromFilterState(const StreamInfo::FilterState& fi std::vector subject_alt_names; std::vector alpn_fallback; absl::optional proxy_protocol_options; + std::unique_ptr proxy_info; bool needs_transport_socket_options = false; if (auto typed_data = filter_state.getDataReadOnly(UpstreamServerName::key()); @@ -88,6 +90,14 @@ TransportSocketOptionsUtility::fromFilterState(const StreamInfo::FilterState& fi needs_transport_socket_options = true; } + if (auto typed_data = filter_state.getDataReadOnly( + Http11ProxyInfoFilterState::key()); + typed_data != nullptr) { + proxy_info = std::make_unique(typed_data->hostname(), + typed_data->address()); + needs_transport_socket_options = true; + } + StreamInfo::FilterState::ObjectsPtr objects = filter_state.objectsSharedWithUpstreamConnection(); if (!objects->empty()) { needs_transport_socket_options = true; @@ -96,7 +106,8 @@ TransportSocketOptionsUtility::fromFilterState(const StreamInfo::FilterState& fi if (needs_transport_socket_options) { return std::make_shared( server_name, std::move(subject_alt_names), std::move(application_protocols), - std::move(alpn_fallback), proxy_protocol_options, std::move(objects)); + std::move(alpn_fallback), proxy_protocol_options, std::move(objects), + std::move(proxy_info)); } else { return nullptr; } diff --git a/source/common/network/transport_socket_options_impl.h b/source/common/network/transport_socket_options_impl.h index fb5ff4f2bac73..2390abface8ab 100644 --- a/source/common/network/transport_socket_options_impl.h +++ b/source/common/network/transport_socket_options_impl.h @@ -1,5 +1,6 @@ #pragma once +#include "envoy/common/optref.h" #include "envoy/network/proxy_protocol.h" #include "envoy/network/transport_socket.h" #include "envoy/stream_info/filter_state.h" @@ -29,6 +30,9 @@ class AlpnDecoratingTransportSocketOptions : public TransportSocketOptions { absl::optional proxyProtocolOptions() const override { return inner_options_->proxyProtocolOptions(); } + OptRef http11ProxyInfo() const override { + return inner_options_->http11ProxyInfo(); + } const StreamInfo::FilterState::Objects& downstreamSharedFilterStateObjects() const override { return inner_options_->downstreamSharedFilterStateObjects(); } @@ -46,14 +50,16 @@ class TransportSocketOptionsImpl : public TransportSocketOptions { std::vector&& override_alpn = {}, std::vector&& fallback_alpn = {}, absl::optional proxy_proto_options = absl::nullopt, StreamInfo::FilterState::ObjectsPtr filter_state_objects = - std::make_unique()) + std::make_unique(), + std::unique_ptr&& proxy_info = nullptr) : override_server_name_(override_server_name.empty() ? absl::nullopt : absl::optional(override_server_name)), override_verify_san_list_{std::move(override_verify_san_list)}, override_alpn_list_{std::move(override_alpn)}, alpn_fallback_{std::move(fallback_alpn)}, proxy_protocol_options_(proxy_proto_options), - filter_state_objects_(std::move(filter_state_objects)) {} + filter_state_objects_(std::move(filter_state_objects)), proxy_info_(std::move(proxy_info)) { + } // Network::TransportSocketOptions const absl::optional& serverNameOverride() const override { @@ -71,6 +77,12 @@ class TransportSocketOptionsImpl : public TransportSocketOptions { absl::optional proxyProtocolOptions() const override { return proxy_protocol_options_; } + OptRef http11ProxyInfo() const override { + if (!proxy_info_) { + return {}; + } + return {*proxy_info_}; + } const StreamInfo::FilterState::Objects& downstreamSharedFilterStateObjects() const override { return *filter_state_objects_; } @@ -82,6 +94,8 @@ class TransportSocketOptionsImpl : public TransportSocketOptions { const std::vector alpn_fallback_; const absl::optional proxy_protocol_options_; const StreamInfo::FilterState::ObjectsPtr filter_state_objects_; + const StreamInfo::FilterStateSharedPtr filter_state_; + std::unique_ptr proxy_info_; }; class TransportSocketOptionsUtility { diff --git a/source/common/tcp_proxy/tcp_proxy.h b/source/common/tcp_proxy/tcp_proxy.h index 74406a610c09d..97849dd734436 100644 --- a/source/common/tcp_proxy/tcp_proxy.h +++ b/source/common/tcp_proxy/tcp_proxy.h @@ -356,7 +356,6 @@ class Filter : public Network::ReadFilter, return {}; } - const Network::Connection* downstreamConnection() const override { return &read_callbacks_->connection(); } diff --git a/source/common/upstream/health_checker_impl.cc b/source/common/upstream/health_checker_impl.cc index 6a8382a825ed0..2755535adc204 100644 --- a/source/common/upstream/health_checker_impl.cc +++ b/source/common/upstream/health_checker_impl.cc @@ -456,7 +456,8 @@ HttpHealthCheckerImpl::codecClientType(const envoy::type::v3::CodecClientType& t Http::CodecClient* ProdHttpHealthCheckerImpl::createCodecClient(Upstream::Host::CreateConnectionData& data) { return new Http::CodecClientProd(codec_client_type_, std::move(data.connection_), - data.host_description_, dispatcher_, random_generator_); + data.host_description_, dispatcher_, random_generator_, + transportSocketOptions()); } TcpHealthCheckMatcher::MatchSegments TcpHealthCheckMatcher::loadProtoBytes( @@ -932,7 +933,7 @@ Http::CodecClientPtr ProdGrpcHealthCheckerImpl::createCodecClient(Upstream::Host::CreateConnectionData& data) { return std::make_unique( Http::CodecType::HTTP2, std::move(data.connection_), data.host_description_, dispatcher_, - random_generator_); + random_generator_, transportSocketOptions()); } std::ostream& operator<<(std::ostream& out, HealthState state) { diff --git a/source/common/upstream/upstream_impl.cc b/source/common/upstream/upstream_impl.cc index c3e165862cf78..0d5c9608b505e 100644 --- a/source/common/upstream/upstream_impl.cc +++ b/source/common/upstream/upstream_impl.cc @@ -279,6 +279,15 @@ Network::UpstreamTransportSocketFactory& HostDescriptionImpl::resolveTransportSo Host::CreateConnectionData HostImpl::createConnection( Event::Dispatcher& dispatcher, const Network::ConnectionSocket::OptionsSharedPtr& options, Network::TransportSocketOptionsConstSharedPtr transport_socket_options) const { + // If the transport socket options indicate the connection should be + // redirected to a proxy, create the TCP connection to the proxy's address not + // the host's address. + if (transport_socket_options && transport_socket_options->http11ProxyInfo().has_value()) { + return createConnection( + dispatcher, cluster(), transport_socket_options->http11ProxyInfo()->proxy_address, + {transport_socket_options->http11ProxyInfo()->proxy_address}, transportSocketFactory(), + options, transport_socket_options, shared_from_this()); + } return createConnection(dispatcher, cluster(), address(), addressList(), transportSocketFactory(), options, transport_socket_options, shared_from_this()); } diff --git a/source/extensions/extensions_build_config.bzl b/source/extensions/extensions_build_config.bzl index 9391e98b5704d..12d64d11b49aa 100644 --- a/source/extensions/extensions_build_config.bzl +++ b/source/extensions/extensions_build_config.bzl @@ -205,6 +205,7 @@ EXTENSIONS = { # "envoy.transport_sockets.alts": "//source/extensions/transport_sockets/alts:config", + "envoy.transport_sockets.http_11_proxy": "//source/extensions/transport_sockets/http_11_proxy:upstream_config", "envoy.transport_sockets.upstream_proxy_protocol": "//source/extensions/transport_sockets/proxy_protocol:upstream_config", "envoy.transport_sockets.raw_buffer": "//source/extensions/transport_sockets/raw_buffer:config", "envoy.transport_sockets.tap": "//source/extensions/transport_sockets/tap:config", diff --git a/source/extensions/extensions_metadata.yaml b/source/extensions/extensions_metadata.yaml index 66e64edf47767..1752f4d02285b 100644 --- a/source/extensions/extensions_metadata.yaml +++ b/source/extensions/extensions_metadata.yaml @@ -1011,6 +1011,13 @@ envoy.transport_sockets.upstream_proxy_protocol: status: stable type_urls: - envoy.extensions.transport_sockets.proxy_protocol.v3.ProxyProtocolUpstreamTransport +envoy.transport_sockets.http_11_proxy: + categories: + - envoy.transport_sockets.upstream + security_posture: unknown + status: alpha + type_urls: + - envoy.extensions.transport_sockets.http_11_proxy.v3.Http11ProxyUpstreamTransport envoy.upstreams.http.generic: categories: - envoy.upstreams diff --git a/source/extensions/transport_sockets/http_11_proxy/BUILD b/source/extensions/transport_sockets/http_11_proxy/BUILD new file mode 100644 index 0000000000000..4e74a546c0f95 --- /dev/null +++ b/source/extensions/transport_sockets/http_11_proxy/BUILD @@ -0,0 +1,39 @@ +load( + "//bazel:envoy_build_system.bzl", + "envoy_cc_extension", + "envoy_cc_library", + "envoy_extension_package", +) + +licenses(["notice"]) # Apache 2 + +envoy_extension_package() + +envoy_cc_extension( + name = "upstream_config", + srcs = ["config.cc"], + hdrs = ["config.h"], + deps = [ + ":connect", + "//envoy/network:transport_socket_interface", + "//envoy/registry", + "//envoy/server:transport_socket_config_interface", + "//source/common/config:utility_lib", + "@envoy_api//envoy/extensions/transport_sockets/http_11_proxy/v3:pkg_cc_proto", + ], +) + +envoy_cc_library( + name = "connect", + srcs = ["connect.cc"], + hdrs = ["connect.h"], + deps = [ + "//envoy/network:connection_interface", + "//envoy/network:transport_socket_interface", + "//source/common/buffer:buffer_lib", + "//source/common/common:scalar_to_byte_vector_lib", + "//source/common/common:utility_lib", + "//source/common/network:address_lib", + "//source/extensions/transport_sockets/common:passthrough_lib", + ], +) diff --git a/source/extensions/transport_sockets/http_11_proxy/config.cc b/source/extensions/transport_sockets/http_11_proxy/config.cc new file mode 100644 index 0000000000000..e8b01d11a29b2 --- /dev/null +++ b/source/extensions/transport_sockets/http_11_proxy/config.cc @@ -0,0 +1,42 @@ +#include "source/extensions/transport_sockets/http_11_proxy/config.h" + +#include "envoy/extensions/transport_sockets/http_11_proxy/v3/upstream_http_11_connect.pb.h" +#include "envoy/extensions/transport_sockets/http_11_proxy/v3/upstream_http_11_connect.pb.validate.h" +#include "envoy/registry/registry.h" + +#include "source/common/config/utility.h" +#include "source/extensions/transport_sockets/http_11_proxy/connect.h" + +namespace Envoy { +namespace Extensions { +namespace TransportSockets { +namespace Http11Connect { + +Network::UpstreamTransportSocketFactoryPtr +UpstreamHttp11ConnectSocketConfigFactory::createTransportSocketFactory( + const Protobuf::Message& message, + Server::Configuration::TransportSocketFactoryContext& context) { + const auto& outer_config = MessageUtil::downcastAndValidate< + const envoy::extensions::transport_sockets::http_11_proxy::v3::Http11ProxyUpstreamTransport&>( + message, context.messageValidationVisitor()); + auto& inner_config_factory = Config::Utility::getAndCheckFactory< + Server::Configuration::UpstreamTransportSocketConfigFactory>(outer_config.transport_socket()); + ProtobufTypes::MessagePtr inner_factory_config = Config::Utility::translateToFactoryConfig( + outer_config.transport_socket(), context.messageValidationVisitor(), inner_config_factory); + auto inner_transport_factory = + inner_config_factory.createTransportSocketFactory(*inner_factory_config, context); + return std::make_unique(std::move(inner_transport_factory)); +} + +ProtobufTypes::MessagePtr UpstreamHttp11ConnectSocketConfigFactory::createEmptyConfigProto() { + return std::make_unique< + envoy::extensions::transport_sockets::http_11_proxy::v3::Http11ProxyUpstreamTransport>(); +} + +REGISTER_FACTORY(UpstreamHttp11ConnectSocketConfigFactory, + Server::Configuration::UpstreamTransportSocketConfigFactory); + +} // namespace Http11Connect +} // namespace TransportSockets +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/transport_sockets/http_11_proxy/config.h b/source/extensions/transport_sockets/http_11_proxy/config.h new file mode 100644 index 0000000000000..b8580a6296d21 --- /dev/null +++ b/source/extensions/transport_sockets/http_11_proxy/config.h @@ -0,0 +1,27 @@ +#pragma once + +#include "envoy/server/transport_socket_config.h" + +namespace Envoy { +namespace Extensions { +namespace TransportSockets { +namespace Http11Connect { + +/** + * Config registration for the upstream HTTP/1.1 proxy transport socket factory. + * @see TransportSocketConfigFactory. + */ +class UpstreamHttp11ConnectSocketConfigFactory + : public Server::Configuration::UpstreamTransportSocketConfigFactory { +public: + std::string name() const override { return "envoy.transport_sockets.http_11_proxy"; } + ProtobufTypes::MessagePtr createEmptyConfigProto() override; + Network::UpstreamTransportSocketFactoryPtr createTransportSocketFactory( + const Protobuf::Message& config, + Server::Configuration::TransportSocketFactoryContext& context) override; +}; + +} // namespace Http11Connect +} // namespace TransportSockets +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/transport_sockets/http_11_proxy/connect.cc b/source/extensions/transport_sockets/http_11_proxy/connect.cc new file mode 100644 index 0000000000000..7f458dceb3832 --- /dev/null +++ b/source/extensions/transport_sockets/http_11_proxy/connect.cc @@ -0,0 +1,139 @@ +#include "source/extensions/transport_sockets/http_11_proxy/connect.h" + +#include + +#include "envoy/network/transport_socket.h" + +#include "source/common/buffer/buffer_impl.h" +#include "source/common/common/scalar_to_byte_vector.h" +#include "source/common/common/utility.h" +#include "source/common/network/address_impl.h" + +namespace Envoy { +namespace Extensions { +namespace TransportSockets { +namespace Http11Connect { + +UpstreamHttp11ConnectSocket::UpstreamHttp11ConnectSocket( + Network::TransportSocketPtr&& transport_socket, + Network::TransportSocketOptionsConstSharedPtr options) + : PassthroughSocket(std::move(transport_socket)), options_(options) { + if (options_ && options_->http11ProxyInfo() && transport_socket_->ssl()) { + header_buffer_.add( + absl::StrCat("CONNECT ", options_->http11ProxyInfo()->hostname, ":443 HTTP/1.1\r\n\r\n")); + need_to_strip_connect_response_ = true; + } +} + +void UpstreamHttp11ConnectSocket::setTransportSocketCallbacks( + Network::TransportSocketCallbacks& callbacks) { + transport_socket_->setTransportSocketCallbacks(callbacks); + callbacks_ = &callbacks; +} + +Network::IoResult UpstreamHttp11ConnectSocket::doWrite(Buffer::Instance& buffer, bool end_stream) { + if (header_buffer_.length() > 0) { + return writeHeader(); + } + if (!need_to_strip_connect_response_) { + // Don't pass events up until the connect response is read because TLS reads + // kick off writes which don't pass through the transport socket. + return transport_socket_->doWrite(buffer, end_stream); + } + return Network::IoResult{Network::PostIoAction::KeepOpen, 0, false}; +} + +Network::IoResult UpstreamHttp11ConnectSocket::doRead(Buffer::Instance& buffer) { + if (need_to_strip_connect_response_) { + // Limit the CONNECT response headers to an arbitrary 200 bytes. + constexpr uint32_t MAX_RESPONSE_HEADER_SIZE = 200; + char peek_buf[MAX_RESPONSE_HEADER_SIZE]; + Api::IoCallUint64Result result = + callbacks_->ioHandle().recv(peek_buf, MAX_RESPONSE_HEADER_SIZE, MSG_PEEK); + if (!result.ok() && result.err_->getErrorCode() != Api::IoError::IoErrorCode::Again) { + return {Network::PostIoAction::Close, 0, false}; + } + absl::string_view peek_data(peek_buf, result.return_value_); + size_t index = peek_data.find("\r\n\r\n"); + if (index == absl::string_view::npos) { + if (result.return_value_ == MAX_RESPONSE_HEADER_SIZE) { + ENVOY_CONN_LOG(trace, "failed to receive CONNECT headers within {} bytes", + callbacks_->connection(), MAX_RESPONSE_HEADER_SIZE); + return {Network::PostIoAction::Close, 0, false}; + } + return Network::IoResult{Network::PostIoAction::KeepOpen, 0, false}; + } + result = callbacks_->ioHandle().read(buffer, index + 4); + if (!result.ok() || result.return_value_ != index + 4) { + ENVOY_CONN_LOG(trace, "failed to drain CONNECT header", callbacks_->connection()); + return {Network::PostIoAction::Close, 0, false}; + } + // Note this is not in any way proper HTTP/1.1 parsing. + // Before this is used with any untrusted upstream, proper checks should + // be done rather than this. + if (!absl::StartsWith(peek_data, "HTTP/1.1 200")) { + ENVOY_CONN_LOG(trace, "Response does not match strict connect checks", + callbacks_->connection()); + return {Network::PostIoAction::Close, 0, false}; + } + buffer.drain(buffer.length()); + need_to_strip_connect_response_ = false; + } + return transport_socket_->doRead(buffer); +} + +Network::IoResult UpstreamHttp11ConnectSocket::writeHeader() { + Network::PostIoAction action = Network::PostIoAction::KeepOpen; + uint64_t bytes_written = 0; + do { + if (header_buffer_.length() == 0) { + break; + } + + Api::IoCallUint64Result result = callbacks_->ioHandle().write(header_buffer_); + + if (!result.ok()) { + ENVOY_CONN_LOG(trace, "write error: {}", callbacks_->connection(), + result.err_->getErrorDetails()); + if (result.err_->getErrorCode() != Api::IoError::IoErrorCode::Again) { + action = Network::PostIoAction::Close; + } + break; + } + ENVOY_CONN_LOG(trace, "write returns: {}", callbacks_->connection(), result.return_value_); + bytes_written += result.return_value_; + } while (true); + + return {action, bytes_written, false}; +} + +UpstreamHttp11ConnectSocketFactory::UpstreamHttp11ConnectSocketFactory( + Network::UpstreamTransportSocketFactoryPtr transport_socket_factory) + : PassthroughFactory(std::move(transport_socket_factory)) {} + +Network::TransportSocketPtr UpstreamHttp11ConnectSocketFactory::createTransportSocket( + Network::TransportSocketOptionsConstSharedPtr options, + std::shared_ptr host) const { + auto inner_socket = transport_socket_factory_->createTransportSocket(options, host); + if (inner_socket == nullptr) { + return nullptr; + } + return std::make_unique(std::move(inner_socket), options); +} + +void UpstreamHttp11ConnectSocketFactory::hashKey( + std::vector& key, Network::TransportSocketOptionsConstSharedPtr options) const { + PassthroughFactory::hashKey(key, options); + if (options && options->http11ProxyInfo().has_value()) { + pushScalarToByteVector( + StringUtil::CaseInsensitiveHash()(options->http11ProxyInfo()->proxy_address->asString()), + key); + pushScalarToByteVector(StringUtil::CaseInsensitiveHash()(options->http11ProxyInfo()->hostname), + key); + } +} + +} // namespace Http11Connect +} // namespace TransportSockets +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/transport_sockets/http_11_proxy/connect.h b/source/extensions/transport_sockets/http_11_proxy/connect.h new file mode 100644 index 0000000000000..fb51f70f96f8c --- /dev/null +++ b/source/extensions/transport_sockets/http_11_proxy/connect.h @@ -0,0 +1,54 @@ +#pragma once + +#include "envoy/network/connection.h" +#include "envoy/network/transport_socket.h" + +#include "source/common/buffer/buffer_impl.h" +#include "source/common/common/logger.h" +#include "source/extensions/transport_sockets/common/passthrough.h" + +namespace Envoy { +namespace Extensions { +namespace TransportSockets { +namespace Http11Connect { + +// If the transport socket options contain http11ProxyInfo and the transport is +// secure, this will prepend a CONNECT request to the outbound data and strip +// the CONNECT response from the inbound data. +class UpstreamHttp11ConnectSocket : public TransportSockets::PassthroughSocket, + public Logger::Loggable { +public: + UpstreamHttp11ConnectSocket(Network::TransportSocketPtr&& transport_socket, + Network::TransportSocketOptionsConstSharedPtr options); + + void setTransportSocketCallbacks(Network::TransportSocketCallbacks& callbacks) override; + Network::IoResult doWrite(Buffer::Instance& buffer, bool end_stream) override; + Network::IoResult doRead(Buffer::Instance& buffer) override; + +private: + void generateHeader(); + Network::IoResult writeHeader(); + + Network::TransportSocketOptionsConstSharedPtr options_; + Network::TransportSocketCallbacks* callbacks_{}; + Buffer::OwnedImpl header_buffer_{}; + bool need_to_strip_connect_response_{}; +}; + +class UpstreamHttp11ConnectSocketFactory : public PassthroughFactory { +public: + UpstreamHttp11ConnectSocketFactory( + Network::UpstreamTransportSocketFactoryPtr transport_socket_factory); + + // Network::TransportSocketFactory + Network::TransportSocketPtr + createTransportSocket(Network::TransportSocketOptionsConstSharedPtr options, + std::shared_ptr host) const override; + void hashKey(std::vector& key, + Network::TransportSocketOptionsConstSharedPtr options) const override; +}; + +} // namespace Http11Connect +} // namespace TransportSockets +} // namespace Extensions +} // namespace Envoy diff --git a/test/common/http/http1/codec_impl_test.cc b/test/common/http/http1/codec_impl_test.cc index 8f024e4d3feae..eafd5a9551ffd 100644 --- a/test/common/http/http1/codec_impl_test.cc +++ b/test/common/http/http1/codec_impl_test.cc @@ -2118,7 +2118,8 @@ class Http1ClientConnectionImplTest : public Http1CodecTestBase { public: void initialize() { codec_ = std::make_unique( - connection_, http1CodecStats(), callbacks_, codec_settings_, max_response_headers_count_); + connection_, http1CodecStats(), callbacks_, codec_settings_, max_response_headers_count_, + /* passing_through_proxy=*/false); } void readDisableOnRequestEncoder(RequestEncoder* request_encoder, bool disable) { diff --git a/test/extensions/transport_sockets/http_11_proxy/BUILD b/test/extensions/transport_sockets/http_11_proxy/BUILD new file mode 100644 index 0000000000000..ed9fa885ae5fc --- /dev/null +++ b/test/extensions/transport_sockets/http_11_proxy/BUILD @@ -0,0 +1,40 @@ +load( + "//bazel:envoy_build_system.bzl", + "envoy_package", +) +load( + "//test/extensions:extensions_build_system.bzl", + "envoy_extension_cc_test", +) + +licenses(["notice"]) # Apache 2 + +envoy_package() + +envoy_extension_cc_test( + name = "connect_test", + srcs = ["connect_test.cc"], + extension_names = ["envoy.transport_sockets.http_11_proxy"], + deps = [ + "//source/extensions/transport_sockets/http_11_proxy:connect", + "//test/mocks/buffer:buffer_mocks", + "//test/mocks/network:io_handle_mocks", + "//test/mocks/network:network_mocks", + "//test/mocks/network:transport_socket_mocks", + ], +) + +envoy_extension_cc_test( + name = "connect_integration_test", + srcs = ["connect_integration_test.cc"], + extension_names = ["envoy.transport_sockets.http_11_proxy"], + deps = [ + "//source/extensions/filters/network/tcp_proxy:config", + "//source/extensions/transport_sockets/http_11_proxy:upstream_config", + "//test/integration:http_integration_lib", + "//test/integration:integration_lib", + "//test/integration/filters:header_to_proxy_filter_lib", + "@envoy_api//envoy/config/core/v3:pkg_cc_proto", + "@envoy_api//envoy/extensions/transport_sockets/http_11_proxy/v3:pkg_cc_proto", + ], +) diff --git a/test/extensions/transport_sockets/http_11_proxy/connect_integration_test.cc b/test/extensions/transport_sockets/http_11_proxy/connect_integration_test.cc new file mode 100644 index 0000000000000..99863f8e86492 --- /dev/null +++ b/test/extensions/transport_sockets/http_11_proxy/connect_integration_test.cc @@ -0,0 +1,293 @@ +#include "envoy/config/core/v3/base.pb.h" +#include "envoy/config/core/v3/health_check.pb.h" +#include "envoy/extensions/transport_sockets/http_11_proxy/v3/upstream_http_11_connect.pb.h" + +#include "test/integration/http_integration.h" +#include "test/integration/integration.h" + +namespace Envoy { +namespace { + +class Http11ConnectHttpIntegrationTest : public testing::TestWithParam, + public HttpIntegrationTest { +public: + Http11ConnectHttpIntegrationTest() + : HttpIntegrationTest(Http::CodecClient::Type::HTTP1, GetParam()) { + upstream_tls_ = true; + } + + void TearDown() override { + test_server_.reset(); + fake_upstream_connection_.reset(); + fake_upstreams_.clear(); + } + + void initialize() override { + config_helper_.addFilter("{ name: header-to-proxy-filter }"); + if (upstream_tls_) { + config_helper_.configureUpstreamTls(false, false); + } + config_helper_.addConfigModifier([](envoy::config::bootstrap::v3::Bootstrap& bootstrap) { + auto* transport_socket = + bootstrap.mutable_static_resources()->mutable_clusters(0)->mutable_transport_socket(); + envoy::config::core::v3::TransportSocket inner_socket; + inner_socket.CopyFrom(*transport_socket); + if (inner_socket.name().empty()) { + inner_socket.set_name("envoy.transport_sockets.raw_buffer"); + } + transport_socket->set_name("envoy.transport_sockets.upstream_http_11_proxy"); + envoy::extensions::transport_sockets::http_11_proxy::v3::Http11ProxyUpstreamTransport + transport; + transport.mutable_transport_socket()->MergeFrom(inner_socket); + transport_socket->mutable_typed_config()->PackFrom(transport); + + auto* cluster = bootstrap.mutable_static_resources()->mutable_clusters(0); + + ConfigHelper::HttpProtocolOptions protocol_options; + protocol_options.mutable_upstream_http_protocol_options()->set_auto_sni(true); + protocol_options.mutable_explicit_http_config()->mutable_http_protocol_options(); + ConfigHelper::setProtocolOptions(*cluster, protocol_options); + }); + BaseIntegrationTest::initialize(); + if (upstream_tls_) { + addFakeUpstream(createUpstreamTlsContext(upstreamConfig()), Http::CodecType::HTTP1, false); + addFakeUpstream(createUpstreamTlsContext(upstreamConfig()), Http::CodecType::HTTP1, false); + // Read disable the fake upstreams, so we can rawRead rather than read data and decrypt. + fake_upstreams_[1]->setDisableAllAndDoNotEnable(true); + fake_upstreams_[2]->setDisableAllAndDoNotEnable(true); + } else { + addFakeUpstream(Http::CodecType::HTTP1); + addFakeUpstream(Http::CodecType::HTTP1); + } + } + + void stripConnectUpgradeAndRespond() { + // Strip the CONNECT upgrade. + std::string prefix_data; + ASSERT_TRUE(fake_upstream_connection_->waitForInexactRawData("\r\n\r\n", &prefix_data)); + EXPECT_EQ("CONNECT sni.lyft.com:443 HTTP/1.1\r\n\r\n", prefix_data); + + // Ship the CONNECT response. + fake_upstream_connection_->writeRawData("HTTP/1.1 200 OK\r\n\r\n"); + } +}; + +INSTANTIATE_TEST_SUITE_P(IpVersions, Http11ConnectHttpIntegrationTest, + testing::ValuesIn(TestEnvironment::getIpVersionsForTest()), + TestUtility::ipTestParamsToString); + +// Test that with no connect-proxy header, the transport socket is a no-op. +TEST_P(Http11ConnectHttpIntegrationTest, NoHeader) { + initialize(); + + // With no connect-proxy header, the original request gets proxied to fake upstream 0. + default_request_headers_.setCopy(Envoy::Http::LowerCaseString("foo"), "bar"); + default_response_headers_.setCopy(Envoy::Http::LowerCaseString("foo"), "bar"); + codec_client_ = makeHttpConnection(lookupPort("http")); + auto response = + sendRequestAndWaitForResponse(default_request_headers_, 0, default_response_headers_, 0); + + ASSERT_TRUE(response->complete()); + EXPECT_EQ("200", response->headers().getStatusValue()); + ASSERT_FALSE(upstream_request_->headers().get(Http::LowerCaseString("foo")).empty()); + ASSERT_FALSE(response->headers().get(Http::LowerCaseString("foo")).empty()); + + // Second request reuses the connection. + sendRequestAndWaitForResponse(default_request_headers_, 0, default_response_headers_, 0); +} + +// If sending to an HTTP upstream, no CONNECT header will be appended but a +// fully qualified URL will be sent. +TEST_P(Http11ConnectHttpIntegrationTest, CleartextRequestResponse) { + upstream_tls_ = false; + initialize(); + + // Point at the second fake upstream. Envoy doesn't actually know about this one. + absl::string_view second_upstream_address(fake_upstreams_[1]->localAddress()->asStringView()); + codec_client_ = makeHttpConnection(lookupPort("http")); + // The connect-proxy header will be stripped by the header-to-proxy-filter and inserted as + // metadata. + default_request_headers_.setCopy(Envoy::Http::LowerCaseString("connect-proxy"), + second_upstream_address); + auto response = codec_client_->makeHeaderOnlyRequest(default_request_headers_); + + // The request should be sent to fake upstream 1, due to the connect-proxy header. + FakeRawConnectionPtr fake_upstream_raw_connection_; + ASSERT_TRUE(fake_upstreams_[1]->waitForRawConnection(fake_upstream_raw_connection_)); + std::string observed_data; + ASSERT_TRUE(fake_upstream_raw_connection_->waitForData( + FakeRawConnection::waitForInexactMatch("\r\n\r\n"), &observed_data)); + // There should be no CONNECT header. + EXPECT_FALSE(absl::StrContains(observed_data, "CONNECT")); + // The proxied request should use a fully qualified URL. + EXPECT_TRUE(absl::StrContains(observed_data, "GET http://sni.lyft.com/test/long/url HTTP/1.1")) + << observed_data; + EXPECT_TRUE(absl::StrContains(observed_data, "host: sni.lyft.com")); + + // Send a response. + auto response2 = "HTTP/1.1 200 OK\r\ncontent-length: 0\r\nbar: eep\r\n\r\n"; + ASSERT_TRUE(fake_upstream_raw_connection_->write(response2, false)); + + // Wait for the response to be received. + ASSERT_TRUE(response->waitForEndStream()); + EXPECT_EQ("200", response->headers().getStatusValue()); + ASSERT_FALSE(response->headers().get(Http::LowerCaseString("bar")).empty()); +} +// Test sending 2 requests to one proxy +TEST_P(Http11ConnectHttpIntegrationTest, TestMultipleRequestsSignleEndpoint) { + initialize(); + + // Point at the second fake upstream. Envoy doesn't actually know about this one. + absl::string_view second_upstream_address(fake_upstreams_[1]->localAddress()->asStringView()); + codec_client_ = makeHttpConnection(lookupPort("http")); + // The connect-proxy header will be stripped by the header-to-proxy-filter and inserted as + // metadata. + default_request_headers_.setCopy(Envoy::Http::LowerCaseString("connect-proxy"), + second_upstream_address); + auto response = codec_client_->makeHeaderOnlyRequest(default_request_headers_); + + // The request should be sent to fake upstream 1, due to the connect-proxy header. + ASSERT_TRUE(fake_upstreams_[1]->waitForHttpConnection(*dispatcher_, fake_upstream_connection_)); + + stripConnectUpgradeAndRespond(); + + // Enable reading on the new stream, and read the encapsulated request. + ASSERT_TRUE(fake_upstream_connection_->readDisable(false)); + ASSERT_TRUE(fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_)); + ASSERT_TRUE(upstream_request_->waitForEndStream(*dispatcher_)); + + // Send the encapsulated response. + default_response_headers_.setCopy(Envoy::Http::LowerCaseString("bar"), "eep"); + upstream_request_->encodeHeaders(default_response_headers_, true); + + // Wait for the encapsulated response to be received. + ASSERT_TRUE(response->waitForEndStream()); + EXPECT_EQ("200", response->headers().getStatusValue()); + // Make sure the upgrade headers were swallowed and the second were received. + ASSERT_FALSE(response->headers().get(Http::LowerCaseString("bar")).empty()); + + // Now send a second request, and make sure it goes to the same upstream. + response = codec_client_->makeHeaderOnlyRequest(default_request_headers_); + + // The request should be sent to fake upstream 2, due to the connect-proxy header. + ASSERT_TRUE(fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_)); + ASSERT_TRUE(upstream_request_->waitForEndStream(*dispatcher_)); + upstream_request_->encodeHeaders(default_response_headers_, true); + + // Wait for the encapsulated response to be received. + ASSERT_TRUE(response->waitForEndStream()); + EXPECT_EQ("200", response->headers().getStatusValue()); +} + +// Test sending requests to different proxies. +TEST_P(Http11ConnectHttpIntegrationTest, TestMultipleRequestsAndEndpoints) { + initialize(); + + // Point at the second fake upstream. Envoy doesn't actually know about this one. + absl::string_view second_upstream_address(fake_upstreams_[1]->localAddress()->asStringView()); + codec_client_ = makeHttpConnection(lookupPort("http")); + // The connect-proxy header will be stripped by the header-to-proxy-filter and inserted as + // metadata. + default_request_headers_.setCopy(Envoy::Http::LowerCaseString("connect-proxy"), + second_upstream_address); + auto response = codec_client_->makeHeaderOnlyRequest(default_request_headers_); + + // The request should be sent to fake upstream 1, due to the connect-proxy header. + ASSERT_TRUE(fake_upstreams_[1]->waitForHttpConnection(*dispatcher_, fake_upstream_connection_)); + + stripConnectUpgradeAndRespond(); + + // Enable reading on the new stream, and read the encapsulated request. + ASSERT_TRUE(fake_upstream_connection_->readDisable(false)); + ASSERT_TRUE(fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_)); + ASSERT_TRUE(upstream_request_->waitForEndStream(*dispatcher_)); + + // Send the encapsulated response. + default_response_headers_.setCopy(Envoy::Http::LowerCaseString("bar"), "eep"); + upstream_request_->encodeHeaders(default_response_headers_, true); + + // Wait for the encapsulated response to be received. + ASSERT_TRUE(response->waitForEndStream()); + EXPECT_EQ("200", response->headers().getStatusValue()); + // Make sure the upgrade headers were swallowed and the second were received. + ASSERT_FALSE(response->headers().get(Http::LowerCaseString("bar")).empty()); + + // Now send a second request, and make sure it goes to upstream 2. + absl::string_view third_upstream_address(fake_upstreams_[2]->localAddress()->asStringView()); + default_request_headers_.setCopy(Envoy::Http::LowerCaseString("connect-proxy"), + third_upstream_address); + response = codec_client_->makeHeaderOnlyRequest(default_request_headers_); + + // The request should be sent to fake upstream 2, due to the connect-proxy header. + ASSERT_TRUE(fake_upstreams_[2]->waitForHttpConnection(*dispatcher_, fake_upstream_connection_)); + stripConnectUpgradeAndRespond(); + + ASSERT_TRUE(fake_upstream_connection_->readDisable(false)); + ASSERT_TRUE(fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_)); + ASSERT_TRUE(upstream_request_->waitForEndStream(*dispatcher_)); + upstream_request_->encodeHeaders(default_response_headers_, true); + + // Wait for the encapsulated response to be received. + ASSERT_TRUE(response->waitForEndStream()); + EXPECT_EQ("200", response->headers().getStatusValue()); +} + +// Test sending requests to different proxies. +TEST_P(Http11ConnectHttpIntegrationTest, TestMultipleRequestsSingleEndpoint) { + initialize(); + + // Point at the second fake upstream. Envoy doesn't actually know about this one. + absl::string_view second_upstream_address(fake_upstreams_[1]->localAddress()->asStringView()); + codec_client_ = makeHttpConnection(lookupPort("http")); + // The connect-proxy header will be stripped by the header-to-proxy-filter and inserted as + // metadata. + default_request_headers_.setCopy(Envoy::Http::LowerCaseString("connect-proxy"), + second_upstream_address); + auto response = codec_client_->makeHeaderOnlyRequest(default_request_headers_); + + // The request should be sent to fake upstream 1, due to the connect-proxy header. + ASSERT_TRUE(fake_upstreams_[1]->waitForHttpConnection(*dispatcher_, fake_upstream_connection_)); + stripConnectUpgradeAndRespond(); + + ASSERT_TRUE(fake_upstream_connection_->readDisable(false)); + ASSERT_TRUE(fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_)); + ASSERT_TRUE(upstream_request_->waitForEndStream(*dispatcher_)); + upstream_request_->encodeHeaders(default_response_headers_, true); + + // Wait for the encapsulated response to be received. + ASSERT_TRUE(response->waitForEndStream()); + EXPECT_EQ("200", response->headers().getStatusValue()); + + // Now send a second request to the same fake upstream. Envoy will pipeline and reuse the + // connection so no need to strip the connect. + default_request_headers_.setCopy(Envoy::Http::LowerCaseString("request2"), "val2"); + response = codec_client_->makeHeaderOnlyRequest(default_request_headers_); + ASSERT_TRUE(fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_)); + ASSERT_TRUE(upstream_request_->waitForEndStream(*dispatcher_)); + EXPECT_FALSE(upstream_request_->headers().get(Http::LowerCaseString("request2")).empty()); + + upstream_request_->encodeHeaders(default_response_headers_, true); + // Wait for the encapsulated response to be received. + ASSERT_TRUE(response->waitForEndStream()); + EXPECT_EQ("200", response->headers().getStatusValue()); + + // Now send a request without the connect-proxy header. Make sure it doesn't get pooled in. + default_request_headers_.remove(Envoy::Http::LowerCaseString("connect-proxy")); + response = codec_client_->makeHeaderOnlyRequest(default_request_headers_); + + // The request should be sent to fake upstream 0. + ASSERT_TRUE(fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, fake_upstream_connection_)); + ASSERT_TRUE(fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_)); + ASSERT_TRUE(upstream_request_->waitForEndStream(*dispatcher_)); + // No encapsulation. + EXPECT_TRUE(upstream_request_->waitForEndStream(*dispatcher_)); + upstream_request_->encodeHeaders(default_response_headers_, true); + ASSERT_TRUE(response->waitForEndStream()); + EXPECT_EQ("200", response->headers().getStatusValue()); +} + +// TODO(alyssawilk) test with Dynamic Forward Proxy, and make sure we will skip the DNS lookup in +// case DNS to those endpoints is disallowed. + +} // namespace +} // namespace Envoy diff --git a/test/extensions/transport_sockets/http_11_proxy/connect_test.cc b/test/extensions/transport_sockets/http_11_proxy/connect_test.cc new file mode 100644 index 0000000000000..c439813e04906 --- /dev/null +++ b/test/extensions/transport_sockets/http_11_proxy/connect_test.cc @@ -0,0 +1,389 @@ +#include "source/common/buffer/buffer_impl.h" +#include "source/common/network/address_impl.h" +#include "source/common/network/filter_state_proxy_info.h" +#include "source/common/network/transport_socket_options_impl.h" +#include "source/extensions/transport_sockets/http_11_proxy/connect.h" + +#include "test/mocks/buffer/mocks.h" +#include "test/mocks/network/io_handle.h" +#include "test/mocks/network/mocks.h" +#include "test/mocks/network/transport_socket.h" +#include "test/mocks/ssl/mocks.h" +#include "test/test_common/environment.h" +#include "test/test_common/network_utility.h" +#include "test/test_common/utility.h" + +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +using testing::_; +using testing::AnyNumber; +using testing::ByMove; +using testing::Const; +using testing::InSequence; +using testing::NiceMock; +using testing::Return; +using testing::ReturnRef; + +namespace Envoy { +namespace Extensions { +namespace TransportSockets { +namespace Http11Connect { +namespace { + +class Http11ConnectTest : public testing::TestWithParam { +public: + Http11ConnectTest() = default; + void initialize(bool no_proxy_protocol = false) { + std::string address_string = + absl::StrCat(Network::Test::getLoopbackAddressUrlString(GetParam()), ":1234"); + Network::Address::InstanceConstSharedPtr address = + Network::Utility::parseInternetAddressAndPort(address_string); + auto info = + std::make_unique("www.foo.com", address); + if (no_proxy_protocol) { + info.reset(); + } + + options_ = std::make_shared( + "", std::vector{}, std::vector{}, std::vector{}, + absl::nullopt, nullptr, std::move(info)); + + setAddress(); + auto inner_socket = std::make_unique>(); + inner_socket_ = inner_socket.get(); + EXPECT_CALL(*inner_socket_, ssl()).Times(AnyNumber()).WillRepeatedly(Return(ssl_)); + EXPECT_CALL(Const(*inner_socket_), ssl()).Times(AnyNumber()).WillRepeatedly(Return(ssl_)); + + ON_CALL(transport_callbacks_, ioHandle()).WillByDefault(ReturnRef(io_handle_)); + connect_socket_ = + std::make_unique(std::move(inner_socket), options_); + connect_socket_->setTransportSocketCallbacks(transport_callbacks_); + connect_socket_->onConnected(); + } + + void setAddress() { + std::string address_string = + absl::StrCat(Network::Test::getLoopbackAddressUrlString(GetParam()), ":1234"); + auto address = Network::Utility::parseInternetAddressAndPort(address_string); + transport_callbacks_.connection_.stream_info_.filterState()->setData( + "envoy.network.transport_socket.http_11_proxy.address", + std::make_unique("www.foo.com", address), + StreamInfo::FilterState::StateType::ReadOnly, + StreamInfo::FilterState::LifeSpan::FilterChain); + } + + Network::TransportSocketOptionsConstSharedPtr options_; + NiceMock* inner_socket_; + NiceMock io_handle_; + std::unique_ptr connect_socket_; + NiceMock transport_callbacks_; + Buffer::OwnedImpl connect_data_{"CONNECT www.foo.com:443 HTTP/1.1\r\n\r\n"}; + Ssl::ConnectionInfoConstSharedPtr ssl_{ + std::make_shared>()}; +}; + +// Test injects CONNECT only once +TEST_P(Http11ConnectTest, InjectesHeaderOnlyOnce) { + initialize(); + + EXPECT_CALL(io_handle_, write(BufferStringEqual(connect_data_.toString()))) + .WillOnce(Invoke([&](Buffer::Instance& buffer) { + auto length = buffer.length(); + buffer.drain(length); + return Api::IoCallUint64Result(length, Api::IoErrorPtr(nullptr, [](Api::IoError*) {})); + })); + Buffer::OwnedImpl msg("initial data"); + + Network::IoResult rc1 = connect_socket_->doWrite(msg, false); + // Only the connect will be written initially. All other writes should be + // buffered in the Network::Connection buffer until the connect has been + // processed. + EXPECT_EQ(connect_data_.length(), rc1.bytes_processed_); + Network::IoResult rc2 = connect_socket_->doWrite(msg, false); + EXPECT_EQ(0, rc2.bytes_processed_); + + EXPECT_CALL(*inner_socket_, onConnected()); + connect_socket_->onConnected(); +} + +// Test the socket is a no-op if there's no header proto. +TEST_P(Http11ConnectTest, NoInjectHeaderProtoAbsent) { + initialize(true); + + EXPECT_CALL(io_handle_, write(_)).Times(0); + Buffer::OwnedImpl msg("initial data"); + Buffer::OwnedImpl msg2("new data"); + + { + InSequence s; + EXPECT_CALL(*inner_socket_, doWrite(BufferEqual(&msg), false)) + .WillOnce(Return(Network::IoResult{Network::PostIoAction::KeepOpen, msg.length(), false})); + EXPECT_CALL(*inner_socket_, doWrite(BufferEqual(&msg2), false)) + .WillOnce(Return(Network::IoResult{Network::PostIoAction::KeepOpen, msg2.length(), false})); + } + + Network::IoResult rc1 = connect_socket_->doWrite(msg, false); + EXPECT_EQ(msg.length(), rc1.bytes_processed_); + Network::IoResult rc2 = connect_socket_->doWrite(msg2, false); + EXPECT_EQ(msg2.length(), rc2.bytes_processed_); + + EXPECT_CALL(*inner_socket_, onConnected()); + connect_socket_->onConnected(); + + // Make sure the response path is a no-op as well. + EXPECT_CALL(io_handle_, recv(_, _, _)).Times(0); + EXPECT_CALL(io_handle_, read(_, _)).Times(0); + Buffer::OwnedImpl buffer(""); + connect_socket_->doRead(buffer); +} + +TEST_P(Http11ConnectTest, NoInjectTlsAbsent) { + ssl_.reset(); + initialize(false); + + EXPECT_CALL(io_handle_, write(_)).Times(0); + Buffer::OwnedImpl msg("initial data"); + Buffer::OwnedImpl msg2("new data"); + + { + InSequence s; + EXPECT_CALL(*inner_socket_, doWrite(BufferEqual(&msg), false)) + .WillOnce(Return(Network::IoResult{Network::PostIoAction::KeepOpen, msg.length(), false})); + EXPECT_CALL(*inner_socket_, doWrite(BufferEqual(&msg2), false)) + .WillOnce(Return(Network::IoResult{Network::PostIoAction::KeepOpen, msg2.length(), false})); + } + + Network::IoResult rc1 = connect_socket_->doWrite(msg, false); + EXPECT_EQ(msg.length(), rc1.bytes_processed_); + Network::IoResult rc2 = connect_socket_->doWrite(msg2, false); + EXPECT_EQ(msg2.length(), rc2.bytes_processed_); + + EXPECT_CALL(*inner_socket_, onConnected()); + connect_socket_->onConnected(); + + // Make sure the response path is a no-op as well. + EXPECT_CALL(io_handle_, recv(_, _, _)).Times(0); + EXPECT_CALL(io_handle_, read(_, _)).Times(0); + Buffer::OwnedImpl buffer(""); + connect_socket_->doRead(buffer); +} + +// Test returns KeepOpen action when write error is EAGAIN +TEST_P(Http11ConnectTest, ReturnsKeepOpenWhenWriteErrorIsAgain) { + initialize(); + + Buffer::OwnedImpl msg("initial data"); + { + InSequence s; + EXPECT_CALL(io_handle_, write(BufferStringEqual(connect_data_.toString()))) + .WillOnce(Invoke([&](Buffer::Instance&) { + return Api::IoCallUint64Result( + 0, Api::IoErrorPtr(Network::IoSocketError::getIoSocketEagainInstance(), + Network::IoSocketError::deleteIoError)); + })); + EXPECT_CALL(io_handle_, write(BufferStringEqual(connect_data_.toString()))) + .WillOnce(Invoke([&](Buffer::Instance& buffer) { + auto length = buffer.length(); + buffer.drain(length); + return Api::IoCallUint64Result(length, Api::IoErrorPtr(nullptr, [](Api::IoError*) {})); + })); + } + + Network::IoResult rc = connect_socket_->doWrite(msg, false); + EXPECT_EQ(Network::PostIoAction::KeepOpen, rc.action_); + rc = connect_socket_->doWrite(msg, false); + EXPECT_EQ(Network::PostIoAction::KeepOpen, rc.action_); +} + +// Test returns Close action when write error is not EAGAIN +TEST_P(Http11ConnectTest, ReturnsCloseWhenWriteErrorIsNotAgain) { + initialize(); + + Buffer::OwnedImpl msg("initial data"); + { + InSequence s; + EXPECT_CALL(io_handle_, write(_)).WillOnce(Invoke([&](Buffer::Instance&) { + return Api::IoCallUint64Result(0, Api::IoErrorPtr(new Network::IoSocketError(EADDRNOTAVAIL), + Network::IoSocketError::deleteIoError)); + })); + } + + Network::IoResult rc = connect_socket_->doWrite(msg, false); + EXPECT_EQ(Network::PostIoAction::Close, rc.action_); +} + +// Test stripping the header. +TEST_P(Http11ConnectTest, StipsHeaderOnce) { + initialize(); + + std::string connect("HTTP/1.1 200 OK\r\n\r\n"); + std::string initial_data(connect + "follow up data"); + EXPECT_CALL(io_handle_, recv(_, 200, MSG_PEEK)) + .WillOnce(Invoke([&initial_data](void* buffer, size_t, int) { + memcpy(buffer, initial_data.data(), initial_data.length()); + return Api::IoCallUint64Result(initial_data.length(), + Api::IoErrorPtr(nullptr, [](Api::IoError*) {})); + })); + absl::optional expected_bytes(connect.length()); + EXPECT_CALL(io_handle_, read(_, expected_bytes)) + .WillOnce(Return(ByMove(Api::IoCallUint64Result( + connect.length(), Api::IoErrorPtr(nullptr, [](Api::IoError*) {}))))); + EXPECT_CALL(*inner_socket_, doRead(_)) + .WillOnce(Return(Network::IoResult{Network::PostIoAction::KeepOpen, 1, false})); + Buffer::OwnedImpl buffer(""); + auto result = connect_socket_->doRead(buffer); + EXPECT_EQ(Network::PostIoAction::KeepOpen, result.action_); +} + +TEST_P(Http11ConnectTest, InsufficientData) { + initialize(); + + std::string connect("HTTP/1.1 200 OK\r\n\r"); + std::string initial_data(connect + "follow up data"); + EXPECT_CALL(io_handle_, recv(_, 200, MSG_PEEK)) + .WillOnce(Invoke([&initial_data](void* buffer, size_t, int) { + memcpy(buffer, initial_data.data(), initial_data.length()); + return Api::IoCallUint64Result(initial_data.length(), + Api::IoErrorPtr(nullptr, [](Api::IoError*) {})); + })); + absl::optional expected_bytes(connect.length()); + Buffer::OwnedImpl buffer(""); + auto result = connect_socket_->doRead(buffer); + EXPECT_EQ(Network::PostIoAction::KeepOpen, result.action_); +} + +TEST_P(Http11ConnectTest, PeekFail) { + initialize(); + + std::string connect("HTTP/1.1 200 OK\r\n\r\n"); + std::string initial_data(connect + "follow up data"); + EXPECT_CALL(io_handle_, recv(_, 200, MSG_PEEK)) + .WillOnce(Return(ByMove( + Api::IoCallUint64Result({}, Api::IoErrorPtr(new Network::IoSocketError(EADDRNOTAVAIL), + Network::IoSocketError::deleteIoError))))); + EXPECT_CALL(io_handle_, read(_, _)).Times(0); + EXPECT_CALL(*inner_socket_, doRead(_)).Times(0); + + Buffer::OwnedImpl buffer(""); + auto result = connect_socket_->doRead(buffer); + EXPECT_EQ(Network::PostIoAction::Close, result.action_); +} + +// Test read fail after successful peek +TEST_P(Http11ConnectTest, ReadFail) { + initialize(); + + std::string connect("HTTP/1.1 200 OK\r\n\r\n"); + std::string initial_data(connect + "follow up data"); + EXPECT_CALL(io_handle_, recv(_, 200, MSG_PEEK)) + .WillOnce(Invoke([&initial_data](void* buffer, size_t, int) { + memcpy(buffer, initial_data.data(), initial_data.length()); + return Api::IoCallUint64Result(initial_data.length(), + Api::IoErrorPtr(nullptr, [](Api::IoError*) {})); + })); + absl::optional expected_bytes(connect.length()); + EXPECT_CALL(io_handle_, read(_, expected_bytes)) + .WillOnce(Return(ByMove(Api::IoCallUint64Result( + connect.length(), Api::IoErrorPtr(new Network::IoSocketError(EADDRNOTAVAIL), + Network::IoSocketError::deleteIoError))))); + EXPECT_CALL(*inner_socket_, doRead(_)).Times(0); + + Buffer::OwnedImpl buffer(""); + auto result = connect_socket_->doRead(buffer); + EXPECT_EQ(Network::PostIoAction::Close, result.action_); +} + +// Test short read after successful peek. +TEST_P(Http11ConnectTest, ShortRead) { + initialize(); + + std::string connect("HTTP/1.1 200 OK\r\n\r\n"); + std::string initial_data(connect + "follow up data"); + EXPECT_CALL(io_handle_, recv(_, 200, MSG_PEEK)) + .WillOnce(Invoke([&initial_data](void* buffer, size_t, int) { + memcpy(buffer, initial_data.data(), initial_data.length()); + return Api::IoCallUint64Result(initial_data.length(), + Api::IoErrorPtr(nullptr, [](Api::IoError*) {})); + })); + absl::optional expected_bytes(connect.length()); + EXPECT_CALL(io_handle_, read(_, expected_bytes)) + .WillOnce(Return(ByMove(Api::IoCallUint64Result( + connect.length() - 1, Api::IoErrorPtr(nullptr, [](Api::IoError*) {}))))); + EXPECT_CALL(*inner_socket_, doRead(_)).Times(0); + + Buffer::OwnedImpl buffer(""); + auto result = connect_socket_->doRead(buffer); + EXPECT_EQ(Network::PostIoAction::Close, result.action_); +} + +// If headers exceed 200 bytes, read fails. +TEST_P(Http11ConnectTest, LongHeaders) { + initialize(); + + EXPECT_CALL(io_handle_, recv(_, 200, MSG_PEEK)).WillOnce(Invoke([](void* buffer, size_t, int) { + memset(buffer, 0, 200); + return Api::IoCallUint64Result(200, Api::IoErrorPtr(nullptr, [](Api::IoError*) {})); + })); + EXPECT_CALL(io_handle_, read(_, _)).Times(0); + EXPECT_CALL(*inner_socket_, doRead(_)).Times(0); + + Buffer::OwnedImpl buffer(""); + auto result = connect_socket_->doRead(buffer); + EXPECT_EQ(Network::PostIoAction::Close, result.action_); +} + +// If response is not 200 OK, read fails. +TEST_P(Http11ConnectTest, InvalidResponse) { + initialize(); + + std::string connect("HTTP/1.1 404 Not Found\r\n\r\n"); + std::string initial_data(connect + "follow up data"); + EXPECT_CALL(io_handle_, recv(_, 200, MSG_PEEK)) + .WillOnce(Invoke([&initial_data](void* buffer, size_t, int) { + memcpy(buffer, initial_data.data(), initial_data.length()); + return Api::IoCallUint64Result(initial_data.length(), + Api::IoErrorPtr(nullptr, [](Api::IoError*) {})); + })); + absl::optional expected_bytes(connect.length()); + EXPECT_CALL(io_handle_, read(_, expected_bytes)) + .WillOnce(Return(ByMove(Api::IoCallUint64Result( + connect.length(), Api::IoErrorPtr(nullptr, [](Api::IoError*) {}))))); + + EXPECT_CALL(*inner_socket_, doRead(_)).Times(0); + + Buffer::OwnedImpl buffer(""); + EXPECT_LOG_CONTAINS("trace", "Response does not match strict connect checks", { + auto result = connect_socket_->doRead(buffer); + EXPECT_EQ(Network::PostIoAction::Close, result.action_); + }); +} + +INSTANTIATE_TEST_SUITE_P(IpVersions, Http11ConnectTest, + testing::ValuesIn(TestEnvironment::getIpVersionsForTest()), + TestUtility::ipTestParamsToString); + +class SocketFactoryTest : public testing::Test { +public: + void initialize() { + auto inner_factory = std::make_unique>(); + inner_factory_ = inner_factory.get(); + factory_ = std::make_unique(std::move(inner_factory)); + } + + NiceMock* inner_factory_; + std::unique_ptr factory_; +}; + +// Test createTransportSocket returns nullptr if inner call returns nullptr +TEST_F(SocketFactoryTest, CreateSocketReturnsNullWhenInnerFactoryReturnsNull) { + initialize(); + EXPECT_CALL(*inner_factory_, createTransportSocket(_, _)).WillOnce(testing::ReturnNull()); + ASSERT_EQ(nullptr, factory_->createTransportSocket(nullptr, nullptr)); +} + +} // namespace +} // namespace Http11Connect +} // namespace TransportSockets +} // namespace Extensions +} // namespace Envoy diff --git a/test/integration/fake_upstream.cc b/test/integration/fake_upstream.cc index 292ec4d32f5f7..e50c71aaf6fc9 100644 --- a/test/integration/fake_upstream.cc +++ b/test/integration/fake_upstream.cc @@ -11,6 +11,7 @@ #include "source/common/http/http1/codec_impl.h" #include "source/common/http/http2/codec_impl.h" #include "source/common/network/address_impl.h" +#include "source/common/network/connection_impl.h" #include "source/common/network/listen_socket_impl.h" #include "source/common/network/socket_option_factory.h" #include "source/common/network/utility.h" @@ -265,6 +266,21 @@ AssertionResult FakeStream::waitForData(Event::Dispatcher& client_dispatcher, return succeeded; } +testing::AssertionResult +FakeStream::waitForData(Event::Dispatcher& client_dispatcher, + const FakeStream::ValidatorFunction& data_validator, + std::chrono::milliseconds timeout) { + absl::MutexLock lock(&lock_); + if (!waitForWithDispatcherRun( + time_system_, lock_, + [this, data_validator]() + ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) { return data_validator(body_.toString()); }, + client_dispatcher, timeout)) { + return AssertionFailure() << "Timed out waiting for data."; + } + return AssertionSuccess(); +} + AssertionResult FakeStream::waitForEndStream(Event::Dispatcher& client_dispatcher, milliseconds timeout) { absl::MutexLock lock(&lock_); @@ -599,6 +615,9 @@ bool FakeUpstream::createNetworkFilterChain(Network::Connection& connection, // initialization is complete. connection.detectEarlyCloseWhenReadDisabled(false); connection.readDisable(true); + if (disable_and_do_not_enable_) { + dynamic_cast(&connection)->ioHandle().enableFileEvents(0); + } } auto connection_wrapper = std::make_unique(connection); @@ -752,6 +771,18 @@ AssertionResult FakeUpstream::waitForRawConnection(FakeRawConnectionPtr& connect }); } +void FakeUpstream::convertFromRawToHttp(FakeRawConnectionPtr& raw_connection, + FakeHttpConnectionPtr& connection) { + absl::MutexLock lock(&lock_); + SharedConnectionWrapper& shared_connection = raw_connection->sharedConnection(); + + connection = std::make_unique( + *this, shared_connection, http_type_, time_system_, config_.max_request_headers_kb_, + config_.max_request_headers_count_, config_.headers_with_underscores_action_); + connection->initialize(); + raw_connection.release(); +} + SharedConnectionWrapper& FakeUpstream::consumeConnection() { ASSERT(!new_connections_.empty()); auto* const connection_wrapper = new_connections_.front().get(); @@ -761,7 +792,7 @@ SharedConnectionWrapper& FakeUpstream::consumeConnection() { connection_wrapper->setParented(); connection_wrapper->moveBetweenLists(new_connections_, consumed_connections_); if (read_disable_on_new_connection_ && connection_wrapper->connected() && - http_type_ != Http::CodecType::HTTP3) { + http_type_ != Http::CodecType::HTTP3 && !disable_and_do_not_enable_) { // Re-enable read and early close detection. auto& connection = connection_wrapper->connection(); connection.detectEarlyCloseWhenReadDisabled(true); @@ -916,4 +947,41 @@ Network::FilterStatus FakeRawConnection::ReadFilter::onData(Buffer::Instance& da data.drain(data.length()); return Network::FilterStatus::StopIteration; } + +ABSL_MUST_USE_RESULT +testing::AssertionResult +FakeHttpConnection::waitForInexactRawData(const char* data, std::string* out, + std::chrono::milliseconds timeout) { + absl::MutexLock lock(&lock_); + const auto reached = [this, data, &out]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) { + char peek_buf[200]; + auto result = dynamic_cast(&connection()) + ->ioHandle() + .recv(peek_buf, 200, MSG_PEEK); + ASSERT(result.ok()); + absl::string_view peek_data(peek_buf, result.return_value_); + size_t index = peek_data.find(data); + if (index != absl::string_view::npos) { + Buffer::OwnedImpl buffer; + *out = std::string(peek_data.data(), index + 4); + auto result = dynamic_cast(&connection()) + ->ioHandle() + .recv(peek_buf, index + 4, 0); + return true; + } + return false; + }; + if (!time_system_.waitFor(lock_, absl::Condition(&reached), timeout)) { + return AssertionFailure() << "timed out waiting for raw data"; + } + return AssertionSuccess(); +} + +void FakeHttpConnection::writeRawData(absl::string_view data) { + Buffer::OwnedImpl buffer(data); + Api::IoCallUint64Result result = + dynamic_cast(&connection())->ioHandle().write(buffer); + ASSERT(result.ok()); +} + } // namespace Envoy diff --git a/test/integration/fake_upstream.h b/test/integration/fake_upstream.h index 86a655bc7dda8..4312e11bb1a35 100644 --- a/test/integration/fake_upstream.h +++ b/test/integration/fake_upstream.h @@ -139,6 +139,12 @@ class FakeStream : public Http::RequestDecoder, waitForData(Event::Dispatcher& client_dispatcher, absl::string_view body, std::chrono::milliseconds timeout = TestUtility::DefaultTimeout); + using ValidatorFunction = const std::function; + ABSL_MUST_USE_RESULT + testing::AssertionResult + waitForData(Event::Dispatcher& client_dispatcher, const ValidatorFunction& data_validator, + std::chrono::milliseconds timeout = TestUtility::DefaultTimeout); + ABSL_MUST_USE_RESULT testing::AssertionResult waitForEndStream( Event::Dispatcher& client_dispatcher, @@ -413,6 +419,7 @@ class FakeConnectionBase : public Logger::Loggable { bool connected() const { return shared_connection_.connected(); } void postToConnectionThread(std::function cb); + SharedConnectionWrapper& sharedConnection() { return shared_connection_; } protected: FakeConnectionBase(SharedConnectionWrapper& shared_connection, Event::TestTimeSystem& time_system) @@ -473,6 +480,13 @@ class FakeHttpConnection : public Http::ServerConnectionCallbacks, public FakeCo // Update the maximum number of concurrent streams. void updateConcurrentStreams(uint64_t max_streams); + ABSL_MUST_USE_RESULT + testing::AssertionResult + waitForInexactRawData(const char* data, std::string* out = nullptr, + std::chrono::milliseconds timeout = TestUtility::DefaultTimeout); + + void writeRawData(absl::string_view data); + private: struct ReadFilter : public Network::ReadFilterBaseImpl { ReadFilter(FakeHttpConnection& parent) : parent_(parent) {} @@ -640,6 +654,9 @@ class FakeUpstream : Logger::Loggable, return socket_->connectionInfoProvider().localAddress(); } + void convertFromRawToHttp(FakeRawConnectionPtr& raw_connection, + FakeHttpConnectionPtr& connection); + virtual std::unique_ptr makeRawConnection(SharedConnectionWrapper& shared_connection, Event::TestTimeSystem& time_system) { @@ -678,6 +695,7 @@ class FakeUpstream : Logger::Loggable, Network::UdpReadFilterCallbacks& callbacks) override; void setReadDisableOnNewConnection(bool value) { read_disable_on_new_connection_ = value; } + void setDisableAllAndDoNotEnable(bool value) { disable_and_do_not_enable_ = value; } Event::TestTimeSystem& timeSystem() { return time_system_; } // Stops the dispatcher loop and joins the listening thread. @@ -869,7 +887,11 @@ class FakeUpstream : Logger::Loggable, std::list consumed_connections_ ABSL_GUARDED_BY(lock_); std::list quic_connections_ ABSL_GUARDED_BY(lock_); const FakeUpstreamConfig config_; + // Normally connections are read disabled until a fake raw or http connection + // is created, and are then read enabled. Setting these true skips both these. bool read_disable_on_new_connection_; + // Setting this true disables all events and does not re-enable as the above does. + bool disable_and_do_not_enable_{}; const bool enable_half_close_; FakeListener listener_; const Network::FilterChainSharedPtr filter_chain_; diff --git a/test/integration/filters/BUILD b/test/integration/filters/BUILD index 5d3ff51b45fc9..e29e28a2cc8e5 100644 --- a/test/integration/filters/BUILD +++ b/test/integration/filters/BUILD @@ -51,6 +51,22 @@ envoy_cc_test_library( ], ) +envoy_cc_test_library( + name = "header_to_proxy_filter_lib", + srcs = [ + "header_to_proxy_filter.cc", + ], + deps = [ + ":common_lib", + "//envoy/http:filter_interface", + "//envoy/registry", + "//envoy/server:filter_config_interface", + "//source/common/network:filter_state_proxy_info_lib", + "//source/extensions/filters/http/common:pass_through_filter_lib", + "//test/extensions/filters/http/common:empty_http_filter_config_lib", + ], +) + envoy_cc_test_library( name = "local_reply_during_encoding_filter_lib", srcs = [ diff --git a/test/integration/filters/header_to_proxy_filter.cc b/test/integration/filters/header_to_proxy_filter.cc new file mode 100644 index 0000000000000..8dd9ba16f4249 --- /dev/null +++ b/test/integration/filters/header_to_proxy_filter.cc @@ -0,0 +1,54 @@ +#include + +#include "envoy/http/filter.h" +#include "envoy/registry/registry.h" +#include "envoy/server/filter_config.h" + +#include "source/common/network/filter_state_proxy_info.h" +#include "source/common/network/utility.h" +#include "source/extensions/filters/http/common/pass_through_filter.h" + +#include "test/extensions/filters/http/common/empty_http_filter_config.h" + +namespace Envoy { + +// A filter that looks for a specific header, and takes the address from that +// header and inserts proxy override metadata. +class HeaderToProxyFilter : public Http::PassThroughFilter { +public: + Http::FilterHeadersStatus decodeHeaders(Http::RequestHeaderMap& request_headers, bool) override { + auto connect_proxy = Http::LowerCaseString("connect-proxy"); + auto hostname = request_headers.getHostValue(); + ASSERT(!hostname.empty()); + if (!request_headers.get(connect_proxy).empty()) { + std::string address_string(request_headers.get(connect_proxy)[0]->value().getStringView()); + auto address = Network::Utility::parseInternetAddressAndPort(address_string); + decoder_callbacks_->streamInfo().filterState()->setData( + Network::Http11ProxyInfoFilterState::key(), + std::make_unique(hostname, address), + StreamInfo::FilterState::StateType::ReadOnly, + StreamInfo::FilterState::LifeSpan::FilterChain); + request_headers.remove(connect_proxy); + } + return Http::FilterHeadersStatus::Continue; + } +}; + +class HeaderToProxyFilterConfig : public Extensions::HttpFilters::Common::EmptyHttpFilterConfig { +public: + HeaderToProxyFilterConfig() : EmptyHttpFilterConfig("header-to-proxy-filter") {} + + Http::FilterFactoryCb createFilter(const std::string&, + Server::Configuration::FactoryContext&) override { + return [](Http::FilterChainFactoryCallbacks& callbacks) -> void { + callbacks.addStreamFilter(std::make_shared<::Envoy::HeaderToProxyFilter>()); + }; + } +}; + +// perform static registration +static Registry::RegisterFactory + register_; + +} // namespace Envoy diff --git a/test/integration/http_integration.cc b/test/integration/http_integration.cc index 1f0a69f0933c4..db506f278d732 100644 --- a/test/integration/http_integration.cc +++ b/test/integration/http_integration.cc @@ -85,7 +85,7 @@ IntegrationCodecClient::IntegrationCodecClient( Event::Dispatcher& dispatcher, Random::RandomGenerator& random, Network::ClientConnectionPtr&& conn, Upstream::HostDescriptionConstSharedPtr host_description, Http::CodecType type, bool wait_till_connected) - : CodecClientProd(type, std::move(conn), host_description, dispatcher, random), + : CodecClientProd(type, std::move(conn), host_description, dispatcher, random, nullptr), dispatcher_(dispatcher), callbacks_(*this, wait_till_connected), codec_callbacks_(*this), codec_client_callbacks_(*this) { connection_->addConnectionCallbacks(callbacks_); diff --git a/test/integration/utility.cc b/test/integration/utility.cc index faf211a082578..2a984f5dad340 100644 --- a/test/integration/utility.cc +++ b/test/integration/utility.cc @@ -193,6 +193,7 @@ IntegrationUtil::makeSingleRequest(const Network::Address::InstanceConstSharedPt Filesystem::fileSystemForTest(), random_generator, bootstrap); Event::DispatcherPtr dispatcher(api.allocateDispatcher("test_thread")); TestConnectionCallbacks connection_callbacks(*dispatcher); + Network::TransportSocketOptionsConstSharedPtr options; std::shared_ptr cluster{new NiceMock()}; Upstream::HostDescriptionConstSharedPtr host_description{Upstream::makeTestHostDescription( @@ -204,7 +205,7 @@ IntegrationUtil::makeSingleRequest(const Network::Address::InstanceConstSharedPt dispatcher->createClientConnection( addr, Network::Address::InstanceConstSharedPtr(), Network::Test::createRawBufferSocket(), nullptr, nullptr), - host_description, *dispatcher, random); + host_description, *dispatcher, random, options); return sendRequestAndWaitForResponse(*dispatcher, method, url, body, host, content_type, client); } @@ -231,7 +232,8 @@ IntegrationUtil::makeSingleRequest(const Network::Address::InstanceConstSharedPt static_cast(addr->ip()->port())), *dispatcher, addr, local_address, quic_stat_names, {}, mock_stats_store, nullptr, nullptr); connection->addConnectionCallbacks(connection_callbacks); - Http::CodecClientProd client(type, std::move(connection), host_description, *dispatcher, random); + Http::CodecClientProd client(type, std::move(connection), host_description, *dispatcher, random, + options); // Quic connection needs to finish handshake. dispatcher->run(Event::Dispatcher::RunType::Block); return sendRequestAndWaitForResponse(*dispatcher, method, url, body, host, content_type, client);