Skip to content
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions envoy/stream_info/stream_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -593,6 +593,16 @@ class StreamInfo {
* @return Network filter chain name of the downstream connection.
*/
virtual const std::string& filterChainName() const PURE;

/**
* @param connection ID of the upstream connection.
*/
virtual void setUpstreamConnectionId(uint64_t id) PURE;

/**
* @return the ID of the upstream connection, or absl::nullopt if not available.
*/
virtual absl::optional<uint64_t> upstreamConnectionId() const PURE;
};

} // namespace StreamInfo
Expand Down
2 changes: 2 additions & 0 deletions source/common/network/connection_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -824,6 +824,8 @@ ClientConnectionImpl::ClientConnectionImpl(
: ConnectionImpl(dispatcher, std::make_unique<ClientSocketImpl>(remote_address, options),
std::move(transport_socket), stream_info_, false),
stream_info_(dispatcher.timeSource(), socket_->addressProviderSharedPtr()) {

stream_info_.setUpstreamConnectionId(id());
Comment thread
junr03 marked this conversation as resolved.
Outdated
// There are no meaningful socket options or source address semantics for
// non-IP sockets, so skip.
if (remote_address->ip() == nullptr) {
Expand Down
8 changes: 8 additions & 0 deletions source/common/router/router.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1354,6 +1354,14 @@ void Filter::onUpstreamHeaders(uint64_t response_code, Http::ResponseHeaderMapPt

downstream_response_started_ = true;
final_upstream_request_ = &upstream_request;
// In upstream request hedging scenarios the upstream connection ID set in onPoolReady might not
// be the connection ID of the upstream connection that ended up receiving upstream headers. Thus
// reset the upstream connection ID here with the ID of the connection that ultimately was the
// transport for the final upstream request.
if (final_upstream_request_->streamInfo().upstreamConnectionId().has_value()) {
callbacks_->streamInfo().setUpstreamConnectionId(
final_upstream_request_->streamInfo().upstreamConnectionId().value());
}
resetOtherUpstreams(upstream_request);
if (end_stream) {
onUpstreamComplete(upstream_request);
Expand Down
5 changes: 5 additions & 0 deletions source/common/router/upstream_request.cc
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,11 @@ void UpstreamRequest::onPoolReady(
stream_info_.setUpstreamSslConnection(info.downstreamSslConnection());
parent_.callbacks()->streamInfo().setUpstreamSslConnection(info.downstreamSslConnection());

if (info.upstreamConnectionId().has_value()) {
Comment thread
junr03 marked this conversation as resolved.
Outdated
stream_info_.setUpstreamConnectionId(info.upstreamConnectionId().value());
parent_.callbacks()->streamInfo().setUpstreamConnectionId(info.upstreamConnectionId().value());
Comment thread
alyssawilk marked this conversation as resolved.
Outdated
}

if (parent_.downstreamEndStream()) {
setupPerTryTimeout();
} else {
Expand Down
2 changes: 2 additions & 0 deletions source/common/router/upstream_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ class UpstreamRequest : public Logger::Loggable<Logger::Id::router>,
}
bool encodeComplete() const { return encode_complete_; }
RouterFilterInterface& parent() { return parent_; }
// Exposes streamInfo for the upstream stream.
const StreamInfo::StreamInfo& streamInfo() const { return stream_info_; }

private:
bool shouldSendEndStream() {
Expand Down
12 changes: 9 additions & 3 deletions source/common/stream_info/stream_info_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ struct StreamInfoImpl : public StreamInfo {
start_time_monotonic_);
}

void setUpstreamConnectionId(uint64_t id) override { upstream_connection_id_ = id; }
Comment thread
junr03 marked this conversation as resolved.

absl::optional<uint64_t> upstreamConnectionId() const override { return upstream_connection_id_; }

absl::optional<std::chrono::nanoseconds> lastDownstreamRxByteReceived() const override {
return duration(last_downstream_rx_byte_received);
}
Expand Down Expand Up @@ -260,9 +264,10 @@ struct StreamInfoImpl : public StreamInfo {

void dumpState(std::ostream& os, int indent_level = 0) const {
const char* spaces = spacesForLevel(indent_level);
os << spaces << "StreamInfoImpl " << this << DUMP_OPTIONAL_MEMBER(protocol_)
<< DUMP_OPTIONAL_MEMBER(response_code_) << DUMP_OPTIONAL_MEMBER(response_code_details_)
<< DUMP_MEMBER(health_check_request_) << DUMP_MEMBER(route_name_) << "\n";
os << spaces << "StreamInfoImpl " << this << DUMP_OPTIONAL_MEMBER(upstream_connection_id_)
<< DUMP_OPTIONAL_MEMBER(protocol_) << DUMP_OPTIONAL_MEMBER(response_code_)
<< DUMP_OPTIONAL_MEMBER(response_code_details_) << DUMP_MEMBER(health_check_request_)
<< DUMP_MEMBER(route_name_) << "\n";
}

void setUpstreamClusterInfo(
Expand Down Expand Up @@ -301,6 +306,7 @@ struct StreamInfoImpl : public StreamInfo {
FilterStateSharedPtr filter_state_;
FilterStateSharedPtr upstream_filter_state_;
std::string route_name_;
absl::optional<uint64_t> upstream_connection_id_;

private:
static Network::SocketAddressProviderSharedPtr emptyDownstreamAddressProvider() {
Expand Down
6 changes: 6 additions & 0 deletions test/common/router/router_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2246,6 +2246,7 @@ TEST_F(RouterTest, HedgedPerTryTimeoutThirdRequestSucceeds) {
Http::ConnectionPool::Callbacks& callbacks) -> Http::ConnectionPool::Cancellable* {
response_decoder1 = &decoder;
EXPECT_CALL(*router_.retry_state_, onHostAttempted(_));
upstream_stream_info_.upstream_connection_id_ = 111;
callbacks.onPoolReady(encoder1, cm_.thread_local_cluster_.conn_pool_.host_,
upstream_stream_info_, Http::Protocol::Http10);
return nullptr;
Expand Down Expand Up @@ -2284,6 +2285,7 @@ TEST_F(RouterTest, HedgedPerTryTimeoutThirdRequestSucceeds) {
Http::ConnectionPool::Callbacks& callbacks) -> Http::ConnectionPool::Cancellable* {
response_decoder2 = &decoder;
EXPECT_CALL(*router_.retry_state_, onHostAttempted(_));
upstream_stream_info_.upstream_connection_id_ = 222;
callbacks.onPoolReady(encoder2, cm_.thread_local_cluster_.conn_pool_.host_,
upstream_stream_info_, Http::Protocol::Http10);
return nullptr;
Expand All @@ -2308,6 +2310,7 @@ TEST_F(RouterTest, HedgedPerTryTimeoutThirdRequestSucceeds) {
Http::ConnectionPool::Callbacks& callbacks) -> Http::ConnectionPool::Cancellable* {
response_decoder3 = &decoder;
EXPECT_CALL(*router_.retry_state_, onHostAttempted(_));
upstream_stream_info_.upstream_connection_id_ = 333;
callbacks.onPoolReady(encoder3, cm_.thread_local_cluster_.conn_pool_.host_,
upstream_stream_info_, Http::Protocol::Http10);
return nullptr;
Expand Down Expand Up @@ -2341,6 +2344,8 @@ TEST_F(RouterTest, HedgedPerTryTimeoutThirdRequestSucceeds) {
response_decoder3->decodeHeaders(std::move(response_headers2), true);
EXPECT_TRUE(verifyHostUpstreamStats(1, 1));

EXPECT_EQ(333U, callbacks_.stream_info_.upstream_connection_id_);

// TODO: Verify hedge stats here once they are implemented.
}

Expand Down Expand Up @@ -4870,6 +4875,7 @@ TEST_F(RouterTest, UpstreamSSLConnection) {

ASSERT_NE(nullptr, callbacks_.streamInfo().upstreamSslConnection());
EXPECT_EQ(session_id, callbacks_.streamInfo().upstreamSslConnection()->sessionId());
EXPECT_FALSE(callbacks_.streamInfo().upstreamConnectionId().has_value());
}

// Verify that upstream timing information is set into the StreamInfo after the upstream
Expand Down
5 changes: 5 additions & 0 deletions test/common/stream_info/stream_info_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,11 @@ TEST_F(StreamInfoImplTest, MiscSettersAndGetters) {
EXPECT_CALL(*ssl_info, sessionId()).WillRepeatedly(testing::ReturnRef(session_id));
stream_info.setUpstreamSslConnection(ssl_info);
EXPECT_EQ(session_id, stream_info.upstreamSslConnection()->sessionId());

EXPECT_FALSE(stream_info.upstreamConnectionId().has_value());
stream_info.setUpstreamConnectionId(12345);
ASSERT_TRUE(stream_info.upstreamConnectionId().has_value());
EXPECT_EQ(12345, stream_info.upstreamConnectionId().value());
}
}

Expand Down
6 changes: 5 additions & 1 deletion test/common/stream_info/test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,10 @@ class TestStreamInfo : public StreamInfo::StreamInfo {

const std::string& filterChainName() const override { return filter_chain_name_; }

void setUpstreamConnectionId(uint64_t id) override { upstream_connection_id_ = id; }

absl::optional<uint64_t> upstreamConnectionId() const override { return upstream_connection_id_; }

Random::RandomGeneratorImpl random_;
SystemTime start_time_;
MonotonicTime start_time_monotonic_;
Expand Down Expand Up @@ -258,9 +262,9 @@ class TestStreamInfo : public StreamInfo::StreamInfo {
Envoy::Event::SimulatedTimeSystem test_time_;
absl::optional<Upstream::ClusterInfoConstSharedPtr> upstream_cluster_info_{};
Http::RequestIdStreamInfoProviderSharedPtr request_id_provider_;
absl::optional<uint64_t> connection_id_;
std::string filter_chain_name_;
Tracing::Reason trace_reason_{Tracing::Reason::NotTraceable};
absl::optional<uint64_t> upstream_connection_id_;
};

} // namespace Envoy
6 changes: 6 additions & 0 deletions test/mocks/stream_info/mocks.cc
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,12 @@ MockStreamInfo::MockStreamInfo()
filter_chain_name_ = std::string(filter_chain_name);
}));
ON_CALL(*this, filterChainName()).WillByDefault(ReturnRef(filter_chain_name_));
ON_CALL(*this, setUpstreamConnectionId(_)).WillByDefault(Invoke([this](uint64_t id) {
upstream_connection_id_ = id;
}));
ON_CALL(*this, upstreamConnectionId()).WillByDefault(Invoke([this]() {
return upstream_connection_id_;
}));
}

MockStreamInfo::~MockStreamInfo() = default;
Expand Down
3 changes: 3 additions & 0 deletions test/mocks/stream_info/mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ class MockStreamInfo : public StreamInfo {
MOCK_METHOD(void, setConnectionID, (uint64_t));
MOCK_METHOD(void, setFilterChainName, (const absl::string_view));
MOCK_METHOD(const std::string&, filterChainName, (), (const));
MOCK_METHOD(void, setUpstreamConnectionId, (uint64_t));
MOCK_METHOD(absl::optional<uint64_t>, upstreamConnectionId, (), (const));

std::shared_ptr<testing::NiceMock<Upstream::MockHostDescription>> host_{
new testing::NiceMock<Upstream::MockHostDescription>()};
Expand Down Expand Up @@ -128,6 +130,7 @@ class MockStreamInfo : public StreamInfo {
std::string route_name_;
std::string upstream_transport_failure_reason_;
std::string filter_chain_name_;
absl::optional<uint64_t> upstream_connection_id_;
};

} // namespace StreamInfo
Expand Down