From 57dc48949f890c6cb6075fc6c4d4cacb436a47e3 Mon Sep 17 00:00:00 2001 From: James Fish Date: Tue, 12 Oct 2021 09:09:29 -0700 Subject: [PATCH 1/2] Refactor thrift router stats handling to create named stats and stats names once Signed-off-by: James Fish --- .../network/thrift_proxy/router/config.cc | 12 +- .../network/thrift_proxy/router/router.h | 378 +++++++++--------- .../thrift_proxy/router/router_impl.cc | 4 +- .../network/thrift_proxy/router/router_impl.h | 16 +- .../thrift_proxy/router/shadow_writer_impl.cc | 11 +- .../thrift_proxy/router/shadow_writer_impl.h | 38 +- .../thrift_proxy/router/upstream_request.cc | 17 +- .../thrift_proxy/router/upstream_request.h | 1 + .../filters/network/thrift_proxy/mocks.h | 4 - .../network/thrift_proxy/router_test.cc | 16 +- .../thrift_proxy/shadow_writer_test.cc | 6 +- 11 files changed, 237 insertions(+), 266 deletions(-) diff --git a/source/extensions/filters/network/thrift_proxy/router/config.cc b/source/extensions/filters/network/thrift_proxy/router/config.cc index 99567e633c28f..bd93949ed7470 100644 --- a/source/extensions/filters/network/thrift_proxy/router/config.cc +++ b/source/extensions/filters/network/thrift_proxy/router/config.cc @@ -18,15 +18,15 @@ ThriftFilters::FilterFactoryCb RouterFilterConfig::createFilterFactoryFromProtoT const std::string& stat_prefix, Server::Configuration::FactoryContext& context) { UNREFERENCED_PARAMETER(proto_config); + auto stats = + std::make_shared(stat_prefix, context.scope(), context.localInfo()); auto shadow_writer = std::make_shared( - context.clusterManager(), stat_prefix, context.scope(), context.mainThreadDispatcher(), - context.threadLocal(), context.localInfo()); + context.clusterManager(), *stats, context.mainThreadDispatcher(), context.threadLocal()); - return [&context, stat_prefix, + return [&context, stats, shadow_writer](ThriftFilters::FilterChainFactoryCallbacks& callbacks) -> void { - callbacks.addDecoderFilter(std::make_shared(context.clusterManager(), stat_prefix, - context.scope(), context.runtime(), - context.localInfo(), *shadow_writer)); + callbacks.addDecoderFilter(std::make_shared(context.clusterManager(), *stats, + context.runtime(), *shadow_writer)); }; } diff --git a/source/extensions/filters/network/thrift_proxy/router/router.h b/source/extensions/filters/network/thrift_proxy/router/router.h index 6f54f2e5687b5..e1e34622827ca 100644 --- a/source/extensions/filters/network/thrift_proxy/router/router.h +++ b/source/extensions/filters/network/thrift_proxy/router/router.h @@ -106,20 +106,30 @@ using ConfigConstSharedPtr = std::shared_ptr; COUNTER(route_missing) \ COUNTER(unknown_cluster) \ COUNTER(upstream_rq_maintenance_mode) \ - COUNTER(no_healthy_upstream) + COUNTER(no_healthy_upstream) \ + COUNTER(shadow_request_submit_failure) -struct RouterStats { +/** + * Struct containing named stats for the router. + */ +struct RouterNamedStats { ALL_THRIFT_ROUTER_STATS(GENERATE_COUNTER_STRUCT, GENERATE_GAUGE_STRUCT, GENERATE_HISTOGRAM_STRUCT) + + static RouterNamedStats generateStats(const std::string& prefix, Stats::Scope& scope) { + return RouterNamedStats{ALL_THRIFT_ROUTER_STATS(POOL_COUNTER_PREFIX(scope, prefix), + POOL_GAUGE_PREFIX(scope, prefix), + POOL_HISTOGRAM_PREFIX(scope, prefix))}; + } }; /** - * This interface is used by an upstream request to communicate its state. + * Stats for use in the router. */ -class RequestOwner : public ProtocolConverter, public Logger::Loggable { +class RouterStats { public: - RequestOwner(Upstream::ClusterManager& cluster_manager, const std::string& stat_prefix, - Stats::Scope& scope, const LocalInfo::LocalInfo& local_info) - : cluster_manager_(cluster_manager), stats_(generateStats(stat_prefix, scope)), + RouterStats(const std::string& stat_prefix, Stats::Scope& scope, + const LocalInfo::LocalInfo& local_info) + : named_(RouterNamedStats::generateStats(stat_prefix, scope)), stat_name_set_(scope.symbolTable().makeSet("thrift_proxy")), symbol_table_(scope.symbolTable()), upstream_rq_call_(stat_name_set_->add("thrift.upstream_rq_call")), @@ -134,169 +144,247 @@ class RequestOwner : public ProtocolConverter, public Logger::Loggableadd("thrift.upstream_rq_size")), upstream_resp_size_(stat_name_set_->add("thrift.upstream_resp_size")), zone_(stat_name_set_->add("zone")), local_zone_name_(local_info.zoneStatName()) {} - ~RequestOwner() override = default; - - /** - * @return ConnectionPool::UpstreamCallbacks& the handler for upstream data. - */ - virtual Tcp::ConnectionPool::UpstreamCallbacks& upstreamCallbacks() PURE; - - /** - * @return Buffer::OwnedImpl& the buffer used to serialize the upstream request. - */ - virtual Buffer::OwnedImpl& buffer() PURE; - - /** - * @return Event::Dispatcher& the dispatcher used for timers, etc. - */ - virtual Event::Dispatcher& dispatcher() PURE; - - /** - * Converts message begin into the right protocol. - */ - void convertMessageBegin(MessageMetadataSharedPtr metadata) { - ProtocolConverter::messageBegin(metadata); - } - - /** - * Used to update the request size every time bytes are pushed out. - * - * @param size uint64_t the value of the increment. - */ - virtual void addSize(uint64_t size) PURE; - - /** - * Used to continue decoding if it was previously stopped. - */ - virtual void continueDecoding() PURE; - - /** - * Used to reset the downstream connection after an error. - */ - virtual void resetDownstreamConnection() PURE; - - /** - * Sends a locally generated response using the provided response object. - * - * @param response DirectResponse the response to send to the downstream client - * @param end_stream if true, the downstream connection should be closed after this response - */ - virtual void sendLocalReply(const ThriftProxy::DirectResponse& response, bool end_stream) PURE; - - /** - * Records the duration of the request. - * - * @param value uint64_t the value of the duration. - * @param unit Unit the unit of the duration. - */ - virtual void recordResponseDuration(Upstream::HostDescriptionConstSharedPtr upstream_host, - uint64_t value, Stats::Histogram::Unit unit) PURE; - - /** - * @return Upstream::ClusterManager& the cluster manager. - */ - Upstream::ClusterManager& clusterManager() { return cluster_manager_; } - - /** - * @return Upstream::Cluster& the upstream cluster associated with the request. - */ - const Upstream::ClusterInfo& cluster() const { return *cluster_; } - - /** - * Common stats. - */ - RouterStats& stats() { return stats_; } /** * Increment counter for received responses that are replies. + * @param cluster Upstream::ClusterInfo& describing the upstream cluster + * @param upstream_host Upstream::HostDescriptionConstSharedPtr describing the upstream host */ void incResponseReply(const Upstream::ClusterInfo& cluster, - Upstream::HostDescriptionConstSharedPtr upstream_host) { + Upstream::HostDescriptionConstSharedPtr upstream_host) const { incClusterScopeCounter(cluster, upstream_host, upstream_resp_reply_); } /** * Increment counter for request calls. + * @param cluster Upstream::ClusterInfo& describing the upstream cluster */ - void incRequestCall(const Upstream::ClusterInfo& cluster) { + void incRequestCall(const Upstream::ClusterInfo& cluster) const { incClusterScopeCounter(cluster, nullptr, upstream_rq_call_); } /** * Increment counter for requests that are one way only. + * @param cluster Upstream::ClusterInfo& describing the upstream cluster */ - void incRequestOneWay(const Upstream::ClusterInfo& cluster) { + void incRequestOneWay(const Upstream::ClusterInfo& cluster) const { incClusterScopeCounter(cluster, nullptr, upstream_rq_oneway_); } /** * Increment counter for requests that are invalid. + * @param cluster Upstream::ClusterInfo& describing the upstream cluster */ - void incRequestInvalid(const Upstream::ClusterInfo& cluster) { + void incRequestInvalid(const Upstream::ClusterInfo& cluster) const { incClusterScopeCounter(cluster, nullptr, upstream_rq_invalid_type_); } /** * Increment counter for received responses that are replies that are successful. + * @param cluster Upstream::ClusterInfo& describing the upstream cluster + * @param upstream_host Upstream::HostDescriptionConstSharedPtr describing the upstream host */ void incResponseReplySuccess(const Upstream::ClusterInfo& cluster, - Upstream::HostDescriptionConstSharedPtr upstream_host) { + Upstream::HostDescriptionConstSharedPtr upstream_host) const { incClusterScopeCounter(cluster, upstream_host, upstream_resp_reply_success_); } /** * Increment counter for received responses that are replies that are an error. + * @param cluster Upstream::ClusterInfo& describing the upstream cluster + * @param upstream_host Upstream::HostDescriptionConstSharedPtr describing the upstream host */ void incResponseReplyError(const Upstream::ClusterInfo& cluster, - Upstream::HostDescriptionConstSharedPtr upstream_host) { + Upstream::HostDescriptionConstSharedPtr upstream_host) const { incClusterScopeCounter(cluster, upstream_host, upstream_resp_reply_error_); } /** * Increment counter for received responses that are exceptions. + * @param cluster Upstream::ClusterInfo& describing the upstream cluster + * @param upstream_host Upstream::HostDescriptionConstSharedPtr describing the upstream host */ void incResponseException(const Upstream::ClusterInfo& cluster, - Upstream::HostDescriptionConstSharedPtr upstream_host) { + Upstream::HostDescriptionConstSharedPtr upstream_host) const { incClusterScopeCounter(cluster, upstream_host, upstream_resp_exception_); } /** * Increment counter for received responses that are invalid. + * @param cluster Upstream::ClusterInfo& describing the upstream cluster + * @param upstream_host Upstream::HostDescriptionConstSharedPtr describing the upstream host */ void incResponseInvalidType(const Upstream::ClusterInfo& cluster, - Upstream::HostDescriptionConstSharedPtr upstream_host) { + Upstream::HostDescriptionConstSharedPtr upstream_host) const { incClusterScopeCounter(cluster, upstream_host, upstream_resp_invalid_type_); } /** * Record a value for the request size histogram. + * @param cluster Upstream::ClusterInfo& describing the upstream cluster + * @param value uinty64_t size in bytes of the full request */ - void recordUpstreamRequestSize(const Upstream::ClusterInfo& cluster, uint64_t value) { + void recordUpstreamRequestSize(const Upstream::ClusterInfo& cluster, uint64_t value) const { recordClusterScopeHistogram(cluster, nullptr, upstream_rq_size_, Stats::Histogram::Unit::Bytes, value); } /** * Record a value for the response size histogram. + * @param cluster Upstream::ClusterInfo& describing the upstream cluster + * @param value uinty64_t size in bytes of the full response */ - void recordUpstreamResponseSize(const Upstream::ClusterInfo& cluster, uint64_t value) { + void recordUpstreamResponseSize(const Upstream::ClusterInfo& cluster, uint64_t value) const { recordClusterScopeHistogram(cluster, nullptr, upstream_resp_size_, Stats::Histogram::Unit::Bytes, value); } /** - * Records the duration of the request for a given cluster. - * - * @param cluster ClusterInfo the cluster to record the duration for. - * @param value uint64_t the value of the duration. - * @param unit Unit the unit of the duration. - */ - void recordClusterResponseDuration(const Upstream::ClusterInfo& cluster, - Upstream::HostDescriptionConstSharedPtr upstream_host, - uint64_t value, Stats::Histogram::Unit unit) { - recordClusterScopeHistogram(cluster, upstream_host, upstream_rq_time_, unit, value); + * Record a value for the response time duration histogram. + * @param cluster Upstream::ClusterInfo& describing the upstream cluster + * @param upstream_host Upstream::HostDescriptionConstSharedPtr describing the upstream host + * @param value uint64_t duration in milliseconds to receive the complete response + */ + void recordUpstreamResponseTime(const Upstream::ClusterInfo& cluster, + Upstream::HostDescriptionConstSharedPtr upstream_host, + uint64_t value) const { + recordClusterScopeHistogram(cluster, upstream_host, upstream_rq_time_, + Stats::Histogram::Unit::Milliseconds, value); + } + + const RouterNamedStats named_; + +private: + void incClusterScopeCounter(const Upstream::ClusterInfo& cluster, + Upstream::HostDescriptionConstSharedPtr upstream_host, + const Stats::StatName& stat_name) const { + const Stats::SymbolTable::StoragePtr stat_name_storage = symbol_table_.join({stat_name}); + cluster.statsScope().counterFromStatName(Stats::StatName(stat_name_storage.get())).inc(); + const Stats::SymbolTable::StoragePtr zone_stat_name_storage = + upstreamZoneStatName(upstream_host, stat_name); + if (zone_stat_name_storage) { + cluster.statsScope().counterFromStatName(Stats::StatName(zone_stat_name_storage.get())).inc(); + } } + void recordClusterScopeHistogram(const Upstream::ClusterInfo& cluster, + Upstream::HostDescriptionConstSharedPtr upstream_host, + const Stats::StatName& stat_name, Stats::Histogram::Unit unit, + uint64_t value) const { + const Stats::SymbolTable::StoragePtr stat_name_storage = symbol_table_.join({stat_name}); + cluster.statsScope() + .histogramFromStatName(Stats::StatName(stat_name_storage.get()), unit) + .recordValue(value); + const Stats::SymbolTable::StoragePtr zone_stat_name_storage = + upstreamZoneStatName(upstream_host, stat_name); + if (zone_stat_name_storage) { + cluster.statsScope() + .histogramFromStatName(Stats::StatName(zone_stat_name_storage.get()), unit) + .recordValue(value); + } + } + + Stats::SymbolTable::StoragePtr + upstreamZoneStatName(Upstream::HostDescriptionConstSharedPtr upstream_host, + const Stats::StatName& stat_name) const { + if (!upstream_host || local_zone_name_.empty()) { + return nullptr; + } + const auto& upstream_zone_name = upstream_host->localityZoneStatName(); + if (upstream_zone_name.empty()) { + return nullptr; + } + return symbol_table_.join({zone_, local_zone_name_, upstream_zone_name, stat_name}); + } + + Stats::StatNameSetPtr stat_name_set_; + Stats::SymbolTable& symbol_table_; + const Stats::StatName upstream_rq_call_; + const Stats::StatName upstream_rq_oneway_; + const Stats::StatName upstream_rq_invalid_type_; + const Stats::StatName upstream_resp_reply_; + const Stats::StatName upstream_resp_reply_success_; + const Stats::StatName upstream_resp_reply_error_; + const Stats::StatName upstream_resp_exception_; + const Stats::StatName upstream_resp_invalid_type_; + const Stats::StatName upstream_rq_time_; + const Stats::StatName upstream_rq_size_; + const Stats::StatName upstream_resp_size_; + const Stats::StatName zone_; + const Stats::StatName local_zone_name_; +}; + +/** + * This interface is used by an upstream request to communicate its state. + */ +class RequestOwner : public ProtocolConverter, public Logger::Loggable { +public: + RequestOwner(Upstream::ClusterManager& cluster_manager, const RouterStats& stats) + : cluster_manager_(cluster_manager), stats_(stats) {} + ~RequestOwner() override = default; + + /** + * @return ConnectionPool::UpstreamCallbacks& the handler for upstream data. + */ + virtual Tcp::ConnectionPool::UpstreamCallbacks& upstreamCallbacks() PURE; + + /** + * @return Buffer::OwnedImpl& the buffer used to serialize the upstream request. + */ + virtual Buffer::OwnedImpl& buffer() PURE; + + /** + * @return Event::Dispatcher& the dispatcher used for timers, etc. + */ + virtual Event::Dispatcher& dispatcher() PURE; + + /** + * Converts message begin into the right protocol. + */ + void convertMessageBegin(MessageMetadataSharedPtr metadata) { + ProtocolConverter::messageBegin(metadata); + } + + /** + * Used to update the request size every time bytes are pushed out. + * + * @param size uint64_t the value of the increment. + */ + virtual void addSize(uint64_t size) PURE; + + /** + * Used to continue decoding if it was previously stopped. + */ + virtual void continueDecoding() PURE; + + /** + * Used to reset the downstream connection after an error. + */ + virtual void resetDownstreamConnection() PURE; + + /** + * Sends a locally generated response using the provided response object. + * + * @param response DirectResponse the response to send to the downstream client + * @param end_stream if true, the downstream connection should be closed after this response + */ + virtual void sendLocalReply(const ThriftProxy::DirectResponse& response, bool end_stream) PURE; + + /** + * @return Upstream::ClusterManager& the cluster manager. + */ + Upstream::ClusterManager& clusterManager() { return cluster_manager_; } + + /** + * @return Upstream::Cluster& the upstream cluster associated with the request. + */ + const Upstream::ClusterInfo& cluster() const { return *cluster_; } + + /** + * @return RouterStats the common router stats. + */ + const RouterStats& stats() { return stats_; } + protected: struct UpstreamRequestInfo { bool passthrough_supported; @@ -318,7 +406,7 @@ class RequestOwner : public ProtocolConverter, public Logger::LoggablemessageType()) { case MessageType::Call: - incRequestCall(*cluster_); + stats().incRequestCall(*cluster_); break; case MessageType::Oneway: - incRequestOneWay(*cluster_); + stats().incRequestOneWay(*cluster_); break; default: - incRequestInvalid(*cluster_); + stats().incRequestInvalid(*cluster_); break; } if (cluster_->maintenanceMode()) { - stats().upstream_rq_maintenance_mode_.inc(); + stats().named_.upstream_rq_maintenance_mode_.inc(); return {AppException(AppExceptionType::InternalError, fmt::format("maintenance mode for cluster '{}'", cluster_name)), absl::nullopt}; @@ -360,7 +448,7 @@ class RequestOwner : public ProtocolConverter, public Logger::LoggabletcpConnPool(Upstream::ResourcePriority::Default, lb_context); if (!conn_pool_data) { - stats().no_healthy_upstream_.inc(); + stats().named_.no_healthy_upstream_.inc(); return {AppException(AppExceptionType::InternalError, fmt::format("no healthy upstream for '{}'", cluster_name)), absl::nullopt}; @@ -378,71 +466,8 @@ class RequestOwner : public ProtocolConverter, public Logger::LoggablelocalityZoneStatName(); - if (upstream_zone_name.empty()) { - return nullptr; - } - return symbol_table_.join({zone_, local_zone_name_, upstream_zone_name, stat_name}); - } - - RouterStats generateStats(const std::string& prefix, Stats::Scope& scope) { - return RouterStats{ALL_THRIFT_ROUTER_STATS(POOL_COUNTER_PREFIX(scope, prefix), - POOL_GAUGE_PREFIX(scope, prefix), - POOL_HISTOGRAM_PREFIX(scope, prefix))}; - } - Upstream::ClusterManager& cluster_manager_; - RouterStats stats_; - Stats::StatNameSetPtr stat_name_set_; - Stats::SymbolTable& symbol_table_; - const Stats::StatName upstream_rq_call_; - const Stats::StatName upstream_rq_oneway_; - const Stats::StatName upstream_rq_invalid_type_; - const Stats::StatName upstream_resp_reply_; - const Stats::StatName upstream_resp_reply_success_; - const Stats::StatName upstream_resp_reply_error_; - const Stats::StatName upstream_resp_exception_; - const Stats::StatName upstream_resp_invalid_type_; - const Stats::StatName upstream_rq_time_; - const Stats::StatName upstream_rq_size_; - const Stats::StatName upstream_resp_size_; - const Stats::StatName zone_; - const Stats::StatName local_zone_name_; + const RouterStats& stats_; }; /** @@ -498,21 +523,6 @@ class ShadowWriter { */ virtual Upstream::ClusterManager& clusterManager() PURE; - /** - * @return std::string& the stat prefix used by the router. - */ - virtual const std::string& statPrefix() const PURE; - - /** - * @return Stats::Scope& the Scope used by the router. - */ - virtual Stats::Scope& scope() PURE; - - /** - * @return LocalInfo::LocalInfo& the local info used by the router. - */ - virtual const LocalInfo::LocalInfo& localInfo() const PURE; - /** * @return Dispatcher& the dispatcher. */ diff --git a/source/extensions/filters/network/thrift_proxy/router/router_impl.cc b/source/extensions/filters/network/thrift_proxy/router/router_impl.cc index 5cdad0c0b468d..4d822a2458460 100644 --- a/source/extensions/filters/network/thrift_proxy/router/router_impl.cc +++ b/source/extensions/filters/network/thrift_proxy/router/router_impl.cc @@ -236,7 +236,7 @@ FilterStatus Router::messageBegin(MessageMetadataSharedPtr metadata) { route_ = callbacks_->route(); if (!route_) { ENVOY_STREAM_LOG(debug, "no route match for method '{}'", *callbacks_, metadata->methodName()); - stats().route_missing_.inc(); + stats().named_.route_missing_.inc(); callbacks_->sendLocalReply( AppException(AppExceptionType::UnknownMethod, fmt::format("no route for method '{}'", metadata->methodName())), @@ -294,7 +294,7 @@ FilterStatus Router::messageEnd() { ProtocolConverter::messageEnd(); const auto encode_size = upstream_request_->encodeAndWrite(upstream_request_buffer_); addSize(encode_size); - recordUpstreamRequestSize(*cluster_, request_size_); + stats().recordUpstreamRequestSize(*cluster_, request_size_); // Dispatch shadow requests, if any. // Note: if connections aren't ready, the write will happen when appropriate. diff --git a/source/extensions/filters/network/thrift_proxy/router/router_impl.h b/source/extensions/filters/network/thrift_proxy/router/router_impl.h index 07cd148e8faca..1fabf48b8ce0c 100644 --- a/source/extensions/filters/network/thrift_proxy/router/router_impl.h +++ b/source/extensions/filters/network/thrift_proxy/router/router_impl.h @@ -5,10 +5,7 @@ #include #include "envoy/extensions/filters/network/thrift_proxy/v3/route.pb.h" -#include "envoy/local_info/local_info.h" #include "envoy/router/router.h" -#include "envoy/stats/scope.h" -#include "envoy/stats/stats_macros.h" #include "envoy/tcp/conn_pool.h" #include "envoy/upstream/load_balancer.h" @@ -218,11 +215,10 @@ class Router : public Tcp::ConnectionPool::UpstreamCallbacks, public RequestOwner, public ThriftFilters::DecoderFilter { public: - Router(Upstream::ClusterManager& cluster_manager, const std::string& stat_prefix, - Stats::Scope& scope, Runtime::Loader& runtime, const LocalInfo::LocalInfo& local_info, - ShadowWriter& shadow_writer) - : RequestOwner(cluster_manager, stat_prefix, scope, local_info), - passthrough_supported_(false), runtime_(runtime), shadow_writer_(shadow_writer) {} + Router(Upstream::ClusterManager& cluster_manager, const RouterStats& stats, + Runtime::Loader& runtime, ShadowWriter& shadow_writer) + : RequestOwner(cluster_manager, stats), passthrough_supported_(false), runtime_(runtime), + shadow_writer_(shadow_writer) {} ~Router() override = default; @@ -241,10 +237,6 @@ class Router : public Tcp::ConnectionPool::UpstreamCallbacks, void sendLocalReply(const ThriftProxy::DirectResponse& response, bool end_stream) override { callbacks_->sendLocalReply(response, end_stream); } - void recordResponseDuration(Upstream::HostDescriptionConstSharedPtr upstream_host, uint64_t value, - Stats::Histogram::Unit unit) override { - recordClusterResponseDuration(*cluster_, upstream_host, value, unit); - } // RequestOwner::ProtocolConverter FilterStatus transportBegin(MessageMetadataSharedPtr metadata) override; diff --git a/source/extensions/filters/network/thrift_proxy/router/shadow_writer_impl.cc b/source/extensions/filters/network/thrift_proxy/router/shadow_writer_impl.cc index a69e70ffcb789..68df0aae71bf1 100644 --- a/source/extensions/filters/network/thrift_proxy/router/shadow_writer_impl.cc +++ b/source/extensions/filters/network/thrift_proxy/router/shadow_writer_impl.cc @@ -21,7 +21,7 @@ ShadowWriterImpl::submit(const std::string& cluster_name, MessageMetadataSharedP original_transport, original_protocol); const bool created = shadow_router->createUpstreamRequest(); if (!created) { - stats_.shadow_request_submit_failure_.inc(); + stats_.named_.shadow_request_submit_failure_.inc(); return absl::nullopt; } @@ -34,10 +34,9 @@ ShadowWriterImpl::submit(const std::string& cluster_name, MessageMetadataSharedP ShadowRouterImpl::ShadowRouterImpl(ShadowWriterImpl& parent, const std::string& cluster_name, MessageMetadataSharedPtr& metadata, TransportType transport_type, ProtocolType protocol_type) - : RequestOwner(parent.clusterManager(), parent.statPrefix(), parent.scope(), - parent.localInfo()), - parent_(parent), cluster_name_(cluster_name), metadata_(metadata->clone()), - transport_type_(transport_type), protocol_type_(protocol_type), + : RequestOwner(parent.clusterManager(), parent.stats()), parent_(parent), + cluster_name_(cluster_name), metadata_(metadata->clone()), transport_type_(transport_type), + protocol_type_(protocol_type), transport_(NamedTransportConfigFactory::getFactory(transport_type).createTransport()), protocol_(NamedProtocolConfigFactory::getFactory(protocol_type).createProtocol()) { response_decoder_ = std::make_unique(*transport_, *protocol_); @@ -225,7 +224,7 @@ FilterStatus ShadowRouterImpl::messageEnd() { ProtocolConverter::messageEnd(); const auto encode_size = upstream_request_->encodeAndWrite(upstream_request_buffer_); addSize(encode_size); - recordUpstreamRequestSize(*cluster_, request_size_); + stats().recordUpstreamRequestSize(*cluster_, request_size_); request_sent_ = true; diff --git a/source/extensions/filters/network/thrift_proxy/router/shadow_writer_impl.h b/source/extensions/filters/network/thrift_proxy/router/shadow_writer_impl.h index d6eaae4ec8a49..98cdbe5374435 100644 --- a/source/extensions/filters/network/thrift_proxy/router/shadow_writer_impl.h +++ b/source/extensions/filters/network/thrift_proxy/router/shadow_writer_impl.h @@ -3,10 +3,7 @@ #include #include "envoy/event/dispatcher.h" -#include "envoy/local_info/local_info.h" #include "envoy/router/router.h" -#include "envoy/stats/scope.h" -#include "envoy/stats/stats_macros.h" #include "envoy/tcp/conn_pool.h" #include "envoy/upstream/load_balancer.h" @@ -143,10 +140,6 @@ class ShadowRouterImpl : public ShadowRouterHandle, void continueDecoding() override { flushPendingCallbacks(); } void resetDownstreamConnection() override {} void sendLocalReply(const ThriftProxy::DirectResponse&, bool) override {} - void recordResponseDuration(Upstream::HostDescriptionConstSharedPtr upstream_host, uint64_t value, - Stats::Histogram::Unit unit) override { - recordClusterResponseDuration(*cluster_, upstream_host, value, unit); - } // RequestOwner::ProtocolConverter FilterStatus transportBegin(MessageMetadataSharedPtr) override { return FilterStatus::Continue; } @@ -218,12 +211,6 @@ class ShadowRouterImpl : public ShadowRouterHandle, bool deferred_deleting_{}; }; -#define ALL_SHADOW_WRITER_STATS(COUNTER, GAUGE, HISTOGRAM) COUNTER(shadow_request_submit_failure) - -struct ShadowWriterStats { - ALL_SHADOW_WRITER_STATS(GENERATE_COUNTER_STRUCT, GENERATE_GAUGE_STRUCT, GENERATE_HISTOGRAM_STRUCT) -}; - class ActiveRouters : public ThreadLocal::ThreadLocalObject { public: ActiveRouters(Event::Dispatcher& dispatcher) : dispatcher_(dispatcher) {} @@ -248,13 +235,9 @@ class ActiveRouters : public ThreadLocal::ThreadLocalObject { class ShadowWriterImpl : public ShadowWriter, Logger::Loggable { public: - ShadowWriterImpl(Upstream::ClusterManager& cm, const std::string& stat_prefix, - Stats::Scope& scope, Event::Dispatcher& dispatcher, - ThreadLocal::SlotAllocator& tls, const LocalInfo::LocalInfo& local_info) - : cm_(cm), stat_prefix_(stat_prefix), scope_(scope), local_info_(local_info), - dispatcher_(dispatcher), stats_(generateStats(stat_prefix, scope)), - tls_(tls.allocateSlot()) { - + ShadowWriterImpl(Upstream::ClusterManager& cm, const RouterStats& stats, + Event::Dispatcher& dispatcher, ThreadLocal::SlotAllocator& tls) + : cm_(cm), stats_(stats), dispatcher_(dispatcher), tls_(tls.allocateSlot()) { tls_->set([](Event::Dispatcher& dispatcher) -> ThreadLocal::ThreadLocalObjectSharedPtr { return std::make_shared(dispatcher); }); @@ -263,12 +246,10 @@ class ShadowWriterImpl : public ShadowWriter, Logger::LoggablegetTyped().remove(router); } + const RouterStats& stats() { return stats_; } // Router::ShadowWriter Upstream::ClusterManager& clusterManager() override { return cm_; } - const std::string& statPrefix() const override { return stat_prefix_; } - Stats::Scope& scope() override { return scope_; } - const LocalInfo::LocalInfo& localInfo() const override { return local_info_; } Event::Dispatcher& dispatcher() override { return dispatcher_; } absl::optional> submit(const std::string& cluster_name, MessageMetadataSharedPtr metadata, @@ -277,18 +258,9 @@ class ShadowWriterImpl : public ShadowWriter, Logger::LoggablemessageType()) { case MessageType::Reply: - parent_.incResponseReply(cluster, upstream_host_); + stats_.incResponseReply(cluster, upstream_host_); if (callbacks.responseSuccess()) { upstream_host_->outlierDetector().putResult( Upstream::Outlier::Result::ExtOriginRequestSuccess); - parent_.incResponseReplySuccess(cluster, upstream_host_); + stats_.incResponseReplySuccess(cluster, upstream_host_); } else { upstream_host_->outlierDetector().putResult( Upstream::Outlier::Result::ExtOriginRequestFailed); - parent_.incResponseReplyError(cluster, upstream_host_); + stats_.incResponseReplyError(cluster, upstream_host_); } break; case MessageType::Exception: upstream_host_->outlierDetector().putResult( Upstream::Outlier::Result::ExtOriginRequestFailed); - parent_.incResponseException(cluster, upstream_host_); + stats_.incResponseException(cluster, upstream_host_); break; default: - parent_.incResponseInvalidType(cluster, upstream_host_); + stats_.incResponseInvalidType(cluster, upstream_host_); break; } onResponseComplete(); @@ -315,8 +315,7 @@ void UpstreamRequest::chargeResponseTiming() { const std::chrono::milliseconds response_time = std::chrono::duration_cast( dispatcher.timeSource().monotonicTime() - downstream_request_complete_time_); - parent_.recordResponseDuration(upstream_host_, response_time.count(), - Stats::Histogram::Unit::Milliseconds); + stats_.recordUpstreamResponseTime(parent_.cluster(), upstream_host_, response_time.count()); } } // namespace Router diff --git a/source/extensions/filters/network/thrift_proxy/router/upstream_request.h b/source/extensions/filters/network/thrift_proxy/router/upstream_request.h index 287610b6b98cc..ae51ccddcff20 100644 --- a/source/extensions/filters/network/thrift_proxy/router/upstream_request.h +++ b/source/extensions/filters/network/thrift_proxy/router/upstream_request.h @@ -59,6 +59,7 @@ struct UpstreamRequest : public Tcp::ConnectionPool::Callbacks, void chargeResponseTiming(); RequestOwner& parent_; + const RouterStats& stats_; Upstream::TcpPoolData& conn_pool_data_; MessageMetadataSharedPtr metadata_; diff --git a/test/extensions/filters/network/thrift_proxy/mocks.h b/test/extensions/filters/network/thrift_proxy/mocks.h index 933a4bbc09560..97b59928e580a 100644 --- a/test/extensions/filters/network/thrift_proxy/mocks.h +++ b/test/extensions/filters/network/thrift_proxy/mocks.h @@ -1,6 +1,5 @@ #pragma once -#include "envoy/local_info/local_info.h" #include "envoy/router/router.h" #include "source/extensions/filters/network/thrift_proxy/conn_manager.h" @@ -363,9 +362,6 @@ class MockShadowWriter : public ShadowWriter { ~MockShadowWriter() override; MOCK_METHOD(Upstream::ClusterManager&, clusterManager, (), ()); - MOCK_METHOD(std::string&, statPrefix, (), (const)); - MOCK_METHOD(Stats::Scope&, scope, (), ()); - MOCK_METHOD(LocalInfo::LocalInfo&, localInfo, (), (const)); MOCK_METHOD(Event::Dispatcher&, dispatcher, (), ()); MOCK_METHOD(absl::optional>, submit, (const std::string&, MessageMetadataSharedPtr, TransportType, ProtocolType), ()); diff --git a/test/extensions/filters/network/thrift_proxy/router_test.cc b/test/extensions/filters/network/thrift_proxy/router_test.cc index e98752c513bdc..b86abbfd551cc 100644 --- a/test/extensions/filters/network/thrift_proxy/router_test.cc +++ b/test/extensions/filters/network/thrift_proxy/router_test.cc @@ -112,16 +112,15 @@ class ThriftRouterTestBase { route_ = new NiceMock(); route_ptr_.reset(route_); + stats_ = std::make_shared("test", context_.scope(), context_.localInfo()); if (!use_real_shadow_writer) { - router_ = std::make_unique(context_.clusterManager(), "test", context_.scope(), - context_.runtime(), context_.localInfo(), shadow_writer_); + router_ = std::make_unique(context_.clusterManager(), *stats_, context_.runtime(), + shadow_writer_); } else { - shadow_writer_impl_ = std::make_shared( - context_.clusterManager(), "test", context_.scope(), dispatcher_, context_.threadLocal(), - context_.localInfo()); - router_ = - std::make_unique(context_.clusterManager(), "test", context_.scope(), - context_.runtime(), context_.localInfo(), *shadow_writer_impl_); + shadow_writer_impl_ = std::make_shared(context_.clusterManager(), *stats_, + dispatcher_, context_.threadLocal()); + router_ = std::make_unique(context_.clusterManager(), *stats_, context_.runtime(), + *shadow_writer_impl_); } EXPECT_EQ(nullptr, router_->downstreamConnection()); @@ -491,6 +490,7 @@ class ThriftRouterTestBase { NiceMock context_; std::unique_ptr router_; + std::shared_ptr stats_; MockShadowWriter shadow_writer_; std::shared_ptr shadow_writer_impl_; diff --git a/test/extensions/filters/network/thrift_proxy/shadow_writer_test.cc b/test/extensions/filters/network/thrift_proxy/shadow_writer_test.cc index a30ba5cd88393..1a15a105ea1d4 100644 --- a/test/extensions/filters/network/thrift_proxy/shadow_writer_test.cc +++ b/test/extensions/filters/network/thrift_proxy/shadow_writer_test.cc @@ -39,8 +39,9 @@ struct MockNullResponseDecoder : public NullResponseDecoder { class ShadowWriterTest : public testing::Test { public: ShadowWriterTest() { - shadow_writer_ = std::make_shared( - cm_, "test", context_.scope(), dispatcher_, context_.threadLocal(), context_.localInfo()); + stats_ = std::make_shared("test", context_.scope(), context_.localInfo()); + shadow_writer_ = + std::make_shared(cm_, *stats_, dispatcher_, context_.threadLocal()); metadata_ = std::make_shared(); metadata_->setMethodName("ping"); metadata_->setMessageType(MessageType::Call); @@ -270,6 +271,7 @@ class ShadowWriterTest : public testing::Test { NiceMock conn_pool_; std::shared_ptr> host_; envoy::config::core::v3::Locality upstream_locality_; + std::shared_ptr stats_; std::shared_ptr shadow_writer_; }; From b91dd3e4d5c978dab97c889cf83d0a02c337bd2f Mon Sep 17 00:00:00 2001 From: James Fish Date: Tue, 12 Oct 2021 10:00:44 -0700 Subject: [PATCH 2/2] Fix spelling in comment Signed-off-by: James Fish --- .../extensions/filters/network/thrift_proxy/router/router.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/extensions/filters/network/thrift_proxy/router/router.h b/source/extensions/filters/network/thrift_proxy/router/router.h index e1e34622827ca..e7df1eb074d52 100644 --- a/source/extensions/filters/network/thrift_proxy/router/router.h +++ b/source/extensions/filters/network/thrift_proxy/router/router.h @@ -222,7 +222,7 @@ class RouterStats { /** * Record a value for the request size histogram. * @param cluster Upstream::ClusterInfo& describing the upstream cluster - * @param value uinty64_t size in bytes of the full request + * @param value uint64_t size in bytes of the full request */ void recordUpstreamRequestSize(const Upstream::ClusterInfo& cluster, uint64_t value) const { recordClusterScopeHistogram(cluster, nullptr, upstream_rq_size_, Stats::Histogram::Unit::Bytes, @@ -232,7 +232,7 @@ class RouterStats { /** * Record a value for the response size histogram. * @param cluster Upstream::ClusterInfo& describing the upstream cluster - * @param value uinty64_t size in bytes of the full response + * @param value uint64_t size in bytes of the full response */ void recordUpstreamResponseSize(const Upstream::ClusterInfo& cluster, uint64_t value) const { recordClusterScopeHistogram(cluster, nullptr, upstream_resp_size_,