Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/root/intro/version_history.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ Version history

1.8.0 (Pending)
===============
* http: response filters not applied to early error paths such as http_parser generated 400s.
* ratelimit: added support for :repo:`api/envoy/service/ratelimit/v2/rls.proto`.
Lyft's reference implementation of the `ratelimit <https://github.com/lyft/ratelimit>`_ service also supports the data-plane-api proto as of v1.1.0.
Envoy can use either proto to send client requests to a ratelimit server with the use of the
Expand Down
9 changes: 7 additions & 2 deletions source/common/http/conn_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,6 @@ StreamDecoder& ConnectionManagerImpl::newStream(StreamEncoder& response_encoder)
new_stream->response_encoder_ = &response_encoder;
new_stream->response_encoder_->getStream().addCallbacks(*new_stream);
new_stream->buffer_limit_ = new_stream->response_encoder_->getStream().bufferLimit();
config_.filterFactory().createFilterChain(*new_stream);
// Make sure new streams are apprised that the underlying connection is blocked.
if (read_callbacks_->connection().aboveHighWatermark()) {
new_stream->callHighWatermarkCallbacks();
Expand Down Expand Up @@ -447,8 +446,10 @@ const Network::Connection* ConnectionManagerImpl::ActiveStream::connection() {
}

void ConnectionManagerImpl::ActiveStream::decodeHeaders(HeaderMapPtr&& headers, bool end_stream) {
maybeEndDecode(end_stream);
request_headers_ = std::move(headers);
createFilterChain();

maybeEndDecode(end_stream);

ENVOY_STREAM_LOG(debug, "request headers complete (end_stream={}):\n{}", *this, end_stream,
*request_headers_);
Expand Down Expand Up @@ -1116,6 +1117,10 @@ void ConnectionManagerImpl::ActiveStream::setBufferLimit(uint32_t new_limit) {
}
}

void ConnectionManagerImpl::ActiveStream::createFilterChain() {
connection_manager_.config_.filterFactory().createFilterChain(*this);
}

void ConnectionManagerImpl::ActiveStreamFilterBase::commonContinue() {
// TODO(mattklein123): Raise an error if this is called during a callback.
if (!canContinue()) {
Expand Down
2 changes: 2 additions & 0 deletions source/common/http/conn_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,8 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,

// Possibly increases buffer_limit_ to the value of limit.
void setBufferLimit(uint32_t limit);
// Set up the Encoder/Decoder filter chain.
void createFilterChain();

ConnectionManagerImpl& connection_manager_;
Router::ConfigConstSharedPtr snapped_route_config_;
Expand Down
120 changes: 116 additions & 4 deletions test/common/http/conn_manager_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1657,14 +1657,13 @@ TEST_F(HttpConnectionManagerImplTest, DownstreamDisconnect) {
data.drain(2);
}));

setupFilterChain(1, 0);
EXPECT_CALL(filter_factory_, createFilterChain(_)).Times(0);

// Kick off the incoming data.
Buffer::OwnedImpl fake_input("1234");
conn_manager_->onData(fake_input, false);

// Now raise a remote disconnection, we should see the filter get reset called.
EXPECT_CALL(*decoder_filters_[0], onDestroy());
conn_manager_->onEvent(Network::ConnectionEvent::RemoteClose);
}

Expand All @@ -1677,10 +1676,9 @@ TEST_F(HttpConnectionManagerImplTest, DownstreamProtocolError) {
throw CodecProtocolException("protocol error");
}));

setupFilterChain(1, 0);
EXPECT_CALL(filter_factory_, createFilterChain(_)).Times(0);

// A protocol exception should result in reset of the streams followed by a local close.
EXPECT_CALL(*decoder_filters_[0], onDestroy());
EXPECT_CALL(filter_callbacks_.connection_, close(Network::ConnectionCloseType::FlushWrite));

// Kick off the incoming data.
Expand Down Expand Up @@ -2225,6 +2223,120 @@ TEST_F(HttpConnectionManagerImplTest, UnderlyingConnectionWatermarksPassedOn) {
decoder_filters_[0]->callbacks_->addDownstreamWatermarkCallbacks(callbacks);
}

TEST_F(HttpConnectionManagerImplTest, UnderlyingConnectionWatermarksPassedOnWithLazyCreation) {
setup(false, "");

// Make sure codec_ is created.
EXPECT_CALL(*codec_, dispatch(_));
Buffer::OwnedImpl fake_input("");
conn_manager_->onData(fake_input, false);

// Mark the connection manger as backed up before the stream is created.
ASSERT_EQ(decoder_filters_.size(), 0);
EXPECT_CALL(*codec_, onUnderlyingConnectionAboveWriteBufferHighWatermark());
conn_manager_->onAboveWriteBufferHighWatermark();

// Create the stream. Defer the creation of the filter chain by not sending
// complete headers.
StreamDecoder* decoder;
{
setUpBufferLimits();
EXPECT_CALL(*codec_, dispatch(_)).WillOnce(Invoke([&](Buffer::Instance&) -> void {
decoder = &conn_manager_->newStream(response_encoder_);
}));

// Verify the high watermark is passed on.
EXPECT_CALL(filter_callbacks_.connection_, aboveHighWatermark()).WillOnce(Return(true));

// Send fake data to kick off newStream being created.
Buffer::OwnedImpl fake_input2("asdf");
conn_manager_->onData(fake_input2, false);
}

// Now set up the filter chain by sending full headers. The filters should be
// immediately appraised that the low watermark is in effect.
{
setupFilterChain(2, 2);
EXPECT_CALL(filter_callbacks_.connection_, aboveHighWatermark()).Times(0);
EXPECT_CALL(*codec_, dispatch(_)).WillOnce(Invoke([&](Buffer::Instance&) -> void {
HeaderMapPtr headers{new TestHeaderMapImpl{{":authority", "host"}, {":path", "/"}}};
decoder->decodeHeaders(std::move(headers), true);
}));
EXPECT_CALL(*decoder_filters_[0], decodeHeaders(_, true))
.WillOnce(InvokeWithoutArgs([&]() -> FilterHeadersStatus {
Buffer::OwnedImpl data("hello");
decoder_filters_[0]->callbacks_->addDecodedData(data, true);
return FilterHeadersStatus::Continue;
}));
sendReqestHeadersAndData();
ASSERT_GE(decoder_filters_.size(), 1);
MockDownstreamWatermarkCallbacks callbacks;
EXPECT_CALL(callbacks, onAboveWriteBufferHighWatermark());
decoder_filters_[0]->callbacks_->addDownstreamWatermarkCallbacks(callbacks);
}
}

TEST_F(HttpConnectionManagerImplTest, UnderlyingConnectionWatermarksUnwoundWithLazyCreation) {
setup(false, "");

// Make sure codec_ is created.
EXPECT_CALL(*codec_, dispatch(_));
Buffer::OwnedImpl fake_input("");
conn_manager_->onData(fake_input, false);

// Mark the connection manger as backed up before the stream is created.
ASSERT_EQ(decoder_filters_.size(), 0);
EXPECT_CALL(*codec_, onUnderlyingConnectionAboveWriteBufferHighWatermark());
conn_manager_->onAboveWriteBufferHighWatermark();

// Create the stream. Defer the creation of the filter chain by not sending
// complete headers.
StreamDecoder* decoder;
{
setUpBufferLimits();
EXPECT_CALL(*codec_, dispatch(_)).WillOnce(Invoke([&](Buffer::Instance&) -> void {
decoder = &conn_manager_->newStream(response_encoder_);
}));

// Verify the high watermark is passed on.
EXPECT_CALL(filter_callbacks_.connection_, aboveHighWatermark()).WillOnce(Return(true));

// Send fake data to kick off newStream being created.
Buffer::OwnedImpl fake_input2("asdf");
conn_manager_->onData(fake_input2, false);
}

// Now before the filter chain is created, fire the low watermark callbacks
// and ensure it is passed down to the stream.
ASSERT(stream_callbacks_ != nullptr);
EXPECT_CALL(*codec_, onUnderlyingConnectionBelowWriteBufferLowWatermark())
.WillOnce(Invoke([&]() -> void { stream_callbacks_->onBelowWriteBufferLowWatermark(); }));
conn_manager_->onBelowWriteBufferLowWatermark();

// Now set up the filter chain by sending full headers. The filters should
// not get any watermark callbacks.
{
setupFilterChain(2, 2);
EXPECT_CALL(filter_callbacks_.connection_, aboveHighWatermark()).Times(0);
EXPECT_CALL(*codec_, dispatch(_)).WillOnce(Invoke([&](Buffer::Instance&) -> void {
HeaderMapPtr headers{new TestHeaderMapImpl{{":authority", "host"}, {":path", "/"}}};
decoder->decodeHeaders(std::move(headers), true);
}));
EXPECT_CALL(*decoder_filters_[0], decodeHeaders(_, true))
.WillOnce(InvokeWithoutArgs([&]() -> FilterHeadersStatus {
Buffer::OwnedImpl data("hello");
decoder_filters_[0]->callbacks_->addDecodedData(data, true);
return FilterHeadersStatus::Continue;
}));
sendReqestHeadersAndData();
ASSERT_GE(decoder_filters_.size(), 1);
MockDownstreamWatermarkCallbacks callbacks;
EXPECT_CALL(callbacks, onAboveWriteBufferHighWatermark()).Times(0);
EXPECT_CALL(callbacks, onBelowWriteBufferLowWatermark()).Times(0);
decoder_filters_[0]->callbacks_->addDownstreamWatermarkCallbacks(callbacks);
}
}

TEST_F(HttpConnectionManagerImplTest, AlterFilterWatermarkLimits) {
initial_buffer_limit_ = 100;
setup(false, "");
Expand Down