Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,17 @@ Since these stats utilize the underlying cluster scope, we prefix with the ``thr
thrift.upstream_rq_size, Histogram, Request message size in bytes per upstream
thrift.upstream_resp_size, Histogram, Response message size in bytes per upstream

If the service zone is available for both the local service (via :option:`--service-zone`)
and the :ref:`upstream cluster <arch_overview_service_discovery_types_eds>`,
Envoy will track the following statistics in *cluster.<name>.zone.<from_zone>.<to_zone>.* namespace.

.. csv-table::
:header: Name, Type, Description
:widths: 1, 1, 2

thrift.upstream_resp_<\*>, Counter, "Total responses of each type (e.g., reply, success, etc.)"
thrift.upstream_rq_time, Histogram, Request time milliseconds

.. note::

The request and response size histograms include what's sent and received during protocol upgrade.
Expand Down
1 change: 1 addition & 0 deletions docs/root/version_history/current.rst
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ Removed Config or Runtime
New Features
------------
* http: added support for :ref:`retriable health check status codes <envoy_v3_api_field_config.core.v3.HealthCheck.HttpHealthCheck.retriable_statuses>`.
* thrift_proxy: add upstream response zone metrics in the form ``cluster.cluster_name.zone.local_zone.upstream_zone.thrift.upstream_resp_success``.

Deprecated
----------
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@ ThriftFilters::FilterFactoryCb RouterFilterConfig::createFilterFactoryFromProtoT
const std::string& stat_prefix, Server::Configuration::FactoryContext& context) {
UNREFERENCED_PARAMETER(proto_config);

auto shadow_writer =
std::make_shared<ShadowWriterImpl>(context.clusterManager(), stat_prefix, context.scope(),
context.mainThreadDispatcher(), context.threadLocal());
auto shadow_writer = std::make_shared<ShadowWriterImpl>(
context.clusterManager(), stat_prefix, context.scope(), context.mainThreadDispatcher(),
context.threadLocal(), context.localInfo());

return [&context, stat_prefix,
shadow_writer](ThriftFilters::FilterChainFactoryCallbacks& callbacks) -> void {
callbacks.addDecoderFilter(std::make_shared<Router>(
context.clusterManager(), stat_prefix, context.scope(), context.runtime(), *shadow_writer));
callbacks.addDecoderFilter(std::make_shared<Router>(context.clusterManager(), stat_prefix,
context.scope(), context.runtime(),
context.localInfo(), *shadow_writer));
};
}

Expand Down
96 changes: 70 additions & 26 deletions source/extensions/filters/network/thrift_proxy/router/router.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <vector>

#include "envoy/buffer/buffer.h"
#include "envoy/local_info/local_info.h"
#include "envoy/router/router.h"
#include "envoy/tcp/conn_pool.h"

Expand Down Expand Up @@ -117,7 +118,7 @@ struct RouterStats {
class RequestOwner : public ProtocolConverter, public Logger::Loggable<Logger::Id::thrift> {
public:
RequestOwner(Upstream::ClusterManager& cluster_manager, const std::string& stat_prefix,
Stats::Scope& scope)
Stats::Scope& scope, const LocalInfo::LocalInfo& local_info)
: cluster_manager_(cluster_manager), stats_(generateStats(stat_prefix, scope)),
stat_name_set_(scope.symbolTable().makeSet("thrift_proxy")),
symbol_table_(scope.symbolTable()),
Expand All @@ -131,7 +132,8 @@ class RequestOwner : public ProtocolConverter, public Logger::Loggable<Logger::I
upstream_resp_invalid_type_(stat_name_set_->add("thrift.upstream_resp_invalid_type")),
upstream_rq_time_(stat_name_set_->add("thrift.upstream_rq_time")),
upstream_rq_size_(stat_name_set_->add("thrift.upstream_rq_size")),
upstream_resp_size_(stat_name_set_->add("thrift.upstream_resp_size")) {}
upstream_resp_size_(stat_name_set_->add("thrift.upstream_resp_size")),
zone_(stat_name_set_->add("zone")), local_zone_name_(local_info.zoneStatName()) {}
~RequestOwner() override = default;

/**
Expand Down Expand Up @@ -187,7 +189,8 @@ class RequestOwner : public ProtocolConverter, public Logger::Loggable<Logger::I
* @param value uint64_t the value of the duration.
* @param unit Unit the unit of the duration.
*/
virtual void recordResponseDuration(uint64_t value, Stats::Histogram::Unit unit) PURE;
virtual void recordResponseDuration(Upstream::HostDescriptionConstSharedPtr upstream_host,
uint64_t value, Stats::Histogram::Unit unit) PURE;

/**
* @return Upstream::ClusterManager& the cluster manager.
Expand All @@ -207,72 +210,78 @@ class RequestOwner : public ProtocolConverter, public Logger::Loggable<Logger::I
/**
* Increment counter for received responses that are replies.
*/
void incResponseReply(const Upstream::ClusterInfo& cluster) {
incClusterScopeCounter(cluster, {upstream_resp_reply_});
void incResponseReply(const Upstream::ClusterInfo& cluster,
Upstream::HostDescriptionConstSharedPtr upstream_host) {
incClusterScopeCounter(cluster, upstream_host, upstream_resp_reply_);
}

/**
* Increment counter for request calls.
*/
void incRequestCall(const Upstream::ClusterInfo& cluster) {
incClusterScopeCounter(cluster, {upstream_rq_call_});
incClusterScopeCounter(cluster, nullptr, upstream_rq_call_);
}

/**
* Increment counter for requests that are one way only.
*/
void incRequestOneWay(const Upstream::ClusterInfo& cluster) {
incClusterScopeCounter(cluster, {upstream_rq_oneway_});
incClusterScopeCounter(cluster, nullptr, upstream_rq_oneway_);
}

/**
* Increment counter for requests that are invalid.
*/
void incRequestInvalid(const Upstream::ClusterInfo& cluster) {
incClusterScopeCounter(cluster, {upstream_rq_invalid_type_});
incClusterScopeCounter(cluster, nullptr, upstream_rq_invalid_type_);
}

/**
* Increment counter for received responses that are replies that are successful.
*/
void incResponseReplySuccess(const Upstream::ClusterInfo& cluster) {
incClusterScopeCounter(cluster, {upstream_resp_reply_success_});
void incResponseReplySuccess(const Upstream::ClusterInfo& cluster,
Upstream::HostDescriptionConstSharedPtr upstream_host) {
incClusterScopeCounter(cluster, upstream_host, upstream_resp_reply_success_);
}

/**
* Increment counter for received responses that are replies that are an error.
*/
void incResponseReplyError(const Upstream::ClusterInfo& cluster) {
incClusterScopeCounter(cluster, {upstream_resp_reply_error_});
void incResponseReplyError(const Upstream::ClusterInfo& cluster,
Upstream::HostDescriptionConstSharedPtr upstream_host) {
incClusterScopeCounter(cluster, upstream_host, upstream_resp_reply_error_);
}

/**
* Increment counter for received responses that are exceptions.
*/
void incResponseException(const Upstream::ClusterInfo& cluster) {
incClusterScopeCounter(cluster, {upstream_resp_exception_});
void incResponseException(const Upstream::ClusterInfo& cluster,
Upstream::HostDescriptionConstSharedPtr upstream_host) {
incClusterScopeCounter(cluster, upstream_host, upstream_resp_exception_);
}

/**
* Increment counter for received responses that are invalid.
*/
void incResponseInvalidType(const Upstream::ClusterInfo& cluster) {
incClusterScopeCounter(cluster, {upstream_resp_invalid_type_});
void incResponseInvalidType(const Upstream::ClusterInfo& cluster,
Upstream::HostDescriptionConstSharedPtr upstream_host) {
incClusterScopeCounter(cluster, upstream_host, upstream_resp_invalid_type_);
}

/**
* Record a value for the request size histogram.
*/
void recordUpstreamRequestSize(const Upstream::ClusterInfo& cluster, uint64_t value) {
recordClusterScopeHistogram(cluster, {upstream_rq_size_}, Stats::Histogram::Unit::Bytes, value);
recordClusterScopeHistogram(cluster, nullptr, upstream_rq_size_, Stats::Histogram::Unit::Bytes,
value);
}

/**
* Record a value for the response size histogram.
*/
void recordUpstreamResponseSize(const Upstream::ClusterInfo& cluster, uint64_t value) {
recordClusterScopeHistogram(cluster, {upstream_resp_size_}, Stats::Histogram::Unit::Bytes,
value);
recordClusterScopeHistogram(cluster, nullptr, upstream_resp_size_,
Stats::Histogram::Unit::Bytes, value);
}

/**
Expand All @@ -282,9 +291,10 @@ class RequestOwner : public ProtocolConverter, public Logger::Loggable<Logger::I
* @param value uint64_t the value of the duration.
* @param unit Unit the unit of the duration.
*/
void recordClusterResponseDuration(const Upstream::ClusterInfo& cluster, uint64_t value,
Stats::Histogram::Unit unit) {
recordClusterScopeHistogram(cluster, {upstream_rq_time_}, unit, value);
void recordClusterResponseDuration(const Upstream::ClusterInfo& cluster,
Upstream::HostDescriptionConstSharedPtr upstream_host,
uint64_t value, Stats::Histogram::Unit unit) {
recordClusterScopeHistogram(cluster, upstream_host, upstream_rq_time_, unit, value);
}

protected:
Expand Down Expand Up @@ -369,18 +379,45 @@ class RequestOwner : public ProtocolConverter, public Logger::Loggable<Logger::I

private:
void incClusterScopeCounter(const Upstream::ClusterInfo& cluster,
const Stats::StatNameVec& names) const {
const Stats::SymbolTable::StoragePtr stat_name_storage = symbol_table_.join(names);
Upstream::HostDescriptionConstSharedPtr upstream_host,
const Stats::StatName& stat_name) const {
const Stats::SymbolTable::StoragePtr stat_name_storage = symbol_table_.join({stat_name});
cluster.statsScope().counterFromStatName(Stats::StatName(stat_name_storage.get())).inc();
const Stats::SymbolTable::StoragePtr zone_stat_name_storage =
upstreamZoneStatName(upstream_host, stat_name);
if (zone_stat_name_storage) {
cluster.statsScope().counterFromStatName(Stats::StatName(zone_stat_name_storage.get())).inc();
}
}

void recordClusterScopeHistogram(const Upstream::ClusterInfo& cluster,
const Stats::StatNameVec& names, Stats::Histogram::Unit unit,
Upstream::HostDescriptionConstSharedPtr upstream_host,
const Stats::StatName& stat_name, Stats::Histogram::Unit unit,
uint64_t value) const {
const Stats::SymbolTable::StoragePtr stat_name_storage = symbol_table_.join(names);
const Stats::SymbolTable::StoragePtr stat_name_storage = symbol_table_.join({stat_name});
cluster.statsScope()
.histogramFromStatName(Stats::StatName(stat_name_storage.get()), unit)
.recordValue(value);
const Stats::SymbolTable::StoragePtr zone_stat_name_storage =
upstreamZoneStatName(upstream_host, stat_name);
if (zone_stat_name_storage) {
cluster.statsScope()
.histogramFromStatName(Stats::StatName(zone_stat_name_storage.get()), unit)
.recordValue(value);
}
}

Stats::SymbolTable::StoragePtr
upstreamZoneStatName(Upstream::HostDescriptionConstSharedPtr upstream_host,
const Stats::StatName& stat_name) const {
if (!upstream_host || local_zone_name_.empty()) {
return nullptr;
}
const auto& upstream_zone_name = upstream_host->localityZoneStatName();
if (upstream_zone_name.empty()) {
return nullptr;
}
return symbol_table_.join({zone_, local_zone_name_, upstream_zone_name, stat_name});
}

RouterStats generateStats(const std::string& prefix, Stats::Scope& scope) {
Expand All @@ -404,6 +441,8 @@ class RequestOwner : public ProtocolConverter, public Logger::Loggable<Logger::I
const Stats::StatName upstream_rq_time_;
const Stats::StatName upstream_rq_size_;
const Stats::StatName upstream_resp_size_;
const Stats::StatName zone_;
const Stats::StatName local_zone_name_;
};

/**
Expand Down Expand Up @@ -469,6 +508,11 @@ class ShadowWriter {
*/
virtual Stats::Scope& scope() PURE;

/**
* @return LocalInfo::LocalInfo& the local info used by the router.
*/
virtual const LocalInfo::LocalInfo& localInfo() const PURE;

/**
* @return Dispatcher& the dispatcher.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <vector>

#include "envoy/extensions/filters/network/thrift_proxy/v3/route.pb.h"
#include "envoy/local_info/local_info.h"
#include "envoy/router/router.h"
#include "envoy/stats/scope.h"
#include "envoy/stats/stats_macros.h"
Expand Down Expand Up @@ -218,9 +219,10 @@ class Router : public Tcp::ConnectionPool::UpstreamCallbacks,
public ThriftFilters::DecoderFilter {
public:
Router(Upstream::ClusterManager& cluster_manager, const std::string& stat_prefix,
Stats::Scope& scope, Runtime::Loader& runtime, ShadowWriter& shadow_writer)
: RequestOwner(cluster_manager, stat_prefix, scope), passthrough_supported_(false),
runtime_(runtime), shadow_writer_(shadow_writer) {}
Stats::Scope& scope, Runtime::Loader& runtime, const LocalInfo::LocalInfo& local_info,
ShadowWriter& shadow_writer)
: RequestOwner(cluster_manager, stat_prefix, scope, local_info),
passthrough_supported_(false), runtime_(runtime), shadow_writer_(shadow_writer) {}

~Router() override = default;

Expand All @@ -239,8 +241,9 @@ class Router : public Tcp::ConnectionPool::UpstreamCallbacks,
void sendLocalReply(const ThriftProxy::DirectResponse& response, bool end_stream) override {
callbacks_->sendLocalReply(response, end_stream);
}
void recordResponseDuration(uint64_t value, Stats::Histogram::Unit unit) override {
recordClusterResponseDuration(*cluster_, value, unit);
void recordResponseDuration(Upstream::HostDescriptionConstSharedPtr upstream_host, uint64_t value,
Stats::Histogram::Unit unit) override {
recordClusterResponseDuration(*cluster_, upstream_host, value, unit);
}

// RequestOwner::ProtocolConverter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,10 @@ ShadowWriterImpl::submit(const std::string& cluster_name, MessageMetadataSharedP
ShadowRouterImpl::ShadowRouterImpl(ShadowWriterImpl& parent, const std::string& cluster_name,
MessageMetadataSharedPtr& metadata, TransportType transport_type,
ProtocolType protocol_type)
: RequestOwner(parent.clusterManager(), parent.statPrefix(), parent.scope()), parent_(parent),
cluster_name_(cluster_name), metadata_(metadata->clone()), transport_type_(transport_type),
protocol_type_(protocol_type),
: RequestOwner(parent.clusterManager(), parent.statPrefix(), parent.scope(),
parent.localInfo()),
parent_(parent), cluster_name_(cluster_name), metadata_(metadata->clone()),
transport_type_(transport_type), protocol_type_(protocol_type),
transport_(NamedTransportConfigFactory::getFactory(transport_type).createTransport()),
protocol_(NamedProtocolConfigFactory::getFactory(protocol_type).createProtocol()) {
response_decoder_ = std::make_unique<NullResponseDecoder>(*transport_, *protocol_);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <memory>

#include "envoy/event/dispatcher.h"
#include "envoy/local_info/local_info.h"
#include "envoy/router/router.h"
#include "envoy/stats/scope.h"
#include "envoy/stats/stats_macros.h"
Expand Down Expand Up @@ -142,8 +143,9 @@ class ShadowRouterImpl : public ShadowRouterHandle,
void continueDecoding() override { flushPendingCallbacks(); }
void resetDownstreamConnection() override {}
void sendLocalReply(const ThriftProxy::DirectResponse&, bool) override {}
void recordResponseDuration(uint64_t value, Stats::Histogram::Unit unit) override {
recordClusterResponseDuration(*cluster_, value, unit);
void recordResponseDuration(Upstream::HostDescriptionConstSharedPtr upstream_host, uint64_t value,
Stats::Histogram::Unit unit) override {
recordClusterResponseDuration(*cluster_, upstream_host, value, unit);
}

// RequestOwner::ProtocolConverter
Expand Down Expand Up @@ -248,9 +250,10 @@ class ShadowWriterImpl : public ShadowWriter, Logger::Loggable<Logger::Id::thrif
public:
ShadowWriterImpl(Upstream::ClusterManager& cm, const std::string& stat_prefix,
Stats::Scope& scope, Event::Dispatcher& dispatcher,
ThreadLocal::SlotAllocator& tls)
: cm_(cm), stat_prefix_(stat_prefix), scope_(scope), dispatcher_(dispatcher),
stats_(generateStats(stat_prefix, scope)), tls_(tls.allocateSlot()) {
ThreadLocal::SlotAllocator& tls, const LocalInfo::LocalInfo& local_info)
: cm_(cm), stat_prefix_(stat_prefix), scope_(scope), local_info_(local_info),
dispatcher_(dispatcher), stats_(generateStats(stat_prefix, scope)),
tls_(tls.allocateSlot()) {

tls_->set([](Event::Dispatcher& dispatcher) -> ThreadLocal::ThreadLocalObjectSharedPtr {
return std::make_shared<ActiveRouters>(dispatcher);
Expand All @@ -265,6 +268,7 @@ class ShadowWriterImpl : public ShadowWriter, Logger::Loggable<Logger::Id::thrif
Upstream::ClusterManager& clusterManager() override { return cm_; }
const std::string& statPrefix() const override { return stat_prefix_; }
Stats::Scope& scope() override { return scope_; }
const LocalInfo::LocalInfo& localInfo() const override { return local_info_; }
Event::Dispatcher& dispatcher() override { return dispatcher_; }
absl::optional<std::reference_wrapper<ShadowRouterHandle>>
submit(const std::string& cluster_name, MessageMetadataSharedPtr metadata,
Expand All @@ -282,6 +286,7 @@ class ShadowWriterImpl : public ShadowWriter, Logger::Loggable<Logger::Id::thrif
Upstream::ClusterManager& cm_;
const std::string stat_prefix_;
Stats::Scope& scope_;
const LocalInfo::LocalInfo& local_info_;
Event::Dispatcher& dispatcher_;
ShadowWriterStats stats_;
ThreadLocal::SlotPtr tls_;
Expand Down
Loading