diff --git a/api/envoy/extensions/filters/network/thrift_proxy/v3/thrift_proxy.proto b/api/envoy/extensions/filters/network/thrift_proxy/v3/thrift_proxy.proto index 4916330ec5f3a..01c41c77bb2b5 100644 --- a/api/envoy/extensions/filters/network/thrift_proxy/v3/thrift_proxy.proto +++ b/api/envoy/extensions/filters/network/thrift_proxy/v3/thrift_proxy.proto @@ -85,8 +85,8 @@ message ThriftProxy { repeated ThriftFilter thrift_filters = 5; // If set to true, Envoy will try to skip decode data after metadata in the Thrift message. - // This mode will only work if the upstream and downstream protocols are the same and the transport - // is the same, the transport type is framed and the protocol is not Twitter. Otherwise Envoy will + // This mode will only work if the upstream and downstream protocols are the same and the transports + // are Framed or Header, and the protocol is not Twitter. Otherwise Envoy will // fallback to decode the data. bool payload_passthrough = 6; diff --git a/docs/root/version_history/current.rst b/docs/root/version_history/current.rst index 48df3345b5b68..24c7bc6bf3110 100644 --- a/docs/root/version_history/current.rst +++ b/docs/root/version_history/current.rst @@ -70,6 +70,7 @@ Minor Behavior Changes information. * listener: destroy per network filter chain stats when a network filter chain is removed during the listener in-place update. * quic: enables IETF connection migration. This feature requires a stable UDP packet routine in the L4 load balancer with the same first-4-bytes in connection id. It can be turned off by setting runtime guard ``envoy.reloadable_features.FLAGS_quic_reloadable_flag_quic_connection_migration_use_new_cid_v2`` to false. +* thrift_proxy: allow Framed and Header transport combinations to perform :ref:`payload passthrough `. Bug Fixes --------- diff --git a/source/extensions/filters/network/thrift_proxy/conn_manager.cc b/source/extensions/filters/network/thrift_proxy/conn_manager.cc index 1e100247e59cc..e7becf40c1d15 100644 --- a/source/extensions/filters/network/thrift_proxy/conn_manager.cc +++ b/source/extensions/filters/network/thrift_proxy/conn_manager.cc @@ -210,6 +210,16 @@ bool ConnectionManager::ResponseDecoder::onData(Buffer::Instance& data) { return complete_; } +FilterStatus ConnectionManager::ResponseDecoder::passthroughData(Buffer::Instance& data) { + passthrough_ = true; + // If passing through data, ensure that first reply field is false as if handling a + // fieldBegin. Otherwise all requests will be marked as a success, as if void response, + // in messageEnd. Therefore later will only increment reply and not the inferred subtype + // success/error which requires reading the field id of the first field, see fieldBegin. + first_reply_field_ = false; + return ProtocolConverter::passthroughData(data); +} + FilterStatus ConnectionManager::ResponseDecoder::messageBegin(MessageMetadataSharedPtr metadata) { metadata_ = metadata; metadata_->setSequenceId(parent_.original_sequence_id_); @@ -275,14 +285,22 @@ FilterStatus ConnectionManager::ResponseDecoder::transportEnd() { cm.read_callbacks_->connection().write(buffer, false); cm.stats_.response_.inc(); + if (passthrough_) { + cm.stats_.response_passthrough_.inc(); + } switch (metadata_->messageType()) { case MessageType::Reply: cm.stats_.response_reply_.inc(); - if (success_.value_or(false)) { - cm.stats_.response_success_.inc(); - } else { - cm.stats_.response_error_.inc(); + // success_ is set by inspecting the payload, which wont + // occur when passthrough is enabled as parsing the payload + // is skipped entirely. + if (success_) { + if (success_.value()) { + cm.stats_.response_success_.inc(); + } else { + cm.stats_.response_error_.inc(); + } } break; @@ -419,6 +437,10 @@ void ConnectionManager::ActiveRpc::finalizeRequest() { parent_.stats_.downstream_cx_max_requests_.inc(); } + if (passthrough_) { + parent_.stats_.request_passthrough_.inc(); + } + bool destroy_rpc = false; switch (original_msg_type_) { case MessageType::Call: @@ -458,6 +480,7 @@ bool ConnectionManager::ActiveRpc::passthroughSupported() const { } FilterStatus ConnectionManager::ActiveRpc::passthroughData(Buffer::Instance& data) { + passthrough_ = true; filter_context_ = &data; filter_action_ = [this](DecoderEventHandler* filter) -> FilterStatus { Buffer::Instance* data = absl::any_cast(filter_context_); diff --git a/source/extensions/filters/network/thrift_proxy/conn_manager.h b/source/extensions/filters/network/thrift_proxy/conn_manager.h index 064d29b050607..851e1d10ad6f3 100644 --- a/source/extensions/filters/network/thrift_proxy/conn_manager.h +++ b/source/extensions/filters/network/thrift_proxy/conn_manager.h @@ -74,13 +74,14 @@ class ConnectionManager : public Network::ReadFilter, struct ResponseDecoder : public DecoderCallbacks, public ProtocolConverter { ResponseDecoder(ActiveRpc& parent, Transport& transport, Protocol& protocol) : parent_(parent), decoder_(std::make_unique(transport, protocol, *this)), - complete_(false), first_reply_field_(false) { + complete_(false), first_reply_field_(false), passthrough_{false} { initProtocolConverter(*parent_.parent_.protocol_, parent_.response_buffer_); } bool onData(Buffer::Instance& data); // ProtocolConverter + FilterStatus passthroughData(Buffer::Instance& data) override; FilterStatus messageBegin(MessageMetadataSharedPtr metadata) override; FilterStatus messageEnd() override; FilterStatus fieldBegin(absl::string_view name, FieldType& field_type, @@ -102,6 +103,7 @@ class ConnectionManager : public Network::ReadFilter, absl::optional success_; bool complete_ : 1; bool first_reply_field_ : 1; + bool passthrough_ : 1; }; using ResponseDecoderPtr = std::unique_ptr; @@ -155,7 +157,7 @@ class ConnectionManager : public Network::ReadFilter, stream_id_(parent_.random_generator_.random()), stream_info_(parent_.time_source_, parent_.read_callbacks_->connection().connectionInfoProviderSharedPtr()), - local_response_sent_{false}, pending_transport_end_{false} { + local_response_sent_{false}, pending_transport_end_{false}, passthrough_{false} { parent_.stats_.request_active_.inc(); } ~ActiveRpc() override { @@ -245,6 +247,7 @@ class ConnectionManager : public Network::ReadFilter, absl::any filter_context_; bool local_response_sent_ : 1; bool pending_transport_end_ : 1; + bool passthrough_ : 1; }; using ActiveRpcPtr = std::unique_ptr; diff --git a/source/extensions/filters/network/thrift_proxy/router/router.h b/source/extensions/filters/network/thrift_proxy/router/router.h index dfda11ac34407..8806bbd5e4ac5 100644 --- a/source/extensions/filters/network/thrift_proxy/router/router.h +++ b/source/extensions/filters/network/thrift_proxy/router/router.h @@ -357,7 +357,8 @@ class RequestOwner : public ProtocolConverter, public Logger::Loggablecounter("thrift.thrift_stats.request_call"); EXPECT_EQ(1U, counter->value()); - counter = test_server_->counter("thrift.thrift_stats.response_success"); + if (payload_passthrough_ && + (transport_ == TransportType::Framed || transport_ == TransportType::Header) && + protocol_ != ProtocolType::Twitter) { + counter = test_server_->counter("thrift.thrift_stats.response_passthrough"); + EXPECT_EQ(1U, counter->value()); + counter = test_server_->counter("thrift.thrift_stats.response_success"); + EXPECT_EQ(0U, counter->value()); + } else { + counter = test_server_->counter("thrift.thrift_stats.response_passthrough"); + EXPECT_EQ(0U, counter->value()); + counter = test_server_->counter("thrift.thrift_stats.response_success"); + EXPECT_EQ(1U, counter->value()); + } + counter = test_server_->counter("thrift.thrift_stats.response_reply"); EXPECT_EQ(1U, counter->value()); } @@ -252,13 +265,21 @@ TEST_P(ThriftConnManagerIntegrationTest, IDLException) { Stats::CounterSharedPtr counter = test_server_->counter("thrift.thrift_stats.request_call"); EXPECT_EQ(1U, counter->value()); - counter = test_server_->counter("thrift.thrift_stats.response_error"); - if (payload_passthrough_ && transport_ == TransportType::Framed && + if (payload_passthrough_ && + (transport_ == TransportType::Framed || transport_ == TransportType::Header) && protocol_ != ProtocolType::Twitter) { + counter = test_server_->counter("thrift.thrift_stats.response_passthrough"); + EXPECT_EQ(1U, counter->value()); + counter = test_server_->counter("thrift.thrift_stats.response_error"); EXPECT_EQ(0U, counter->value()); } else { + counter = test_server_->counter("thrift.thrift_stats.response_passthrough"); + EXPECT_EQ(0U, counter->value()); + counter = test_server_->counter("thrift.thrift_stats.response_error"); EXPECT_EQ(1U, counter->value()); } + counter = test_server_->counter("thrift.thrift_stats.response_reply"); + EXPECT_EQ(1U, counter->value()); } TEST_P(ThriftConnManagerIntegrationTest, Exception) { diff --git a/test/extensions/filters/network/thrift_proxy/translation_integration_test.cc b/test/extensions/filters/network/thrift_proxy/translation_integration_test.cc index 9bead20a44bae..483cf74841b70 100644 --- a/test/extensions/filters/network/thrift_proxy/translation_integration_test.cc +++ b/test/extensions/filters/network/thrift_proxy/translation_integration_test.cc @@ -20,7 +20,7 @@ namespace ThriftProxy { class ThriftTranslationIntegrationTest : public testing::TestWithParam< - std::tuple>, + std::tuple>, public BaseThriftIntegrationTest { public: static void SetUpTestSuite() { // NOLINT(readability-identifier-naming) @@ -42,13 +42,11 @@ class ThriftTranslationIntegrationTest } void initialize() override { - TransportType downstream_transport, upstream_transport; - ProtocolType downstream_protocol, upstream_protocol; - std::tie(downstream_transport, downstream_protocol, upstream_transport, upstream_protocol) = - GetParam(); + std::tie(downstream_transport_, downstream_protocol_, upstream_transport_, upstream_protocol_, + passthrough_) = GetParam(); - auto upstream_transport_proto = transportTypeToProto(upstream_transport); - auto upstream_protocol_proto = protocolTypeToProto(upstream_protocol); + auto upstream_transport_proto = transportTypeToProto(upstream_transport_); + auto upstream_protocol_proto = protocolTypeToProto(upstream_protocol_); envoy::extensions::filters::network::thrift_proxy::v3::ThriftProtocolOptions proto_opts; proto_opts.set_transport(upstream_transport_proto); @@ -61,27 +59,43 @@ class ThriftTranslationIntegrationTest (*opts)[NetworkFilterNames::get().ThriftProxy].PackFrom(proto_opts); }); + if (passthrough_) { + config_helper_.addFilterConfigModifier< + envoy::extensions::filters::network::thrift_proxy::v3::ThriftProxy>( + "thrift", [](Protobuf::Message& filter) { + auto& conn_manager = + dynamic_cast( + filter); + conn_manager.set_payload_passthrough(true); + }); + } + // Invent some varying, but deterministic, values to add. We use the add method instead of // execute because the default execute params contains a set and the ordering can vary across // generated payloads. std::vector args({ - fmt::format("{}", (static_cast(downstream_transport) << 8) + - static_cast(downstream_protocol)), - fmt::format("{}", (static_cast(upstream_transport) << 8) + - static_cast(upstream_protocol)), + fmt::format("{}", (static_cast(downstream_transport_) << 8) + + static_cast(downstream_protocol_)), + fmt::format("{}", (static_cast(upstream_transport_) << 8) + + static_cast(upstream_protocol_)), }); - PayloadOptions downstream_opts(downstream_transport, downstream_protocol, DriverMode::Success, + PayloadOptions downstream_opts(downstream_transport_, downstream_protocol_, DriverMode::Success, {}, "add", args); preparePayloads(downstream_opts, downstream_request_bytes_, downstream_response_bytes_); - PayloadOptions upstream_opts(upstream_transport, upstream_protocol, DriverMode::Success, {}, + PayloadOptions upstream_opts(upstream_transport_, upstream_protocol_, DriverMode::Success, {}, "add", args); preparePayloads(upstream_opts, upstream_request_bytes_, upstream_response_bytes_); BaseThriftIntegrationTest::initialize(); } + TransportType downstream_transport_; + ProtocolType downstream_protocol_; + TransportType upstream_transport_; + ProtocolType upstream_protocol_; + bool passthrough_; Buffer::OwnedImpl downstream_request_bytes_; Buffer::OwnedImpl downstream_response_bytes_; Buffer::OwnedImpl upstream_request_bytes_; @@ -89,17 +103,22 @@ class ThriftTranslationIntegrationTest }; static std::string paramToString( - const TestParamInfo>& + const TestParamInfo>& params) { TransportType downstream_transport, upstream_transport; ProtocolType downstream_protocol, upstream_protocol; - std::tie(downstream_transport, downstream_protocol, upstream_transport, upstream_protocol) = - params.param; - - return fmt::format("From{}{}To{}{}", transportNameForTest(downstream_transport), - protocolNameForTest(downstream_protocol), - transportNameForTest(upstream_transport), - protocolNameForTest(upstream_protocol)); + bool passthrough; + std::tie(downstream_transport, downstream_protocol, upstream_transport, upstream_protocol, + passthrough) = params.param; + + auto result = + fmt::format("From{}{}To{}{}", transportNameForTest(downstream_transport), + protocolNameForTest(downstream_protocol), + transportNameForTest(upstream_transport), protocolNameForTest(upstream_protocol)); + if (passthrough) { + result = fmt::format("{}Passthrough", result); + } + return result; } INSTANTIATE_TEST_SUITE_P( @@ -107,7 +126,7 @@ INSTANTIATE_TEST_SUITE_P( Combine(Values(TransportType::Framed, TransportType::Unframed, TransportType::Header), Values(ProtocolType::Binary, ProtocolType::Compact), Values(TransportType::Framed, TransportType::Unframed, TransportType::Header), - Values(ProtocolType::Binary, ProtocolType::Compact)), + Values(ProtocolType::Binary, ProtocolType::Compact), Values(false, true)), paramToString); // Tests that the proxy will translate between different downstream and upstream transports and @@ -135,7 +154,27 @@ TEST_P(ThriftTranslationIntegrationTest, Translates) { Stats::CounterSharedPtr counter = test_server_->counter("thrift.thrift_stats.request_call"); EXPECT_EQ(1U, counter->value()); - counter = test_server_->counter("thrift.thrift_stats.response_success"); + if (passthrough_ && + (downstream_transport_ == TransportType::Framed || + downstream_transport_ == TransportType::Header) && + (upstream_transport_ == TransportType::Framed || + upstream_transport_ == TransportType::Header) && + downstream_protocol_ == upstream_protocol_ && downstream_protocol_ != ProtocolType::Twitter) { + counter = test_server_->counter("thrift.thrift_stats.request_passthrough"); + EXPECT_EQ(1U, counter->value()); + counter = test_server_->counter("thrift.thrift_stats.response_passthrough"); + EXPECT_EQ(1U, counter->value()); + counter = test_server_->counter("thrift.thrift_stats.response_success"); + EXPECT_EQ(0U, counter->value()); + } else { + counter = test_server_->counter("thrift.thrift_stats.request_passthrough"); + EXPECT_EQ(0U, counter->value()); + counter = test_server_->counter("thrift.thrift_stats.response_passthrough"); + EXPECT_EQ(0U, counter->value()); + counter = test_server_->counter("thrift.thrift_stats.response_success"); + EXPECT_EQ(1U, counter->value()); + } + counter = test_server_->counter("thrift.thrift_stats.response_reply"); EXPECT_EQ(1U, counter->value()); }