Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docs/root/intro/version_history.rst
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ Version history
* redis: added :ref:`prefix routing <envoy_api_field_config.filter.network.redis_proxy.v2.RedisProxy.prefix_routes>` to enable routing commands based on their key's prefix to different upstream.
* redis: add support for zpopmax and zpopmin commands.
* router: added ability to control retry back-off intervals via :ref:`retry policy <envoy_api_msg_route.RetryPolicy.RetryBackOff>`.
* router: per try timeouts will no longer start before the downstream request has been received
in full by the router. This ensures that the per try timeout does not account for slow
downstreams and that will not start before the global timeout.
* upstream: added :ref:`upstream_cx_pool_overflow <config_cluster_manager_cluster_stats>` for the connection pool circuit breaker.

1.10.0 (Apr 5, 2019)
Expand Down
16 changes: 14 additions & 2 deletions source/common/router/router.cc
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,8 @@ void Filter::maybeDoShadowing() {
}

void Filter::onRequestComplete() {
// This should be called exactly once, when the downstream request has been received in full.
ASSERT(!downstream_end_stream_);
downstream_end_stream_ = true;
Event::Dispatcher& dispatcher = callbacks_->dispatcher();
downstream_request_complete_time_ = dispatcher.timeSource().monotonicTime();
Expand All @@ -521,6 +523,12 @@ void Filter::onRequestComplete() {
response_timeout_ = dispatcher.createTimer([this]() -> void { onResponseTimeout(); });
response_timeout_->enableTimer(timeout_.global_timeout_);
}

for (auto& upstream_request : upstream_requests_) {
if (upstream_request->create_per_try_timeout_on_request_complete_) {
upstream_request->setupPerTryTimeout();
}
}
}
}

Expand Down Expand Up @@ -984,7 +992,7 @@ Filter::UpstreamRequest::UpstreamRequest(Filter& parent, Http::ConnectionPool::I
: parent_(parent), conn_pool_(pool), grpc_rq_success_deferred_(false),
stream_info_(pool.protocol(), parent_.callbacks_->dispatcher().timeSource()),
calling_encode_headers_(false), upstream_canary_(false), encode_complete_(false),
encode_trailers_(false) {
encode_trailers_(false), create_per_try_timeout_on_request_complete_(false) {

if (parent_.config_.start_child_span_) {
span_ = parent_.callbacks_->activeSpan().spawnChild(
Expand Down Expand Up @@ -1184,7 +1192,11 @@ void Filter::UpstreamRequest::onPoolReady(Http::StreamEncoder& request_encoder,
onUpstreamHostSelected(host);
request_encoder.getStream().addCallbacks(*this);

setupPerTryTimeout();
if (parent_.downstream_end_stream_) {
setupPerTryTimeout();
} else {
create_per_try_timeout_on_request_complete_ = true;
}

conn_pool_stream_handle_ = nullptr;
setRequestEncoder(request_encoder);
Expand Down
3 changes: 3 additions & 0 deletions source/common/router/router.h
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,9 @@ class Filter : Logger::Loggable<Logger::Id::router>,
bool upstream_canary_ : 1;
bool encode_complete_ : 1;
bool encode_trailers_ : 1;
// Tracks whether we deferred a per try timeout because the downstream request
// had not been completed yet.
bool create_per_try_timeout_on_request_complete_ : 1;
};

typedef std::unique_ptr<UpstreamRequest> UpstreamRequestPtr;
Expand Down
60 changes: 57 additions & 3 deletions test/common/router/router_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1152,6 +1152,7 @@ TEST_F(RouterTest, UpstreamTimeoutWithAltResponse) {
EXPECT_TRUE(verifyHostUpstreamStats(0, 1));
}

// Verifies that the per try timeout is initialized once the downstream request has been read.
TEST_F(RouterTest, UpstreamPerTryTimeout) {
NiceMock<Http::MockStreamEncoder> encoder;
Http::StreamDecoder* response_decoder = nullptr;
Expand All @@ -1167,16 +1168,69 @@ TEST_F(RouterTest, UpstreamPerTryTimeout) {
EXPECT_EQ(host_address_, host->address());
}));

expectResponseTimerCreate();
Http::TestHeaderMapImpl headers{{"x-envoy-internal", "true"},
{"x-envoy-upstream-rq-per-try-timeout-ms", "5"}};
HttpTestUtility::addDefaultHeaders(headers);
router_.decodeHeaders(headers, false);

// We verify that both timeouts are started after decodeData(_, true) is called. This
// verifies that we are not starting the initial per try timeout on the first onPoolReady.
expectPerTryTimerCreate();
expectResponseTimerCreate();

Buffer::OwnedImpl data;
router_.decodeData(data, true);

EXPECT_CALL(callbacks_.stream_info_,
setResponseFlag(StreamInfo::ResponseFlag::UpstreamRequestTimeout));
EXPECT_CALL(encoder.stream_, resetStream(Http::StreamResetReason::LocalReset));
Http::TestHeaderMapImpl response_headers{
{":status", "504"}, {"content-length", "24"}, {"content-type", "text/plain"}};
EXPECT_CALL(callbacks_, encodeHeaders_(HeaderMapEqualRef(&response_headers), false));
EXPECT_CALL(callbacks_, encodeData(_, true));
EXPECT_CALL(cm_.conn_pool_.host_->outlier_detector_, putHttpResponseCode(504));
per_try_timeout_->callback_();

EXPECT_EQ(1U, cm_.thread_local_cluster_.cluster_.info_->stats_store_
.counter("upstream_rq_per_try_timeout")
.value());
EXPECT_EQ(1UL, cm_.conn_pool_.host_->stats().rq_timeout_.value());
EXPECT_TRUE(verifyHostUpstreamStats(0, 1));
}

// Verifies that the per try timeout starts when onPoolReady is called when it occurs
// after the downstream request has been read.
TEST_F(RouterTest, UpstreamPerTryTimeoutDelayedPoolReady) {
NiceMock<Http::MockStreamEncoder> encoder;
Http::StreamDecoder* response_decoder = nullptr;
Http::ConnectionPool::Callbacks* pool_callbacks;
EXPECT_CALL(cm_.conn_pool_, newStream(_, _))
.WillOnce(Invoke([&](Http::StreamDecoder& decoder, Http::ConnectionPool::Callbacks& callbacks)
-> Http::ConnectionPool::Cancellable* {
response_decoder = &decoder;
pool_callbacks = &callbacks;
return nullptr;
}));

Http::TestHeaderMapImpl headers{{"x-envoy-internal", "true"},
{"x-envoy-upstream-rq-per-try-timeout-ms", "5"}};
HttpTestUtility::addDefaultHeaders(headers);
router_.decodeHeaders(headers, false);

// Global timeout starts when decodeData(_, true) is called.
expectResponseTimerCreate();
Buffer::OwnedImpl data;
router_.decodeData(data, true);

// Per try timeout starts when onPoolReady is called.
expectPerTryTimerCreate();
EXPECT_CALL(callbacks_.stream_info_, onUpstreamHostSelected(_))
.WillOnce(Invoke([&](const Upstream::HostDescriptionConstSharedPtr host) -> void {
EXPECT_EQ(host_address_, host->address());
}));

pool_callbacks->onPoolReady(encoder, cm_.conn_pool_.host_);

EXPECT_CALL(callbacks_.stream_info_,
setResponseFlag(StreamInfo::ResponseFlag::UpstreamRequestTimeout));
EXPECT_CALL(encoder.stream_, resetStream(Http::StreamResetReason::LocalReset));
Expand Down Expand Up @@ -1364,8 +1418,8 @@ TEST_F(RouterTest, RetryUpstreamPerTryTimeout) {
callbacks.onPoolReady(encoder1, cm_.conn_pool_.host_);
return nullptr;
}));
expectResponseTimerCreate();
expectPerTryTimerCreate();
expectResponseTimerCreate();

Http::TestHeaderMapImpl headers{{"x-envoy-retry-on", "5xx"},
{"x-envoy-internal", "true"},
Expand Down Expand Up @@ -1455,8 +1509,8 @@ TEST_F(RouterTest, DontResetStartedResponseOnUpstreamPerTryTimeout) {
callbacks.onPoolReady(encoder1, cm_.conn_pool_.host_);
return nullptr;
}));
expectResponseTimerCreate();
expectPerTryTimerCreate();
expectResponseTimerCreate();

Http::TestHeaderMapImpl headers{{"x-envoy-internal", "true"},
{"x-envoy-upstream-rq-per-try-timeout-ms", "5"}};
Expand Down
2 changes: 1 addition & 1 deletion test/common/router/router_upstream_log_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,8 @@ class RouterUpstreamLogTest : public testing::Test {
callbacks.onPoolReady(encoder1, context_.cluster_manager_.conn_pool_.host_);
return nullptr;
}));
expectResponseTimerCreate();
expectPerTryTimerCreate();
expectResponseTimerCreate();

Http::TestHeaderMapImpl headers{{"x-envoy-retry-on", "5xx"},
{"x-envoy-internal", "true"},
Expand Down