diff --git a/source/extensions/filters/network/thrift_proxy/router/BUILD b/source/extensions/filters/network/thrift_proxy/router/BUILD index ff8048656105e..7eb78e9adb33d 100644 --- a/source/extensions/filters/network/thrift_proxy/router/BUILD +++ b/source/extensions/filters/network/thrift_proxy/router/BUILD @@ -45,6 +45,21 @@ envoy_cc_library( ], ) +envoy_cc_library( + name = "upstream_request_lib", + srcs = ["upstream_request.cc"], + hdrs = ["upstream_request.h"], + deps = [ + ":router_interface", + "//envoy/tcp:conn_pool_interface", + "//source/extensions/filters/network/thrift_proxy:app_exception_lib", + "//source/extensions/filters/network/thrift_proxy:conn_manager_lib", + "//source/extensions/filters/network/thrift_proxy:thrift_object_interface", + "//source/extensions/filters/network/thrift_proxy:transport_interface", + "//source/extensions/filters/network/thrift_proxy/filters:filter_interface", + ], +) + envoy_cc_library( name = "router_lib", srcs = ["router_impl.cc"], @@ -52,6 +67,7 @@ envoy_cc_library( deps = [ ":router_interface", ":router_ratelimit_lib", + ":upstream_request_lib", "//envoy/tcp:conn_pool_interface", "//envoy/upstream:cluster_manager_interface", "//envoy/upstream:load_balancer_interface", diff --git a/source/extensions/filters/network/thrift_proxy/router/router_impl.cc b/source/extensions/filters/network/thrift_proxy/router/router_impl.cc index 77e2e18970304..f0a5ef11bb1af 100644 --- a/source/extensions/filters/network/thrift_proxy/router/router_impl.cc +++ b/source/extensions/filters/network/thrift_proxy/router/router_impl.cc @@ -434,191 +434,6 @@ const Network::Connection* Router::downstreamConnection() const { void Router::cleanup() { upstream_request_.reset(); } -Router::UpstreamRequest::UpstreamRequest(RequestOwner& parent, Upstream::TcpPoolData& pool_data, - MessageMetadataSharedPtr& metadata, - TransportType transport_type, ProtocolType protocol_type) - : parent_(parent), conn_pool_data_(pool_data), metadata_(metadata), - transport_(NamedTransportConfigFactory::getFactory(transport_type).createTransport()), - protocol_(NamedProtocolConfigFactory::getFactory(protocol_type).createProtocol()), - request_complete_(false), response_started_(false), response_complete_(false) {} - -Router::UpstreamRequest::~UpstreamRequest() { - if (conn_pool_handle_) { - conn_pool_handle_->cancel(Tcp::ConnectionPool::CancelPolicy::Default); - } -} - -FilterStatus Router::UpstreamRequest::start() { - Tcp::ConnectionPool::Cancellable* handle = conn_pool_data_.newConnection(*this); - if (handle) { - // Pause while we wait for a connection. - conn_pool_handle_ = handle; - return FilterStatus::StopIteration; - } - - if (upgrade_response_ != nullptr) { - // Pause while we wait for an upgrade response. - return FilterStatus::StopIteration; - } - - if (upstream_host_ == nullptr) { - return FilterStatus::StopIteration; - } - - return FilterStatus::Continue; -} - -void Router::UpstreamRequest::releaseConnection(const bool close) { - if (conn_pool_handle_) { - conn_pool_handle_->cancel(Tcp::ConnectionPool::CancelPolicy::Default); - conn_pool_handle_ = nullptr; - } - - conn_state_ = nullptr; - - // The event triggered by close will also release this connection so clear conn_data_ before - // closing. - auto conn_data = std::move(conn_data_); - if (close && conn_data != nullptr) { - conn_data->connection().close(Network::ConnectionCloseType::NoFlush); - } -} - -void Router::UpstreamRequest::resetStream() { releaseConnection(true); } - -void Router::UpstreamRequest::onPoolFailure(ConnectionPool::PoolFailureReason reason, - absl::string_view, - Upstream::HostDescriptionConstSharedPtr host) { - conn_pool_handle_ = nullptr; - - // Mimic an upstream reset. - onUpstreamHostSelected(host); - onResetStream(reason); -} - -void Router::UpstreamRequest::onPoolReady(Tcp::ConnectionPool::ConnectionDataPtr&& conn_data, - Upstream::HostDescriptionConstSharedPtr host) { - // Only invoke continueDecoding if we'd previously stopped the filter chain. - bool continue_decoding = conn_pool_handle_ != nullptr; - - onUpstreamHostSelected(host); - host->outlierDetector().putResult(Upstream::Outlier::Result::LocalOriginConnectSuccess); - - conn_data_ = std::move(conn_data); - conn_data_->addUpstreamCallbacks(parent_.upstreamCallbacks()); - conn_pool_handle_ = nullptr; - - conn_state_ = conn_data_->connectionStateTyped(); - if (conn_state_ == nullptr) { - conn_data_->setConnectionState(std::make_unique()); - conn_state_ = conn_data_->connectionStateTyped(); - } - - if (protocol_->supportsUpgrade()) { - auto& buffer = parent_.buffer(); - upgrade_response_ = protocol_->attemptUpgrade(*transport_, *conn_state_, buffer); - if (upgrade_response_ != nullptr) { - parent_.addSize(buffer.length()); - conn_data_->connection().write(buffer, false); - return; - } - } - - onRequestStart(continue_decoding); -} - -void Router::UpstreamRequest::onRequestStart(bool continue_decoding) { - auto& buffer = parent_.buffer(); - parent_.initProtocolConverter(*protocol_, buffer); - - metadata_->setSequenceId(conn_state_->nextSequenceId()); - parent_.convertMessageBegin(metadata_); - - if (continue_decoding) { - parent_.continueDecoding(); - } -} - -void Router::UpstreamRequest::onRequestComplete() { - Event::Dispatcher& dispatcher = parent_.dispatcher(); - downstream_request_complete_time_ = dispatcher.timeSource().monotonicTime(); - request_complete_ = true; -} - -void Router::UpstreamRequest::onResponseComplete() { - chargeResponseTiming(); - response_complete_ = true; - conn_state_ = nullptr; - conn_data_.reset(); -} - -void Router::UpstreamRequest::onUpstreamHostSelected(Upstream::HostDescriptionConstSharedPtr host) { - upstream_host_ = host; -} - -void Router::UpstreamRequest::onResetStream(ConnectionPool::PoolFailureReason reason) { - if (metadata_->messageType() == MessageType::Oneway) { - // For oneway requests, we should not attempt a response. Reset the downstream to signal - // an error. - parent_.resetDownstreamConnection(); - return; - } - - chargeResponseTiming(); - - switch (reason) { - case ConnectionPool::PoolFailureReason::Overflow: - parent_.sendLocalReply(AppException(AppExceptionType::InternalError, - "thrift upstream request: too many connections"), - true); - break; - case ConnectionPool::PoolFailureReason::LocalConnectionFailure: - upstream_host_->outlierDetector().putResult( - Upstream::Outlier::Result::LocalOriginConnectFailed); - // Should only happen if we closed the connection, due to an error condition, in which case - // we've already handled any possible downstream response. - parent_.resetDownstreamConnection(); - break; - case ConnectionPool::PoolFailureReason::RemoteConnectionFailure: - case ConnectionPool::PoolFailureReason::Timeout: - if (reason == ConnectionPool::PoolFailureReason::Timeout) { - upstream_host_->outlierDetector().putResult(Upstream::Outlier::Result::LocalOriginTimeout); - } else if (reason == ConnectionPool::PoolFailureReason::RemoteConnectionFailure) { - upstream_host_->outlierDetector().putResult( - Upstream::Outlier::Result::LocalOriginConnectFailed); - } - - // TODO(zuercher): distinguish between these cases where appropriate (particularly timeout) - if (!response_started_) { - parent_.sendLocalReply(AppException(AppExceptionType::InternalError, - fmt::format("connection failure '{}'", - (upstream_host_ != nullptr) - ? upstream_host_->address()->asString() - : "to upstream")), - true); - return; - } - - // Error occurred after a partial response, propagate the reset to the downstream. - parent_.resetDownstreamConnection(); - break; - default: - NOT_REACHED_GCOVR_EXCL_LINE; - } -} - -void Router::UpstreamRequest::chargeResponseTiming() { - if (charged_response_timing_ || !request_complete_) { - return; - } - charged_response_timing_ = true; - Event::Dispatcher& dispatcher = parent_.dispatcher(); - const std::chrono::milliseconds response_time = - std::chrono::duration_cast( - dispatcher.timeSource().monotonicTime() - downstream_request_complete_time_); - parent_.recordResponseDuration(response_time.count(), Stats::Histogram::Unit::Milliseconds); -} - } // namespace Router } // namespace ThriftProxy } // namespace NetworkFilters diff --git a/source/extensions/filters/network/thrift_proxy/router/router_impl.h b/source/extensions/filters/network/thrift_proxy/router/router_impl.h index 502a2659cff6b..5410028323b88 100644 --- a/source/extensions/filters/network/thrift_proxy/router/router_impl.h +++ b/source/extensions/filters/network/thrift_proxy/router/router_impl.h @@ -18,6 +18,7 @@ #include "source/extensions/filters/network/thrift_proxy/filters/filter.h" #include "source/extensions/filters/network/thrift_proxy/router/router.h" #include "source/extensions/filters/network/thrift_proxy/router/router_ratelimit_impl.h" +#include "source/extensions/filters/network/thrift_proxy/router/upstream_request.h" #include "source/extensions/filters/network/thrift_proxy/thrift_object.h" #include "absl/types/optional.h" @@ -209,50 +210,6 @@ class Router : public Tcp::ConnectionPool::UpstreamCallbacks, void onBelowWriteBufferLowWatermark() override {} private: - struct UpstreamRequest : public Tcp::ConnectionPool::Callbacks { - UpstreamRequest(RequestOwner& parent, Upstream::TcpPoolData& pool_data, - MessageMetadataSharedPtr& metadata, TransportType transport_type, - ProtocolType protocol_type); - ~UpstreamRequest() override; - - FilterStatus start(); - void resetStream(); - void releaseConnection(bool close); - - // Tcp::ConnectionPool::Callbacks - void onPoolFailure(ConnectionPool::PoolFailureReason reason, - absl::string_view transport_failure_reason, - Upstream::HostDescriptionConstSharedPtr host) override; - void onPoolReady(Tcp::ConnectionPool::ConnectionDataPtr&& conn, - Upstream::HostDescriptionConstSharedPtr host) override; - - void onRequestStart(bool continue_decoding); - void onRequestComplete(); - void onResponseComplete(); - void onUpstreamHostSelected(Upstream::HostDescriptionConstSharedPtr host); - void onResetStream(ConnectionPool::PoolFailureReason reason); - void chargeResponseTiming(); - - RequestOwner& parent_; - Upstream::TcpPoolData& conn_pool_data_; - MessageMetadataSharedPtr metadata_; - - Tcp::ConnectionPool::Cancellable* conn_pool_handle_{}; - Tcp::ConnectionPool::ConnectionDataPtr conn_data_; - Upstream::HostDescriptionConstSharedPtr upstream_host_; - ThriftConnectionState* conn_state_{}; - TransportPtr transport_; - ProtocolPtr protocol_; - ThriftObjectPtr upgrade_response_; - - bool request_complete_ : 1; - bool response_started_ : 1; - bool response_complete_ : 1; - - bool charged_response_timing_{false}; - MonotonicTime downstream_request_complete_time_; - }; - void cleanup(); ThriftFilters::DecoderFilterCallbacks* callbacks_{}; diff --git a/source/extensions/filters/network/thrift_proxy/router/upstream_request.cc b/source/extensions/filters/network/thrift_proxy/router/upstream_request.cc new file mode 100644 index 0000000000000..8ac684414efbe --- /dev/null +++ b/source/extensions/filters/network/thrift_proxy/router/upstream_request.cc @@ -0,0 +1,199 @@ +#include "source/extensions/filters/network/thrift_proxy/router/upstream_request.h" + +#include "source/extensions/filters/network/thrift_proxy/app_exception_impl.h" + +namespace Envoy { +namespace Extensions { +namespace NetworkFilters { +namespace ThriftProxy { +namespace Router { + +UpstreamRequest::UpstreamRequest(RequestOwner& parent, Upstream::TcpPoolData& pool_data, + MessageMetadataSharedPtr& metadata, TransportType transport_type, + ProtocolType protocol_type) + : parent_(parent), conn_pool_data_(pool_data), metadata_(metadata), + transport_(NamedTransportConfigFactory::getFactory(transport_type).createTransport()), + protocol_(NamedProtocolConfigFactory::getFactory(protocol_type).createProtocol()), + request_complete_(false), response_started_(false), response_complete_(false) {} + +UpstreamRequest::~UpstreamRequest() { + if (conn_pool_handle_) { + conn_pool_handle_->cancel(Tcp::ConnectionPool::CancelPolicy::Default); + } +} + +FilterStatus UpstreamRequest::start() { + Tcp::ConnectionPool::Cancellable* handle = conn_pool_data_.newConnection(*this); + if (handle) { + // Pause while we wait for a connection. + conn_pool_handle_ = handle; + return FilterStatus::StopIteration; + } + + if (upgrade_response_ != nullptr) { + // Pause while we wait for an upgrade response. + return FilterStatus::StopIteration; + } + + if (upstream_host_ == nullptr) { + return FilterStatus::StopIteration; + } + + return FilterStatus::Continue; +} + +void UpstreamRequest::releaseConnection(const bool close) { + if (conn_pool_handle_) { + conn_pool_handle_->cancel(Tcp::ConnectionPool::CancelPolicy::Default); + conn_pool_handle_ = nullptr; + } + + conn_state_ = nullptr; + + // The event triggered by close will also release this connection so clear conn_data_ before + // closing. + auto conn_data = std::move(conn_data_); + if (close && conn_data != nullptr) { + conn_data->connection().close(Network::ConnectionCloseType::NoFlush); + } +} + +void UpstreamRequest::resetStream() { releaseConnection(true); } + +void UpstreamRequest::onPoolFailure(ConnectionPool::PoolFailureReason reason, absl::string_view, + Upstream::HostDescriptionConstSharedPtr host) { + conn_pool_handle_ = nullptr; + + // Mimic an upstream reset. + onUpstreamHostSelected(host); + onResetStream(reason); +} + +void UpstreamRequest::onPoolReady(Tcp::ConnectionPool::ConnectionDataPtr&& conn_data, + Upstream::HostDescriptionConstSharedPtr host) { + // Only invoke continueDecoding if we'd previously stopped the filter chain. + bool continue_decoding = conn_pool_handle_ != nullptr; + + onUpstreamHostSelected(host); + host->outlierDetector().putResult(Upstream::Outlier::Result::LocalOriginConnectSuccess); + + conn_data_ = std::move(conn_data); + conn_data_->addUpstreamCallbacks(parent_.upstreamCallbacks()); + conn_pool_handle_ = nullptr; + + conn_state_ = conn_data_->connectionStateTyped(); + if (conn_state_ == nullptr) { + conn_data_->setConnectionState(std::make_unique()); + conn_state_ = conn_data_->connectionStateTyped(); + } + + if (protocol_->supportsUpgrade()) { + auto& buffer = parent_.buffer(); + upgrade_response_ = protocol_->attemptUpgrade(*transport_, *conn_state_, buffer); + if (upgrade_response_ != nullptr) { + parent_.addSize(buffer.length()); + conn_data_->connection().write(buffer, false); + return; + } + } + + onRequestStart(continue_decoding); +} + +void UpstreamRequest::onRequestStart(bool continue_decoding) { + auto& buffer = parent_.buffer(); + parent_.initProtocolConverter(*protocol_, buffer); + + metadata_->setSequenceId(conn_state_->nextSequenceId()); + parent_.convertMessageBegin(metadata_); + + if (continue_decoding) { + parent_.continueDecoding(); + } +} + +void UpstreamRequest::onRequestComplete() { + Event::Dispatcher& dispatcher = parent_.dispatcher(); + downstream_request_complete_time_ = dispatcher.timeSource().monotonicTime(); + request_complete_ = true; +} + +void UpstreamRequest::onResponseComplete() { + chargeResponseTiming(); + response_complete_ = true; + conn_state_ = nullptr; + conn_data_.reset(); +} + +void UpstreamRequest::onUpstreamHostSelected(Upstream::HostDescriptionConstSharedPtr host) { + upstream_host_ = host; +} + +void UpstreamRequest::onResetStream(ConnectionPool::PoolFailureReason reason) { + if (metadata_->messageType() == MessageType::Oneway) { + // For oneway requests, we should not attempt a response. Reset the downstream to signal + // an error. + parent_.resetDownstreamConnection(); + return; + } + + chargeResponseTiming(); + + switch (reason) { + case ConnectionPool::PoolFailureReason::Overflow: + parent_.sendLocalReply(AppException(AppExceptionType::InternalError, + "thrift upstream request: too many connections"), + true); + break; + case ConnectionPool::PoolFailureReason::LocalConnectionFailure: + upstream_host_->outlierDetector().putResult( + Upstream::Outlier::Result::LocalOriginConnectFailed); + // Should only happen if we closed the connection, due to an error condition, in which case + // we've already handled any possible downstream response. + parent_.resetDownstreamConnection(); + break; + case ConnectionPool::PoolFailureReason::RemoteConnectionFailure: + case ConnectionPool::PoolFailureReason::Timeout: + if (reason == ConnectionPool::PoolFailureReason::Timeout) { + upstream_host_->outlierDetector().putResult(Upstream::Outlier::Result::LocalOriginTimeout); + } else if (reason == ConnectionPool::PoolFailureReason::RemoteConnectionFailure) { + upstream_host_->outlierDetector().putResult( + Upstream::Outlier::Result::LocalOriginConnectFailed); + } + + // TODO(zuercher): distinguish between these cases where appropriate (particularly timeout) + if (!response_started_) { + parent_.sendLocalReply(AppException(AppExceptionType::InternalError, + fmt::format("connection failure '{}'", + (upstream_host_ != nullptr) + ? upstream_host_->address()->asString() + : "to upstream")), + true); + return; + } + + // Error occurred after a partial response, propagate the reset to the downstream. + parent_.resetDownstreamConnection(); + break; + default: + NOT_REACHED_GCOVR_EXCL_LINE; + } +} + +void UpstreamRequest::chargeResponseTiming() { + if (charged_response_timing_ || !request_complete_) { + return; + } + charged_response_timing_ = true; + Event::Dispatcher& dispatcher = parent_.dispatcher(); + const std::chrono::milliseconds response_time = + std::chrono::duration_cast( + dispatcher.timeSource().monotonicTime() - downstream_request_complete_time_); + parent_.recordResponseDuration(response_time.count(), Stats::Histogram::Unit::Milliseconds); +} + +} // namespace Router +} // namespace ThriftProxy +} // namespace NetworkFilters +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/filters/network/thrift_proxy/router/upstream_request.h b/source/extensions/filters/network/thrift_proxy/router/upstream_request.h new file mode 100644 index 0000000000000..f5129ce583184 --- /dev/null +++ b/source/extensions/filters/network/thrift_proxy/router/upstream_request.h @@ -0,0 +1,65 @@ +#pragma once + +#include "envoy/common/time.h" +#include "envoy/tcp/conn_pool.h" + +#include "source/extensions/filters/network/thrift_proxy/decoder_events.h" +#include "source/extensions/filters/network/thrift_proxy/metadata.h" +#include "source/extensions/filters/network/thrift_proxy/router/router.h" +#include "source/extensions/filters/network/thrift_proxy/thrift.h" + +namespace Envoy { +namespace Extensions { +namespace NetworkFilters { +namespace ThriftProxy { +namespace Router { + +struct UpstreamRequest : public Tcp::ConnectionPool::Callbacks { + UpstreamRequest(RequestOwner& parent, Upstream::TcpPoolData& pool_data, + MessageMetadataSharedPtr& metadata, TransportType transport_type, + ProtocolType protocol_type); + ~UpstreamRequest() override; + + FilterStatus start(); + void resetStream(); + void releaseConnection(bool close); + + // Tcp::ConnectionPool::Callbacks + void onPoolFailure(ConnectionPool::PoolFailureReason reason, + absl::string_view transport_failure_reason, + Upstream::HostDescriptionConstSharedPtr host) override; + void onPoolReady(Tcp::ConnectionPool::ConnectionDataPtr&& conn, + Upstream::HostDescriptionConstSharedPtr host) override; + + void onRequestStart(bool continue_decoding); + void onRequestComplete(); + void onResponseComplete(); + void onUpstreamHostSelected(Upstream::HostDescriptionConstSharedPtr host); + void onResetStream(ConnectionPool::PoolFailureReason reason); + void chargeResponseTiming(); + + RequestOwner& parent_; + Upstream::TcpPoolData& conn_pool_data_; + MessageMetadataSharedPtr metadata_; + + Tcp::ConnectionPool::Cancellable* conn_pool_handle_{}; + Tcp::ConnectionPool::ConnectionDataPtr conn_data_; + Upstream::HostDescriptionConstSharedPtr upstream_host_; + ThriftConnectionState* conn_state_{}; + TransportPtr transport_; + ProtocolPtr protocol_; + ThriftObjectPtr upgrade_response_; + + bool request_complete_ : 1; + bool response_started_ : 1; + bool response_complete_ : 1; + + bool charged_response_timing_{false}; + MonotonicTime downstream_request_complete_time_; +}; + +} // namespace Router +} // namespace ThriftProxy +} // namespace NetworkFilters +} // namespace Extensions +} // namespace Envoy