Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
89ba76c
add max stream duration on upstream connection
Shikugawa Mar 25, 2020
df7eb75
delete unused sections
Shikugawa Mar 26, 2020
7449db5
fix
Shikugawa Mar 30, 2020
9ca82f0
fix
Shikugawa Apr 2, 2020
52ae433
resolve conflict
Shikugawa Apr 2, 2020
132ad71
fix
Shikugawa Apr 6, 2020
8d7b527
Merge branch 'master' into upstream-max-stream-duration
Shikugawa Apr 6, 2020
5e6eaa5
fix
Shikugawa Apr 7, 2020
878d8f4
fix
Shikugawa Apr 7, 2020
f4ac4be
add retry
Shikugawa Apr 11, 2020
ea73692
fix
Shikugawa Apr 11, 2020
69797fd
proto
Shikugawa Apr 11, 2020
a4215ec
build
Shikugawa Apr 13, 2020
77bdd1b
fix flag
Shikugawa Apr 13, 2020
3b06d6d
Merge branch 'master' of https://github.com/envoyproxy/envoy into ups…
Shikugawa Apr 13, 2020
b4e6725
fix memory
Shikugawa Apr 13, 2020
2f54fbc
add retry test
Shikugawa Apr 14, 2020
afd20d7
fix
Shikugawa Apr 15, 2020
dce6d90
full stream retry
Shikugawa Apr 23, 2020
ae0d2c3
resolve test failure
Shikugawa Apr 23, 2020
a6a6f89
memory
Shikugawa Apr 23, 2020
147f0ab
fix
Shikugawa Apr 24, 2020
d24662d
build
Shikugawa Apr 24, 2020
42decdd
Merge branch 'master' of https://github.com/envoyproxy/envoy into ups…
Shikugawa Apr 28, 2020
f65dd0b
Merge branch 'master' of https://github.com/envoyproxy/envoy into ups…
Shikugawa Apr 29, 2020
cf9dae4
connect termination
Shikugawa Apr 29, 2020
e68d9af
Merge branch 'master' of https://github.com/envoyproxy/envoy into ups…
Shikugawa Apr 30, 2020
fb07faf
Merge branch 'master' of https://github.com/envoyproxy/envoy into ups…
Shikugawa Apr 30, 2020
382221e
fix
Shikugawa Apr 30, 2020
dc0c318
fix
Shikugawa May 1, 2020
1e475e2
fix
Shikugawa May 1, 2020
d75836c
Merge branch 'master' of https://github.com/envoyproxy/envoy into ups…
Shikugawa May 4, 2020
db8b989
fix
Shikugawa May 4, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions api/envoy/api/v2/core/protocol.proto
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,6 @@ message HttpProtocolOptions {

// Total duration to keep alive an HTTP request/response stream. If the time limit is reached the stream will be
// reset independent of any other timeouts. If not specified, this value is not set.
// The current implementation implements this timeout on downstream connections only.
Comment thread
mattklein123 marked this conversation as resolved.
// [#comment:TODO(shikugawa): add this functionality to upstream.]
google.protobuf.Duration max_stream_duration = 4;
}

Expand Down
2 changes: 0 additions & 2 deletions api/envoy/config/core/v3/protocol.proto
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,6 @@ message HttpProtocolOptions {

// Total duration to keep alive an HTTP request/response stream. If the time limit is reached the stream will be
// reset independent of any other timeouts. If not specified, this value is not set.
// The current implementation implements this timeout on downstream connections only.
// [#comment:TODO(shikugawa): add this functionality to upstream.]
google.protobuf.Duration max_stream_duration = 4;
}

Expand Down
Empty file added clang-tidy-fixes.yaml
Empty file.
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ Every cluster has a statistics tree rooted at *cluster.<name>.* with the followi
upstream_rq_cancelled, Counter, Total requests cancelled before obtaining a connection pool connection
upstream_rq_maintenance_mode, Counter, Total requests that resulted in an immediate 503 due to :ref:`maintenance mode<config_http_filters_router_runtime_maintenance_mode>`
upstream_rq_timeout, Counter, Total requests that timed out waiting for a response
upstream_rq_max_duration_reached, Counter, Total requests closed due to max duration reached
upstream_rq_per_try_timeout, Counter, Total requests that hit the per try timeout
upstream_rq_rx_reset, Counter, Total requests that were reset remotely
upstream_rq_tx_reset, Counter, Total requests that were reset locally
Expand Down
2 changes: 0 additions & 2 deletions generated_api_shadow/envoy/api/v2/core/protocol.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions generated_api_shadow/envoy/config/core/v3/protocol.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions include/envoy/upstream/upstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,7 @@ class PrioritySet {
COUNTER(upstream_rq_cancelled) \
COUNTER(upstream_rq_completed) \
COUNTER(upstream_rq_maintenance_mode) \
COUNTER(upstream_rq_max_duration_reached) \
COUNTER(upstream_rq_pending_failure_eject) \
COUNTER(upstream_rq_pending_overflow) \
COUNTER(upstream_rq_pending_total) \
Expand Down Expand Up @@ -699,6 +700,11 @@ class ClusterInfo {
*/
virtual std::chrono::milliseconds connectTimeout() const PURE;

/**
* @return maximum duration time to keep alive stream
*/
virtual absl::optional<std::chrono::milliseconds> maxStreamDuration() const PURE;
Comment thread
Shikugawa marked this conversation as resolved.
Outdated

/**
* @return the idle timeout for upstream connection pool connections.
*/
Expand Down
19 changes: 19 additions & 0 deletions source/common/router/upstream_request.cc
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ UpstreamRequest::~UpstreamRequest() {
// Allows for testing.
per_try_timeout_->disableTimer();
}
if (max_stream_duration_timer_ != nullptr) {
max_stream_duration_timer_->disableTimer();
}
clearRequestEncoder();

// If desired, fire the per-try histogram when the UpstreamRequest
Expand Down Expand Up @@ -360,7 +363,15 @@ void UpstreamRequest::onPoolReady(
// If end_stream is set in headers, and there are metadata to send, delays end_stream. The case
// only happens when decoding headers filters return ContinueAndEndStream.
const bool delay_headers_end_stream = end_stream && !downstream_metadata_map_vector_.empty();

const auto max_stream_duration = upstream_host_->cluster().maxStreamDuration();
if (max_stream_duration.has_value() && max_stream_duration.value().count()) {
max_stream_duration_timer_ = parent_.callbacks()->dispatcher().createTimer(
[this]() -> void { onStreamMaxDurationReached(); });
max_stream_duration_timer_->enableTimer(upstream_host_->cluster().maxStreamDuration().value());
}
upstream_->encodeHeaders(*parent_.downstreamHeaders(), end_stream && !delay_headers_end_stream);

calling_encode_headers_ = false;

// It is possible to get reset in the middle of an encodeHeaders() call. This happens for
Expand Down Expand Up @@ -398,6 +409,14 @@ void UpstreamRequest::onPoolReady(
}
}

void UpstreamRequest::onStreamMaxDurationReached() {
ASSERT(upstream_host_);
Comment thread
Shikugawa marked this conversation as resolved.
Outdated
upstream_host_->cluster().stats().upstream_rq_max_duration_reached_.inc();
max_stream_duration_timer_->disableTimer();
max_stream_duration_timer_.reset();
resetStream();
}

void UpstreamRequest::clearRequestEncoder() {
// Before clearing the encoder, unsubscribe from callbacks.
if (upstream_) {
Expand Down
3 changes: 3 additions & 0 deletions source/common/router/upstream_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ class UpstreamRequest : public Logger::Loggable<Logger::Id::router>,
UpstreamRequest* upstreamRequest() override { return this; }

void clearRequestEncoder();
void onStreamMaxDurationReached();
Comment thread
mattklein123 marked this conversation as resolved.

struct DownstreamWatermarkManager : public Http::DownstreamWatermarkCallbacks {
DownstreamWatermarkManager(UpstreamRequest& parent) : parent_(parent) {}
Expand Down Expand Up @@ -174,6 +175,8 @@ class UpstreamRequest : public Logger::Loggable<Logger::Id::router>,
// Sentinel to indicate if timeout budget tracking is configured for the cluster,
// and if so, if the per-try histogram should record a value.
bool record_timeout_budget_ : 1;

Event::TimerPtr max_stream_duration_timer_;
};

class HttpConnPool : public GenericConnPool, public Http::ConnectionPool::Callbacks {
Expand Down
2 changes: 2 additions & 0 deletions source/common/upstream/upstream_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -667,6 +667,8 @@ ClusterInfoImpl::ClusterInfoImpl(
Http::DEFAULT_MAX_HEADERS_COUNT))),
connect_timeout_(
std::chrono::milliseconds(PROTOBUF_GET_MS_REQUIRED(config, connect_timeout))),
max_stream_duration_(
PROTOBUF_GET_OPTIONAL_MS(config.common_http_protocol_options(), max_stream_duration)),
per_connection_buffer_limit_bytes_(
PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, per_connection_buffer_limit_bytes, 1024 * 1024)),
socket_matcher_(std::move(socket_matcher)), stats_scope_(std::move(stats_scope)),
Expand Down
4 changes: 4 additions & 0 deletions source/common/upstream/upstream_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,9 @@ class ClusterInfoImpl : public ClusterInfo, protected Logger::Loggable<Logger::I
return common_lb_config_;
}
std::chrono::milliseconds connectTimeout() const override { return connect_timeout_; }
absl::optional<std::chrono::milliseconds> maxStreamDuration() const override {
return max_stream_duration_;
}
const absl::optional<std::chrono::milliseconds> idleTimeout() const override {
return idle_timeout_;
}
Expand Down Expand Up @@ -616,6 +619,7 @@ class ClusterInfoImpl : public ClusterInfo, protected Logger::Loggable<Logger::I
const uint64_t max_requests_per_connection_;
const uint32_t max_response_headers_count_;
const std::chrono::milliseconds connect_timeout_;
const absl::optional<std::chrono::milliseconds> max_stream_duration_;
absl::optional<std::chrono::milliseconds> idle_timeout_;
const uint32_t per_connection_buffer_limit_bytes_;
TransportSocketMatcherPtr socket_matcher_;
Expand Down
80 changes: 80 additions & 0 deletions test/common/router/router_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,12 @@ class RouterTestBase : public testing::Test {
EXPECT_CALL(*per_try_timeout_, disableTimer());
}

void expectMaxStreamDurationTimer() {
max_stream_duration_timer_ = new Event::MockTimer(&callbacks_.dispatcher_);
EXPECT_CALL(*max_stream_duration_timer_, enableTimer(_, _));
EXPECT_CALL(*max_stream_duration_timer_, disableTimer());
}

AssertionResult verifyHostUpstreamStats(uint64_t success, uint64_t error) {
if (success != cm_.conn_pool_.host_->stats_.rq_success_.value()) {
return AssertionFailure() << fmt::format("rq_success {} does not match expected {}",
Expand Down Expand Up @@ -348,6 +354,7 @@ class RouterTestBase : public testing::Test {
RouterTestFilter router_;
Event::MockTimer* response_timeout_{};
Event::MockTimer* per_try_timeout_{};
Event::MockTimer* max_stream_duration_timer_{};
Network::Address::InstanceConstSharedPtr host_address_{
Network::Utility::resolveUrl("tcp://10.0.0.5:9211")};
NiceMock<Http::MockRequestEncoder> original_encoder_;
Expand Down Expand Up @@ -3551,6 +3558,79 @@ TEST_F(RouterTest, RetryTimeoutDuringRetryDelay) {
EXPECT_TRUE(verifyHostUpstreamStats(0, 1));
}

TEST_F(RouterTest, MaxStreamDurationValidlyConfigured) {
NiceMock<Http::MockRequestEncoder> encoder1;
Http::ResponseDecoder* response_decoder = nullptr;
ON_CALL(cm_.conn_pool_.host_->cluster_, maxStreamDuration())
.WillByDefault(Return(std::chrono::milliseconds(500)));
EXPECT_CALL(cm_.conn_pool_, newStream(_, _))
.WillOnce(Invoke(
[&](Http::ResponseDecoder& decoder,
Http::ConnectionPool::Callbacks& callbacks) -> Http::ConnectionPool::Cancellable* {
response_decoder = &decoder;
callbacks.onPoolReady(encoder1, cm_.conn_pool_.host_, upstream_stream_info_);
return nullptr;
}));
expectMaxStreamDurationTimer();

Http::TestRequestHeaderMapImpl headers;
HttpTestUtility::addDefaultHeaders(headers);
router_.decodeHeaders(headers, false);
max_stream_duration_timer_->invokeCallback();

router_.onDestroy();
EXPECT_TRUE(verifyHostUpstreamStats(0, 0));
}

TEST_F(RouterTest, MaxStreamDurationDisabledIfSetToZero) {
NiceMock<Http::MockRequestEncoder> encoder1;
Http::ResponseDecoder* response_decoder = nullptr;
ON_CALL(cm_.conn_pool_.host_->cluster_, maxStreamDuration())
.WillByDefault(Return(std::chrono::milliseconds(0)));
EXPECT_CALL(cm_.conn_pool_, newStream(_, _))
.WillOnce(Invoke(
[&](Http::ResponseDecoder& decoder,
Http::ConnectionPool::Callbacks& callbacks) -> Http::ConnectionPool::Cancellable* {
response_decoder = &decoder;
callbacks.onPoolReady(encoder1, cm_.conn_pool_.host_, upstream_stream_info_);
return nullptr;
}));

// not to be called timer creation.
EXPECT_CALL(callbacks_.dispatcher_, createTimer_).Times(0);

Http::TestRequestHeaderMapImpl headers;
HttpTestUtility::addDefaultHeaders(headers);
router_.decodeHeaders(headers, false);

router_.onDestroy();
EXPECT_TRUE(verifyHostUpstreamStats(0, 0));
}

TEST_F(RouterTest, MaxStreamDurationCallbackNotCalled) {
NiceMock<Http::MockRequestEncoder> encoder1;
Http::ResponseDecoder* response_decoder = nullptr;
ON_CALL(cm_.conn_pool_.host_->cluster_, maxStreamDuration())
.WillByDefault(Return(std::chrono::milliseconds(5000)));
EXPECT_CALL(cm_.conn_pool_, newStream(_, _))
.WillOnce(Invoke(
[&](Http::ResponseDecoder& decoder,
Http::ConnectionPool::Callbacks& callbacks) -> Http::ConnectionPool::Cancellable* {
response_decoder = &decoder;
callbacks.onPoolReady(encoder1, cm_.conn_pool_.host_, upstream_stream_info_);
return nullptr;
}));

expectMaxStreamDurationTimer();

Http::TestRequestHeaderMapImpl headers;
HttpTestUtility::addDefaultHeaders(headers);
router_.decodeHeaders(headers, false);

router_.onDestroy();
EXPECT_TRUE(verifyHostUpstreamStats(0, 0));
}

TEST_F(RouterTest, RetryTimeoutDuringRetryDelayWithUpstreamRequestNoHost) {
NiceMock<Http::MockRequestEncoder> encoder1;
Http::ResponseDecoder* response_decoder = nullptr;
Expand Down
1 change: 1 addition & 0 deletions test/common/tcp_proxy/upstream_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class HttpUpstreamTest : public testing::Test {
Http::MockRequestEncoder encoder_;
NiceMock<Tcp::ConnectionPool::MockUpstreamCallbacks> callbacks_;
std::unique_ptr<HttpUpstream> upstream_;
testing::NiceMock<Event::MockDispatcher> dispatcher_;
Comment thread
Shikugawa marked this conversation as resolved.
Outdated
std::string hostname_{"default.host.com"};
};

Expand Down
2 changes: 2 additions & 0 deletions test/integration/http2_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ TEST_P(Http2IntegrationTest, RetryAttemptCount) { testRetryAttemptCountHeader();

TEST_P(Http2IntegrationTest, LargeRequestTrailersRejected) { testLargeRequestTrailers(66, 60); }

TEST_P(Http2IntegrationTest, BasicMaxStreamDuration) { testMaxStreamDuration(); }
Comment thread
Shikugawa marked this conversation as resolved.
Outdated

static std::string response_metadata_filter = R"EOF(
name: response-metadata-filter
typed_config:
Expand Down
23 changes: 23 additions & 0 deletions test/integration/http_integration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1204,6 +1204,29 @@ void HttpIntegrationTest::testAdminDrain(Http::CodecClient::Type admin_request_t
nullptr, true));
}

void HttpIntegrationTest::testMaxStreamDuration() {
config_helper_.addConfigModifier([](envoy::config::bootstrap::v3::Bootstrap& bootstrap) {
auto* static_resources = bootstrap.mutable_static_resources();
auto* cluster = static_resources->mutable_clusters(0);
auto* http_protocol_options = cluster->mutable_common_http_protocol_options();
http_protocol_options->mutable_max_stream_duration()->MergeFrom(
ProtobufUtil::TimeUtil::MillisecondsToDuration(200));
});

initialize();
fake_upstreams_[0]->set_allow_unexpected_disconnects(true);
codec_client_ = makeHttpConnection(lookupPort("http"));

auto encoder_decoder = codec_client_->startRequest(default_request_headers_);
request_encoder_ = &encoder_decoder.first;
auto response = std::move(encoder_decoder.second);

ASSERT_TRUE(fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, fake_upstream_connection_));
ASSERT_TRUE(fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_));

test_server_->waitForCounterGe("cluster.cluster_0.upstream_rq_max_duration_reached", 1);
Comment thread
Shikugawa marked this conversation as resolved.
}

std::string HttpIntegrationTest::listenerStatPrefix(const std::string& stat_name) {
if (version_ == Network::Address::IpVersion::v4) {
return "listener.127.0.0.1_0." + stat_name;
Expand Down
3 changes: 2 additions & 1 deletion test/integration/http_integration.h
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,8 @@ class HttpIntegrationTest : public BaseIntegrationTest {
bool response_trailers_present);
// Test /drain_listener from admin portal.
void testAdminDrain(Http::CodecClient::Type admin_request_type);

// test max stream duration
Comment thread
Shikugawa marked this conversation as resolved.
Outdated
void testMaxStreamDuration();
Http::CodecClient::Type downstreamProtocol() const { return downstream_protocol_; }
// Prefix listener stat with IP:port, including IP version dependent loopback address.
std::string listenerStatPrefix(const std::string& stat_name);
Expand Down
2 changes: 2 additions & 0 deletions test/mocks/upstream/cluster_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ class MockClusterInfo : public ClusterInfo {
// Upstream::ClusterInfo
MOCK_METHOD(bool, addedViaApi, (), (const));
MOCK_METHOD(std::chrono::milliseconds, connectTimeout, (), (const));
MOCK_METHOD(absl::optional<std::chrono::milliseconds>, maxStreamDuration, (), (const));
MOCK_METHOD(const absl::optional<std::chrono::milliseconds>, idleTimeout, (), (const));
MOCK_METHOD(uint32_t, perConnectionBufferLimitBytes, (), (const));
MOCK_METHOD(uint64_t, features, (), (const));
Expand Down Expand Up @@ -158,6 +159,7 @@ class MockClusterInfo : public ClusterInfo {
envoy::config::cluster::v3::Cluster::CommonLbConfig lb_config_;
envoy::config::core::v3::Metadata metadata_;
std::unique_ptr<Envoy::Config::TypedMetadata> typed_metadata_;
absl::optional<std::chrono::milliseconds> max_stream_duration_;
};

class MockIdleTimeEnabledClusterInfo : public MockClusterInfo {
Expand Down