diff --git a/source/extensions/filters/network/thrift_proxy/BUILD b/source/extensions/filters/network/thrift_proxy/BUILD index be42dd471b500..8b6e66a083881 100644 --- a/source/extensions/filters/network/thrift_proxy/BUILD +++ b/source/extensions/filters/network/thrift_proxy/BUILD @@ -47,6 +47,7 @@ envoy_cc_extension( ":framed_transport_lib", ":header_transport_lib", ":protocol_interface", + ":protocol_options_config_lib", ":twitter_protocol_lib", ":unframed_transport_lib", "//envoy/registry", @@ -265,6 +266,15 @@ envoy_cc_library( ], ) +envoy_cc_library( + name = "protocol_options_config_lib", + hdrs = ["protocol_options_config.h"], + deps = [ + ":thrift_lib", + "//envoy/upstream:upstream_interface", + ], +) + envoy_cc_library( name = "thrift_lib", hdrs = ["thrift.h"], diff --git a/source/extensions/filters/network/thrift_proxy/config.h b/source/extensions/filters/network/thrift_proxy/config.h index 8cc5d722071f4..1eea3cc9e6c8f 100644 --- a/source/extensions/filters/network/thrift_proxy/config.h +++ b/source/extensions/filters/network/thrift_proxy/config.h @@ -9,6 +9,7 @@ #include "source/extensions/filters/network/common/factory_base.h" #include "source/extensions/filters/network/thrift_proxy/conn_manager.h" #include "source/extensions/filters/network/thrift_proxy/filters/filter.h" +#include "source/extensions/filters/network/thrift_proxy/protocol_options_config.h" #include "source/extensions/filters/network/thrift_proxy/router/router_impl.h" #include "source/extensions/filters/network/well_known_names.h" diff --git a/source/extensions/filters/network/thrift_proxy/conn_manager.h b/source/extensions/filters/network/thrift_proxy/conn_manager.h index f4e19152d0aa1..93c690fb96f55 100644 --- a/source/extensions/filters/network/thrift_proxy/conn_manager.h +++ b/source/extensions/filters/network/thrift_proxy/conn_manager.h @@ -42,17 +42,6 @@ class Config { virtual uint64_t maxRequestsPerConnection() const PURE; }; -/** - * Extends Upstream::ProtocolOptionsConfig with Thrift-specific cluster options. - */ -class ProtocolOptionsConfig : public Upstream::ProtocolOptionsConfig { -public: - ~ProtocolOptionsConfig() override = default; - - virtual TransportType transport(TransportType downstream_transport) const PURE; - virtual ProtocolType protocol(ProtocolType downstream_protocol) const PURE; -}; - /** * ConnectionManager is a Network::Filter that will perform Thrift request handling on a connection. */ diff --git a/source/extensions/filters/network/thrift_proxy/protocol_options_config.h b/source/extensions/filters/network/thrift_proxy/protocol_options_config.h new file mode 100644 index 0000000000000..e6e1b1952c4ab --- /dev/null +++ b/source/extensions/filters/network/thrift_proxy/protocol_options_config.h @@ -0,0 +1,26 @@ +#pragma once + +#include "envoy/upstream/upstream.h" + +#include "source/extensions/filters/network/thrift_proxy/thrift.h" + +namespace Envoy { +namespace Extensions { +namespace NetworkFilters { +namespace ThriftProxy { + +/** + * Extends Upstream::ProtocolOptionsConfig with Thrift-specific cluster options. + */ +class ProtocolOptionsConfig : public Upstream::ProtocolOptionsConfig { +public: + ~ProtocolOptionsConfig() override = default; + + virtual TransportType transport(TransportType downstream_transport) const PURE; + virtual ProtocolType protocol(ProtocolType downstream_protocol) const PURE; +}; + +} // namespace ThriftProxy +} // namespace NetworkFilters +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/filters/network/thrift_proxy/router/BUILD b/source/extensions/filters/network/thrift_proxy/router/BUILD index 7eb78e9adb33d..7809f34af81b2 100644 --- a/source/extensions/filters/network/thrift_proxy/router/BUILD +++ b/source/extensions/filters/network/thrift_proxy/router/BUILD @@ -30,8 +30,12 @@ envoy_cc_library( "//envoy/router:router_interface", "//envoy/tcp:conn_pool_interface", "//source/common/buffer:buffer_lib", + "//source/common/common:logger_lib", + "//source/extensions/filters/network:well_known_names", + "//source/extensions/filters/network/thrift_proxy:app_exception_lib", "//source/extensions/filters/network/thrift_proxy:metadata_lib", "//source/extensions/filters/network/thrift_proxy:protocol_converter_lib", + "//source/extensions/filters/network/thrift_proxy:protocol_options_config_lib", ], ) @@ -52,6 +56,7 @@ envoy_cc_library( deps = [ ":router_interface", "//envoy/tcp:conn_pool_interface", + "//source/common/common:logger_lib", "//source/extensions/filters/network/thrift_proxy:app_exception_lib", "//source/extensions/filters/network/thrift_proxy:conn_manager_lib", "//source/extensions/filters/network/thrift_proxy:thrift_object_interface", @@ -72,11 +77,9 @@ envoy_cc_library( "//envoy/upstream:cluster_manager_interface", "//envoy/upstream:load_balancer_interface", "//envoy/upstream:thread_local_cluster_interface", - "//source/common/common:logger_lib", "//source/common/http:header_utility_lib", "//source/common/router:metadatamatchcriteria_lib", "//source/common/upstream:load_balancer_lib", - "//source/extensions/filters/network:well_known_names", "//source/extensions/filters/network/thrift_proxy:app_exception_lib", "//source/extensions/filters/network/thrift_proxy:conn_manager_lib", "//source/extensions/filters/network/thrift_proxy:protocol_converter_lib", diff --git a/source/extensions/filters/network/thrift_proxy/router/router.h b/source/extensions/filters/network/thrift_proxy/router/router.h index 603a08c4c0159..6d9a3511878a8 100644 --- a/source/extensions/filters/network/thrift_proxy/router/router.h +++ b/source/extensions/filters/network/thrift_proxy/router/router.h @@ -8,8 +8,12 @@ #include "envoy/tcp/conn_pool.h" #include "source/common/buffer/buffer_impl.h" +#include "source/common/common/logger.h" +#include "source/extensions/filters/network/thrift_proxy/app_exception_impl.h" #include "source/extensions/filters/network/thrift_proxy/metadata.h" #include "source/extensions/filters/network/thrift_proxy/protocol_converter.h" +#include "source/extensions/filters/network/thrift_proxy/protocol_options_config.h" +#include "source/extensions/filters/network/well_known_names.h" namespace Envoy { namespace Extensions { @@ -101,7 +105,7 @@ struct RouterStats { /** * This interface is used by an upstream request to communicate its state. */ -class RequestOwner : public ProtocolConverter { +class RequestOwner : public ProtocolConverter, public Logger::Loggable { public: RequestOwner(Upstream::ClusterManager& cluster_manager, const std::string& stat_prefix, Stats::Scope& scope) @@ -181,6 +185,11 @@ class RequestOwner : public ProtocolConverter { */ Upstream::ClusterManager& clusterManager() { return cluster_manager_; } + /** + * @return Upstream::Cluster& the upstream cluster associated with the request. + */ + const Upstream::ClusterInfo& cluster() const { return *cluster_; } + /** * Common stats. */ @@ -269,6 +278,85 @@ class RequestOwner : public ProtocolConverter { recordClusterScopeHistogram(cluster, {upstream_rq_time_}, unit, value); } +protected: + struct UpstreamRequestInfo { + bool passthrough_supported; + TransportType transport; + ProtocolType protocol; + absl::optional conn_pool_data; + }; + + struct PrepareUpstreamRequestResult { + absl::optional exception; + absl::optional upstream_request_info; + }; + + PrepareUpstreamRequestResult prepareUpstreamRequest(const std::string& cluster_name, + MessageMetadataSharedPtr& metadata, + TransportType transport, + ProtocolType protocol, + Upstream::LoadBalancerContext* lb_context) { + Upstream::ThreadLocalCluster* cluster = clusterManager().getThreadLocalCluster(cluster_name); + if (!cluster) { + ENVOY_LOG(debug, "unknown cluster '{}'", cluster_name); + stats().unknown_cluster_.inc(); + return {AppException(AppExceptionType::InternalError, + fmt::format("unknown cluster '{}'", cluster_name)), + absl::nullopt}; + } + + cluster_ = cluster->info(); + ENVOY_LOG(debug, "cluster '{}' match for method '{}'", cluster_name, metadata->methodName()); + + switch (metadata->messageType()) { + case MessageType::Call: + incRequestCall(*cluster_); + break; + + case MessageType::Oneway: + incRequestOneWay(*cluster_); + break; + + default: + incRequestInvalid(*cluster_); + break; + } + + if (cluster_->maintenanceMode()) { + stats().upstream_rq_maintenance_mode_.inc(); + return {AppException(AppExceptionType::InternalError, + fmt::format("maintenance mode for cluster '{}'", cluster_name)), + absl::nullopt}; + } + + const std::shared_ptr options = + cluster_->extensionProtocolOptionsTyped( + NetworkFilterNames::get().ThriftProxy); + + const TransportType final_transport = options ? options->transport(transport) : transport; + ASSERT(final_transport != TransportType::Auto); + + const ProtocolType final_protocol = options ? options->protocol(protocol) : protocol; + ASSERT(final_protocol != ProtocolType::Auto); + + auto conn_pool_data = cluster->tcpConnPool(Upstream::ResourcePriority::Default, lb_context); + if (!conn_pool_data) { + stats().no_healthy_upstream_.inc(); + return {AppException(AppExceptionType::InternalError, + fmt::format("no healthy upstream for '{}'", cluster_name)), + absl::nullopt}; + } + + const auto passthrough_supported = + transport == TransportType::Framed && final_transport == TransportType::Framed && + protocol == final_protocol && final_protocol != ProtocolType::Twitter; + UpstreamRequestInfo result = {passthrough_supported, final_transport, final_protocol, + conn_pool_data}; + return {absl::nullopt, result}; + } + + Upstream::ClusterInfoConstSharedPtr cluster_; + private: void incClusterScopeCounter(const Upstream::ClusterInfo& cluster, const Stats::StatNameVec& names) const { diff --git a/source/extensions/filters/network/thrift_proxy/router/router_impl.cc b/source/extensions/filters/network/thrift_proxy/router/router_impl.cc index f0a5ef11bb1af..46067ec210f7a 100644 --- a/source/extensions/filters/network/thrift_proxy/router/router_impl.cc +++ b/source/extensions/filters/network/thrift_proxy/router/router_impl.cc @@ -9,7 +9,6 @@ #include "source/common/common/utility.h" #include "source/common/router/metadatamatchcriteria_impl.h" #include "source/extensions/filters/network/thrift_proxy/app_exception_impl.h" -#include "source/extensions/filters/network/well_known_names.h" #include "absl/strings/match.h" @@ -192,6 +191,7 @@ void Router::onDestroy() { void Router::setDecoderFilterCallbacks(ThriftFilters::DecoderFilterCallbacks& callbacks) { callbacks_ = &callbacks; + upstream_response_callbacks_ = std::make_unique(callbacks_); // TODO(zuercher): handle buffer limits } @@ -225,68 +225,11 @@ FilterStatus Router::messageBegin(MessageMetadataSharedPtr metadata) { route_entry_ = route_->routeEntry(); const std::string& cluster_name = route_entry_->clusterName(); - Upstream::ThreadLocalCluster* cluster = clusterManager().getThreadLocalCluster(cluster_name); - if (!cluster) { - ENVOY_STREAM_LOG(debug, "unknown cluster '{}'", *callbacks_, cluster_name); - stats().unknown_cluster_.inc(); - callbacks_->sendLocalReply(AppException(AppExceptionType::InternalError, - fmt::format("unknown cluster '{}'", cluster_name)), - true); - return FilterStatus::StopIteration; - } - - cluster_ = cluster->info(); - ENVOY_STREAM_LOG(debug, "cluster '{}' match for method '{}'", *callbacks_, cluster_name, - metadata->methodName()); - switch (metadata->messageType()) { - case MessageType::Call: - incRequestCall(*cluster_); - break; - - case MessageType::Oneway: - incRequestOneWay(*cluster_); - break; - - default: - incRequestInvalid(*cluster_); - break; - } - - if (cluster_->maintenanceMode()) { - stats().upstream_rq_maintenance_mode_.inc(); - callbacks_->sendLocalReply( - AppException(AppExceptionType::InternalError, - fmt::format("maintenance mode for cluster '{}'", cluster_name)), - true); - return FilterStatus::StopIteration; - } - - const std::shared_ptr options = - cluster_->extensionProtocolOptionsTyped( - NetworkFilterNames::get().ThriftProxy); - - const TransportType transport = options - ? options->transport(callbacks_->downstreamTransportType()) - : callbacks_->downstreamTransportType(); - ASSERT(transport != TransportType::Auto); - - const ProtocolType protocol = options ? options->protocol(callbacks_->downstreamProtocolType()) - : callbacks_->downstreamProtocolType(); - ASSERT(protocol != ProtocolType::Auto); - - if (callbacks_->downstreamTransportType() == TransportType::Framed && - transport == TransportType::Framed && callbacks_->downstreamProtocolType() == protocol && - protocol != ProtocolType::Twitter) { - passthrough_supported_ = true; - } - - auto conn_pool_data = cluster->tcpConnPool(Upstream::ResourcePriority::Default, this); - if (!conn_pool_data) { - stats().no_healthy_upstream_.inc(); - callbacks_->sendLocalReply( - AppException(AppExceptionType::InternalError, - fmt::format("no healthy upstream for '{}'", cluster_name)), - true); + auto prepare_result = + prepareUpstreamRequest(cluster_name, metadata, callbacks_->downstreamTransportType(), + callbacks_->downstreamProtocolType(), this); + if (prepare_result.exception.has_value()) { + callbacks_->sendLocalReply(prepare_result.exception.value(), true); return FilterStatus::StopIteration; } @@ -300,129 +243,30 @@ FilterStatus Router::messageBegin(MessageMetadataSharedPtr metadata) { } } + auto& upstream_req_info = prepare_result.upstream_request_info.value(); + passthrough_supported_ = upstream_req_info.passthrough_supported; upstream_request_ = - std::make_unique(*this, *conn_pool_data, metadata, transport, protocol); + std::make_unique(*this, *upstream_req_info.conn_pool_data, metadata, + upstream_req_info.transport, upstream_req_info.protocol); return upstream_request_->start(); } FilterStatus Router::messageEnd() { ProtocolConverter::messageEnd(); - - Buffer::OwnedImpl transport_buffer; - - upstream_request_->metadata_->setProtocol(upstream_request_->protocol_->type()); - - upstream_request_->transport_->encodeFrame(transport_buffer, *upstream_request_->metadata_, - upstream_request_buffer_); - - request_size_ += transport_buffer.length(); + request_size_ += upstream_request_->encodeAndWrite(upstream_request_buffer_); recordUpstreamRequestSize(*cluster_, request_size_); - - upstream_request_->conn_data_->connection().write(transport_buffer, false); - upstream_request_->onRequestComplete(); return FilterStatus::Continue; } void Router::onUpstreamData(Buffer::Instance& data, bool end_stream) { - ASSERT(!upstream_request_->response_complete_); - - response_size_ += data.length(); - - if (upstream_request_->upgrade_response_ != nullptr) { - ENVOY_STREAM_LOG(trace, "reading upgrade response: {} bytes", *callbacks_, data.length()); - // Handle upgrade response. - if (!upstream_request_->upgrade_response_->onData(data)) { - // Wait for more data. - return; - } - - ENVOY_STREAM_LOG(debug, "upgrade response complete", *callbacks_); - upstream_request_->protocol_->completeUpgrade(*upstream_request_->conn_state_, - *upstream_request_->upgrade_response_); - - upstream_request_->upgrade_response_.reset(); - upstream_request_->onRequestStart(true); - } else { - ENVOY_STREAM_LOG(trace, "reading response: {} bytes", *callbacks_, data.length()); - - // Handle normal response. - if (!upstream_request_->response_started_) { - callbacks_->startUpstreamResponse(*upstream_request_->transport_, - *upstream_request_->protocol_); - upstream_request_->response_started_ = true; - } - - ThriftFilters::ResponseStatus status = callbacks_->upstreamData(data); - if (status == ThriftFilters::ResponseStatus::Complete) { - ENVOY_STREAM_LOG(debug, "response complete", *callbacks_); - - recordUpstreamResponseSize(*cluster_, response_size_); - - switch (callbacks_->responseMetadata()->messageType()) { - case MessageType::Reply: - incResponseReply(*cluster_); - if (callbacks_->responseSuccess()) { - upstream_request_->upstream_host_->outlierDetector().putResult( - Upstream::Outlier::Result::ExtOriginRequestSuccess); - incResponseReplySuccess(*cluster_); - } else { - upstream_request_->upstream_host_->outlierDetector().putResult( - Upstream::Outlier::Result::ExtOriginRequestFailed); - incResponseReplyError(*cluster_); - } - break; - - case MessageType::Exception: - upstream_request_->upstream_host_->outlierDetector().putResult( - Upstream::Outlier::Result::ExtOriginRequestFailed); - incResponseException(*cluster_); - break; - - default: - incResponseInvalidType(*cluster_); - break; - } - upstream_request_->onResponseComplete(); - cleanup(); - return; - } else if (status == ThriftFilters::ResponseStatus::Reset) { - // Note: invalid responses are not accounted in the response size histogram. - ENVOY_STREAM_LOG(debug, "upstream reset", *callbacks_); - upstream_request_->upstream_host_->outlierDetector().putResult( - Upstream::Outlier::Result::ExtOriginRequestFailed); - upstream_request_->resetStream(); - return; - } - } - - if (end_stream) { - // Response is incomplete, but no more data is coming. - ENVOY_STREAM_LOG(debug, "response underflow", *callbacks_); - upstream_request_->onResponseComplete(); - upstream_request_->onResetStream(ConnectionPool::PoolFailureReason::RemoteConnectionFailure); + const bool done = + upstream_request_->handleUpstreamData(data, end_stream, *this, *upstream_response_callbacks_); + if (done) { cleanup(); } } -void Router::onEvent(Network::ConnectionEvent event) { - ASSERT(upstream_request_ && !upstream_request_->response_complete_); - - switch (event) { - case Network::ConnectionEvent::RemoteClose: - ENVOY_STREAM_LOG(debug, "upstream remote close", *callbacks_); - upstream_request_->onResetStream(ConnectionPool::PoolFailureReason::RemoteConnectionFailure); - break; - case Network::ConnectionEvent::LocalClose: - ENVOY_STREAM_LOG(debug, "upstream local close", *callbacks_); - upstream_request_->onResetStream(ConnectionPool::PoolFailureReason::LocalConnectionFailure); - break; - default: - // Connected is consumed by the connection pool. - NOT_REACHED_GCOVR_EXCL_LINE; - } - - upstream_request_->releaseConnection(false); -} +void Router::onEvent(Network::ConnectionEvent event) { upstream_request_->onEvent(event); } const Network::Connection* Router::downstreamConnection() const { if (callbacks_ != nullptr) { diff --git a/source/extensions/filters/network/thrift_proxy/router/router_impl.h b/source/extensions/filters/network/thrift_proxy/router/router_impl.h index 5410028323b88..25575ecda7761 100644 --- a/source/extensions/filters/network/thrift_proxy/router/router_impl.h +++ b/source/extensions/filters/network/thrift_proxy/router/router_impl.h @@ -11,7 +11,6 @@ #include "envoy/tcp/conn_pool.h" #include "envoy/upstream/load_balancer.h" -#include "source/common/common/logger.h" #include "source/common/http/header_utility.h" #include "source/common/upstream/load_balancer_impl.h" #include "source/extensions/filters/network/thrift_proxy/conn_manager.h" @@ -160,11 +159,29 @@ class RouteMatcher { std::vector routes_; }; +// Adapter from DecoderFilterCallbacks to UpstreamResponseCallbacks. +class UpstreamResponseCallbacksImpl : public UpstreamResponseCallbacks { +public: + UpstreamResponseCallbacksImpl(ThriftFilters::DecoderFilterCallbacks* callbacks) + : callbacks_(callbacks) {} + + void startUpstreamResponse(Transport& transport, Protocol& protocol) override { + callbacks_->startUpstreamResponse(transport, protocol); + } + ThriftFilters::ResponseStatus upstreamData(Buffer::Instance& buffer) override { + return callbacks_->upstreamData(buffer); + } + MessageMetadataSharedPtr responseMetadata() override { return callbacks_->responseMetadata(); } + bool responseSuccess() override { return callbacks_->responseSuccess(); } + +private: + ThriftFilters::DecoderFilterCallbacks* callbacks_{}; +}; + class Router : public Tcp::ConnectionPool::UpstreamCallbacks, public Upstream::LoadBalancerContextBase, public RequestOwner, - public ThriftFilters::DecoderFilter, - Logger::Loggable { + public ThriftFilters::DecoderFilter { public: Router(Upstream::ClusterManager& cluster_manager, const std::string& stat_prefix, Stats::Scope& scope) @@ -213,16 +230,15 @@ class Router : public Tcp::ConnectionPool::UpstreamCallbacks, void cleanup(); ThriftFilters::DecoderFilterCallbacks* callbacks_{}; + std::unique_ptr upstream_response_callbacks_{}; RouteConstSharedPtr route_{}; const RouteEntry* route_entry_{}; - Upstream::ClusterInfoConstSharedPtr cluster_; std::unique_ptr upstream_request_; Buffer::OwnedImpl upstream_request_buffer_; bool passthrough_supported_ : 1; uint64_t request_size_{}; - uint64_t response_size_{}; }; } // namespace Router diff --git a/source/extensions/filters/network/thrift_proxy/router/upstream_request.cc b/source/extensions/filters/network/thrift_proxy/router/upstream_request.cc index 8ac684414efbe..62b26a7e06c05 100644 --- a/source/extensions/filters/network/thrift_proxy/router/upstream_request.cc +++ b/source/extensions/filters/network/thrift_proxy/router/upstream_request.cc @@ -100,6 +100,133 @@ void UpstreamRequest::onPoolReady(Tcp::ConnectionPool::ConnectionDataPtr&& conn_ onRequestStart(continue_decoding); } +void UpstreamRequest::handleUpgradeResponse(Buffer::Instance& data) { + ENVOY_LOG(trace, "reading upgrade response: {} bytes", data.length()); + if (!upgrade_response_->onData(data)) { + // Wait for more data. + return; + } + + ENVOY_LOG(debug, "upgrade response complete"); + protocol_->completeUpgrade(*conn_state_, *upgrade_response_); + upgrade_response_.reset(); + onRequestStart(true); +} + +ThriftFilters::ResponseStatus +UpstreamRequest::handleRegularResponse(Buffer::Instance& data, RequestOwner& owner, + UpstreamResponseCallbacks& callbacks) { + ENVOY_LOG(trace, "reading response: {} bytes", data.length()); + + if (!response_started_) { + callbacks.startUpstreamResponse(*transport_, *protocol_); + response_started_ = true; + } + + const auto& cluster = owner.cluster(); + + const auto status = callbacks.upstreamData(data); + if (status == ThriftFilters::ResponseStatus::Complete) { + ENVOY_LOG(debug, "response complete"); + + owner.recordUpstreamResponseSize(cluster, response_size_); + + switch (callbacks.responseMetadata()->messageType()) { + case MessageType::Reply: + owner.incResponseReply(cluster); + if (callbacks.responseSuccess()) { + upstream_host_->outlierDetector().putResult( + Upstream::Outlier::Result::ExtOriginRequestSuccess); + owner.incResponseReplySuccess(cluster); + } else { + upstream_host_->outlierDetector().putResult( + Upstream::Outlier::Result::ExtOriginRequestFailed); + owner.incResponseReplyError(cluster); + } + break; + + case MessageType::Exception: + upstream_host_->outlierDetector().putResult( + Upstream::Outlier::Result::ExtOriginRequestFailed); + owner.incResponseException(cluster); + break; + + default: + owner.incResponseInvalidType(cluster); + break; + } + onResponseComplete(); + } else if (status == ThriftFilters::ResponseStatus::Reset) { + // Note: invalid responses are not accounted in the response size histogram. + ENVOY_LOG(debug, "upstream reset"); + upstream_host_->outlierDetector().putResult(Upstream::Outlier::Result::ExtOriginRequestFailed); + resetStream(); + } + + return status; +} + +bool UpstreamRequest::handleUpstreamData(Buffer::Instance& data, bool end_stream, + RequestOwner& owner, + UpstreamResponseCallbacks& callbacks) { + ASSERT(!response_complete_); + + response_size_ += data.length(); + + if (upgrade_response_ != nullptr) { + handleUpgradeResponse(data); + } else { + const auto status = handleRegularResponse(data, owner, callbacks); + if (status != ThriftFilters::ResponseStatus::MoreData) { + return true; + } + } + + if (end_stream) { + // Response is incomplete, but no more data is coming. + ENVOY_LOG(debug, "response underflow"); + onResponseComplete(); + onResetStream(ConnectionPool::PoolFailureReason::RemoteConnectionFailure); + return true; + } + + return false; +} + +void UpstreamRequest::onEvent(Network::ConnectionEvent event) { + ASSERT(!response_complete_); + + switch (event) { + case Network::ConnectionEvent::RemoteClose: + ENVOY_LOG(debug, "upstream remote close"); + onResetStream(ConnectionPool::PoolFailureReason::RemoteConnectionFailure); + break; + case Network::ConnectionEvent::LocalClose: + ENVOY_LOG(debug, "upstream local close"); + onResetStream(ConnectionPool::PoolFailureReason::LocalConnectionFailure); + break; + default: + // Connected is consumed by the connection pool. + NOT_REACHED_GCOVR_EXCL_LINE; + } + + releaseConnection(false); +} + +uint64_t UpstreamRequest::encodeAndWrite(Buffer::OwnedImpl& request_buffer) { + Buffer::OwnedImpl transport_buffer; + + metadata_->setProtocol(protocol_->type()); + transport_->encodeFrame(transport_buffer, *metadata_, request_buffer); + + uint64_t size = transport_buffer.length(); + + conn_data_->connection().write(transport_buffer, false); + onRequestComplete(); + + return size; +} + void UpstreamRequest::onRequestStart(bool continue_decoding) { auto& buffer = parent_.buffer(); parent_.initProtocolConverter(*protocol_, buffer); diff --git a/source/extensions/filters/network/thrift_proxy/router/upstream_request.h b/source/extensions/filters/network/thrift_proxy/router/upstream_request.h index f5129ce583184..ebd93f86c2af3 100644 --- a/source/extensions/filters/network/thrift_proxy/router/upstream_request.h +++ b/source/extensions/filters/network/thrift_proxy/router/upstream_request.h @@ -3,7 +3,9 @@ #include "envoy/common/time.h" #include "envoy/tcp/conn_pool.h" +#include "source/common/common/logger.h" #include "source/extensions/filters/network/thrift_proxy/decoder_events.h" +#include "source/extensions/filters/network/thrift_proxy/filters/filter.h" #include "source/extensions/filters/network/thrift_proxy/metadata.h" #include "source/extensions/filters/network/thrift_proxy/router/router.h" #include "source/extensions/filters/network/thrift_proxy/thrift.h" @@ -14,7 +16,18 @@ namespace NetworkFilters { namespace ThriftProxy { namespace Router { -struct UpstreamRequest : public Tcp::ConnectionPool::Callbacks { +class UpstreamResponseCallbacks { +public: + virtual ~UpstreamResponseCallbacks() = default; + + virtual void startUpstreamResponse(Transport& transport, Protocol& protocol) PURE; + virtual ThriftFilters::ResponseStatus upstreamData(Buffer::Instance& buffer) PURE; + virtual MessageMetadataSharedPtr responseMetadata() PURE; + virtual bool responseSuccess() PURE; +}; + +struct UpstreamRequest : public Tcp::ConnectionPool::Callbacks, + Logger::Loggable { UpstreamRequest(RequestOwner& parent, Upstream::TcpPoolData& pool_data, MessageMetadataSharedPtr& metadata, TransportType transport_type, ProtocolType protocol_type); @@ -31,6 +44,13 @@ struct UpstreamRequest : public Tcp::ConnectionPool::Callbacks { void onPoolReady(Tcp::ConnectionPool::ConnectionDataPtr&& conn, Upstream::HostDescriptionConstSharedPtr host) override; + bool handleUpstreamData(Buffer::Instance& data, bool end_stream, RequestOwner& owner, + UpstreamResponseCallbacks& callbacks); + void handleUpgradeResponse(Buffer::Instance& data); + ThriftFilters::ResponseStatus handleRegularResponse(Buffer::Instance& data, RequestOwner& owner, + UpstreamResponseCallbacks& callbacks); + uint64_t encodeAndWrite(Buffer::OwnedImpl& request_buffer); + void onEvent(Network::ConnectionEvent event); void onRequestStart(bool continue_decoding); void onRequestComplete(); void onResponseComplete(); @@ -56,6 +76,7 @@ struct UpstreamRequest : public Tcp::ConnectionPool::Callbacks { bool charged_response_timing_{false}; MonotonicTime downstream_request_complete_time_; + uint64_t response_size_{}; }; } // namespace Router diff --git a/test/extensions/filters/network/thrift_proxy/router_test.cc b/test/extensions/filters/network/thrift_proxy/router_test.cc index ada3531938a74..1b38ade61f648 100644 --- a/test/extensions/filters/network/thrift_proxy/router_test.cc +++ b/test/extensions/filters/network/thrift_proxy/router_test.cc @@ -203,10 +203,10 @@ class ThriftRouterTestBase { EXPECT_EQ(nullptr, router_->downstreamHeaders()); EXPECT_CALL(callbacks_, downstreamTransportType()) - .Times(2) + .Times(1) .WillRepeatedly(Return(TransportType::Framed)); EXPECT_CALL(callbacks_, downstreamProtocolType()) - .Times(2) + .Times(1) .WillRepeatedly(Return(ProtocolType::Binary)); mock_protocol_cb_ = [&](MockProtocol* protocol) -> void {