Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@ ThriftFilters::FilterFactoryCb RouterFilterConfig::createFilterFactoryFromProtoT
const std::string& stat_prefix, Server::Configuration::FactoryContext& context) {
UNREFERENCED_PARAMETER(proto_config);

auto stats =
std::make_shared<const RouterStats>(stat_prefix, context.scope(), context.localInfo());
auto shadow_writer = std::make_shared<ShadowWriterImpl>(
context.clusterManager(), stat_prefix, context.scope(), context.mainThreadDispatcher(),
context.threadLocal(), context.localInfo());
context.clusterManager(), *stats, context.mainThreadDispatcher(), context.threadLocal());

return [&context, stat_prefix,
return [&context, stats,
shadow_writer](ThriftFilters::FilterChainFactoryCallbacks& callbacks) -> void {
callbacks.addDecoderFilter(std::make_shared<Router>(context.clusterManager(), stat_prefix,
context.scope(), context.runtime(),
context.localInfo(), *shadow_writer));
callbacks.addDecoderFilter(std::make_shared<Router>(context.clusterManager(), *stats,
context.runtime(), *shadow_writer));
};
}

Expand Down
378 changes: 194 additions & 184 deletions source/extensions/filters/network/thrift_proxy/router/router.h

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ FilterStatus Router::messageBegin(MessageMetadataSharedPtr metadata) {
route_ = callbacks_->route();
if (!route_) {
ENVOY_STREAM_LOG(debug, "no route match for method '{}'", *callbacks_, metadata->methodName());
stats().route_missing_.inc();
stats().named_.route_missing_.inc();
callbacks_->sendLocalReply(
AppException(AppExceptionType::UnknownMethod,
fmt::format("no route for method '{}'", metadata->methodName())),
Expand Down Expand Up @@ -294,7 +294,7 @@ FilterStatus Router::messageEnd() {
ProtocolConverter::messageEnd();
const auto encode_size = upstream_request_->encodeAndWrite(upstream_request_buffer_);
addSize(encode_size);
recordUpstreamRequestSize(*cluster_, request_size_);
stats().recordUpstreamRequestSize(*cluster_, request_size_);

// Dispatch shadow requests, if any.
// Note: if connections aren't ready, the write will happen when appropriate.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,7 @@
#include <vector>

#include "envoy/extensions/filters/network/thrift_proxy/v3/route.pb.h"
#include "envoy/local_info/local_info.h"
#include "envoy/router/router.h"
#include "envoy/stats/scope.h"
#include "envoy/stats/stats_macros.h"
#include "envoy/tcp/conn_pool.h"
#include "envoy/upstream/load_balancer.h"

Expand Down Expand Up @@ -218,11 +215,10 @@ class Router : public Tcp::ConnectionPool::UpstreamCallbacks,
public RequestOwner,
public ThriftFilters::DecoderFilter {
public:
Router(Upstream::ClusterManager& cluster_manager, const std::string& stat_prefix,
Stats::Scope& scope, Runtime::Loader& runtime, const LocalInfo::LocalInfo& local_info,
ShadowWriter& shadow_writer)
: RequestOwner(cluster_manager, stat_prefix, scope, local_info),
passthrough_supported_(false), runtime_(runtime), shadow_writer_(shadow_writer) {}
Router(Upstream::ClusterManager& cluster_manager, const RouterStats& stats,
Runtime::Loader& runtime, ShadowWriter& shadow_writer)
: RequestOwner(cluster_manager, stats), passthrough_supported_(false), runtime_(runtime),
shadow_writer_(shadow_writer) {}

~Router() override = default;

Expand All @@ -241,10 +237,6 @@ class Router : public Tcp::ConnectionPool::UpstreamCallbacks,
void sendLocalReply(const ThriftProxy::DirectResponse& response, bool end_stream) override {
callbacks_->sendLocalReply(response, end_stream);
}
void recordResponseDuration(Upstream::HostDescriptionConstSharedPtr upstream_host, uint64_t value,
Stats::Histogram::Unit unit) override {
recordClusterResponseDuration(*cluster_, upstream_host, value, unit);
}

// RequestOwner::ProtocolConverter
FilterStatus transportBegin(MessageMetadataSharedPtr metadata) override;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ ShadowWriterImpl::submit(const std::string& cluster_name, MessageMetadataSharedP
original_transport, original_protocol);
const bool created = shadow_router->createUpstreamRequest();
if (!created) {
stats_.shadow_request_submit_failure_.inc();
stats_.named_.shadow_request_submit_failure_.inc();
return absl::nullopt;
}

Expand All @@ -34,10 +34,9 @@ ShadowWriterImpl::submit(const std::string& cluster_name, MessageMetadataSharedP
ShadowRouterImpl::ShadowRouterImpl(ShadowWriterImpl& parent, const std::string& cluster_name,
MessageMetadataSharedPtr& metadata, TransportType transport_type,
ProtocolType protocol_type)
: RequestOwner(parent.clusterManager(), parent.statPrefix(), parent.scope(),
parent.localInfo()),
parent_(parent), cluster_name_(cluster_name), metadata_(metadata->clone()),
transport_type_(transport_type), protocol_type_(protocol_type),
: RequestOwner(parent.clusterManager(), parent.stats()), parent_(parent),
cluster_name_(cluster_name), metadata_(metadata->clone()), transport_type_(transport_type),
protocol_type_(protocol_type),
transport_(NamedTransportConfigFactory::getFactory(transport_type).createTransport()),
protocol_(NamedProtocolConfigFactory::getFactory(protocol_type).createProtocol()) {
response_decoder_ = std::make_unique<NullResponseDecoder>(*transport_, *protocol_);
Expand Down Expand Up @@ -225,7 +224,7 @@ FilterStatus ShadowRouterImpl::messageEnd() {
ProtocolConverter::messageEnd();
const auto encode_size = upstream_request_->encodeAndWrite(upstream_request_buffer_);
addSize(encode_size);
recordUpstreamRequestSize(*cluster_, request_size_);
stats().recordUpstreamRequestSize(*cluster_, request_size_);

request_sent_ = true;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,7 @@
#include <memory>

#include "envoy/event/dispatcher.h"
#include "envoy/local_info/local_info.h"
#include "envoy/router/router.h"
#include "envoy/stats/scope.h"
#include "envoy/stats/stats_macros.h"
#include "envoy/tcp/conn_pool.h"
#include "envoy/upstream/load_balancer.h"

Expand Down Expand Up @@ -143,10 +140,6 @@ class ShadowRouterImpl : public ShadowRouterHandle,
void continueDecoding() override { flushPendingCallbacks(); }
void resetDownstreamConnection() override {}
void sendLocalReply(const ThriftProxy::DirectResponse&, bool) override {}
void recordResponseDuration(Upstream::HostDescriptionConstSharedPtr upstream_host, uint64_t value,
Stats::Histogram::Unit unit) override {
recordClusterResponseDuration(*cluster_, upstream_host, value, unit);
}

// RequestOwner::ProtocolConverter
FilterStatus transportBegin(MessageMetadataSharedPtr) override { return FilterStatus::Continue; }
Expand Down Expand Up @@ -218,12 +211,6 @@ class ShadowRouterImpl : public ShadowRouterHandle,
bool deferred_deleting_{};
};

#define ALL_SHADOW_WRITER_STATS(COUNTER, GAUGE, HISTOGRAM) COUNTER(shadow_request_submit_failure)

struct ShadowWriterStats {
ALL_SHADOW_WRITER_STATS(GENERATE_COUNTER_STRUCT, GENERATE_GAUGE_STRUCT, GENERATE_HISTOGRAM_STRUCT)
};

class ActiveRouters : public ThreadLocal::ThreadLocalObject {
public:
ActiveRouters(Event::Dispatcher& dispatcher) : dispatcher_(dispatcher) {}
Expand All @@ -248,13 +235,9 @@ class ActiveRouters : public ThreadLocal::ThreadLocalObject {

class ShadowWriterImpl : public ShadowWriter, Logger::Loggable<Logger::Id::thrift> {
public:
ShadowWriterImpl(Upstream::ClusterManager& cm, const std::string& stat_prefix,
Stats::Scope& scope, Event::Dispatcher& dispatcher,
ThreadLocal::SlotAllocator& tls, const LocalInfo::LocalInfo& local_info)
: cm_(cm), stat_prefix_(stat_prefix), scope_(scope), local_info_(local_info),
dispatcher_(dispatcher), stats_(generateStats(stat_prefix, scope)),
tls_(tls.allocateSlot()) {

ShadowWriterImpl(Upstream::ClusterManager& cm, const RouterStats& stats,
Event::Dispatcher& dispatcher, ThreadLocal::SlotAllocator& tls)
: cm_(cm), stats_(stats), dispatcher_(dispatcher), tls_(tls.allocateSlot()) {
tls_->set([](Event::Dispatcher& dispatcher) -> ThreadLocal::ThreadLocalObjectSharedPtr {
return std::make_shared<ActiveRouters>(dispatcher);
});
Expand All @@ -263,12 +246,10 @@ class ShadowWriterImpl : public ShadowWriter, Logger::Loggable<Logger::Id::thrif
~ShadowWriterImpl() override = default;

void remove(ShadowRouterImpl& router) { tls_->getTyped<ActiveRouters>().remove(router); }
const RouterStats& stats() { return stats_; }

// Router::ShadowWriter
Upstream::ClusterManager& clusterManager() override { return cm_; }
const std::string& statPrefix() const override { return stat_prefix_; }
Stats::Scope& scope() override { return scope_; }
const LocalInfo::LocalInfo& localInfo() const override { return local_info_; }
Event::Dispatcher& dispatcher() override { return dispatcher_; }
absl::optional<std::reference_wrapper<ShadowRouterHandle>>
submit(const std::string& cluster_name, MessageMetadataSharedPtr metadata,
Expand All @@ -277,18 +258,9 @@ class ShadowWriterImpl : public ShadowWriter, Logger::Loggable<Logger::Id::thrif
private:
friend class ShadowRouterImpl;

ShadowWriterStats generateStats(const std::string& prefix, Stats::Scope& scope) {
return ShadowWriterStats{ALL_SHADOW_WRITER_STATS(POOL_COUNTER_PREFIX(scope, prefix),
POOL_GAUGE_PREFIX(scope, prefix),
POOL_HISTOGRAM_PREFIX(scope, prefix))};
}

Upstream::ClusterManager& cm_;
const std::string stat_prefix_;
Stats::Scope& scope_;
const LocalInfo::LocalInfo& local_info_;
const RouterStats& stats_;
Event::Dispatcher& dispatcher_;
ShadowWriterStats stats_;
ThreadLocal::SlotPtr tls_;
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ namespace Router {
UpstreamRequest::UpstreamRequest(RequestOwner& parent, Upstream::TcpPoolData& pool_data,
MessageMetadataSharedPtr& metadata, TransportType transport_type,
ProtocolType protocol_type)
: parent_(parent), conn_pool_data_(pool_data), metadata_(metadata),
: parent_(parent), stats_(parent.stats()), conn_pool_data_(pool_data), metadata_(metadata),
transport_(NamedTransportConfigFactory::getFactory(transport_type).createTransport()),
protocol_(NamedProtocolConfigFactory::getFactory(protocol_type).createProtocol()),
request_complete_(false), response_started_(false), response_complete_(false) {}
Expand Down Expand Up @@ -129,30 +129,30 @@ UpstreamRequest::handleRegularResponse(Buffer::Instance& data,
if (status == ThriftFilters::ResponseStatus::Complete) {
ENVOY_LOG(debug, "response complete");

parent_.recordUpstreamResponseSize(cluster, response_size_);
stats_.recordUpstreamResponseSize(cluster, response_size_);

switch (callbacks.responseMetadata()->messageType()) {
case MessageType::Reply:
parent_.incResponseReply(cluster, upstream_host_);
stats_.incResponseReply(cluster, upstream_host_);
if (callbacks.responseSuccess()) {
upstream_host_->outlierDetector().putResult(
Upstream::Outlier::Result::ExtOriginRequestSuccess);
parent_.incResponseReplySuccess(cluster, upstream_host_);
stats_.incResponseReplySuccess(cluster, upstream_host_);
} else {
upstream_host_->outlierDetector().putResult(
Upstream::Outlier::Result::ExtOriginRequestFailed);
parent_.incResponseReplyError(cluster, upstream_host_);
stats_.incResponseReplyError(cluster, upstream_host_);
}
break;

case MessageType::Exception:
upstream_host_->outlierDetector().putResult(
Upstream::Outlier::Result::ExtOriginRequestFailed);
parent_.incResponseException(cluster, upstream_host_);
stats_.incResponseException(cluster, upstream_host_);
break;

default:
parent_.incResponseInvalidType(cluster, upstream_host_);
stats_.incResponseInvalidType(cluster, upstream_host_);
break;
}
onResponseComplete();
Expand Down Expand Up @@ -315,8 +315,7 @@ void UpstreamRequest::chargeResponseTiming() {
const std::chrono::milliseconds response_time =
std::chrono::duration_cast<std::chrono::milliseconds>(
dispatcher.timeSource().monotonicTime() - downstream_request_complete_time_);
parent_.recordResponseDuration(upstream_host_, response_time.count(),
Stats::Histogram::Unit::Milliseconds);
stats_.recordUpstreamResponseTime(parent_.cluster(), upstream_host_, response_time.count());
}

} // namespace Router
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ struct UpstreamRequest : public Tcp::ConnectionPool::Callbacks,
void chargeResponseTiming();

RequestOwner& parent_;
const RouterStats& stats_;
Upstream::TcpPoolData& conn_pool_data_;
MessageMetadataSharedPtr metadata_;

Expand Down
4 changes: 0 additions & 4 deletions test/extensions/filters/network/thrift_proxy/mocks.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#pragma once

#include "envoy/local_info/local_info.h"
#include "envoy/router/router.h"

#include "source/extensions/filters/network/thrift_proxy/conn_manager.h"
Expand Down Expand Up @@ -363,9 +362,6 @@ class MockShadowWriter : public ShadowWriter {
~MockShadowWriter() override;

MOCK_METHOD(Upstream::ClusterManager&, clusterManager, (), ());
MOCK_METHOD(std::string&, statPrefix, (), (const));
MOCK_METHOD(Stats::Scope&, scope, (), ());
MOCK_METHOD(LocalInfo::LocalInfo&, localInfo, (), (const));
MOCK_METHOD(Event::Dispatcher&, dispatcher, (), ());
MOCK_METHOD(absl::optional<std::reference_wrapper<ShadowRouterHandle>>, submit,
(const std::string&, MessageMetadataSharedPtr, TransportType, ProtocolType), ());
Expand Down
16 changes: 8 additions & 8 deletions test/extensions/filters/network/thrift_proxy/router_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -112,16 +112,15 @@ class ThriftRouterTestBase {
route_ = new NiceMock<MockRoute>();
route_ptr_.reset(route_);

stats_ = std::make_shared<const RouterStats>("test", context_.scope(), context_.localInfo());
if (!use_real_shadow_writer) {
router_ = std::make_unique<Router>(context_.clusterManager(), "test", context_.scope(),
context_.runtime(), context_.localInfo(), shadow_writer_);
router_ = std::make_unique<Router>(context_.clusterManager(), *stats_, context_.runtime(),
shadow_writer_);
} else {
shadow_writer_impl_ = std::make_shared<ShadowWriterImpl>(
context_.clusterManager(), "test", context_.scope(), dispatcher_, context_.threadLocal(),
context_.localInfo());
router_ =
std::make_unique<Router>(context_.clusterManager(), "test", context_.scope(),
context_.runtime(), context_.localInfo(), *shadow_writer_impl_);
shadow_writer_impl_ = std::make_shared<ShadowWriterImpl>(context_.clusterManager(), *stats_,
dispatcher_, context_.threadLocal());
router_ = std::make_unique<Router>(context_.clusterManager(), *stats_, context_.runtime(),
*shadow_writer_impl_);
}

EXPECT_EQ(nullptr, router_->downstreamConnection());
Expand Down Expand Up @@ -491,6 +490,7 @@ class ThriftRouterTestBase {
NiceMock<Server::Configuration::MockFactoryContext> context_;

std::unique_ptr<Router> router_;
std::shared_ptr<const RouterStats> stats_;
MockShadowWriter shadow_writer_;
std::shared_ptr<ShadowWriterImpl> shadow_writer_impl_;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ struct MockNullResponseDecoder : public NullResponseDecoder {
class ShadowWriterTest : public testing::Test {
public:
ShadowWriterTest() {
shadow_writer_ = std::make_shared<ShadowWriterImpl>(
cm_, "test", context_.scope(), dispatcher_, context_.threadLocal(), context_.localInfo());
stats_ = std::make_shared<const RouterStats>("test", context_.scope(), context_.localInfo());
shadow_writer_ =
std::make_shared<ShadowWriterImpl>(cm_, *stats_, dispatcher_, context_.threadLocal());
metadata_ = std::make_shared<MessageMetadata>();
metadata_->setMethodName("ping");
metadata_->setMessageType(MessageType::Call);
Expand Down Expand Up @@ -270,6 +271,7 @@ class ShadowWriterTest : public testing::Test {
NiceMock<Tcp::ConnectionPool::MockInstance> conn_pool_;
std::shared_ptr<NiceMock<Upstream::MockHost>> host_;
envoy::config::core::v3::Locality upstream_locality_;
std::shared_ptr<const RouterStats> stats_;
std::shared_ptr<ShadowWriterImpl> shadow_writer_;
};

Expand Down