Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
976cb9c
Support Encoder filter in thrift proxy
Mar 31, 2022
0027445
simple unit test
Apr 5, 2022
d2f1ab4
pipe transport begin for encoder
Apr 6, 2022
e5faa5c
pipe transportBegin for bidirectionFilter and test
Apr 9, 2022
b2f0e10
refine callback interface and extract apply filters
Apr 11, 2022
885980b
refactor filter_action_/context_ for decoder filters
Apr 13, 2022
b646c70
implement ResponseDecoder::* and test
Apr 14, 2022
beb4e5b
fix MockFilterChainFactoryCallbacks
Apr 14, 2022
49a3712
clang and passthrough filter test
Apr 15, 2022
95717b3
clang and more passthrough filter test
Apr 15, 2022
21ef3c2
Merge remote-tracking branch 'upstream/main' into encoder
Apr 18, 2022
3ad5889
Merge remote-tracking branch 'upstream/main' into encoder
Apr 19, 2022
c80b21b
let test more specific
Apr 19, 2022
288ae7f
add one comment
Apr 20, 2022
060ff15
add more test
Apr 21, 2022
03d9bae
doc and disable continueEncoding
Apr 21, 2022
04bf146
format
Apr 21, 2022
f6f434f
test field modification in filters
Apr 21, 2022
d84c4c0
refactor
Apr 22, 2022
72e5b81
format
Apr 22, 2022
ba4bd80
remove setup functions
Apr 22, 2022
dd344c8
doc and test passthroughData
Apr 22, 2022
78d07f2
Merge remote-tracking branch 'upstream/main' into encoder
Apr 25, 2022
1d0f90c
more doc
Apr 25, 2022
35d4b7f
Merge remote-tracking branch 'upstream/main' into encoder
Apr 28, 2022
4159dc6
address zuercher's comment
Apr 28, 2022
be1b28b
rename from bidirection to bidirectional
Apr 28, 2022
143e851
current.rst
Apr 28, 2022
63ddc73
address rgs1's comment
Apr 29, 2022
a89e564
Merge branch 'main' into encoder
May 3, 2022
053e6ba
fix comment
May 3, 2022
1aac4a4
Merge branch 'main' into encoder, and transfer current.rst to changel…
May 3, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changelogs/1.23.0.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ new_features:
- area: thrift
change: |
added flag to router to control downstream local close. :ref:`close_downstream_on_upstream_error <envoy_v3_api_field_extensions.filters.network.thrift_proxy.router.v3.Router.close_downstream_on_upstream_error>`.
- area: thrift
change: |
introduced thrift configurable encoder and bidirectional filters, which allows peeking and modifying the thrift response message.
- area: on_demand
change: |
:ref:`OnDemand <envoy_v3_api_msg_extensions.filters.http.on_demand.v3.OnDemand>` got extended to hold configuration for on-demand cluster discovery. A similar message for :ref:`per-route configuration <envoy_v3_api_msg_extensions.filters.http.on_demand.v3.PerRouteConfig>` is also added.
Expand Down
10 changes: 10 additions & 0 deletions source/extensions/filters/network/thrift_proxy/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ envoy_cc_library(
deps = [
":app_exception_lib",
":decoder_lib",
":filter_utils_lib",
":protocol_converter_lib",
":protocol_interface",
":stats_lib",
Expand Down Expand Up @@ -115,6 +116,15 @@ envoy_cc_library(
],
)

envoy_cc_library(
name = "filter_utils_lib",
srcs = ["filter_utils.cc"],
hdrs = ["filter_utils.h"],
deps = [
"//source/extensions/filters/network/thrift_proxy/filters:filter_interface",
],
)

envoy_cc_library(
name = "metadata_lib",
hdrs = ["metadata.h"],
Expand Down
553 changes: 364 additions & 189 deletions source/extensions/filters/network/thrift_proxy/conn_manager.cc

Large diffs are not rendered by default.

150 changes: 122 additions & 28 deletions source/extensions/filters/network/thrift_proxy/conn_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "source/common/stats/timespan_impl.h"
#include "source/common/stream_info/stream_info_impl.h"
#include "source/extensions/filters/network/thrift_proxy/decoder.h"
#include "source/extensions/filters/network/thrift_proxy/filter_utils.h"
#include "source/extensions/filters/network/thrift_proxy/filters/filter.h"
#include "source/extensions/filters/network/thrift_proxy/protocol.h"
#include "source/extensions/filters/network/thrift_proxy/protocol_converter.h"
Expand Down Expand Up @@ -72,57 +73,94 @@ class ConnectionManager : public Network::ReadFilter,
private:
struct ActiveRpc;

struct ResponseDecoder : public DecoderCallbacks, public ProtocolConverter {
struct ResponseDecoder : public DecoderCallbacks, public DecoderEventHandler {
ResponseDecoder(ActiveRpc& parent, Transport& transport, Protocol& protocol)
: parent_(parent), decoder_(std::make_unique<Decoder>(transport, protocol, *this)),
complete_(false), passthrough_{false} {
initProtocolConverter(*parent_.parent_.protocol_, parent_.response_buffer_);
protocol_converter_(std::make_shared<ProtocolConverter>()), complete_{false},
passthrough_{false}, pending_transport_end_{false} {
;
protocol_converter_->initProtocolConverter(*parent_.parent_.protocol_,
parent_.response_buffer_);
}

bool onData(Buffer::Instance& data);

// ProtocolConverter
// DecoderEventHandler
FilterStatus transportBegin(MessageMetadataSharedPtr metadata) override;
FilterStatus transportEnd() override;
FilterStatus passthroughData(Buffer::Instance& data) override;
FilterStatus messageBegin(MessageMetadataSharedPtr metadata) override;
FilterStatus transportBegin(MessageMetadataSharedPtr metadata) override {
UNREFERENCED_PARAMETER(metadata);
return FilterStatus::Continue;
}
FilterStatus transportEnd() override;
FilterStatus messageEnd() override;
FilterStatus structBegin(absl::string_view name) override;
FilterStatus structEnd() override;
FilterStatus fieldBegin(absl::string_view name, FieldType& field_type,
int16_t& field_id) override;
FilterStatus fieldEnd() override;
FilterStatus boolValue(bool& value) override;
FilterStatus byteValue(uint8_t& value) override;
FilterStatus int16Value(int16_t& value) override;
FilterStatus int32Value(int32_t& value) override;
FilterStatus int64Value(int64_t& value) override;
FilterStatus doubleValue(double& value) override;
FilterStatus stringValue(absl::string_view value) override;
FilterStatus mapBegin(FieldType& key_type, FieldType& value_type, uint32_t& size) override;
FilterStatus mapEnd() override;
FilterStatus listBegin(FieldType& elem_type, uint32_t& size) override;
FilterStatus listEnd() override;
FilterStatus setBegin(FieldType& elem_type, uint32_t& size) override;
FilterStatus setEnd() override;

// DecoderCallbacks
DecoderEventHandler& newDecoderEventHandler() override { return *this; }
bool passthroughEnabled() const override;

void finalizeResponse();

ActiveRpc& parent_;
DecoderPtr decoder_;
Buffer::OwnedImpl upstream_buffer_;
MessageMetadataSharedPtr metadata_;
ProtocolConverterSharedPtr protocol_converter_;
absl::optional<bool> success_;
bool complete_ : 1;
bool passthrough_ : 1;
bool pending_transport_end_ : 1;
};
using ResponseDecoderPtr = std::unique_ptr<ResponseDecoder>;

// Wraps a DecoderFilter and acts as the DecoderFilterCallbacks for the filter, enabling filter
// chain continuation.
struct ActiveRpcDecoderFilter : public ThriftFilters::DecoderFilterCallbacks,
LinkedObject<ActiveRpcDecoderFilter> {
ActiveRpcDecoderFilter(ActiveRpc& parent, ThriftFilters::DecoderFilterSharedPtr filter)
: parent_(parent), handle_(filter) {}
struct ActiveRpcFilterBase : public virtual ThriftFilters::FilterCallbacks {
ActiveRpcFilterBase(ActiveRpc& parent) : parent_(parent) {}

// ThriftFilters::DecoderFilterCallbacks
// ThriftFilters::FilterCallbacks
uint64_t streamId() const override { return parent_.stream_id_; }
const Network::Connection* connection() const override { return parent_.connection(); }
Event::Dispatcher& dispatcher() override { return parent_.dispatcher(); }
void continueDecoding() override;
Router::RouteConstSharedPtr route() override { return parent_.route(); }
TransportType downstreamTransportType() const override {
return parent_.downstreamTransportType();
}
ProtocolType downstreamProtocolType() const override {
return parent_.downstreamProtocolType();
}

void resetDownstreamConnection() override { parent_.resetDownstreamConnection(); }
StreamInfo::StreamInfo& streamInfo() override { return parent_.streamInfo(); }
MessageMetadataSharedPtr responseMetadata() override { return parent_.responseMetadata(); }
bool responseSuccess() override { return parent_.responseSuccess(); }
void onReset() override { parent_.onReset(); }

ActiveRpc& parent_;
};

// Wraps a DecoderFilter and acts as the DecoderFilterCallbacks for the filter, enabling filter
// chain continuation.
struct ActiveRpcDecoderFilter : public ActiveRpcFilterBase,
public virtual ThriftFilters::DecoderFilterCallbacks,
LinkedObject<ActiveRpcDecoderFilter> {
ActiveRpcDecoderFilter(ActiveRpc& parent, ThriftFilters::DecoderFilterSharedPtr filter)
: ActiveRpcFilterBase(parent), decoder_handle_(filter) {}

// ThriftFilters::DecoderFilterCallbacks
void sendLocalReply(const DirectResponse& response, bool end_stream) override {
parent_.sendLocalReply(response, end_stream);
}
Expand All @@ -132,22 +170,33 @@ class ConnectionManager : public Network::ReadFilter,
ThriftFilters::ResponseStatus upstreamData(Buffer::Instance& buffer) override {
return parent_.upstreamData(buffer);
}
void resetDownstreamConnection() override { parent_.resetDownstreamConnection(); }
StreamInfo::StreamInfo& streamInfo() override { return parent_.streamInfo(); }
MessageMetadataSharedPtr responseMetadata() override { return parent_.responseMetadata(); }
bool responseSuccess() override { return parent_.responseSuccess(); }
void onReset() override { parent_.onReset(); }

ActiveRpc& parent_;
ThriftFilters::DecoderFilterSharedPtr handle_;
void continueDecoding() override;
DecoderEventHandler* decodeEventHandler() { return decoder_handle_.get(); }
ThriftFilters::DecoderFilterSharedPtr decoder_handle_;
};
using ActiveRpcDecoderFilterPtr = std::unique_ptr<ActiveRpcDecoderFilter>;

// Wraps a EncoderFilter and acts as the EncoderFilterCallbacks for the filter, enabling filter
// chain continuation.
struct ActiveRpcEncoderFilter : public ActiveRpcFilterBase,
public virtual ThriftFilters::EncoderFilterCallbacks,
LinkedObject<ActiveRpcEncoderFilter> {
ActiveRpcEncoderFilter(ActiveRpc& parent, ThriftFilters::EncoderFilterSharedPtr filter)
: ActiveRpcFilterBase(parent), encoder_handle_(filter) {}

// ThriftFilters::EncoderFilterCallbacks
void continueEncoding() override;
DecoderEventHandler* decodeEventHandler() { return encoder_handle_.get(); }
ThriftFilters::EncoderFilterSharedPtr encoder_handle_;
};
using ActiveRpcEncoderFilterPtr = std::unique_ptr<ActiveRpcEncoderFilter>;

// ActiveRpc tracks request/response pairs.
struct ActiveRpc : LinkedObject<ActiveRpc>,
public Event::DeferredDeletable,
public DecoderEventHandler,
public ThriftFilters::DecoderFilterCallbacks,
public ThriftFilters::EncoderFilterCallbacks,
public ThriftFilters::FilterChainFactoryCallbacks {
ActiveRpc(ConnectionManager& parent)
: parent_(parent), request_timer_(new Stats::HistogramCompletableTimespanImpl(
Expand All @@ -162,8 +211,8 @@ class ConnectionManager : public Network::ReadFilter,
request_timer_->complete();
parent_.stats_.request_active_.dec();

for (auto& filter : decoder_filters_) {
filter->handle_->onDestroy();
for (auto& filter : base_filters_) {
filter->onDestroy();
}
}

Expand Down Expand Up @@ -199,6 +248,7 @@ class ConnectionManager : public Network::ReadFilter,
return parent_.read_callbacks_->connection().dispatcher();
}
void continueDecoding() override { parent_.continueDecoding(); }
void continueEncoding() override {}
Router::RouteConstSharedPtr route() override;
TransportType downstreamTransportType() const override {
return parent_.decoder_->transportType();
Expand All @@ -220,10 +270,52 @@ class ConnectionManager : public Network::ReadFilter,
ActiveRpcDecoderFilterPtr wrapper = std::make_unique<ActiveRpcDecoderFilter>(*this, filter);
filter->setDecoderFilterCallbacks(*wrapper);
LinkedList::moveIntoListBack(std::move(wrapper), decoder_filters_);
base_filters_.emplace_back(filter);
}

void addEncoderFilter(ThriftFilters::EncoderFilterSharedPtr filter) override {
ActiveRpcEncoderFilterPtr wrapper = std::make_unique<ActiveRpcEncoderFilter>(*this, filter);
filter->setEncoderFilterCallbacks(*wrapper);
LinkedList::moveIntoList(std::move(wrapper), encoder_filters_);
base_filters_.emplace_back(filter);
}

void addBidirectionalFilter(ThriftFilters::BidirectionalFilterSharedPtr filter) override {
ThriftFilters::BidirectionalFilterWrapperSharedPtr wrapper =
std::make_unique<ThriftFilters::BidirectionalFilterWrapper>(filter);

ActiveRpcDecoderFilterPtr decoder_wrapper =
std::make_unique<ActiveRpcDecoderFilter>(*this, wrapper->decoder_filter_);
filter->setDecoderFilterCallbacks(*decoder_wrapper);
LinkedList::moveIntoListBack(std::move(decoder_wrapper), decoder_filters_);

ActiveRpcEncoderFilterPtr encoder_wrapper =
std::make_unique<ActiveRpcEncoderFilter>(*this, wrapper->encoder_filter_);
filter->setEncoderFilterCallbacks(*encoder_wrapper);
LinkedList::moveIntoList(std::move(encoder_wrapper), encoder_filters_);

base_filters_.emplace_back(wrapper);
}

bool passthroughSupported() const;
FilterStatus applyDecoderFilters(ActiveRpcDecoderFilter* filter);

// Apply filters to the decoder_event.
// @param filter the last filter which is already applied to the decoder_event.
// nullptr indicates none is applied and the decoder_event is applied from the
// first filter.
FilterStatus applyDecoderFilters(DecoderEvent state, absl::any data,
ActiveRpcDecoderFilter* filter = nullptr);
FilterStatus applyEncoderFilters(DecoderEvent state, absl::any data,
ProtocolConverterSharedPtr protocol_converter,
ActiveRpcEncoderFilter* filter = nullptr);
template <typename FilterType>
FilterStatus applyFilters(FilterType* filter,
std::list<std::unique_ptr<FilterType>>& filter_list,
ProtocolConverterSharedPtr protocol_converter = nullptr);

// Helper to setup filter_action_ and filter_context_
void prepareFilterAction(DecoderEvent event, absl::any data);

void finalizeRequest();

void createFilterChain();
Expand All @@ -235,6 +327,8 @@ class ConnectionManager : public Network::ReadFilter,
StreamInfo::StreamInfoImpl stream_info_;
MessageMetadataSharedPtr metadata_;
std::list<ActiveRpcDecoderFilterPtr> decoder_filters_;
std::list<ActiveRpcEncoderFilterPtr> encoder_filters_;
std::list<ThriftFilters::FilterBaseSharedPtr> base_filters_;
DecoderEventHandlerSharedPtr upgrade_handler_;
ResponseDecoderPtr response_decoder_;
absl::optional<Router::RouteConstSharedPtr> cached_route_;
Expand Down
26 changes: 26 additions & 0 deletions source/extensions/filters/network/thrift_proxy/decoder_events.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,32 @@ enum class FilterStatus {
StopIteration
};

enum class DecoderEvent {
TransportBegin,
TransportEnd,
PassthroughData,
MessageBegin,
MessageEnd,
StructBegin,
StructEnd,
FieldBegin,
FieldEnd,
BoolValue,
ByteValue,
DoubleValue,
Int16Value,
Int32Value,
Int64Value,
StringValue,
ListBegin,
ListEnd,
SetBegin,
SetEnd,
MapBegin,
MapEnd,
ContinueDecode
};

class DecoderEventHandler {
public:
virtual ~DecoderEventHandler() = default;
Expand Down
Loading