Skip to content
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions docs/root/configuration/http/http_filters/router_filter.rst
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,19 @@ compared to network latency between client and Envoy. This header is set on resp

.. _config_http_filters_router_x-envoy-overloaded_set:

x-envoy-upstream-stream-duration-ms
Comment thread
Shikugawa marked this conversation as resolved.
Outdated
Comment thread
Shikugawa marked this conversation as resolved.
Outdated
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

This value is used to configure the maximum upstream stream lifetime for the stream which has this header.
If the stream exceeds this lifetime, it will be reset and a 408 response
will be sent to downstream. If the value of the header is 0, then the lifetime will be
infinite and no limit will be enforce. It is similar to
Comment thread
Shikugawa marked this conversation as resolved.
Outdated
:ref:`max_stream_duration <envoy_v3_api_field_config.core.v3.HttpProtocolOptions.max_stream_duration>`,
but that configuration applies to all streams to this cluster. If set, this header will
override the cluster configuration.
Comment thread
alyssawilk marked this conversation as resolved.
Outdated

.. _config_http_filters_router_x-envoy-upstream-stream-duration-ms:

x-envoy-overloaded
^^^^^^^^^^^^^^^^^^

Expand Down
1 change: 1 addition & 0 deletions docs/root/version_history/current.rst
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ New Features
------------
* bootstrap: added :ref:`inline_headers <envoy_v3_api_field_config.bootstrap.v3.Bootstrap.inline_headers>` in the bootstrap to make custom inline headers bootstrap configurable.
* http: added :ref:`string_match <envoy_v3_api_field_config.route.v3.HeaderMatcher.string_match>` in the header matcher.
* http: added :ref:`x-envoy-upstream-stream-duration-ms <config_http_filters_router_x-envoy-upstream-stream-duration-ms>` that allows to configure the a lifetime from the request header.
Comment thread
Shikugawa marked this conversation as resolved.
Outdated
* http: added support for :ref:`max_requests_per_connection <envoy_v3_api_field_config.core.v3.HttpProtocolOptions.max_requests_per_connection>` for both upstream and downstream connections.
* jwt_authn: added support for :ref:`Jwt Cache <envoy_v3_api_field_extensions.filters.http.jwt_authn.v3.JwtProvider.jwt_cache_config>` and its size can be specified by :ref:`jwt_cache_size <envoy_v3_api_field_extensions.filters.http.jwt_authn.v3.JwtCacheConfig.jwt_cache_size>`.
* listener: new listener metric `downstream_cx_transport_socket_connect_timeout` to track transport socket timeouts.
Expand Down
3 changes: 2 additions & 1 deletion envoy/http/header_map.h
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,8 @@ class HeaderEntry {
HEADER_FUNC(EnvoyExpectedRequestTimeoutMs) \
HEADER_FUNC(EnvoyMaxRetries) \
HEADER_FUNC(EnvoyUpstreamRequestTimeoutMs) \
HEADER_FUNC(EnvoyUpstreamRequestPerTryTimeoutMs)
HEADER_FUNC(EnvoyUpstreamRequestPerTryTimeoutMs) \
HEADER_FUNC(EnvoyUpstreamStreamDurationMs)

#define INLINE_REQ_HEADERS(HEADER_FUNC) \
INLINE_REQ_STRING_HEADERS(HEADER_FUNC) \
Expand Down
1 change: 0 additions & 1 deletion envoy/router/router.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
#include "envoy/http/codes.h"
#include "envoy/http/conn_pool.h"
#include "envoy/http/hash_policy.h"
#include "envoy/http/header_map.h"
#include "envoy/router/internal_redirect.h"
#include "envoy/tcp/conn_pool.h"
#include "envoy/tracing/http_tracer.h"
Expand Down
2 changes: 2 additions & 0 deletions source/common/http/headers.h
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,8 @@ class HeaderValues {
const LowerCaseString EnvoyUpstreamServiceTime{absl::StrCat(prefix(), "-upstream-service-time")};
const LowerCaseString EnvoyUpstreamHealthCheckedCluster{
absl::StrCat(prefix(), "-upstream-healthchecked-cluster")};
const LowerCaseString EnvoyUpstreamStreamDurationMs{
absl::StrCat(prefix(), "-upstream-stream-duration-ms")};
const LowerCaseString EnvoyDecoratorOperation{absl::StrCat(prefix(), "-decorator-operation")};
const LowerCaseString Expect{"expect"};
const LowerCaseString ForwardedClientCert{"x-forwarded-client-cert"};
Expand Down
30 changes: 25 additions & 5 deletions source/common/router/router.cc
Original file line number Diff line number Diff line change
Expand Up @@ -176,13 +176,16 @@ FilterUtility::finalTimeout(const RouteEntry& route, Http::RequestHeaderMap& req
const Http::HeaderEntry* header_timeout_entry =
request_headers.EnvoyUpstreamRequestTimeoutMs();

if (trySetGlobalTimeout(header_timeout_entry, timeout)) {
if (header_timeout_entry) {
trySetGlobalTimeout(header_timeout_entry, timeout);
Comment thread
Shikugawa marked this conversation as resolved.
Outdated
request_headers.removeEnvoyUpstreamRequestTimeoutMs();
}
}
} else {
const Http::HeaderEntry* header_timeout_entry = request_headers.EnvoyUpstreamRequestTimeoutMs();
if (trySetGlobalTimeout(header_timeout_entry, timeout)) {

if (header_timeout_entry) {
trySetGlobalTimeout(header_timeout_entry, timeout);
Comment thread
alyssawilk marked this conversation as resolved.
Outdated
Comment thread
Shikugawa marked this conversation as resolved.
Outdated
request_headers.removeEnvoyUpstreamRequestTimeoutMs();
}
}
Expand Down Expand Up @@ -226,13 +229,22 @@ FilterUtility::finalTimeout(const RouteEntry& route, Http::RequestHeaderMap& req
return timeout;
}

bool FilterUtility::trySetGlobalTimeout(const Http::HeaderEntry* header_timeout_entry,
TimeoutData& timeout) {
absl::optional<std::chrono::milliseconds>
FilterUtility::tryParseHeaderTimeout(const Http::HeaderEntry* header_timeout_entry) {
if (header_timeout_entry) {
Comment thread
Shikugawa marked this conversation as resolved.
Outdated
uint64_t header_timeout;
if (absl::SimpleAtoi(header_timeout_entry->value().getStringView(), &header_timeout)) {
timeout.global_timeout_ = std::chrono::milliseconds(header_timeout);
return std::chrono::milliseconds(header_timeout);
}
}
return absl::nullopt;
}

bool FilterUtility::trySetGlobalTimeout(const Http::HeaderEntry* header_timeout_entry,
TimeoutData& timeout) {
const auto timeout_ms = tryParseHeaderTimeout(header_timeout_entry);
if (timeout_ms.has_value()) {
timeout.global_timeout_ = timeout_ms.value();
return true;
}
return false;
Expand Down Expand Up @@ -579,6 +591,14 @@ Http::FilterHeadersStatus Filter::decodeHeaders(Http::RequestHeaderMap& headers,
grpc_request_, hedging_params_.hedge_on_per_try_timeout_,
config_.respect_expected_rq_timeout_);

const Http::HeaderEntry* header_max_stream_duration_entry =
headers.EnvoyUpstreamStreamDurationMs();
if (header_max_stream_duration_entry) {
dynamic_max_stream_duration_ =
FilterUtility::tryParseHeaderTimeout(header_max_stream_duration_entry);
headers.removeEnvoyUpstreamStreamDurationMs();
}

// If this header is set with any value, use an alternate response code on timeout
if (headers.EnvoyUpstreamRequestTimeoutAltResponse()) {
timeout_response_code_ = Http::Code::NoContent;
Expand Down
16 changes: 15 additions & 1 deletion source/common/router/router.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,15 @@ class FilterUtility {
bool per_try_timeout_hedging_enabled,
bool respect_expected_rq_timeout);

/**
* Try to parse a header entry that may have a timeout field
*
* @param header_timeout_entry header entry which may contain a timeout value.
* @return result timeout value from header. It will return nullopt if parse failed.
*/
static absl::optional<std::chrono::milliseconds>
Comment thread
Shikugawa marked this conversation as resolved.
tryParseHeaderTimeout(const Http::HeaderEntry* header_timeout_entry);

static bool trySetGlobalTimeout(const Http::HeaderEntry* header_timeout_entry,
TimeoutData& timeout);

Expand Down Expand Up @@ -262,6 +271,7 @@ class RouterFilterInterface {
virtual Upstream::ClusterInfoConstSharedPtr cluster() PURE;
virtual FilterConfig& config() PURE;
virtual FilterUtility::TimeoutData timeout() PURE;
virtual absl::optional<std::chrono::milliseconds> dynamicMaxStreamDuration() const PURE;
virtual Http::RequestHeaderMap* downstreamHeaders() PURE;
virtual Http::RequestTrailerMap* downstreamTrailers() PURE;
virtual bool downstreamResponseStarted() const PURE;
Expand Down Expand Up @@ -436,6 +446,9 @@ class Filter : Logger::Loggable<Logger::Id::router>,
Upstream::ClusterInfoConstSharedPtr cluster() override { return cluster_; }
FilterConfig& config() override { return config_; }
FilterUtility::TimeoutData timeout() override { return timeout_; }
absl::optional<std::chrono::milliseconds> dynamicMaxStreamDuration() const override {
return dynamic_max_stream_duration_;
}
Http::RequestHeaderMap* downstreamHeaders() override { return downstream_headers_; }
Http::RequestTrailerMap* downstreamTrailers() override { return downstream_trailers_; }
bool downstreamResponseStarted() const override { return downstream_response_started_; }
Expand Down Expand Up @@ -535,7 +548,8 @@ class Filter : Logger::Loggable<Logger::Id::router>,
MetadataMatchCriteriaConstPtr metadata_match_;
std::function<void(Http::ResponseHeaderMap&)> modify_headers_;
std::vector<std::reference_wrapper<const ShadowPolicy>> active_shadow_policies_{};

// The stream lifetime configured by request header.
absl::optional<std::chrono::milliseconds> dynamic_max_stream_duration_;
// list of cookies to add to upstream headers
std::vector<std::string> downstream_set_cookies_;

Expand Down
18 changes: 11 additions & 7 deletions source/common/router/upstream_request.cc
Original file line number Diff line number Diff line change
Expand Up @@ -450,14 +450,18 @@ void UpstreamRequest::onPoolReady(
paused_for_connect_ = true;
}

if (upstream_host_->cluster().commonHttpProtocolOptions().has_max_stream_duration()) {
const auto max_stream_duration = std::chrono::milliseconds(DurationUtil::durationToMilliseconds(
absl::optional<std::chrono::milliseconds> max_stream_duration;
if (parent_.dynamicMaxStreamDuration().has_value()) {
max_stream_duration = parent_.dynamicMaxStreamDuration().value();
} else if (upstream_host_->cluster().commonHttpProtocolOptions().has_max_stream_duration()) {
max_stream_duration = std::chrono::milliseconds(DurationUtil::durationToMilliseconds(
upstream_host_->cluster().commonHttpProtocolOptions().max_stream_duration()));
if (max_stream_duration.count()) {
max_stream_duration_timer_ = parent_.callbacks()->dispatcher().createTimer(
[this]() -> void { onStreamMaxDurationReached(); });
max_stream_duration_timer_->enableTimer(max_stream_duration);
}
}

if (max_stream_duration.has_value() && max_stream_duration->count()) {
max_stream_duration_timer_ = parent_.callbacks()->dispatcher().createTimer(
[this]() -> void { onStreamMaxDurationReached(); });
max_stream_duration_timer_->enableTimer(*max_stream_duration);
}

const Http::Status status =
Expand Down
42 changes: 42 additions & 0 deletions test/common/router/router_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6062,6 +6062,48 @@ TEST_F(RouterTest, PostHttpUpstream) {
router_.onDestroy();
}

TEST_F(RouterTest, SetDynamicMaxStreamDuration) {
NiceMock<Http::MockRequestEncoder> encoder1;
EXPECT_CALL(cm_.thread_local_cluster_.conn_pool_, newStream(_, _))
.WillOnce(Invoke([&](Http::ResponseDecoder&, Http::ConnectionPool::Callbacks& callbacks)
-> Http::ConnectionPool::Cancellable* {
callbacks.onPoolReady(encoder1, cm_.thread_local_cluster_.conn_pool_.host_,
upstream_stream_info_, Http::Protocol::Http10);
return nullptr;
}));
expectMaxStreamDurationTimerCreate();

Http::TestRequestHeaderMapImpl headers{{"x-envoy-upstream-stream-duration-ms", "500"}};

HttpTestUtility::addDefaultHeaders(headers);
router_.decodeHeaders(headers, false);
max_stream_duration_timer_->invokeCallback();

router_.onDestroy();
EXPECT_TRUE(verifyHostUpstreamStats(0, 0));
}
Comment thread
Shikugawa marked this conversation as resolved.

TEST_F(RouterTest, NotSetDynamicMaxStreamDurationIfZero) {
NiceMock<Http::MockRequestEncoder> encoder1;
EXPECT_CALL(cm_.thread_local_cluster_.conn_pool_, newStream(_, _))
.WillOnce(Invoke([&](Http::ResponseDecoder&, Http::ConnectionPool::Callbacks& callbacks)
-> Http::ConnectionPool::Cancellable* {
callbacks.onPoolReady(encoder1, cm_.thread_local_cluster_.conn_pool_.host_,
upstream_stream_info_, Http::Protocol::Http10);
return nullptr;
}));

// The timer will not be created.
EXPECT_CALL(callbacks_.dispatcher_, createTimer_).Times(0);

Http::TestRequestHeaderMapImpl headers{{"x-envoy-upstream-stream-duration-ms", "0"}};
HttpTestUtility::addDefaultHeaders(headers);
router_.decodeHeaders(headers, false);

router_.onDestroy();
EXPECT_TRUE(verifyHostUpstreamStats(0, 0));
}

// Test that request/response header/body sizes are properly recorded.
TEST_F(RouterTest, RequestResponseSize) { testRequestResponseSize(false); }

Expand Down
87 changes: 0 additions & 87 deletions test/integration/http_integration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1404,93 +1404,6 @@ void HttpIntegrationTest::testAdminDrain(Http::CodecType admin_request_type) {
}
}

void HttpIntegrationTest::testMaxStreamDuration() {
config_helper_.addConfigModifier([](envoy::config::bootstrap::v3::Bootstrap& bootstrap) {
ConfigHelper::HttpProtocolOptions protocol_options;
auto* http_protocol_options = protocol_options.mutable_common_http_protocol_options();
http_protocol_options->mutable_max_stream_duration()->MergeFrom(
ProtobufUtil::TimeUtil::MillisecondsToDuration(200));
ConfigHelper::setProtocolOptions(*bootstrap.mutable_static_resources()->mutable_clusters(0),
protocol_options);
});

initialize();
codec_client_ = makeHttpConnection(lookupPort("http"));

auto encoder_decoder = codec_client_->startRequest(default_request_headers_);
request_encoder_ = &encoder_decoder.first;
auto response = std::move(encoder_decoder.second);

ASSERT_TRUE(fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, fake_upstream_connection_));
ASSERT_TRUE(fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_));

test_server_->waitForCounterGe("cluster.cluster_0.upstream_rq_max_duration_reached", 1);

if (downstream_protocol_ == Http::CodecType::HTTP1) {
ASSERT_TRUE(codec_client_->waitForDisconnect());
} else {
ASSERT_TRUE(response->waitForEndStream());
codec_client_->close();
}
}

void HttpIntegrationTest::testMaxStreamDurationWithRetry(bool invoke_retry_upstream_disconnect) {
config_helper_.addConfigModifier([](envoy::config::bootstrap::v3::Bootstrap& bootstrap) {
ConfigHelper::HttpProtocolOptions protocol_options;
auto* http_protocol_options = protocol_options.mutable_common_http_protocol_options();
http_protocol_options->mutable_max_stream_duration()->MergeFrom(
ProtobufUtil::TimeUtil::MillisecondsToDuration(1000));
ConfigHelper::setProtocolOptions(*bootstrap.mutable_static_resources()->mutable_clusters(0),
protocol_options);
});

Http::TestRequestHeaderMapImpl retriable_header = Http::TestRequestHeaderMapImpl{
{":method", "POST"}, {":path", "/test/long/url"}, {":scheme", "http"},
{":authority", "host"}, {"x-forwarded-for", "10.0.0.1"}, {"x-envoy-retry-on", "5xx"}};
initialize();
codec_client_ = makeHttpConnection(lookupPort("http"));

auto encoder_decoder = codec_client_->startRequest(retriable_header);
request_encoder_ = &encoder_decoder.first;
auto response = std::move(encoder_decoder.second);

ASSERT_TRUE(fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, fake_upstream_connection_));
ASSERT_TRUE(fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_));
ASSERT_TRUE(upstream_request_->waitForHeadersComplete());

if (fake_upstreams_[0]->httpType() == Http::CodecType::HTTP1) {
ASSERT_TRUE(fake_upstream_connection_->waitForDisconnect());
ASSERT_TRUE(fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, fake_upstream_connection_));
} else {
ASSERT_TRUE(upstream_request_->waitForReset());
}

test_server_->waitForCounterGe("cluster.cluster_0.upstream_rq_max_duration_reached", 1);

ASSERT_TRUE(fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_));

if (invoke_retry_upstream_disconnect) {
test_server_->waitForCounterGe("cluster.cluster_0.upstream_rq_max_duration_reached", 2);
if (downstream_protocol_ == Http::CodecType::HTTP1) {
ASSERT_TRUE(codec_client_->waitForDisconnect());
} else {
ASSERT_TRUE(response->waitForEndStream());
codec_client_->close();
}

EXPECT_EQ("408", response->headers().getStatusValue());
} else {
Http::TestResponseHeaderMapImpl response_headers{{":status", "200"}};
upstream_request_->encodeHeaders(response_headers, true);

response->waitForHeaders();
codec_client_->close();

EXPECT_TRUE(response->complete());
EXPECT_EQ("200", response->headers().getStatusValue());
}
}

std::string HttpIntegrationTest::downstreamProtocolStatsRoot() const {
switch (downstreamProtocol()) {
case Http::CodecClient::Type::HTTP1:
Expand Down
7 changes: 2 additions & 5 deletions test/integration/http_integration.h
Original file line number Diff line number Diff line change
Expand Up @@ -237,11 +237,8 @@ class HttpIntegrationTest : public BaseIntegrationTest {
void testTrailers(uint64_t request_size, uint64_t response_size, bool request_trailers_present,
bool response_trailers_present);
// Test /drain_listener from admin portal.
void testAdminDrain(Http::CodecType admin_request_type);
// Test max stream duration.
void testMaxStreamDuration();
void testMaxStreamDurationWithRetry(bool invoke_retry_upstream_disconnect);
Http::CodecType downstreamProtocol() const { return downstream_protocol_; }
void testAdminDrain(Http::CodecClient::Type admin_request_type);
Http::CodecClient::Type downstreamProtocol() const { return downstream_protocol_; }
std::string downstreamProtocolStatsRoot() const;
// Return the upstream protocol part of the stats root.
std::string upstreamProtocolStatsRoot() const;
Expand Down
Loading