diff --git a/envoy/http/conn_pool.h b/envoy/http/conn_pool.h index 5ebbd8959c6d8..a188384bc63b5 100644 --- a/envoy/http/conn_pool.h +++ b/envoy/http/conn_pool.h @@ -43,7 +43,7 @@ class Callbacks { * @param protocol supplies the protocol associated with the stream, or absl::nullopt for raw TCP. */ virtual void onPoolReady(RequestEncoder& encoder, Upstream::HostDescriptionConstSharedPtr host, - const StreamInfo::StreamInfo& info, + StreamInfo::StreamInfo& info, absl::optional protocol) PURE; }; diff --git a/envoy/router/router.h b/envoy/router/router.h index 09cc4040943ab..7af9376c75922 100644 --- a/envoy/router/router.h +++ b/envoy/router/router.h @@ -1260,7 +1260,7 @@ class GenericConnectionPoolCallbacks { virtual void onPoolReady(std::unique_ptr&& upstream, Upstream::HostDescriptionConstSharedPtr host, const Network::Address::InstanceConstSharedPtr& upstream_local_address, - const StreamInfo::StreamInfo& info, + StreamInfo::StreamInfo& info, absl::optional protocol) PURE; // @return the UpstreamToDownstream interface for this stream. diff --git a/envoy/stream_info/stream_info.h b/envoy/stream_info/stream_info.h index 20b22e15a03b3..a806f08ae71a7 100644 --- a/envoy/stream_info/stream_info.h +++ b/envoy/stream_info/stream_info.h @@ -721,6 +721,15 @@ class StreamInfo { downstream_info.setUpstreamBytesMeter(upstream_info.getUpstreamBytesMeter()); upstream_info.setDownstreamBytesMeter(downstream_info.getDownstreamBytesMeter()); } + + /** + * Filter State object to be shared between upstream and downstream filters. + * This is set for upstream connections. + * @param pointer to downstream connections filter state. + * @return pointer to filter state to be used by downstream connections. + */ + virtual const FilterStateSharedPtr& downstreamFilterState() const PURE; + virtual void setDownstreamFilterState(const FilterStateSharedPtr& filter_state) PURE; }; } // namespace StreamInfo diff --git a/source/common/http/codec_client.h b/source/common/http/codec_client.h index 093e28304402c..03dbc976cbe09 100644 --- a/source/common/http/codec_client.h +++ b/source/common/http/codec_client.h @@ -129,7 +129,7 @@ class CodecClient : protected Logger::Loggable, CodecType type() const { return type_; } // Note this is the L4 stream info, not L7. - const StreamInfo::StreamInfo& streamInfo() { return connection_->streamInfo(); } + StreamInfo::StreamInfo& streamInfo() { return connection_->streamInfo(); } protected: /** diff --git a/source/common/http/conn_pool_grid.cc b/source/common/http/conn_pool_grid.cc index 785402e81b813..10d9355580105 100644 --- a/source/common/http/conn_pool_grid.cc +++ b/source/common/http/conn_pool_grid.cc @@ -102,7 +102,7 @@ ConnectivityGrid::StreamCreationResult ConnectivityGrid::WrapperCallbacks::newSt void ConnectivityGrid::WrapperCallbacks::onConnectionAttemptReady( ConnectionAttemptCallbacks* attempt, RequestEncoder& encoder, - Upstream::HostDescriptionConstSharedPtr host, const StreamInfo::StreamInfo& info, + Upstream::HostDescriptionConstSharedPtr host, StreamInfo::StreamInfo& info, absl::optional protocol) { ENVOY_LOG(trace, "{} pool successfully connected to host '{}'.", describePool(attempt->pool()), host->hostname()); @@ -137,7 +137,7 @@ void ConnectivityGrid::WrapperCallbacks::maybeMarkHttp3Broken() { void ConnectivityGrid::WrapperCallbacks::ConnectionAttemptCallbacks::onPoolReady( RequestEncoder& encoder, Upstream::HostDescriptionConstSharedPtr host, - const StreamInfo::StreamInfo& info, absl::optional protocol) { + StreamInfo::StreamInfo& info, absl::optional protocol) { cancellable_ = nullptr; // Attempt succeeded and can no longer be cancelled. parent_.onConnectionAttemptReady(this, encoder, host, info, protocol); } diff --git a/source/common/http/conn_pool_grid.h b/source/common/http/conn_pool_grid.h index 0b87192852438..b283f0f2b301d 100644 --- a/source/common/http/conn_pool_grid.h +++ b/source/common/http/conn_pool_grid.h @@ -56,7 +56,7 @@ class ConnectivityGrid : public ConnectionPool::Instance, absl::string_view transport_failure_reason, Upstream::HostDescriptionConstSharedPtr host) override; void onPoolReady(RequestEncoder& encoder, Upstream::HostDescriptionConstSharedPtr host, - const StreamInfo::StreamInfo& info, + StreamInfo::StreamInfo& info, absl::optional protocol) override; ConnectionPool::Instance& pool() { return **pool_it_; } @@ -94,7 +94,7 @@ class ConnectivityGrid : public ConnectionPool::Instance, // Called by a ConnectionAttempt when the underlying pool is ready. void onConnectionAttemptReady(ConnectionAttemptCallbacks* attempt, RequestEncoder& encoder, Upstream::HostDescriptionConstSharedPtr host, - const StreamInfo::StreamInfo& info, + StreamInfo::StreamInfo& info, absl::optional protocol); private: diff --git a/source/common/router/upstream_request.cc b/source/common/router/upstream_request.cc index 725054b32b957..588490869baea 100644 --- a/source/common/router/upstream_request.cc +++ b/source/common/router/upstream_request.cc @@ -412,7 +412,7 @@ void UpstreamRequest::onPoolFailure(ConnectionPool::PoolFailureReason reason, void UpstreamRequest::onPoolReady( std::unique_ptr&& upstream, Upstream::HostDescriptionConstSharedPtr host, const Network::Address::InstanceConstSharedPtr& upstream_local_address, - const StreamInfo::StreamInfo& info, absl::optional protocol) { + StreamInfo::StreamInfo& info, absl::optional protocol) { // This may be called under an existing ScopeTrackerScopeState but it will unwind correctly. ScopeTrackerScopeState scope(&parent_.callbacks()->scope(), parent_.callbacks()->dispatcher()); ENVOY_STREAM_LOG(debug, "pool ready", *parent_.callbacks()); @@ -437,18 +437,19 @@ void UpstreamRequest::onPoolReady( StreamInfo::UpstreamInfo& upstream_info = *stream_info_.upstreamInfo(); parent_.callbacks()->streamInfo().setUpstreamInfo(stream_info_.upstreamInfo()); - if (info.upstreamInfo().has_value()) { - auto& upstream_timing = info.upstreamInfo().value().get().upstreamTiming(); + if (info.upstreamInfo()) { + auto& upstream_timing = info.upstreamInfo()->upstreamTiming(); upstreamTiming().upstream_connect_start_ = upstream_timing.upstream_connect_start_; upstreamTiming().upstream_connect_complete_ = upstream_timing.upstream_connect_complete_; upstreamTiming().upstream_handshake_complete_ = upstream_timing.upstream_handshake_complete_; - upstream_info.setUpstreamNumStreams(info.upstreamInfo().value().get().upstreamNumStreams()); + upstream_info.setUpstreamNumStreams(info.upstreamInfo()->upstreamNumStreams()); } upstream_info.setUpstreamFilterState(std::make_shared( - info.filterState().parent()->parent(), StreamInfo::FilterState::LifeSpan::Request)); + info.filterState()->parent()->parent(), StreamInfo::FilterState::LifeSpan::Request)); upstream_info.setUpstreamLocalAddress(upstream_local_address); upstream_info.setUpstreamSslConnection(info.downstreamAddressProvider().sslConnection()); + info.setDownstreamFilterState(parent_.callbacks()->streamInfo().filterState()); if (info.downstreamAddressProvider().connectionID().has_value()) { upstream_info.setUpstreamConnectionId(info.downstreamAddressProvider().connectionID().value()); diff --git a/source/common/router/upstream_request.h b/source/common/router/upstream_request.h index 4f1064e90028a..87f6ebd95e442 100644 --- a/source/common/router/upstream_request.h +++ b/source/common/router/upstream_request.h @@ -82,8 +82,7 @@ class UpstreamRequest : public Logger::Loggable, void onPoolReady(std::unique_ptr&& upstream, Upstream::HostDescriptionConstSharedPtr host, const Network::Address::InstanceConstSharedPtr& upstream_local_address, - const StreamInfo::StreamInfo& info, - absl::optional protocol) override; + StreamInfo::StreamInfo& info, absl::optional protocol) override; UpstreamToDownstream& upstreamToDownstream() override { return *this; } void clearRequestEncoder(); diff --git a/source/common/stream_info/stream_info_impl.h b/source/common/stream_info/stream_info_impl.h index 71c2da970df53..af1f46f349823 100644 --- a/source/common/stream_info/stream_info_impl.h +++ b/source/common/stream_info/stream_info_impl.h @@ -308,6 +308,13 @@ struct StreamInfoImpl : public StreamInfo { ASSERT(downstream_bytes_meter_.get() == downstream_bytes_meter.get()); } + const FilterStateSharedPtr& downstreamFilterState() const override { + return downstream_filter_state_; + } + void setDownstreamFilterState(const FilterStateSharedPtr& filter_state) override { + downstream_filter_state_ = filter_state; + } + TimeSource& time_source_; const SystemTime start_time_; const MonotonicTime start_time_monotonic_; @@ -360,6 +367,7 @@ struct StreamInfoImpl : public StreamInfo { // Default construct the object because upstream stream is not constructed in some cases. BytesMeterSharedPtr upstream_bytes_meter_{std::make_shared()}; BytesMeterSharedPtr downstream_bytes_meter_; + FilterStateSharedPtr downstream_filter_state_; }; } // namespace StreamInfo diff --git a/source/common/tcp_proxy/tcp_proxy.cc b/source/common/tcp_proxy/tcp_proxy.cc index 4ce38e576ea6a..d197c21ec0552 100644 --- a/source/common/tcp_proxy/tcp_proxy.cc +++ b/source/common/tcp_proxy/tcp_proxy.cc @@ -470,6 +470,7 @@ void Filter::onGenericPoolReady(StreamInfo::StreamInfo* info, read_callbacks_->continueReading(); if (info) { upstream_info.setUpstreamFilterState(info->filterState()); + info->setDownstreamFilterState(getStreamInfo().filterState()); } } diff --git a/source/common/tcp_proxy/upstream.cc b/source/common/tcp_proxy/upstream.cc index 4fdb838540cda..1213d4e807afe 100644 --- a/source/common/tcp_proxy/upstream.cc +++ b/source/common/tcp_proxy/upstream.cc @@ -239,7 +239,7 @@ void HttpConnPool::onPoolFailure(ConnectionPool::PoolFailureReason reason, absl: void HttpConnPool::onPoolReady(Http::RequestEncoder& request_encoder, Upstream::HostDescriptionConstSharedPtr host, - const StreamInfo::StreamInfo& info, absl::optional) { + StreamInfo::StreamInfo& info, absl::optional) { upstream_handle_ = nullptr; upstream_->setRequestEncoder(request_encoder, host->transportSocketFactory().implementsSecureTransport()); diff --git a/source/common/tcp_proxy/upstream.h b/source/common/tcp_proxy/upstream.h index fbae213d57fb4..30937ac9005b3 100644 --- a/source/common/tcp_proxy/upstream.h +++ b/source/common/tcp_proxy/upstream.h @@ -63,7 +63,7 @@ class HttpConnPool : public GenericConnPool, public Http::ConnectionPool::Callba absl::string_view transport_failure_reason, Upstream::HostDescriptionConstSharedPtr host) override; void onPoolReady(Http::RequestEncoder& request_encoder, - Upstream::HostDescriptionConstSharedPtr host, const StreamInfo::StreamInfo& info, + Upstream::HostDescriptionConstSharedPtr host, StreamInfo::StreamInfo& info, absl::optional) override; class Callbacks { diff --git a/source/extensions/upstreams/http/http/upstream_request.cc b/source/extensions/upstreams/http/http/upstream_request.cc index 4f9c42448d8d2..9177a322cd0db 100644 --- a/source/extensions/upstreams/http/http/upstream_request.cc +++ b/source/extensions/upstreams/http/http/upstream_request.cc @@ -56,7 +56,7 @@ void HttpConnPool::onPoolFailure(ConnectionPool::PoolFailureReason reason, void HttpConnPool::onPoolReady(Envoy::Http::RequestEncoder& request_encoder, Upstream::HostDescriptionConstSharedPtr host, - const StreamInfo::StreamInfo& info, + StreamInfo::StreamInfo& info, absl::optional protocol) { conn_pool_stream_handle_ = nullptr; auto upstream = diff --git a/source/extensions/upstreams/http/http/upstream_request.h b/source/extensions/upstreams/http/http/upstream_request.h index 7136e709964e7..b1e82f06ca22e 100644 --- a/source/extensions/upstreams/http/http/upstream_request.h +++ b/source/extensions/upstreams/http/http/upstream_request.h @@ -40,7 +40,7 @@ class HttpConnPool : public Router::GenericConnPool, public Envoy::Http::Connect absl::string_view transport_failure_reason, Upstream::HostDescriptionConstSharedPtr host) override; void onPoolReady(Envoy::Http::RequestEncoder& callbacks_encoder, - Upstream::HostDescriptionConstSharedPtr host, const StreamInfo::StreamInfo& info, + Upstream::HostDescriptionConstSharedPtr host, StreamInfo::StreamInfo& info, absl::optional protocol) override; Upstream::HostDescriptionConstSharedPtr host() const override { return pool_data_.value().host(); diff --git a/test/common/http/common.h b/test/common/http/common.h index 2e78fb57e84c8..d0096362505d5 100644 --- a/test/common/http/common.h +++ b/test/common/http/common.h @@ -41,7 +41,7 @@ class CodecClientForTest : public Http::CodecClient { */ struct ConnPoolCallbacks : public Http::ConnectionPool::Callbacks { void onPoolReady(Http::RequestEncoder& encoder, Upstream::HostDescriptionConstSharedPtr host, - const StreamInfo::StreamInfo&, absl::optional) override { + StreamInfo::StreamInfo&, absl::optional) override { outer_encoder_ = &encoder; host_ = host; pool_ready_.ready(); diff --git a/test/common/router/router_test.cc b/test/common/router/router_test.cc index 2294d3ea39960..735d42156d6fe 100644 --- a/test/common/router/router_test.cc +++ b/test/common/router/router_test.cc @@ -5016,6 +5016,10 @@ TEST_F(RouterTest, PropagatesUpstreamFilterState) { upstream_stream_info_.filterState()->setData( "upstream data", std::make_unique(123), StreamInfo::FilterState::StateType::ReadOnly, StreamInfo::FilterState::LifeSpan::Connection); + callbacks_.streamInfo().filterState()->setData( + "downstream data", std::make_unique(456), + StreamInfo::FilterState::StateType::ReadOnly, StreamInfo::FilterState::LifeSpan::Connection); + expectResponseTimerCreate(); EXPECT_CALL(cm_.thread_local_cluster_.conn_pool_, newStream(_, _)) .WillOnce(Invoke( @@ -5040,6 +5044,7 @@ TEST_F(RouterTest, PropagatesUpstreamFilterState) { EXPECT_TRUE(filter_state_verified); EXPECT_TRUE(callbacks_.streamInfo().upstreamInfo()->upstreamFilterState()->hasDataWithName( "upstream data")); + EXPECT_TRUE(upstream_stream_info_.downstreamFilterState()->hasDataWithName("downstream data")); } TEST_F(RouterTest, UpstreamSSLConnection) { diff --git a/test/common/stream_info/stream_info_impl_test.cc b/test/common/stream_info/stream_info_impl_test.cc index bc4b728d63bdc..666b748d95846 100644 --- a/test/common/stream_info/stream_info_impl_test.cc +++ b/test/common/stream_info/stream_info_impl_test.cc @@ -188,6 +188,9 @@ TEST_F(StreamInfoImplTest, MiscSettersAndGetters) { ->upstreamFilterState() ->getDataReadOnly("test") .access()); + stream_info.setDownstreamFilterState(stream_info.filterState()); + EXPECT_EQ( + 1, stream_info.downstreamFilterState()->getDataReadOnly("test").access()); EXPECT_EQ(absl::nullopt, stream_info.upstreamClusterInfo()); Upstream::ClusterInfoConstSharedPtr cluster_info(new NiceMock()); diff --git a/test/common/tcp_proxy/tcp_proxy_test.cc b/test/common/tcp_proxy/tcp_proxy_test.cc index ffb154a721956..e870a07fac1ae 100644 --- a/test/common/tcp_proxy/tcp_proxy_test.cc +++ b/test/common/tcp_proxy/tcp_proxy_test.cc @@ -1073,6 +1073,15 @@ TEST_F(TcpProxyTest, ShareFilterState) { ->upstreamFilterState() ->getDataReadOnly("envoy.tcp_proxy.cluster") .value()); + + filter_callbacks_.connection_.streamInfo().filterState()->setData( + "test_state", std::make_unique("test_value"), + StreamInfo::FilterState::StateType::Mutable, StreamInfo::FilterState::LifeSpan::Connection); + EXPECT_EQ("test_value", upstream_connections_.at(0) + ->streamInfo() + .downstreamFilterState() + ->getDataReadOnly("test_state") + .value()); } // Tests that filter callback can access downstream and upstream address and ssl properties. diff --git a/test/integration/upstreams/per_host_upstream_config.h b/test/integration/upstreams/per_host_upstream_config.h index e649aa087a8f6..d16fcb38120f6 100644 --- a/test/integration/upstreams/per_host_upstream_config.h +++ b/test/integration/upstreams/per_host_upstream_config.h @@ -77,7 +77,7 @@ class PerHostHttpConnPool : public Extensions::Upstreams::Http::Http::HttpConnPo : HttpConnPool(thread_local_cluster, is_connect, route_entry, downstream_protocol, ctx) {} void onPoolReady(Envoy::Http::RequestEncoder& callbacks_encoder, - Upstream::HostDescriptionConstSharedPtr host, const StreamInfo::StreamInfo& info, + Upstream::HostDescriptionConstSharedPtr host, StreamInfo::StreamInfo& info, absl::optional protocol) override { conn_pool_stream_handle_ = nullptr; auto upstream = std::make_unique(callbacks_->upstreamToDownstream(), diff --git a/test/mocks/http/conn_pool.h b/test/mocks/http/conn_pool.h index 323eb7a087ac6..d38e89f17f229 100644 --- a/test/mocks/http/conn_pool.h +++ b/test/mocks/http/conn_pool.h @@ -19,7 +19,7 @@ class MockCallbacks : public Callbacks { Upstream::HostDescriptionConstSharedPtr host)); MOCK_METHOD(void, onPoolReady, (RequestEncoder & encoder, Upstream::HostDescriptionConstSharedPtr host, - const StreamInfo::StreamInfo& info, absl::optional protocol)); + StreamInfo::StreamInfo& info, absl::optional protocol)); }; class MockInstance : public Instance { diff --git a/test/mocks/router/mocks.h b/test/mocks/router/mocks.h index 6af0430970419..78337c9a453f1 100644 --- a/test/mocks/router/mocks.h +++ b/test/mocks/router/mocks.h @@ -598,7 +598,7 @@ class MockGenericConnectionPoolCallbacks : public GenericConnectionPoolCallbacks (std::unique_ptr && upstream, Upstream::HostDescriptionConstSharedPtr host, const Network::Address::InstanceConstSharedPtr& upstream_local_address, - const StreamInfo::StreamInfo& info, absl::optional protocol)); + StreamInfo::StreamInfo& info, absl::optional protocol)); MOCK_METHOD(UpstreamToDownstream&, upstreamToDownstream, ()); NiceMock upstream_to_downstream_; diff --git a/test/mocks/stream_info/mocks.cc b/test/mocks/stream_info/mocks.cc index 528ec13a3c898..11250ea5942f6 100644 --- a/test/mocks/stream_info/mocks.cc +++ b/test/mocks/stream_info/mocks.cc @@ -117,6 +117,11 @@ MockStreamInfo::MockStreamInfo() .WillByDefault(Invoke([this](const BytesMeterSharedPtr& downstream_bytes_meter) { downstream_bytes_meter_ = downstream_bytes_meter; })); + ON_CALL(*this, downstreamFilterState()).WillByDefault(ReturnRef(downstream_filter_state_)); + ON_CALL(*this, setDownstreamFilterState(_)) + .WillByDefault(Invoke([this](const FilterStateSharedPtr& filter_state) { + downstream_filter_state_ = filter_state; + })); } MockStreamInfo::~MockStreamInfo() = default; diff --git a/test/mocks/stream_info/mocks.h b/test/mocks/stream_info/mocks.h index bd3603ea25647..fb50aee20292a 100644 --- a/test/mocks/stream_info/mocks.h +++ b/test/mocks/stream_info/mocks.h @@ -89,6 +89,8 @@ class MockStreamInfo : public StreamInfo { MOCK_METHOD(const BytesMeterSharedPtr&, getDownstreamBytesMeter, (), (const)); MOCK_METHOD(void, setUpstreamBytesMeter, (const BytesMeterSharedPtr&)); MOCK_METHOD(void, setDownstreamBytesMeter, (const BytesMeterSharedPtr&)); + MOCK_METHOD(const FilterStateSharedPtr&, downstreamFilterState, (), (const)); + MOCK_METHOD(void, setDownstreamFilterState, (const FilterStateSharedPtr&)); Envoy::Event::SimulatedTimeSystem ts_; SystemTime start_time_; MonotonicTime start_time_monotonic_; @@ -112,6 +114,7 @@ class MockStreamInfo : public StreamInfo { absl::optional attempt_count_; absl::optional virtual_cluster_name_; DownstreamTiming downstream_timing_; + FilterStateSharedPtr downstream_filter_state_; }; } // namespace StreamInfo