Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/root/version_history/current.rst
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ Removed Config or Runtime
New Features
------------
* http: added support for :ref:`retriable health check status codes <envoy_v3_api_field_config.core.v3.HealthCheck.HttpHealthCheck.retriable_statuses>`.
* thrift_proxy: add upstream response zone metrics in the form ``cluster.cluster_name.zone.local_zone.upstream_zone.thrift.upstream_resp_success``.

Deprecated
----------
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@ ThriftFilters::FilterFactoryCb RouterFilterConfig::createFilterFactoryFromProtoT

auto shadow_writer =
std::make_shared<ShadowWriterImpl>(context.clusterManager(), stat_prefix, context.scope(),
context.mainThreadDispatcher(), context.threadLocal());
context.mainThreadDispatcher(), context.threadLocal(),
context.localInfo());

return [&context, stat_prefix,
shadow_writer](ThriftFilters::FilterChainFactoryCallbacks& callbacks) -> void {
callbacks.addDecoderFilter(std::make_shared<Router>(
context.clusterManager(), stat_prefix, context.scope(), context.runtime(), *shadow_writer));
context.clusterManager(), stat_prefix, context.scope(), context.runtime(), context.localInfo(), *shadow_writer));
Comment thread
fishcakez marked this conversation as resolved.
Outdated
};
}

Expand Down
97 changes: 71 additions & 26 deletions source/extensions/filters/network/thrift_proxy/router/router.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <vector>

#include "envoy/buffer/buffer.h"
#include "envoy/local_info/local_info.h"
#include "envoy/router/router.h"
#include "envoy/tcp/conn_pool.h"

Expand Down Expand Up @@ -117,7 +118,7 @@ struct RouterStats {
class RequestOwner : public ProtocolConverter, public Logger::Loggable<Logger::Id::thrift> {
public:
RequestOwner(Upstream::ClusterManager& cluster_manager, const std::string& stat_prefix,
Stats::Scope& scope)
Stats::Scope& scope, const LocalInfo::LocalInfo& local_info)
: cluster_manager_(cluster_manager), stats_(generateStats(stat_prefix, scope)),
stat_name_set_(scope.symbolTable().makeSet("thrift_proxy")),
symbol_table_(scope.symbolTable()),
Expand All @@ -131,7 +132,8 @@ class RequestOwner : public ProtocolConverter, public Logger::Loggable<Logger::I
upstream_resp_invalid_type_(stat_name_set_->add("thrift.upstream_resp_invalid_type")),
upstream_rq_time_(stat_name_set_->add("thrift.upstream_rq_time")),
upstream_rq_size_(stat_name_set_->add("thrift.upstream_rq_size")),
upstream_resp_size_(stat_name_set_->add("thrift.upstream_resp_size")) {}
upstream_resp_size_(stat_name_set_->add("thrift.upstream_resp_size")),
zone_(stat_name_set_->add("zone")), local_zone_name_(local_info.zoneStatName()) {}
~RequestOwner() override = default;

/**
Expand Down Expand Up @@ -187,7 +189,8 @@ class RequestOwner : public ProtocolConverter, public Logger::Loggable<Logger::I
* @param value uint64_t the value of the duration.
* @param unit Unit the unit of the duration.
*/
virtual void recordResponseDuration(uint64_t value, Stats::Histogram::Unit unit) PURE;
virtual void recordResponseDuration(Upstream::HostDescriptionConstSharedPtr upstream_host,
uint64_t value, Stats::Histogram::Unit unit) PURE;

/**
* @return Upstream::ClusterManager& the cluster manager.
Expand All @@ -207,72 +210,78 @@ class RequestOwner : public ProtocolConverter, public Logger::Loggable<Logger::I
/**
* Increment counter for received responses that are replies.
*/
void incResponseReply(const Upstream::ClusterInfo& cluster) {
incClusterScopeCounter(cluster, {upstream_resp_reply_});
void incResponseReply(const Upstream::ClusterInfo& cluster,
Upstream::HostDescriptionConstSharedPtr upstream_host) {
incClusterScopeCounter(cluster, upstream_host, upstream_resp_reply_);
}

/**
* Increment counter for request calls.
*/
void incRequestCall(const Upstream::ClusterInfo& cluster) {
incClusterScopeCounter(cluster, {upstream_rq_call_});
incClusterScopeCounter(cluster, nullptr, upstream_rq_call_);
}

/**
* Increment counter for requests that are one way only.
*/
void incRequestOneWay(const Upstream::ClusterInfo& cluster) {
incClusterScopeCounter(cluster, {upstream_rq_oneway_});
incClusterScopeCounter(cluster, nullptr, upstream_rq_oneway_);
}

/**
* Increment counter for requests that are invalid.
*/
void incRequestInvalid(const Upstream::ClusterInfo& cluster) {
incClusterScopeCounter(cluster, {upstream_rq_invalid_type_});
incClusterScopeCounter(cluster, nullptr, upstream_rq_invalid_type_);
}

/**
* Increment counter for received responses that are replies that are successful.
*/
void incResponseReplySuccess(const Upstream::ClusterInfo& cluster) {
incClusterScopeCounter(cluster, {upstream_resp_reply_success_});
void incResponseReplySuccess(const Upstream::ClusterInfo& cluster,
Upstream::HostDescriptionConstSharedPtr upstream_host) {
incClusterScopeCounter(cluster, upstream_host, upstream_resp_reply_success_);
}

/**
* Increment counter for received responses that are replies that are an error.
*/
void incResponseReplyError(const Upstream::ClusterInfo& cluster) {
incClusterScopeCounter(cluster, {upstream_resp_reply_error_});
void incResponseReplyError(const Upstream::ClusterInfo& cluster,
Upstream::HostDescriptionConstSharedPtr upstream_host) {
incClusterScopeCounter(cluster, upstream_host, upstream_resp_reply_error_);
}

/**
* Increment counter for received responses that are exceptions.
*/
void incResponseException(const Upstream::ClusterInfo& cluster) {
incClusterScopeCounter(cluster, {upstream_resp_exception_});
void incResponseException(const Upstream::ClusterInfo& cluster,
Upstream::HostDescriptionConstSharedPtr upstream_host) {
incClusterScopeCounter(cluster, upstream_host, upstream_resp_exception_);
}

/**
* Increment counter for received responses that are invalid.
*/
void incResponseInvalidType(const Upstream::ClusterInfo& cluster) {
incClusterScopeCounter(cluster, {upstream_resp_invalid_type_});
void incResponseInvalidType(const Upstream::ClusterInfo& cluster,
Upstream::HostDescriptionConstSharedPtr upstream_host) {
incClusterScopeCounter(cluster, upstream_host, upstream_resp_invalid_type_);
}

/**
* Record a value for the request size histogram.
*/
void recordUpstreamRequestSize(const Upstream::ClusterInfo& cluster, uint64_t value) {
recordClusterScopeHistogram(cluster, {upstream_rq_size_}, Stats::Histogram::Unit::Bytes, value);
recordClusterScopeHistogram(cluster, nullptr, upstream_rq_size_, Stats::Histogram::Unit::Bytes,
value);
}

/**
* Record a value for the response size histogram.
*/
void recordUpstreamResponseSize(const Upstream::ClusterInfo& cluster, uint64_t value) {
recordClusterScopeHistogram(cluster, {upstream_resp_size_}, Stats::Histogram::Unit::Bytes,
value);
recordClusterScopeHistogram(cluster, nullptr, upstream_resp_size_,
Stats::Histogram::Unit::Bytes, value);
}

/**
Expand All @@ -282,9 +291,10 @@ class RequestOwner : public ProtocolConverter, public Logger::Loggable<Logger::I
* @param value uint64_t the value of the duration.
* @param unit Unit the unit of the duration.
*/
void recordClusterResponseDuration(const Upstream::ClusterInfo& cluster, uint64_t value,
Stats::Histogram::Unit unit) {
recordClusterScopeHistogram(cluster, {upstream_rq_time_}, unit, value);
void recordClusterResponseDuration(const Upstream::ClusterInfo& cluster,
Upstream::HostDescriptionConstSharedPtr upstream_host,
uint64_t value, Stats::Histogram::Unit unit) {
recordClusterScopeHistogram(cluster, upstream_host, upstream_rq_time_, unit, value);
}

protected:
Expand Down Expand Up @@ -369,18 +379,46 @@ class RequestOwner : public ProtocolConverter, public Logger::Loggable<Logger::I

private:
void incClusterScopeCounter(const Upstream::ClusterInfo& cluster,
const Stats::StatNameVec& names) const {
const Stats::SymbolTable::StoragePtr stat_name_storage = symbol_table_.join(names);
Upstream::HostDescriptionConstSharedPtr upstream_host,
const Stats::StatName& stat_name) const {
const Stats::SymbolTable::StoragePtr stat_name_storage = symbol_table_.join({stat_name});
cluster.statsScope().counterFromStatName(Stats::StatName(stat_name_storage.get())).inc();
if (!upstream_host || local_zone_name_.empty()) {
return;
}

const auto& upstream_zone_name = upstream_host->localityZoneStatName();
if (upstream_zone_name.empty()) {
return;
}

const Stats::SymbolTable::StoragePtr zone_stat_name_storage =
symbol_table_.join({zone_, local_zone_name_, upstream_zone_name, stat_name});
Comment thread
fishcakez marked this conversation as resolved.
Outdated
cluster.statsScope().counterFromStatName(Stats::StatName(zone_stat_name_storage.get())).inc();
}

void recordClusterScopeHistogram(const Upstream::ClusterInfo& cluster,
const Stats::StatNameVec& names, Stats::Histogram::Unit unit,
Upstream::HostDescriptionConstSharedPtr upstream_host,
const Stats::StatName& stat_name, Stats::Histogram::Unit unit,
uint64_t value) const {
const Stats::SymbolTable::StoragePtr stat_name_storage = symbol_table_.join(names);
const Stats::SymbolTable::StoragePtr stat_name_storage = symbol_table_.join({stat_name});
cluster.statsScope()
.histogramFromStatName(Stats::StatName(stat_name_storage.get()), unit)
.recordValue(value);
if (!upstream_host || local_zone_name_.empty()) {
return;
}

const auto& upstream_zone_name = upstream_host->localityZoneStatName();
if (upstream_zone_name.empty()) {
return;
}

const Stats::SymbolTable::StoragePtr zone_stat_name_storage =
symbol_table_.join({zone_, local_zone_name_, upstream_zone_name, stat_name});
cluster.statsScope()
.histogramFromStatName(Stats::StatName(zone_stat_name_storage.get()), unit)
.recordValue(value);
}

RouterStats generateStats(const std::string& prefix, Stats::Scope& scope) {
Expand All @@ -404,6 +442,8 @@ class RequestOwner : public ProtocolConverter, public Logger::Loggable<Logger::I
const Stats::StatName upstream_rq_time_;
const Stats::StatName upstream_rq_size_;
const Stats::StatName upstream_resp_size_;
const Stats::StatName zone_;
const Stats::StatName local_zone_name_;
};

/**
Expand Down Expand Up @@ -469,6 +509,11 @@ class ShadowWriter {
*/
virtual Stats::Scope& scope() PURE;

/**
* @return LocalInfo::LocalInfo& the local info used by the router.
*/
virtual const LocalInfo::LocalInfo& localInfo() const PURE;

/**
* @return Dispatcher& the dispatcher.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <vector>

#include "envoy/extensions/filters/network/thrift_proxy/v3/route.pb.h"
#include "envoy/local_info/local_info.h"
#include "envoy/router/router.h"
#include "envoy/stats/scope.h"
#include "envoy/stats/stats_macros.h"
Expand Down Expand Up @@ -218,9 +219,10 @@ class Router : public Tcp::ConnectionPool::UpstreamCallbacks,
public ThriftFilters::DecoderFilter {
public:
Router(Upstream::ClusterManager& cluster_manager, const std::string& stat_prefix,
Stats::Scope& scope, Runtime::Loader& runtime, ShadowWriter& shadow_writer)
: RequestOwner(cluster_manager, stat_prefix, scope), passthrough_supported_(false),
runtime_(runtime), shadow_writer_(shadow_writer) {}
Stats::Scope& scope, Runtime::Loader& runtime, const LocalInfo::LocalInfo& local_info,
ShadowWriter& shadow_writer)
: RequestOwner(cluster_manager, stat_prefix, scope, local_info),
passthrough_supported_(false), runtime_(runtime), shadow_writer_(shadow_writer) {}

~Router() override = default;

Expand All @@ -239,8 +241,9 @@ class Router : public Tcp::ConnectionPool::UpstreamCallbacks,
void sendLocalReply(const ThriftProxy::DirectResponse& response, bool end_stream) override {
callbacks_->sendLocalReply(response, end_stream);
}
void recordResponseDuration(uint64_t value, Stats::Histogram::Unit unit) override {
recordClusterResponseDuration(*cluster_, value, unit);
void recordResponseDuration(Upstream::HostDescriptionConstSharedPtr upstream_host, uint64_t value,
Stats::Histogram::Unit unit) override {
recordClusterResponseDuration(*cluster_, upstream_host, value, unit);
}

// RequestOwner::ProtocolConverter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,10 @@ ShadowWriterImpl::submit(const std::string& cluster_name, MessageMetadataSharedP
ShadowRouterImpl::ShadowRouterImpl(ShadowWriterImpl& parent, const std::string& cluster_name,
MessageMetadataSharedPtr& metadata, TransportType transport_type,
ProtocolType protocol_type)
: RequestOwner(parent.clusterManager(), parent.statPrefix(), parent.scope()), parent_(parent),
cluster_name_(cluster_name), metadata_(metadata->clone()), transport_type_(transport_type),
protocol_type_(protocol_type),
: RequestOwner(parent.clusterManager(), parent.statPrefix(), parent.scope(),
parent.localInfo()),
parent_(parent), cluster_name_(cluster_name), metadata_(metadata->clone()),
transport_type_(transport_type), protocol_type_(protocol_type),
transport_(NamedTransportConfigFactory::getFactory(transport_type).createTransport()),
protocol_(NamedProtocolConfigFactory::getFactory(protocol_type).createProtocol()) {
response_decoder_ = std::make_unique<NullResponseDecoder>(*transport_, *protocol_);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <memory>

#include "envoy/event/dispatcher.h"
#include "envoy/local_info/local_info.h"
#include "envoy/router/router.h"
#include "envoy/stats/scope.h"
#include "envoy/stats/stats_macros.h"
Expand Down Expand Up @@ -142,8 +143,9 @@ class ShadowRouterImpl : public ShadowRouterHandle,
void continueDecoding() override { flushPendingCallbacks(); }
void resetDownstreamConnection() override {}
void sendLocalReply(const ThriftProxy::DirectResponse&, bool) override {}
void recordResponseDuration(uint64_t value, Stats::Histogram::Unit unit) override {
recordClusterResponseDuration(*cluster_, value, unit);
void recordResponseDuration(Upstream::HostDescriptionConstSharedPtr upstream_host, uint64_t value,
Stats::Histogram::Unit unit) override {
recordClusterResponseDuration(*cluster_, upstream_host, value, unit);
}

// RequestOwner::ProtocolConverter
Expand Down Expand Up @@ -248,9 +250,10 @@ class ShadowWriterImpl : public ShadowWriter, Logger::Loggable<Logger::Id::thrif
public:
ShadowWriterImpl(Upstream::ClusterManager& cm, const std::string& stat_prefix,
Stats::Scope& scope, Event::Dispatcher& dispatcher,
ThreadLocal::SlotAllocator& tls)
: cm_(cm), stat_prefix_(stat_prefix), scope_(scope), dispatcher_(dispatcher),
stats_(generateStats(stat_prefix, scope)), tls_(tls.allocateSlot()) {
ThreadLocal::SlotAllocator& tls, const LocalInfo::LocalInfo& local_info)
: cm_(cm), stat_prefix_(stat_prefix), scope_(scope), local_info_(local_info),
dispatcher_(dispatcher), stats_(generateStats(stat_prefix, scope)),
tls_(tls.allocateSlot()) {

tls_->set([](Event::Dispatcher& dispatcher) -> ThreadLocal::ThreadLocalObjectSharedPtr {
return std::make_shared<ActiveRouters>(dispatcher);
Expand All @@ -265,6 +268,7 @@ class ShadowWriterImpl : public ShadowWriter, Logger::Loggable<Logger::Id::thrif
Upstream::ClusterManager& clusterManager() override { return cm_; }
const std::string& statPrefix() const override { return stat_prefix_; }
Stats::Scope& scope() override { return scope_; }
const LocalInfo::LocalInfo& localInfo() const override { return local_info_; }
Event::Dispatcher& dispatcher() override { return dispatcher_; }
absl::optional<std::reference_wrapper<ShadowRouterHandle>>
submit(const std::string& cluster_name, MessageMetadataSharedPtr metadata,
Expand All @@ -282,6 +286,7 @@ class ShadowWriterImpl : public ShadowWriter, Logger::Loggable<Logger::Id::thrif
Upstream::ClusterManager& cm_;
const std::string stat_prefix_;
Stats::Scope& scope_;
const LocalInfo::LocalInfo& local_info_;
Event::Dispatcher& dispatcher_;
ShadowWriterStats stats_;
ThreadLocal::SlotPtr tls_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,26 +133,26 @@ UpstreamRequest::handleRegularResponse(Buffer::Instance& data,

switch (callbacks.responseMetadata()->messageType()) {
case MessageType::Reply:
parent_.incResponseReply(cluster);
parent_.incResponseReply(cluster, upstream_host_);
if (callbacks.responseSuccess()) {
upstream_host_->outlierDetector().putResult(
Upstream::Outlier::Result::ExtOriginRequestSuccess);
parent_.incResponseReplySuccess(cluster);
parent_.incResponseReplySuccess(cluster, upstream_host_);
} else {
upstream_host_->outlierDetector().putResult(
Upstream::Outlier::Result::ExtOriginRequestFailed);
parent_.incResponseReplyError(cluster);
parent_.incResponseReplyError(cluster, upstream_host_);
}
break;

case MessageType::Exception:
upstream_host_->outlierDetector().putResult(
Upstream::Outlier::Result::ExtOriginRequestFailed);
parent_.incResponseException(cluster);
parent_.incResponseException(cluster, upstream_host_);
break;

default:
parent_.incResponseInvalidType(cluster);
parent_.incResponseInvalidType(cluster, upstream_host_);
break;
}
onResponseComplete();
Expand Down Expand Up @@ -315,7 +315,7 @@ void UpstreamRequest::chargeResponseTiming() {
const std::chrono::milliseconds response_time =
std::chrono::duration_cast<std::chrono::milliseconds>(
dispatcher.timeSource().monotonicTime() - downstream_request_complete_time_);
parent_.recordResponseDuration(response_time.count(), Stats::Histogram::Unit::Milliseconds);
parent_.recordResponseDuration(upstream_host_, response_time.count(), Stats::Histogram::Unit::Milliseconds);
}

} // namespace Router
Expand Down
2 changes: 2 additions & 0 deletions test/extensions/filters/network/thrift_proxy/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ envoy_extension_cc_test(
"//test/mocks/upstream:host_mocks",
"//test/test_common:printers_lib",
"//test/test_common:registry_lib",
"@envoy_api//envoy/config/core/v3:pkg_cc_proto",
"@envoy_api//envoy/config/filter/thrift/router/v2alpha1:pkg_cc_proto",
],
)
Expand Down Expand Up @@ -379,6 +380,7 @@ envoy_extension_cc_test(
"//test/mocks/upstream:host_mocks",
"//test/test_common:printers_lib",
"//test/test_common:registry_lib",
"@envoy_api//envoy/config/core/v3:pkg_cc_proto",
"@envoy_api//envoy/extensions/filters/network/thrift_proxy/v3:pkg_cc_proto",
],
)
Loading