diff --git a/api/envoy/api/v2/eds.proto b/api/envoy/api/v2/eds.proto index 54f9d08c6f843..2f8fd7a4186dd 100644 --- a/api/envoy/api/v2/eds.proto +++ b/api/envoy/api/v2/eds.proto @@ -17,6 +17,7 @@ import "google/api/annotations.proto"; import "validate/validate.proto"; import "gogoproto/gogo.proto"; import "google/protobuf/wrappers.proto"; +import "google/protobuf/duration.proto"; option (gogoproto.equal_all) = true; option (gogoproto.stable_marshaler_all) = true; @@ -107,6 +108,12 @@ message ClusterLoadAssignment { // Read more at :ref:`priority levels ` and // :ref:`localities `. google.protobuf.UInt32Value overprovisioning_factor = 3 [(validate.rules).uint32.gt = 0]; + + // The maximum duration for which the endpoints from this assignment can be used. + // If no new assignments are received before this time expires, the endpoints + // are considered stale and should be marked unhealthy. + // Defaults to 0, which means endpoints never go stale. + google.protobuf.Duration endpoint_stale_after = 4 [(validate.rules).duration.gt.seconds = 0]; } // Load balancing policy settings. diff --git a/docs/root/configuration/cluster_manager/cluster_stats.rst b/docs/root/configuration/cluster_manager/cluster_stats.rst index f881e8963ccdd..b5b6554be7b63 100644 --- a/docs/root/configuration/cluster_manager/cluster_stats.rst +++ b/docs/root/configuration/cluster_manager/cluster_stats.rst @@ -95,6 +95,8 @@ Every cluster has a statistics tree rooted at *cluster..* with the followi version, Gauge, Hash of the contents from the last successful API fetch max_host_weight, Gauge, Maximum weight of any host in the cluster bind_errors, Counter, Total errors binding the socket to the configured source address + assignment_timeout_received, Counter, Total assignments received with endpoint lease information + assignment_stale, Counter, Number of times the received assignments went stale before new assignments arrived 
Health check statistics ----------------------- diff --git a/docs/root/intro/version_history.rst b/docs/root/intro/version_history.rst index 93dcb931cc0e7..5ed2c7233f2ed 100644 --- a/docs/root/intro/version_history.rst +++ b/docs/root/intro/version_history.rst @@ -5,6 +5,7 @@ Version history ================ * dubbo_proxy: support the :ref:`Dubbo proxy filter `. * event: added :ref:`loop duration and poll delay statistics `. +* eds: added support for specifying the maximum time for which endpoints can be used, via :ref:`endpoint_stale_after `. * http: mitigated a race condition with the :ref:`delayed_close_timeout` where it could trigger while actively flushing a pending write buffer for a downstream connection. * redis: added :ref:`prefix routing ` to enable routing commands based on their key's prefix to different upstream. * redis: add support for zpopmax and zpopmin commands. diff --git a/include/envoy/upstream/upstream.h b/include/envoy/upstream/upstream.h index 214096d79390e..2553833559c8a 100644 --- a/include/envoy/upstream/upstream.h +++ b/include/envoy/upstream/upstream.h @@ -515,6 +515,8 @@ class PrioritySet { COUNTER (update_failure) \ COUNTER (update_empty) \ COUNTER (update_no_rebuild) \ + COUNTER (assignment_timeout_received) \ + COUNTER (assignment_stale) \ GAUGE (version) // clang-format on diff --git a/source/common/upstream/eds.cc b/source/common/upstream/eds.cc index 7735114e50665..98a9654e838a5 100644 --- a/source/common/upstream/eds.cc +++ b/source/common/upstream/eds.cc @@ -2,6 +2,7 @@ #include "envoy/api/v2/eds.pb.validate.h" +#include "common/common/utility.h" #include "common/config/subscription_factory.h" namespace Envoy { @@ -18,11 +19,11 @@ EdsClusterImpl::EdsClusterImpl( ? 
cluster.name() : cluster.eds_cluster_config().service_name()) { Config::Utility::checkLocalInfo("eds", local_info_); - - const auto& eds_config = cluster.eds_cluster_config().eds_config(); Event::Dispatcher& dispatcher = factory_context.dispatcher(); Runtime::RandomGenerator& random = factory_context.random(); Upstream::ClusterManager& cm = factory_context.clusterManager(); + assignment_timeout_ = dispatcher.createTimer([this]() -> void { onAssignmentTimeout(); }); + const auto& eds_config = cluster.eds_cluster_config().eds_config(); subscription_ = Config::SubscriptionFactory::subscriptionFromConfigSource( eds_config, local_info_, dispatcher, cm, random, info_->statsScope(), "envoy.api.v2.EndpointDiscoveryService.FetchEndpoints", @@ -118,10 +119,37 @@ void EdsClusterImpl::onConfigUpdate(const Protobuf::RepeatedPtrFieldenabled()) { + assignment_timeout_->disableTimer(); + } + // Check if endpoint_stale_after is set. + const uint64_t stale_after_ms = + PROTOBUF_GET_MS_OR_DEFAULT(cluster_load_assignment.policy(), endpoint_stale_after, 0); + if (stale_after_ms > 0) { + // Stat to track how often we receive valid assignment_timeout in response. + info_->stats().assignment_timeout_received_.inc(); + assignment_timeout_->enableTimer(std::chrono::milliseconds(stale_after_ms)); + } + BatchUpdateHelper helper(*this, cluster_load_assignment); priority_set_.batchHostUpdate(helper); } +void EdsClusterImpl::onAssignmentTimeout() { + // We can no longer use the assignments, remove them. + // TODO(vishalpowar) This is not going to work for incremental updates, and we + // need to instead change the health status to indicate the assignments are + // stale. + Protobuf::RepeatedPtrField resources; + envoy::api::v2::ClusterLoadAssignment resource; + resource.set_cluster_name(cluster_name_); + resources.Add()->PackFrom(resource); + onConfigUpdate(resources, ""); + // Stat to track how often we end up with stale assignments. 
+ info_->stats().assignment_stale_.inc(); +} + bool EdsClusterImpl::updateHostsPerLocality( const uint32_t priority, const uint32_t overprovisioning_factor, const HostVector& new_hosts, LocalityWeightsMap& locality_weights_map, LocalityWeightsMap& new_locality_weights_map, diff --git a/source/common/upstream/eds.h b/source/common/upstream/eds.h index 2194ef2d22344..b2b3139031dae 100644 --- a/source/common/upstream/eds.h +++ b/source/common/upstream/eds.h @@ -53,6 +53,7 @@ class EdsClusterImpl : public BaseDynamicClusterImpl, Config::SubscriptionCallba // ClusterImplBase void startPreInit() override; + void onAssignmentTimeout(); class BatchUpdateHelper : public PrioritySet::BatchUpdateCb { public: @@ -74,6 +75,7 @@ class EdsClusterImpl : public BaseDynamicClusterImpl, Config::SubscriptionCallba const std::string cluster_name_; std::vector locality_weights_map_; HostMap all_hosts_; + Event::TimerPtr assignment_timeout_; }; class EdsClusterFactory : public ClusterFactoryImplBase { diff --git a/test/common/upstream/eds_test.cc b/test/common/upstream/eds_test.cc index abd69b656e4ef..2ffdb3f991ca3 100644 --- a/test/common/upstream/eds_test.cc +++ b/test/common/upstream/eds_test.cc @@ -21,6 +21,7 @@ #include "gtest/gtest.h" using testing::_; +using testing::AtLeast; using testing::Return; using testing::ReturnRef; @@ -1430,6 +1431,107 @@ TEST_F(EdsTest, MalformedIP) { "setting cluster type to 'STRICT_DNS' or 'LOGICAL_DNS'"); } +class EdsAssignmentTimeoutTest : public EdsTest { +public: + EdsAssignmentTimeoutTest() : EdsTest(), interval_timer_(nullptr) { + EXPECT_CALL(dispatcher_, createTimer_(_)) + .WillOnce(Invoke([this](Event::TimerCb cb) { + timer_cb_ = cb; + EXPECT_EQ(nullptr, interval_timer_); + interval_timer_ = new Event::MockTimer(); + return interval_timer_; + })) + .WillRepeatedly(Invoke([](Event::TimerCb) { return new Event::MockTimer(); })); + + resetCluster(); + } + + Event::MockTimer* interval_timer_; + Event::TimerCb timer_cb_; +}; + +// Test that 
assignment timeout is enabled and disabled correctly. +TEST_F(EdsAssignmentTimeoutTest, AssignmentTimeoutEnableDisable) { + envoy::api::v2::ClusterLoadAssignment cluster_load_assignment; + cluster_load_assignment.set_cluster_name("fare"); + auto* endpoints = cluster_load_assignment.add_endpoints(); + + auto health_checker = std::make_shared(); + EXPECT_CALL(*health_checker, start()); + EXPECT_CALL(*health_checker, addHostCheckCompleteCb(_)).Times(2); + cluster_->setHealthChecker(health_checker); + + auto* socket_address = endpoints->add_lb_endpoints() + ->mutable_endpoint() + ->mutable_address() + ->mutable_socket_address(); + socket_address->set_address("1.2.3.4"); + socket_address->set_port_value(80); + + envoy::api::v2::ClusterLoadAssignment cluster_load_assignment_lease = cluster_load_assignment; + cluster_load_assignment_lease.mutable_policy()->mutable_endpoint_stale_after()->MergeFrom( + Protobuf::util::TimeUtil::SecondsToDuration(1)); + + EXPECT_CALL(*interval_timer_, enableTimer(_)).Times(2); // Timer enabled twice. + EXPECT_CALL(*interval_timer_, disableTimer()).Times(1); // Timer disabled once. + EXPECT_CALL(*interval_timer_, enabled()).Times(6); // Includes calls by test. + doOnConfigUpdateVerifyNoThrow(cluster_load_assignment_lease); + // Check that the timer is enabled. + EXPECT_EQ(interval_timer_->enabled(), true); + doOnConfigUpdateVerifyNoThrow(cluster_load_assignment); + // Check that the timer is disabled. + EXPECT_EQ(interval_timer_->enabled(), false); + doOnConfigUpdateVerifyNoThrow(cluster_load_assignment_lease); + // Check that the timer is enabled. + EXPECT_EQ(interval_timer_->enabled(), true); +} + +// Test that assignment timeout is called and removes all the endpoints. 
+TEST_F(EdsAssignmentTimeoutTest, AssignmentLeaseExpired) { + envoy::api::v2::ClusterLoadAssignment cluster_load_assignment; + cluster_load_assignment.set_cluster_name("fare"); + cluster_load_assignment.mutable_policy()->mutable_endpoint_stale_after()->MergeFrom( + Protobuf::util::TimeUtil::SecondsToDuration(1)); + + auto health_checker = std::make_shared(); + EXPECT_CALL(*health_checker, start()); + EXPECT_CALL(*health_checker, addHostCheckCompleteCb(_)).Times(2); + cluster_->setHealthChecker(health_checker); + + auto add_endpoint = [&cluster_load_assignment](int port) { + auto* endpoints = cluster_load_assignment.add_endpoints(); + + auto* socket_address = endpoints->add_lb_endpoints() + ->mutable_endpoint() + ->mutable_address() + ->mutable_socket_address(); + socket_address->set_address("1.2.3.4"); + socket_address->set_port_value(port); + }; + + // Add two endpoints to the cluster assignment. + add_endpoint(80); + add_endpoint(81); + + // Expect the timer to be enabled once. + EXPECT_CALL(*interval_timer_, enableTimer(std::chrono::milliseconds(1000))); + // Expect the timer to be disabled when stale assignments are removed. + EXPECT_CALL(*interval_timer_, disableTimer()); + EXPECT_CALL(*interval_timer_, enabled()).Times(2); + doOnConfigUpdateVerifyNoThrow(cluster_load_assignment); + { + auto& hosts = cluster_->prioritySet().hostSetsPerPriority()[0]->hosts(); + EXPECT_EQ(hosts.size(), 2); + } + // Call the timer callback to indicate timeout. + timer_cb_(); + // Test that stale endpoints are removed. 
+ { + auto& hosts = cluster_->prioritySet().hostSetsPerPriority()[0]->hosts(); + EXPECT_EQ(hosts.size(), 0); + } +} + } // namespace } // namespace Upstream } // namespace Envoy diff --git a/test/integration/stats_integration_test.cc b/test/integration/stats_integration_test.cc index db0c1845b3afd..d1a3d302294db 100644 --- a/test/integration/stats_integration_test.cc +++ b/test/integration/stats_integration_test.cc @@ -195,8 +195,8 @@ TEST_P(ClusterMemoryTestRunner, MemoryLargeClusterSizeWithStats) { EXPECT_LT(start_mem, m1); EXPECT_LT(start_mem, m1001); - // As of 2019/03/20, m_per_cluster = 59015 (libstdc++) - EXPECT_LT(m_per_cluster, 59100); + // As of 2019/04/12, m_per_cluster = 59576 (libstdc++) + EXPECT_LT(m_per_cluster, 59600); } } // namespace