Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 40 additions & 15 deletions envoy/stream_info/stream_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,41 @@ struct UpstreamTiming {
absl::optional<MonotonicTime> last_upstream_rx_byte_received_;
};

class DownstreamTiming {
public:
absl::optional<MonotonicTime> lastDownstreamRxByteReceived() const {
return last_downstream_rx_byte_received_;
}
absl::optional<MonotonicTime> firstDownstreamTxByteSent() const {
return first_downstream_tx_byte_sent_;
}
absl::optional<MonotonicTime> lastDownstreamTxByteSent() const {
return last_downstream_tx_byte_sent_;
}

void onLastDownstreamRxByteReceived(TimeSource& time_source) {
ASSERT(!last_downstream_rx_byte_received_);
last_downstream_rx_byte_received_ = time_source.monotonicTime();
}
void onFirstDownstreamTxByteSent(TimeSource& time_source) {
ASSERT(!first_downstream_tx_byte_sent_);
first_downstream_tx_byte_sent_ = time_source.monotonicTime();
}
void onLastDownstreamTxByteSent(TimeSource& time_source) {
ASSERT(!last_downstream_tx_byte_sent_);
last_downstream_tx_byte_sent_ = time_source.monotonicTime();
}

private:
absl::flat_hash_map<std::string, MonotonicTime> timings_;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see where this is used. Is it for a follow-on change?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indeed, sorry I failed to clean up here, it's part of #18934

// The time when the last byte of the request was received.
absl::optional<MonotonicTime> last_downstream_rx_byte_received_;
// The time when the first byte of the response was sent downstream.
absl::optional<MonotonicTime> first_downstream_tx_byte_sent_;
// The time when the last byte of the response was sent downstream.
absl::optional<MonotonicTime> last_downstream_tx_byte_sent_;
};

// Measure the number of bytes sent and received for a stream.
struct BytesMeter {
uint64_t wireBytesSent() const { return wire_bytes_sent_; }
Expand Down Expand Up @@ -363,11 +398,6 @@ class StreamInfo {
*/
virtual absl::optional<std::chrono::nanoseconds> lastDownstreamRxByteReceived() const PURE;

/**
* Sets the time when the last byte of the request was received.
*/
virtual void onLastDownstreamRxByteReceived() PURE;

/**
* Sets the upstream timing information for this stream. This is useful for
* when multiple upstream requests are issued and we want to save timing
Expand Down Expand Up @@ -406,22 +436,12 @@ class StreamInfo {
*/
virtual absl::optional<std::chrono::nanoseconds> firstDownstreamTxByteSent() const PURE;

/**
* Sets the time when the first byte of the response is sent downstream.
*/
virtual void onFirstDownstreamTxByteSent() PURE;

/**
* @return the duration between the last byte of the response is sent downstream and the start of
* the request.
*/
virtual absl::optional<std::chrono::nanoseconds> lastDownstreamTxByteSent() const PURE;

/**
* Sets the time when the last byte of the response is sent downstream.
*/
virtual void onLastDownstreamTxByteSent() PURE;

/**
* @return the total duration of the request (i.e., when the request's ActiveStream is destroyed)
* and may be longer than lastDownstreamTxByteSent.
Expand All @@ -434,6 +454,11 @@ class StreamInfo {
*/
virtual void onRequestComplete() PURE;

/**
* @return the downstream timing information.
*/
virtual DownstreamTiming& downstreamTiming() PURE;

/**
* @param bytes_sent denotes the number of bytes to add to total sent bytes.
*/
Expand Down
3 changes: 2 additions & 1 deletion source/common/http/conn_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1463,7 +1463,8 @@ void ConnectionManagerImpl::ActiveStream::encodeHeaders(ResponseHeaderMap& heade
headers);

// Now actually encode via the codec.
filter_manager_.streamInfo().onFirstDownstreamTxByteSent();
filter_manager_.streamInfo().downstreamTiming().onFirstDownstreamTxByteSent(
connection_manager_.time_source_);
response_encoder_->encodeHeaders(headers, end_stream);
}

Expand Down
3 changes: 2 additions & 1 deletion source/common/http/conn_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,8 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
void endStream() override {
ASSERT(!state_.codec_saw_local_complete_);
state_.codec_saw_local_complete_ = true;
filter_manager_.streamInfo().onLastDownstreamTxByteSent();
filter_manager_.streamInfo().downstreamTiming().onLastDownstreamTxByteSent(
connection_manager_.time_source_);
request_response_timespan_->complete();
connection_manager_.doEndStream(*this);
}
Expand Down
2 changes: 1 addition & 1 deletion source/common/http/filter_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -830,7 +830,7 @@ void FilterManager::maybeEndDecode(bool end_stream) {
ASSERT(!state_.remote_complete_);
state_.remote_complete_ = end_stream;
if (end_stream) {
stream_info_.onLastDownstreamRxByteReceived();
stream_info_.downstreamTiming().onLastDownstreamRxByteReceived(dispatcher().timeSource());
ENVOY_STREAM_LOG(debug, "request end stream", *this);
}
}
Expand Down
42 changes: 20 additions & 22 deletions source/common/stream_info/stream_info_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,10 @@ struct StreamInfoImpl : public StreamInfo {
absl::optional<uint64_t> upstreamConnectionId() const override { return upstream_connection_id_; }

absl::optional<std::chrono::nanoseconds> lastDownstreamRxByteReceived() const override {
return duration(last_downstream_rx_byte_received);
}

void onLastDownstreamRxByteReceived() override {
ASSERT(!last_downstream_rx_byte_received);
last_downstream_rx_byte_received = time_source_.monotonicTime();
if (!downstream_timing_.has_value()) {
return absl::nullopt;
}
return duration(downstream_timing_.value().lastDownstreamRxByteReceived());
}

void setUpstreamTiming(const UpstreamTiming& upstream_timing) override {
Expand All @@ -105,21 +103,17 @@ struct StreamInfoImpl : public StreamInfo {
}

absl::optional<std::chrono::nanoseconds> firstDownstreamTxByteSent() const override {
return duration(first_downstream_tx_byte_sent_);
}

void onFirstDownstreamTxByteSent() override {
ASSERT(!first_downstream_tx_byte_sent_);
first_downstream_tx_byte_sent_ = time_source_.monotonicTime();
if (!downstream_timing_.has_value()) {
return absl::nullopt;
}
return duration(downstream_timing_.value().firstDownstreamTxByteSent());
}

absl::optional<std::chrono::nanoseconds> lastDownstreamTxByteSent() const override {
return duration(last_downstream_tx_byte_sent_);
}

void onLastDownstreamTxByteSent() override {
ASSERT(!last_downstream_tx_byte_sent_);
last_downstream_tx_byte_sent_ = time_source_.monotonicTime();
if (!downstream_timing_.has_value()) {
return absl::nullopt;
}
return duration(downstream_timing_.value().lastDownstreamTxByteSent());
}

absl::optional<std::chrono::nanoseconds> requestComplete() const override {
Expand All @@ -131,6 +125,13 @@ struct StreamInfoImpl : public StreamInfo {
final_time_ = time_source_.monotonicTime();
}

DownstreamTiming& downstreamTiming() override {
if (!downstream_timing_.has_value()) {
downstream_timing_ = DownstreamTiming();
}
return downstream_timing_.value();
}

void addBytesReceived(uint64_t bytes_received) override { bytes_received_ += bytes_received; }

uint64_t bytesReceived() const override { return bytes_received_; }
Expand Down Expand Up @@ -312,10 +313,6 @@ struct StreamInfoImpl : public StreamInfo {
TimeSource& time_source_;
const SystemTime start_time_;
const MonotonicTime start_time_monotonic_;

absl::optional<MonotonicTime> last_downstream_rx_byte_received;
absl::optional<MonotonicTime> first_downstream_tx_byte_sent_;
absl::optional<MonotonicTime> last_downstream_tx_byte_sent_;
absl::optional<MonotonicTime> final_time_;

absl::optional<Http::Protocol> protocol_;
Expand Down Expand Up @@ -360,6 +357,7 @@ struct StreamInfoImpl : public StreamInfo {
std::string requested_server_name_;
const Http::RequestHeaderMap* request_headers_{};
Http::RequestIdStreamInfoProviderSharedPtr request_id_provider_;
absl::optional<DownstreamTiming> downstream_timing_;
UpstreamTiming upstream_timing_;
std::string upstream_transport_failure_reason_;
absl::optional<Upstream::ClusterInfoConstSharedPtr> upstream_cluster_info_;
Expand Down
8 changes: 4 additions & 4 deletions test/common/stream_info/stream_info_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ TEST_F(StreamInfoImplTest, TimingTest) {
EXPECT_GE(post_start, start) << "Start time was higher than expected";

EXPECT_FALSE(info.lastDownstreamRxByteReceived());
info.onLastDownstreamRxByteReceived();
info.downstreamTiming().onLastDownstreamRxByteReceived(test_time_.timeSystem());
std::chrono::nanoseconds dur =
checkDuration(std::chrono::nanoseconds{0}, info.lastDownstreamRxByteReceived());

Expand All @@ -72,12 +72,12 @@ TEST_F(StreamInfoImplTest, TimingTest) {
info.setUpstreamTiming(upstream_timing);
dur = checkDuration(dur, info.lastUpstreamRxByteReceived());

EXPECT_FALSE(info.firstDownstreamTxByteSent());
info.onFirstDownstreamTxByteSent();
EXPECT_FALSE(info.downstreamTiming().firstDownstreamTxByteSent());
info.downstreamTiming().onFirstDownstreamTxByteSent(test_time_.timeSystem());
dur = checkDuration(dur, info.firstDownstreamTxByteSent());

EXPECT_FALSE(info.lastDownstreamTxByteSent());
info.onLastDownstreamTxByteSent();
info.downstreamTiming().onLastDownstreamTxByteSent(test_time_.timeSystem());
dur = checkDuration(dur, info.lastDownstreamTxByteSent());

EXPECT_FALSE(info.requestComplete());
Expand Down
25 changes: 5 additions & 20 deletions test/common/stream_info/test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,6 @@ class TestStreamInfo : public StreamInfo::StreamInfo {
return duration(last_rx_byte_received_);
}

void onLastDownstreamRxByteReceived() override {
last_rx_byte_received_ = timeSystem().monotonicTime();
}

absl::optional<std::chrono::nanoseconds> firstUpstreamTxByteSent() const override {
return duration(upstream_timing_.first_upstream_tx_byte_sent_);
}
Expand All @@ -127,23 +123,17 @@ class TestStreamInfo : public StreamInfo::StreamInfo {
}

absl::optional<std::chrono::nanoseconds> firstDownstreamTxByteSent() const override {
return duration(first_downstream_tx_byte_sent_);
}

void onFirstDownstreamTxByteSent() override {
first_downstream_tx_byte_sent_ = timeSystem().monotonicTime();
return duration(downstream_timing_.firstDownstreamTxByteSent());
}

absl::optional<std::chrono::nanoseconds> lastDownstreamTxByteSent() const override {
return duration(last_downstream_tx_byte_sent_);
}

void onLastDownstreamTxByteSent() override {
last_downstream_tx_byte_sent_ = timeSystem().monotonicTime();
return duration(downstream_timing_.lastDownstreamTxByteSent());
}

void onRequestComplete() override { end_time_ = timeSystem().monotonicTime(); }

Envoy::StreamInfo::DownstreamTiming& downstreamTiming() override { return downstream_timing_; }

void setUpstreamTiming(const Envoy::StreamInfo::UpstreamTiming& upstream_timing) override {
upstream_timing_ = upstream_timing;
}
Expand Down Expand Up @@ -241,13 +231,8 @@ class TestStreamInfo : public StreamInfo::StreamInfo {
SystemTime start_time_;
MonotonicTime start_time_monotonic_;

Envoy::StreamInfo::DownstreamTiming downstream_timing_;
absl::optional<MonotonicTime> last_rx_byte_received_;
absl::optional<MonotonicTime> first_upstream_tx_byte_sent_;
absl::optional<MonotonicTime> last_upstream_tx_byte_sent_;
absl::optional<MonotonicTime> first_upstream_rx_byte_received_;
absl::optional<MonotonicTime> last_upstream_rx_byte_received_;
absl::optional<MonotonicTime> first_downstream_tx_byte_sent_;
absl::optional<MonotonicTime> last_downstream_tx_byte_sent_;
absl::optional<MonotonicTime> end_time_;

absl::optional<Http::Protocol> protocol_{Http::Protocol::Http11};
Expand Down
1 change: 1 addition & 0 deletions test/mocks/stream_info/mocks.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ MockStreamInfo::MockStreamInfo()
std::chrono::duration_cast<std::chrono::nanoseconds>(ts_.systemTime() - start_time_)
.count());
}));
ON_CALL(*this, downstreamTiming()).WillByDefault(ReturnRef(downstream_timing_));
ON_CALL(*this, setUpstreamLocalAddress(_))
.WillByDefault(
Invoke([this](const Network::Address::InstanceConstSharedPtr& upstream_local_address) {
Expand Down
5 changes: 2 additions & 3 deletions test/mocks/stream_info/mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ class MockStreamInfo : public StreamInfo {
MOCK_METHOD(SystemTime, startTime, (), (const));
MOCK_METHOD(MonotonicTime, startTimeMonotonic, (), (const));
MOCK_METHOD(absl::optional<std::chrono::nanoseconds>, lastDownstreamRxByteReceived, (), (const));
MOCK_METHOD(void, onLastDownstreamRxByteReceived, ());
MOCK_METHOD(void, setUpstreamTiming, (const UpstreamTiming&));
MOCK_METHOD(absl::optional<std::chrono::nanoseconds>, firstUpstreamTxByteSent, (), (const));
MOCK_METHOD(void, onFirstUpstreamTxByteSent, ());
Expand All @@ -41,11 +40,10 @@ class MockStreamInfo : public StreamInfo {
MOCK_METHOD(absl::optional<std::chrono::nanoseconds>, lastUpstreamRxByteReceived, (), (const));
MOCK_METHOD(void, onLastUpstreamRxByteReceived, ());
MOCK_METHOD(absl::optional<std::chrono::nanoseconds>, firstDownstreamTxByteSent, (), (const));
MOCK_METHOD(void, onFirstDownstreamTxByteSent, ());
MOCK_METHOD(absl::optional<std::chrono::nanoseconds>, lastDownstreamTxByteSent, (), (const));
MOCK_METHOD(void, onLastDownstreamTxByteSent, ());
MOCK_METHOD(void, onRequestComplete, ());
MOCK_METHOD(absl::optional<std::chrono::nanoseconds>, requestComplete, (), (const));
MOCK_METHOD(DownstreamTiming&, downstreamTiming, ());
MOCK_METHOD(void, addBytesReceived, (uint64_t));
MOCK_METHOD(uint64_t, bytesReceived, (), (const));
MOCK_METHOD(void, addWireBytesReceived, (uint64_t));
Expand Down Expand Up @@ -141,6 +139,7 @@ class MockStreamInfo : public StreamInfo {
std::string filter_chain_name_;
absl::optional<uint64_t> upstream_connection_id_;
absl::optional<uint32_t> attempt_count_;
DownstreamTiming downstream_timing_;
};

} // namespace StreamInfo
Expand Down