diff --git a/Makefile b/Makefile index 7c7699b9445..bf1fe3d6e36 100644 --- a/Makefile +++ b/Makefile @@ -21,6 +21,8 @@ BAZEL_STARTUP_ARGS ?= BAZEL_BUILD_ARGS ?= BAZEL_TEST_ARGS ?= BAZEL_TARGETS ?= //... +# Some tests run so slowly under the sanitizers that they always timeout. +SANITIZER_EXCLUSIONS ?= -test/integration:mixer_fault_test HUB ?= TAG ?= ifeq "$(origin CC)" "default" @@ -49,11 +51,11 @@ test: @bazel shutdown test_asan: - PATH=$(PATH) CC=$(CC) CXX=$(CXX) bazel $(BAZEL_STARTUP_ARGS) test $(BAZEL_TEST_ARGS) --config=clang-asan $(BAZEL_TARGETS) + PATH=$(PATH) CC=$(CC) CXX=$(CXX) bazel $(BAZEL_STARTUP_ARGS) test $(BAZEL_TEST_ARGS) --config=clang-asan -- $(BAZEL_TARGETS) $(SANITIZER_EXCLUSIONS) @bazel shutdown test_tsan: - PATH=$(PATH) CC=$(CC) CXX=$(CXX) bazel $(BAZEL_STARTUP_ARGS) test $(BAZEL_TEST_ARGS) --config=clang-tsan $(BAZEL_TARGETS) + PATH=$(PATH) CC=$(CC) CXX=$(CXX) bazel $(BAZEL_STARTUP_ARGS) test $(BAZEL_TEST_ARGS) --config=clang-tsan -- $(BAZEL_TARGETS) $(SANITIZER_EXCLUSIONS) @bazel shutdown check: diff --git a/include/istio/control/http/request_handler.h b/include/istio/control/http/request_handler.h index 916b6191dc0..f2f0369a092 100644 --- a/include/istio/control/http/request_handler.h +++ b/include/istio/control/http/request_handler.h @@ -35,10 +35,13 @@ class RequestHandler { // * extract attributes from the config. // * if necessary, forward some attributes to downstream // * make a Check call. - virtual ::istio::mixerclient::CancelFunc Check( - CheckData* check_data, HeaderUpdate* header_update, - ::istio::mixerclient::TransportCheckFunc transport, - ::istio::mixerclient::CheckDoneFunc on_done) = 0; + virtual void Check(CheckData* check_data, HeaderUpdate* header_update, + const ::istio::mixerclient::TransportCheckFunc& transport, + const ::istio::mixerclient::CheckDoneFunc& on_done) = 0; + + virtual void ResetCancel() = 0; + + virtual void CancelCheck() = 0; // Make a Report call. 
It will: // * check service config to see if Report is required diff --git a/include/istio/control/tcp/request_handler.h b/include/istio/control/tcp/request_handler.h index 2fb0bedf60a..8c415daa22a 100644 --- a/include/istio/control/tcp/request_handler.h +++ b/include/istio/control/tcp/request_handler.h @@ -32,8 +32,12 @@ class RequestHandler { // Perform a Check call. It will: // * extract downstream tcp connection attributes // * check config, make a Check call if necessary. - virtual ::istio::mixerclient::CancelFunc Check( - CheckData* check_data, ::istio::mixerclient::CheckDoneFunc on_done) = 0; + virtual void Check(CheckData* check_data, + const ::istio::mixerclient::CheckDoneFunc& on_done) = 0; + + virtual void ResetCancel() = 0; + + virtual void CancelCheck() = 0; // Make report call. virtual void Report(ReportData* report_data, diff --git a/include/istio/mixerclient/client.h b/include/istio/mixerclient/client.h index 7c49ceffcc4..69233731558 100644 --- a/include/istio/mixerclient/client.h +++ b/include/istio/mixerclient/client.h @@ -133,9 +133,9 @@ class MixerClient { // The response data from mixer will be consumed by mixer client. // A check call. - virtual CancelFunc Check(istio::mixerclient::CheckContextSharedPtr& context, - TransportCheckFunc transport, - CheckDoneFunc on_done) = 0; + virtual void Check(istio::mixerclient::CheckContextSharedPtr& context, + const TransportCheckFunc& transport, + const CheckDoneFunc& on_done) = 0; // A report call. virtual void Report( diff --git a/include/istio/mixerclient/options.h b/include/istio/mixerclient/options.h index f24a0a17f23..3d988cf7221 100644 --- a/include/istio/mixerclient/options.h +++ b/include/istio/mixerclient/options.h @@ -39,7 +39,17 @@ struct CheckOptions { const int num_entries; // If true, Check is passed for any network failures. 
- bool network_fail_open = true; + bool network_fail_open{true}; + + // Number of retries on transport error + uint32_t retries{0}; + + // Base milliseconds to sleep between retries. Will be adjusted by + // exponential backoff and jitter. + uint32_t base_retry_ms{80}; + + // Max milliseconds to sleep between retries. + uint32_t max_retry_ms{1000}; }; // Options controlling report batch. diff --git a/istio.deps b/istio.deps index 3b869edc44e..39d2924e519 100644 --- a/istio.deps +++ b/istio.deps @@ -4,7 +4,7 @@ "name": "ISTIO_API", "repoName": "api", "file": "repositories.bzl", - "lastStableSHA": "92b7ddc0f30b3aab6a5e82a861e54bf55fe249bd" + "lastStableSHA": "5945a02236f53ad860d518772f730594709b1234" }, { "_comment": "", diff --git a/repositories.bzl b/repositories.bzl index 890d8cf6f1f..7142d7e7cca 100644 --- a/repositories.bzl +++ b/repositories.bzl @@ -99,8 +99,14 @@ cc_library( actual = "@googletest_git//:googletest_prod", ) -ISTIO_API = "92b7ddc0f30b3aab6a5e82a861e54bf55fe249bd" -ISTIO_API_SHA256 = "03fc53fe2a2ac980d2fbe9eeab9cf5526f8e493c786f72ba456d4c4e78b44e6b" +# +# To update these... 
+# 1) find the ISTIO_API SHA you want in git +# 2) wget https://github.com/istio/api/archive/ISTIO_API_SHA.tar.gz +# 3) sha256sum ISTIO_API_SHA.tar.gz +# +ISTIO_API = "5945a02236f53ad860d518772f730594709b1234" +ISTIO_API_SHA256 = "b75411deda635c70bdbf12cd1d405129d1f23e6a56a5eebbe69c75cfa3d6009e" def mixerapi_repositories(bind = True): BUILD = """ diff --git a/src/envoy/http/mixer/filter.cc b/src/envoy/http/mixer/filter.cc index c3ec6bba84f..8c310fa27df 100644 --- a/src/envoy/http/mixer/filter.cc +++ b/src/envoy/http/mixer/filter.cc @@ -76,7 +76,7 @@ FilterHeadersStatus Filter::decodeHeaders(HeaderMap& headers, bool) { decoder_callbacks_->connection()); Utils::HeaderUpdate header_update(&headers); headers_ = &headers; - cancel_check_ = handler_->Check( + handler_->Check( &check_data, &header_update, control_.GetCheckTransport(decoder_callbacks_->activeSpan()), [this](const CheckResponseInfo& info) { completeCheck(info); }); @@ -203,14 +203,12 @@ void Filter::completeCheck(const CheckResponseInfo& info) { void Filter::onDestroy() { ENVOY_LOG(debug, "Called Mixer::Filter : {} state: {}", __func__, state_); - if (state_ != Calling) { - cancel_check_ = nullptr; + if (state_ != Calling && handler_) { + handler_->ResetCancel(); } state_ = Responded; - if (cancel_check_) { - ENVOY_LOG(debug, "Cancelling check call"); - cancel_check_(); - cancel_check_ = nullptr; + if (handler_) { + handler_->CancelCheck(); } } diff --git a/src/envoy/http/mixer/filter.h b/src/envoy/http/mixer/filter.h index 326c60acac5..e5e12503561 100644 --- a/src/envoy/http/mixer/filter.h +++ b/src/envoy/http/mixer/filter.h @@ -89,8 +89,6 @@ class Filter : public StreamFilter, Control& control_; // The request handler. std::unique_ptr<::istio::control::http::RequestHandler> handler_; - // The pending callback object. 
- istio::mixerclient::CancelFunc cancel_check_; enum State { NotStarted, Calling, Complete, Responded }; // The state diff --git a/src/envoy/tcp/mixer/filter.cc b/src/envoy/tcp/mixer/filter.cc index 79ba5e9b05f..822d130fb9f 100644 --- a/src/envoy/tcp/mixer/filter.cc +++ b/src/envoy/tcp/mixer/filter.cc @@ -43,14 +43,12 @@ void Filter::initializeReadFilterCallbacks( } void Filter::cancelCheck() { - if (state_ != State::Calling) { - cancel_check_ = nullptr; + if (state_ != State::Calling && handler_) { + handler_->ResetCancel(); } state_ = State::Closed; - if (cancel_check_) { - ENVOY_LOG(debug, "Cancelling check call"); - cancel_check_(); - cancel_check_ = nullptr; + if (handler_) { + handler_->CancelCheck(); } } @@ -61,7 +59,7 @@ void Filter::callCheck() { state_ = State::Calling; filter_callbacks_->connection().readDisable(true); calling_check_ = true; - cancel_check_ = handler_->Check( + handler_->Check( this, [this](const CheckResponseInfo &info) { completeCheck(info); }); calling_check_ = false; } @@ -140,7 +138,7 @@ Network::FilterStatus Filter::onNewConnection() { void Filter::completeCheck(const CheckResponseInfo &info) { const auto &status = info.status(); ENVOY_LOG(debug, "Called tcp filter completeCheck: {}", status.ToString()); - cancel_check_ = nullptr; + handler_->ResetCancel(); if (state_ == State::Closed) { return; } diff --git a/src/envoy/tcp/mixer/filter.h b/src/envoy/tcp/mixer/filter.h index b9bfb97369c..cf59f1cdb88 100644 --- a/src/envoy/tcp/mixer/filter.h +++ b/src/envoy/tcp/mixer/filter.h @@ -86,8 +86,6 @@ class Filter : public Network::Filter, // Cancel the pending Check call. void cancelCheck(); - // the cancel check - istio::mixerclient::CancelFunc cancel_check_; // the control object. 
Control &control_; // pre-request handler diff --git a/src/envoy/utils/utils_test.cc b/src/envoy/utils/utils_test.cc index 66a12024f6c..53f262dbba4 100644 --- a/src/envoy/utils/utils_test.cc +++ b/src/envoy/utils/utils_test.cc @@ -51,7 +51,7 @@ TEST(UtilsTest, ParseMessageWithUnknownField) { TEST(UtilsTest, CheckResponseInfoToStreamInfo) { auto attributes = std::make_shared<::istio::mixerclient::SharedAttributes>(); ::istio::mixerclient::CheckContext check_response( - false /* fail_open */, attributes); // by default status is unknown + 0U, false /* fail_open */, attributes); // by default status is unknown Envoy::StreamInfo::MockStreamInfo mock_stream_info; EXPECT_CALL( diff --git a/src/istio/control/client_context_base.cc b/src/istio/control/client_context_base.cc index 069d596c0eb..9ca884c2b6d 100644 --- a/src/istio/control/client_context_base.cc +++ b/src/istio/control/client_context_base.cc @@ -31,6 +31,7 @@ using ::istio::mixerclient::MixerClientOptions; using ::istio::mixerclient::QuotaOptions; using ::istio::mixerclient::ReportOptions; using ::istio::mixerclient::Statistics; +using ::istio::mixerclient::TimerCreateFunc; using ::istio::mixerclient::TransportCheckFunc; using ::istio::utils::CreateLocalAttributes; using ::istio::utils::LocalNode; @@ -39,6 +40,16 @@ namespace istio { namespace control { namespace { +static constexpr uint32_t MaxDurationSec = 24 * 60 * 60; + +static uint32_t DurationToMsec(const ::google::protobuf::Duration& duration) { + uint32_t msec = + 1000 * (duration.seconds() > MaxDurationSec ? 
MaxDurationSec + : duration.seconds()); + msec += duration.nanos() / 1000 / 1000; + return msec; +} + CheckOptions GetJustCheckOptions(const TransportConfig& config) { if (config.disable_check_cache()) { return CheckOptions(0); @@ -48,9 +59,25 @@ CheckOptions GetJustCheckOptions(const TransportConfig& config) { CheckOptions GetCheckOptions(const TransportConfig& config) { auto options = GetJustCheckOptions(config); - if (config.has_network_fail_policy() && - config.network_fail_policy().policy() == NetworkFailPolicy::FAIL_CLOSE) { - options.network_fail_open = false; + if (config.has_network_fail_policy()) { + if (config.network_fail_policy().policy() == + NetworkFailPolicy::FAIL_CLOSE) { + options.network_fail_open = false; + } + + if (0 <= config.network_fail_policy().max_retry()) { + options.retries = config.network_fail_policy().max_retry(); + } + + if (config.network_fail_policy().has_base_retry_wait()) { + options.base_retry_ms = + DurationToMsec(config.network_fail_policy().base_retry_wait()); + } + + if (config.network_fail_policy().has_max_retry_wait()) { + options.max_retry_ms = + DurationToMsec(config.network_fail_policy().max_retry_wait()); + } } return options; } @@ -81,9 +108,10 @@ ClientContextBase::ClientContextBase(const TransportConfig& config, mixer_client_ = ::istio::mixerclient::CreateMixerClient(options); CreateLocalAttributes(local_node, &local_attributes_); network_fail_open_ = options.check_options.network_fail_open; + retries_ = options.check_options.retries; } -CancelFunc ClientContextBase::SendCheck( +void ClientContextBase::SendCheck( const TransportCheckFunc& transport, const CheckDoneFunc& on_done, ::istio::mixerclient::CheckContextSharedPtr& context) { MIXER_DEBUG("Check attributes: %s", diff --git a/src/istio/control/client_context_base.h b/src/istio/control/client_context_base.h index 5f7f741dc8b..e3640f4d203 100644 --- a/src/istio/control/client_context_base.h +++ b/src/istio/control/client_context_base.h @@ -17,6 +17,7 @@ #define 
ISTIO_CONTROL_CLIENT_CONTEXT_BASE_H #include "include/istio/mixerclient/client.h" +#include "include/istio/mixerclient/timer.h" #include "include/istio/utils/attribute_names.h" #include "include/istio/utils/local_attributes.h" #include "mixer/v1/config/client/client_config.pb.h" @@ -42,15 +43,15 @@ class ClientContextBase { : mixer_client_(std::move(mixer_client)), outbound_(outbound), local_attributes_(local_attributes), - network_fail_open_(false) {} + network_fail_open_(false), + retries_(0) {} // virtual destrutor virtual ~ClientContextBase() {} // Use mixer client object to make a Check call. - ::istio::mixerclient::CancelFunc SendCheck( - const ::istio::mixerclient::TransportCheckFunc& transport, - const ::istio::mixerclient::CheckDoneFunc& on_done, - ::istio::mixerclient::CheckContextSharedPtr& check_context); + void SendCheck(const ::istio::mixerclient::TransportCheckFunc& transport, + const ::istio::mixerclient::CheckDoneFunc& on_done, + ::istio::mixerclient::CheckContextSharedPtr& check_context); // Use mixer client object to make a Report call. void SendReport( @@ -66,6 +67,8 @@ class ClientContextBase { bool NetworkFailOpen() const { return network_fail_open_; } + uint32_t Retries() const { return retries_; } + private: // The mixer client object with check cache and report batch features. 
std::unique_ptr<::istio::mixerclient::MixerClient> mixer_client_; @@ -77,6 +80,7 @@ class ClientContextBase { ::istio::utils::LocalAttributes local_attributes_; bool network_fail_open_; + uint32_t retries_; }; } // namespace control diff --git a/src/istio/control/http/request_handler_impl.cc b/src/istio/control/http/request_handler_impl.cc index 73cc95dd8aa..a5111a01e4d 100644 --- a/src/istio/control/http/request_handler_impl.cc +++ b/src/istio/control/http/request_handler_impl.cc @@ -20,6 +20,7 @@ using ::google::protobuf::util::Status; using ::istio::mixerclient::CancelFunc; using ::istio::mixerclient::CheckDoneFunc; using ::istio::mixerclient::CheckResponseInfo; +using ::istio::mixerclient::TimerCreateFunc; using ::istio::mixerclient::TransportCheckFunc; using ::istio::quota_config::Requirement; @@ -31,6 +32,7 @@ RequestHandlerImpl::RequestHandlerImpl( std::shared_ptr service_context) : attributes_(new istio::mixerclient::SharedAttributes()), check_context_(new istio::mixerclient::CheckContext( + service_context->client_context()->Retries(), service_context->client_context()->NetworkFailOpen(), attributes_)), service_context_(service_context) {} @@ -61,10 +63,10 @@ void RequestHandlerImpl::AddCheckAttributes(CheckData* check_data) { } } -CancelFunc RequestHandlerImpl::Check(CheckData* check_data, - HeaderUpdate* header_update, - TransportCheckFunc transport, - CheckDoneFunc on_done) { +void RequestHandlerImpl::Check(CheckData* check_data, + HeaderUpdate* header_update, + const TransportCheckFunc& transport, + const CheckDoneFunc& on_done) { // Forwarded attributes need to be stored regardless Check is needed // or not since the header will be updated or removed. 
AddCheckAttributes(check_data); @@ -75,14 +77,26 @@ CancelFunc RequestHandlerImpl::Check(CheckData* check_data, if (!service_context_->enable_mixer_check()) { check_context_->setFinalStatus(Status::OK, false); on_done(*check_context_); - return nullptr; + return; } service_context_->AddQuotas(attributes_->attributes(), check_context_->quotaRequirements()); - return service_context_->client_context()->SendCheck(transport, on_done, - check_context_); + service_context_->client_context()->SendCheck(transport, on_done, + check_context_); +} + +void RequestHandlerImpl::ResetCancel() { + if (check_context_) { + check_context_->resetCancel(); + } +} + +void RequestHandlerImpl::CancelCheck() { + if (check_context_) { + check_context_->cancel(); + } } // Make remote report call. diff --git a/src/istio/control/http/request_handler_impl.h b/src/istio/control/http/request_handler_impl.h index 599f0660ee0..d454a4f505b 100644 --- a/src/istio/control/http/request_handler_impl.h +++ b/src/istio/control/http/request_handler_impl.h @@ -30,11 +30,16 @@ class RequestHandlerImpl : public RequestHandler { public: RequestHandlerImpl(std::shared_ptr service_context); + virtual ~RequestHandlerImpl() = default; + // Makes a Check call. - ::istio::mixerclient::CancelFunc Check( - CheckData* check_data, HeaderUpdate* header_update, - ::istio::mixerclient::TransportCheckFunc transport, - ::istio::mixerclient::CheckDoneFunc on_done) override; + void Check(CheckData* check_data, HeaderUpdate* header_update, + const ::istio::mixerclient::TransportCheckFunc& transport, + const ::istio::mixerclient::CheckDoneFunc& on_done) override; + + void ResetCancel() override; + + void CancelCheck() override; // Make a Report call. 
void Report(CheckData* check_data, ReportData* report_data) override; diff --git a/src/istio/control/http/request_handler_impl_test.cc b/src/istio/control/http/request_handler_impl_test.cc index d8ea8e30184..8c4e09cdcb6 100644 --- a/src/istio/control/http/request_handler_impl_test.cc +++ b/src/istio/control/http/request_handler_impl_test.cc @@ -28,6 +28,7 @@ using ::istio::mixer::v1::Attributes; using ::istio::mixer::v1::config::client::HttpClientConfig; using ::istio::mixer::v1::config::client::ServiceConfig; using ::istio::mixerclient::CancelFunc; +using ::istio::mixerclient::CheckContextSharedPtr; using ::istio::mixerclient::CheckDoneFunc; using ::istio::mixerclient::CheckResponseInfo; using ::istio::mixerclient::DoneFunc; @@ -253,13 +254,12 @@ TEST_F(RequestHandlerImplTest, TestPerRouteAttributes) { // Check should be called. EXPECT_CALL(*mock_client_, Check(_, _, _)) - .WillOnce(Invoke([](istio::mixerclient::CheckContextSharedPtr &context, - TransportCheckFunc transport, - CheckDoneFunc on_done) -> CancelFunc { + .WillOnce(Invoke([](CheckContextSharedPtr &context, + const TransportCheckFunc &transport, + const CheckDoneFunc &on_done) { auto map = context->attributes()->attributes(); EXPECT_EQ(map["global-key"].string_value(), "global-value"); EXPECT_EQ(map["per-route-key"].string_value(), "per-route-value"); - return nullptr; })); ServiceConfig config; @@ -280,13 +280,12 @@ TEST_F(RequestHandlerImplTest, TestDefaultRouteAttributes) { // Check should be called. 
EXPECT_CALL(*mock_client_, Check(_, _, _)) - .WillOnce(Invoke([](istio::mixerclient::CheckContextSharedPtr &context, - TransportCheckFunc transport, - CheckDoneFunc on_done) -> CancelFunc { + .WillOnce(Invoke([](CheckContextSharedPtr &context, + const TransportCheckFunc &transport, + const CheckDoneFunc &on_done) { auto map = context->attributes()->attributes(); EXPECT_EQ(map["global-key"].string_value(), "global-value"); EXPECT_EQ(map["route0-key"].string_value(), "route0-value"); - return nullptr; })); // Attribute is forwarded: route override @@ -318,13 +317,12 @@ TEST_F(RequestHandlerImplTest, TestRouteAttributes) { // Check should be called. EXPECT_CALL(*mock_client_, Check(_, _, _)) - .WillOnce(Invoke([](istio::mixerclient::CheckContextSharedPtr &context, - TransportCheckFunc transport, - CheckDoneFunc on_done) -> CancelFunc { + .WillOnce(Invoke([](CheckContextSharedPtr &context, + const TransportCheckFunc &transport, + const CheckDoneFunc &on_done) { auto map = context->attributes()->attributes(); EXPECT_EQ(map["global-key"].string_value(), "service-value"); EXPECT_EQ(map["route1-key"].string_value(), "route1-value"); - return nullptr; })); // Attribute is forwarded: global @@ -348,15 +346,14 @@ TEST_F(RequestHandlerImplTest, TestPerRouteQuota) { // Check should be called. 
EXPECT_CALL(*mock_client_, Check(_, _, _)) - .WillOnce(Invoke([](istio::mixerclient::CheckContextSharedPtr &context, - TransportCheckFunc transport, - CheckDoneFunc on_done) -> CancelFunc { + .WillOnce(Invoke([](CheckContextSharedPtr &context, + const TransportCheckFunc &transport, + const CheckDoneFunc &on_done) { auto map = context->attributes()->attributes(); EXPECT_EQ(map["global-key"].string_value(), "global-value"); EXPECT_EQ(context->quotaRequirements().size(), 1); EXPECT_EQ(context->quotaRequirements()[0].quota, "route0-quota"); EXPECT_EQ(context->quotaRequirements()[0].charge, 10); - return nullptr; })); ServiceConfig config; @@ -389,14 +386,13 @@ TEST_F(RequestHandlerImplTest, TestPerRouteApiSpec) { // Check should be called. EXPECT_CALL(*mock_client_, Check(_, _, _)) - .WillOnce(Invoke([](istio::mixerclient::CheckContextSharedPtr &context, - TransportCheckFunc transport, - CheckDoneFunc on_done) -> CancelFunc { + .WillOnce(Invoke([](CheckContextSharedPtr &context, + const TransportCheckFunc &transport, + const CheckDoneFunc &on_done) { auto map = context->attributes()->attributes(); EXPECT_EQ(map["global-key"].string_value(), "global-value"); EXPECT_EQ(map["api.name"].string_value(), "test-name"); EXPECT_EQ(map["api.operation"].string_value(), "test-method"); - return nullptr; })); ServiceConfig config; @@ -448,13 +444,12 @@ TEST_F(RequestHandlerImplTest, TestDefaultApiKey) { // Check should be called. 
EXPECT_CALL(*mock_client_, Check(_, _, _)) - .WillOnce(Invoke([](istio::mixerclient::CheckContextSharedPtr &context, - TransportCheckFunc transport, - CheckDoneFunc on_done) -> CancelFunc { + .WillOnce(Invoke([](CheckContextSharedPtr &context, + const TransportCheckFunc &transport, + const CheckDoneFunc &on_done) { auto map = context->attributes()->attributes(); EXPECT_EQ(map[utils::AttributeName::kRequestApiKey].string_value(), "test-api-key"); - return nullptr; })); // destionation.server is empty, will use default one @@ -548,13 +543,12 @@ TEST_F(OutboundRequestHandlerImplTest, TestLocalAttributes) { ::testing::NiceMock mock_header; // Check should be called. EXPECT_CALL(*mock_client_, Check(_, _, _)) - .WillOnce(Invoke([](istio::mixerclient::CheckContextSharedPtr &context, - TransportCheckFunc transport, - CheckDoneFunc on_done) -> CancelFunc { + .WillOnce(Invoke([](CheckContextSharedPtr &context, + const TransportCheckFunc &transport, + const CheckDoneFunc &on_done) { auto map = context->attributes()->attributes(); EXPECT_EQ(map["source.uid"].string_value(), "kubernetes://src-client-84469dc8d7-jbbxt.default"); - return nullptr; })); ServiceConfig config; @@ -581,13 +575,12 @@ TEST_F(OutboundRequestHandlerImplTest, TestLocalAttributesOverride) { // Check should be called. 
EXPECT_CALL(*mock_client_, Check(_, _, _)) - .WillOnce(Invoke([](istio::mixerclient::CheckContextSharedPtr &context, - TransportCheckFunc transport, - CheckDoneFunc on_done) -> CancelFunc { + .WillOnce(Invoke([](CheckContextSharedPtr &context, + const TransportCheckFunc &transport, + const CheckDoneFunc &on_done) { auto map = context->attributes()->attributes(); EXPECT_EQ(map["source.uid"].string_value(), "fwded"); EXPECT_NE(map["destination.uid"].string_value(), "ignored"); - return nullptr; })); ServiceConfig config; diff --git a/src/istio/control/mock_mixer_client.h b/src/istio/control/mock_mixer_client.h index e94277490ba..93e54a0c74d 100644 --- a/src/istio/control/mock_mixer_client.h +++ b/src/istio/control/mock_mixer_client.h @@ -25,10 +25,10 @@ namespace control { // The mock object for MixerClient interface. class MockMixerClient : public ::istio::mixerclient::MixerClient { public: - MOCK_METHOD3(Check, ::istio::mixerclient::CancelFunc( - ::istio::mixerclient::CheckContextSharedPtr& context, - ::istio::mixerclient::TransportCheckFunc transport, - ::istio::mixerclient::CheckDoneFunc on_done)); + MOCK_METHOD3(Check, + void(::istio::mixerclient::CheckContextSharedPtr& context, + const ::istio::mixerclient::TransportCheckFunc& transport, + const ::istio::mixerclient::CheckDoneFunc& on_done)); MOCK_METHOD1( Report, void(const istio::mixerclient::SharedAttributesSharedPtr& attributes)); diff --git a/src/istio/control/tcp/request_handler_impl.cc b/src/istio/control/tcp/request_handler_impl.cc index eb74be8ccce..4abb3b70780 100644 --- a/src/istio/control/tcp/request_handler_impl.cc +++ b/src/istio/control/tcp/request_handler_impl.cc @@ -30,12 +30,13 @@ RequestHandlerImpl::RequestHandlerImpl( std::shared_ptr client_context) : attributes_(new istio::mixerclient::SharedAttributes()), check_context_(new istio::mixerclient::CheckContext( - client_context->NetworkFailOpen(), attributes_)), + client_context->Retries(), client_context->NetworkFailOpen(), + attributes_)), 
client_context_(client_context), last_report_info_{0ULL, 0ULL, std::chrono::nanoseconds::zero()} {} -CancelFunc RequestHandlerImpl::Check(CheckData* check_data, - CheckDoneFunc on_done) { +void RequestHandlerImpl::Check(CheckData* check_data, + const CheckDoneFunc& on_done) { if (client_context_->enable_mixer_check() || client_context_->enable_mixer_report()) { client_context_->AddStaticAttributes(attributes_->attributes()); @@ -47,13 +48,25 @@ CancelFunc RequestHandlerImpl::Check(CheckData* check_data, if (!client_context_->enable_mixer_check()) { check_context_->setFinalStatus(Status::OK, false); on_done(*check_context_); - return nullptr; + return; } client_context_->AddQuotas(attributes_->attributes(), check_context_->quotaRequirements()); - return client_context_->SendCheck(nullptr, on_done, check_context_); + client_context_->SendCheck(nullptr, on_done, check_context_); +} + +void RequestHandlerImpl::ResetCancel() { + if (check_context_) { + check_context_->resetCancel(); + } +} + +void RequestHandlerImpl::CancelCheck() { + if (check_context_) { + check_context_->cancel(); + } } void RequestHandlerImpl::Report(ReportData* report_data, diff --git a/src/istio/control/tcp/request_handler_impl.h b/src/istio/control/tcp/request_handler_impl.h index 57d72623d9e..c702e6a18e2 100644 --- a/src/istio/control/tcp/request_handler_impl.h +++ b/src/istio/control/tcp/request_handler_impl.h @@ -30,9 +30,12 @@ class RequestHandlerImpl : public RequestHandler { RequestHandlerImpl(std::shared_ptr client_context); // Make a Check call. - ::istio::mixerclient::CancelFunc Check( - CheckData* check_data, - ::istio::mixerclient::CheckDoneFunc on_done) override; + void Check(CheckData* check_data, + const ::istio::mixerclient::CheckDoneFunc& on_done) override; + + void ResetCancel() override; + + void CancelCheck() override; // Make a Report call. 
void Report(ReportData* report_data, diff --git a/src/istio/control/tcp/request_handler_impl_test.cc b/src/istio/control/tcp/request_handler_impl_test.cc index 40916270c91..9c882e166e7 100644 --- a/src/istio/control/tcp/request_handler_impl_test.cc +++ b/src/istio/control/tcp/request_handler_impl_test.cc @@ -26,6 +26,7 @@ using ::google::protobuf::util::Status; using ::istio::mixer::v1::Attributes; using ::istio::mixer::v1::config::client::TcpClientConfig; using ::istio::mixerclient::CancelFunc; +using ::istio::mixerclient::CheckContextSharedPtr; using ::istio::mixerclient::CheckDoneFunc; using ::istio::mixerclient::CheckResponseInfo; using ::istio::mixerclient::DoneFunc; @@ -124,15 +125,14 @@ TEST_F(RequestHandlerImplTest, TestHandlerCheck) { // Check should be called. EXPECT_CALL(*mock_client_, Check(_, _, _)) - .WillOnce(Invoke([](istio::mixerclient::CheckContextSharedPtr &context, - TransportCheckFunc transport, - CheckDoneFunc on_done) -> CancelFunc { + .WillOnce(Invoke([](CheckContextSharedPtr &context, + const TransportCheckFunc &transport, + const CheckDoneFunc &on_done) { auto map = context->attributes()->attributes(); EXPECT_EQ(map["key1"].string_value(), "value1"); EXPECT_EQ(context->quotaRequirements().size(), 1); EXPECT_EQ(context->quotaRequirements()[0].quota, "quota"); EXPECT_EQ(context->quotaRequirements()[0].charge, 5); - return nullptr; })); auto handler = controller_->CreateRequestHandler(); diff --git a/src/istio/mixerclient/check_context.h b/src/istio/mixerclient/check_context.h index 0abff0d2d8e..38e36de0f14 100644 --- a/src/istio/mixerclient/check_context.h +++ b/src/istio/mixerclient/check_context.h @@ -18,6 +18,7 @@ #include "google/protobuf/arena.h" #include "google/protobuf/stubs/status.h" #include "include/istio/mixerclient/check_response.h" +#include "include/istio/mixerclient/environment.h" #include "include/istio/quota_config/requirement.h" #include "include/istio/utils/attribute_names.h" #include 
"include/istio/utils/attributes_builder.h" @@ -27,6 +28,7 @@ #include "src/istio/mixerclient/check_cache.h" #include "src/istio/mixerclient/quota_cache.h" #include "src/istio/mixerclient/shared_attributes.h" +#include "src/istio/utils/logger.h" #include @@ -39,8 +41,11 @@ namespace mixerclient { */ class CheckContext : public CheckResponseInfo { public: - CheckContext(bool fail_open, SharedAttributesSharedPtr& shared_attributes) - : shared_attributes_(shared_attributes), fail_open_(fail_open) {} + CheckContext(uint32_t retries, bool fail_open, + SharedAttributesSharedPtr& shared_attributes) + : shared_attributes_(shared_attributes), + fail_open_(fail_open), + max_retries_(retries) {} const istio::mixer::v1::Attributes* attributes() const { return shared_attributes_->attributes(); @@ -148,6 +153,38 @@ class CheckContext : public CheckResponseInfo { final_status_ = status; } + // + // Policy gRPC request attempt, retry, and cancellation + // + + bool retryable() const { return retry_attempts_ < max_retries_; } + + uint32_t retryAttempt() const { return retry_attempts_; } + + void retry(uint32_t retry_ms, std::unique_ptr timer) { + retry_attempts_++; + retry_timer_ = std::move(timer); + retry_timer_->Start(retry_ms); + } + + void cancel() { + if (cancel_func_) { + MIXER_DEBUG("Cancelling check call"); + cancel_func_(); + cancel_func_ = nullptr; + } + + if (retry_timer_) { + MIXER_DEBUG("Cancelling retry"); + retry_timer_->Stop(); + retry_timer_ = nullptr; + } + } + + void setCancel(CancelFunc cancel_func) { cancel_func_ = cancel_func; } + + void resetCancel() { cancel_func_ = nullptr; } + // // CheckResponseInfo (exposed to the top-level filter) // @@ -161,6 +198,9 @@ class CheckContext : public CheckResponseInfo { } private: + CheckContext(const CheckContext&) = delete; + void operator=(const CheckContext&) = delete; + istio::mixer::v1::CheckRequest* allocRequestOnce() { if (!request_) { request_ = google::protobuf::Arena::CreateMessage< @@ -186,6 +226,14 @@ class 
CheckContext : public CheckResponseInfo { bool remote_quota_check_required_{false}; google::protobuf::util::Status final_status_{ google::protobuf::util::Status::UNKNOWN}; + const uint32_t max_retries_; + uint32_t retry_attempts_{0}; + + // Calling this will cancel any currently outstanding gRPC request to mixer + // policy server. + CancelFunc cancel_func_{nullptr}; + + std::unique_ptr retry_timer_{nullptr}; }; typedef std::shared_ptr CheckContextSharedPtr; diff --git a/src/istio/mixerclient/client_impl.cc b/src/istio/mixerclient/client_impl.cc index 25d527a34ee..3a5e9a0b1b5 100644 --- a/src/istio/mixerclient/client_impl.cc +++ b/src/istio/mixerclient/client_impl.cc @@ -14,8 +14,11 @@ */ #include "src/istio/mixerclient/client_impl.h" #include +#include +#include #include "include/istio/mixerclient/check_response.h" #include "include/istio/utils/protobuf.h" +#include "src/istio/utils/logger.h" using ::google::protobuf::util::Status; using ::google::protobuf::util::error::Code; @@ -62,11 +65,12 @@ TransportResult TransportStatus(const Status &status) { MixerClientImpl::MixerClientImpl(const MixerClientOptions &options) : options_(options) { + timer_create_ = options.env.timer_create_func; check_cache_ = std::unique_ptr(new CheckCache(options.check_options)); report_batch_ = std::unique_ptr( new ReportBatch(options.report_options, options_.env.report_transport, - options.env.timer_create_func, compressor_)); + timer_create_, compressor_)); quota_cache_ = std::unique_ptr(new QuotaCache(options.quota_options)); @@ -77,9 +81,21 @@ MixerClientImpl::MixerClientImpl(const MixerClientOptions &options) MixerClientImpl::~MixerClientImpl() {} -CancelFunc MixerClientImpl::Check(CheckContextSharedPtr &context, - TransportCheckFunc transport, - CheckDoneFunc on_done) { +uint32_t MixerClientImpl::RetryDelay(uint32_t retry_attempt) { + const uint32_t max_retry_ms = + std::min(options_.check_options.max_retry_ms, + options_.check_options.base_retry_ms * + static_cast(std::pow(2, 
retry_attempt))); + + std::uniform_int_distribution distribution( + options_.check_options.base_retry_ms, max_retry_ms); + + return distribution(rand_); +} + +void MixerClientImpl::Check(CheckContextSharedPtr &context, + const TransportCheckFunc &transport, + const CheckDoneFunc &on_done) { // // Always check the policy cache // @@ -87,6 +103,10 @@ CancelFunc MixerClientImpl::Check(CheckContextSharedPtr &context, context->checkPolicyCache(*check_cache_); ++total_check_calls_; + MIXER_DEBUG("Policy cache hit=%s, status=%s", + context->policyCacheHit() ? "true" : "false", + context->policyStatus().ToString().c_str()); + if (context->policyCacheHit()) { ++total_check_cache_hits_; @@ -97,7 +117,7 @@ CancelFunc MixerClientImpl::Check(CheckContextSharedPtr &context, ++total_check_cache_hit_denies_; context->setFinalStatus(context->policyStatus()); on_done(*context); - return nullptr; + return; } // @@ -108,16 +128,23 @@ CancelFunc MixerClientImpl::Check(CheckContextSharedPtr &context, if (!context->quotaCheckRequired()) { context->setFinalStatus(context->policyStatus()); on_done(*context); - return nullptr; + return; } } else { ++total_check_cache_misses_; } + bool remote_quota_prefetch{false}; + if (context->quotaCheckRequired()) { context->checkQuotaCache(*quota_cache_); ++total_quota_calls_; + MIXER_DEBUG("Quota cache hit=%s, status=%s, remote_call=%s", + context->quotaCacheHit() ? "true" : "false", + context->quotaStatus().ToString().c_str(), + context->remoteQuotaRequestRequired() ? 
"true" : "false"); + if (context->quotaCacheHit()) { ++total_quota_cache_hits_; if (context->quotaStatus().ok()) { @@ -135,11 +162,9 @@ CancelFunc MixerClientImpl::Check(CheckContextSharedPtr &context, // context->setFinalStatus(context->quotaStatus()); on_done(*context); - on_done = nullptr; - if (!context->remoteQuotaRequestRequired()) { - return nullptr; - } else { - ++total_remote_quota_prefetch_calls_; + remote_quota_prefetch = context->remoteQuotaRequestRequired(); + if (!remote_quota_prefetch) { + return; } } } else { @@ -152,10 +177,6 @@ CancelFunc MixerClientImpl::Check(CheckContextSharedPtr &context, compressor_, deduplication_id_base_ + std::to_string(deduplication_id_.fetch_add(1))); - if (!transport) { - transport = options_.env.check_transport; - } - // // Classify and track reason for remote request // @@ -170,82 +191,140 @@ CancelFunc MixerClientImpl::Check(CheckContextSharedPtr &context, ++total_remote_quota_calls_; } - return transport(context->request(), context->response(), - [this, context, on_done](const Status &status) { - // - // Classify and track transport errors - // - - TransportResult result = TransportStatus(status); - - switch (result) { - case TransportResult::SUCCESS: - ++total_remote_call_successes_; - break; - case TransportResult::RESPONSE_TIMEOUT: - ++total_remote_call_timeouts_; - break; - case TransportResult::SEND_ERROR: - ++total_remote_call_send_errors_; - break; - case TransportResult::OTHER: - ++total_remote_call_other_errors_; - break; - } - - // - // Update caches. 
This has the side-effect of updating - // status, so track those too - // - - if (!context->policyCacheHit()) { - context->updatePolicyCache(status, *context->response()); - - if (context->policyStatus().ok()) { - ++total_remote_check_accepts_; - } else { - ++total_remote_check_denies_; - } - } - - if (context->quotaCheckRequired()) { - context->updateQuotaCache(status, *context->response()); - - if (context->quotaStatus().ok()) { - ++total_remote_quota_accepts_; - } else { - ++total_remote_quota_denies_; - } - } - - // - // Determine final status for Filter::completeCheck(). This - // will send an error response to the downstream client if - // the final status is not Status::OK - // - - if (result != TransportResult::SUCCESS) { - if (context->networkFailOpen()) { - context->setFinalStatus(Status::OK); - } else { - context->setFinalStatus(status); - } - } else if (!context->quotaCheckRequired()) { - context->setFinalStatus(context->policyStatus()); - } else if (!context->policyStatus().ok()) { - context->setFinalStatus(context->policyStatus()); - } else { - context->setFinalStatus(context->quotaStatus()); - } - - if (on_done) { - on_done(*context); - } - - if (utils::InvalidDictionaryStatus(status)) { - compressor_.ShrinkGlobalDictionary(); - } - }); + if (remote_quota_prefetch) { + ++total_remote_quota_prefetch_calls_; + } + + RemoteCheck(context, transport ? transport : options_.env.check_transport, + remote_quota_prefetch ? nullptr : on_done); +} + +void MixerClientImpl::RemoteCheck(CheckContextSharedPtr context, + const TransportCheckFunc &transport, + const CheckDoneFunc &on_done) { + // + // This lambda and any lambdas it creates for retry will inc the ref count + // on the CheckContext shared pointer. + // + // The CheckDoneFunc is valid as long as the Filter object is valid. This + // has a lifespan similar to the CheckContext, but TODO(jblatt) it would be + // good to move this into the CheckContext anyways. 
+ // + // The other captures (this/MixerClientImpl and TransportCheckFunc's + // references) have lifespans much greater than any individual transaction. + // + CancelFunc cancel_func = transport( + context->request(), context->response(), + [this, context, transport, on_done](const Status &status) { + context->resetCancel(); + + // + // Classify and track transport errors + // + + TransportResult result = TransportStatus(status); + + switch (result) { + case TransportResult::SUCCESS: + ++total_remote_call_successes_; + break; + case TransportResult::RESPONSE_TIMEOUT: + ++total_remote_call_timeouts_; + break; + case TransportResult::SEND_ERROR: + ++total_remote_call_send_errors_; + break; + case TransportResult::OTHER: + ++total_remote_call_other_errors_; + break; + } + + if (result != TransportResult::SUCCESS && context->retryable()) { + ++total_remote_call_retries_; + const uint32_t retry_ms = RetryDelay(context->retryAttempt()); + + MIXER_DEBUG("Retry %u in %u msec due to transport error=%s", + context->retryAttempt() + 1, retry_ms, + status.ToString().c_str()); + + context->retry(retry_ms, + timer_create_([this, context, transport, on_done]() { + RemoteCheck(context, transport, on_done); + })); + + return; + } + + // + // Update caches. This has the side-effect of updating + // status, so track those too + // + + if (!context->policyCacheHit()) { + context->updatePolicyCache(status, *context->response()); + + if (context->policyStatus().ok()) { + ++total_remote_check_accepts_; + } else { + ++total_remote_check_denies_; + } + } + + if (context->quotaCheckRequired()) { + context->updateQuotaCache(status, *context->response()); + + if (context->quotaStatus().ok()) { + ++total_remote_quota_accepts_; + } else { + ++total_remote_quota_denies_; + } + } + + MIXER_DEBUG( + "CheckResult transport=%s, policy=%s, quota=%s, attempt=%u", + status.ToString().c_str(), + result == TransportResult::SUCCESS + ? 
context->policyStatus().ToString().c_str() + : "NA", + result == TransportResult::SUCCESS && context->quotaCheckRequired() + ? context->policyStatus().ToString().c_str() + : "NA", + context->retryAttempt()); + + // + // Determine final status for Filter::completeCheck(). This + // will send an error response to the downstream client if + // the final status is not Status::OK + // + + if (result != TransportResult::SUCCESS) { + if (context->networkFailOpen()) { + context->setFinalStatus(Status::OK); + } else { + context->setFinalStatus(status); + } + } else if (!context->quotaCheckRequired()) { + context->setFinalStatus(context->policyStatus()); + } else if (!context->policyStatus().ok()) { + context->setFinalStatus(context->policyStatus()); + } else { + context->setFinalStatus(context->quotaStatus()); + } + + if (on_done) { + on_done(*context); + } + + if (utils::InvalidDictionaryStatus(status)) { + // TODO(jblatt) verify this is threadsafe + compressor_.ShrinkGlobalDictionary(); + } + }); + + context->setCancel([this, cancel_func]() { + ++total_remote_call_cancellations_; + cancel_func(); + }); } void MixerClientImpl::Report(const SharedAttributesSharedPtr &attributes) { diff --git a/src/istio/mixerclient/client_impl.h b/src/istio/mixerclient/client_impl.h index 35c1ff4b5d3..4fa76032d06 100644 --- a/src/istio/mixerclient/client_impl.h +++ b/src/istio/mixerclient/client_impl.h @@ -23,6 +23,10 @@ #include "src/istio/mixerclient/report_batch.h" #include +#include + +using ::istio::mixerclient::CheckContextSharedPtr; +using ::istio::mixerclient::SharedAttributesSharedPtr; namespace istio { namespace mixerclient { @@ -35,21 +39,29 @@ class MixerClientImpl : public MixerClient { // Destructor virtual ~MixerClientImpl(); - CancelFunc Check(istio::mixerclient::CheckContextSharedPtr& context, - TransportCheckFunc transport, - CheckDoneFunc on_done) override; + void Check(CheckContextSharedPtr& context, + const TransportCheckFunc& transport, + const CheckDoneFunc& on_done) 
override; void Report(const SharedAttributesSharedPtr& attributes) override; void GetStatistics(Statistics* stat) const override; private: + void RemoteCheck(CheckContextSharedPtr context, + const TransportCheckFunc& transport, + const CheckDoneFunc& on_done); + + uint32_t RetryDelay(uint32_t retry_attempt); + // Store the options MixerClientOptions options_; // To compress attributes. AttributeCompressor compressor_; + // timer create func + TimerCreateFunc timer_create_; // Cache for Check call. std::unique_ptr check_cache_; // Report batch. @@ -57,6 +69,9 @@ class MixerClientImpl : public MixerClient { // Cache for Quota call. std::unique_ptr quota_cache_; + // RNG for retry jitter + std::default_random_engine rand_; + // for deduplication_id std::string deduplication_id_base_; std::atomic deduplication_id_; diff --git a/src/istio/mixerclient/client_impl_test.cc b/src/istio/mixerclient/client_impl_test.cc index e34d1357f3f..befcf3d48fa 100644 --- a/src/istio/mixerclient/client_impl_test.cc +++ b/src/istio/mixerclient/client_impl_test.cc @@ -19,6 +19,7 @@ #include "include/istio/mixerclient/client.h" #include "include/istio/utils/attributes_builder.h" #include "src/istio/mixerclient/status_test_util.h" +#include "src/istio/utils/logger.h" using ::google::protobuf::util::Status; using ::google::protobuf::util::error::Code; @@ -128,11 +129,12 @@ class MixerClientImplTest : public ::testing::Test { } CheckContextSharedPtr CreateContext(int quota_request) { + uint32_t retries{0}; bool fail_open{false}; istio::mixerclient::SharedAttributesSharedPtr attributes{ new SharedAttributes()}; istio::mixerclient::CheckContextSharedPtr context{ - new CheckContext(fail_open, attributes)}; + new CheckContext(retries, fail_open, attributes)}; if (0 < quota_request) { context->quotaRequirements().push_back({kRequestCount, quota_request}); } diff --git a/src/istio/mixerclient/report_batch.cc b/src/istio/mixerclient/report_batch.cc index 73fc421a2ba..0b2ea360e67 100644 --- 
a/src/istio/mixerclient/report_batch.cc +++ b/src/istio/mixerclient/report_batch.cc @@ -72,6 +72,9 @@ void ReportBatch::FlushWithLock() { ++total_remote_report_calls_; auto request = batch_compressor_->Finish(); ReportResponse* response = new ReportResponse; + + // TODO(jblatt) should an async call be made while this lock is held? Can the + // request send block()? transport_(request, response, [this, response](const Status& status) { delete response; if (!status.ok()) { diff --git a/test/integration/BUILD b/test/integration/BUILD index 7abc7baf610..42dda5376c8 100644 --- a/test/integration/BUILD +++ b/test/integration/BUILD @@ -57,6 +57,20 @@ envoy_cc_test( ], ) +envoy_cc_test( + name = "mixer_fault_test", + srcs = [ + "mixer_fault_test.cc", + ], + repository = "@envoy", + deps = [ + "//include/istio/utils:attribute_names_header", + "//src/envoy/http/mixer:filter_lib", + "//src/envoy/utils:filter_names_lib", + ":int_client_server", + ], +) + envoy_cc_test( name = "int_client_server_test", srcs = [ diff --git a/test/integration/mixer_fault_test.cc b/test/integration/mixer_fault_test.cc new file mode 100644 index 00000000000..302e00e1117 --- /dev/null +++ b/test/integration/mixer_fault_test.cc @@ -0,0 +1,1217 @@ +/* Copyright 2019 Istio Authors. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include + +#include "absl/strings/match.h" +#include "gtest/gtest.h" +#include "include/istio/mixerclient/options.h" +#include "int_client.h" +#include "int_server.h" +#include "mixer/v1/mixer.pb.h" +#include "test/integration/http_integration.h" +#include "test/test_common/network_utility.h" + +#define EXPECT_IN_RANGE(val, min, max) \ + EXPECT_LE(val, max); \ + EXPECT_GE(val, min) + +namespace Mixer { +namespace Integration { + +enum class NetworkFailPolicy { FAIL_OPEN = 0, FAIL_CLOSED = 1 }; + +inline static int networkFailPolicyToInt(NetworkFailPolicy policy) { + switch (policy) { + case NetworkFailPolicy::FAIL_OPEN: + return 0; + default: + return 1; + } +} + +class MixerFaultTest : public Envoy::HttpIntegrationTest, public testing::Test { + public: + MixerFaultTest() + : HttpIntegrationTest( + Envoy::Http::CodecClient::Type::HTTP1, + Envoy::Network::Address::IpVersion::v4, + std::make_unique()), + transport_socket_factory_(), + client_("client") { + Envoy::Http::CodecClient::Type origin_protocol = + Envoy::Http::CodecClient::Type::HTTP2; + setUpstreamProtocol(Envoy::Http::CodecClient::Type::HTTP2 == origin_protocol + ? Envoy::FakeHttpConnection::Type::HTTP2 + : Envoy::FakeHttpConnection::Type::HTTP1); + + // Tell the base class that we will create our own upstream origin server. + fake_upstreams_count_ = 0; + + origin_listeners_.emplace_back(new LocalListenSocket()); + origin_servers_.emplace_back( + new Server(fmt::sprintf("origin-0"), *origin_listeners_.back(), + transport_socket_factory_, origin_protocol)); + } + + virtual ~MixerFaultTest() {} + + // TODO modify BaseIntegrationTest in Envoy to eliminate this copy of the + // createEnvoy function. 
+ virtual void createEnvoy() override { + std::vector ports; + + // TODO modify BaseIntegrationTest to add additional ports without having to + // make them fake upstreams + addPorts(ports); + + config_helper_.finalize(ports); + + // TODO modify BaseIntegrationTest use protected inheritance for + // Envoy::Logger::Loggable so tests can use ENVOY_LOG fprintf(stderr, + // "Running Envoy with configuration:\n%s", + // config_helper_.bootstrap().DebugString().c_str()); + + const std::string bootstrap_path = + Envoy::TestEnvironment::writeStringToFileForTest( + "bootstrap.json", Envoy::MessageUtil::getJsonStringFromMessage( + config_helper_.bootstrap())); + + std::vector named_ports; + const auto &static_resources = + config_helper_.bootstrap().static_resources(); + for (int i = 0; i < static_resources.listeners_size(); ++i) { + named_ports.push_back(static_resources.listeners(i).name()); + } + createGeneratedApiTestServer(bootstrap_path, named_ports); + } + + // Must be called before Envoy is stopped + void extractCounters(const std::string &prefix, + std::unordered_map &counters) { + for (auto counter : test_server_->stat_store().counters()) { + if (!absl::StartsWith(counter->name(), prefix)) { + continue; + } + + counters[counter->name()] = counter->value(); + } + } + + void dumpCounters(const std::unordered_map &counters) { + for (auto it : counters) { + std::cerr << it.first << " = " << it.second << std::endl; + } + } + + protected: + LoadGeneratorPtr startServers(NetworkFailPolicy fail_policy, + ServerCallbackHelper &origin_callbacks, + ClusterHelper &policy_cluster, + ClusterHelper &telemetry_cluster, + uint32_t retries = 0, + uint32_t base_retry_ms = 10, + uint32_t max_retry_ms = 100) { + for (size_t i = 0; i < origin_servers_.size(); ++i) { + origin_servers_[i]->start(origin_callbacks); + } + + for (size_t i = 0; i < policy_cluster.servers().size(); ++i) { + policy_listeners_.emplace_back(new LocalListenSocket()); + policy_servers_.emplace_back(new Server( + 
fmt::sprintf("policy-%d", i), *policy_listeners_.back(), + transport_socket_factory_, Envoy::Http::CodecClient::Type::HTTP2)); + policy_servers_.back()->start(*policy_cluster.servers()[i]); + } + + for (size_t i = 0; i < telemetry_cluster.servers().size(); ++i) { + telemetry_listeners_.emplace_back(new LocalListenSocket()); + telemetry_servers_.emplace_back(new Server( + fmt::sprintf("telemetry-%d", i), *telemetry_listeners_.back(), + transport_socket_factory_, Envoy::Http::CodecClient::Type::HTTP2)); + telemetry_servers_.back()->start(*telemetry_cluster.servers()[i]); + } + + std::string telemetry_name("telemetry-backend"); + std::string policy_name("policy-backend"); + + addNodeMetadata(); + configureMixerFilter(fail_policy, policy_name, telemetry_name, retries, + base_retry_ms, max_retry_ms); + addCluster(telemetry_name, telemetry_listeners_); + addCluster(policy_name, policy_listeners_); + + // This calls createEnvoy() (see below) and then starts envoy + HttpIntegrationTest::initialize(); + + auto addr = Envoy::Network::Utility::parseInternetAddress( + "127.0.0.1", static_cast(lookupPort("http"))); + return std::make_unique(client_, transport_socket_factory_, + HttpVersion::HTTP1, addr); + } + + private: + void addPorts(std::vector &ports) { + // origin must come first. The order of the rest depends on the order their + // cluster was added to the config. 
+ for (size_t i = 0; i < origin_listeners_.size(); ++i) { + ports.push_back(origin_listeners_[i]->localAddress()->ip()->port()); + } + + for (size_t i = 0; i < telemetry_listeners_.size(); ++i) { + ports.push_back(telemetry_listeners_[i]->localAddress()->ip()->port()); + } + + for (size_t i = 0; i < policy_listeners_.size(); ++i) { + ports.push_back(policy_listeners_[i]->localAddress()->ip()->port()); + } + } + + void addNodeMetadata() { + config_helper_.addConfigModifier( + [](envoy::config::bootstrap::v2::Bootstrap &bootstrap) { + ::google::protobuf::Struct meta; + + Envoy::MessageUtil::loadFromJson(R"({ + "ISTIO_VERSION": "1.0.1", + "NODE_UID": "pod", + "NODE_NAMESPACE": "kubernetes://dest.pod" + })", + meta); + + bootstrap.mutable_node()->mutable_metadata()->MergeFrom(meta); + }); + } + + void configureMixerFilter(NetworkFailPolicy fail_policy, + const std::string &policy_name, + const std::string &telemetry_name, uint32_t retries, + uint32_t base_retry_ms, uint32_t max_retry_ms) { + const uint32_t base_retry_sec = base_retry_ms / 1000; + const uint32_t base_retry_nanos = base_retry_sec % 1000 * 1'000'000; + const uint32_t max_retry_sec = max_retry_ms / 1000; + const uint32_t max_retry_nanos = max_retry_sec % 1000 * 1'000'000; + constexpr char sourceUID[] = "kubernetes://src.pod"; + + std::string mixer_conf{fmt::sprintf( + R"EOF( + name: mixer + config: + defaultDestinationService: "default" + mixerAttributes: + attributes: {} + serviceConfigs: { + "default": {} + } + transport: + attributes_for_mixer_proxy: + attributes: { + "source.uid": { + string_value: %s + } + } + network_fail_policy: { + policy: %d, + max_retry: %u, + base_retry_wait: { + seconds: %u, + nanos: %u + }, + max_retry_wait: { + seconds: %u, + nanos: %u + } + } + stats_update_interval: { + seconds: %u, + nanos: %u + } + report_cluster: %s + check_cluster: %s + )EOF", + sourceUID, networkFailPolicyToInt(fail_policy), retries, base_retry_sec, + base_retry_nanos, max_retry_sec, max_retry_nanos, 
0U, 1'000'000, + telemetry_name.c_str(), policy_name.c_str())}; + config_helper_.addFilter(mixer_conf); + } + + void addCluster( + const std::string &name, + const std::vector &listeners) { + constexpr uint32_t max_uint32 = + 2147483647U; // protobuf max, not language max + + // See + // https://www.envoyproxy.io/docs/envoy/latest/api-v2/api/v2/cds.proto#cluster + + // TODO something in the base class clobbers the connection timeout here + std::string cluster_conf{fmt::sprintf(R"EOF( + name: %s + type: STATIC + lb_policy: ROUND_ROBIN + http2_protocol_options: { + max_concurrent_streams: %u + } + connect_timeout: 1s + max_requests_per_connection: %u + hosts: + )EOF", + name.c_str(), max_uint32, + max_uint32)}; + + for (size_t i = 0; i < listeners.size(); ++i) { + cluster_conf.append({fmt::sprintf( + R"EOF( + - socket_address: + address: %s + port_value: %d + )EOF", + Envoy::Network::Test::getLoopbackAddressString(version_), + listeners[i]->localAddress()->ip()->port())}); + } + + config_helper_.addConfigModifier( + [cluster_conf](envoy::config::bootstrap::v2::Bootstrap &bootstrap) { + bootstrap.mutable_static_resources()->add_clusters()->CopyFrom( + Envoy::TestUtility::parseYaml( + cluster_conf)); + }); + } + + Envoy::Network::RawBufferSocketFactory transport_socket_factory_; + Client client_; + std::vector origin_listeners_; + std::vector policy_listeners_; + std::vector telemetry_listeners_; + // These three vectors could store Server directly if + // Envoy::Stats::IsolatedStoreImpl was made movable. 
+ std::vector origin_servers_; + std::vector policy_servers_; + std::vector telemetry_servers_; + Envoy::Network::Address::InstanceConstSharedPtr + envoy_address_; // at most 1 envoy +}; + +TEST_F(MixerFaultTest, HappyPath) { + Envoy::Logger::Registry::setLogLevel(spdlog::level::err); + + constexpr NetworkFailPolicy fail_policy = NetworkFailPolicy::FAIL_CLOSED; + constexpr uint32_t connections_to_initiate = 30; + constexpr uint32_t requests_to_send = 30 * connections_to_initiate; + + // Origin server immediately sends a simple 200 OK to every request + ServerCallbackHelper origin_callbacks; + + ClusterHelper policy_cluster( + {new ServerCallbackHelper([](ServerConnection &, ServerStream &stream, + Envoy::Http::HeaderMapPtr &&) { + // Send a gRPC success response immediately to every policy check + ::istio::mixer::v1::CheckResponse response; + response.mutable_precondition()->mutable_status()->set_code( + google::protobuf::util::error::Code::OK); + stream.sendGrpcResponse(Envoy::Grpc::Status::Ok, response, + std::chrono::milliseconds(0)); + })}); + + ClusterHelper telemetry_cluster( + {new ServerCallbackHelper([](ServerConnection &, ServerStream &stream, + Envoy::Http::HeaderMapPtr &&) { + // Send a gRPC success response immediately to every telemetry report. + ::istio::mixer::v1::ReportResponse response; + stream.sendGrpcResponse(Envoy::Grpc::Status::Ok, response, + std::chrono::milliseconds(0)); + })}); + + LoadGeneratorPtr client = startServers(fail_policy, origin_callbacks, + policy_cluster, telemetry_cluster); + + // + // Exec test and wait for it to finish + // + + Envoy::Http::HeaderMapPtr request{ + new Envoy::Http::TestHeaderMapImpl{{":method", "GET"}, + {":path", "/"}, + {":scheme", "http"}, + {":authority", "host"}}}; + client->run(connections_to_initiate, requests_to_send, std::move(request)); + + // shutdown envoy by destroying it + test_server_ = nullptr; + // wait until the upstreams have closed all connections they accepted. 
+ // shutting down envoy should close them all + origin_callbacks.wait(); + policy_cluster.wait(); + telemetry_cluster.wait(); + + // + // Evaluate test + // + + // All client connections are successfully established. + EXPECT_EQ(client->connectSuccesses(), connections_to_initiate); + EXPECT_EQ(0, client->connectFailures()); + // Client close callback called for every client connection. + EXPECT_EQ(client->localCloses(), connections_to_initiate); + // Client response callback is called for every request sent + EXPECT_EQ(client->responsesReceived(), requests_to_send); + // Every response was a 2xx class + EXPECT_EQ(client->class2xxResponses(), requests_to_send); + EXPECT_EQ(0, client->class4xxResponses()); + EXPECT_EQ(0, client->class5xxResponses()); + EXPECT_EQ(0, client->responseTimeouts()); + // No client sockets are rudely closed by server / no client sockets are + // reset. + EXPECT_EQ(0, client->remoteCloses()); + + // assert that the origin request callback is called for every client request + // sent + EXPECT_EQ(origin_callbacks.requestsReceived(), requests_to_send); + + // assert that the policy request callback is called for every client request + // sent + EXPECT_EQ(policy_cluster.requestsReceived(), requests_to_send); +} + +TEST_F(MixerFaultTest, FailClosedAndClosePolicySocketAfterAccept) { + Envoy::Logger::Registry::setLogLevel(spdlog::level::err); + + constexpr NetworkFailPolicy fail_policy = NetworkFailPolicy::FAIL_CLOSED; + constexpr uint32_t connections_to_initiate = 30; + constexpr uint32_t requests_to_send = 30 * connections_to_initiate; + + // + // Setup + // + + // Origin server immediately sends a simple 200 OK to every request + ServerCallbackHelper origin_callbacks; + + ClusterHelper policy_cluster( + {// Policy server immediately closes any connection accepted. 
+ new ServerCallbackHelper( + [](ServerConnection &, ServerStream &, + Envoy::Http::HeaderMapPtr &&) { + GTEST_FATAL_FAILURE_( + "Connections immediately closed so no response should be " + "received"); + }, + [](ServerConnection &) -> ServerCallbackResult { + return ServerCallbackResult::CLOSE; + })}); + + ClusterHelper telemetry_cluster( + {// Telemetry server sends a gRPC success response immediately to every + // telemetry report. + new ServerCallbackHelper([](ServerConnection &, ServerStream &stream, + Envoy::Http::HeaderMapPtr &&) { + ::istio::mixer::v1::ReportResponse response; + stream.sendGrpcResponse(Envoy::Grpc::Status::Ok, response, + std::chrono::milliseconds(0)); + })}); + + LoadGeneratorPtr client = startServers(fail_policy, origin_callbacks, + policy_cluster, telemetry_cluster); + // + // Exec test and wait for it to finish + // + + Envoy::Http::HeaderMapPtr request{ + new Envoy::Http::TestHeaderMapImpl{{":method", "GET"}, + {":path", "/"}, + {":scheme", "http"}, + {":authority", "host"}}}; + client->run(connections_to_initiate, requests_to_send, std::move(request)); + + // shutdown envoy by destroying it + test_server_ = nullptr; + // wait until the upstreams have closed all connections they accepted. + // shutting down envoy should close them all + origin_callbacks.wait(); + policy_cluster.wait(); + telemetry_cluster.wait(); + + // + // Evaluate test + // + + // All client connections are successfully established. + EXPECT_EQ(client->connectSuccesses(), connections_to_initiate); + EXPECT_EQ(0, client->connectFailures()); + // Client close callback called for every client connection. 
+ EXPECT_EQ(client->localCloses(), connections_to_initiate);
+ // Client response callback is called for every request sent
+ EXPECT_EQ(client->responsesReceived(), requests_to_send);
+ // Every response was a 5xx class
+ EXPECT_EQ(0, client->class2xxResponses());
+ EXPECT_EQ(0, client->class4xxResponses());
+ EXPECT_EQ(requests_to_send, client->class5xxResponses());
+ EXPECT_EQ(0, client->responseTimeouts());
+ // No client sockets are rudely closed by server / no client sockets are
+ // reset.
+ EXPECT_EQ(0, client->remoteCloses());
+
+ // Origin server should see no requests since the mixer filter is configured
+ // to fail closed.
+ EXPECT_EQ(0, origin_callbacks.requestsReceived());
+
+ // Policy server accept callback is called for every client connection
+ // initiated.
+ EXPECT_GE(policy_cluster.connectionsAccepted(), connections_to_initiate);
+ // Policy server request callback is never called
+ EXPECT_EQ(0, policy_cluster.requestsReceived());
+ // Policy server closes every connection
+ EXPECT_EQ(policy_cluster.connectionsAccepted(), policy_cluster.localCloses());
+ EXPECT_EQ(0, policy_cluster.remoteCloses());
+}
+
+TEST_F(MixerFaultTest, FailClosedAndSendPolicyResponseSlowly) {
+ // Policy checks are answered after 60s, far beyond the 10s client request
+ // timeout used below, so the fail-closed filter should 5xx every request.
+ Envoy::Logger::Registry::setLogLevel(spdlog::level::err);
+
+ constexpr NetworkFailPolicy fail_policy = NetworkFailPolicy::FAIL_CLOSED;
+ constexpr uint32_t connections_to_initiate = 30 * 30;
+ constexpr uint32_t requests_to_send = 1 * connections_to_initiate;
+
+ // Origin server immediately sends a simple 200 OK to every request
+ ServerCallbackHelper origin_callbacks;
+
+ ClusterHelper policy_cluster(
+ {// Send a gRPC success response after 60 seconds to every policy check
+ new ServerCallbackHelper([](ServerConnection &, ServerStream &stream,
+ Envoy::Http::HeaderMapPtr &&) {
+ ::istio::mixer::v1::CheckResponse response;
+ response.mutable_precondition()->mutable_status()->set_code(
+ google::protobuf::util::error::Code::OK);
+ 
stream.sendGrpcResponse(Envoy::Grpc::Status::Ok, response,
+ std::chrono::milliseconds(60'000));
+ })});
+
+ ClusterHelper telemetry_cluster(
+ {// Sends a gRPC success response immediately to every telemetry report.
+ new ServerCallbackHelper([](ServerConnection &, ServerStream &stream,
+ Envoy::Http::HeaderMapPtr &&) {
+ ::istio::mixer::v1::ReportResponse response;
+ stream.sendGrpcResponse(Envoy::Grpc::Status::Ok, response,
+ std::chrono::milliseconds(0));
+ })});
+
+ LoadGeneratorPtr client = startServers(fail_policy, origin_callbacks,
+ policy_cluster, telemetry_cluster);
+
+ //
+ // Exec test and wait for it to finish
+ //
+
+ Envoy::Http::HeaderMapPtr request{
+ new Envoy::Http::TestHeaderMapImpl{{":method", "GET"},
+ {":path", "/"},
+ {":scheme", "http"},
+ {":authority", "host"}}};
+
+ client->run(connections_to_initiate, requests_to_send, std::move(request),
+ std::chrono::milliseconds(10'000));
+
+ // shutdown envoy by destroying it
+ test_server_ = nullptr;
+ // wait until the upstreams have closed all connections they accepted.
+ // shutting down envoy should close them all
+ origin_callbacks.wait();
+ policy_cluster.wait();
+ telemetry_cluster.wait();
+
+ //
+ // Evaluate test
+ //
+
+ // All client connections are successfully established.
+ EXPECT_EQ(client->connectSuccesses(), connections_to_initiate);
+ EXPECT_EQ(0, client->connectFailures());
+ // Client close callback called for every client connection.
+ EXPECT_EQ(client->localCloses(), connections_to_initiate);
+ // Client response callback is called for every request sent
+ EXPECT_EQ(client->responsesReceived(), requests_to_send);
+ // Every response was a 5xx class
+ EXPECT_EQ(0, client->class2xxResponses());
+ EXPECT_EQ(0, client->class4xxResponses());
+ EXPECT_EQ(requests_to_send, client->class5xxResponses());
+ EXPECT_EQ(0, client->responseTimeouts());
+ // No client sockets are rudely closed by server / no client sockets are
+ // reset. 
+ EXPECT_EQ(0, client->remoteCloses());
+
+ // Origin server should see no requests since the mixer filter is configured
+ // to fail closed.
+ EXPECT_EQ(0, origin_callbacks.requestsReceived());
+
+ // Policy server accept callback is called at least once (h2 socket reuse
+ // means may only be called once)
+ EXPECT_GE(policy_cluster.connectionsAccepted(), 1);
+ // Policy server request callback sees every policy check
+ EXPECT_EQ(requests_to_send, policy_cluster.requestsReceived());
+ // Policy server closes every connection
+ EXPECT_EQ(policy_cluster.connectionsAccepted(),
+ policy_cluster.localCloses() + policy_cluster.remoteCloses());
+}
+
+TEST_F(MixerFaultTest, TolerateTelemetryBlackhole) {
+ Envoy::Logger::Registry::setLogLevel(spdlog::level::err);
+
+ constexpr NetworkFailPolicy fail_policy = NetworkFailPolicy::FAIL_CLOSED;
+ constexpr uint32_t connections_to_initiate = 30;
+ constexpr uint32_t requests_to_send = 30 * connections_to_initiate;
+
+ // Origin server immediately sends a simple 200 OK to every request
+ ServerCallbackHelper origin_callbacks;
+
+ // Over provision the policy cluster to reduce the chance it becomes a source
+ // of error
+
+ ClusterHelper policy_cluster(
+ {// Send a gRPC success response immediately to every policy check
+ new ServerCallbackHelper([](ServerConnection &, ServerStream &stream,
+ Envoy::Http::HeaderMapPtr &&) {
+ ::istio::mixer::v1::CheckResponse response;
+ response.mutable_precondition()->mutable_status()->set_code(
+ google::protobuf::util::error::Code::OK);
+ stream.sendGrpcResponse(Envoy::Grpc::Status::Ok, response,
+ std::chrono::milliseconds(0));
+ }),
+ // Send a gRPC success response immediately to every policy check
+ new ServerCallbackHelper([](ServerConnection &, ServerStream &stream,
+ Envoy::Http::HeaderMapPtr &&) {
+ ::istio::mixer::v1::CheckResponse response;
+ response.mutable_precondition()->mutable_status()->set_code(
+ google::protobuf::util::error::Code::OK);
+ 
stream.sendGrpcResponse(Envoy::Grpc::Status::Ok, response,
+ std::chrono::milliseconds(0));
+ }),
+ // Send a gRPC success response immediately to every policy check
+ new ServerCallbackHelper([](ServerConnection &, ServerStream &stream,
+ Envoy::Http::HeaderMapPtr &&) {
+ ::istio::mixer::v1::CheckResponse response;
+ response.mutable_precondition()->mutable_status()->set_code(
+ google::protobuf::util::error::Code::OK);
+ stream.sendGrpcResponse(Envoy::Grpc::Status::Ok, response,
+ std::chrono::milliseconds(0));
+ })});
+
+ ClusterHelper telemetry_cluster(
+ {// Telemetry receives the telemetry report requests but never sends a
+ // response.
+ new ServerCallbackHelper([](ServerConnection &, ServerStream &,
+ Envoy::Http::HeaderMapPtr &&) {
+ // eat the request and do nothing
+ })});
+
+ LoadGeneratorPtr client = startServers(fail_policy, origin_callbacks,
+ policy_cluster, telemetry_cluster);
+
+ //
+ // Exec test and wait for it to finish
+ //
+
+ Envoy::Http::HeaderMapPtr request{
+ new Envoy::Http::TestHeaderMapImpl{{":method", "GET"},
+ {":path", "/"},
+ {":scheme", "http"},
+ {":authority", "host"}}};
+ client->run(connections_to_initiate, requests_to_send, std::move(request),
+ std::chrono::milliseconds(10'000));
+
+ // shutdown envoy by destroying it
+ test_server_ = nullptr;
+ // wait until the upstreams have closed all connections they accepted.
+ // shutting down envoy should close them all
+ origin_callbacks.wait();
+ policy_cluster.wait();
+ telemetry_cluster.wait();
+
+ //
+ // Evaluate test
+ //
+
+ // All client connections are successfully established.
+ EXPECT_EQ(client->connectSuccesses(), connections_to_initiate);
+ EXPECT_EQ(0, client->connectFailures());
+ // Client close callback called for every client connection. 
+ EXPECT_EQ(client->localCloses(), connections_to_initiate);
+ // Client response callback is called for every request sent
+ EXPECT_EQ(client->responsesReceived(), requests_to_send);
+ // Every response was a 2xx class
+ EXPECT_EQ(client->class2xxResponses(), requests_to_send);
+ EXPECT_EQ(0, client->class4xxResponses());
+ EXPECT_EQ(0, client->class5xxResponses());
+ EXPECT_EQ(0, client->responseTimeouts());
+ // No client sockets are rudely closed by server / no client sockets are
+ // reset.
+ EXPECT_EQ(0, client->remoteCloses());
+
+ // assert that the origin request callback is called for every client request
+ // sent
+ EXPECT_EQ(origin_callbacks.requestsReceived(), requests_to_send);
+
+ // Policy server accept callback is called at least once (h2 socket reuse
+ // means may only be called once)
+ EXPECT_GE(policy_cluster.connectionsAccepted(), 1);
+ // Policy server request callback sees every policy check
+ EXPECT_EQ(requests_to_send, policy_cluster.requestsReceived());
+ // Policy server closes every connection
+ EXPECT_EQ(policy_cluster.connectionsAccepted(),
+ policy_cluster.localCloses() + policy_cluster.remoteCloses());
+
+ // Telemetry server accept callback is called at least once (h2 socket reuse
+ // means may only be called once)
+ EXPECT_GE(telemetry_cluster.connectionsAccepted(), 1);
+}
+
+TEST_F(MixerFaultTest, FailOpenAndSendPolicyResponseSlowly) {
+ // Same slow (60s) policy responses, but with a fail-open filter every
+ // request should still reach the origin and succeed with a 2xx.
+ Envoy::Logger::Registry::setLogLevel(spdlog::level::err);
+
+ constexpr NetworkFailPolicy fail_policy = NetworkFailPolicy::FAIL_OPEN;
+ constexpr uint32_t connections_to_initiate = 30 * 30;
+ constexpr uint32_t requests_to_send = 1 * connections_to_initiate;
+
+ // Origin server immediately sends a simple 200 OK to every request
+ ServerCallbackHelper origin_callbacks;
+
+ ClusterHelper policy_cluster(
+ {// Policy server sends a gRPC success response after 60 seconds to every
+ // policy check
+ new ServerCallbackHelper([](ServerConnection &, ServerStream &stream,
+ Envoy::Http::HeaderMapPtr 
&&) {
+ ::istio::mixer::v1::CheckResponse response;
+ response.mutable_precondition()->mutable_status()->set_code(
+ google::protobuf::util::error::Code::OK);
+ stream.sendGrpcResponse(Envoy::Grpc::Status::Ok, response,
+ std::chrono::milliseconds(60'000));
+ })});
+
+ ClusterHelper telemetry_cluster(
+ {// Telemetry server sends a gRPC success response immediately to every
+ // telemetry report.
+ new ServerCallbackHelper([](ServerConnection &, ServerStream &stream,
+ Envoy::Http::HeaderMapPtr &&) {
+ ::istio::mixer::v1::ReportResponse response;
+ stream.sendGrpcResponse(Envoy::Grpc::Status::Ok, response,
+ std::chrono::milliseconds(0));
+ })});
+
+ LoadGeneratorPtr client = startServers(fail_policy, origin_callbacks,
+ policy_cluster, telemetry_cluster);
+
+ //
+ // Exec test and wait for it to finish
+ //
+
+ Envoy::Http::HeaderMapPtr request{
+ new Envoy::Http::TestHeaderMapImpl{{":method", "GET"},
+ {":path", "/"},
+ {":scheme", "http"},
+ {":authority", "host"}}};
+
+ client->run(connections_to_initiate, requests_to_send, std::move(request),
+ std::chrono::milliseconds(10'000));
+
+ // shutdown envoy by destroying it
+ test_server_ = nullptr;
+ // wait until the upstreams have closed all connections they accepted.
+ // shutting down envoy should close them all
+ origin_callbacks.wait();
+ policy_cluster.wait();
+ telemetry_cluster.wait();
+
+ //
+ // Evaluate test
+ //
+
+ // All client connections are successfully established.
+ EXPECT_EQ(client->connectSuccesses(), connections_to_initiate);
+ EXPECT_EQ(0, client->connectFailures());
+ // Client close callback called for every client connection. 
+ EXPECT_EQ(client->localCloses(), connections_to_initiate);
+ // Client response callback is called for every request sent
+ EXPECT_EQ(client->responsesReceived(), requests_to_send);
+ // Every response was a 2xx class
+ EXPECT_EQ(client->class2xxResponses(), requests_to_send);
+ EXPECT_EQ(0, client->class4xxResponses());
+ EXPECT_EQ(0, client->class5xxResponses());
+ EXPECT_EQ(0, client->responseTimeouts());
+ // No client sockets are rudely closed by server / no client sockets are
+ // reset.
+ EXPECT_EQ(0, client->remoteCloses());
+
+ // Origin server should see every request since the mixer filter is
+ // configured to fail open.
+ EXPECT_EQ(origin_callbacks.requestsReceived(), requests_to_send);
+
+ // Policy server accept callback is called at least once (h2 socket reuse
+ // means may only be called once)
+ EXPECT_GE(policy_cluster.connectionsAccepted(), 1);
+ // Policy server request callback sees every policy check
+ EXPECT_EQ(requests_to_send, policy_cluster.requestsReceived());
+ // Policy server closes every connection
+ EXPECT_EQ(policy_cluster.connectionsAccepted(),
+ policy_cluster.localCloses() + policy_cluster.remoteCloses());
+}
+
+TEST_F(MixerFaultTest, RetryOnTransportError) {
+ // One of three policy servers hard-closes its sockets; with retries enabled
+ // every check should eventually land on a healthy server and succeed.
+ Envoy::Logger::Registry::setLogLevel(spdlog::level::err);
+
+ uint32_t retries = 10;
+ uint32_t base_retry_ms = 1;
+ uint32_t max_retry_ms = 10;
+ constexpr NetworkFailPolicy fail_policy = NetworkFailPolicy::FAIL_CLOSED;
+ constexpr uint32_t connections_to_initiate = 30;
+ constexpr uint32_t requests_to_send = 30 * connections_to_initiate;
+
+ //
+ // Setup
+ //
+
+ // Origin server immediately sends a simple 200 OK to every request
+ ServerCallbackHelper origin_callbacks;
+
+ ClusterHelper policy_cluster(
+ {// One policy server immediately closes any connection accepted. 
+ new ServerCallbackHelper(
+ [](ServerConnection &, ServerStream &,
+ Envoy::Http::HeaderMapPtr &&) {
+ GTEST_FATAL_FAILURE_(
+ "Connections immediately closed so no response should be "
+ "received");
+ },
+ [](ServerConnection &) -> ServerCallbackResult {
+ return ServerCallbackResult::CLOSE;
+ }),
+ // Two other policy servers immediately send gRPC OK responses
+ new ServerCallbackHelper([](ServerConnection &, ServerStream &stream,
+ Envoy::Http::HeaderMapPtr &&) {
+ ::istio::mixer::v1::CheckResponse response;
+ response.mutable_precondition()->mutable_status()->set_code(
+ google::protobuf::util::error::Code::OK);
+ stream.sendGrpcResponse(Envoy::Grpc::Status::Ok, response,
+ std::chrono::milliseconds(0));
+ }),
+ new ServerCallbackHelper([](ServerConnection &, ServerStream &stream,
+ Envoy::Http::HeaderMapPtr &&) {
+ ::istio::mixer::v1::CheckResponse response;
+ response.mutable_precondition()->mutable_status()->set_code(
+ google::protobuf::util::error::Code::OK);
+ stream.sendGrpcResponse(Envoy::Grpc::Status::Ok, response,
+ std::chrono::milliseconds(0));
+ })});
+
+ ClusterHelper telemetry_cluster(
+ {// Telemetry server sends a gRPC success response immediately to every
+ // telemetry report. 
+ new ServerCallbackHelper([](ServerConnection &, ServerStream &stream,
+ Envoy::Http::HeaderMapPtr &&) {
+ ::istio::mixer::v1::ReportResponse response;
+ stream.sendGrpcResponse(Envoy::Grpc::Status::Ok, response,
+ std::chrono::milliseconds(0));
+ })});
+
+ LoadGeneratorPtr client =
+ startServers(fail_policy, origin_callbacks, policy_cluster,
+ telemetry_cluster, retries, base_retry_ms, max_retry_ms);
+ //
+ // Exec test and wait for it to finish
+ //
+
+ Envoy::Http::HeaderMapPtr request{
+ new Envoy::Http::TestHeaderMapImpl{{":method", "GET"},
+ {":path", "/"},
+ {":scheme", "http"},
+ {":authority", "host"}}};
+ client->run(connections_to_initiate, requests_to_send, std::move(request));
+
+ // NOTE(review): template arguments appear lost in extraction here — likely
+ // std::unordered_map<std::string, SomeNumericType>; confirm against the
+ // original source.
+ std::unordered_map counters;
+ extractCounters("http_mixer_filter", counters);
+
+ // shutdown envoy by destroying it
+ test_server_ = nullptr;
+ // wait until the upstreams have closed all connections they accepted.
+ // shutting down envoy should close them all
+ origin_callbacks.wait();
+ policy_cluster.wait();
+ telemetry_cluster.wait();
+
+ //
+ // Evaluate test
+ //
+
+ // All client connections are successfully established.
+ EXPECT_EQ(client->connectSuccesses(), connections_to_initiate);
+ EXPECT_EQ(0, client->connectFailures());
+ // Client close callback called for every client connection.
+ EXPECT_EQ(client->localCloses(), connections_to_initiate);
+ // Client response callback is called for every request sent
+ EXPECT_EQ(client->responsesReceived(), requests_to_send);
+ // Every response was a 2xx class
+ EXPECT_EQ(client->class2xxResponses(), requests_to_send);
+ EXPECT_EQ(0, client->class4xxResponses());
+ EXPECT_EQ(0, client->class5xxResponses());
+ EXPECT_EQ(0, client->responseTimeouts());
+ // No client sockets are rudely closed by server / no client sockets are
+ // reset. 
+ EXPECT_EQ(0, client->remoteCloses());
+
+ // assert that the origin request callback is called for every client request
+ // sent
+ EXPECT_EQ(origin_callbacks.requestsReceived(), requests_to_send);
+
+ // assert that the policy request callback is called for every client request
+ // sent
+ EXPECT_EQ(policy_cluster.requestsReceived(), requests_to_send);
+
+ // Assertions against the mixer filter's internal counters.
+ EXPECT_EQ(counters["http_mixer_filter.total_remote_call_other_errors"], 0);
+ EXPECT_IN_RANGE(counters["http_mixer_filter.total_remote_call_retries"],
+ requests_to_send / 2 - requests_to_send / 10,
+ requests_to_send / 2 + requests_to_send / 10);
+ EXPECT_EQ(counters["http_mixer_filter.total_check_cache_hits"], 0);
+ EXPECT_EQ(counters["http_mixer_filter.total_remote_call_cancellations"], 0);
+ EXPECT_EQ(counters["http_mixer_filter.total_remote_quota_accepts"], 0);
+ EXPECT_EQ(counters["http_mixer_filter.total_remote_check_denies"], 0);
+ EXPECT_EQ(counters["http_mixer_filter.total_quota_cache_misses"], 0);
+ EXPECT_EQ(counters["http_mixer_filter.total_remote_calls"], requests_to_send);
+ EXPECT_EQ(counters["http_mixer_filter.total_quota_cache_hits"], 0);
+ EXPECT_EQ(counters["http_mixer_filter.total_remote_call_successes"],
+ requests_to_send);
+ EXPECT_EQ(counters["http_mixer_filter.total_remote_call_timeouts"], 0);
+ EXPECT_IN_RANGE(counters["http_mixer_filter.total_remote_call_send_errors"],
+ requests_to_send / 2 - requests_to_send / 10,
+ requests_to_send / 2 + requests_to_send / 10);
+ EXPECT_EQ(counters["http_mixer_filter.total_remote_quota_denies"], 0);
+ EXPECT_EQ(counters["http_mixer_filter.total_check_cache_misses"],
+ requests_to_send);
+ EXPECT_EQ(counters["http_mixer_filter.total_quota_calls"], 0);
+ EXPECT_IN_RANGE(counters["http_mixer_filter.total_remote_report_calls"], 0,
+ counters["http_mixer_filter.total_report_calls"] * 0.12);
+ EXPECT_EQ(counters["http_mixer_filter.total_remote_quota_prefetch_calls"], 0);
+ 
EXPECT_EQ(counters["http_mixer_filter.total_remote_check_calls"],
+ requests_to_send);
+ EXPECT_EQ(counters["http_mixer_filter.total_report_calls"], requests_to_send);
+ EXPECT_EQ(counters["http_mixer_filter.total_check_cache_hit_denies"], 0);
+ EXPECT_EQ(counters["http_mixer_filter.total_check_calls"], requests_to_send);
+ EXPECT_EQ(counters["http_mixer_filter.total_check_cache_hit_accepts"], 0);
+ EXPECT_EQ(counters["http_mixer_filter.total_quota_cache_hit_accepts"], 0);
+ EXPECT_EQ(counters["http_mixer_filter.total_remote_quota_calls"], 0);
+ EXPECT_EQ(counters["http_mixer_filter.total_remote_check_accepts"],
+ requests_to_send);
+ EXPECT_EQ(counters["http_mixer_filter.total_quota_cache_hit_denies"], 0);
+}
+
+TEST_F(MixerFaultTest, CancelCheck) {
+ // A 60s-slow policy server forces client-side timeouts, so in-flight checks
+ // must be cancelled; counters should record those cancellations.
+ Envoy::Logger::Registry::setLogLevel(spdlog::level::err);
+
+ uint32_t retries = 10;
+ uint32_t base_retry_ms = 1;
+ uint32_t max_retry_ms = 10;
+ constexpr NetworkFailPolicy fail_policy = NetworkFailPolicy::FAIL_CLOSED;
+ constexpr uint32_t connections_to_initiate = 30;
+ constexpr uint32_t requests_to_send = 30 * connections_to_initiate;
+
+ //
+ // Setup
+ //
+
+ // Origin server immediately sends a simple 200 OK to every request
+ ServerCallbackHelper origin_callbacks;
+
+ ClusterHelper policy_cluster(
+ {// One policy server immediately closes any connection accepted. 
+ new ServerCallbackHelper(
+ [](ServerConnection &, ServerStream &,
+ Envoy::Http::HeaderMapPtr &&) {
+ GTEST_FATAL_FAILURE_(
+ "Connections immediately closed so no response should be "
+ "received");
+ },
+ [](ServerConnection &) -> ServerCallbackResult {
+ return ServerCallbackResult::CLOSE;
+ }),
+ // One policy server is really slow - client will timeout first and
+ // cancel check
+ new ServerCallbackHelper([](ServerConnection &, ServerStream &stream,
+ Envoy::Http::HeaderMapPtr &&) {
+ ::istio::mixer::v1::CheckResponse response;
+ response.mutable_precondition()->mutable_status()->set_code(
+ google::protobuf::util::error::Code::OK);
+ stream.sendGrpcResponse(Envoy::Grpc::Status::Ok, response,
+ std::chrono::milliseconds(60'000));
+ }),
+ // One policy server is nice and zippy
+ new ServerCallbackHelper([](ServerConnection &, ServerStream &stream,
+ Envoy::Http::HeaderMapPtr &&) {
+ ::istio::mixer::v1::CheckResponse response;
+ response.mutable_precondition()->mutable_status()->set_code(
+ google::protobuf::util::error::Code::OK);
+ stream.sendGrpcResponse(Envoy::Grpc::Status::Ok, response,
+ std::chrono::milliseconds(0));
+ })});
+
+ ClusterHelper telemetry_cluster(
+ {// Telemetry server sends a gRPC success response immediately to every
+ // telemetry report. 
+ new ServerCallbackHelper([](ServerConnection &, ServerStream &stream,
+ Envoy::Http::HeaderMapPtr &&) {
+ ::istio::mixer::v1::ReportResponse response;
+ stream.sendGrpcResponse(Envoy::Grpc::Status::Ok, response,
+ std::chrono::milliseconds(0));
+ })});
+
+ LoadGeneratorPtr client =
+ startServers(fail_policy, origin_callbacks, policy_cluster,
+ telemetry_cluster, retries, base_retry_ms, max_retry_ms);
+ //
+ // Exec test and wait for it to finish
+ //
+
+ Envoy::Http::HeaderMapPtr request{
+ new Envoy::Http::TestHeaderMapImpl{{":method", "GET"},
+ {":path", "/"},
+ {":scheme", "http"},
+ {":authority", "host"}}};
+ client->run(connections_to_initiate, requests_to_send, std::move(request),
+ std::chrono::milliseconds(5'000));
+
+ // NOTE(review): template arguments appear lost in extraction here — likely
+ // std::unordered_map<std::string, SomeNumericType>; confirm against the
+ // original source.
+ std::unordered_map counters;
+ extractCounters("http_mixer_filter", counters);
+
+ // shutdown envoy by destroying it
+ test_server_ = nullptr;
+ // wait until the upstreams have closed all connections they accepted.
+ // shutting down envoy should close them all
+ origin_callbacks.wait();
+ policy_cluster.wait();
+ telemetry_cluster.wait();
+
+ //
+ // Evaluate test
+ //
+
+ // All client connections are successfully established.
+ EXPECT_EQ(client->connectSuccesses(), connections_to_initiate);
+ EXPECT_EQ(0, client->connectFailures());
+ // Client close callback called for every client connection.
+ EXPECT_EQ(client->localCloses(), connections_to_initiate);
+ // Not all responses are received due to timeouts
+ EXPECT_LE(client->responsesReceived(), requests_to_send);
+ EXPECT_GE(client->responsesReceived(), 1);
+ // Every response was a 2xx class
+ EXPECT_EQ(client->class2xxResponses(), client->responsesReceived());
+ EXPECT_EQ(0, client->class4xxResponses());
+ EXPECT_EQ(0, client->class5xxResponses());
+ // Or a timeout. Implementational artifact: timeouts kill the connection and
+ // new connections are not created to take their place. 
+ EXPECT_EQ(connections_to_initiate, client->responseTimeouts());
+ // No client sockets are rudely closed by server. They timeout instead.
+ EXPECT_EQ(0, client->remoteCloses());
+
+ // assert that the origin request callback is called for every response
+ // received by the client.
+ EXPECT_GE(origin_callbacks.requestsReceived(), client->responsesReceived());
+
+ // assert that the policy request callback is called for every response
+ // received by the client.
+ EXPECT_GE(policy_cluster.requestsReceived(), client->responsesReceived());
+
+ // Assertions against the mixer filter's internal counters. Many of these
+ // assertions rely on an implementational artifact of the load generator
+ // client - when a request is cancelled due to timeout the connection is
+ // closed. With enough retries every connection we create will be closed due
+ // to cancellation/timeout.
+ EXPECT_EQ(counters["http_mixer_filter.total_remote_call_other_errors"], 0);
+ EXPECT_IN_RANGE(counters["http_mixer_filter.total_remote_call_retries"],
+ connections_to_initiate / 2, 2 * connections_to_initiate);
+ EXPECT_EQ(counters["http_mixer_filter.total_check_cache_hits"], 0);
+ EXPECT_IN_RANGE(counters["http_mixer_filter.total_remote_call_cancellations"],
+ connections_to_initiate * 0.8, connections_to_initiate);
+ EXPECT_GE(counters["http_mixer_filter.total_remote_calls"],
+ connections_to_initiate);
+ EXPECT_EQ(counters["http_mixer_filter.total_remote_quota_accepts"], 0);
+ EXPECT_EQ(counters["http_mixer_filter.total_remote_check_denies"], 0);
+ EXPECT_EQ(counters["http_mixer_filter.total_quota_cache_misses"], 0);
+ EXPECT_EQ(counters["http_mixer_filter.total_quota_cache_hits"], 0);
+ EXPECT_IN_RANGE(counters["http_mixer_filter.total_remote_call_successes"],
+ connections_to_initiate / 2, 2 * connections_to_initiate);
+ EXPECT_IN_RANGE(counters["http_mixer_filter.total_remote_call_timeouts"], 0,
+ connections_to_initiate);
+ 
EXPECT_IN_RANGE(counters["http_mixer_filter.total_remote_call_send_errors"],
+ counters["http_mixer_filter.total_remote_calls"] / 4,
+ counters["http_mixer_filter.total_remote_calls"]);
+ EXPECT_EQ(counters["http_mixer_filter.total_remote_quota_denies"], 0);
+ EXPECT_EQ(counters["http_mixer_filter.total_check_cache_misses"],
+ counters["http_mixer_filter.total_remote_calls"]);
+ EXPECT_EQ(counters["http_mixer_filter.total_quota_calls"], 0);
+ EXPECT_IN_RANGE(counters["http_mixer_filter.total_remote_report_calls"], 0,
+ counters["http_mixer_filter.total_report_calls"] * 0.12);
+ EXPECT_EQ(counters["http_mixer_filter.total_remote_quota_prefetch_calls"], 0);
+ EXPECT_EQ(counters["http_mixer_filter.total_remote_check_calls"],
+ counters["http_mixer_filter.total_remote_calls"]);
+ EXPECT_IN_RANGE(counters["http_mixer_filter.total_report_calls"],
+ counters["http_mixer_filter.total_remote_calls"] * 0.75,
+ counters["http_mixer_filter.total_remote_calls"]);
+ EXPECT_EQ(counters["http_mixer_filter.total_check_cache_hit_denies"], 0);
+ EXPECT_EQ(counters["http_mixer_filter.total_check_calls"],
+ counters["http_mixer_filter.total_remote_calls"]);
+ EXPECT_EQ(counters["http_mixer_filter.total_check_cache_hit_accepts"], 0);
+ EXPECT_EQ(counters["http_mixer_filter.total_quota_cache_hit_accepts"], 0);
+ EXPECT_EQ(counters["http_mixer_filter.total_remote_quota_calls"], 0);
+ EXPECT_IN_RANGE(counters["http_mixer_filter.total_remote_check_accepts"],
+ counters["http_mixer_filter.total_remote_calls"] / 4,
+ counters["http_mixer_filter.total_remote_calls"]);
+ EXPECT_EQ(counters["http_mixer_filter.total_quota_cache_hit_denies"], 0);
+}
+
+TEST_F(MixerFaultTest, CancelRetry) {
+ Envoy::Logger::Registry::setLogLevel(spdlog::level::err);
+
+ // Force client timeout while requests are waiting between retries. 
+ uint32_t retries = 1;
+ uint32_t base_retry_ms = 10'000;
+ uint32_t max_retry_ms = 10'000;
+ constexpr NetworkFailPolicy fail_policy = NetworkFailPolicy::FAIL_CLOSED;
+ constexpr uint32_t connections_to_initiate = 30;
+ constexpr uint32_t requests_to_send = 30 * connections_to_initiate;
+
+ //
+ // Setup
+ //
+
+ // Origin server immediately sends a simple 200 OK to every request
+ ServerCallbackHelper origin_callbacks;
+
+ ClusterHelper policy_cluster(
+ {// One policy server immediately closes any connection accepted.
+ new ServerCallbackHelper(
+ [](ServerConnection &, ServerStream &,
+ Envoy::Http::HeaderMapPtr &&) {
+ GTEST_FATAL_FAILURE_(
+ "Connections immediately closed so no response should be "
+ "received");
+ },
+ [](ServerConnection &) -> ServerCallbackResult {
+ return ServerCallbackResult::CLOSE;
+ })});
+
+ ClusterHelper telemetry_cluster(
+ {// Telemetry server sends a gRPC success response immediately to every
+ // telemetry report.
+ new ServerCallbackHelper([](ServerConnection &, ServerStream &stream,
+ Envoy::Http::HeaderMapPtr &&) {
+ ::istio::mixer::v1::ReportResponse response;
+ stream.sendGrpcResponse(Envoy::Grpc::Status::Ok, response,
+ std::chrono::milliseconds(0));
+ })});
+
+ LoadGeneratorPtr client =
+ startServers(fail_policy, origin_callbacks, policy_cluster,
+ telemetry_cluster, retries, base_retry_ms, max_retry_ms);
+ //
+ // Exec test and wait for it to finish
+ //
+
+ Envoy::Http::HeaderMapPtr request{
+ new Envoy::Http::TestHeaderMapImpl{{":method", "GET"},
+ {":path", "/"},
+ {":scheme", "http"},
+ {":authority", "host"}}};
+ client->run(connections_to_initiate, requests_to_send, std::move(request),
+ std::chrono::milliseconds(500));
+
+ // NOTE(review): template arguments appear lost in extraction here — likely
+ // std::unordered_map<std::string, SomeNumericType>; confirm against the
+ // original source.
+ std::unordered_map counters;
+ extractCounters("http_mixer_filter", counters);
+
+ // shutdown envoy by destroying it
+ test_server_ = nullptr;
+ // wait until the upstreams have closed all connections they accepted. 
+ // shutting down envoy should close them all
+ origin_callbacks.wait();
+ policy_cluster.wait();
+ telemetry_cluster.wait();
+
+ //
+ // Evaluate test
+ //
+
+ // All client connections are successfully established.
+ EXPECT_EQ(client->connectSuccesses(), connections_to_initiate);
+ EXPECT_EQ(0, client->connectFailures());
+ // Client close callback called for every client connection.
+ EXPECT_EQ(client->localCloses(), connections_to_initiate);
+ // Client doesn't receive any responses
+ EXPECT_EQ(0, client->responsesReceived());
+ EXPECT_EQ(0, client->class2xxResponses());
+ EXPECT_EQ(0, client->class4xxResponses());
+ EXPECT_EQ(0, client->class5xxResponses());
+ // All requests timeout. Implementational artifact: timeouts kill the
+ // connection and new connections are not created to take their place.
+ EXPECT_EQ(connections_to_initiate, client->responseTimeouts());
+ // No client sockets are rudely closed by server / no client sockets are
+ // reset.
+ EXPECT_EQ(0, client->remoteCloses());
+
+ // The origin server receives no requests
+ EXPECT_EQ(0, origin_callbacks.requestsReceived());
+
+ // The policy server receives no requests
+ EXPECT_EQ(0, policy_cluster.requestsReceived());
+
+ // Assertions against the mixer filter's internal counters. Many of these
+ // assertions rely on an implementational artifact of the load generator
+ // client - when a request is cancelled due to timeout the connection is
+ // closed. With enough retries every connection we create will be closed due
+ // to cancellation/timeout. 
+ EXPECT_EQ(counters["http_mixer_filter.total_remote_call_other_errors"], 0);
+ EXPECT_EQ(counters["http_mixer_filter.total_remote_call_retries"],
+ connections_to_initiate);
+ EXPECT_EQ(counters["http_mixer_filter.total_check_cache_hits"], 0);
+ EXPECT_EQ(counters["http_mixer_filter.total_remote_call_cancellations"], 0);
+ EXPECT_GE(counters["http_mixer_filter.total_remote_calls"],
+ connections_to_initiate);
+ EXPECT_EQ(counters["http_mixer_filter.total_remote_quota_accepts"], 0);
+ EXPECT_EQ(counters["http_mixer_filter.total_remote_check_denies"], 0);
+ EXPECT_EQ(counters["http_mixer_filter.total_quota_cache_misses"], 0);
+ EXPECT_EQ(counters["http_mixer_filter.total_quota_cache_hits"], 0);
+ EXPECT_EQ(counters["http_mixer_filter.total_remote_call_successes"], 0);
+ EXPECT_EQ(counters["http_mixer_filter.total_remote_call_timeouts"], 0);
+ EXPECT_EQ(counters["http_mixer_filter.total_remote_call_send_errors"],
+ connections_to_initiate);
+ EXPECT_EQ(counters["http_mixer_filter.total_remote_quota_denies"], 0);
+ EXPECT_EQ(counters["http_mixer_filter.total_check_cache_misses"],
+ counters["http_mixer_filter.total_remote_calls"]);
+ EXPECT_EQ(counters["http_mixer_filter.total_quota_calls"], 0);
+ EXPECT_IN_RANGE(counters["http_mixer_filter.total_remote_report_calls"], 0,
+ counters["http_mixer_filter.total_report_calls"] * 0.12);
+ EXPECT_EQ(counters["http_mixer_filter.total_remote_quota_prefetch_calls"], 0);
+ EXPECT_EQ(counters["http_mixer_filter.total_remote_check_calls"],
+ counters["http_mixer_filter.total_remote_calls"]);
+ // TODO(jblatt) report calls are not made if client disconnects first. Bug:
+ EXPECT_IN_RANGE(counters["http_mixer_filter.total_report_calls"], 0,
+ counters["http_mixer_filter.total_remote_calls"]);
+ EXPECT_EQ(counters["http_mixer_filter.total_check_cache_hit_denies"], 0);
+ EXPECT_EQ(counters["http_mixer_filter.total_check_calls"],
+ counters["http_mixer_filter.total_remote_calls"]);
+ EXPECT_EQ(counters["http_mixer_filter.total_check_cache_hit_accepts"], 0);
+ EXPECT_EQ(counters["http_mixer_filter.total_quota_cache_hit_accepts"], 0);
+ EXPECT_EQ(counters["http_mixer_filter.total_remote_quota_calls"], 0);
+ EXPECT_EQ(counters["http_mixer_filter.total_remote_check_accepts"], 0);
+ EXPECT_EQ(counters["http_mixer_filter.total_quota_cache_hit_denies"], 0);
+}
+
+} // namespace Integration
+} // namespace Mixer