Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
a604e38
filter flow control
alyssawilk Aug 9, 2017
4804be6
nobuffer
alyssawilk Aug 9, 2017
1040f67
Addressing review comments re fault filter, initializer, NoBuffer
alyssawilk Aug 9, 2017
647845a
camelCaseForAll
alyssawilk Aug 9, 2017
3a8cf94
Merge branch 'master' into h2-filter-flow-control
alyssawilk Aug 10, 2017
a445e52
Work in progress rewrite
alyssawilk Aug 10, 2017
4f80dec
Merge branch 'master' into h2-filter-flow-control
alyssawilk Aug 16, 2017
9267859
Merge branch 'master' into h2-filter-flow-control
alyssawilk Aug 17, 2017
8ac29e1
filter return status
alyssawilk Aug 17, 2017
95de98b
Merge branch 'refs/heads/master' into h2-filter-flow-control
alyssawilk Aug 21, 2017
ae42c02
opt-in filter callbacks. Still insufficient tests
alyssawilk Aug 21, 2017
f705fed
minor cleanup + TODO
alyssawilk Aug 21, 2017
f99ca46
router test, one of several necessary integration tests
alyssawilk Aug 21, 2017
d0a823a
Merge branch 'master' into h2-filter-flow-control
alyssawilk Aug 23, 2017
7e5b7c1
connection manager unit tests
alyssawilk Aug 23, 2017
471fe64
integration tests
alyssawilk Aug 23, 2017
668932f
handling add[De|En|codedData
alyssawilk Aug 23, 2017
37d940e
renaming dynamo urls just to make config blocks clear
alyssawilk Aug 24, 2017
19aaf38
Merge branch 'refs/heads/master' into h2-filter-flow-control
alyssawilk Aug 24, 2017
1292a58
doxygen fix
alyssawilk Aug 24, 2017
558fb59
Merge branch 'master' into h2-filter-flow-control
alyssawilk Aug 28, 2017
9ac14b1
fixing merge format fail. bah! =P
alyssawilk Aug 28, 2017
6b5f615
Merge branch 'master' into h2-filter-flow-control
alyssawilk Aug 28, 2017
220fcf3
addressing reviewer comments
alyssawilk Aug 28, 2017
9685ee9
hopefully all outstanding comments
alyssawilk Aug 30, 2017
a970125
Merge branch 'master' into h2-filter-flow-control
alyssawilk Aug 30, 2017
1409730
addressing reviewer comments
alyssawilk Aug 31, 2017
eaed1ae
Merge branch 'refs/heads/master' into h2-filter-flow-control
alyssawilk Aug 31, 2017
9bf5df7
Merge branch 'refs/heads/master' into h2-filter-flow-control
alyssawilk Sep 7, 2017
0bead86
Merge branch 'refs/heads/master' into h2-filter-flow-control
alyssawilk Sep 8, 2017
612eacb
Merge branch 'refs/heads/master' into h2-filter-flow-control
alyssawilk Sep 11, 2017
3bfbc51
removing a deprecated line, adding tests that sendLocalReply works *s…
alyssawilk Sep 11, 2017
a94b527
fix format
alyssawilk Sep 11, 2017
be29d74
adding a hopefully helpful comment
alyssawilk Sep 11, 2017
0d534df
Removing a check because no, it is not needed
alyssawilk Sep 11, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions include/envoy/http/codec.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,13 @@ class Stream {
* @param disable informs if reads should be disabled (true) or re-enabled (false).
*/
virtual void readDisable(bool disable) PURE;

/*
* Return the number of bytes this stream is allowed to buffer, or 0 if there is no limit
* configured.
* @return the stream's configured buffer limits.
*/
virtual uint32_t bufferLimit() PURE;
};

/**
Expand Down
33 changes: 33 additions & 0 deletions include/envoy/http/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,14 @@ class StreamDecoderFilterCallbacks : public virtual StreamFilterCallbacks {
virtual void addDownstreamWatermarkCallbacks(DownstreamWatermarkCallbacks& callbacks) PURE;
};

// FIXME alyssar doxygen.
enum class FilterType { STREAMING, BUFFERING };

struct BufferLimitSettings {
uint32_t buffer_limit_;
FilterType filter_type_;
};

/**
* Common base class for both decoder and encoder filters.
*/
Expand All @@ -246,6 +254,22 @@ class StreamFilterBase {
*/
class StreamDecoderFilter : public StreamFilterBase {
public:
/**
* This routine is called on filter creation, setting the buffer limit for the
* filter. Filters should abide by these limits unless custom configuration
* overrides the limit. A buffer limit of 0 bytes indicates no limits are applied.
*
* If filters buffer enough bytes to hit the high watermark, it should either
* call on[Encoder|Decoder]FilterAboveWriteBufferHighWatermark to halt the
* flow of data or send an error response such as 413 (Payload Too Large).
*
* If the filter will return StopIterationAndBuffer it may override the buffer
* limit in settings, or the filter type.
*
* @param settings supplies the default buffer limit and filter type for this filter.
*/
virtual void setDecoderBufferLimit(BufferLimitSettings& settings) PURE;

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thought: It might be better to just return a struct here to make it more clear that the filter needs to tell the connection manager what it wants. We could even make the default constructor for the struct set sane defaults. I think for this type of struct the compiler will optimize well.


/**
* Called with decoded headers, optionally indicating end of stream.
* @param headers supplies the decoded headers map.
Expand Down Expand Up @@ -363,6 +387,15 @@ class StreamEncoderFilter : public StreamFilterBase {
* use. Callbacks will not be invoked by the filter after onDestroy() is called.
*/
virtual void setEncoderFilterCallbacks(StreamEncoderFilterCallbacks& callbacks) PURE;

/**
* This routine is called on filter creation, setting the buffer limit for the
* filter. Filters should abide by these limits unless custom configuration
* overrides the limit. A buffer limit of 0 bytes indicates no limits are applied.
*
* FIXME(alyssar) comment.
*/
virtual void setEncoderBufferLimit(BufferLimitSettings& settings) PURE;
};

typedef std::shared_ptr<StreamEncoderFilter> StreamEncoderFilterSharedPtr;
Expand Down
3 changes: 3 additions & 0 deletions source/common/buffer/watermark_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class WatermarkBuffer : public LibEventInstance {
void postProcess() override { checkLowWatermark(); }

void setWatermarks(uint32_t low_watermark, uint32_t high_watermark);
uint32_t highWatermark() const { return high_watermark_; }

private:
void checkHighWatermark();
Expand All @@ -73,5 +74,7 @@ class WatermarkBuffer : public LibEventInstance {
bool above_high_watermark_called_{false};
};

typedef std::unique_ptr<WatermarkBuffer> WatermarkBufferPtr;

} // namespace Buffer
} // namespace Envoy
6 changes: 6 additions & 0 deletions source/common/dynamo/dynamo_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ class DynamoFilter : public Http::StreamFilter {
void setDecoderFilterCallbacks(Http::StreamDecoderFilterCallbacks& callbacks) override {
decoder_callbacks_ = &callbacks;
}
void setDecoderBufferLimit(Http::BufferLimitSettings& settings) override {
settings.filter_type_ = Http::FilterType::BUFFERING;
}

// Http::StreamEncoderFilter
Http::FilterHeadersStatus encodeHeaders(Http::HeaderMap&, bool) override;
Expand All @@ -44,6 +47,9 @@ class DynamoFilter : public Http::StreamFilter {
void setEncoderFilterCallbacks(Http::StreamEncoderFilterCallbacks& callbacks) override {
encoder_callbacks_ = &callbacks;
}
void setEncoderBufferLimit(Http::BufferLimitSettings& settings) override {
settings.filter_type_ = Http::FilterType::BUFFERING;
}

private:
void onDecodeComplete(const Buffer::Instance& data);
Expand Down
3 changes: 3 additions & 0 deletions source/common/grpc/grpc_web_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <arpa/inet.h>

#include "common/common/assert.h"
#include "common/common/base64.h"
#include "common/common/utility.h"
#include "common/grpc/common.h"
Expand Down Expand Up @@ -90,6 +91,8 @@ Http::FilterDataStatus GrpcWebFilter::decodeData(Buffer::Instance& data, bool) {
decoding_buffer_.drain(decoding_buffer_.length());
decoding_buffer_.move(data);
data.add(decoded);
// Any block of 4 bytes or more should have been decoded and passed through.
ASSERT(decoding_buffer_.length() < 4);
return Http::FilterDataStatus::Continue;
}

Expand Down
9 changes: 8 additions & 1 deletion source/common/grpc/grpc_web_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class GrpcWebFilter : public Http::StreamFilter, NonCopyable {
virtual ~GrpcWebFilter(){};

// Http::StreamFilterBase
void onDestroy() override{};
void onDestroy() override {}

// Implements StreamDecoderFilter.
Http::FilterHeadersStatus decodeHeaders(Http::HeaderMap&, bool) override;
Expand All @@ -32,6 +32,10 @@ class GrpcWebFilter : public Http::StreamFilter, NonCopyable {
void setDecoderFilterCallbacks(Http::StreamDecoderFilterCallbacks& callbacks) override {
decoder_callbacks_ = &callbacks;
}
void setDecoderBufferLimit(Http::BufferLimitSettings& settings) override {
// Ignore buffer limits: see ASSERT in decodeData: decoding_buffer_ buffers less than 4 bytes.
settings.filter_type_ = Http::FilterType::STREAMING;
}

// Implements StreamEncoderFilter.
Http::FilterHeadersStatus encodeHeaders(Http::HeaderMap&, bool) override;
Expand All @@ -40,6 +44,9 @@ class GrpcWebFilter : public Http::StreamFilter, NonCopyable {
void setEncoderFilterCallbacks(Http::StreamEncoderFilterCallbacks& callbacks) override {
encoder_callbacks_ = &callbacks;
}
void setEncoderBufferLimit(Http::BufferLimitSettings& settings) override {
settings.filter_type_ = Http::FilterType::STREAMING;
}

private:
friend class GrpcWebFilterTest;
Expand Down
6 changes: 6 additions & 0 deletions source/common/grpc/http1_bridge_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ class Http1BridgeFilter : public Http::StreamFilter {
void setDecoderFilterCallbacks(Http::StreamDecoderFilterCallbacks& callbacks) override {
decoder_callbacks_ = &callbacks;
}
void setDecoderBufferLimit(Http::BufferLimitSettings& settings) override {
settings.filter_type_ = Http::FilterType::BUFFERING;
}

// Http::StreamEncoderFilter
Http::FilterHeadersStatus encodeHeaders(Http::HeaderMap& headers, bool end_stream) override;
Expand All @@ -37,6 +40,9 @@ class Http1BridgeFilter : public Http::StreamFilter {
void setEncoderFilterCallbacks(Http::StreamEncoderFilterCallbacks& callbacks) override {
encoder_callbacks_ = &callbacks;
}
void setEncoderBufferLimit(Http::BufferLimitSettings& settings) override {
settings.filter_type_ = Http::FilterType::BUFFERING;
}

private:
void chargeStat(const Http::HeaderMap& headers);
Expand Down
6 changes: 6 additions & 0 deletions source/common/grpc/json_transcoder_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,18 @@ class JsonTranscoderFilter : public Http::StreamFilter, public Logger::Loggable<
Http::FilterDataStatus decodeData(Buffer::Instance& data, bool end_stream) override;
Http::FilterTrailersStatus decodeTrailers(Http::HeaderMap& trailers) override;
void setDecoderFilterCallbacks(Http::StreamDecoderFilterCallbacks& callbacks) override;
void setDecoderBufferLimit(Http::BufferLimitSettings& settings) override {
settings.filter_type_ = Http::FilterType::STREAMING;
}

// Http::StreamEncoderFilter
Http::FilterHeadersStatus encodeHeaders(Http::HeaderMap& headers, bool end_stream) override;
Http::FilterDataStatus encodeData(Buffer::Instance& data, bool end_stream) override;
Http::FilterTrailersStatus encodeTrailers(Http::HeaderMap& trailers) override;
void setEncoderFilterCallbacks(Http::StreamEncoderFilterCallbacks& callbacks) override;
void setEncoderBufferLimit(Http::BufferLimitSettings& settings) override {
settings.filter_type_ = Http::FilterType::BUFFERING;
}

// Http::StreamFilterBase
void onDestroy() override { stream_reset_ = true; }
Expand Down
71 changes: 70 additions & 1 deletion source/common/http/conn_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ void ConnectionManagerImpl::doEndStream(ActiveStream& stream) {
}

void ConnectionManagerImpl::doDeferredStreamDestroy(ActiveStream& stream) {
stream.destroyed_ = true;
for (auto& filter : stream.decoder_filters_) {
filter->handle_->onDestroy();
}
Expand All @@ -172,6 +173,7 @@ StreamDecoder& ConnectionManagerImpl::newStream(StreamEncoder& response_encoder)
ActiveStreamPtr new_stream(new ActiveStream(*this));
new_stream->response_encoder_ = &response_encoder;
new_stream->response_encoder_->getStream().addCallbacks(*new_stream);
new_stream->buffer_limit_ = new_stream->response_encoder_->getStream().bufferLimit();
config_.filterFactory().createFilterChain(*new_stream);
new_stream->moveIntoList(std::move(new_stream), streams_);
return **streams_.begin();
Expand Down Expand Up @@ -367,13 +369,31 @@ void ConnectionManagerImpl::ActiveStream::addStreamDecoderFilterWorker(
StreamDecoderFilterSharedPtr filter, bool dual_filter) {
ActiveStreamDecoderFilterPtr wrapper(new ActiveStreamDecoderFilter(*this, filter, dual_filter));
filter->setDecoderFilterCallbacks(*wrapper);

BufferLimitSettings settings{buffer_limit_, Http::FilterType::STREAMING};
filter->setDecoderBufferLimit(settings);
// FIXME corner cases with 0 here and below.
if (settings.buffer_limit_ > buffer_limit_) {
buffer_limit_ = settings.buffer_limit_;
}
if (settings.filter_type_ == Http::FilterType::BUFFERING) {
decoder_filters_all_streaming_ = false;
}
wrapper->moveIntoListBack(std::move(wrapper), decoder_filters_);
}

void ConnectionManagerImpl::ActiveStream::addStreamEncoderFilterWorker(
StreamEncoderFilterSharedPtr filter, bool dual_filter) {
ActiveStreamEncoderFilterPtr wrapper(new ActiveStreamEncoderFilter(*this, filter, dual_filter));
filter->setEncoderFilterCallbacks(*wrapper);
BufferLimitSettings settings{buffer_limit_, Http::FilterType::STREAMING};
filter->setEncoderBufferLimit(settings);
if (settings.buffer_limit_ > buffer_limit_) {
buffer_limit_ = settings.buffer_limit_;
}
if (settings.filter_type_ == Http::FilterType::BUFFERING) {
encoder_filters_all_streaming_ = false;
}
wrapper->moveIntoListBack(std::move(wrapper), encoder_filters_);
}

Expand Down Expand Up @@ -966,7 +986,7 @@ void ConnectionManagerImpl::ActiveStreamFilterBase::commonHandleBufferData(
// rebuffer, because we assume the filter has modified the buffer as it wishes in place.
if (bufferedData().get() != &provided_data) {
if (!bufferedData()) {
bufferedData().reset(new Buffer::OwnedImpl());
bufferedData().reset(createBuffer().release());

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: bufferedData() = createBuffer(); (might need std::move())

}
bufferedData()->move(provided_data);
}
Expand Down Expand Up @@ -1067,6 +1087,30 @@ void ConnectionManagerImpl::ActiveStreamDecoderFilter::
parent_.connection_manager_.stats_.named_.downstream_flow_control_paused_reading_total_.inc();
}

void ConnectionManagerImpl::ActiveStreamDecoderFilter::requestDataTooLarge() {
if (parent_.encoder_filters_all_streaming_) {
onDecoderFilterAboveWriteBufferHighWatermark();
} else {
HeaderMapPtr response_headers{new HeaderMapImpl{

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should probably use sendLocalReply here.

{Headers::get().Status, std::to_string(enumToInt(Http::Code::PayloadTooLarge))}}};
std::string body_text = CodeUtility::toString(Http::Code::PayloadTooLarge);
response_headers->insertContentLength().value(body_text.size());
response_headers->insertContentType().value(Headers::get().ContentTypeValues.Text);

encodeHeaders(std::move(response_headers), false);
if (!parent_.destroyed_) {
Buffer::OwnedImpl buffer(body_text);
encodeData(buffer, true);
}
}
}

void ConnectionManagerImpl::ActiveStreamDecoderFilter::requestDataDrained() {
if (parent_.encoder_filters_all_streaming_) {
onDecoderFilterBelowWriteBufferLowWatermark();
}
}

void ConnectionManagerImpl::ActiveStreamDecoderFilter::
onDecoderFilterBelowWriteBufferLowWatermark() {
ENVOY_STREAM_LOG(debug, "Read-enabling downstream stream due to filter callbacks.", parent_);
Expand All @@ -1092,6 +1136,31 @@ void ConnectionManagerImpl::ActiveStreamEncoderFilter::

void ConnectionManagerImpl::ActiveStreamEncoderFilter::continueEncoding() { commonContinue(); }

void ConnectionManagerImpl::ActiveStreamEncoderFilter::responseDataTooLarge() {
if (parent_.encoder_filters_all_streaming_) {
onEncoderFilterAboveWriteBufferHighWatermark();
} else {
// FIXME 500
HeaderMapPtr response_headers{new HeaderMapImpl{

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this case needs a bit of thinking. It is technically possible for the response to start, but then have buffering occur, so I think we need to check if response has started and if so reset, otherwise we can send a 500.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess one could argue this is filter bug. Not sure. (Like if filter continues headers but then buffers data it's really a streaming filter). Anyway needs a bit of thinking IMO.

{Headers::get().Status, std::to_string(enumToInt(Http::Code::InternalServerError))}}};
std::string body_text = CodeUtility::toString(Http::Code::InternalServerError);
response_headers->insertContentLength().value(body_text.size());
response_headers->insertContentType().value(Headers::get().ContentTypeValues.Text);

parent_.response_headers_ = std::move(response_headers);
parent_.encodeHeaders(nullptr, *parent_.response_headers_, false);
if (!parent_.destroyed_) {
Buffer::OwnedImpl buffer(body_text);
parent_.encodeData(nullptr, buffer, true);
}
}
}
void ConnectionManagerImpl::ActiveStreamEncoderFilter::responseDataDrained() {
if (parent_.encoder_filters_all_streaming_) {
onEncoderFilterBelowWriteBufferLowWatermark();
}
}

void ConnectionManagerImpl::ActiveStreamFilterBase::resetStream() {
parent_.connection_manager_.stats_.named_.downstream_rq_tx_reset_.inc();
parent_.connection_manager_.doEndStream(this->parent_);
Expand Down
Loading