Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/root/version_history/current.rst
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ Bug Fixes
---------
*Changes expected to improve the state of the world and are unlikely to have negative effects*

* thrift_proxy: fix the thrift_proxy connection manager to correctly report success/error response metrics when performing :ref:`payload passthrough <envoy_v3_api_field_extensions.filters.network.thrift_proxy.v3.ThriftProxy.payload_passthrough>`.

Removed Config or Runtime
-------------------------
*Normally occurs at the end of the* :ref:`deprecation period <deprecated>`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

#include "source/common/common/fmt.h"
#include "source/extensions/filters/network/thrift_proxy/protocol.h"
#include "source/extensions/filters/network/thrift_proxy/thrift.h"

namespace Envoy {
namespace Extensions {
Expand Down Expand Up @@ -33,6 +34,9 @@ class AutoProtocolImpl : public Protocol {

bool readMessageBegin(Buffer::Instance& buffer, MessageMetadata& metadata) override;
bool readMessageEnd(Buffer::Instance& buffer) override;
bool peekReplyPayload(Buffer::Instance& buffer, ReplyType& reply_type) override {
return protocol_->peekReplyPayload(buffer, reply_type);
}
bool readStructBegin(Buffer::Instance& buffer, std::string& name) override {
return protocol_->readStructBegin(buffer, name);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,33 @@ bool BinaryProtocolImpl::readMessageEnd(Buffer::Instance& buffer) {
return true;
}

bool BinaryProtocolImpl::peekReplyPayload(Buffer::Instance& buffer, ReplyType& reply_type) {
// binary protocol does not transmit struct names so go straight to peek at field begin
// FieldType::Stop is encoded as 1 byte.
if (buffer.length() < 1) {
return false;
}

FieldType type = static_cast<FieldType>(buffer.peekInt<int8_t>());
if (type == FieldType::Stop) {
// If the first field is stop then response is void success
reply_type = ReplyType::Success;
return true;
}

if (buffer.length() < 3) {
return false;
}

int16_t id = buffer.peekBEInt<int16_t>(1);
if (id < 0) {
throw EnvoyException(absl::StrCat("invalid binary protocol field id ", id));
}
// successful response struct in field id 0, error (IDL exception) in field id greater than 0
reply_type = id == 0 ? ReplyType::Success : ReplyType::Error;
return true;
}

bool BinaryProtocolImpl::readStructBegin(Buffer::Instance& buffer, std::string& name) {
UNREFERENCED_PARAMETER(buffer);
name.clear(); // binary protocol does not transmit struct names
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "envoy/common/pure.h"

#include "source/extensions/filters/network/thrift_proxy/protocol.h"
#include "source/extensions/filters/network/thrift_proxy/thrift.h"

namespace Envoy {
namespace Extensions {
Expand All @@ -25,6 +26,7 @@ class BinaryProtocolImpl : public Protocol {
ProtocolType type() const override { return ProtocolType::Binary; }
bool readMessageBegin(Buffer::Instance& buffer, MessageMetadata& metadata) override;
bool readMessageEnd(Buffer::Instance& buffer) override;
bool peekReplyPayload(Buffer::Instance& buffer, ReplyType& reply_type) override;
bool readStructBegin(Buffer::Instance& buffer, std::string& name) override;
bool readStructEnd(Buffer::Instance& buffer) override;
bool readFieldBegin(Buffer::Instance& buffer, std::string& name, FieldType& field_type,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,46 @@ bool CompactProtocolImpl::readMessageEnd(Buffer::Instance& buffer) {
return true;
}

bool CompactProtocolImpl::peekReplyPayload(Buffer::Instance& buffer, ReplyType& reply_type) {
// compact protocol does not transmit struct names so go straight to peek for field begin
// Minimum size: FieldType::Stop is encoded as 1 byte.
if (buffer.length() < 1) {
return false;
}

uint8_t delta_and_type = buffer.peekInt<int8_t>();
if ((delta_and_type & 0x0f) == 0) {
// Type is stop, no need to do further decoding
// If the first field is stop then response is void success
reply_type = ReplyType::Success;
return true;
}

if ((delta_and_type >> 4) != 0) {
// field id delta is non zero and so is an IDL exception (success field id is 0)
reply_type = ReplyType::Error;
return true;
}

int id_size = 0;
// Field ID delta is zero: this is a long-form field header, followed by zig-zag field id.
if (buffer.length() < 2) {
return false;
}

int32_t id = BufferHelper::peekZigZagI32(buffer, 1, id_size);
if (id_size < 0) {
return false;
}

if (id < 0 || id > std::numeric_limits<int16_t>::max()) {
throw EnvoyException(absl::StrCat("invalid compact protocol field id ", id));
}
// successful response struct in field id 0, error (IDL exception) in field id greater than 0
reply_type = id == 0 ? ReplyType::Success : ReplyType::Error;
return true;
}

bool CompactProtocolImpl::readStructBegin(Buffer::Instance& buffer, std::string& name) {
UNREFERENCED_PARAMETER(buffer);
name.clear(); // compact protocol does not transmit struct names
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "envoy/common/pure.h"

#include "source/extensions/filters/network/thrift_proxy/protocol.h"
#include "source/extensions/filters/network/thrift_proxy/thrift.h"

#include "absl/types/optional.h"

Expand All @@ -28,6 +29,7 @@ class CompactProtocolImpl : public Protocol {
ProtocolType type() const override { return ProtocolType::Compact; }
bool readMessageBegin(Buffer::Instance& buffer, MessageMetadata& metadata) override;
bool readMessageEnd(Buffer::Instance& buffer) override;
bool peekReplyPayload(Buffer::Instance& buffer, ReplyType& reply_type) override;
bool readStructBegin(Buffer::Instance& buffer, std::string& name) override;
bool readStructEnd(Buffer::Instance& buffer) override;
bool readFieldBegin(Buffer::Instance& buffer, std::string& name, FieldType& field_type,
Expand Down
44 changes: 3 additions & 41 deletions source/extensions/filters/network/thrift_proxy/conn_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -212,52 +212,17 @@ bool ConnectionManager::ResponseDecoder::onData(Buffer::Instance& data) {

FilterStatus ConnectionManager::ResponseDecoder::passthroughData(Buffer::Instance& data) {
passthrough_ = true;
// If passing through data, ensure that first reply field is false as if handling a
// fieldBegin. Otherwise all requests will be marked as a success, as if void response,
// in messageEnd. Therefore later will only increment reply and not the inferred subtype
// success/error which requires reading the field id of the first field, see fieldBegin.
first_reply_field_ = false;
return ProtocolConverter::passthroughData(data);
}

FilterStatus ConnectionManager::ResponseDecoder::messageBegin(MessageMetadataSharedPtr metadata) {
metadata_ = metadata;
metadata_->setSequenceId(parent_.original_sequence_id_);

first_reply_field_ =
(metadata->hasMessageType() && metadata->messageType() == MessageType::Reply);
return ProtocolConverter::messageBegin(metadata);
}

FilterStatus ConnectionManager::ResponseDecoder::fieldBegin(absl::string_view name,
FieldType& field_type,
int16_t& field_id) {
if (first_reply_field_) {
// Reply messages contain a struct where field 0 is the call result and fields 1+ are
// exceptions, if defined. At most one field may be set. Therefore, the very first field we
// encounter in a reply is either field 0 (success) or not (IDL exception returned).
// If first fieldType is FieldType::Stop then it is a void success and handled in messageEnd()
// because decoder state machine does not call decoder event callback fieldBegin on
// FieldType::Stop.
success_ = (field_id == 0);
first_reply_field_ = false;
if (metadata->hasReplyType()) {
success_ = metadata->replyType() == ReplyType::Success;
}

return ProtocolConverter::fieldBegin(name, field_type, field_id);
}

FilterStatus ConnectionManager::ResponseDecoder::messageEnd() {
if (first_reply_field_) {
// When the response is thrift void type there is never a fieldBegin call on a success
// because the response struct has no fields and so the first field type is FieldType::Stop.
// The decoder state machine handles FieldType::Stop by going immediately to structEnd,
// skipping fieldBegin callback. Therefore if we are still waiting for the first reply field
// at end of message then it is a void success.
success_ = true;
first_reply_field_ = false;
}

return ProtocolConverter::messageEnd();
return ProtocolConverter::messageBegin(metadata);
}

FilterStatus ConnectionManager::ResponseDecoder::transportEnd() {
Expand Down Expand Up @@ -292,9 +257,6 @@ FilterStatus ConnectionManager::ResponseDecoder::transportEnd() {
switch (metadata_->messageType()) {
case MessageType::Reply:
cm.stats_.response_reply_.inc();
// success_ is set by inspecting the payload, which wont
// occur when passthrough is enabled as parsing the payload
// is skipped entirely.
if (success_) {
if (success_.value()) {
cm.stats_.response_success_.inc();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ class ConnectionManager : public Network::ReadFilter,
struct ResponseDecoder : public DecoderCallbacks, public ProtocolConverter {
ResponseDecoder(ActiveRpc& parent, Transport& transport, Protocol& protocol)
: parent_(parent), decoder_(std::make_unique<Decoder>(transport, protocol, *this)),
complete_(false), first_reply_field_(false), passthrough_{false} {
complete_(false), passthrough_{false} {
initProtocolConverter(*parent_.parent_.protocol_, parent_.response_buffer_);
}

Expand All @@ -83,9 +83,6 @@ class ConnectionManager : public Network::ReadFilter,
// ProtocolConverter
FilterStatus passthroughData(Buffer::Instance& data) override;
FilterStatus messageBegin(MessageMetadataSharedPtr metadata) override;
FilterStatus messageEnd() override;
FilterStatus fieldBegin(absl::string_view name, FieldType& field_type,
int16_t& field_id) override;
FilterStatus transportBegin(MessageMetadataSharedPtr metadata) override {
UNREFERENCED_PARAMETER(metadata);
return FilterStatus::Continue;
Expand All @@ -102,7 +99,6 @@ class ConnectionManager : public Network::ReadFilter,
MessageMetadataSharedPtr metadata_;
absl::optional<bool> success_;
bool complete_ : 1;
bool first_reply_field_ : 1;
bool passthrough_ : 1;
};
using ResponseDecoderPtr = std::unique_ptr<ResponseDecoder>;
Expand Down
26 changes: 25 additions & 1 deletion source/extensions/filters/network/thrift_proxy/decoder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "source/common/common/assert.h"
#include "source/common/common/macros.h"
#include "source/extensions/filters/network/thrift_proxy/app_exception_impl.h"
#include "source/extensions/filters/network/thrift_proxy/thrift.h"

namespace Envoy {
namespace Extensions {
Expand All @@ -26,19 +27,29 @@ DecoderStateMachine::DecoderStatus DecoderStateMachine::passthroughData(Buffer::
}

// MessageBegin -> StructBegin
// MessageBegin -> ReplyPayload (reply received, get reply type)
DecoderStateMachine::DecoderStatus DecoderStateMachine::messageBegin(Buffer::Instance& buffer) {
const auto total = buffer.length();
if (!proto_.readMessageBegin(buffer, *metadata_)) {
return {ProtocolState::WaitForData};
}

body_start_ = total - buffer.length();
stack_.clear();
stack_.emplace_back(Frame(ProtocolState::MessageEnd));
// If a reply peek at the payload to see if success or error (IDL exception)
if (metadata_->hasMessageType() && metadata_->messageType() == MessageType::Reply) {
return {ProtocolState::ReplyPayload, FilterStatus::Continue};
}

return handleMessageBegin();
}

DecoderStateMachine::DecoderStatus DecoderStateMachine::handleMessageBegin() {
const auto status = handler_.messageBegin(metadata_);

if (callbacks_.passthroughEnabled()) {
body_bytes_ = metadata_->frameSize() - (total - buffer.length());
body_bytes_ = metadata_->frameSize() - body_start_;
return {ProtocolState::PassthroughData, status};
}

Expand All @@ -54,6 +65,17 @@ DecoderStateMachine::DecoderStatus DecoderStateMachine::messageEnd(Buffer::Insta
return {ProtocolState::Done, handler_.messageEnd()};
}

// ReplyPayload -> StructBegin
DecoderStateMachine::DecoderStatus DecoderStateMachine::replyPayload(Buffer::Instance& buffer) {
ReplyType reply_type;
if (!proto_.peekReplyPayload(buffer, reply_type)) {
return {ProtocolState::WaitForData};
}

metadata_->setReplyType(reply_type);
return handleMessageBegin();
}

// StructBegin -> FieldBegin
DecoderStateMachine::DecoderStatus DecoderStateMachine::structBegin(Buffer::Instance& buffer) {
std::string name;
Expand Down Expand Up @@ -318,6 +340,8 @@ DecoderStateMachine::DecoderStatus DecoderStateMachine::handleState(Buffer::Inst
return passthroughData(buffer);
case ProtocolState::MessageBegin:
return messageBegin(buffer);
case ProtocolState::ReplyPayload:
return replyPayload(buffer);
case ProtocolState::StructBegin:
return structBegin(buffer);
case ProtocolState::StructEnd:
Expand Down
7 changes: 7 additions & 0 deletions source/extensions/filters/network/thrift_proxy/decoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ namespace ThriftProxy {
FUNCTION(PassthroughData) \
FUNCTION(MessageBegin) \
FUNCTION(MessageEnd) \
FUNCTION(ReplyPayload) \
FUNCTION(StructBegin) \
FUNCTION(StructEnd) \
FUNCTION(FieldBegin) \
Expand Down Expand Up @@ -134,6 +135,7 @@ class DecoderStateMachine : public Logger::Loggable<Logger::Id::thrift> {
DecoderStatus passthroughData(Buffer::Instance& buffer);
DecoderStatus messageBegin(Buffer::Instance& buffer);
DecoderStatus messageEnd(Buffer::Instance& buffer);
DecoderStatus replyPayload(Buffer::Instance& buffer);
DecoderStatus structBegin(Buffer::Instance& buffer);
DecoderStatus structEnd(Buffer::Instance& buffer);
DecoderStatus fieldBegin(Buffer::Instance& buffer);
Expand All @@ -150,6 +152,10 @@ class DecoderStateMachine : public Logger::Loggable<Logger::Id::thrift> {
DecoderStatus setValue(Buffer::Instance& buffer);
DecoderStatus setEnd(Buffer::Instance& buffer);

// handleMessageBegin calls the handler for messageBegin and then determines whether to
// perform payload passthrough or not
DecoderStatus handleMessageBegin();

// handleValue represents the generic Value state from the state machine documentation. It
// returns either ProtocolState::WaitForData if more data is required or the next state. For
// structs, lists, maps, or sets the return_state is pushed onto the stack and the next state is
Expand All @@ -171,6 +177,7 @@ class DecoderStateMachine : public Logger::Loggable<Logger::Id::thrift> {
DecoderCallbacks& callbacks_;
ProtocolState state_;
std::vector<Frame> stack_;
uint32_t body_start_{};
uint32_t body_bytes_{};
};

Expand Down
9 changes: 9 additions & 0 deletions source/extensions/filters/network/thrift_proxy/metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ class MessageMetadata {
copy->setMessageType(messageType());
}

if (hasReplyType()) {
copy->setReplyType(replyType());
}

Http::HeaderMapImpl::copyFrom(copy->headers(), headers());
copy->mutableSpans().assign(spans().begin(), spans().end());

Expand Down Expand Up @@ -115,6 +119,10 @@ class MessageMetadata {
MessageType messageType() const { return msg_type_.value(); }
void setMessageType(MessageType msg_type) { msg_type_ = msg_type; }

bool hasReplyType() const { return reply_type_.has_value(); }
ReplyType replyType() const { return reply_type_.value(); }
void setReplyType(ReplyType reply_type) { reply_type_ = reply_type; }

/**
* @return HeaderMap of current headers (never throws)
*/
Expand Down Expand Up @@ -168,6 +176,7 @@ class MessageMetadata {
absl::optional<std::string> method_name_{};
absl::optional<int32_t> seq_id_{};
absl::optional<MessageType> msg_type_{};
absl::optional<ReplyType> reply_type_{};
Http::HeaderMapPtr headers_{Http::RequestHeaderMapImpl::create()};
absl::optional<AppExceptionType> app_ex_type_;
absl::optional<std::string> app_ex_msg_;
Expand Down
10 changes: 10 additions & 0 deletions source/extensions/filters/network/thrift_proxy/protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,16 @@ class Protocol {
*/
virtual bool readMessageEnd(Buffer::Instance& buffer) PURE;

/**
* Peeks the start of a Thrift protocol reply payload in the buffer and updates the reply
* type parameter with the reply type of the payload.
* @param buffer the buffer to peek from
* @param reply_type ReplyType to set the payload's reply type to success or error
* @return true if reply type was successfully read, false if more data is required
* @throw EnvoyException if the data is not a valid payload
*/
virtual bool peekReplyPayload(Buffer::Instance& buffer, ReplyType& reply_type) PURE;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could this be body const?

Copy link
Contributor Author

@fishcakez fishcakez Oct 4, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it could be but that opens a can of worms on the peek buffer helpers we have. Ill look into fixing that in a later PR i think unless you want to solve that here.


/**
* Reads the start of a Thrift struct from the buffer and updates the name parameter with the
* value from the struct header. If successful, the struct header is removed from the buffer.
Expand Down
Loading