diff --git a/envoy/stream_info/stream_info.h b/envoy/stream_info/stream_info.h index ac6a83304cecb..93affe6d63ed5 100644 --- a/envoy/stream_info/stream_info.h +++ b/envoy/stream_info/stream_info.h @@ -582,6 +582,16 @@ class StreamInfo { */ virtual const std::string& filterChainName() const PURE; + /** + * @param connection ID of the upstream connection. + */ + virtual void setUpstreamConnectionId(uint64_t id) PURE; + + /** + * @return the ID of the upstream connection, or absl::nullopt if not available. + */ + virtual absl::optional upstreamConnectionId() const PURE; + /** * @param attempt_count, the number of times the request was attempted upstream. */ diff --git a/source/common/router/router.cc b/source/common/router/router.cc index d52dede3093f2..0f7048d820e9b 100644 --- a/source/common/router/router.cc +++ b/source/common/router/router.cc @@ -1355,6 +1355,14 @@ void Filter::onUpstreamHeaders(uint64_t response_code, Http::ResponseHeaderMapPt downstream_response_started_ = true; final_upstream_request_ = &upstream_request; + // In upstream request hedging scenarios the upstream connection ID set in onPoolReady might not + // be the connection ID of the upstream connection that ended up receiving upstream headers. Thus + // reset the upstream connection ID here with the ID of the connection that ultimately was the + // transport for the final upstream request. + if (final_upstream_request_->streamInfo().upstreamConnectionId().has_value()) { + callbacks_->streamInfo().setUpstreamConnectionId( + final_upstream_request_->streamInfo().upstreamConnectionId().value()); + } resetOtherUpstreams(upstream_request); if (end_stream) { onUpstreamComplete(upstream_request); diff --git a/source/common/router/upstream_request.cc b/source/common/router/upstream_request.cc index 7fb899e4798c9..a354b6923d792 100644 --- a/source/common/router/upstream_request.cc +++ b/source/common/router/upstream_request.cc @@ -421,6 +421,12 @@ void UpstreamRequest::onPoolReady( parent_.callbacks()->streamInfo().setUpstreamSslConnection( info.downstreamAddressProvider().sslConnection()); + if (info.downstreamAddressProvider().connectionID().has_value()) { + stream_info_.setUpstreamConnectionId(info.downstreamAddressProvider().connectionID().value()); + parent_.callbacks()->streamInfo().setUpstreamConnectionId( + info.downstreamAddressProvider().connectionID().value()); + } + if (parent_.downstreamEndStream()) { setupPerTryTimeout(); } else { diff --git a/source/common/router/upstream_request.h b/source/common/router/upstream_request.h index 8776089fe0434..f0b07e8bdacd5 100644 --- a/source/common/router/upstream_request.h +++ b/source/common/router/upstream_request.h @@ -119,6 +119,8 @@ class UpstreamRequest : public Logger::Loggable, } bool encodeComplete() const { return encode_complete_; } RouterFilterInterface& parent() { return parent_; } + // Exposes streamInfo for the upstream stream. + const StreamInfo::StreamInfo& streamInfo() const { return stream_info_; } private: bool shouldSendEndStream() { diff --git a/source/common/stream_info/stream_info_impl.h b/source/common/stream_info/stream_info_impl.h index 18be2ac7e8600..2024754272fd6 100644 --- a/source/common/stream_info/stream_info_impl.h +++ b/source/common/stream_info/stream_info_impl.h @@ -68,6 +68,10 @@ struct StreamInfoImpl : public StreamInfo { start_time_monotonic_); } + void setUpstreamConnectionId(uint64_t id) override { upstream_connection_id_ = id; } + + absl::optional upstreamConnectionId() const override { return upstream_connection_id_; } + absl::optional lastDownstreamRxByteReceived() const override { return duration(last_downstream_rx_byte_received); } @@ -251,10 +255,10 @@ struct StreamInfoImpl : public StreamInfo { void dumpState(std::ostream& os, int indent_level = 0) const { const char* spaces = spacesForLevel(indent_level); - os << spaces << "StreamInfoImpl " << this << DUMP_OPTIONAL_MEMBER(protocol_) - << DUMP_OPTIONAL_MEMBER(response_code_) << DUMP_OPTIONAL_MEMBER(response_code_details_) - << DUMP_OPTIONAL_MEMBER(attempt_count_) << DUMP_MEMBER(health_check_request_) - << DUMP_MEMBER(route_name_) << "\n"; + os << spaces << "StreamInfoImpl " << this << DUMP_OPTIONAL_MEMBER(upstream_connection_id_) + << DUMP_OPTIONAL_MEMBER(protocol_) << DUMP_OPTIONAL_MEMBER(response_code_) + << DUMP_OPTIONAL_MEMBER(response_code_details_) << DUMP_OPTIONAL_MEMBER(attempt_count_) + << DUMP_MEMBER(health_check_request_) << DUMP_MEMBER(route_name_) << "\n"; } void setUpstreamClusterInfo( @@ -297,6 +301,7 @@ struct StreamInfoImpl : public StreamInfo { FilterStateSharedPtr filter_state_; FilterStateSharedPtr upstream_filter_state_; std::string route_name_; + absl::optional upstream_connection_id_; absl::optional attempt_count_; private: diff --git a/test/common/router/router_test.cc b/test/common/router/router_test.cc index 0530b38fed1c9..5ab03fb26d3e4 100644 --- a/test/common/router/router_test.cc +++ b/test/common/router/router_test.cc @@ -2250,6 +2250,7 @@ TEST_F(RouterTest, HedgedPerTryTimeoutThirdRequestSucceeds) { Http::ConnectionPool::Callbacks& callbacks) -> Http::ConnectionPool::Cancellable* { response_decoder1 = &decoder; EXPECT_CALL(*router_.retry_state_, onHostAttempted(_)); + upstream_stream_info_.downstream_address_provider_->setConnectionID(111); callbacks.onPoolReady(encoder1, cm_.thread_local_cluster_.conn_pool_.host_, upstream_stream_info_, Http::Protocol::Http10); return nullptr; @@ -2288,6 +2289,7 @@ TEST_F(RouterTest, HedgedPerTryTimeoutThirdRequestSucceeds) { Http::ConnectionPool::Callbacks& callbacks) -> Http::ConnectionPool::Cancellable* { response_decoder2 = &decoder; EXPECT_CALL(*router_.retry_state_, onHostAttempted(_)); + upstream_stream_info_.downstream_address_provider_->setConnectionID(222); callbacks.onPoolReady(encoder2, cm_.thread_local_cluster_.conn_pool_.host_, upstream_stream_info_, Http::Protocol::Http10); return nullptr; @@ -2312,6 +2314,7 @@ TEST_F(RouterTest, HedgedPerTryTimeoutThirdRequestSucceeds) { Http::ConnectionPool::Callbacks& callbacks) -> Http::ConnectionPool::Cancellable* { response_decoder3 = &decoder; EXPECT_CALL(*router_.retry_state_, onHostAttempted(_)); + upstream_stream_info_.downstream_address_provider_->setConnectionID(333); callbacks.onPoolReady(encoder3, cm_.thread_local_cluster_.conn_pool_.host_, upstream_stream_info_, Http::Protocol::Http10); return nullptr; @@ -2345,6 +2348,8 @@ TEST_F(RouterTest, HedgedPerTryTimeoutThirdRequestSucceeds) { response_decoder3->decodeHeaders(std::move(response_headers2), true); EXPECT_TRUE(verifyHostUpstreamStats(1, 1)); + EXPECT_EQ(333U, callbacks_.stream_info_.upstream_connection_id_); + // TODO: Verify hedge stats here once they are implemented. } @@ -4880,6 +4885,7 @@ TEST_F(RouterTest, UpstreamSSLConnection) { ASSERT_NE(nullptr, callbacks_.streamInfo().upstreamSslConnection()); EXPECT_EQ(session_id, callbacks_.streamInfo().upstreamSslConnection()->sessionId()); + EXPECT_FALSE(callbacks_.streamInfo().upstreamConnectionId().has_value()); } // Verify that upstream timing information is set into the StreamInfo after the upstream diff --git a/test/common/stream_info/stream_info_impl_test.cc b/test/common/stream_info/stream_info_impl_test.cc index 5435588ab8fbd..cfe761f58f3a0 100644 --- a/test/common/stream_info/stream_info_impl_test.cc +++ b/test/common/stream_info/stream_info_impl_test.cc @@ -193,6 +193,11 @@ TEST_F(StreamInfoImplTest, MiscSettersAndGetters) { EXPECT_CALL(*ssl_info, sessionId()).WillRepeatedly(testing::ReturnRef(session_id)); stream_info.setUpstreamSslConnection(ssl_info); EXPECT_EQ(session_id, stream_info.upstreamSslConnection()->sessionId()); + + EXPECT_FALSE(stream_info.upstreamConnectionId().has_value()); + stream_info.setUpstreamConnectionId(12345); + ASSERT_TRUE(stream_info.upstreamConnectionId().has_value()); + EXPECT_EQ(12345, stream_info.upstreamConnectionId().value()); } } diff --git a/test/common/stream_info/test_util.h b/test/common/stream_info/test_util.h index 9a06acd565083..1c8632ba6f768 100644 --- a/test/common/stream_info/test_util.h +++ b/test/common/stream_info/test_util.h @@ -211,6 +211,10 @@ class TestStreamInfo : public StreamInfo::StreamInfo { const std::string& filterChainName() const override { return filter_chain_name_; } + void setUpstreamConnectionId(uint64_t id) override { upstream_connection_id_ = id; } + + absl::optional upstreamConnectionId() const override { return upstream_connection_id_; } + void setAttemptCount(uint32_t attempt_count) override { attempt_count_ = attempt_count; } absl::optional attemptCount() const override { return attempt_count_; } @@ -254,9 +258,9 @@ class TestStreamInfo : public StreamInfo::StreamInfo { Envoy::Event::SimulatedTimeSystem test_time_; absl::optional upstream_cluster_info_{}; Http::RequestIdStreamInfoProviderSharedPtr request_id_provider_; - absl::optional connection_id_; std::string filter_chain_name_; Tracing::Reason trace_reason_{Tracing::Reason::NotTraceable}; + absl::optional upstream_connection_id_; absl::optional attempt_count_; }; diff --git a/test/mocks/stream_info/mocks.cc b/test/mocks/stream_info/mocks.cc index 31118d2847322..5d6332a230862 100644 --- a/test/mocks/stream_info/mocks.cc +++ b/test/mocks/stream_info/mocks.cc @@ -120,6 +120,12 @@ MockStreamInfo::MockStreamInfo() filter_chain_name_ = std::string(filter_chain_name); })); ON_CALL(*this, filterChainName()).WillByDefault(ReturnRef(filter_chain_name_)); + ON_CALL(*this, setUpstreamConnectionId(_)).WillByDefault(Invoke([this](uint64_t id) { + upstream_connection_id_ = id; + })); + ON_CALL(*this, upstreamConnectionId()).WillByDefault(Invoke([this]() { + return upstream_connection_id_; + })); ON_CALL(*this, setAttemptCount(_)).WillByDefault(Invoke([this](uint32_t attempt_count) { attempt_count_ = attempt_count; })); diff --git a/test/mocks/stream_info/mocks.h b/test/mocks/stream_info/mocks.h index 4eaafc720fe2f..6209cff3a2424 100644 --- a/test/mocks/stream_info/mocks.h +++ b/test/mocks/stream_info/mocks.h @@ -94,6 +94,8 @@ class MockStreamInfo : public StreamInfo { MOCK_METHOD(void, setConnectionID, (uint64_t)); MOCK_METHOD(void, setFilterChainName, (const absl::string_view)); MOCK_METHOD(const std::string&, filterChainName, (), (const)); + MOCK_METHOD(void, setUpstreamConnectionId, (uint64_t)); + MOCK_METHOD(absl::optional, upstreamConnectionId, (), (const)); MOCK_METHOD(void, setAttemptCount, (uint32_t), ()); MOCK_METHOD(absl::optional, attemptCount, (), (const)); @@ -128,6 +130,7 @@ class MockStreamInfo : public StreamInfo { std::string route_name_; std::string upstream_transport_failure_reason_; std::string filter_chain_name_; + absl::optional upstream_connection_id_; absl::optional attempt_count_; };