-
Notifications
You must be signed in to change notification settings - Fork 5.3k
tcp: towards pluggable upstreams #13331
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.
Already on GitHub? Sign in to your account
Changes from all commits
8d27809
557ea5f
9b0e88f
67a9dfe
b45cc60
85c47c0
2d2024b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -225,7 +225,7 @@ Filter::~Filter() { | |
| access_log->log(nullptr, nullptr, nullptr, getStreamInfo()); | ||
| } | ||
|
|
||
| ASSERT(upstream_handle_ == nullptr); | ||
| ASSERT(generic_conn_pool_ == nullptr); | ||
| ASSERT(upstream_ == nullptr); | ||
| } | ||
|
|
||
|
|
@@ -442,24 +442,17 @@ Network::FilterStatus Filter::initializeUpstreamConnection() { | |
|
|
||
| bool Filter::maybeTunnel(const std::string& cluster_name) { | ||
| if (!config_->tunnelingConfig()) { | ||
| Tcp::ConnectionPool::Instance* conn_pool = cluster_manager_.tcpConnPoolForCluster( | ||
| cluster_name, Upstream::ResourcePriority::Default, this); | ||
| if (conn_pool) { | ||
| generic_conn_pool_ = | ||
| std::make_unique<TcpConnPool>(cluster_name, cluster_manager_, this, *upstream_callbacks_); | ||
| if (generic_conn_pool_->valid()) { | ||
| connecting_ = true; | ||
| connect_attempts_++; | ||
|
|
||
| // Given this function is reentrant, make sure we only reset the upstream_handle_ if given a | ||
| // valid connection handle. If newConnection fails inline it may result in attempting to | ||
| // select a new host, and a recursive call to initializeUpstreamConnection. In this case the | ||
| // first call to newConnection will return null and the inner call will persist. | ||
| Tcp::ConnectionPool::Cancellable* handle = conn_pool->newConnection(*this); | ||
| if (handle) { | ||
| ASSERT(upstream_handle_.get() == nullptr); | ||
| upstream_handle_ = std::make_shared<TcpConnectionHandle>(handle); | ||
| } | ||
| generic_conn_pool_->newStream(this); | ||
| // Because we never return open connections to the pool, this either has a handle waiting on | ||
| // connection completion, or onPoolFailure has been invoked. Either way, stop iteration. | ||
| return true; | ||
| } else { | ||
| generic_conn_pool_.reset(); | ||
| } | ||
| } else { | ||
| auto* cluster = cluster_manager_.get(cluster_name); | ||
|
|
@@ -474,28 +467,23 @@ bool Filter::maybeTunnel(const std::string& cluster_name) { | |
| "http2_protocol_options on the cluster."); | ||
| return false; | ||
| } | ||
| Http::ConnectionPool::Instance* conn_pool = cluster_manager_.httpConnPoolForCluster( | ||
| cluster_name, Upstream::ResourcePriority::Default, absl::nullopt, this); | ||
| if (conn_pool) { | ||
| upstream_ = std::make_unique<HttpUpstream>(*upstream_callbacks_, | ||
| config_->tunnelingConfig()->hostname()); | ||
| HttpUpstream* http_upstream = static_cast<HttpUpstream*>(upstream_.get()); | ||
| Http::ConnectionPool::Cancellable* cancellable = | ||
| conn_pool->newStream(http_upstream->responseDecoder(), *this); | ||
| if (cancellable) { | ||
| ASSERT(upstream_handle_.get() == nullptr); | ||
| upstream_handle_ = std::make_shared<HttpConnectionHandle>(cancellable); | ||
| } | ||
|
|
||
| generic_conn_pool_ = std::make_unique<HttpConnPool>(cluster_name, cluster_manager_, this, | ||
| config_->tunnelingConfig()->hostname(), | ||
| *upstream_callbacks_); | ||
| if (generic_conn_pool_->valid()) { | ||
| generic_conn_pool_->newStream(this); | ||
| return true; | ||
| } else { | ||
| generic_conn_pool_.reset(); | ||
| } | ||
| } | ||
|
|
||
| return false; | ||
| } | ||
| void Filter::onPoolFailure(ConnectionPool::PoolFailureReason reason, | ||
| Upstream::HostDescriptionConstSharedPtr host) { | ||
| upstream_handle_.reset(); | ||
|
|
||
| void Filter::onGenericPoolFailure(ConnectionPool::PoolFailureReason reason, | ||
| Upstream::HostDescriptionConstSharedPtr host) { | ||
| generic_conn_pool_.reset(); | ||
| read_callbacks_->upstreamHost(host); | ||
| getStreamInfo().onUpstreamHostSelected(host); | ||
|
|
||
|
|
@@ -518,44 +506,22 @@ void Filter::onPoolFailure(ConnectionPool::PoolFailureReason reason, | |
| } | ||
| } | ||
|
|
||
| void Filter::onPoolReadyBase(Upstream::HostDescriptionConstSharedPtr& host, | ||
| const Network::Address::InstanceConstSharedPtr& local_address, | ||
| Ssl::ConnectionInfoConstSharedPtr ssl_info) { | ||
| upstream_handle_.reset(); | ||
| void Filter::onGenericPoolReady(StreamInfo::StreamInfo* info, | ||
| std::unique_ptr<GenericUpstream>&& upstream, | ||
| Upstream::HostDescriptionConstSharedPtr& host, | ||
| const Network::Address::InstanceConstSharedPtr& local_address, | ||
| Ssl::ConnectionInfoConstSharedPtr ssl_info) { | ||
| upstream_ = std::move(upstream); | ||
| generic_conn_pool_.reset(); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Would be good to say something about the invariants on generic_conn_pool_ and upstream_. It seems that upstream_ should be null as we enter this method and when we create a generic_conn_pool_, and generic_conn_pool_ should be null when upstream_ is not null. |
||
| read_callbacks_->upstreamHost(host); | ||
| getStreamInfo().onUpstreamHostSelected(host); | ||
| getStreamInfo().setUpstreamLocalAddress(local_address); | ||
| getStreamInfo().setUpstreamSslConnection(ssl_info); | ||
| onUpstreamConnection(); | ||
| read_callbacks_->continueReading(); | ||
| } | ||
|
|
||
| void Filter::onPoolReady(Tcp::ConnectionPool::ConnectionDataPtr&& conn_data, | ||
| Upstream::HostDescriptionConstSharedPtr host) { | ||
| Tcp::ConnectionPool::ConnectionData* latched_data = conn_data.get(); | ||
|
|
||
| upstream_ = std::make_unique<TcpUpstream>(std::move(conn_data), *upstream_callbacks_); | ||
| onPoolReadyBase(host, latched_data->connection().localAddress(), | ||
| latched_data->connection().streamInfo().downstreamSslConnection()); | ||
| read_callbacks_->connection().streamInfo().setUpstreamFilterState( | ||
| latched_data->connection().streamInfo().filterState()); | ||
| } | ||
|
|
||
| void Filter::onPoolFailure(ConnectionPool::PoolFailureReason failure, absl::string_view, | ||
| Upstream::HostDescriptionConstSharedPtr host) { | ||
| onPoolFailure(failure, host); | ||
| } | ||
|
|
||
| void Filter::onPoolReady(Http::RequestEncoder& request_encoder, | ||
| Upstream::HostDescriptionConstSharedPtr host, | ||
| const StreamInfo::StreamInfo& info) { | ||
| Http::RequestEncoder* latched_encoder = &request_encoder; | ||
| HttpUpstream* http_upstream = static_cast<HttpUpstream*>(upstream_.get()); | ||
| http_upstream->setRequestEncoder(request_encoder, | ||
| host->transportSocketFactory().implementsSecureTransport()); | ||
|
|
||
| onPoolReadyBase(host, latched_encoder->getStream().connectionLocalAddress(), | ||
| info.downstreamSslConnection()); | ||
| if (info) { | ||
| read_callbacks_->connection().streamInfo().setUpstreamFilterState(info->filterState()); | ||
| } | ||
| } | ||
|
|
||
| const Router::MetadataMatchCriteria* Filter::metadataMatchCriteria() { | ||
|
|
@@ -624,12 +590,11 @@ void Filter::onDownstreamEvent(Network::ConnectionEvent event) { | |
| disableIdleTimer(); | ||
| } | ||
| } | ||
| if (upstream_handle_) { | ||
| if (generic_conn_pool_) { | ||
| if (event == Network::ConnectionEvent::LocalClose || | ||
| event == Network::ConnectionEvent::RemoteClose) { | ||
| // Cancel the conn pool request and close any excess pending requests. | ||
| upstream_handle_->cancel(); | ||
| upstream_handle_.reset(); | ||
| generic_conn_pool_.reset(); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,5 +1,7 @@ | ||
| #include "common/tcp_proxy/upstream.h" | ||
|
|
||
| #include "envoy/upstream/cluster_manager.h" | ||
|
|
||
| #include "common/http/header_map_impl.h" | ||
| #include "common/http/headers.h" | ||
| #include "common/http/utility.h" | ||
|
|
@@ -152,5 +154,99 @@ void HttpUpstream::doneWriting() { | |
| } | ||
| } | ||
|
|
||
| TcpConnPool::TcpConnPool(const std::string& cluster_name, Upstream::ClusterManager& cluster_manager, | ||
| Upstream::LoadBalancerContext* context, | ||
| Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks) | ||
| : upstream_callbacks_(upstream_callbacks) { | ||
| conn_pool_ = cluster_manager.tcpConnPoolForCluster(cluster_name, | ||
| Upstream::ResourcePriority::Default, context); | ||
| } | ||
|
|
||
| TcpConnPool::~TcpConnPool() { | ||
| if (upstream_handle_ != nullptr) { | ||
| upstream_handle_->cancel(ConnectionPool::CancelPolicy::CloseExcess); | ||
| } | ||
| } | ||
|
|
||
| bool TcpConnPool::valid() const { return conn_pool_ != nullptr; } | ||
|
|
||
| void TcpConnPool::newStream(GenericConnectionPoolCallbacks* callbacks) { | ||
| callbacks_ = callbacks; | ||
| // Given this function is reentrant, make sure we only reset the upstream_handle_ if given a | ||
| // valid connection handle. If newConnection fails inline it may result in attempting to | ||
| // select a new host, and a recursive call to initializeUpstreamConnection. In this case the | ||
| // first call to newConnection will return null and the inner call will persist. | ||
| Tcp::ConnectionPool::Cancellable* handle = conn_pool_->newConnection(*this); | ||
| if (handle) { | ||
| ASSERT(upstream_handle_ == nullptr); | ||
| upstream_handle_ = handle; | ||
| } | ||
| } | ||
|
|
||
| void TcpConnPool::onPoolFailure(ConnectionPool::PoolFailureReason reason, | ||
| Upstream::HostDescriptionConstSharedPtr host) { | ||
| upstream_handle_ = nullptr; | ||
| callbacks_->onGenericPoolFailure(reason, host); | ||
| } | ||
|
|
||
| void TcpConnPool::onPoolReady(Tcp::ConnectionPool::ConnectionDataPtr&& conn_data, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. nit: missing newline above. |
||
| Upstream::HostDescriptionConstSharedPtr host) { | ||
| upstream_handle_ = nullptr; | ||
| Tcp::ConnectionPool::ConnectionData* latched_data = conn_data.get(); | ||
| Network::Connection& connection = conn_data->connection(); | ||
|
|
||
| auto upstream = std::make_unique<TcpUpstream>(std::move(conn_data), upstream_callbacks_); | ||
| callbacks_->onGenericPoolReady(&connection.streamInfo(), std::move(upstream), host, | ||
| latched_data->connection().localAddress(), | ||
| latched_data->connection().streamInfo().downstreamSslConnection()); | ||
| } | ||
|
|
||
| HttpConnPool::HttpConnPool(const std::string& cluster_name, | ||
| Upstream::ClusterManager& cluster_manager, | ||
| Upstream::LoadBalancerContext* context, std::string hostname, | ||
| Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks) | ||
| : hostname_(hostname), upstream_callbacks_(upstream_callbacks) { | ||
| conn_pool_ = cluster_manager.httpConnPoolForCluster( | ||
| cluster_name, Upstream::ResourcePriority::Default, absl::nullopt, context); | ||
| } | ||
|
|
||
| HttpConnPool::~HttpConnPool() { | ||
| if (upstream_handle_ != nullptr) { | ||
| // Because HTTP connections are generally shorter lived and have a higher probability of use | ||
| // before going idle, they are closed with Default rather than CloseExcess. | ||
| upstream_handle_->cancel(ConnectionPool::CancelPolicy::Default); | ||
| } | ||
| } | ||
|
|
||
| bool HttpConnPool::valid() const { return conn_pool_ != nullptr; } | ||
|
|
||
| void HttpConnPool::newStream(GenericConnectionPoolCallbacks* callbacks) { | ||
| callbacks_ = callbacks; | ||
| upstream_ = std::make_unique<HttpUpstream>(upstream_callbacks_, hostname_); | ||
| Tcp::ConnectionPool::Cancellable* handle = | ||
| conn_pool_->newStream(upstream_->responseDecoder(), *this); | ||
| if (handle != nullptr) { | ||
| upstream_handle_ = handle; | ||
| } | ||
| } | ||
|
|
||
| void HttpConnPool::onPoolFailure(ConnectionPool::PoolFailureReason reason, absl::string_view, | ||
| Upstream::HostDescriptionConstSharedPtr host) { | ||
| upstream_handle_ = nullptr; | ||
| callbacks_->onGenericPoolFailure(reason, host); | ||
| } | ||
|
|
||
| void HttpConnPool::onPoolReady(Http::RequestEncoder& request_encoder, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. nit: missing newline above. |
||
| Upstream::HostDescriptionConstSharedPtr host, | ||
| const StreamInfo::StreamInfo& info) { | ||
| upstream_handle_ = nullptr; | ||
| Http::RequestEncoder* latched_encoder = &request_encoder; | ||
| upstream_->setRequestEncoder(request_encoder, | ||
| host->transportSocketFactory().implementsSecureTransport()); | ||
| callbacks_->onGenericPoolReady(nullptr, std::move(upstream_), host, | ||
| latched_encoder->getStream().connectionLocalAddress(), | ||
| info.downstreamSslConnection()); | ||
| } | ||
|
|
||
| } // namespace TcpProxy | ||
| } // namespace Envoy | ||
Uh oh!
There was an error while loading. Please reload this page.