Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions source/common/router/router.cc
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,8 @@ void Filter::maybeDoShadowing() {
}

void Filter::onRequestComplete() {
// This should be called exactly once, when the downstream request has been received in full.
ASSERT(!downstream_end_stream_);
downstream_end_stream_ = true;
Event::Dispatcher& dispatcher = callbacks_->dispatcher();
downstream_request_complete_time_ = dispatcher.timeSource().monotonicTime();
Expand All @@ -521,6 +523,12 @@ void Filter::onRequestComplete() {
response_timeout_ = dispatcher.createTimer([this]() -> void { onResponseTimeout(); });
response_timeout_->enableTimer(timeout_.global_timeout_);
}

for (auto& upstream_request : upstream_requests_) {
if (upstream_request->pending_per_try_timeout_) {
upstream_request->setupPerTryTimeout();
}
}
}
}

Expand Down Expand Up @@ -984,7 +992,7 @@ Filter::UpstreamRequest::UpstreamRequest(Filter& parent, Http::ConnectionPool::I
: parent_(parent), conn_pool_(pool), grpc_rq_success_deferred_(false),
stream_info_(pool.protocol(), parent_.callbacks_->dispatcher().timeSource()),
calling_encode_headers_(false), upstream_canary_(false), encode_complete_(false),
encode_trailers_(false) {
encode_trailers_(false), pending_per_try_timeout_(false) {

if (parent_.config_.start_child_span_) {
span_ = parent_.callbacks_->activeSpan().spawnChild(
Expand Down Expand Up @@ -1184,7 +1192,11 @@ void Filter::UpstreamRequest::onPoolReady(Http::StreamEncoder& request_encoder,
onUpstreamHostSelected(host);
request_encoder.getStream().addCallbacks(*this);

setupPerTryTimeout();
if (parent_.downstream_end_stream_) {
setupPerTryTimeout();
} else {
pending_per_try_timeout_ = true;
}

conn_pool_stream_handle_ = nullptr;
setRequestEncoder(request_encoder);
Expand Down
3 changes: 3 additions & 0 deletions source/common/router/router.h
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,9 @@ class Filter : Logger::Loggable<Logger::Id::router>,
bool upstream_canary_ : 1;
bool encode_complete_ : 1;
bool encode_trailers_ : 1;
// Tracks whether we deferred a per try timeout because the downstream request
// had not been completed yet.
bool pending_per_try_timeout_ : 1;

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I would probably name this something like create_per_try_timeout_on_request_complete_ or something like that to differentiate between waiting for the per try timeout to elapse which is what this sounds like to me.

};

typedef std::unique_ptr<UpstreamRequest> UpstreamRequestPtr;
Expand Down
60 changes: 57 additions & 3 deletions test/common/router/router_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1152,6 +1152,7 @@ TEST_F(RouterTest, UpstreamTimeoutWithAltResponse) {
EXPECT_TRUE(verifyHostUpstreamStats(0, 1));
}

// Verifies that the per try timeout is initialized once the downstream request has been read.
TEST_F(RouterTest, UpstreamPerTryTimeout) {
NiceMock<Http::MockStreamEncoder> encoder;
Http::StreamDecoder* response_decoder = nullptr;
Expand All @@ -1167,16 +1168,69 @@ TEST_F(RouterTest, UpstreamPerTryTimeout) {
EXPECT_EQ(host_address_, host->address());
}));

expectResponseTimerCreate();
Http::TestHeaderMapImpl headers{{"x-envoy-internal", "true"},
{"x-envoy-upstream-rq-per-try-timeout-ms", "5"}};
HttpTestUtility::addDefaultHeaders(headers);
router_.decodeHeaders(headers, false);

// We verify that both timeouts are started after decodeData(_, true) is called. This
// verifies that we are not starting the initial per try timeout on the first onPoolReady.
expectPerTryTimerCreate();
expectResponseTimerCreate();

Buffer::OwnedImpl data;
router_.decodeData(data, true);

EXPECT_CALL(callbacks_.stream_info_,
setResponseFlag(StreamInfo::ResponseFlag::UpstreamRequestTimeout));
EXPECT_CALL(encoder.stream_, resetStream(Http::StreamResetReason::LocalReset));
Http::TestHeaderMapImpl response_headers{
{":status", "504"}, {"content-length", "24"}, {"content-type", "text/plain"}};
EXPECT_CALL(callbacks_, encodeHeaders_(HeaderMapEqualRef(&response_headers), false));
EXPECT_CALL(callbacks_, encodeData(_, true));
EXPECT_CALL(cm_.conn_pool_.host_->outlier_detector_, putHttpResponseCode(504));
per_try_timeout_->callback_();

EXPECT_EQ(1U, cm_.thread_local_cluster_.cluster_.info_->stats_store_
.counter("upstream_rq_per_try_timeout")
.value());
EXPECT_EQ(1UL, cm_.conn_pool_.host_->stats().rq_timeout_.value());
EXPECT_TRUE(verifyHostUpstreamStats(0, 1));
}

// Verifies that the per try timeout starts when onPoolReady is called when it occurs
// after the downstream request has been read.
TEST_F(RouterTest, UpstreamPerTryTimeoutDelayedPoolReady) {
NiceMock<Http::MockStreamEncoder> encoder;
Http::StreamDecoder* response_decoder = nullptr;
Http::ConnectionPool::Callbacks* pool_callbacks;
EXPECT_CALL(cm_.conn_pool_, newStream(_, _))
.WillOnce(Invoke([&](Http::StreamDecoder& decoder, Http::ConnectionPool::Callbacks& callbacks)
-> Http::ConnectionPool::Cancellable* {
response_decoder = &decoder;
pool_callbacks = &callbacks;
return nullptr;
}));

Http::TestHeaderMapImpl headers{{"x-envoy-internal", "true"},
{"x-envoy-upstream-rq-per-try-timeout-ms", "5"}};
HttpTestUtility::addDefaultHeaders(headers);
router_.decodeHeaders(headers, false);

// Global timeout starts when decodeData(_, true) is called.
expectResponseTimerCreate();
Buffer::OwnedImpl data;
router_.decodeData(data, true);

// Per try timeout starts when onPoolReady is called.
expectPerTryTimerCreate();
EXPECT_CALL(callbacks_.stream_info_, onUpstreamHostSelected(_))
.WillOnce(Invoke([&](const Upstream::HostDescriptionConstSharedPtr host) -> void {
EXPECT_EQ(host_address_, host->address());
}));

pool_callbacks->onPoolReady(encoder, cm_.conn_pool_.host_);

EXPECT_CALL(callbacks_.stream_info_,
setResponseFlag(StreamInfo::ResponseFlag::UpstreamRequestTimeout));
EXPECT_CALL(encoder.stream_, resetStream(Http::StreamResetReason::LocalReset));
Expand Down Expand Up @@ -1364,8 +1418,8 @@ TEST_F(RouterTest, RetryUpstreamPerTryTimeout) {
callbacks.onPoolReady(encoder1, cm_.conn_pool_.host_);
return nullptr;
}));
expectResponseTimerCreate();
expectPerTryTimerCreate();
expectResponseTimerCreate();

Http::TestHeaderMapImpl headers{{"x-envoy-retry-on", "5xx"},
{"x-envoy-internal", "true"},
Expand Down Expand Up @@ -1455,8 +1509,8 @@ TEST_F(RouterTest, DontResetStartedResponseOnUpstreamPerTryTimeout) {
callbacks.onPoolReady(encoder1, cm_.conn_pool_.host_);
return nullptr;
}));
expectResponseTimerCreate();
expectPerTryTimerCreate();
expectResponseTimerCreate();

Http::TestHeaderMapImpl headers{{"x-envoy-internal", "true"},
{"x-envoy-upstream-rq-per-try-timeout-ms", "5"}};
Expand Down
2 changes: 1 addition & 1 deletion test/common/router/router_upstream_log_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,8 @@ class RouterUpstreamLogTest : public testing::Test {
callbacks.onPoolReady(encoder1, context_.cluster_manager_.conn_pool_.host_);
return nullptr;
}));
expectResponseTimerCreate();
expectPerTryTimerCreate();
expectResponseTimerCreate();

Http::TestHeaderMapImpl headers{{"x-envoy-retry-on", "5xx"},
{"x-envoy-internal", "true"},
Expand Down