Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
a604e38
filter flow control
alyssawilk Aug 9, 2017
4804be6
nobuffer
alyssawilk Aug 9, 2017
1040f67
Addressing review comments re fault filter, initializer, NoBuffer
alyssawilk Aug 9, 2017
647845a
camelCaseForAll
alyssawilk Aug 9, 2017
3a8cf94
Merge branch 'master' into h2-filter-flow-control
alyssawilk Aug 10, 2017
a445e52
Work in progress rewrite
alyssawilk Aug 10, 2017
4f80dec
Merge branch 'master' into h2-filter-flow-control
alyssawilk Aug 16, 2017
9267859
Merge branch 'master' into h2-filter-flow-control
alyssawilk Aug 17, 2017
8ac29e1
filter return status
alyssawilk Aug 17, 2017
95de98b
Merge branch 'refs/heads/master' into h2-filter-flow-control
alyssawilk Aug 21, 2017
ae42c02
opt-in filter callbacks. Still insufficient tests
alyssawilk Aug 21, 2017
f705fed
minor cleanup + TODO
alyssawilk Aug 21, 2017
f99ca46
router test, one of several necessary integration tests
alyssawilk Aug 21, 2017
d0a823a
Merge branch 'master' into h2-filter-flow-control
alyssawilk Aug 23, 2017
7e5b7c1
connection manager unit tests
alyssawilk Aug 23, 2017
471fe64
integration tests
alyssawilk Aug 23, 2017
668932f
handling add[De|En|codedData
alyssawilk Aug 23, 2017
37d940e
renaming dynamo urls just to make config blocks clear
alyssawilk Aug 24, 2017
19aaf38
Merge branch 'refs/heads/master' into h2-filter-flow-control
alyssawilk Aug 24, 2017
1292a58
doxygen fix
alyssawilk Aug 24, 2017
558fb59
Merge branch 'master' into h2-filter-flow-control
alyssawilk Aug 28, 2017
9ac14b1
fixing merge format fail. bah! =P
alyssawilk Aug 28, 2017
6b5f615
Merge branch 'master' into h2-filter-flow-control
alyssawilk Aug 28, 2017
220fcf3
addressing reviewer comments
alyssawilk Aug 28, 2017
9685ee9
hopefully all outstanding comments
alyssawilk Aug 30, 2017
a970125
Merge branch 'master' into h2-filter-flow-control
alyssawilk Aug 30, 2017
1409730
addressing reviewer comments
alyssawilk Aug 31, 2017
eaed1ae
Merge branch 'refs/heads/master' into h2-filter-flow-control
alyssawilk Aug 31, 2017
9bf5df7
Merge branch 'refs/heads/master' into h2-filter-flow-control
alyssawilk Sep 7, 2017
0bead86
Merge branch 'refs/heads/master' into h2-filter-flow-control
alyssawilk Sep 8, 2017
612eacb
Merge branch 'refs/heads/master' into h2-filter-flow-control
alyssawilk Sep 11, 2017
3bfbc51
removing a deprecated line, adding tests that sendLocalReply works *s…
alyssawilk Sep 11, 2017
a94b527
fix format
alyssawilk Sep 11, 2017
be29d74
adding a hopefully helpful comment
alyssawilk Sep 11, 2017
0d534df
Removing a check because no, it is not needed
alyssawilk Sep 11, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/configuration/cluster_manager/cluster_stats.rst
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ Every cluster has a statistics tree rooted at *cluster.<name>.* with the followi
membership_change, Counter, Total cluster membership changes
membership_healthy, Gauge, Current cluster healthy total (inclusive of both health checking and outlier detection)
membership_total, Gauge, Current cluster membership total
retry_or_shadow_abandoned, Counter, Total number of times shadowing or retry buffering was canceled due to buffer limits.
update_attempt, Counter, Total cluster membership update attempts
update_success, Counter, Total cluster membership update successes
update_failure, Counter, Total cluster membership update failures
Expand Down
2 changes: 2 additions & 0 deletions docs/configuration/http_conn_man/stats.rst
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,15 @@ statistics:
downstream_rq_rx_reset, Counter, Total request resets received
downstream_rq_tx_reset, Counter, Total request resets sent
downstream_rq_non_relative_path, Counter, Total requests with a non-relative HTTP path
downstream_rq_too_large, Counter, Total requests resulting in a 413 due to buffering an overly large body.
downstream_rq_2xx, Counter, Total 2xx responses
downstream_rq_3xx, Counter, Total 3xx responses
downstream_rq_4xx, Counter, Total 4xx responses
downstream_rq_5xx, Counter, Total 5xx responses
downstream_rq_ws_on_non_ws_route, Counter, Total WebSocket upgrade requests rejected by non WebSocket routes
downstream_rq_non_ws_on_ws_route, Counter, Total HTTP requests rejected by WebSocket enabled routes due to missing upgrade header
downstream_rq_time, Timer, Request time milliseconds
rs_too_large, Counter, Total response errors due to buffering an overly large body.

Per user agent statistics
-------------------------
Expand Down
5 changes: 2 additions & 3 deletions docs/configuration/http_filters/buffer_filter.rst
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ with partial requests and high network latency.
}

max_request_bytes
*(required, integer)* The maximum request size that the filter will before before it stops
buffering and returns a 413 response.
*(required, integer)* The maximum request size that the filter will before the connection manager
will stop buffering and return a 413 response.

max_request_time_s
*(required, integer)* The maximum amount of time that the filter will wait for a complete request
Expand All @@ -37,4 +37,3 @@ prefix <config_http_conn_man_stat_prefix>` comes from the owning HTTP connection
:widths: 1, 1, 2

rq_timeout, Counter, Total requests that timed out waiting for a full request
rq_too_large, Counter, Total requests that failed due to being too large
7 changes: 7 additions & 0 deletions include/envoy/http/codec.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,13 @@ class Stream {
* @param disable informs if reads should be disabled (true) or re-enabled (false).
*/
virtual void readDisable(bool disable) PURE;

/*
* Return the number of bytes this stream is allowed to buffer, or 0 if there is no limit
* configured.
* @return uint32_t the stream's configured buffer limits.
*/
virtual uint32_t bufferLimit() PURE;
};

/**
Expand Down
59 changes: 57 additions & 2 deletions include/envoy/http/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,24 @@ enum class FilterDataStatus {
// Do not iterate to any of the remaining filters in the chain, and buffer body data for later
// dispatching. Returning FilterDataStatus::Continue from decodeData()/encodeData() or calling
// continueDecoding()/continueEncoding() MUST be called if continued filter iteration is desired.
//
// This should be called by filters which must parse a larger block of the incoming data before
// continuing processing and so can not push back on streaming data via watermarks.
//
// If buffering the request causes buffered data to exceed the configured buffer limit, a 413 will
// be sent to the user. On the response path exceeding buffer limits will result in a 500.
StopIterationAndBuffer,
// Do not iterate to any of the remaining filters in the chain, and buffer body data for later
// dispatching. Returning FilterDataStatus::Continue from decodeData()/encodeData() or calling
// continueDecoding()/continueEncoding() MUST be called if continued filter iteration is desired.
//
// This will cause the flow of incoming data to cease until one of the continue.*() functions is
// called.
//
// This should be returned by filters which can nominally stream data but have a transient back-up
// such as the configured delay of the fault filter, or if the router filter is still fetching an
// upstream connection.
StopIterationAndWatermark,
// Do not iterate to any of the remaining filters in the chain, but do not buffer any of the
// body data for later dispatching. Returning FilterDataStatus::Continue from
// decodeData()/encodeData() or calling continueDecoding()/continueEncoding() MUST be called if
Expand Down Expand Up @@ -163,8 +180,11 @@ class StreamDecoderFilterCallbacks : public virtual StreamFilterCallbacks {
* followed by decodeTrailers().
*
* It is an error to call this method in any other case.
*
* @param data Buffer::Instance supplies the data to be decoded.
* @param streaming_filter boolean supplies if this filter streams data or buffers the full body.
*/
virtual void addDecodedData(Buffer::Instance& data) PURE;
virtual void addDecodedData(Buffer::Instance& data, bool streaming_filter) PURE;

/**
* Called with headers to be encoded, optionally indicating end of stream.
Expand Down Expand Up @@ -222,6 +242,22 @@ class StreamDecoderFilterCallbacks : public virtual StreamFilterCallbacks {
* It is not safe to call this from under the stack of a DownstreamWatermarkCallbacks callback.
*/
virtual void removeDownstreamWatermarkCallbacks(DownstreamWatermarkCallbacks& callbacks) PURE;

/**
* This routine may be called to change the buffer limit for decoder filters.
*
* @param boolean supplies the desired buffer limit.
*/
virtual void setDecoderBufferLimit(uint32_t limit) PURE;

/**
* This routine returns the current buffer limit for decoder filters. Filters should abide by
* this limit or change it via setDecoderBufferLimit.
* A buffer limit of 0 bytes indicates no limits are applied.
*
* @return the buffer limit the filter should apply.
*/
virtual uint32_t decoderBufferLimit() PURE;
};

/**
Expand Down Expand Up @@ -319,8 +355,11 @@ class StreamEncoderFilterCallbacks : public virtual StreamFilterCallbacks {
* followed by encodeTrailers().
*
* It is an error to call this method in any other case.
*
* @param data Buffer::Instance supplies the data to be encoded.
* @param streaming_filter boolean supplies if this filter streams data or buffers the full body.
*/
virtual void addEncodedData(Buffer::Instance& data) PURE;
virtual void addEncodedData(Buffer::Instance& data, bool streaming_filter) PURE;

/**
* Called when an encoder filter goes over its high watermark.
Expand All @@ -331,6 +370,22 @@ class StreamEncoderFilterCallbacks : public virtual StreamFilterCallbacks {
* Called when a encoder filter goes from over its high watermark to under its low watermark.
*/
virtual void onEncoderFilterBelowWriteBufferLowWatermark() PURE;

/**
* This routine may be called to change the buffer limit for encoder filters.
*
* @limit settings supplies the desired buffer limit.
*/
virtual void setEncoderBufferLimit(uint32_t limit) PURE;

/**
* This routine returns the current buffer limit for encoder filters. Filters should abide by
* this limit or change it via setEncoderBufferLimit.
* A buffer limit of 0 bytes indicates no limits are applied.
*
* @return the buffer limit the filter should apply.
*/
virtual uint32_t encoderBufferLimit() PURE;
};

/**
Expand Down
1 change: 1 addition & 0 deletions include/envoy/upstream/upstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ class HostSet {
COUNTER(membership_change) \
GAUGE (membership_healthy) \
GAUGE (membership_total) \
COUNTER(retry_or_shadow_abandoned) \
COUNTER(update_attempt) \
COUNTER(update_success) \
COUNTER(update_failure)
Expand Down
5 changes: 3 additions & 2 deletions source/common/buffer/watermark_buffer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,16 @@ int WatermarkBuffer::write(int fd) {
}

void WatermarkBuffer::setWatermarks(uint32_t low_watermark, uint32_t high_watermark) {
ASSERT(low_watermark < high_watermark);
ASSERT(low_watermark < high_watermark || (high_watermark == 0 && low_watermark == 0));
low_watermark_ = low_watermark;
high_watermark_ = high_watermark;
checkHighWatermark();
checkLowWatermark();
}

void WatermarkBuffer::checkLowWatermark() {
if (!above_high_watermark_called_ || OwnedImpl::length() >= low_watermark_) {
if (!above_high_watermark_called_ ||
(high_watermark_ != 0 && OwnedImpl::length() >= low_watermark_)) {
return;
}

Expand Down
9 changes: 4 additions & 5 deletions source/common/buffer/watermark_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,9 @@ class WatermarkBuffer : public OwnedImpl {
int write(int fd) override;
void postProcess() override { checkLowWatermark(); }

void setWatermarks(uint32_t watermark) {
if (watermark != 0) {
setWatermarks(watermark / 2, watermark);
}
}
void setWatermarks(uint32_t watermark) { setWatermarks(watermark / 2, watermark); }
void setWatermarks(uint32_t low_watermark, uint32_t high_watermark);
uint32_t highWatermark() const { return high_watermark_; }

private:
void checkHighWatermark();
Expand All @@ -57,6 +54,8 @@ class WatermarkBuffer : public OwnedImpl {
bool above_high_watermark_called_{false};
};

typedef std::unique_ptr<WatermarkBuffer> WatermarkBufferPtr;

class WatermarkBufferFactory : public WatermarkFactory {
public:
// Buffer::WatermarkFactory
Expand Down
2 changes: 2 additions & 0 deletions source/common/dynamo/dynamo_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ Http::FilterDataStatus DynamoFilter::decodeData(Buffer::Instance& data, bool end
if (end_stream) {
return Http::FilterDataStatus::Continue;
} else {
// Buffer until the complete request has been processed.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I'm also happy to remove all the boilerplate comments once you've taken a look, Matt, I just wanted a way to call all unchanged returns of StopIterationAndBuffer to your attention. Let me know when/if you want them gone.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry I wasn't referring to this (comment is fine with me if you want it), but the extra boilerplate functions every filter needs to implement, which I think we have agreed will now be opt-in. 👍

return Http::FilterDataStatus::StopIterationAndBuffer;
}
}
Expand Down Expand Up @@ -110,6 +111,7 @@ Http::FilterDataStatus DynamoFilter::encodeData(Buffer::Instance& data, bool end
if (end_stream) {
return Http::FilterDataStatus::Continue;
} else {
// Buffer until the complete response has been processed.
return Http::FilterDataStatus::StopIterationAndBuffer;
}
}
Expand Down
7 changes: 5 additions & 2 deletions source/common/grpc/grpc_web_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <arpa/inet.h>

#include "common/common/assert.h"
#include "common/common/base64.h"
#include "common/common/empty_string.h"
#include "common/common/utility.h"
Expand Down Expand Up @@ -103,6 +104,8 @@ Http::FilterDataStatus GrpcWebFilter::decodeData(Buffer::Instance& data, bool) {
decoding_buffer_.drain(decoding_buffer_.length());
decoding_buffer_.move(data);
data.add(decoded);
// Any block of 4 bytes or more should have been decoded and passed through.
ASSERT(decoding_buffer_.length() < 4);
return Http::FilterDataStatus::Continue;
}

Expand Down Expand Up @@ -189,9 +192,9 @@ Http::FilterTrailersStatus GrpcWebFilter::encodeTrailers(Http::HeaderMap& traile
buffer.move(temp);
if (is_text_response_) {
Buffer::OwnedImpl encoded(Base64::encode(buffer, buffer.length()));
encoder_callbacks_->addEncodedData(encoded);
encoder_callbacks_->addEncodedData(encoded, true);
} else {
encoder_callbacks_->addEncodedData(buffer);
encoder_callbacks_->addEncodedData(buffer, true);
}
return Http::FilterTrailersStatus::Continue;
}
Expand Down
1 change: 1 addition & 0 deletions source/common/grpc/http1_bridge_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ Http::FilterDataStatus Http1BridgeFilter::encodeData(Buffer::Instance&, bool end
if (!do_bridging_ || end_stream) {
return Http::FilterDataStatus::Continue;
} else {
// Buffer until the complete request has been processed.
return Http::FilterDataStatus::StopIterationAndBuffer;
}
}
Expand Down
7 changes: 4 additions & 3 deletions source/common/grpc/json_transcoder_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ Http::FilterHeadersStatus JsonTranscoderFilter::decodeHeaders(Http::HeaderMap& h
readToBuffer(*transcoder_->RequestOutput(), data);

if (data.length() > 0) {
decoder_callbacks_->addDecodedData(data);
decoder_callbacks_->addDecodedData(data, true);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is TODO(lizan) somewhere in here about using watermarks to bound buffer size. I think that is done with this change so we can remove todo? cc @lizan

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I wasn't sure if the transcoder was infinite buffering under the hood, so I figured @lizan could remove the TODO after checking

}
}
return Http::FilterHeadersStatus::Continue;
Expand Down Expand Up @@ -273,7 +273,7 @@ Http::FilterTrailersStatus JsonTranscoderFilter::decodeTrailers(Http::HeaderMap&
readToBuffer(*transcoder_->RequestOutput(), data);

if (data.length()) {
decoder_callbacks_->addDecodedData(data);
decoder_callbacks_->addDecodedData(data, true);
}
return Http::FilterTrailersStatus::Continue;
}
Expand Down Expand Up @@ -311,6 +311,7 @@ Http::FilterDataStatus JsonTranscoderFilter::encodeData(Buffer::Instance& data,
readToBuffer(*transcoder_->ResponseOutput(), data);

if (!method_->server_streaming()) {
// Buffer until the response is complete.
return Http::FilterDataStatus::StopIterationAndBuffer;
}
// TODO(lizan): Check ResponseStatus
Expand All @@ -329,7 +330,7 @@ Http::FilterTrailersStatus JsonTranscoderFilter::encodeTrailers(Http::HeaderMap&
readToBuffer(*transcoder_->ResponseOutput(), data);

if (data.length()) {
encoder_callbacks_->addEncodedData(data);
encoder_callbacks_->addEncodedData(data, true);
}

if (method_->server_streaming()) {
Expand Down
4 changes: 3 additions & 1 deletion source/common/http/async_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ class AsyncStreamImpl : public AsyncClient::Stream,
Tracing::Span& activeSpan() override { return active_span_; }
const std::string& downstreamAddress() override { return EMPTY_STRING; }
void continueDecoding() override { NOT_IMPLEMENTED; }
void addDecodedData(Buffer::Instance&) override { NOT_IMPLEMENTED; }
void addDecodedData(Buffer::Instance&, bool) override { NOT_IMPLEMENTED; }
const Buffer::Instance* decodingBuffer() override {
throw EnvoyException("buffering is not supported in streaming");
}
Expand All @@ -201,6 +201,8 @@ class AsyncStreamImpl : public AsyncClient::Stream,
void onDecoderFilterBelowWriteBufferLowWatermark() override {}
void addDownstreamWatermarkCallbacks(DownstreamWatermarkCallbacks&) override {}
void removeDownstreamWatermarkCallbacks(DownstreamWatermarkCallbacks&) override {}
void setDecoderBufferLimit(uint32_t) override {}
uint32_t decoderBufferLimit() override { return 0; }

AsyncClient::StreamCallbacks& stream_callbacks_;
const uint64_t stream_id_;
Expand Down
Loading