Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 0 additions & 43 deletions envoy/stream_info/stream_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -358,13 +358,6 @@ class UpstreamInfo {
*/
virtual Ssl::ConnectionInfoConstSharedPtr upstreamSslConnection() const PURE;

/**
* Sets the upstream timing information for this stream. This is useful for
* when multiple upstream requests are issued and we want to save timing
* information for the one that "wins".
*/
virtual void setUpstreamTiming(const UpstreamTiming& upstream_timing) PURE;

/*
* @return the upstream timing for this stream
* */
Expand Down Expand Up @@ -451,11 +444,6 @@ class StreamInfo {
*/
virtual bool intersectResponseFlags(uint64_t response_flags) const PURE;

/**
* @param host the selected upstream host for the request.
*/
virtual void onUpstreamHostSelected(Upstream::HostDescriptionConstSharedPtr host) PURE;

/**
* @param std::string name denotes the name of the route.
*/
Expand Down Expand Up @@ -518,13 +506,6 @@ class StreamInfo {
*/
virtual absl::optional<std::chrono::nanoseconds> lastDownstreamRxByteReceived() const PURE;

/**
* Sets the upstream timing information for this stream. This is useful for
* when multiple upstream requests are issued and we want to save timing
* information for the one that "wins".
*/
virtual void setUpstreamTiming(const UpstreamTiming& upstream_timing) PURE;

/**
* Sets the upstream information for this stream.
*/
Expand Down Expand Up @@ -627,13 +608,6 @@ class StreamInfo {
*/
virtual Upstream::HostDescriptionConstSharedPtr upstreamHost() const PURE;

/**
* @param upstream_local_address sets the local address of the upstream connection. Note that it
* can be different than the local address of the downstream connection.
*/
virtual void setUpstreamLocalAddress(
const Network::Address::InstanceConstSharedPtr& upstream_local_address) PURE;

/**
* @return the upstream local address.
*/
Expand All @@ -654,12 +628,6 @@ class StreamInfo {
*/
virtual const Network::ConnectionInfoProvider& downstreamAddressProvider() const PURE;

/**
* @param connection_info sets the upstream ssl connection.
*/
virtual void
setUpstreamSslConnection(const Ssl::ConnectionInfoConstSharedPtr& ssl_connection_info) PURE;

/**
* @return the upstream SSL connection. This will be nullptr if the upstream
* connection does not use SSL.
Expand Down Expand Up @@ -701,12 +669,6 @@ class StreamInfo {
* @return pointer to filter state to be used by upstream connections.
*/
virtual const FilterStateSharedPtr& upstreamFilterState() const PURE;
virtual void setUpstreamFilterState(const FilterStateSharedPtr& filter_state) PURE;

/**
* @param failure_reason the upstream transport failure reason.
*/
virtual void setUpstreamTransportFailureReason(absl::string_view failure_reason) PURE;

/**
* @return const std::string& the upstream transport failure reason, e.g. certificate validation
Expand Down Expand Up @@ -768,11 +730,6 @@ class StreamInfo {
*/
virtual const std::string& filterChainName() const PURE;

/**
* @param connection ID of the upstream connection.
*/
virtual void setUpstreamConnectionId(uint64_t id) PURE;

/**
* @return the ID of the upstream connection, or absl::nullopt if not available.
*/
Expand Down
19 changes: 2 additions & 17 deletions source/common/router/router.cc
Original file line number Diff line number Diff line change
Expand Up @@ -895,12 +895,6 @@ void Filter::onDestroy() {
void Filter::onResponseTimeout() {
ENVOY_STREAM_LOG(debug, "upstream timeout", *callbacks_);

// If we had an upstream request that got a "good" response, save its
// upstream timing information into the downstream stream info.
if (final_upstream_request_) {
callbacks_->streamInfo().setUpstreamTiming(final_upstream_request_->upstreamTiming());
}

// Reset any upstream requests that are still in flight.
while (!upstream_requests_.empty()) {
UpstreamRequestPtr upstream_request =
Expand Down Expand Up @@ -1164,7 +1158,6 @@ void Filter::onUpstreamReset(Http::StreamResetReason reset_reason,
? ", transport failure reason: "
: "",
transport_failure_reason);
callbacks_->streamInfo().setUpstreamTransportFailureReason(transport_failure_reason);
const std::string& basic_details =
downstream_response_started_ ? StreamInfo::ResponseCodeDetails::get().LateUpstreamReset
: StreamInfo::ResponseCodeDetails::get().EarlyUpstreamReset;
Expand Down Expand Up @@ -1407,14 +1400,8 @@ void Filter::onUpstreamHeaders(uint64_t response_code, Http::ResponseHeaderMapPt

downstream_response_started_ = true;
final_upstream_request_ = &upstream_request;
// In upstream request hedging scenarios the upstream connection ID set in onPoolReady might not
// be the connection ID of the upstream connection that ended up receiving upstream headers. Thus
// reset the upstream connection ID here with the ID of the connection that ultimately was the
// transport for the final upstream request.
if (final_upstream_request_->streamInfo().upstreamConnectionId().has_value()) {
callbacks_->streamInfo().setUpstreamConnectionId(
final_upstream_request_->streamInfo().upstreamConnectionId().value());
}
// Make sure that for request hedging, we end up with the correct final upstream info.
callbacks_->streamInfo().setUpstreamInfo(final_upstream_request_->streamInfo().upstreamInfo());
resetOtherUpstreams(upstream_request);
if (end_stream) {
onUpstreamComplete(upstream_request);
Expand Down Expand Up @@ -1471,8 +1458,6 @@ void Filter::onUpstreamComplete(UpstreamRequest& upstream_request) {
if (!downstream_end_stream_) {
upstream_request.resetStream();
}
callbacks_->streamInfo().setUpstreamTiming(final_upstream_request_->upstreamTiming());

Event::Dispatcher& dispatcher = callbacks_->dispatcher();
std::chrono::milliseconds response_time = std::chrono::duration_cast<std::chrono::milliseconds>(
dispatcher.timeSource().monotonicTime() - downstream_request_complete_time_);
Expand Down
44 changes: 19 additions & 25 deletions source/common/router/upstream_request.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ UpstreamRequest::UpstreamRequest(RouterFilterInterface& parent,
span_->setTag(Tracing::Tags::get().RetryCount, std::to_string(parent.attemptCount() - 1));
}
}
stream_info_.setUpstreamInfo(std::make_shared<StreamInfo::UpstreamInfoImpl>());
parent_.callbacks()->streamInfo().setUpstreamInfo(stream_info_.upstreamInfo());

stream_info_.healthCheck(parent_.callbacks()->streamInfo().healthCheck());
absl::optional<Upstream::ClusterInfoConstSharedPtr> cluster_info =
Expand Down Expand Up @@ -115,7 +117,6 @@ UpstreamRequest::~UpstreamRequest() {
}
}

stream_info_.setUpstreamTiming(upstream_timing_);
stream_info_.onRequestComplete();
for (const auto& upstream_log : parent_.config().upstream_logs_) {
upstream_log->log(parent_.downstreamHeaders(), upstream_headers_.get(),
Expand Down Expand Up @@ -162,7 +163,7 @@ void UpstreamRequest::decodeHeaders(Http::ResponseHeaderMapPtr&& headers, bool e

// TODO(rodaine): This is actually measuring after the headers are parsed and not the first
// byte.
upstream_timing_.onFirstUpstreamRxByteReceived(parent_.callbacks()->dispatcher().timeSource());
upstreamTiming().onFirstUpstreamRxByteReceived(parent_.callbacks()->dispatcher().timeSource());
maybeEndDecode(end_stream);

awaiting_headers_ = false;
Expand Down Expand Up @@ -219,15 +220,15 @@ void UpstreamRequest::decodeMetadata(Http::MetadataMapPtr&& metadata_map) {

void UpstreamRequest::maybeEndDecode(bool end_stream) {
if (end_stream) {
upstream_timing_.onLastUpstreamRxByteReceived(parent_.callbacks()->dispatcher().timeSource());
upstreamTiming().onLastUpstreamRxByteReceived(parent_.callbacks()->dispatcher().timeSource());
decode_complete_ = true;
}
}

void UpstreamRequest::onUpstreamHostSelected(Upstream::HostDescriptionConstSharedPtr host) {
stream_info_.onUpstreamHostSelected(host);
StreamInfo::UpstreamInfo& upstream_info = *streamInfo().upstreamInfo();
upstream_info.setUpstreamHost(host);
upstream_host_ = host;
parent_.callbacks()->streamInfo().onUpstreamHostSelected(host);
parent_.onUpstreamHostSelected(host);
}

Expand Down Expand Up @@ -260,7 +261,7 @@ void UpstreamRequest::encodeData(Buffer::Instance& data, bool end_stream) {
stream_info_.addBytesSent(data.length());
upstream_->encodeData(data, end_stream);
if (end_stream) {
upstream_timing_.onLastUpstreamTxByteSent(parent_.callbacks()->dispatcher().timeSource());
upstreamTiming().onLastUpstreamTxByteSent(parent_.callbacks()->dispatcher().timeSource());
}
}
}
Expand All @@ -277,7 +278,7 @@ void UpstreamRequest::encodeTrailers(const Http::RequestTrailerMap& trailers) {

ENVOY_STREAM_LOG(trace, "proxying trailers", *parent_.callbacks());
upstream_->encodeTrailers(trailers);
upstream_timing_.onLastUpstreamTxByteSent(parent_.callbacks()->dispatcher().timeSource());
upstreamTiming().onLastUpstreamTxByteSent(parent_.callbacks()->dispatcher().timeSource());
}
}

Expand Down Expand Up @@ -427,29 +428,22 @@ void UpstreamRequest::onPoolReady(
stream_info_.protocol(protocol.value());
}

parent_.callbacks()->streamInfo().setUpstreamInfo(stream_info_.upstreamInfo());
if (info.upstreamInfo().has_value()) {
auto& upstream_timing = info.upstreamInfo().value().get().upstreamTiming();
upstream_timing_.upstream_connect_start_ = upstream_timing.upstream_connect_start_;
upstream_timing_.upstream_connect_complete_ = upstream_timing.upstream_connect_complete_;
upstream_timing_.upstream_handshake_complete_ = upstream_timing.upstream_handshake_complete_;
upstreamTiming().upstream_connect_start_ = upstream_timing.upstream_connect_start_;
upstreamTiming().upstream_connect_complete_ = upstream_timing.upstream_connect_complete_;
upstreamTiming().upstream_handshake_complete_ = upstream_timing.upstream_handshake_complete_;
}

stream_info_.setUpstreamFilterState(std::make_shared<StreamInfo::FilterStateImpl>(
StreamInfo::UpstreamInfo& upstream_info = *stream_info_.upstreamInfo();
upstream_info.setUpstreamFilterState(std::make_shared<StreamInfo::FilterStateImpl>(
info.filterState().parent()->parent(), StreamInfo::FilterState::LifeSpan::Request));
parent_.callbacks()->streamInfo().setUpstreamFilterState(
std::make_shared<StreamInfo::FilterStateImpl>(info.filterState().parent()->parent(),
StreamInfo::FilterState::LifeSpan::Request));
stream_info_.setUpstreamLocalAddress(upstream_local_address);
parent_.callbacks()->streamInfo().setUpstreamLocalAddress(upstream_local_address);

stream_info_.setUpstreamSslConnection(info.downstreamAddressProvider().sslConnection());
parent_.callbacks()->streamInfo().setUpstreamSslConnection(
info.downstreamAddressProvider().sslConnection());
upstream_info.setUpstreamLocalAddress(upstream_local_address);
upstream_info.setUpstreamSslConnection(info.downstreamAddressProvider().sslConnection());

if (info.downstreamAddressProvider().connectionID().has_value()) {
stream_info_.setUpstreamConnectionId(info.downstreamAddressProvider().connectionID().value());
parent_.callbacks()->streamInfo().setUpstreamConnectionId(
info.downstreamAddressProvider().connectionID().value());
upstream_info.setUpstreamConnectionId(info.downstreamAddressProvider().connectionID().value());
}

stream_info_.setUpstreamBytesMeter(upstream_->bytesMeter());
Expand Down Expand Up @@ -478,7 +472,7 @@ void UpstreamRequest::onPoolReady(
span_->injectContext(*parent_.downstreamHeaders());
}

upstream_timing_.onFirstUpstreamTxByteSent(parent_.callbacks()->dispatcher().timeSource());
upstreamTiming().onFirstUpstreamTxByteSent(parent_.callbacks()->dispatcher().timeSource());

// Make sure that when we are forwarding CONNECT payload we do not do so until
// the upstream has accepted the CONNECT request.
Expand Down Expand Up @@ -549,7 +543,7 @@ void UpstreamRequest::encodeBodyAndTrailers() {
}

if (encode_complete_) {
upstream_timing_.onLastUpstreamTxByteSent(parent_.callbacks()->dispatcher().timeSource());
upstreamTiming().onLastUpstreamTxByteSent(parent_.callbacks()->dispatcher().timeSource());
}
}
}
Expand Down
5 changes: 3 additions & 2 deletions source/common/router/upstream_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ class UpstreamRequest : public Logger::Loggable<Logger::Id::router>,
outlier_detection_timeout_recorded_ = recorded;
}
bool outlierDetectionTimeoutRecorded() { return outlier_detection_timeout_recorded_; }
const StreamInfo::UpstreamTiming& upstreamTiming() { return upstream_timing_; }
void retried(bool value) { retried_ = value; }
bool retried() { return retried_; }
bool grpcRqSuccessDeferred() { return grpc_rq_success_deferred_; }
Expand All @@ -122,6 +121,9 @@ class UpstreamRequest : public Logger::Loggable<Logger::Id::router>,
StreamInfo::StreamInfo& streamInfo() { return stream_info_; }

private:
StreamInfo::UpstreamTiming& upstreamTiming() {
return stream_info_.upstreamInfo()->upstreamTiming();
}
bool shouldSendEndStream() {
// Only encode end stream if the full request has been received, the body
// has been sent, and any trailers or metadata have also been sent.
Expand All @@ -147,7 +149,6 @@ class UpstreamRequest : public Logger::Loggable<Logger::Id::router>,
DownstreamWatermarkManager downstream_watermark_manager_{*this};
Tracing::SpanPtr span_;
StreamInfo::StreamInfoImpl stream_info_;
StreamInfo::UpstreamTiming upstream_timing_;
const MonotonicTime start_time_;
// This is wrapped in an optional, since we want to avoid computing zero size headers when in
// reality we just didn't get a response back.
Expand Down
40 changes: 2 additions & 38 deletions source/common/stream_info/stream_info_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,6 @@ struct UpstreamInfoImpl : public UpstreamInfo {
Ssl::ConnectionInfoConstSharedPtr upstreamSslConnection() const override {
return upstream_ssl_info_;
}
void setUpstreamTiming(const UpstreamTiming& upstream_timing) override {
upstream_timing_ = upstream_timing;
}
UpstreamTiming& upstreamTiming() override { return upstream_timing_; }
const UpstreamTiming& upstreamTiming() const override { return upstream_timing_; }
const Network::Address::InstanceConstSharedPtr& upstreamLocalAddress() const override {
Expand Down Expand Up @@ -135,11 +132,6 @@ struct StreamInfoImpl : public StreamInfo {
}
}

void setUpstreamConnectionId(uint64_t id) override {
maybeCreateUpstreamInfo();
upstream_info_->setUpstreamConnectionId(id);
}

absl::optional<uint64_t> upstreamConnectionId() const override {
if (!upstream_info_) {
return absl::nullopt;
Expand All @@ -154,13 +146,10 @@ struct StreamInfoImpl : public StreamInfo {
return duration(downstream_timing_.value().lastDownstreamRxByteReceived());
}

void setUpstreamTiming(const UpstreamTiming& upstream_timing) override {
maybeCreateUpstreamInfo();
upstream_info_->setUpstreamTiming(upstream_timing);
}

void setUpstreamInfo(std::shared_ptr<UpstreamInfo> info) override { upstream_info_ = info; }

std::shared_ptr<UpstreamInfo> upstreamInfo() override { return upstream_info_; }

OptRef<const UpstreamInfo> upstreamInfo() const override {
if (!upstream_info_) {
return {};
Expand Down Expand Up @@ -274,11 +263,6 @@ struct StreamInfoImpl : public StreamInfo {

uint64_t responseFlags() const override { return response_flags_; }

void onUpstreamHostSelected(Upstream::HostDescriptionConstSharedPtr host) override {
maybeCreateUpstreamInfo();
upstream_info_->setUpstreamHost(host);
}

Upstream::HostDescriptionConstSharedPtr upstreamHost() const override {
if (!upstream_info_) {
return nullptr;
Expand All @@ -292,12 +276,6 @@ struct StreamInfoImpl : public StreamInfo {

const std::string& getRouteName() const override { return route_name_; }

void setUpstreamLocalAddress(
const Network::Address::InstanceConstSharedPtr& upstream_local_address) override {
maybeCreateUpstreamInfo();
upstream_info_->setUpstreamLocalAddress(upstream_local_address);
}

const Network::Address::InstanceConstSharedPtr& upstreamLocalAddress() const override {
if (!upstream_info_) {
return legacy_upstream_local_address_;
Expand All @@ -313,11 +291,6 @@ struct StreamInfoImpl : public StreamInfo {
return *downstream_connection_info_provider_;
}

void setUpstreamSslConnection(const Ssl::ConnectionInfoConstSharedPtr& connection_info) override {
maybeCreateUpstreamInfo();
upstream_info_->setUpstreamSslConnection(connection_info);
}

Ssl::ConnectionInfoConstSharedPtr upstreamSslConnection() const override {
return upstream_info_ ? upstream_info_->upstreamSslConnection() : nullptr;
}
Expand All @@ -340,15 +313,6 @@ struct StreamInfoImpl : public StreamInfo {
}
return upstream_info_->upstreamFilterState();
}
void setUpstreamFilterState(const FilterStateSharedPtr& filter_state) override {
maybeCreateUpstreamInfo();
return upstream_info_->setUpstreamFilterState(filter_state);
}

void setUpstreamTransportFailureReason(absl::string_view failure_reason) override {
maybeCreateUpstreamInfo();
upstream_info_->setUpstreamTransportFailureReason(failure_reason);
}

const std::string& upstreamTransportFailureReason() const override {
if (!upstream_info_) {
Expand Down
Loading