Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions source/common/http/conn_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -205,10 +205,10 @@ StreamDecoder& ConnectionManagerImpl::newStream(StreamEncoder& response_encoder)
new_stream->response_encoder_ = &response_encoder;
new_stream->response_encoder_->getStream().addCallbacks(*new_stream);
new_stream->buffer_limit_ = new_stream->response_encoder_->getStream().bufferLimit();
// Make sure new streams are apprised that the underlying connection is blocked.
if (read_callbacks_->connection().aboveHighWatermark()) {
new_stream->callHighWatermarkCallbacks();
}
// If the network connection is backed up, the stream should be made aware of it on creation.
// Both HTTP/1.x and HTTP/2 codecs handle this in StreamCallbackHelper::addCallbacks_.
ASSERT(read_callbacks_->connection().aboveHighWatermark() == false ||
new_stream->high_watermark_count_ > 0);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm confused when/how high_watermark_count_ gets incremented in this call chain in order to make this assert true. From looking at the HTTP/2 codec it looks like we run high watermark callbacks in ServerConnectionImpl::onBeginHeaders before we call newStream. Wouldn't we have to run the callbacks instead when we call addCallbacks for this to work properly? I'm probably missing something here. If so can you add more comments?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I can interject, the callback are in fact run in addCallbacks.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yes, I see it, thanks. Can we potentially clarify the comment? This makes sense. I will take a look at the tests now.

new_stream->moveIntoList(std::move(new_stream), streams_);
return **streams_.begin();
}
Expand Down
6 changes: 6 additions & 0 deletions source/common/http/http1/codec_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ namespace Http1 {
const std::string StreamEncoderImpl::CRLF = "\r\n";
const std::string StreamEncoderImpl::LAST_CHUNK = "0\r\n\r\n";

StreamEncoderImpl::StreamEncoderImpl(ConnectionImpl& connection) : connection_(connection) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same question as above about when this call happens in relation to the newStream() call.

if (connection_.connection().aboveHighWatermark()) {
runHighWatermarkCallbacks();
}
}

void StreamEncoderImpl::encodeHeader(const char* key, uint32_t key_size, const char* value,
uint32_t value_size) {

Expand Down
2 changes: 1 addition & 1 deletion source/common/http/http1/codec_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class StreamEncoderImpl : public StreamEncoder,
void isResponseToHeadRequest(bool value) { is_response_to_head_request_ = value; }

protected:
StreamEncoderImpl(ConnectionImpl& connection) : connection_(connection) {}
StreamEncoderImpl(ConnectionImpl& connection);

static const std::string CRLF;
static const std::string LAST_CHUNK;
Expand Down
4 changes: 4 additions & 0 deletions source/common/network/connection_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
#include "absl/types/optional.h"

namespace Envoy {
class TestPauseFilter;

namespace Network {

/**
Expand Down Expand Up @@ -149,6 +151,8 @@ class ConnectionImpl : public virtual Connection,
Event::FileEventPtr file_event_;

private:
friend class ::Envoy::TestPauseFilter;

void onFileEvent(uint32_t events);
void onRead(uint64_t read_buffer_size);
void onReadReady();
Expand Down
82 changes: 4 additions & 78 deletions test/common/http/conn_manager_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2722,78 +2722,6 @@ TEST_F(HttpConnectionManagerImplTest, UpstreamWatermarkCallbacks) {
HeaderMapPtr{new TestHeaderMapImpl{{":status", "200"}}}, true);
}

TEST_F(HttpConnectionManagerImplTest, DownstreamWatermarkCallbacks) {
setup(false, "");
setUpEncoderAndDecoder();
sendReqestHeadersAndData();

// Test what happens when there are no subscribers.
conn_manager_->onAboveWriteBufferHighWatermark();
conn_manager_->onBelowWriteBufferLowWatermark();

// The connection manger will outlive callbacks but never reference them once deleted.
MockDownstreamWatermarkCallbacks callbacks;

// Network::Connection callbacks are passed through the codec
ASSERT(decoder_filters_[0]->callbacks_ != nullptr);
EXPECT_CALL(*codec_, onUnderlyingConnectionAboveWriteBufferHighWatermark());
conn_manager_->onAboveWriteBufferHighWatermark();
EXPECT_CALL(*codec_, onUnderlyingConnectionBelowWriteBufferLowWatermark());
conn_manager_->onBelowWriteBufferLowWatermark();

// Now add a watermark subscriber and make sure both the high and low watermark callbacks are
// propogated.
ASSERT_NE(0, decoder_filters_.size());
decoder_filters_[0]->callbacks_->addDownstreamWatermarkCallbacks(callbacks);
// Make sure encoder filter callbacks are propogated to the watermark subscriber.
EXPECT_CALL(callbacks, onAboveWriteBufferHighWatermark());
encoder_filters_[0]->callbacks_->onEncoderFilterAboveWriteBufferHighWatermark();
EXPECT_CALL(callbacks, onBelowWriteBufferLowWatermark());
encoder_filters_[0]->callbacks_->onEncoderFilterBelowWriteBufferLowWatermark();

ASSERT(stream_callbacks_ != nullptr);
// Finally make sure that watermark events on the downstream stream are passed to the watermark
// subscriber.
EXPECT_CALL(callbacks, onAboveWriteBufferHighWatermark());
stream_callbacks_->onAboveWriteBufferHighWatermark();
EXPECT_CALL(callbacks, onBelowWriteBufferLowWatermark());
stream_callbacks_->onBelowWriteBufferLowWatermark();

// Set things up so the callbacks have been called twice.
EXPECT_CALL(callbacks, onAboveWriteBufferHighWatermark());
stream_callbacks_->onAboveWriteBufferHighWatermark();
EXPECT_CALL(callbacks, onAboveWriteBufferHighWatermark());
encoder_filters_[0]->callbacks_->onEncoderFilterAboveWriteBufferHighWatermark();

// Now unsubscribe and verify no further callbacks are called.
EXPECT_CALL(callbacks, onBelowWriteBufferLowWatermark()).Times(0);
decoder_filters_[0]->callbacks_->removeDownstreamWatermarkCallbacks(callbacks);
}

TEST_F(HttpConnectionManagerImplTest, UnderlyingConnectionWatermarksPassedOn) {
setup(false, "");

// Make sure codec_ is created.
EXPECT_CALL(*codec_, dispatch(_));
Buffer::OwnedImpl fake_input("");
conn_manager_->onData(fake_input, false);

// Mark the connection manger as backed up before the stream is created.
ASSERT_EQ(decoder_filters_.size(), 0);
EXPECT_CALL(*codec_, onUnderlyingConnectionAboveWriteBufferHighWatermark());
conn_manager_->onAboveWriteBufferHighWatermark();

// Now when the stream is created, it should be informed of the connection
// callbacks immediately.
EXPECT_CALL(filter_callbacks_.connection_, aboveHighWatermark()).WillOnce(Return(true));
setUpEncoderAndDecoder();
sendReqestHeadersAndData();
ASSERT_GE(decoder_filters_.size(), 1);
MockDownstreamWatermarkCallbacks callbacks;
EXPECT_CALL(callbacks, onAboveWriteBufferHighWatermark());
decoder_filters_[0]->callbacks_->addDownstreamWatermarkCallbacks(callbacks);
}

TEST_F(HttpConnectionManagerImplTest, UnderlyingConnectionWatermarksPassedOnWithLazyCreation) {
setup(false, "");

Expand All @@ -2814,11 +2742,10 @@ TEST_F(HttpConnectionManagerImplTest, UnderlyingConnectionWatermarksPassedOnWith
setUpBufferLimits();
EXPECT_CALL(*codec_, dispatch(_)).WillOnce(Invoke([&](Buffer::Instance&) -> void {
decoder = &conn_manager_->newStream(response_encoder_);
// Call the high buffer callbacks as the codecs do.
stream_callbacks_->onAboveWriteBufferHighWatermark();
}));

// Verify the high watermark is passed on.
EXPECT_CALL(filter_callbacks_.connection_, aboveHighWatermark()).WillOnce(Return(true));

// Send fake data to kick off newStream being created.
Buffer::OwnedImpl fake_input2("asdf");
conn_manager_->onData(fake_input2, false);
Expand Down Expand Up @@ -2868,11 +2795,10 @@ TEST_F(HttpConnectionManagerImplTest, UnderlyingConnectionWatermarksUnwoundWithL
setUpBufferLimits();
EXPECT_CALL(*codec_, dispatch(_)).WillOnce(Invoke([&](Buffer::Instance&) -> void {
decoder = &conn_manager_->newStream(response_encoder_);
// Call the high buffer callbacks as the codecs do.
stream_callbacks_->onAboveWriteBufferHighWatermark();
}));

// Verify the high watermark is passed on.
EXPECT_CALL(filter_callbacks_.connection_, aboveHighWatermark()).WillOnce(Return(true));

// Send fake data to kick off newStream being created.
Buffer::OwnedImpl fake_input2("asdf");
conn_manager_->onData(fake_input2, false);
Expand Down
13 changes: 13 additions & 0 deletions test/integration/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,17 @@ envoy_cc_test_library(
],
)

envoy_cc_test_library(
name = "pass_through_filter_lib",
srcs = [
"pass_through_filter.h",
],
deps = [
"//include/envoy/http:filter_interface",
"//include/envoy/registry",
],
)

envoy_cc_test_library(
name = "add_trailers_filter_config_lib",
srcs = [
Expand Down Expand Up @@ -216,8 +227,10 @@ envoy_cc_test_library(
deps = [
":add_trailers_filter_config_lib",
":integration_lib",
":pass_through_filter_lib",
":test_host_predicate_lib",
"//include/envoy/event:timer_interface",
"//source/common/common:thread_annotations",
"//source/extensions/filters/http/router:config",
"//source/extensions/filters/network/http_connection_manager:config",
"//test/common/upstream:utility_lib",
Expand Down
2 changes: 2 additions & 0 deletions test/integration/http2_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ TEST_P(Http2IntegrationTest, RouterUpstreamResponseBeforeRequestComplete) {

TEST_P(Http2IntegrationTest, TwoRequests) { testTwoRequests(); }

TEST_P(Http2IntegrationTest, TwoRequestsWithForcedBackup) { testTwoRequests(true); }

TEST_P(Http2IntegrationTest, Retry) { testRetry(); }

TEST_P(Http2IntegrationTest, EnvoyHandling100Continue) { testEnvoyHandling100Continue(); }
Expand Down
94 changes: 93 additions & 1 deletion test/integration/http_integration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,23 @@
#include "envoy/buffer/buffer.h"
#include "envoy/event/dispatcher.h"
#include "envoy/http/header_map.h"
#include "envoy/registry/registry.h"

#include "common/api/api_impl.h"
#include "common/buffer/buffer_impl.h"
#include "common/common/fmt.h"
#include "common/common/thread_annotations.h"
#include "common/http/headers.h"
#include "common/network/connection_impl.h"
#include "common/network/utility.h"
#include "common/protobuf/utility.h"
#include "common/upstream/upstream_impl.h"

#include "extensions/filters/http/common/empty_http_filter_config.h"

#include "test/common/upstream/utility.h"
#include "test/integration/autonomous_upstream.h"
#include "test/integration/pass_through_filter.h"
#include "test/integration/test_host_predicate_config.h"
#include "test/integration/utility.h"
#include "test/mocks/upstream/mocks.h"
Expand Down Expand Up @@ -1275,7 +1280,94 @@ void HttpIntegrationTest::testUpstreamDisconnectWithTwoRequests() {
test_server_->waitForCounterGe("cluster.cluster_0.upstream_rq_200", 2);
}

void HttpIntegrationTest::testTwoRequests() {
// This filter exists to synthetically test network backup by faking TCP
// connection back-up when an encode is finished, and unblocking it when the
// next stream starts to decode headers.
// Allows regression tests for https://github.com/envoyproxy/envoy/issues/4541
// TODO(alyssawilk) move this somewhere general and turn it up for more tests.
class TestPauseFilter : public PassThroughFilter {
public:
// Pass in a some global filter state to ensure the Network::Connection is
// blocked and unblocked exactly once.
TestPauseFilter(absl::Mutex& encode_lock, uint32_t& number_of_encode_calls_ref,
uint32_t& number_of_decode_calls_ref)
: encode_lock_(encode_lock), number_of_encode_calls_ref_(number_of_encode_calls_ref),
number_of_decode_calls_ref_(number_of_decode_calls_ref) {}

Http::FilterDataStatus decodeData(Buffer::Instance& buf, bool end_stream) override {
if (end_stream) {
absl::WriterMutexLock m(&encode_lock_);
number_of_decode_calls_ref_++;
if (number_of_decode_calls_ref_ == 2) {
// If this is the second stream to decode headers, force low watermark state.
connection()->onLowWatermark();
}
}
return PassThroughFilter::decodeData(buf, end_stream);
}

Http::FilterDataStatus encodeData(Buffer::Instance& buf, bool end_stream) override {
if (end_stream) {
absl::WriterMutexLock m(&encode_lock_);
number_of_encode_calls_ref_++;
if (number_of_encode_calls_ref_ == 1) {
// If this is the first stream to encode headers, force high watermark state.
connection()->onHighWatermark();
}
}
return PassThroughFilter::encodeData(buf, end_stream);
}

Network::ConnectionImpl* connection() {
// As long as we're doing horrible things let's do *all* the horrible things.
// Assert the connection we have is a ConnectionImpl and const cast it so we
// can force watermark changes.
auto conn_impl = dynamic_cast<const Network::ConnectionImpl*>(decoder_callbacks_->connection());
return const_cast<Network::ConnectionImpl*>(conn_impl);
}

absl::Mutex& encode_lock_;
uint32_t& number_of_encode_calls_ref_;
uint32_t& number_of_decode_calls_ref_;
};

class TestPauseFilterConfig : public Extensions::HttpFilters::Common::EmptyHttpFilterConfig {
public:
TestPauseFilterConfig() : EmptyHttpFilterConfig("pause-filter") {}

Http::FilterFactoryCb createFilter(const std::string&, Server::Configuration::FactoryContext&) {
return [&](Http::FilterChainFactoryCallbacks& callbacks) -> void {
// GUARDED_BY insists the lock be held when the guarded variables are passed by reference.
absl::WriterMutexLock m(&encode_lock_);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Q: What is the acquiring the lock here required for? Can you add a comment?

callbacks.addStreamFilter(std::make_shared<::Envoy::TestPauseFilter>(
encode_lock_, number_of_encode_calls_, number_of_decode_calls_));
};
}

absl::Mutex encode_lock_;
uint32_t number_of_encode_calls_ GUARDED_BY(encode_lock_) = 0;
uint32_t number_of_decode_calls_ GUARDED_BY(encode_lock_) = 0;
};

// perform static registration
static Registry::RegisterFactory<TestPauseFilterConfig,
Server::Configuration::NamedHttpFilterConfigFactory>
register_;

void HttpIntegrationTest::testTwoRequests(bool network_backup) {
// if network_backup is false, this simply tests that Envoy can handle multiple
// requests on a connection.
//
// If network_backup is true, the first request will explicitly set the TCP level flow control
// as blocked as it finishes the encode and set a timer to unblock. The second stream should be
// created while the socket appears to be in the high watermark state, and regression tests that
// flow control will be corrected as the socket "becomes unblocked"
if (network_backup) {
config_helper_.addFilter(R"EOF(
name: pause-filter
config: {}
)EOF");
}
initialize();

codec_client_ = makeHttpConnection(lookupPort("http"));
Expand Down
2 changes: 1 addition & 1 deletion test/integration/http_integration.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ class HttpIntegrationTest : public BaseIntegrationTest {
void testRouterDownstreamDisconnectBeforeResponseComplete(
ConnectionCreationFunction* creator = nullptr);
void testRouterUpstreamResponseBeforeRequestComplete();
void testTwoRequests();
void testTwoRequests(bool force_network_backup = false);
void testOverlyLongHeaders();
void testIdleTimeoutBasic();
void testIdleTimeoutWithTwoRequests();
Expand Down
2 changes: 2 additions & 0 deletions test/integration/integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ TEST_P(IntegrationTest, EnvoyProxyingLate100ContinueWithEncoderFilter) {

TEST_P(IntegrationTest, TwoRequests) { testTwoRequests(); }

TEST_P(IntegrationTest, TwoRequestsWithForcedBackup) { testTwoRequests(true); }

TEST_P(IntegrationTest, UpstreamDisconnectWithTwoRequests) {
testUpstreamDisconnectWithTwoRequests();
}
Expand Down
50 changes: 50 additions & 0 deletions test/integration/pass_through_filter.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
#pragma once

#include "envoy/http/filter.h"

namespace Envoy {

// TODO(alyssawilk) move add_trailers_filter to use this.
// A filter which passes all data through with Continue status.
class PassThroughFilter : public Http::StreamFilter {
public:
// Http::StreamFilterBase
void onDestroy() override {}

// Http::StreamDecoderFilter
Http::FilterHeadersStatus decodeHeaders(Http::HeaderMap&, bool) override {
return Http::FilterHeadersStatus::Continue;
}
Http::FilterDataStatus decodeData(Buffer::Instance&, bool) override {
return Http::FilterDataStatus::Continue;
}

Http::FilterTrailersStatus decodeTrailers(Http::HeaderMap&) override {
return Http::FilterTrailersStatus::Continue;
}
void setDecoderFilterCallbacks(Http::StreamDecoderFilterCallbacks& callbacks) override {
decoder_callbacks_ = &callbacks;
}

// Http::StreamEncoderFilter
Http::FilterHeadersStatus encode100ContinueHeaders(Http::HeaderMap&) override {
return Http::FilterHeadersStatus::Continue;
}
Http::FilterHeadersStatus encodeHeaders(Http::HeaderMap&, bool) override {
return Http::FilterHeadersStatus::Continue;
}
Http::FilterDataStatus encodeData(Buffer::Instance&, bool) override {
return Http::FilterDataStatus::Continue;
}
Http::FilterTrailersStatus encodeTrailers(Http::HeaderMap&) override {
return Http::FilterTrailersStatus::Continue;
}
void setEncoderFilterCallbacks(Http::StreamEncoderFilterCallbacks& callbacks) override {
encoder_callbacks_ = &callbacks;
}

protected:
Http::StreamDecoderFilterCallbacks* decoder_callbacks_{};
Http::StreamEncoderFilterCallbacks* encoder_callbacks_{};
};
} // namespace Envoy