From 722bdfdfded8a38c595b1b632ae89b1f9d2ed206 Mon Sep 17 00:00:00 2001 From: James Fish Date: Fri, 1 Oct 2021 12:03:19 -0700 Subject: [PATCH 1/4] Fix success/error thrift metrics on passthrough Signed-off-by: James Fish --- docs/root/version_history/current.rst | 1 + .../network/thrift_proxy/auto_protocol_impl.h | 4 ++ .../thrift_proxy/binary_protocol_impl.cc | 32 ++++++++++ .../thrift_proxy/binary_protocol_impl.h | 2 + .../thrift_proxy/compact_protocol_impl.cc | 45 ++++++++++++++ .../thrift_proxy/compact_protocol_impl.h | 2 + .../network/thrift_proxy/conn_manager.cc | 44 +------------- .../network/thrift_proxy/conn_manager.h | 6 +- .../filters/network/thrift_proxy/decoder.cc | 26 ++++++++- .../filters/network/thrift_proxy/decoder.h | 7 +++ .../filters/network/thrift_proxy/metadata.h | 9 +++ .../filters/network/thrift_proxy/protocol.h | 9 +++ .../thrift_proxy/router/shadow_writer_impl.h | 18 +----- .../filters/network/thrift_proxy/thrift.h | 8 +++ .../network/thrift_proxy/conn_manager_test.cc | 3 +- .../network/thrift_proxy/integration_test.cc | 58 ++++++++++++++++--- .../filters/network/thrift_proxy/mocks.h | 2 + .../thrift_proxy/shadow_writer_test.cc | 16 ++--- .../translation_integration_test.cc | 12 ++-- 19 files changed, 216 insertions(+), 88 deletions(-) diff --git a/docs/root/version_history/current.rst b/docs/root/version_history/current.rst index 5179376bfc907..a7f95505dbe29 100644 --- a/docs/root/version_history/current.rst +++ b/docs/root/version_history/current.rst @@ -98,6 +98,7 @@ Bug Fixes * http: limit use of deferred resets in the http2 codec to server-side connections. Use of deferred reset for client connections can result in incorrect behavior and performance problems. * listener: fixed an issue on Windows where connections are not handled by all worker threads. * lua: fix ``BodyBuffer`` setting a Lua string and printing Lua string containing hex characters. Previously, ``BodyBuffer`` setting a Lua string or printing strings with hex characters will be truncated. +* thrift_proxy: fix the thrift_proxy connection manage to correctly report success/error response metrics when performing :ref:`payload passthrough `. * xray: fix the AWS X-Ray tracer bug where span's error, fault and throttle information was not reported properly as per the `AWS X-Ray documentation `_. Before this fix, server error was reported under the 'annotations' section of the segment data. Removed Config or Runtime diff --git a/source/extensions/filters/network/thrift_proxy/auto_protocol_impl.h b/source/extensions/filters/network/thrift_proxy/auto_protocol_impl.h index c15d50e3de9d2..7f931199d31ba 100644 --- a/source/extensions/filters/network/thrift_proxy/auto_protocol_impl.h +++ b/source/extensions/filters/network/thrift_proxy/auto_protocol_impl.h @@ -6,6 +6,7 @@ #include "source/common/common/fmt.h" #include "source/extensions/filters/network/thrift_proxy/protocol.h" +#include "source/extensions/filters/network/thrift_proxy/thrift.h" namespace Envoy { namespace Extensions { @@ -33,6 +34,9 @@ class AutoProtocolImpl : public Protocol { bool readMessageBegin(Buffer::Instance& buffer, MessageMetadata& metadata) override; bool readMessageEnd(Buffer::Instance& buffer) override; + bool peekReplyPayload(Buffer::Instance& buffer, ReplyType& reply_type) override { + return protocol_->peekReplyPayload(buffer, reply_type); + } bool readStructBegin(Buffer::Instance& buffer, std::string& name) override { return protocol_->readStructBegin(buffer, name); } diff --git a/source/extensions/filters/network/thrift_proxy/binary_protocol_impl.cc b/source/extensions/filters/network/thrift_proxy/binary_protocol_impl.cc index 805ec973e0213..00526f67a1103 100644 --- a/source/extensions/filters/network/thrift_proxy/binary_protocol_impl.cc +++ b/source/extensions/filters/network/thrift_proxy/binary_protocol_impl.cc @@ -60,6 +60,38 @@ bool BinaryProtocolImpl::readMessageEnd(Buffer::Instance& buffer) { return true; } +bool BinaryProtocolImpl::peekReplyPayload(Buffer::Instance& buffer, ReplyType& reply_type) { + // binary protocol does not transmit struct names so go straight to peek at field begin + // FieldType::Stop is encoded as 1 byte. + if (buffer.length() < 1) { + return false; + } + + FieldType type = static_cast(buffer.peekInt()); + if (type == FieldType::Stop) { + // If the first field is stop then response is void success + reply_type = ReplyType::Success; + return true; + } + + if (buffer.length() < 3) { + return false; + } + + int16_t id = buffer.peekBEInt(1); + if (id < 0) { + throw EnvoyException(absl::StrCat("invalid binary protocol field id ", id)); + } else if (id == 0) { + // successful response is inside field id 0 + reply_type = ReplyType::Success; + } else { + // error (IDL exception) is in field id greater than 0 + reply_type = ReplyType::Error; + } + + return true; +} + bool BinaryProtocolImpl::readStructBegin(Buffer::Instance& buffer, std::string& name) { UNREFERENCED_PARAMETER(buffer); name.clear(); // binary protocol does not transmit struct names diff --git a/source/extensions/filters/network/thrift_proxy/binary_protocol_impl.h b/source/extensions/filters/network/thrift_proxy/binary_protocol_impl.h index a3497aa70a6dd..1550e8d226107 100644 --- a/source/extensions/filters/network/thrift_proxy/binary_protocol_impl.h +++ b/source/extensions/filters/network/thrift_proxy/binary_protocol_impl.h @@ -6,6 +6,7 @@ #include "envoy/common/pure.h" #include "source/extensions/filters/network/thrift_proxy/protocol.h" +#include "source/extensions/filters/network/thrift_proxy/thrift.h" namespace Envoy { namespace Extensions { @@ -25,6 +26,7 @@ class BinaryProtocolImpl : public Protocol { ProtocolType type() const override { return ProtocolType::Binary; } bool readMessageBegin(Buffer::Instance& buffer, MessageMetadata& metadata) override; bool readMessageEnd(Buffer::Instance& buffer) override; + bool peekReplyPayload(Buffer::Instance& buffer, ReplyType& reply_type) override; bool readStructBegin(Buffer::Instance& buffer, std::string& name) override; bool readStructEnd(Buffer::Instance& buffer) override; bool readFieldBegin(Buffer::Instance& buffer, std::string& name, FieldType& field_type, diff --git a/source/extensions/filters/network/thrift_proxy/compact_protocol_impl.cc b/source/extensions/filters/network/thrift_proxy/compact_protocol_impl.cc index 4bab4da2e625f..15dad23d83696 100644 --- a/source/extensions/filters/network/thrift_proxy/compact_protocol_impl.cc +++ b/source/extensions/filters/network/thrift_proxy/compact_protocol_impl.cc @@ -79,6 +79,51 @@ bool CompactProtocolImpl::readMessageEnd(Buffer::Instance& buffer) { return true; } +bool CompactProtocolImpl::peekReplyPayload(Buffer::Instance& buffer, ReplyType& reply_type) { + // compact protocol does not transmit struct names so go straight to peek for field begin + // Minimum size: FieldType::Stop is encoded as 1 byte. + if (buffer.length() < 1) { + return false; + } + + uint8_t delta_and_type = buffer.peekInt(); + if ((delta_and_type & 0x0f) == 0) { + // Type is stop, no need to do further decoding + // If the first field is stop then response is void success + reply_type = ReplyType::Success; + return true; + } + + if ((delta_and_type >> 4) != 0) { + // field id delta is non zero and so is an IDL exception (success field id is 0) + reply_type = ReplyType::Error; + return true; + } + + int id_size = 0; + // Field ID delta is zero: this is a long-form field header, followed by zig-zag field id. + if (buffer.length() < 2) { + return false; + } + + int32_t id = BufferHelper::peekZigZagI32(buffer, 1, id_size); + if (id_size < 0) { + return false; + } + + if (id < 0 || id > std::numeric_limits::max()) { + throw EnvoyException(absl::StrCat("invalid compact protocol field id ", id)); + } else if (id == 0) { + // successful response is inside field id 0 + reply_type = ReplyType::Success; + } else { + // error (IDL exception) is in field id greater than 0 + reply_type = ReplyType::Error; + } + + return true; +} + bool CompactProtocolImpl::readStructBegin(Buffer::Instance& buffer, std::string& name) { UNREFERENCED_PARAMETER(buffer); name.clear(); // compact protocol does not transmit struct names diff --git a/source/extensions/filters/network/thrift_proxy/compact_protocol_impl.h b/source/extensions/filters/network/thrift_proxy/compact_protocol_impl.h index fb89e613ece3e..a2c978bcf2f86 100644 --- a/source/extensions/filters/network/thrift_proxy/compact_protocol_impl.h +++ b/source/extensions/filters/network/thrift_proxy/compact_protocol_impl.h @@ -7,6 +7,7 @@ #include "envoy/common/pure.h" #include "source/extensions/filters/network/thrift_proxy/protocol.h" +#include "source/extensions/filters/network/thrift_proxy/thrift.h" #include "absl/types/optional.h" @@ -28,6 +29,7 @@ class CompactProtocolImpl : public Protocol { ProtocolType type() const override { return ProtocolType::Compact; } bool readMessageBegin(Buffer::Instance& buffer, MessageMetadata& metadata) override; bool readMessageEnd(Buffer::Instance& buffer) override; + bool peekReplyPayload(Buffer::Instance& buffer, ReplyType& reply_type) override; bool readStructBegin(Buffer::Instance& buffer, std::string& name) override; bool readStructEnd(Buffer::Instance& buffer) override; bool readFieldBegin(Buffer::Instance& buffer, std::string& name, FieldType& field_type, diff --git a/source/extensions/filters/network/thrift_proxy/conn_manager.cc b/source/extensions/filters/network/thrift_proxy/conn_manager.cc index e7becf40c1d15..27e77ef726b9d 100644 --- a/source/extensions/filters/network/thrift_proxy/conn_manager.cc +++ b/source/extensions/filters/network/thrift_proxy/conn_manager.cc @@ -212,11 +212,6 @@ bool ConnectionManager::ResponseDecoder::onData(Buffer::Instance& data) { FilterStatus ConnectionManager::ResponseDecoder::passthroughData(Buffer::Instance& data) { passthrough_ = true; - // If passing through data, ensure that first reply field is false as if handling a - // fieldBegin. Otherwise all requests will be marked as a success, as if void response, - // in messageEnd. Therefore later will only increment reply and not the inferred subtype - // success/error which requires reading the field id of the first field, see fieldBegin. - first_reply_field_ = false; return ProtocolConverter::passthroughData(data); } @@ -224,40 +219,10 @@ FilterStatus ConnectionManager::ResponseDecoder::messageBegin(MessageMetadataSha metadata_ = metadata; metadata_->setSequenceId(parent_.original_sequence_id_); - first_reply_field_ = - (metadata->hasMessageType() && metadata->messageType() == MessageType::Reply); - return ProtocolConverter::messageBegin(metadata); -} - -FilterStatus ConnectionManager::ResponseDecoder::fieldBegin(absl::string_view name, - FieldType& field_type, - int16_t& field_id) { - if (first_reply_field_) { - // Reply messages contain a struct where field 0 is the call result and fields 1+ are - // exceptions, if defined. At most one field may be set. Therefore, the very first field we - // encounter in a reply is either field 0 (success) or not (IDL exception returned). - // If first fieldType is FieldType::Stop then it is a void success and handled in messageEnd() - // because decoder state machine does not call decoder event callback fieldBegin on - // FieldType::Stop. - success_ = (field_id == 0); - first_reply_field_ = false; + if (metadata->hasReplyType()) { + success_ = (metadata->replyType() == ReplyType::Success); } - - return ProtocolConverter::fieldBegin(name, field_type, field_id); -} - -FilterStatus ConnectionManager::ResponseDecoder::messageEnd() { - if (first_reply_field_) { - // When the response is thrift void type there is never a fieldBegin call on a success - // because the response struct has no fields and so the first field type is FieldType::Stop. - // The decoder state machine handles FieldType::Stop by going immediately to structEnd, - // skipping fieldBegin callback. Therefore if we are still waiting for the first reply field - // at end of message then it is a void success. - success_ = true; - first_reply_field_ = false; - } - - return ProtocolConverter::messageEnd(); + return ProtocolConverter::messageBegin(metadata); } FilterStatus ConnectionManager::ResponseDecoder::transportEnd() { @@ -292,9 +257,6 @@ FilterStatus ConnectionManager::ResponseDecoder::transportEnd() { switch (metadata_->messageType()) { case MessageType::Reply: cm.stats_.response_reply_.inc(); - // success_ is set by inspecting the payload, which wont - // occur when passthrough is enabled as parsing the payload - // is skipped entirely. if (success_) { if (success_.value()) { cm.stats_.response_success_.inc(); diff --git a/source/extensions/filters/network/thrift_proxy/conn_manager.h b/source/extensions/filters/network/thrift_proxy/conn_manager.h index 851e1d10ad6f3..789d54aaffe39 100644 --- a/source/extensions/filters/network/thrift_proxy/conn_manager.h +++ b/source/extensions/filters/network/thrift_proxy/conn_manager.h @@ -74,7 +74,7 @@ class ConnectionManager : public Network::ReadFilter, struct ResponseDecoder : public DecoderCallbacks, public ProtocolConverter { ResponseDecoder(ActiveRpc& parent, Transport& transport, Protocol& protocol) : parent_(parent), decoder_(std::make_unique(transport, protocol, *this)), - complete_(false), first_reply_field_(false), passthrough_{false} { + complete_(false), passthrough_{false} { initProtocolConverter(*parent_.parent_.protocol_, parent_.response_buffer_); } @@ -83,9 +83,6 @@ class ConnectionManager : public Network::ReadFilter, // ProtocolConverter FilterStatus passthroughData(Buffer::Instance& data) override; FilterStatus messageBegin(MessageMetadataSharedPtr metadata) override; - FilterStatus messageEnd() override; - FilterStatus fieldBegin(absl::string_view name, FieldType& field_type, - int16_t& field_id) override; FilterStatus transportBegin(MessageMetadataSharedPtr metadata) override { UNREFERENCED_PARAMETER(metadata); return FilterStatus::Continue; @@ -102,7 +99,6 @@ class ConnectionManager : public Network::ReadFilter, MessageMetadataSharedPtr metadata_; absl::optional success_; bool complete_ : 1; - bool first_reply_field_ : 1; bool passthrough_ : 1; }; using ResponseDecoderPtr = std::unique_ptr; diff --git a/source/extensions/filters/network/thrift_proxy/decoder.cc b/source/extensions/filters/network/thrift_proxy/decoder.cc index 111703e63da55..66c762a65f006 100644 --- a/source/extensions/filters/network/thrift_proxy/decoder.cc +++ b/source/extensions/filters/network/thrift_proxy/decoder.cc @@ -6,6 +6,7 @@ #include "source/common/common/assert.h" #include "source/common/common/macros.h" #include "source/extensions/filters/network/thrift_proxy/app_exception_impl.h" +#include "source/extensions/filters/network/thrift_proxy/thrift.h" namespace Envoy { namespace Extensions { @@ -26,19 +27,29 @@ DecoderStateMachine::DecoderStatus DecoderStateMachine::passthroughData(Buffer:: } // MessageBegin -> StructBegin +// MessageBegin -> ReplyPayload (reply received, get reply type) DecoderStateMachine::DecoderStatus DecoderStateMachine::messageBegin(Buffer::Instance& buffer) { const auto total = buffer.length(); if (!proto_.readMessageBegin(buffer, *metadata_)) { return {ProtocolState::WaitForData}; } + body_start_ = total - buffer.length(); stack_.clear(); stack_.emplace_back(Frame(ProtocolState::MessageEnd)); + // If a reply peek at the payload to see if success or error (IDL exception) + if (metadata_->hasMessageType() && metadata_->messageType() == MessageType::Reply) { + return {ProtocolState::ReplyPayload, FilterStatus::Continue}; + } + + return handleMessageBegin(); +} +DecoderStateMachine::DecoderStatus DecoderStateMachine::handleMessageBegin() { const auto status = handler_.messageBegin(metadata_); if (callbacks_.passthroughEnabled()) { - body_bytes_ = metadata_->frameSize() - (total - buffer.length()); + body_bytes_ = metadata_->frameSize() - body_start_; return {ProtocolState::PassthroughData, status}; } @@ -54,6 +65,17 @@ DecoderStateMachine::DecoderStatus DecoderStateMachine::messageEnd(Buffer::Insta return {ProtocolState::Done, handler_.messageEnd()}; } +// ReplyPayload -> StructBegin +DecoderStateMachine::DecoderStatus DecoderStateMachine::replyPayload(Buffer::Instance& buffer) { + ReplyType reply_type; + if (!proto_.peekReplyPayload(buffer, reply_type)) { + return {ProtocolState::WaitForData}; + } + + metadata_->setReplyType(reply_type); + return handleMessageBegin(); +} + // StructBegin -> FieldBegin DecoderStateMachine::DecoderStatus DecoderStateMachine::structBegin(Buffer::Instance& buffer) { std::string name; @@ -318,6 +340,8 @@ DecoderStateMachine::DecoderStatus DecoderStateMachine::handleState(Buffer::Inst return passthroughData(buffer); case ProtocolState::MessageBegin: return messageBegin(buffer); + case ProtocolState::ReplyPayload: + return replyPayload(buffer); case ProtocolState::StructBegin: return structBegin(buffer); case ProtocolState::StructEnd: diff --git a/source/extensions/filters/network/thrift_proxy/decoder.h b/source/extensions/filters/network/thrift_proxy/decoder.h index 99739221c5104..f00bea67baba5 100644 --- a/source/extensions/filters/network/thrift_proxy/decoder.h +++ b/source/extensions/filters/network/thrift_proxy/decoder.h @@ -19,6 +19,7 @@ namespace ThriftProxy { FUNCTION(PassthroughData) \ FUNCTION(MessageBegin) \ FUNCTION(MessageEnd) \ + FUNCTION(ReplyPayload) \ FUNCTION(StructBegin) \ FUNCTION(StructEnd) \ FUNCTION(FieldBegin) \ @@ -134,6 +135,7 @@ class DecoderStateMachine : public Logger::Loggable { DecoderStatus passthroughData(Buffer::Instance& buffer); DecoderStatus messageBegin(Buffer::Instance& buffer); DecoderStatus messageEnd(Buffer::Instance& buffer); + DecoderStatus replyPayload(Buffer::Instance& buffer); DecoderStatus structBegin(Buffer::Instance& buffer); DecoderStatus structEnd(Buffer::Instance& buffer); DecoderStatus fieldBegin(Buffer::Instance& buffer); @@ -150,6 +152,10 @@ class DecoderStateMachine : public Logger::Loggable { DecoderStatus setValue(Buffer::Instance& buffer); DecoderStatus setEnd(Buffer::Instance& buffer); + // handleMessageBegin calls the handler for messageBegin and then determines whether to + // perform payload passthrough or not + DecoderStatus handleMessageBegin(); + // handleValue represents the generic Value state from the state machine documentation. It // returns either ProtocolState::WaitForData if more data is required or the next state. For // structs, lists, maps, or sets the return_state is pushed onto the stack and the next state is @@ -171,6 +177,7 @@ class DecoderStateMachine : public Logger::Loggable { DecoderCallbacks& callbacks_; ProtocolState state_; std::vector stack_; + uint32_t body_start_{}; uint32_t body_bytes_{}; }; diff --git a/source/extensions/filters/network/thrift_proxy/metadata.h b/source/extensions/filters/network/thrift_proxy/metadata.h index 08b91a1c4f040..de44db1948b2d 100644 --- a/source/extensions/filters/network/thrift_proxy/metadata.h +++ b/source/extensions/filters/network/thrift_proxy/metadata.h @@ -53,6 +53,10 @@ class MessageMetadata { copy->setMessageType(messageType()); } + if (hasReplyType()) { + copy->setReplyType(replyType()); + } + Http::HeaderMapImpl::copyFrom(copy->headers(), headers()); copy->mutableSpans().assign(spans().begin(), spans().end()); @@ -115,6 +119,10 @@ class MessageMetadata { MessageType messageType() const { return msg_type_.value(); } void setMessageType(MessageType msg_type) { msg_type_ = msg_type; } + bool hasReplyType() const { return reply_type_.has_value(); } + ReplyType replyType() const { return reply_type_.value(); } + void setReplyType(ReplyType reply_type) { reply_type_ = reply_type; } + /** * @return HeaderMap of current headers (never throws) */ @@ -168,6 +176,7 @@ class MessageMetadata { absl::optional method_name_{}; absl::optional seq_id_{}; absl::optional msg_type_{}; + absl::optional reply_type_{}; Http::HeaderMapPtr headers_{Http::RequestHeaderMapImpl::create()}; absl::optional app_ex_type_; absl::optional app_ex_msg_; diff --git a/source/extensions/filters/network/thrift_proxy/protocol.h b/source/extensions/filters/network/thrift_proxy/protocol.h index a9eae7779e1ec..d57941dea8b17 100644 --- a/source/extensions/filters/network/thrift_proxy/protocol.h +++ b/source/extensions/filters/network/thrift_proxy/protocol.h @@ -73,6 +73,15 @@ class Protocol { */ virtual bool readMessageEnd(Buffer::Instance& buffer) PURE; + /** + * Peeks the start of a Thrift protocol reply payload in the buffer and updates the reply + * type parameter with the reply type of the payload. + * @param buffer the buffer to peek from + * @return true if reply type was successfully read, false if more data is required + * @throw EnvoyException if the data is not a valid payload + */ + virtual bool peekReplyPayload(Buffer::Instance& buffer, ReplyType& reply_type) PURE; + /** * Reads the start of a Thrift struct from the buffer and updates the name parameter with the * value from the struct header. If successful, the struct header is removed from the buffer. diff --git a/source/extensions/filters/network/thrift_proxy/router/shadow_writer_impl.h b/source/extensions/filters/network/thrift_proxy/router/shadow_writer_impl.h index dc43592b58cf9..b9651d1258d03 100644 --- a/source/extensions/filters/network/thrift_proxy/router/shadow_writer_impl.h +++ b/source/extensions/filters/network/thrift_proxy/router/shadow_writer_impl.h @@ -59,21 +59,8 @@ struct NullResponseDecoder : public DecoderCallbacks, public ProtocolConverter { // ProtocolConverter FilterStatus messageBegin(MessageMetadataSharedPtr metadata) override { metadata_ = metadata; - first_reply_field_ = - (metadata->hasMessageType() && metadata->messageType() == MessageType::Reply); - return FilterStatus::Continue; - } - FilterStatus messageEnd() override { - if (first_reply_field_) { - success_ = true; - first_reply_field_ = false; - } - return FilterStatus::Continue; - } - FilterStatus fieldBegin(absl::string_view, FieldType&, int16_t& field_id) override { - if (first_reply_field_) { - success_ = (field_id == 0); - first_reply_field_ = false; + if (metadata_->hasReplyType()) { + success_ = (metadata_->replyType() == ReplyType::Success); } return FilterStatus::Continue; } @@ -97,7 +84,6 @@ struct NullResponseDecoder : public DecoderCallbacks, public ProtocolConverter { MessageMetadataSharedPtr metadata_; absl::optional success_; bool complete_ : 1; - bool first_reply_field_ : 1; }; using NullResponseDecoderPtr = std::unique_ptr; diff --git a/source/extensions/filters/network/thrift_proxy/thrift.h b/source/extensions/filters/network/thrift_proxy/thrift.h index 3092bceb11b7e..bfbb312553513 100644 --- a/source/extensions/filters/network/thrift_proxy/thrift.h +++ b/source/extensions/filters/network/thrift_proxy/thrift.h @@ -118,6 +118,14 @@ enum class MessageType { LastMessageType = Oneway, }; +/** + * A Reply message is either a success or an error (IDL exception) + */ +enum class ReplyType { + Success, + Error, +}; + /** * Thrift protocol struct field types. * See https://github.com/apache/thrift/blob/master/lib/cpp/src/thrift/protocol/TProtocol.h diff --git a/test/extensions/filters/network/thrift_proxy/conn_manager_test.cc b/test/extensions/filters/network/thrift_proxy/conn_manager_test.cc index cb4b097ce294f..ff9dd784f622d 100644 --- a/test/extensions/filters/network/thrift_proxy/conn_manager_test.cc +++ b/test/extensions/filters/network/thrift_proxy/conn_manager_test.cc @@ -1750,10 +1750,9 @@ payload_passthrough: true EXPECT_EQ(1U, store_.counter("test.response_reply").value()); EXPECT_EQ(0U, store_.counter("test.response_exception").value()); EXPECT_EQ(0U, store_.counter("test.response_invalid_type").value()); - // In payload_passthrough mode, Envoy cannot detect response error. EXPECT_EQ(1U, store_.counter("test.response_passthrough").value()); EXPECT_EQ(0U, store_.counter("test.response_success").value()); - EXPECT_EQ(0U, store_.counter("test.response_error").value()); + EXPECT_EQ(1U, store_.counter("test.response_error").value()); } TEST_F(ThriftConnectionManagerTest, PayloadPassthroughRequestAndInvalidResponse) { diff --git a/test/extensions/filters/network/thrift_proxy/integration_test.cc b/test/extensions/filters/network/thrift_proxy/integration_test.cc index 1dab12e86368a..179861e244968 100644 --- a/test/extensions/filters/network/thrift_proxy/integration_test.cc +++ b/test/extensions/filters/network/thrift_proxy/integration_test.cc @@ -1,5 +1,6 @@ #include "envoy/config/bootstrap/v3/bootstrap.pb.h" +#include "source/common/common/fmt.h" #include "source/extensions/filters/network/thrift_proxy/buffer_helper.h" #include "test/extensions/filters/network/thrift_proxy/integration.h" @@ -148,6 +149,11 @@ class ThriftConnManagerIntegrationTest // while oneway's are handled by the "poke" method. All other requests // are handled by "execute". FakeUpstream* getExpectedUpstream(bool oneway) { + int upstreamIdx = getExpectedUpstreamIdx(oneway); + return fake_upstreams_[upstreamIdx].get(); + } + + int getExpectedUpstreamIdx(bool oneway) { int upstreamIdx = 2; if (multiplexed_) { upstreamIdx = 0; @@ -157,7 +163,7 @@ class ThriftConnManagerIntegrationTest upstreamIdx = 1; } - return fake_upstreams_[upstreamIdx].get(); + return upstreamIdx; } TransportType transport_; @@ -225,21 +231,29 @@ TEST_P(ThriftConnManagerIntegrationTest, Success) { Stats::CounterSharedPtr counter = test_server_->counter("thrift.thrift_stats.request_call"); EXPECT_EQ(1U, counter->value()); + int upstream_idx = getExpectedUpstreamIdx(false); + counter = test_server_->counter( + fmt::format("cluster.cluster_{}.thrift.upstream_rq_call", upstream_idx)); + EXPECT_EQ(1U, counter->value()); if (payload_passthrough_ && (transport_ == TransportType::Framed || transport_ == TransportType::Header) && protocol_ != ProtocolType::Twitter) { counter = test_server_->counter("thrift.thrift_stats.response_passthrough"); EXPECT_EQ(1U, counter->value()); - counter = test_server_->counter("thrift.thrift_stats.response_success"); - EXPECT_EQ(0U, counter->value()); } else { counter = test_server_->counter("thrift.thrift_stats.response_passthrough"); EXPECT_EQ(0U, counter->value()); - counter = test_server_->counter("thrift.thrift_stats.response_success"); - EXPECT_EQ(1U, counter->value()); } counter = test_server_->counter("thrift.thrift_stats.response_reply"); EXPECT_EQ(1U, counter->value()); + counter = test_server_->counter("thrift.thrift_stats.response_success"); + EXPECT_EQ(1U, counter->value()); + counter = test_server_->counter( + fmt::format("cluster.cluster_{}.thrift.upstream_resp_reply", upstream_idx)); + EXPECT_EQ(1U, counter->value()); + counter = test_server_->counter( + fmt::format("cluster.cluster_{}.thrift.upstream_resp_success", upstream_idx)); + EXPECT_EQ(1U, counter->value()); } TEST_P(ThriftConnManagerIntegrationTest, IDLException) { @@ -265,21 +279,28 @@ TEST_P(ThriftConnManagerIntegrationTest, IDLException) { Stats::CounterSharedPtr counter = test_server_->counter("thrift.thrift_stats.request_call"); EXPECT_EQ(1U, counter->value()); + int upstream_idx = getExpectedUpstreamIdx(false); + counter = test_server_->counter( + fmt::format("cluster.cluster_{}.thrift.upstream_rq_call", upstream_idx)); if (payload_passthrough_ && (transport_ == TransportType::Framed || transport_ == TransportType::Header) && protocol_ != ProtocolType::Twitter) { counter = test_server_->counter("thrift.thrift_stats.response_passthrough"); EXPECT_EQ(1U, counter->value()); - counter = test_server_->counter("thrift.thrift_stats.response_error"); - EXPECT_EQ(0U, counter->value()); } else { counter = test_server_->counter("thrift.thrift_stats.response_passthrough"); EXPECT_EQ(0U, counter->value()); - counter = test_server_->counter("thrift.thrift_stats.response_error"); - EXPECT_EQ(1U, counter->value()); } counter = test_server_->counter("thrift.thrift_stats.response_reply"); EXPECT_EQ(1U, counter->value()); + counter = test_server_->counter("thrift.thrift_stats.response_error"); + EXPECT_EQ(1U, counter->value()); + counter = test_server_->counter( + fmt::format("cluster.cluster_{}.thrift.upstream_resp_reply", upstream_idx)); + EXPECT_EQ(1U, counter->value()); + counter = test_server_->counter( + fmt::format("cluster.cluster_{}.thrift.upstream_resp_error", upstream_idx)); + EXPECT_EQ(1U, counter->value()); } TEST_P(ThriftConnManagerIntegrationTest, Exception) { @@ -305,8 +326,15 @@ TEST_P(ThriftConnManagerIntegrationTest, Exception) { Stats::CounterSharedPtr counter = test_server_->counter("thrift.thrift_stats.request_call"); EXPECT_EQ(1U, counter->value()); + int upstream_idx = getExpectedUpstreamIdx(false); + counter = test_server_->counter( + fmt::format("cluster.cluster_{}.thrift.upstream_rq_call", upstream_idx)); + EXPECT_EQ(1U, counter->value()); counter = test_server_->counter("thrift.thrift_stats.response_exception"); EXPECT_EQ(1U, counter->value()); + counter = test_server_->counter( + fmt::format("cluster.cluster_{}.thrift.upstream_resp_exception", upstream_idx)); + EXPECT_EQ(1U, counter->value()); } TEST_P(ThriftConnManagerIntegrationTest, EarlyClose) { @@ -382,6 +410,10 @@ TEST_P(ThriftConnManagerIntegrationTest, EarlyUpstreamClose) { Stats::CounterSharedPtr counter = test_server_->counter("thrift.thrift_stats.request_call"); EXPECT_EQ(1U, counter->value()); + int upstream_idx = getExpectedUpstreamIdx(false); + counter = test_server_->counter( + fmt::format("cluster.cluster_{}.thrift.upstream_rq_call", upstream_idx)); + EXPECT_EQ(1U, counter->value()); counter = test_server_->counter("thrift.thrift_stats.response_exception"); EXPECT_EQ(1U, counter->value()); } @@ -513,10 +545,18 @@ TEST_P(ThriftTwitterConnManagerIntegrationTest, Success) { EXPECT_TRUE(TestUtility::buffersEqual( Buffer::OwnedImpl(tcp_client->data().substr(upgrade_response_size)), response_bytes_)); + // 2 requests on downstream but the first is an upgrade, so only one on upstream side Stats::CounterSharedPtr counter = test_server_->counter("thrift.thrift_stats.request_call"); EXPECT_EQ(2U, counter->value()); + int upstream_idx = getExpectedUpstreamIdx(false); + counter = test_server_->counter( + fmt::format("cluster.cluster_{}.thrift.upstream_rq_call", upstream_idx)); + EXPECT_EQ(1U, counter->value()); counter = test_server_->counter("thrift.thrift_stats.response_success"); EXPECT_EQ(2U, counter->value()); + counter = test_server_->counter( + fmt::format("cluster.cluster_{}.thrift.upstream_resp_success", upstream_idx)); + EXPECT_EQ(1U, counter->value()); #endif } diff --git a/test/extensions/filters/network/thrift_proxy/mocks.h b/test/extensions/filters/network/thrift_proxy/mocks.h index b3eddda2cb352..a9e41e54a235d 100644 --- a/test/extensions/filters/network/thrift_proxy/mocks.h +++ b/test/extensions/filters/network/thrift_proxy/mocks.h @@ -10,6 +10,7 @@ #include "source/extensions/filters/network/thrift_proxy/protocol.h" #include "source/extensions/filters/network/thrift_proxy/router/router.h" #include "source/extensions/filters/network/thrift_proxy/router/router_ratelimit.h" +#include "source/extensions/filters/network/thrift_proxy/thrift.h" #include "source/extensions/filters/network/thrift_proxy/transport.h" #include "test/mocks/network/mocks.h" @@ -65,6 +66,7 @@ class MockProtocol : public Protocol { MOCK_METHOD(void, setType, (ProtocolType)); MOCK_METHOD(bool, readMessageBegin, (Buffer::Instance & buffer, MessageMetadata& metadata)); MOCK_METHOD(bool, readMessageEnd, (Buffer::Instance & buffer)); + MOCK_METHOD(bool, peekReplyPayload, (Buffer::Instance & buffer, ReplyType& reply_type)); MOCK_METHOD(bool, readStructBegin, (Buffer::Instance & buffer, std::string& name)); MOCK_METHOD(bool, readStructEnd, (Buffer::Instance & buffer)); MOCK_METHOD(bool, readFieldBegin, diff --git a/test/extensions/filters/network/thrift_proxy/shadow_writer_test.cc b/test/extensions/filters/network/thrift_proxy/shadow_writer_test.cc index 5532da024e32c..9e8e80f7e93dc 100644 --- a/test/extensions/filters/network/thrift_proxy/shadow_writer_test.cc +++ b/test/extensions/filters/network/thrift_proxy/shadow_writer_test.cc @@ -147,6 +147,10 @@ class ShadowWriterTest : public testing::Test { MessageMetadataSharedPtr response_metadata = std::make_shared(); response_metadata->setMessageType(message_type); response_metadata->setSequenceId(1); + if (message_type == MessageType::Reply) { + const auto reply_type = success ? ReplyType::Success : ReplyType::Error; + response_metadata->setReplyType(reply_type); + } auto transport_ptr = NamedTransportConfigFactory::getFactory(TransportType::Framed).createTransport(); @@ -412,22 +416,14 @@ TEST_F(ShadowWriterTest, TestNullResponseDecoder) { EXPECT_TRUE(decoder_ptr->passthroughEnabled()); metadata_->setMessageType(MessageType::Reply); + metadata_->setReplyType(ReplyType::Success); EXPECT_EQ(FilterStatus::Continue, decoder_ptr->messageBegin(metadata_)); + EXPECT_TRUE(decoder_ptr->responseSuccess()); Buffer::OwnedImpl buffer; decoder_ptr->upstreamData(buffer); - EXPECT_EQ(FilterStatus::Continue, decoder_ptr->messageEnd()); - // First reply field. - { - FieldType field_type; - int16_t field_id = 0; - EXPECT_EQ(FilterStatus::Continue, decoder_ptr->messageBegin(metadata_)); - EXPECT_EQ(FilterStatus::Continue, decoder_ptr->fieldBegin("", field_type, field_id)); - EXPECT_TRUE(decoder_ptr->responseSuccess()); - } - EXPECT_EQ(FilterStatus::Continue, decoder_ptr->transportBegin(nullptr)); EXPECT_EQ(FilterStatus::Continue, decoder_ptr->transportEnd()); } diff --git a/test/extensions/filters/network/thrift_proxy/translation_integration_test.cc b/test/extensions/filters/network/thrift_proxy/translation_integration_test.cc index 483cf74841b70..8319b926e95cf 100644 --- a/test/extensions/filters/network/thrift_proxy/translation_integration_test.cc +++ b/test/extensions/filters/network/thrift_proxy/translation_integration_test.cc @@ -154,6 +154,8 @@ TEST_P(ThriftTranslationIntegrationTest, Translates) { Stats::CounterSharedPtr counter = test_server_->counter("thrift.thrift_stats.request_call"); EXPECT_EQ(1U, counter->value()); + counter = test_server_->counter("cluster.cluster_0.thrift.upstream_rq_call"); + EXPECT_EQ(1U, counter->value()); if (passthrough_ && (downstream_transport_ == TransportType::Framed || downstream_transport_ == TransportType::Header) && @@ -164,18 +166,20 @@ TEST_P(ThriftTranslationIntegrationTest, Translates) { EXPECT_EQ(1U, counter->value()); counter = test_server_->counter("thrift.thrift_stats.response_passthrough"); EXPECT_EQ(1U, counter->value()); - counter = test_server_->counter("thrift.thrift_stats.response_success"); - EXPECT_EQ(0U, counter->value()); } else { counter = test_server_->counter("thrift.thrift_stats.request_passthrough"); EXPECT_EQ(0U, counter->value()); counter = test_server_->counter("thrift.thrift_stats.response_passthrough"); EXPECT_EQ(0U, counter->value()); - counter = test_server_->counter("thrift.thrift_stats.response_success"); - EXPECT_EQ(1U, counter->value()); } counter = test_server_->counter("thrift.thrift_stats.response_reply"); EXPECT_EQ(1U, counter->value()); + counter = test_server_->counter("thrift.thrift_stats.response_success"); + EXPECT_EQ(1U, counter->value()); + counter = test_server_->counter("cluster.cluster_0.thrift.upstream_resp_reply"); + EXPECT_EQ(1U, counter->value()); + counter = test_server_->counter("cluster.cluster_0.thrift.upstream_resp_success"); + EXPECT_EQ(1U, counter->value()); } } // namespace ThriftProxy From 46bd4d8497e70a1c2db4b502b38333979d8b0cbc Mon Sep 17 00:00:00 2001 From: James Fish Date: Mon, 4 Oct 2021 15:48:57 -0700 Subject: [PATCH 2/4] Fix nits Signed-off-by: James Fish --- docs/root/version_history/current.rst | 2 +- .../filters/network/thrift_proxy/binary_protocol_impl.cc | 9 ++------- .../network/thrift_proxy/compact_protocol_impl.cc | 9 ++------- .../extensions/filters/network/thrift_proxy/protocol.h | 1 + .../network/thrift_proxy/router/shadow_writer_impl.h | 2 +- 5 files changed, 7 insertions(+), 16 deletions(-) diff --git a/docs/root/version_history/current.rst b/docs/root/version_history/current.rst index a7f95505dbe29..416f1f4dc5dfe 100644 --- a/docs/root/version_history/current.rst +++ b/docs/root/version_history/current.rst @@ -98,7 +98,7 @@ Bug Fixes * http: limit use of deferred resets in the http2 codec to server-side connections. Use of deferred reset for client connections can result in incorrect behavior and performance problems. * listener: fixed an issue on Windows where connections are not handled by all worker threads. * lua: fix ``BodyBuffer`` setting a Lua string and printing Lua string containing hex characters. Previously, ``BodyBuffer`` setting a Lua string or printing strings with hex characters will be truncated. -* thrift_proxy: fix the thrift_proxy connection manage to correctly report success/error response metrics when performing :ref:`payload passthrough `. +* thrift_proxy: fix the thrift_proxy connection manager to correctly report success/error response metrics when performing :ref:`payload passthrough `. * xray: fix the AWS X-Ray tracer bug where span's error, fault and throttle information was not reported properly as per the `AWS X-Ray documentation `_. Before this fix, server error was reported under the 'annotations' section of the segment data. Removed Config or Runtime diff --git a/source/extensions/filters/network/thrift_proxy/binary_protocol_impl.cc b/source/extensions/filters/network/thrift_proxy/binary_protocol_impl.cc index 00526f67a1103..961f89a8277a0 100644 --- a/source/extensions/filters/network/thrift_proxy/binary_protocol_impl.cc +++ b/source/extensions/filters/network/thrift_proxy/binary_protocol_impl.cc @@ -81,14 +81,9 @@ bool BinaryProtocolImpl::peekReplyPayload(Buffer::Instance& buffer, ReplyType& r int16_t id = buffer.peekBEInt(1); if (id < 0) { throw EnvoyException(absl::StrCat("invalid binary protocol field id ", id)); - } else if (id == 0) { - // successful response is inside field id 0 - reply_type = ReplyType::Success; - } else { - // error (IDL exception) is in field id greater than 0 - reply_type = ReplyType::Error; } - + // successful response struct in field id 0, error (IDL exception) in field id greater than 0 + reply_type = id == 0 ? ReplyType::Success : ReplyType::Error; return true; } diff --git a/source/extensions/filters/network/thrift_proxy/compact_protocol_impl.cc b/source/extensions/filters/network/thrift_proxy/compact_protocol_impl.cc index 15dad23d83696..3aef9e205aa71 100644 --- a/source/extensions/filters/network/thrift_proxy/compact_protocol_impl.cc +++ b/source/extensions/filters/network/thrift_proxy/compact_protocol_impl.cc @@ -113,14 +113,9 @@ bool CompactProtocolImpl::peekReplyPayload(Buffer::Instance& buffer, ReplyType& if (id < 0 || id > std::numeric_limits::max()) { throw EnvoyException(absl::StrCat("invalid compact protocol field id ", id)); - } else if (id == 0) { - // successful response is inside field id 0 - reply_type = ReplyType::Success; - } else { - // error (IDL exception) is in field id greater than 0 - reply_type = ReplyType::Error; } - + // successful response struct in field id 0, error (IDL exception) in field id greater than 0 + reply_type = id == 0 ? ReplyType::Success : ReplyType::Error; return true; } diff --git a/source/extensions/filters/network/thrift_proxy/protocol.h b/source/extensions/filters/network/thrift_proxy/protocol.h index d57941dea8b17..13b2586c1b55a 100644 --- a/source/extensions/filters/network/thrift_proxy/protocol.h +++ b/source/extensions/filters/network/thrift_proxy/protocol.h @@ -77,6 +77,7 @@ class Protocol { * Peeks the start of a Thrift protocol reply payload in the buffer and updates the reply * type parameter with the reply type of the payload. * @param buffer the buffer to peek from + * @param reply_type ReplyType to set the payload's reply type to success or error * @return true if reply type was successfully read, false if more data is required * @throw EnvoyException if the data is not a valid payload */ diff --git a/source/extensions/filters/network/thrift_proxy/router/shadow_writer_impl.h b/source/extensions/filters/network/thrift_proxy/router/shadow_writer_impl.h index b9651d1258d03..841536b3de5c7 100644 --- a/source/extensions/filters/network/thrift_proxy/router/shadow_writer_impl.h +++ b/source/extensions/filters/network/thrift_proxy/router/shadow_writer_impl.h @@ -60,7 +60,7 @@ struct NullResponseDecoder : public DecoderCallbacks, public ProtocolConverter { FilterStatus messageBegin(MessageMetadataSharedPtr metadata) override { metadata_ = metadata; if (metadata_->hasReplyType()) { - success_ = (metadata_->replyType() == ReplyType::Success); + success_ = metadata_->replyType() == ReplyType::Success; } return FilterStatus::Continue; } From 379aff982cbd1c5437c40c45359f3a5c3c631114 Mon Sep 17 00:00:00 2001 From: James Fish Date: Mon, 4 Oct 2021 15:55:09 -0700 Subject: [PATCH 3/4] Fix more nits Signed-off-by: James Fish --- source/extensions/filters/network/thrift_proxy/conn_manager.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/extensions/filters/network/thrift_proxy/conn_manager.cc b/source/extensions/filters/network/thrift_proxy/conn_manager.cc index 27e77ef726b9d..531340105ca02 100644 --- a/source/extensions/filters/network/thrift_proxy/conn_manager.cc +++ b/source/extensions/filters/network/thrift_proxy/conn_manager.cc @@ -220,7 +220,7 @@ FilterStatus ConnectionManager::ResponseDecoder::messageBegin(MessageMetadataSha metadata_->setSequenceId(parent_.original_sequence_id_); if (metadata->hasReplyType()) { - success_ = (metadata->replyType() == ReplyType::Success); + success_ = metadata->replyType() == ReplyType::Success; } return ProtocolConverter::messageBegin(metadata); } From 89ac76016b6d4b0d61353153606330484a26e568 Mon Sep 17 00:00:00 2001 From: James Fish Date: Tue, 5 Oct 2021 09:15:36 -0700 Subject: [PATCH 4/4] Fix history after merge Signed-off-by: James Fish --- docs/root/version_history/current.rst | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/docs/root/version_history/current.rst b/docs/root/version_history/current.rst index 86ac393f3d8d2..e4444805cd8b9 100644 --- a/docs/root/version_history/current.rst +++ b/docs/root/version_history/current.rst @@ -13,22 +13,7 @@ Bug Fixes --------- *Changes expected to improve the state of the world and are unlikely to have negative effects* -* access log: fix ``%UPSTREAM_CLUSTER%`` when used in http upstream access logs. Previously, it was always logging as an unset value. -* aws request signer: fix the AWS Request Signer extension to correctly normalize the path and query string to be signed according to AWS' guidelines, so that the hash on the server side matches. See `AWS SigV4 documentation `_. -* cluster: delete pools when they're idle to fix unbounded memory use when using PROXY protocol upstream with tcp_proxy. This behavior can be temporarily reverted by setting the ``envoy.reloadable_features.conn_pool_delete_when_idle`` runtime guard to false. -* cluster: finish cluster warming even if hosts are removed before health check initialization. This only affected clusters with :ref:`ignore_health_on_host_removal `. -* compressor: fix a bug where if trailers were added and a subsequent filter paused the filter chain, the request could be stalled. This behavior can be reverted by setting ``envoy.reloadable_features.fix_added_trailers`` to false. -* dynamic forward proxy: fixing a validation bug where san and sni checks were not applied setting :ref:`http_protocol_options ` via :ref:`typed_extension_protocol_options `. -* ext_authz: fix the ext_authz filter to correctly merge multiple same headers using the ',' as separator in the check request to the external authorization service. -* ext_authz: fix the use of ``append`` field of :ref:`response_headers_to_add ` to set or append encoded response headers from a gRPC auth server. -* ext_authz: fix the HTTP ext_authz filter to respond with ``403 Forbidden`` when a gRPC auth server sends a denied check response with an empty HTTP status code. -* ext_authz: the network ext_authz filter now correctly sets dynamic metadata returned by the authorization service for non-OK responses. This behavior now matches the http ext_authz filter. -* hcm: remove deprecation for :ref:`xff_num_trusted_hops ` and forbid mixing ip detection extensions with old related knobs. -* http: limit use of deferred resets in the http2 codec to server-side connections. Use of deferred reset for client connections can result in incorrect behavior and performance problems. -* listener: fixed an issue on Windows where connections are not handled by all worker threads. -* lua: fix ``BodyBuffer`` setting a Lua string and printing Lua string containing hex characters. Previously, ``BodyBuffer`` setting a Lua string or printing strings with hex characters will be truncated. * thrift_proxy: fix the thrift_proxy connection manager to correctly report success/error response metrics when performing :ref:`payload passthrough `. -* xray: fix the AWS X-Ray tracer bug where span's error, fault and throttle information was not reported properly as per the `AWS X-Ray documentation `_. Before this fix, server error was reported under the 'annotations' section of the segment data. Removed Config or Runtime -------------------------