-
Notifications
You must be signed in to change notification settings - Fork 5.3k
router: adding CONNECT support to upstreams (not prod-ready) #10623
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
0cb59af
7f134d1
f1fe955
6ddcb00
21986a9
e8de80b
0729b25
500c4e5
0286031
771c1a6
b83f9bd
f56e2f0
1f2e9e5
9587888
67a753d
795231f
a847f91
ac38de1
9a82f12
c3193d2
92d833d
4434593
8363d5a
394c387
72b292a
e44bfbe
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -7,5 +7,5 @@ HTTP | |
| http_connection_management | ||
| http_filters | ||
| http_routing | ||
| websocket | ||
| upgrades | ||
| http_proxy | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -115,6 +115,16 @@ bool convertRequestHeadersForInternalRedirect(Http::RequestHeaderMap& downstream | |
|
|
||
| constexpr uint64_t TimeoutPrecisionFactor = 100; | ||
|
|
||
| Http::ConnectionPool::Instance* | ||
| httpPool(absl::variant<Http::ConnectionPool::Instance*, Tcp::ConnectionPool::Instance*> pool) { | ||
| return absl::get<Http::ConnectionPool::Instance*>(pool); | ||
| } | ||
|
|
||
| Tcp::ConnectionPool::Instance* | ||
| tcpPool(absl::variant<Http::ConnectionPool::Instance*, Tcp::ConnectionPool::Instance*> pool) { | ||
| return absl::get<Tcp::ConnectionPool::Instance*>(pool); | ||
| } | ||
|
|
||
| const absl::string_view getPath(const Http::RequestHeaderMap& headers) { | ||
| return headers.Path() ? headers.Path()->value().getStringView() : ""; | ||
| } | ||
|
|
@@ -549,12 +559,10 @@ Http::FilterHeadersStatus Filter::decodeHeaders(Http::RequestHeaderMap& headers, | |
| } | ||
| } | ||
|
|
||
| Http::ConnectionPool::Instance* http_pool = getHttpConnPool(); | ||
| Upstream::HostDescriptionConstSharedPtr host; | ||
| Filter::HttpOrTcpPool conn_pool = createConnPool(host); | ||
|
|
||
| if (http_pool) { | ||
| host = http_pool->host(); | ||
| } else { | ||
| if (!host) { | ||
| sendNoHealthyUpstreamResponse(); | ||
| return Http::FilterHeadersStatus::StopIteration; | ||
| } | ||
|
|
@@ -644,8 +652,7 @@ Http::FilterHeadersStatus Filter::decodeHeaders(Http::RequestHeaderMap& headers, | |
| // Hang onto the modify_headers function for later use in handling upstream responses. | ||
| modify_headers_ = modify_headers; | ||
|
|
||
| UpstreamRequestPtr upstream_request = | ||
| std::make_unique<UpstreamRequest>(*this, std::make_unique<HttpConnPool>(*http_pool)); | ||
| UpstreamRequestPtr upstream_request = createUpstreamRequest(conn_pool); | ||
| upstream_request->moveIntoList(std::move(upstream_request), upstream_requests_); | ||
| upstream_requests_.front()->encodeHeaders(end_stream); | ||
| if (end_stream) { | ||
|
|
@@ -655,6 +662,38 @@ Http::FilterHeadersStatus Filter::decodeHeaders(Http::RequestHeaderMap& headers, | |
| return Http::FilterHeadersStatus::StopIteration; | ||
| } | ||
|
|
||
| Filter::HttpOrTcpPool Filter::createConnPool(Upstream::HostDescriptionConstSharedPtr& host) { | ||
| Filter::HttpOrTcpPool conn_pool; | ||
| bool should_tcp_proxy = route_entry_->connectConfig().has_value() && | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: const |
||
| downstream_headers_->Method()->value().getStringView() == | ||
| Http::Headers::get().MethodValues.Connect; | ||
|
|
||
| if (!should_tcp_proxy) { | ||
| conn_pool = getHttpConnPool(); | ||
| if (httpPool(conn_pool)) { | ||
| host = httpPool(conn_pool)->host(); | ||
| } | ||
| } else { | ||
| transport_socket_options_ = Network::TransportSocketOptionsUtility::fromFilterState( | ||
| *callbacks_->streamInfo().filterState()); | ||
| conn_pool = config_.cm_.tcpConnPoolForCluster(route_entry_->clusterName(), | ||
| Upstream::ResourcePriority::Default, this); | ||
| if (tcpPool(conn_pool)) { | ||
| host = tcpPool(conn_pool)->host(); | ||
| } | ||
| } | ||
| return conn_pool; | ||
| } | ||
|
|
||
| UpstreamRequestPtr Filter::createUpstreamRequest(Filter::HttpOrTcpPool conn_pool) { | ||
| if (absl::holds_alternative<Http::ConnectionPool::Instance*>(conn_pool)) { | ||
| return std::make_unique<UpstreamRequest>(*this, | ||
| std::make_unique<HttpConnPool>(*httpPool(conn_pool))); | ||
| } | ||
| return std::make_unique<UpstreamRequest>(*this, | ||
| std::make_unique<TcpConnPool>(tcpPool(conn_pool))); | ||
| } | ||
|
|
||
| Http::ConnectionPool::Instance* Filter::getHttpConnPool() { | ||
| // Choose protocol based on cluster configuration and downstream connection | ||
| // Note: Cluster may downgrade HTTP2 to HTTP1 based on runtime configuration. | ||
|
|
@@ -1454,19 +1493,15 @@ void Filter::doRetry() { | |
| attempt_count_++; | ||
| ASSERT(pending_retries_ > 0); | ||
| pending_retries_--; | ||
| UpstreamRequestPtr upstream_request; | ||
|
|
||
| Http::ConnectionPool::Instance* conn_pool = getHttpConnPool(); | ||
| if (conn_pool) { | ||
| upstream_request = | ||
| std::make_unique<UpstreamRequest>(*this, std::make_unique<HttpConnPool>(*conn_pool)); | ||
| } | ||
|
|
||
| if (!upstream_request) { | ||
| Upstream::HostDescriptionConstSharedPtr host; | ||
| Filter::HttpOrTcpPool conn_pool = createConnPool(host); | ||
| if (!host) { | ||
| sendNoHealthyUpstreamResponse(); | ||
| cleanup(); | ||
| return; | ||
| } | ||
| UpstreamRequestPtr upstream_request = createUpstreamRequest(conn_pool); | ||
|
|
||
| if (include_attempt_count_in_request_) { | ||
| downstream_headers_->setEnvoyAttemptCount(attempt_count_); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -202,6 +202,41 @@ class HttpConnPool : public GenericConnPool, public Http::ConnectionPool::Callba | |
| GenericConnectionPoolCallbacks* callbacks_{}; | ||
| }; | ||
|
|
||
| class TcpConnPool : public GenericConnPool, public Tcp::ConnectionPool::Callbacks { | ||
| public: | ||
| TcpConnPool(Tcp::ConnectionPool::Instance* conn_pool) : conn_pool_(conn_pool) {} | ||
|
|
||
| void newStream(GenericConnectionPoolCallbacks* callbacks) override { | ||
| callbacks_ = callbacks; | ||
| upstream_handle_ = conn_pool_->newConnection(*this); | ||
| } | ||
|
|
||
| bool cancelAnyPendingRequest() override { | ||
| if (upstream_handle_) { | ||
| upstream_handle_->cancel(Tcp::ConnectionPool::CancelPolicy::Default); | ||
| upstream_handle_ = nullptr; | ||
| return true; | ||
| } | ||
| return false; | ||
| } | ||
| absl::optional<Http::Protocol> protocol() const override { return absl::nullopt; } | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this used for logging? Should we populate this? I guess there is the question of upstream access logging for terminated connect requests? Maybe a TODO for this? cc @kyessenov
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yeah, it goes in the streaminfo. IMO if we're doing raw L4 proxying upstream the HTTP based protocol should not be set, since we're not doing HTTP/1 or HTTP/2. Thoughts?
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would recommend briefly chatting with @kyessenov about this. He has been trying to normalize all of this so we support other protocols including TCP, etc. so I think we should probably have some indication in the logs of what is happening here. Definitely fine to TODO/issue after you determine the best future course of action. |
||
|
|
||
| // Tcp::ConnectionPool::Callbacks | ||
| void onPoolFailure(ConnectionPool::PoolFailureReason reason, | ||
| Upstream::HostDescriptionConstSharedPtr host) override { | ||
| upstream_handle_ = nullptr; | ||
| callbacks_->onPoolFailure(reason, "", host); | ||
| } | ||
|
|
||
| void onPoolReady(Tcp::ConnectionPool::ConnectionDataPtr&& conn_data, | ||
| Upstream::HostDescriptionConstSharedPtr host) override; | ||
|
|
||
| private: | ||
| Tcp::ConnectionPool::Instance* conn_pool_; | ||
| Tcp::ConnectionPool::Cancellable* upstream_handle_{}; | ||
| GenericConnectionPoolCallbacks* callbacks_{}; | ||
| }; | ||
|
|
||
| // A generic API which covers common functionality between HTTP and TCP upstreams. | ||
| class GenericUpstream { | ||
| public: | ||
|
|
@@ -261,5 +296,29 @@ class HttpUpstream : public GenericUpstream, public Http::StreamCallbacks { | |
| Http::RequestEncoder* request_encoder_{}; | ||
| }; | ||
|
|
||
| class TcpUpstream : public GenericUpstream, public Tcp::ConnectionPool::UpstreamCallbacks { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Which timeouts come into play with a CONNECT/WS session? I guess the downstream idle timeouts? Anything else? I think we will need to make sure that #10531 works with non-pass through CONNECT also? WDYT? cc @Shikugawa I haven't reviewed the tests yet but it would be great to make sure we have good integration coverage over all of the various timeout scenarios.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. All HTTP timeouts may apply - we're 100% reusing the classes which do timeout logic.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think you have to do them all I just wanted to make sure we are covered. @Shikugawa can make sure to cover CONNECT when the linked PR is finished for idle timeouts. |
||
| public: | ||
| TcpUpstream(UpstreamRequest* upstream_request, Tcp::ConnectionPool::ConnectionDataPtr&& upstream); | ||
|
|
||
| // GenericUpstream | ||
| void encodeData(Buffer::Instance& data, bool end_stream) override; | ||
| void encodeMetadata(const Http::MetadataMapVector&) override {} | ||
| void encodeHeaders(const Http::RequestHeaderMap&, bool end_stream) override; | ||
| void encodeTrailers(const Http::RequestTrailerMap&) override; | ||
| void readDisable(bool disable) override; | ||
| void resetStream() override; | ||
|
|
||
| // Tcp::ConnectionPool::UpstreamCallbacks | ||
| void onUpstreamData(Buffer::Instance& data, bool end_stream) override; | ||
| void onEvent(Network::ConnectionEvent event) override; | ||
| void onAboveWriteBufferHighWatermark() override; | ||
| void onBelowWriteBufferLowWatermark() override; | ||
|
|
||
| private: | ||
| UpstreamRequest* upstream_request_; | ||
| Tcp::ConnectionPool::ConnectionDataPtr upstream_conn_data_; | ||
| bool sent_headers_{}; | ||
| }; | ||
|
|
||
| } // namespace Router | ||
| } // namespace Envoy | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For my own understanding, if we proxy CONNECT through, this is really no different from WebSocket, is that right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, in that case it's just a blind upgrade where we proxy payload. Want me to comment something to that effect?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From reading this it sounds to me like if we proxy CONNECT we don't do anything special and just treat it like any other request? I think it could be clearer about what happens after the CONNECT is proxied/established.
I think it might also be useful to explain whether we support terminating the CONNECT and dropping down to L4 proxying. I don't think we do (?) but I think it's a use case people might be interested in and it might be worth calling out what our support is there if we don't already.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah I think some clarifications would be helpful to explain the differences between WS/CONNECT and termination vs. pass through. With this change terminating WS would be pretty trivial so that could be a future enhancement for someone (might be worth tracking in an issue).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I may have been steeped in the code too long but I'm not sure how I could make this more clear, but I think it's clear from Snow's comment that it's not as clear as I want it to be.
For connect we can basically treat it like normal L7 request, forwarding request headers and payload through, or we can terminate, strip connect headers, and forward the CONNECT payload on a raw TCP connection. I don't know how to explain that more clearly than I am doing so - I'll try for a little rewording in my next push but I'd totally appreciate any help making it clear what the options are.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is fine and we can ship and iterate.