Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions envoy/upstream/upstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -765,6 +765,12 @@ MAKE_STATS_STRUCT(ClusterLbStats, ClusterLbStatNames, ALL_CLUSTER_LB_STATS);
*/
MAKE_STAT_NAMES_STRUCT(ClusterTrafficStatNames, ALL_CLUSTER_TRAFFIC_STATS);
MAKE_STATS_STRUCT(ClusterTrafficStats, ClusterTrafficStatNames, ALL_CLUSTER_TRAFFIC_STATS);
/*
 * NOTE: LazyClusterTrafficStats is currently just an alias of
 * "std::unique_ptr&lt;ClusterTrafficStats&gt;"; it exists to make way for future lazy
 * initialization of trafficStats(). See
 * https://github.com/envoyproxy/envoy/pull/23921#issuecomment-1335239116 for more context.
 */
using LazyClusterTrafficStats = std::unique_ptr<ClusterTrafficStats>;
Comment thread
jmarantz marked this conversation as resolved.

MAKE_STAT_NAMES_STRUCT(ClusterLoadReportStatNames, ALL_CLUSTER_LOAD_REPORT_STATS);
MAKE_STATS_STRUCT(ClusterLoadReportStats, ClusterLoadReportStatNames,
Expand Down Expand Up @@ -1060,9 +1066,9 @@ class ClusterInfo : public Http::FilterChainFactory {
virtual ClusterEndpointStats& endpointStats() const PURE;

/**
* @return ClusterTrafficStats& all traffic related stats for this cluster.
* @return all traffic-related stats for this cluster.
*/
virtual ClusterTrafficStats& trafficStats() const PURE;
virtual LazyClusterTrafficStats& trafficStats() const PURE;

/**
* @return the stats scope that contains all cluster stats. This can be used to produce dynamic
Expand Down
46 changes: 24 additions & 22 deletions source/common/conn_pool/conn_pool_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ ConnPoolImplBase::tryCreateNewConnection(float global_preconnect_ratio) {
const bool can_create_connection = host_->canCreateConnection(priority_);

if (!can_create_connection) {
host_->cluster().trafficStats().upstream_cx_overflow_.inc();
host_->cluster().trafficStats()->upstream_cx_overflow_.inc();
}
// If we are at the connection circuit-breaker limit due to other upstreams having
// too many open connections, and this upstream has no connections, always create one, to
Expand Down Expand Up @@ -167,15 +167,16 @@ void ConnPoolImplBase::attachStreamToClient(Envoy::ConnectionPool::ActiveClient&
AttachContext& context) {
ASSERT(client.readyForStream());

Upstream::ClusterTrafficStats& traffic_stats = *host_->cluster().trafficStats();
if (client.state() == Envoy::ConnectionPool::ActiveClient::State::ReadyForEarlyData) {
host_->cluster().trafficStats().upstream_rq_0rtt_.inc();
traffic_stats.upstream_rq_0rtt_.inc();
}

if (enforceMaxRequests() && !host_->cluster().resourceManager(priority_).requests().canCreate()) {
ENVOY_LOG(debug, "max streams overflow");
onPoolFailure(client.real_host_description_, absl::string_view(),
ConnectionPool::PoolFailureReason::Overflow, context);
host_->cluster().trafficStats().upstream_rq_pending_overflow_.inc();
traffic_stats.upstream_rq_pending_overflow_.inc();
return;
}
ENVOY_CONN_LOG(debug, "creating stream", client);
Expand All @@ -185,7 +186,7 @@ void ConnPoolImplBase::attachStreamToClient(Envoy::ConnectionPool::ActiveClient&
client.remaining_streams_--;
if (client.remaining_streams_ == 0) {
ENVOY_CONN_LOG(debug, "maximum streams per connection, start draining", client);
host_->cluster().trafficStats().upstream_cx_max_requests_.inc();
traffic_stats.upstream_cx_max_requests_.inc();
transitionActiveClientState(client, Envoy::ConnectionPool::ActiveClient::State::Draining);
} else if (capacity == 1) {
// As soon as the new stream is created, the client will be maxed out.
Expand All @@ -202,8 +203,8 @@ void ConnPoolImplBase::attachStreamToClient(Envoy::ConnectionPool::ActiveClient&
num_active_streams_++;
host_->stats().rq_total_.inc();
host_->stats().rq_active_.inc();
host_->cluster().trafficStats().upstream_rq_total_.inc();
host_->cluster().trafficStats().upstream_rq_active_.inc();
traffic_stats.upstream_rq_total_.inc();
traffic_stats.upstream_rq_active_.inc();
host_->cluster().resourceManager(priority_).requests().inc();

onPoolReady(client, context);
Expand All @@ -216,7 +217,7 @@ void ConnPoolImplBase::onStreamClosed(Envoy::ConnectionPool::ActiveClient& clien
state_.decrActiveStreams(1);
num_active_streams_--;
host_->stats().rq_active_.dec();
host_->cluster().trafficStats().upstream_rq_active_.dec();
host_->cluster().trafficStats()->upstream_rq_active_.dec();
host_->cluster().resourceManager(priority_).requests().dec();
// We don't update the capacity for HTTP/3 as the stream count should only
// increase when a MAX_STREAMS frame is received.
Expand Down Expand Up @@ -282,7 +283,7 @@ ConnectionPool::Cancellable* ConnPoolImplBase::newStreamImpl(AttachContext& cont
ENVOY_LOG(debug, "max pending streams overflow");
onPoolFailure(nullptr, absl::string_view(), ConnectionPool::PoolFailureReason::Overflow,
context);
host_->cluster().trafficStats().upstream_rq_pending_overflow_.inc();
host_->cluster().trafficStats()->upstream_rq_pending_overflow_.inc();
return nullptr;
}

Expand Down Expand Up @@ -490,7 +491,7 @@ void ConnPoolImplBase::onConnectionEvent(ActiveClient& client, absl::string_view

if (!client.hasHandshakeCompleted()) {
client.has_handshake_completed_ = true;
host_->cluster().trafficStats().upstream_cx_connect_fail_.inc();
host_->cluster().trafficStats()->upstream_cx_connect_fail_.inc();
host_->stats().cx_connect_fail_.inc();

onConnectFailed(client);
Expand Down Expand Up @@ -595,7 +596,7 @@ void ConnPoolImplBase::onConnectionEvent(ActiveClient& client, absl::string_view
client.currentUnusedCapacity());
// No need to update connecting capacity and connect_timer_ as the client is still connecting.
ASSERT(client.state() == ActiveClient::State::Connecting);
host()->cluster().trafficStats().upstream_cx_connect_with_0_rtt_.inc();
host()->cluster().trafficStats()->upstream_cx_connect_with_0_rtt_.inc();
transitionActiveClientState(client, (client.currentUnusedCapacity() > 0
? ActiveClient::State::ReadyForEarlyData
: ActiveClient::State::Busy));
Expand All @@ -606,13 +607,14 @@ void ConnPoolImplBase::onConnectionEvent(ActiveClient& client, absl::string_view

PendingStream::PendingStream(ConnPoolImplBase& parent, bool can_send_early_data)
: parent_(parent), can_send_early_data_(can_send_early_data) {
parent_.host()->cluster().trafficStats().upstream_rq_pending_total_.inc();
parent_.host()->cluster().trafficStats().upstream_rq_pending_active_.inc();
Upstream::ClusterTrafficStats& traffic_stats = *parent_.host()->cluster().trafficStats();
traffic_stats.upstream_rq_pending_total_.inc();
traffic_stats.upstream_rq_pending_active_.inc();
parent_.host()->cluster().resourceManager(parent_.priority()).pendingRequests().inc();
}

PendingStream::~PendingStream() {
parent_.host()->cluster().trafficStats().upstream_rq_pending_active_.dec();
parent_.host()->cluster().trafficStats()->upstream_rq_pending_active_.dec();
parent_.host()->cluster().resourceManager(parent_.priority()).pendingRequests().dec();
}

Expand All @@ -630,7 +632,7 @@ void ConnPoolImplBase::purgePendingStreams(
while (!pending_streams_to_purge_.empty()) {
PendingStreamPtr stream =
pending_streams_to_purge_.front()->removeFromList(pending_streams_to_purge_);
host_->cluster().trafficStats().upstream_rq_pending_failure_eject_.inc();
host_->cluster().trafficStats()->upstream_rq_pending_failure_eject_.inc();
onPoolFailure(host_description, failure_reason, reason, stream->context());
}
}
Expand Down Expand Up @@ -683,7 +685,7 @@ void ConnPoolImplBase::onPendingStreamCancel(PendingStream& stream,
}
}

host_->cluster().trafficStats().upstream_rq_cancelled_.inc();
host_->cluster().trafficStats()->upstream_rq_cancelled_.inc();
checkForIdleAndCloseIdleConnsIfDraining();
}

Expand Down Expand Up @@ -757,16 +759,16 @@ ActiveClient::ActiveClient(ConnPoolImplBase& parent, uint32_t lifetime_stream_li
concurrent_stream_limit_(translateZeroToUnlimited(concurrent_stream_limit)),
connect_timer_(parent_.dispatcher().createTimer([this]() { onConnectTimeout(); })) {
conn_connect_ms_ = std::make_unique<Stats::HistogramCompletableTimespanImpl>(
parent_.host()->cluster().trafficStats().upstream_cx_connect_ms_,
parent_.host()->cluster().trafficStats()->upstream_cx_connect_ms_,
parent_.dispatcher().timeSource());
conn_length_ = std::make_unique<Stats::HistogramCompletableTimespanImpl>(
parent_.host()->cluster().trafficStats().upstream_cx_length_ms_,
parent_.host()->cluster().trafficStats()->upstream_cx_length_ms_,
parent_.dispatcher().timeSource());
connect_timer_->enableTimer(parent_.host()->cluster().connectTimeout());
parent_.host()->stats().cx_total_.inc();
parent_.host()->stats().cx_active_.inc();
parent_.host()->cluster().trafficStats().upstream_cx_total_.inc();
parent_.host()->cluster().trafficStats().upstream_cx_active_.inc();
parent_.host()->cluster().trafficStats()->upstream_cx_total_.inc();
parent_.host()->cluster().trafficStats()->upstream_cx_active_.inc();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

capture *host_->cluster().trafficStats() in a temp.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

parent_.host()->cluster().resourceManager(parent_.priority()).connections().inc();
}

Expand All @@ -778,15 +780,15 @@ void ActiveClient::releaseResourcesBase() {

conn_length_->complete();

parent_.host()->cluster().trafficStats().upstream_cx_active_.dec();
parent_.host()->cluster().trafficStats()->upstream_cx_active_.dec();
parent_.host()->stats().cx_active_.dec();
parent_.host()->cluster().resourceManager(parent_.priority()).connections().dec();
}
}

void ActiveClient::onConnectTimeout() {
ENVOY_CONN_LOG(debug, "connect timeout", *this);
parent_.host()->cluster().trafficStats().upstream_cx_connect_timeout_.inc();
parent_.host()->cluster().trafficStats()->upstream_cx_connect_timeout_.inc();
timed_out_ = true;
close();
}
Expand All @@ -811,7 +813,7 @@ void ActiveClient::onConnectionDurationTimeout() {
}

ENVOY_CONN_LOG(debug, "max connection duration reached, start draining", *this);
parent_.host()->cluster().trafficStats().upstream_cx_max_duration_reached_.inc();
parent_.host()->cluster().trafficStats()->upstream_cx_max_duration_reached_.inc();
parent_.transitionActiveClientState(*this, Envoy::ConnectionPool::ActiveClient::State::Draining);

// Close out the draining client if we no longer have active streams.
Expand Down
2 changes: 1 addition & 1 deletion source/common/http/codec_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ void CodecClient::onData(Buffer::Instance& data) {
if (!isPrematureResponseError(status) ||
(!active_requests_.empty() ||
getPrematureResponseHttpCode(status) != Code::RequestTimeout)) {
host_->cluster().trafficStats().upstream_cx_protocol_error_.inc();
host_->cluster().trafficStats()->upstream_cx_protocol_error_.inc();
protocol_error_ = true;
}
close();
Expand Down
2 changes: 1 addition & 1 deletion source/common/http/codec_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ class CodecClient : protected Logger::Loggable<Logger::Id::client>,
}

void onIdleTimeout() {
host_->cluster().trafficStats().upstream_cx_idle_timeout_.inc();
host_->cluster().trafficStats()->upstream_cx_idle_timeout_.inc();
close();
}

Expand Down
8 changes: 4 additions & 4 deletions source/common/http/conn_pool_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ static const uint64_t DEFAULT_MAX_STREAMS = (1 << 29);

void MultiplexedActiveClientBase::onGoAway(Http::GoAwayErrorCode) {
ENVOY_CONN_LOG(debug, "remote goaway", *codec_client_);
parent_.host()->cluster().trafficStats().upstream_cx_close_notify_.inc();
parent_.host()->cluster().trafficStats()->upstream_cx_close_notify_.inc();
if (state() != ActiveClient::State::Draining) {
if (codec_client_->numActiveRequests() == 0) {
codec_client_->close();
Expand Down Expand Up @@ -160,16 +160,16 @@ void MultiplexedActiveClientBase::onStreamReset(Http::StreamResetReason reason)
switch (reason) {
case StreamResetReason::ConnectionTermination:
case StreamResetReason::ConnectionFailure:
parent_.host()->cluster().trafficStats().upstream_rq_pending_failure_eject_.inc();
parent_.host()->cluster().trafficStats()->upstream_rq_pending_failure_eject_.inc();
closed_with_active_rq_ = true;
break;
case StreamResetReason::LocalReset:
case StreamResetReason::ProtocolError:
case StreamResetReason::OverloadManager:
parent_.host()->cluster().trafficStats().upstream_rq_tx_reset_.inc();
parent_.host()->cluster().trafficStats()->upstream_rq_tx_reset_.inc();
break;
case StreamResetReason::RemoteReset:
parent_.host()->cluster().trafficStats().upstream_rq_rx_reset_.inc();
parent_.host()->cluster().trafficStats()->upstream_rq_rx_reset_.inc();
break;
case StreamResetReason::LocalRefusedStreamReset:
case StreamResetReason::RemoteRefusedStreamReset:
Expand Down
9 changes: 4 additions & 5 deletions source/common/http/conn_pool_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,11 @@ class ActiveClient : public Envoy::ConnectionPool::ActiveClient {
real_host_description_ = data.host_description_;
codec_client_ = parent.createCodecClient(data);
codec_client_->addConnectionCallbacks(*this);
Upstream::ClusterTrafficStats& traffic_stats = *parent_.host()->cluster().trafficStats();
codec_client_->setConnectionStats(
{parent_.host()->cluster().trafficStats().upstream_cx_rx_bytes_total_,
parent_.host()->cluster().trafficStats().upstream_cx_rx_bytes_buffered_,
parent_.host()->cluster().trafficStats().upstream_cx_tx_bytes_total_,
parent_.host()->cluster().trafficStats().upstream_cx_tx_bytes_buffered_,
&parent_.host()->cluster().trafficStats().bind_errors_, nullptr});
{traffic_stats.upstream_cx_rx_bytes_total_, traffic_stats.upstream_cx_rx_bytes_buffered_,
traffic_stats.upstream_cx_tx_bytes_total_, traffic_stats.upstream_cx_tx_bytes_buffered_,
&traffic_stats.bind_errors_, nullptr});
}

absl::optional<Http::Protocol> protocol() const override { return codec_client_->protocol(); }
Expand Down
2 changes: 1 addition & 1 deletion source/common/http/conn_pool_grid.cc
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ HttpServerPropertiesCache::Http3StatusTracker& ConnectivityGrid::getHttp3StatusT
bool ConnectivityGrid::isHttp3Broken() const { return getHttp3StatusTracker().isHttp3Broken(); }

void ConnectivityGrid::markHttp3Broken() {
host_->cluster().trafficStats().upstream_http3_broken_.inc();
host_->cluster().trafficStats()->upstream_http3_broken_.inc();
getHttp3StatusTracker().markHttp3Broken();
}

Expand Down
4 changes: 2 additions & 2 deletions source/common/http/http1/conn_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ void ActiveClient::StreamWrapper::decodeHeaders(ResponseHeaderMapPtr&& headers,
close_connection_ =
HeaderUtility::shouldCloseConnection(parent_.codec_client_->protocol(), *headers);
if (close_connection_) {
parent_.parent().host()->cluster().trafficStats().upstream_cx_close_notify_.inc();
parent_.parent().host()->cluster().trafficStats()->upstream_cx_close_notify_.inc();
}
ResponseDecoderWrapper::decodeHeaders(std::move(headers), end_stream);
}
Expand Down Expand Up @@ -76,7 +76,7 @@ ActiveClient::ActiveClient(HttpConnPoolImplBase& parent,
: Envoy::Http::ActiveClient(parent, parent.host()->cluster().maxRequestsPerConnection(),
/* effective_concurrent_stream_limit */ 1,
/* configured_concurrent_stream_limit */ 1, data) {
parent.host()->cluster().trafficStats().upstream_cx_http1_total_.inc();
parent.host()->cluster().trafficStats()->upstream_cx_http1_total_.inc();
}

ActiveClient::~ActiveClient() { ASSERT(!stream_wrapper_.get()); }
Expand Down
2 changes: 1 addition & 1 deletion source/common/http/http2/conn_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ ActiveClient::ActiveClient(HttpConnPoolImplBase& parent,
: MultiplexedActiveClientBase(
parent, calculateInitialStreamsLimit(parent.cache(), parent.origin(), parent.host()),
parent.host()->cluster().http2Options().max_concurrent_streams().value(),
parent.host()->cluster().trafficStats().upstream_cx_http2_total_, data) {}
parent.host()->cluster().trafficStats()->upstream_cx_http2_total_, data) {}

ConnectionPool::InstancePtr
allocateConnPool(Event::Dispatcher& dispatcher, Random::RandomGenerator& random_generator,
Expand Down
2 changes: 1 addition & 1 deletion source/common/http/http3/conn_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ ActiveClient::ActiveClient(Envoy::Http::HttpConnPoolImplBase& parent,
Upstream::Host::CreateConnectionData& data)
: MultiplexedActiveClientBase(
parent, getMaxStreams(parent.host()->cluster()), getMaxStreams(parent.host()->cluster()),
parent.host()->cluster().trafficStats().upstream_cx_http3_total_, data),
parent.host()->cluster().trafficStats()->upstream_cx_http3_total_, data),
async_connect_callback_(parent_.dispatcher().createSchedulableCallback([this]() {
if (state() != Envoy::ConnectionPool::ActiveClient::State::Connecting) {
return;
Expand Down
12 changes: 6 additions & 6 deletions source/common/router/retry_state_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -185,13 +185,13 @@ void RetryStateImpl::enableBackoffTimer() {
// be reused.
ratelimited_backoff_strategy_.reset();

cluster_.trafficStats().upstream_rq_retry_backoff_ratelimited_.inc();
cluster_.trafficStats()->upstream_rq_retry_backoff_ratelimited_.inc();

} else {
// Otherwise we use a fully jittered exponential backoff algorithm.
retry_timer_->enableTimer(std::chrono::milliseconds(backoff_strategy_->nextBackOffMs()));

cluster_.trafficStats().upstream_rq_retry_backoff_exponential_.inc();
cluster_.trafficStats()->upstream_rq_retry_backoff_exponential_.inc();
}
}

Expand Down Expand Up @@ -277,7 +277,7 @@ RetryStatus RetryStateImpl::shouldRetry(RetryDecision would_retry, DoRetryCallba
// retry this particular request, we can infer that we did a retry earlier
// and it was successful.
if ((backoff_callback_ || next_loop_callback_) && would_retry == RetryDecision::NoRetry) {
cluster_.trafficStats().upstream_rq_retry_success_.inc();
cluster_.trafficStats()->upstream_rq_retry_success_.inc();
if (vcluster_) {
vcluster_->stats().upstream_rq_retry_success_.inc();
}
Expand All @@ -295,7 +295,7 @@ RetryStatus RetryStateImpl::shouldRetry(RetryDecision would_retry, DoRetryCallba
// The request has exhausted the number of retries allotted to it by the retry policy configured
// (or the x-envoy-max-retries header).
if (retries_remaining_ == 0) {
cluster_.trafficStats().upstream_rq_retry_limit_exceeded_.inc();
cluster_.trafficStats()->upstream_rq_retry_limit_exceeded_.inc();
if (vcluster_) {
vcluster_->stats().upstream_rq_retry_limit_exceeded_.inc();
}
Expand All @@ -308,7 +308,7 @@ RetryStatus RetryStateImpl::shouldRetry(RetryDecision would_retry, DoRetryCallba
retries_remaining_--;

if (!cluster_.resourceManager(priority_).retries().canCreate()) {
cluster_.trafficStats().upstream_rq_retry_overflow_.inc();
cluster_.trafficStats()->upstream_rq_retry_overflow_.inc();
if (vcluster_) {
vcluster_->stats().upstream_rq_retry_overflow_.inc();
}
Expand All @@ -324,7 +324,7 @@ RetryStatus RetryStateImpl::shouldRetry(RetryDecision would_retry, DoRetryCallba

ASSERT(!backoff_callback_ && !next_loop_callback_);
cluster_.resourceManager(priority_).retries().inc();
cluster_.trafficStats().upstream_rq_retry_.inc();
cluster_.trafficStats()->upstream_rq_retry_.inc();
if (vcluster_) {
vcluster_->stats().upstream_rq_retry_.inc();
}
Expand Down
Loading