diff --git a/docs/root/configuration/other_protocols/thrift_filters/router_filter.rst b/docs/root/configuration/other_protocols/thrift_filters/router_filter.rst index a6d5c50ad4d2e..dae0be24d7429 100644 --- a/docs/root/configuration/other_protocols/thrift_filters/router_filter.rst +++ b/docs/root/configuration/other_protocols/thrift_filters/router_filter.rst @@ -41,3 +41,10 @@ Since these stats utilize the underlying cluster scope, we prefix with the ``thr thrift.upstream_resp_exception, Counter, Total responses with the "Exception" message type. thrift.upstream_resp_invalid_type, Counter, Total responses with an unsupported message type. thrift.upstream_rq_time, Histogram, total rq time from rq complete to resp complete; includes oneway messages. + thrift.upstream_rq_size, Histogram, Request message size in bytes per upstream. + thrift.upstream_resp_size, Histogram, Response message size in bytes per upstream. + +.. note:: + + The request and response size histograms include what's sent and received during protocol upgrade. + However, invalid responses are not included in the response size histogram. diff --git a/docs/root/version_history/current.rst b/docs/root/version_history/current.rst index 332df94449503..12de83855bcb3 100644 --- a/docs/root/version_history/current.rst +++ b/docs/root/version_history/current.rst @@ -74,6 +74,7 @@ New Features * listener: added ability to change an existing listener's address. * metric service: added support for sending metric tags as labels. This can be enabled by setting the :ref:`emit_tags_as_labels ` field to true. * tcp: added support for :ref:`preconnecting `. Preconnecting is off by default, but recommended for clusters serving latency-sensitive traffic. +* thrift_proxy: added per-upstream metrics within the :ref:`thrift router ` for request and response size histograms. * udp_proxy: added :ref:`key ` as another hash policy to support hash based routing on any given key. 
Deprecated diff --git a/source/extensions/filters/network/thrift_proxy/router/router_impl.cc b/source/extensions/filters/network/thrift_proxy/router/router_impl.cc index 498a86bbe61c0..5712ea8dd171e 100644 --- a/source/extensions/filters/network/thrift_proxy/router/router_impl.cc +++ b/source/extensions/filters/network/thrift_proxy/router/router_impl.cc @@ -316,6 +316,10 @@ FilterStatus Router::messageEnd() { upstream_request_->transport_->encodeFrame(transport_buffer, *upstream_request_->metadata_, upstream_request_buffer_); + + request_size_ += transport_buffer.length(); + recordClusterScopeHistogram({upstream_rq_size_}, Stats::Histogram::Unit::Bytes, request_size_); + upstream_request_->conn_data_->connection().write(transport_buffer, false); upstream_request_->onRequestComplete(); return FilterStatus::Continue; @@ -324,6 +328,8 @@ FilterStatus Router::messageEnd() { void Router::onUpstreamData(Buffer::Instance& data, bool end_stream) { ASSERT(!upstream_request_->response_complete_); + response_size_ += data.length(); + if (upstream_request_->upgrade_response_ != nullptr) { ENVOY_STREAM_LOG(trace, "reading upgrade response: {} bytes", *callbacks_, data.length()); // Handle upgrade response. @@ -351,6 +357,9 @@ void Router::onUpstreamData(Buffer::Instance& data, bool end_stream) { ThriftFilters::ResponseStatus status = callbacks_->upstreamData(data); if (status == ThriftFilters::ResponseStatus::Complete) { ENVOY_STREAM_LOG(debug, "response complete", *callbacks_); + recordClusterScopeHistogram({upstream_resp_size_}, Stats::Histogram::Unit::Bytes, + response_size_); + switch (callbacks_->responseMetadata()->messageType()) { case MessageType::Reply: incClusterScopeCounter({upstream_resp_reply_}); @@ -373,6 +382,7 @@ void Router::onUpstreamData(Buffer::Instance& data, bool end_stream) { cleanup(); return; } else if (status == ThriftFilters::ResponseStatus::Reset) { + // Note: invalid responses are not accounted in the response size histogram. 
ENVOY_STREAM_LOG(debug, "upstream reset", *callbacks_); upstream_request_->resetStream(); return; @@ -503,6 +513,7 @@ void Router::UpstreamRequest::onPoolReady(Tcp::ConnectionPool::ConnectionDataPtr upgrade_response_ = protocol_->attemptUpgrade(*transport_, *conn_state_, parent_.upstream_request_buffer_); if (upgrade_response_ != nullptr) { + parent_.request_size_ += parent_.upstream_request_buffer_.length(); conn_data_->connection().write(parent_.upstream_request_buffer_, false); return; } diff --git a/source/extensions/filters/network/thrift_proxy/router/router_impl.h b/source/extensions/filters/network/thrift_proxy/router/router_impl.h index e125c69fce0b8..c12a0a34df8d5 100644 --- a/source/extensions/filters/network/thrift_proxy/router/router_impl.h +++ b/source/extensions/filters/network/thrift_proxy/router/router_impl.h @@ -190,6 +190,8 @@ class Router : public Tcp::ConnectionPool::UpstreamCallbacks, upstream_resp_exception_(stat_name_set_->add("thrift.upstream_resp_exception")), upstream_resp_invalid_type_(stat_name_set_->add("thrift.upstream_resp_invalid_type")), upstream_rq_time_(stat_name_set_->add("thrift.upstream_rq_time")), + upstream_rq_size_(stat_name_set_->add("thrift.upstream_rq_size")), + upstream_resp_size_(stat_name_set_->add("thrift.upstream_resp_size")), passthrough_supported_(false) {} ~Router() override = default; @@ -299,6 +301,8 @@ class Router : public Tcp::ConnectionPool::UpstreamCallbacks, const Stats::StatName upstream_resp_exception_; const Stats::StatName upstream_resp_invalid_type_; const Stats::StatName upstream_rq_time_; + const Stats::StatName upstream_rq_size_; + const Stats::StatName upstream_resp_size_; ThriftFilters::DecoderFilterCallbacks* callbacks_{}; RouteConstSharedPtr route_{}; @@ -309,6 +313,8 @@ class Router : public Tcp::ConnectionPool::UpstreamCallbacks, Buffer::OwnedImpl upstream_request_buffer_; bool passthrough_supported_ : 1; + uint64_t request_size_{}; + uint64_t response_size_{}; }; } // namespace Router diff 
--git a/test/extensions/filters/network/thrift_proxy/router_test.cc b/test/extensions/filters/network/thrift_proxy/router_test.cc index 03448be78b93a..d17758c682bd5 100644 --- a/test/extensions/filters/network/thrift_proxy/router_test.cc +++ b/test/extensions/filters/network/thrift_proxy/router_test.cc @@ -740,6 +740,14 @@ TEST_F(ThriftRouterTest, UnexpectedRouterDestroy) { } TEST_F(ThriftRouterTest, ProtocolUpgrade) { + Stats::MockStore cluster_scope; + ON_CALL(*context_.cluster_manager_.thread_local_cluster_.cluster_.info_, statsScope()) + .WillByDefault(ReturnRef(cluster_scope)); + + EXPECT_CALL(cluster_scope, counter("thrift.upstream_rq_call")); + EXPECT_CALL(cluster_scope, counter("thrift.upstream_resp_reply")); + EXPECT_CALL(cluster_scope, counter("thrift.upstream_resp_success")); + initializeRouter(); startRequest(MessageType::Call); @@ -760,6 +768,21 @@ TEST_F(ThriftRouterTest, ProtocolUpgrade) { EXPECT_CALL(*protocol_, supportsUpgrade()).WillOnce(Return(true)); + EXPECT_CALL(cluster_scope, + histogram("thrift.upstream_rq_time", Stats::Histogram::Unit::Milliseconds)); + EXPECT_CALL(cluster_scope, + deliverHistogramToSinks( + testing::Property(&Stats::Metric::name, "thrift.upstream_rq_time"), _)); + + EXPECT_CALL(cluster_scope, histogram("thrift.upstream_rq_size", Stats::Histogram::Unit::Bytes)); + EXPECT_CALL(cluster_scope, + deliverHistogramToSinks( + testing::Property(&Stats::Metric::name, "thrift.upstream_rq_size"), _)); + EXPECT_CALL(cluster_scope, histogram("thrift.upstream_resp_size", Stats::Histogram::Unit::Bytes)); + EXPECT_CALL(cluster_scope, + deliverHistogramToSinks( + testing::Property(&Stats::Metric::name, "thrift.upstream_resp_size"), _)); + MockThriftObject* upgrade_response = new NiceMock(); EXPECT_CALL(*protocol_, attemptUpgrade(_, _, _)) @@ -796,16 +819,6 @@ TEST_F(ThriftRouterTest, ProtocolUpgrade) { completeRequest(); returnResponse(); destroyRouter(); - - EXPECT_EQ(1UL, 
context_.cluster_manager_.thread_local_cluster_.cluster_.info_->statsScope() - .counterFromString("thrift.upstream_rq_call") - .value()); - EXPECT_EQ(1UL, context_.cluster_manager_.thread_local_cluster_.cluster_.info_->statsScope() - .counterFromString("thrift.upstream_resp_reply") - .value()); - EXPECT_EQ(1UL, context_.cluster_manager_.thread_local_cluster_.cluster_.info_->statsScope() - .counterFromString("thrift.upstream_resp_success") - .value()); } // Test the case where an upgrade will occur, but the conn pool @@ -1022,6 +1035,15 @@ TEST_P(ThriftRouterFieldTypeTest, CallWithUpstreamRqTime) { EXPECT_CALL(cluster_scope, counter("thrift.upstream_resp_reply")); EXPECT_CALL(cluster_scope, counter("thrift.upstream_resp_success")); + EXPECT_CALL(cluster_scope, histogram("thrift.upstream_rq_size", Stats::Histogram::Unit::Bytes)); + EXPECT_CALL(cluster_scope, + deliverHistogramToSinks( + testing::Property(&Stats::Metric::name, "thrift.upstream_rq_size"), _)); + EXPECT_CALL(cluster_scope, histogram("thrift.upstream_resp_size", Stats::Histogram::Unit::Bytes)); + EXPECT_CALL(cluster_scope, + deliverHistogramToSinks( + testing::Property(&Stats::Metric::name, "thrift.upstream_resp_size"), _)); + startRequest(MessageType::Call); connectUpstream(); sendTrivialStruct(field_type); @@ -1297,6 +1319,39 @@ TEST_P(ThriftRouterPassthroughTest, PassthroughEnable) { ConnectionPool::PoolFailureReason::RemoteConnectionFailure); } +TEST_F(ThriftRouterTest, RequestResponseSize) { + initializeRouter(); + + Stats::MockStore cluster_scope; + ON_CALL(*context_.cluster_manager_.thread_local_cluster_.cluster_.info_, statsScope()) + .WillByDefault(ReturnRef(cluster_scope)); + + EXPECT_CALL(cluster_scope, counter("thrift.upstream_rq_call")).Times(AtLeast(1)); + EXPECT_CALL(cluster_scope, counter("thrift.upstream_resp_reply")).Times(AtLeast(1)); + EXPECT_CALL(cluster_scope, counter("thrift.upstream_resp_success")).Times(AtLeast(1)); + + EXPECT_CALL(cluster_scope, + 
histogram("thrift.upstream_rq_time", Stats::Histogram::Unit::Milliseconds)); + EXPECT_CALL(cluster_scope, + deliverHistogramToSinks( + testing::Property(&Stats::Metric::name, "thrift.upstream_rq_time"), _)); + + EXPECT_CALL(cluster_scope, histogram("thrift.upstream_rq_size", Stats::Histogram::Unit::Bytes)); + EXPECT_CALL(cluster_scope, + deliverHistogramToSinks( + testing::Property(&Stats::Metric::name, "thrift.upstream_rq_size"), _)); + EXPECT_CALL(cluster_scope, histogram("thrift.upstream_resp_size", Stats::Histogram::Unit::Bytes)); + EXPECT_CALL(cluster_scope, + deliverHistogramToSinks( + testing::Property(&Stats::Metric::name, "thrift.upstream_resp_size"), _)); + + startRequestWithExistingConnection(MessageType::Call); + sendTrivialStruct(FieldType::I32); + completeRequest(); + returnResponse(); + destroyRouter(); +} + } // namespace Router } // namespace ThriftProxy } // namespace NetworkFilters