Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions include/envoy/http/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,17 @@ class StreamDecoderFilterCallbacks : public virtual StreamFilterCallbacks {
*/
virtual void addDecodedData(Buffer::Instance& data, bool streaming_filter) PURE;

/**
* Adds decoded trailers. May only be called in decodeData when end_stream is set to true.
* If called in any other context, an assertion will be triggered.
*
* When called in decodeData, the trailers map will be initialized to an empty map and returned by
* reference. Calling this function more than once is invalid.
*
* @return a reference to the newly created trailers map.
*/
virtual HeaderMap& addDecodedTrailers() PURE;

/**
* Create a locally generated response using the provided response_code and body_text parameters.
* If the request was a gRPC request the local reply will be encoded as a gRPC response with a 200
Expand Down Expand Up @@ -395,6 +406,17 @@ class StreamEncoderFilterCallbacks : public virtual StreamFilterCallbacks {
*/
virtual void addEncodedData(Buffer::Instance& data, bool streaming_filter) PURE;

/**
* Adds encoded trailers. May only be called in encodeData when end_stream is set to true.
* If called in any other context, an assertion will be triggered.
*
* When called in encodeData, the trailers map will be initialized to an empty map and returned by
* reference. Calling this function more than once is invalid.
*
* @return a reference to the newly created trailers map.
*/
virtual HeaderMap& addEncodedTrailers() PURE;

/**
* Called when an encoder filter goes over its high watermark.
*/
Expand Down
1 change: 1 addition & 0 deletions source/common/http/async_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ class AsyncStreamImpl : public AsyncClient::Stream,
Tracing::Span& activeSpan() override { return active_span_; }
const Tracing::Config& tracingConfig() override { return tracing_config_; }
void continueDecoding() override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; }
HeaderMap& addDecodedTrailers() override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; }
void addDecodedData(Buffer::Instance&, bool) override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; }
const Buffer::Instance* decodingBuffer() override { return buffered_body_.get(); }
void sendLocalReply(Code code, const std::string& body,
Expand Down
96 changes: 91 additions & 5 deletions source/common/http/conn_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -767,6 +767,8 @@ void ConnectionManagerImpl::ActiveStream::decodeData(ActiveStreamDecoderFilter*
}

std::list<ActiveStreamDecoderFilterPtr>::iterator entry;
auto trailers_added_entry = decoder_filters_.end();
const bool trailers_exists_at_start = request_trailers_ != nullptr;
if (!filter) {
entry = decoder_filters_.begin();
} else {
Expand All @@ -775,15 +777,52 @@ void ConnectionManagerImpl::ActiveStream::decodeData(ActiveStreamDecoderFilter*

for (; entry != decoder_filters_.end(); entry++) {
ASSERT(!(state_.filter_call_state_ & FilterCallState::DecodeData));

// We check the request_trailers_ pointer here in case addDecodedTrailers
// is called in decodeData during a previous filter invocation, at which point we communicate to
// the current and future filters that the stream has not yet ended.
if (end_stream) {
state_.filter_call_state_ |= FilterCallState::LastDataFrame;
}
state_.filter_call_state_ |= FilterCallState::DecodeData;
FilterDataStatus status = (*entry)->handle_->decodeData(data, end_stream);
FilterDataStatus status = (*entry)->handle_->decodeData(data, end_stream && !request_trailers_);
state_.filter_call_state_ &= ~FilterCallState::DecodeData;
if (end_stream) {
state_.filter_call_state_ &= ~FilterCallState::LastDataFrame;
}
ENVOY_STREAM_LOG(trace, "decode data called: filter={} status={}", *this,
static_cast<const void*>((*entry).get()), static_cast<uint64_t>(status));
if (!(*entry)->commonHandleAfterDataCallback(status, data, state_.decoder_filters_streaming_)) {

if (!trailers_exists_at_start && request_trailers_ &&
trailers_added_entry == decoder_filters_.end()) {
trailers_added_entry = entry;
}

if (!(*entry)->commonHandleAfterDataCallback(status, data, state_.decoder_filters_streaming_) &&
std::next(entry) != decoder_filters_.end()) {
// Stop iteration IFF this is not the last filter. If it is the last filter, continue with
// processing since we need to handle the case where a terminal filter wants to buffer, but
// a previous filter has added trailers.
return;
}
}

// If trailers were adding during decodeData we need to trigger decodeTrailers in order
// to allow filters to process the trailers.
if (trailers_added_entry != decoder_filters_.end()) {
decodeTrailers(trailers_added_entry->get(), *request_trailers_);
}
}

HeaderMap& ConnectionManagerImpl::ActiveStream::addDecodedTrailers() {
// Trailers can only be added during the last data frame (i.e. end_stream = true).
ASSERT(state_.filter_call_state_ & FilterCallState::LastDataFrame);

// Trailers can only be added once.
ASSERT(!request_trailers_);

request_trailers_ = std::make_unique<HeaderMapImpl>();
return *request_trailers_;
}

void ConnectionManagerImpl::ActiveStream::addDecodedData(ActiveStreamDecoderFilter& filter,
Expand Down Expand Up @@ -1058,6 +1097,17 @@ void ConnectionManagerImpl::ActiveStream::encodeHeaders(ActiveStreamEncoderFilte
}
}

HeaderMap& ConnectionManagerImpl::ActiveStream::addEncodedTrailers() {
// Trailers can only be added during the last data frame (i.e. end_stream = true).
ASSERT(state_.filter_call_state_ & FilterCallState::LastDataFrame);

// Trailers can only be added once.
ASSERT(!response_trailers_);

response_trailers_ = std::make_unique<HeaderMapImpl>();
return *response_trailers_;
}

void ConnectionManagerImpl::ActiveStream::addEncodedData(ActiveStreamEncoderFilter& filter,
Buffer::Instance& data, bool streaming) {
if (state_.filter_call_state_ == 0 ||
Expand All @@ -1083,13 +1133,33 @@ void ConnectionManagerImpl::ActiveStream::encodeData(ActiveStreamEncoderFilter*
Buffer::Instance& data, bool end_stream) {
resetIdleTimer();
std::list<ActiveStreamEncoderFilterPtr>::iterator entry = commonEncodePrefix(filter, end_stream);
auto trailers_added_entry = encoder_filters_.end();

const bool trailers_exists_at_start = response_trailers_ != nullptr;
for (; entry != encoder_filters_.end(); entry++) {
ASSERT(!(state_.filter_call_state_ & FilterCallState::EncodeData));

// We check the response_trailers_ pointer here in case addEncodedTrailers
// is called in encodeData during a previous filter invocation, at which point we communicate to
// the current and future filters that the stream has not yet ended.
state_.filter_call_state_ |= FilterCallState::EncodeData;
FilterDataStatus status = (*entry)->handle_->encodeData(data, end_stream);
if (end_stream) {
state_.filter_call_state_ |= FilterCallState::LastDataFrame;
}
FilterDataStatus status =
(*entry)->handle_->encodeData(data, end_stream && !response_trailers_);
state_.filter_call_state_ &= ~FilterCallState::EncodeData;
if (end_stream) {
state_.filter_call_state_ &= ~FilterCallState::LastDataFrame;
}
ENVOY_STREAM_LOG(trace, "encode data called: filter={} status={}", *this,
static_cast<const void*>((*entry).get()), static_cast<uint64_t>(status));

if (!trailers_exists_at_start && response_trailers_ &&
trailers_added_entry == encoder_filters_.end()) {
trailers_added_entry = entry;
}

if (!(*entry)->commonHandleAfterDataCallback(status, data, state_.encoder_filters_streaming_)) {
return;
}
Expand All @@ -1099,8 +1169,16 @@ void ConnectionManagerImpl::ActiveStream::encodeData(ActiveStreamEncoderFilter*
end_stream);

request_info_.addBytesSent(data.length());
response_encoder_->encodeData(data, end_stream);
maybeEndEncode(end_stream);

// If trailers were adding during encodeData we need to trigger decodeTrailers in order
// to allow filters to process the trailers.
if (trailers_added_entry != encoder_filters_.end()) {
response_encoder_->encodeData(data, false);
encodeTrailers(trailers_added_entry->get(), *response_trailers_);
} else {
response_encoder_->encodeData(data, end_stream);
maybeEndEncode(end_stream);
}
}

void ConnectionManagerImpl::ActiveStream::encodeTrailers(ActiveStreamEncoderFilter* filter,
Expand Down Expand Up @@ -1380,6 +1458,10 @@ Buffer::WatermarkBufferPtr ConnectionManagerImpl::ActiveStreamDecoderFilter::cre
return buffer;
}

HeaderMap& ConnectionManagerImpl::ActiveStreamDecoderFilter::addDecodedTrailers() {
return parent_.addDecodedTrailers();
}

void ConnectionManagerImpl::ActiveStreamDecoderFilter::addDecodedData(Buffer::Instance& data,
bool streaming) {
parent_.addDecodedData(*this, data, streaming);
Expand Down Expand Up @@ -1473,6 +1555,10 @@ void ConnectionManagerImpl::ActiveStreamEncoderFilter::addEncodedData(Buffer::In
return parent_.addEncodedData(*this, data, streaming);
}

HeaderMap& ConnectionManagerImpl::ActiveStreamEncoderFilter::addEncodedTrailers() {
return parent_.addEncodedTrailers();
}

void ConnectionManagerImpl::ActiveStreamEncoderFilter::
onEncoderFilterAboveWriteBufferHighWatermark() {
ENVOY_STREAM_LOG(debug, "Disabling upstream stream due to filter callbacks.", parent_);
Expand Down
7 changes: 7 additions & 0 deletions source/common/http/conn_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,

// Http::StreamDecoderFilterCallbacks
void addDecodedData(Buffer::Instance& data, bool streaming) override;
HeaderMap& addDecodedTrailers() override;
void continueDecoding() override;
const Buffer::Instance* decodingBuffer() override {
return parent_.buffered_request_data_.get();
Expand Down Expand Up @@ -229,6 +230,7 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,

// Http::StreamEncoderFilterCallbacks
void addEncodedData(Buffer::Instance& data, bool streaming) override;
HeaderMap& addEncodedTrailers() override;
void onEncoderFilterAboveWriteBufferHighWatermark() override;
void onEncoderFilterBelowWriteBufferLowWatermark() override;
void setEncoderBufferLimit(uint32_t limit) override { parent_.setBufferLimit(limit); }
Expand Down Expand Up @@ -267,11 +269,13 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
commonEncodePrefix(ActiveStreamEncoderFilter* filter, bool end_stream);
const Network::Connection* connection();
void addDecodedData(ActiveStreamDecoderFilter& filter, Buffer::Instance& data, bool streaming);
HeaderMap& addDecodedTrailers();
void decodeHeaders(ActiveStreamDecoderFilter* filter, HeaderMap& headers, bool end_stream);
void decodeData(ActiveStreamDecoderFilter* filter, Buffer::Instance& data, bool end_stream);
void decodeTrailers(ActiveStreamDecoderFilter* filter, HeaderMap& trailers);
void maybeEndDecode(bool end_stream);
void addEncodedData(ActiveStreamEncoderFilter& filter, Buffer::Instance& data, bool streaming);
HeaderMap& addEncodedTrailers();
void sendLocalReply(bool is_grpc_request, Code code, const std::string& body,
std::function<void(HeaderMap& headers)> modify_headers);
void encode100ContinueHeaders(ActiveStreamEncoderFilter* filter, HeaderMap& headers);
Expand Down Expand Up @@ -339,6 +343,9 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
// to verify we do not encode100Continue headers more than once per
// filter.
static constexpr uint32_t Encode100ContinueHeaders = 0x40;
// Used to indicate that we're processing the final [En|De]codeData frame,
// i.e. end_stream = true
static constexpr uint32_t LastDataFrame = 0x80;
};
// clang-format on

Expand Down
Loading