Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
3835768
Enable StopAllTypesIteration for decoding path
Feb 13, 2019
9cf21dd
update comments
Feb 14, 2019
4429d12
fix clang_tidy
Feb 14, 2019
94c4d44
address comments
Feb 20, 2019
eabbeb6
add use case explanation
Feb 20, 2019
0bd2315
fix format
Feb 20, 2019
b437b67
merge from master
Feb 21, 2019
50b7e7c
fix format
Feb 21, 2019
5c9fc64
fix format and add new files
Feb 21, 2019
b3527f7
remove unused variable
Feb 21, 2019
eaa2ceb
cleanups
Feb 21, 2019
48a3833
emtpy commit
Feb 21, 2019
ac8a553
emtpy commit
Feb 21, 2019
f595c23
Merge stopped_ into iteration_state_
Mar 1, 2019
ee71a1f
remove iterate_from_current_filter_
Mar 1, 2019
6544a7c
fixed a test where two stop-all filters are back-to-back
Mar 1, 2019
3f2656b
enable stop all iteration tests on http2
Mar 4, 2019
6996475
address review comments
Mar 5, 2019
7bf4912
merge master
Mar 6, 2019
119e166
add unit test and fixed a bug
Mar 8, 2019
82588d3
merge from master
Mar 10, 2019
d2839d6
fix format
Mar 11, 2019
eb25b60
fix a bug in conn_manager_impl_test.cc
Mar 11, 2019
82e02e2
Enable stop-all for encoding path
Mar 11, 2019
e0726f0
add trailers to unit test
Mar 11, 2019
a43ba57
address review comments except filters combining.
Mar 12, 2019
701493a
move tests to protocol_integration_test.cc and set parameters in test…
Mar 12, 2019
66ebe16
format change
Mar 12, 2019
e9f387a
merge decode_headers_return_stop_all_filter and decode_headers_return…
Mar 12, 2019
b7b8113
Merge branch 'master' into stopall
Mar 12, 2019
dd80759
fix a bug where decodeData() returns stop, we reset iteration_state_ …
Mar 13, 2019
eef5003
address review comments
Mar 18, 2019
bcb917b
Add more comments
Mar 18, 2019
5f21a63
address review comments
Mar 20, 2019
e691414
Empty commmit
Mar 20, 2019
5dfb999
comments
Mar 25, 2019
c60f2df
test cleanup
Mar 26, 2019
c645f9b
Add integration test for the encoding path
Mar 26, 2019
0ddb3af
add new file test/integration/filters/encode_headers_return_stop_all_…
Mar 26, 2019
beb19b8
address conn_manager_impl_test.cc comments
Mar 27, 2019
d875ca4
address review comments
Mar 27, 2019
e7cebab
address comments
Mar 28, 2019
d02d056
avoid saved state in addEn/decodedData
Mar 28, 2019
c2becd0
address comments
Apr 1, 2019
9e2e461
address comment
Apr 2, 2019
4ca783e
merge master
Apr 5, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 24 additions & 1 deletion include/envoy/http/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,30 @@ enum class FilterHeadersStatus {
StopIteration,
// Continue iteration to remaining filters, but ignore any subsequent data or trailers. This
// results in creating a header only request/response.
ContinueAndEndStream
ContinueAndEndStream,
// Do not iterate for headers as well as data and trailers for the current filter and the filters
// following, and buffer body data for later dispatching. ContinueDecoding() MUST
// be called if continued filter iteration is desired.
//
// Used when a filter wants to stop iteration on data and trailers while waiting for headers'
// iteration to resume.
//
// If buffering the request causes buffered data to exceed the configured buffer limit, a 413 will
// be sent to the user. On the response path exceeding buffer limits will result in a 500.
//
// TODO(soya3129): stop metadata parsing when StopAllIterationAndBuffer is set.
StopAllIterationAndBuffer,
// Do not iterate for headers as well as data and trailers for the current filter and the filters
// following, and buffer body data for later dispatching. continueDecoding() MUST
// be called if continued filter iteration is desired.
//
// Used when a filter wants to stop iteration on data and trailers while waiting for headers'
// iteration to resume.
//
// This will cause the flow of incoming data to cease until continueDecoding() function is called.
//
// TODO(soya3129): stop metadata parsing when StopAllIterationAndWatermark is set.
StopAllIterationAndWatermark,
};

/**
Expand Down
175 changes: 123 additions & 52 deletions source/common/http/conn_manager_impl.cc

Large diffs are not rendered by default.

75 changes: 66 additions & 9 deletions source/common/http/conn_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,16 +95,26 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
*/
struct ActiveStreamFilterBase : public virtual StreamFilterCallbacks {
ActiveStreamFilterBase(ActiveStream& parent, bool dual_filter)
: parent_(parent), headers_continued_(false), continue_headers_continued_(false),
stopped_(false), end_stream_(false), dual_filter_(dual_filter) {}
: iteration_state_(IterationState::Continue), iterate_from_current_filter_(false),
parent_(parent), headers_continued_(false), continue_headers_continued_(false),
end_stream_(false), dual_filter_(dual_filter) {}

// Functions in the following block are called after the filter finishes processing
// corresponding data. Those functions handle state updates and data storage (if needed)
// according to the status returned by filter's callback functions.
bool commonHandleAfter100ContinueHeadersCallback(FilterHeadersStatus status);
bool commonHandleAfterHeadersCallback(FilterHeadersStatus status, bool& headers_only);
void commonHandleBufferData(Buffer::Instance& provided_data);
bool commonHandleAfterDataCallback(FilterDataStatus status, Buffer::Instance& provided_data,
bool& buffer_was_streaming);
bool commonHandleAfterTrailersCallback(FilterTrailersStatus status);

// Buffers provided_data.
void commonHandleBufferData(Buffer::Instance& provided_data);

// If iteration has stopped for all frame types, calls this function to buffer the data before
// the filter processes data. The function also updates streaming state.
void commonBufferDataIfStopAll(Buffer::Instance& provided_data, bool& buffer_was_streaming);

void commonContinue();
virtual bool canContinue() PURE;
virtual Buffer::WatermarkBufferPtr createBuffer() PURE;
Expand All @@ -128,10 +138,35 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
Tracing::Span& activeSpan() override;
Tracing::Config& tracingConfig() override;

// Functions to set or get iteration state.
bool canIterate() { return iteration_state_ == IterationState::Continue; }
bool stoppedAll() {
return iteration_state_ == IterationState::StopAllBuffer ||
iteration_state_ == IterationState::StopAllWatermark;
}
void allowIteration() {
ASSERT(iteration_state_ != IterationState::Continue);
iteration_state_ = IterationState::Continue;
}

// The state of iteration.
enum class IterationState {
Continue, // Iteration has not stopped for any frame type.
StopSingleIteration, // Iteration has stopped for headers, 100-continue, or data.
StopAllBuffer, // Iteration has stopped for all frame types, and following data should
// be buffered.
StopAllWatermark, // Iteration has stopped for all frame types, and following data should
// be buffered until high watermark is reached.
};
IterationState iteration_state_;
// If the filter resumes iteration from a StopAllBuffer/Watermark state, the current filter
// hasn't parsed data and trailers. As a result, the filter iteration should start with the
// current filter instead of the next one. If true, filter iteration starts with the current
// filter. Otherwise, starts with the next filter in the chain.
bool iterate_from_current_filter_;
ActiveStream& parent_;
bool headers_continued_ : 1;
bool continue_headers_continued_ : 1;
bool stopped_ : 1;
// If true, end_stream is called for this filter.
bool end_stream_ : 1;
const bool dual_filter_ : 1;
Expand Down Expand Up @@ -164,7 +199,8 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
parent_.decodeHeaders(this, *parent_.request_headers_, end_stream);
}
void doData(bool end_stream) override {
parent_.decodeData(this, *parent_.buffered_request_data_, end_stream);
parent_.decodeData(this, *parent_.buffered_request_data_, end_stream,
ActiveStream::FilterIterationStartState::CanStartFromCurrent);
}
void doTrailers() override { parent_.decodeTrailers(this, *parent_.request_trailers_); }
const HeaderMapPtr& trailers() override { return parent_.request_trailers_; }
Expand Down Expand Up @@ -247,7 +283,8 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
parent_.encodeHeaders(this, *parent_.response_headers_, end_stream);
}
void doData(bool end_stream) override {
parent_.encodeData(this, *parent_.buffered_response_data_, end_stream);
parent_.encodeData(this, *parent_.buffered_response_data_, end_stream,
ActiveStream::FilterIterationStartState::CanStartFromCurrent);
}
void doTrailers() override { parent_.encodeTrailers(this, *parent_.response_trailers_); }
const HeaderMapPtr& trailers() override { return parent_.response_trailers_; }
Expand Down Expand Up @@ -290,16 +327,28 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
ActiveStream(ConnectionManagerImpl& connection_manager);
~ActiveStream();

// Indicates which filter to start the iteration with.
enum class FilterIterationStartState { AlwaysStartFromNext, CanStartFromCurrent };

void addStreamDecoderFilterWorker(StreamDecoderFilterSharedPtr filter, bool dual_filter);
void addStreamEncoderFilterWorker(StreamEncoderFilterSharedPtr filter, bool dual_filter);
void chargeStats(const HeaderMap& headers);
// Returns the encoder filter to start iteration with.
std::list<ActiveStreamEncoderFilterPtr>::iterator
commonEncodePrefix(ActiveStreamEncoderFilter* filter, bool end_stream);
commonEncodePrefix(ActiveStreamEncoderFilter* filter, bool end_stream,
FilterIterationStartState filter_iteration_start_state);
// Returns the decoder filter to start iteration with.
std::list<ActiveStreamDecoderFilterPtr>::iterator
commonDecodePrefix(ActiveStreamDecoderFilter* filter,
FilterIterationStartState filter_iteration_start_state);
const Network::Connection* connection();
void addDecodedData(ActiveStreamDecoderFilter& filter, Buffer::Instance& data, bool streaming);
HeaderMap& addDecodedTrailers();
void decodeHeaders(ActiveStreamDecoderFilter* filter, HeaderMap& headers, bool end_stream);
void decodeData(ActiveStreamDecoderFilter* filter, Buffer::Instance& data, bool end_stream);
// Sends data through decoding filter chains. filter_iteration_start_state indicates which
// filter to start the iteration with.
void decodeData(ActiveStreamDecoderFilter* filter, Buffer::Instance& data, bool end_stream,
FilterIterationStartState filter_iteration_start_state);
void decodeTrailers(ActiveStreamDecoderFilter* filter, HeaderMap& trailers);
void disarmRequestTimeout();
void maybeEndDecode(bool end_stream);
Expand All @@ -311,11 +360,19 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
const absl::optional<Grpc::Status::GrpcStatus> grpc_status);
void encode100ContinueHeaders(ActiveStreamEncoderFilter* filter, HeaderMap& headers);
void encodeHeaders(ActiveStreamEncoderFilter* filter, HeaderMap& headers, bool end_stream);
void encodeData(ActiveStreamEncoderFilter* filter, Buffer::Instance& data, bool end_stream);
// Sends data through encoding filter chains. filter_iteration_start_state indicates which
// filter to start the iteration with.
void encodeData(ActiveStreamEncoderFilter* filter, Buffer::Instance& data, bool end_stream,
FilterIterationStartState filter_iteration_start_state);
void encodeTrailers(ActiveStreamEncoderFilter* filter, HeaderMap& trailers);
void encodeMetadata(ActiveStreamEncoderFilter* filter, MetadataMapPtr&& metadata_map_ptr);
void maybeEndEncode(bool end_stream);
uint64_t streamId() { return stream_id_; }
// Returns true if filter has stopped iteration for all frame types. Otherwise, returns false.
// filter_streaming is the variable to indicate if stream is streaming, and its value may be
// changed by the function.
bool handleDataIfStopAll(ActiveStreamFilterBase& filter, Buffer::Instance& data,
bool& filter_streaming);

// Http::StreamCallbacks
void onResetStream(StreamResetReason reason,
Expand Down
Loading