Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,10 @@ Since these stats utilize the underlying cluster scope, we prefix with the ``thr
thrift.upstream_resp_exception, Counter, Total responses with the "Exception" message type.
thrift.upstream_resp_invalid_type, Counter, Total responses with an unsupported message type.
thrift.upstream_rq_time, Histogram, total rq time from rq complete to resp complete; includes oneway messages.
thrift.upstream_rq_size, Histogram, Request message size in bytes per upstream
thrift.upstream_resp_size, Histogram, Response message size in bytes per upstream

.. note::

The request and response size histograms include what's sent and received during protocol upgrade.
However, invalid responses are not included in the response size histogram.
1 change: 1 addition & 0 deletions docs/root/version_history/current.rst
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ New Features
* listener: added ability to change an existing listener's address.
* metric service: added support for sending metric tags as labels. This can be enabled by setting the :ref:`emit_tags_as_labels <envoy_v3_api_field_config.metrics.v3.MetricsServiceConfig.emit_tags_as_labels>` field to true.
* tcp: added support for :ref:`preconnecting <v1.18.0:envoy_v3_api_msg_config.cluster.v3.Cluster.PreconnectPolicy>`. Preconnecting is off by default, but recommended for clusters serving latency-sensitive traffic.
* thrift_proxy: added per upstream metrics within the :ref:`thrift router <envoy_v3_api_msg_extensions.filters.network.thrift_proxy.router.v3.Router>` for request and response size histograms.
* udp_proxy: added :ref:`key <envoy_v3_api_msg_extensions.filters.udp.udp_proxy.v3.UdpProxyConfig.HashPolicy>` as another hash policy to support hash based routing on any given key.

Deprecated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,10 @@ FilterStatus Router::messageEnd() {

upstream_request_->transport_->encodeFrame(transport_buffer, *upstream_request_->metadata_,
upstream_request_buffer_);

request_size_ += transport_buffer.length();
recordClusterScopeHistogram({upstream_rq_size_}, Stats::Histogram::Unit::Bytes, request_size_);

upstream_request_->conn_data_->connection().write(transport_buffer, false);
upstream_request_->onRequestComplete();
return FilterStatus::Continue;
Expand All @@ -324,6 +328,8 @@ FilterStatus Router::messageEnd() {
void Router::onUpstreamData(Buffer::Instance& data, bool end_stream) {
ASSERT(!upstream_request_->response_complete_);

response_size_ += data.length();

if (upstream_request_->upgrade_response_ != nullptr) {
ENVOY_STREAM_LOG(trace, "reading upgrade response: {} bytes", *callbacks_, data.length());
// Handle upgrade response.
Expand Down Expand Up @@ -351,6 +357,9 @@ void Router::onUpstreamData(Buffer::Instance& data, bool end_stream) {
ThriftFilters::ResponseStatus status = callbacks_->upstreamData(data);
if (status == ThriftFilters::ResponseStatus::Complete) {
ENVOY_STREAM_LOG(debug, "response complete", *callbacks_);
recordClusterScopeHistogram({upstream_resp_size_}, Stats::Histogram::Unit::Bytes,
response_size_);

switch (callbacks_->responseMetadata()->messageType()) {
case MessageType::Reply:
incClusterScopeCounter({upstream_resp_reply_});
Expand All @@ -373,6 +382,7 @@ void Router::onUpstreamData(Buffer::Instance& data, bool end_stream) {
cleanup();
return;
} else if (status == ThriftFilters::ResponseStatus::Reset) {
// Note: invalid responses are not accounted in the response size histogram.
ENVOY_STREAM_LOG(debug, "upstream reset", *callbacks_);
upstream_request_->resetStream();
return;
Expand Down Expand Up @@ -503,6 +513,7 @@ void Router::UpstreamRequest::onPoolReady(Tcp::ConnectionPool::ConnectionDataPtr
upgrade_response_ =
protocol_->attemptUpgrade(*transport_, *conn_state_, parent_.upstream_request_buffer_);
if (upgrade_response_ != nullptr) {
parent_.request_size_ += parent_.upstream_request_buffer_.length();
conn_data_->connection().write(parent_.upstream_request_buffer_, false);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,8 @@ class Router : public Tcp::ConnectionPool::UpstreamCallbacks,
upstream_resp_exception_(stat_name_set_->add("thrift.upstream_resp_exception")),
upstream_resp_invalid_type_(stat_name_set_->add("thrift.upstream_resp_invalid_type")),
upstream_rq_time_(stat_name_set_->add("thrift.upstream_rq_time")),
upstream_rq_size_(stat_name_set_->add("thrift.upstream_rq_size")),
upstream_resp_size_(stat_name_set_->add("thrift.upstream_resp_size")),
passthrough_supported_(false) {}

~Router() override = default;
Expand Down Expand Up @@ -299,6 +301,8 @@ class Router : public Tcp::ConnectionPool::UpstreamCallbacks,
const Stats::StatName upstream_resp_exception_;
const Stats::StatName upstream_resp_invalid_type_;
const Stats::StatName upstream_rq_time_;
const Stats::StatName upstream_rq_size_;
const Stats::StatName upstream_resp_size_;

ThriftFilters::DecoderFilterCallbacks* callbacks_{};
RouteConstSharedPtr route_{};
Expand All @@ -309,6 +313,8 @@ class Router : public Tcp::ConnectionPool::UpstreamCallbacks,
Buffer::OwnedImpl upstream_request_buffer_;

bool passthrough_supported_ : 1;
uint64_t request_size_{};
uint64_t response_size_{};
};

} // namespace Router
Expand Down
75 changes: 65 additions & 10 deletions test/extensions/filters/network/thrift_proxy/router_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -740,6 +740,14 @@ TEST_F(ThriftRouterTest, UnexpectedRouterDestroy) {
}

TEST_F(ThriftRouterTest, ProtocolUpgrade) {
Stats::MockStore cluster_scope;
ON_CALL(*context_.cluster_manager_.thread_local_cluster_.cluster_.info_, statsScope())
.WillByDefault(ReturnRef(cluster_scope));

EXPECT_CALL(cluster_scope, counter("thrift.upstream_rq_call"));
EXPECT_CALL(cluster_scope, counter("thrift.upstream_resp_reply"));
EXPECT_CALL(cluster_scope, counter("thrift.upstream_resp_success"));

initializeRouter();
startRequest(MessageType::Call);

Expand All @@ -760,6 +768,21 @@ TEST_F(ThriftRouterTest, ProtocolUpgrade) {

EXPECT_CALL(*protocol_, supportsUpgrade()).WillOnce(Return(true));

EXPECT_CALL(cluster_scope,
histogram("thrift.upstream_rq_time", Stats::Histogram::Unit::Milliseconds));
EXPECT_CALL(cluster_scope,
deliverHistogramToSinks(
testing::Property(&Stats::Metric::name, "thrift.upstream_rq_time"), _));

EXPECT_CALL(cluster_scope, histogram("thrift.upstream_rq_size", Stats::Histogram::Unit::Bytes));
EXPECT_CALL(cluster_scope,
deliverHistogramToSinks(
testing::Property(&Stats::Metric::name, "thrift.upstream_rq_size"), _));
EXPECT_CALL(cluster_scope, histogram("thrift.upstream_resp_size", Stats::Histogram::Unit::Bytes));
EXPECT_CALL(cluster_scope,
deliverHistogramToSinks(
testing::Property(&Stats::Metric::name, "thrift.upstream_resp_size"), _));

MockThriftObject* upgrade_response = new NiceMock<MockThriftObject>();

EXPECT_CALL(*protocol_, attemptUpgrade(_, _, _))
Expand Down Expand Up @@ -796,16 +819,6 @@ TEST_F(ThriftRouterTest, ProtocolUpgrade) {
completeRequest();
returnResponse();
destroyRouter();

EXPECT_EQ(1UL, context_.cluster_manager_.thread_local_cluster_.cluster_.info_->statsScope()
.counterFromString("thrift.upstream_rq_call")
.value());
EXPECT_EQ(1UL, context_.cluster_manager_.thread_local_cluster_.cluster_.info_->statsScope()
.counterFromString("thrift.upstream_resp_reply")
.value());
EXPECT_EQ(1UL, context_.cluster_manager_.thread_local_cluster_.cluster_.info_->statsScope()
.counterFromString("thrift.upstream_resp_success")
.value());
}

// Test the case where an upgrade will occur, but the conn pool
Expand Down Expand Up @@ -1022,6 +1035,15 @@ TEST_P(ThriftRouterFieldTypeTest, CallWithUpstreamRqTime) {
EXPECT_CALL(cluster_scope, counter("thrift.upstream_resp_reply"));
EXPECT_CALL(cluster_scope, counter("thrift.upstream_resp_success"));

EXPECT_CALL(cluster_scope, histogram("thrift.upstream_rq_size", Stats::Histogram::Unit::Bytes));
EXPECT_CALL(cluster_scope,
deliverHistogramToSinks(
testing::Property(&Stats::Metric::name, "thrift.upstream_rq_size"), _));
EXPECT_CALL(cluster_scope, histogram("thrift.upstream_resp_size", Stats::Histogram::Unit::Bytes));
EXPECT_CALL(cluster_scope,
deliverHistogramToSinks(
testing::Property(&Stats::Metric::name, "thrift.upstream_resp_size"), _));

startRequest(MessageType::Call);
connectUpstream();
sendTrivialStruct(field_type);
Expand Down Expand Up @@ -1297,6 +1319,39 @@ TEST_P(ThriftRouterPassthroughTest, PassthroughEnable) {
ConnectionPool::PoolFailureReason::RemoteConnectionFailure);
}

TEST_F(ThriftRouterTest, RequestResponseSize) {
initializeRouter();

Stats::MockStore cluster_scope;
ON_CALL(*context_.cluster_manager_.thread_local_cluster_.cluster_.info_, statsScope())
.WillByDefault(ReturnRef(cluster_scope));

EXPECT_CALL(cluster_scope, counter("thrift.upstream_rq_call")).Times(AtLeast(1));
EXPECT_CALL(cluster_scope, counter("thrift.upstream_resp_reply")).Times(AtLeast(1));
EXPECT_CALL(cluster_scope, counter("thrift.upstream_resp_success")).Times(AtLeast(1));

EXPECT_CALL(cluster_scope,
histogram("thrift.upstream_rq_time", Stats::Histogram::Unit::Milliseconds));
EXPECT_CALL(cluster_scope,
deliverHistogramToSinks(
testing::Property(&Stats::Metric::name, "thrift.upstream_rq_time"), _));

EXPECT_CALL(cluster_scope, histogram("thrift.upstream_rq_size", Stats::Histogram::Unit::Bytes));
EXPECT_CALL(cluster_scope,
deliverHistogramToSinks(
testing::Property(&Stats::Metric::name, "thrift.upstream_rq_size"), _));
EXPECT_CALL(cluster_scope, histogram("thrift.upstream_resp_size", Stats::Histogram::Unit::Bytes));
EXPECT_CALL(cluster_scope,
deliverHistogramToSinks(
testing::Property(&Stats::Metric::name, "thrift.upstream_resp_size"), _));

startRequestWithExistingConnection(MessageType::Call);
sendTrivialStruct(FieldType::I32);
completeRequest();
returnResponse();
destroyRouter();
}

} // namespace Router
} // namespace ThriftProxy
} // namespace NetworkFilters
Expand Down