Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ message ThriftProxy {
repeated ThriftFilter thrift_filters = 5;

// If set to true, Envoy will try to skip decode data after metadata in the Thrift message.
// This mode will only work if the upstream and downstream protocols are the same and the transport
// is the same, the transport type is framed and the protocol is not Twitter. Otherwise Envoy will
// This mode will only work if the upstream and downstream protocols are the same and the transports
// are Framed or Header, and the protocol is not Twitter. Otherwise Envoy will
// fallback to decode the data.
bool payload_passthrough = 6;

Expand Down
1 change: 1 addition & 0 deletions docs/root/version_history/current.rst
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ Minor Behavior Changes
information.
* listener: destroy per network filter chain stats when a network filter chain is removed during the listener in-place update.
* quic: enables IETF connection migration. This feature requires a stable UDP packet routine in the L4 load balancer with the same first-4-bytes in connection id. It can be turned off by setting runtime guard ``envoy.reloadable_features.FLAGS_quic_reloadable_flag_quic_connection_migration_use_new_cid_v2`` to false.
* thrift_proxy: allow Framed and Header transport combinations to perform :ref:`payload passthrough <envoy_v3_api_field_extensions.filters.network.thrift_proxy.v3.ThriftProxy.payload_passthrough>`.

Bug Fixes
---------
Expand Down
31 changes: 27 additions & 4 deletions source/extensions/filters/network/thrift_proxy/conn_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,16 @@ bool ConnectionManager::ResponseDecoder::onData(Buffer::Instance& data) {
return complete_;
}

FilterStatus ConnectionManager::ResponseDecoder::passthroughData(Buffer::Instance& data) {
passthrough_ = true;
// If passing through data, ensure that first reply field is false as if handling a
// fieldBegin. Otherwise all requests will be marked as a success, as if void response,
// in messageEnd. Therefore later will only increment reply and not the inferred subtype
// success/error which requires reading the field id of the first field, see fieldBegin.
first_reply_field_ = false;
return ProtocolConverter::passthroughData(data);
}

FilterStatus ConnectionManager::ResponseDecoder::messageBegin(MessageMetadataSharedPtr metadata) {
metadata_ = metadata;
metadata_->setSequenceId(parent_.original_sequence_id_);
Expand Down Expand Up @@ -275,14 +285,22 @@ FilterStatus ConnectionManager::ResponseDecoder::transportEnd() {
cm.read_callbacks_->connection().write(buffer, false);

cm.stats_.response_.inc();
if (passthrough_) {
cm.stats_.response_passthrough_.inc();
}

switch (metadata_->messageType()) {
case MessageType::Reply:
cm.stats_.response_reply_.inc();
if (success_.value_or(false)) {
cm.stats_.response_success_.inc();
} else {
cm.stats_.response_error_.inc();
// success_ is set by inspecting the payload, which wont
// occur when passthrough is enabled as parsing the payload
// is skipped entirely.
if (success_) {
if (success_.value()) {
cm.stats_.response_success_.inc();
} else {
cm.stats_.response_error_.inc();
}
}

break;
Expand Down Expand Up @@ -419,6 +437,10 @@ void ConnectionManager::ActiveRpc::finalizeRequest() {
parent_.stats_.downstream_cx_max_requests_.inc();
}

if (passthrough_) {
parent_.stats_.request_passthrough_.inc();
}

bool destroy_rpc = false;
switch (original_msg_type_) {
case MessageType::Call:
Expand Down Expand Up @@ -458,6 +480,7 @@ bool ConnectionManager::ActiveRpc::passthroughSupported() const {
}

FilterStatus ConnectionManager::ActiveRpc::passthroughData(Buffer::Instance& data) {
passthrough_ = true;
filter_context_ = &data;
filter_action_ = [this](DecoderEventHandler* filter) -> FilterStatus {
Buffer::Instance* data = absl::any_cast<Buffer::Instance*>(filter_context_);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,14 @@ class ConnectionManager : public Network::ReadFilter,
struct ResponseDecoder : public DecoderCallbacks, public ProtocolConverter {
ResponseDecoder(ActiveRpc& parent, Transport& transport, Protocol& protocol)
: parent_(parent), decoder_(std::make_unique<Decoder>(transport, protocol, *this)),
complete_(false), first_reply_field_(false) {
complete_(false), first_reply_field_(false), passthrough_{false} {
initProtocolConverter(*parent_.parent_.protocol_, parent_.response_buffer_);
}

bool onData(Buffer::Instance& data);

// ProtocolConverter
FilterStatus passthroughData(Buffer::Instance& data) override;
FilterStatus messageBegin(MessageMetadataSharedPtr metadata) override;
FilterStatus messageEnd() override;
FilterStatus fieldBegin(absl::string_view name, FieldType& field_type,
Expand All @@ -102,6 +103,7 @@ class ConnectionManager : public Network::ReadFilter,
absl::optional<bool> success_;
bool complete_ : 1;
bool first_reply_field_ : 1;
bool passthrough_ : 1;
};
using ResponseDecoderPtr = std::unique_ptr<ResponseDecoder>;

Expand Down Expand Up @@ -155,7 +157,7 @@ class ConnectionManager : public Network::ReadFilter,
stream_id_(parent_.random_generator_.random()),
stream_info_(parent_.time_source_,
parent_.read_callbacks_->connection().connectionInfoProviderSharedPtr()),
local_response_sent_{false}, pending_transport_end_{false} {
local_response_sent_{false}, pending_transport_end_{false}, passthrough_{false} {
parent_.stats_.request_active_.inc();
}
~ActiveRpc() override {
Expand Down Expand Up @@ -245,6 +247,7 @@ class ConnectionManager : public Network::ReadFilter,
absl::any filter_context_;
bool local_response_sent_ : 1;
bool pending_transport_end_ : 1;
bool passthrough_ : 1;
};

using ActiveRpcPtr = std::unique_ptr<ActiveRpc>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,8 @@ class RequestOwner : public ProtocolConverter, public Logger::Loggable<Logger::I
}

const auto passthrough_supported =
transport == TransportType::Framed && final_transport == TransportType::Framed &&
(transport == TransportType::Framed || transport == TransportType::Header) &&
(final_transport == TransportType::Framed || final_transport == TransportType::Header) &&
protocol == final_protocol && final_protocol != ProtocolType::Twitter;
UpstreamRequestInfo result = {passthrough_supported, final_transport, final_protocol,
conn_pool_data};
Expand Down
2 changes: 2 additions & 0 deletions source/extensions/filters/network/thrift_proxy/stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@ namespace ThriftProxy {
COUNTER(request_decoding_error) \
COUNTER(request_invalid_type) \
COUNTER(request_oneway) \
COUNTER(request_passthrough) \
COUNTER(response) \
COUNTER(response_decoding_error) \
COUNTER(response_error) \
COUNTER(response_exception) \
COUNTER(response_invalid_type) \
COUNTER(response_passthrough) \
COUNTER(response_reply) \
COUNTER(response_success) \
GAUGE(request_active, Accumulate) \
Expand Down
1 change: 1 addition & 0 deletions test/extensions/filters/network/thrift_proxy/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,7 @@ envoy_extension_cc_test(
"//test/extensions/filters/network/thrift_proxy/driver:generate_fixture",
],
extension_names = ["envoy.filters.network.thrift_proxy"],
shard_count = 4,
deps = [
":integration_lib",
":utility_lib",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1751,7 +1751,8 @@ payload_passthrough: true
EXPECT_EQ(0U, store_.counter("test.response_exception").value());
EXPECT_EQ(0U, store_.counter("test.response_invalid_type").value());
// In payload_passthrough mode, Envoy cannot detect response error.
EXPECT_EQ(1U, store_.counter("test.response_success").value());
EXPECT_EQ(1U, store_.counter("test.response_passthrough").value());
EXPECT_EQ(0U, store_.counter("test.response_success").value());
EXPECT_EQ(0U, store_.counter("test.response_error").value());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,20 @@ TEST_P(ThriftConnManagerIntegrationTest, Success) {

Stats::CounterSharedPtr counter = test_server_->counter("thrift.thrift_stats.request_call");
EXPECT_EQ(1U, counter->value());
counter = test_server_->counter("thrift.thrift_stats.response_success");
if (payload_passthrough_ &&
(transport_ == TransportType::Framed || transport_ == TransportType::Header) &&
protocol_ != ProtocolType::Twitter) {
counter = test_server_->counter("thrift.thrift_stats.response_passthrough");
EXPECT_EQ(1U, counter->value());
counter = test_server_->counter("thrift.thrift_stats.response_success");
EXPECT_EQ(0U, counter->value());
} else {
counter = test_server_->counter("thrift.thrift_stats.response_passthrough");
EXPECT_EQ(0U, counter->value());
counter = test_server_->counter("thrift.thrift_stats.response_success");
EXPECT_EQ(1U, counter->value());
}
counter = test_server_->counter("thrift.thrift_stats.response_reply");
EXPECT_EQ(1U, counter->value());
}

Expand All @@ -252,13 +265,21 @@ TEST_P(ThriftConnManagerIntegrationTest, IDLException) {

Stats::CounterSharedPtr counter = test_server_->counter("thrift.thrift_stats.request_call");
EXPECT_EQ(1U, counter->value());
counter = test_server_->counter("thrift.thrift_stats.response_error");
if (payload_passthrough_ && transport_ == TransportType::Framed &&
if (payload_passthrough_ &&
(transport_ == TransportType::Framed || transport_ == TransportType::Header) &&
protocol_ != ProtocolType::Twitter) {
counter = test_server_->counter("thrift.thrift_stats.response_passthrough");
EXPECT_EQ(1U, counter->value());
counter = test_server_->counter("thrift.thrift_stats.response_error");
EXPECT_EQ(0U, counter->value());
} else {
counter = test_server_->counter("thrift.thrift_stats.response_passthrough");
EXPECT_EQ(0U, counter->value());
counter = test_server_->counter("thrift.thrift_stats.response_error");
EXPECT_EQ(1U, counter->value());
}
counter = test_server_->counter("thrift.thrift_stats.response_reply");
EXPECT_EQ(1U, counter->value());
}

TEST_P(ThriftConnManagerIntegrationTest, Exception) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ namespace ThriftProxy {

class ThriftTranslationIntegrationTest
: public testing::TestWithParam<
std::tuple<TransportType, ProtocolType, TransportType, ProtocolType>>,
std::tuple<TransportType, ProtocolType, TransportType, ProtocolType, bool>>,
public BaseThriftIntegrationTest {
public:
static void SetUpTestSuite() { // NOLINT(readability-identifier-naming)
Expand All @@ -42,13 +42,11 @@ class ThriftTranslationIntegrationTest
}

void initialize() override {
TransportType downstream_transport, upstream_transport;
ProtocolType downstream_protocol, upstream_protocol;
std::tie(downstream_transport, downstream_protocol, upstream_transport, upstream_protocol) =
GetParam();
std::tie(downstream_transport_, downstream_protocol_, upstream_transport_, upstream_protocol_,
passthrough_) = GetParam();

auto upstream_transport_proto = transportTypeToProto(upstream_transport);
auto upstream_protocol_proto = protocolTypeToProto(upstream_protocol);
auto upstream_transport_proto = transportTypeToProto(upstream_transport_);
auto upstream_protocol_proto = protocolTypeToProto(upstream_protocol_);

envoy::extensions::filters::network::thrift_proxy::v3::ThriftProtocolOptions proto_opts;
proto_opts.set_transport(upstream_transport_proto);
Expand All @@ -61,53 +59,74 @@ class ThriftTranslationIntegrationTest
(*opts)[NetworkFilterNames::get().ThriftProxy].PackFrom(proto_opts);
});

if (passthrough_) {
config_helper_.addFilterConfigModifier<
envoy::extensions::filters::network::thrift_proxy::v3::ThriftProxy>(
"thrift", [](Protobuf::Message& filter) {
auto& conn_manager =
dynamic_cast<envoy::extensions::filters::network::thrift_proxy::v3::ThriftProxy&>(
filter);
conn_manager.set_payload_passthrough(true);
});
}

// Invent some varying, but deterministic, values to add. We use the add method instead of
// execute because the default execute params contains a set and the ordering can vary across
// generated payloads.
std::vector<std::string> args({
fmt::format("{}", (static_cast<int>(downstream_transport) << 8) +
static_cast<int>(downstream_protocol)),
fmt::format("{}", (static_cast<int>(upstream_transport) << 8) +
static_cast<int>(upstream_protocol)),
fmt::format("{}", (static_cast<int>(downstream_transport_) << 8) +
static_cast<int>(downstream_protocol_)),
fmt::format("{}", (static_cast<int>(upstream_transport_) << 8) +
static_cast<int>(upstream_protocol_)),
});

PayloadOptions downstream_opts(downstream_transport, downstream_protocol, DriverMode::Success,
PayloadOptions downstream_opts(downstream_transport_, downstream_protocol_, DriverMode::Success,
{}, "add", args);
preparePayloads(downstream_opts, downstream_request_bytes_, downstream_response_bytes_);

PayloadOptions upstream_opts(upstream_transport, upstream_protocol, DriverMode::Success, {},
PayloadOptions upstream_opts(upstream_transport_, upstream_protocol_, DriverMode::Success, {},
"add", args);
preparePayloads(upstream_opts, upstream_request_bytes_, upstream_response_bytes_);

BaseThriftIntegrationTest::initialize();
}

TransportType downstream_transport_;
ProtocolType downstream_protocol_;
TransportType upstream_transport_;
ProtocolType upstream_protocol_;
bool passthrough_;
Buffer::OwnedImpl downstream_request_bytes_;
Buffer::OwnedImpl downstream_response_bytes_;
Buffer::OwnedImpl upstream_request_bytes_;
Buffer::OwnedImpl upstream_response_bytes_;
};

static std::string paramToString(
const TestParamInfo<std::tuple<TransportType, ProtocolType, TransportType, ProtocolType>>&
const TestParamInfo<std::tuple<TransportType, ProtocolType, TransportType, ProtocolType, bool>>&
params) {
TransportType downstream_transport, upstream_transport;
ProtocolType downstream_protocol, upstream_protocol;
std::tie(downstream_transport, downstream_protocol, upstream_transport, upstream_protocol) =
params.param;

return fmt::format("From{}{}To{}{}", transportNameForTest(downstream_transport),
protocolNameForTest(downstream_protocol),
transportNameForTest(upstream_transport),
protocolNameForTest(upstream_protocol));
bool passthrough;
std::tie(downstream_transport, downstream_protocol, upstream_transport, upstream_protocol,
passthrough) = params.param;

auto result =
fmt::format("From{}{}To{}{}", transportNameForTest(downstream_transport),
protocolNameForTest(downstream_protocol),
transportNameForTest(upstream_transport), protocolNameForTest(upstream_protocol));
if (passthrough) {
result = fmt::format("{}Passthrough", result);
}
return result;
}

INSTANTIATE_TEST_SUITE_P(
TransportsAndProtocols, ThriftTranslationIntegrationTest,
Combine(Values(TransportType::Framed, TransportType::Unframed, TransportType::Header),
Values(ProtocolType::Binary, ProtocolType::Compact),
Values(TransportType::Framed, TransportType::Unframed, TransportType::Header),
Values(ProtocolType::Binary, ProtocolType::Compact)),
Values(ProtocolType::Binary, ProtocolType::Compact), Values(false, true)),
paramToString);

// Tests that the proxy will translate between different downstream and upstream transports and
Expand Down Expand Up @@ -135,7 +154,27 @@ TEST_P(ThriftTranslationIntegrationTest, Translates) {

Stats::CounterSharedPtr counter = test_server_->counter("thrift.thrift_stats.request_call");
EXPECT_EQ(1U, counter->value());
counter = test_server_->counter("thrift.thrift_stats.response_success");
if (passthrough_ &&
(downstream_transport_ == TransportType::Framed ||
downstream_transport_ == TransportType::Header) &&
(upstream_transport_ == TransportType::Framed ||
upstream_transport_ == TransportType::Header) &&
downstream_protocol_ == upstream_protocol_ && downstream_protocol_ != ProtocolType::Twitter) {
counter = test_server_->counter("thrift.thrift_stats.request_passthrough");
EXPECT_EQ(1U, counter->value());
counter = test_server_->counter("thrift.thrift_stats.response_passthrough");
EXPECT_EQ(1U, counter->value());
counter = test_server_->counter("thrift.thrift_stats.response_success");
EXPECT_EQ(0U, counter->value());
} else {
counter = test_server_->counter("thrift.thrift_stats.request_passthrough");
EXPECT_EQ(0U, counter->value());
counter = test_server_->counter("thrift.thrift_stats.response_passthrough");
EXPECT_EQ(0U, counter->value());
counter = test_server_->counter("thrift.thrift_stats.response_success");
EXPECT_EQ(1U, counter->value());
}
counter = test_server_->counter("thrift.thrift_stats.response_reply");
EXPECT_EQ(1U, counter->value());
}

Expand Down