diff --git a/docs/root/version_history/current.rst b/docs/root/version_history/current.rst index 237f9ed5b10ac..830432731df93 100644 --- a/docs/root/version_history/current.rst +++ b/docs/root/version_history/current.rst @@ -27,6 +27,7 @@ Removed Config or Runtime New Features ------------ * grpc: implemented header value syntax support when defining :ref:`initial metadata ` for gRPC-based `ext_authz` :ref:`HTTP ` and :ref:`network ` filters, and :ref:`ratelimit ` filters. +* hds: added support for delta updates in the :ref:`HealthCheckSpecifier `, making only the Endpoints and Health Checkers that changed be reconstructed on receiving a new message, rather than the entire HDS. * health_check: added option to use :ref:`no_traffic_healthy_interval ` which allows a different no traffic interval when the host is healthy. * mongo_proxy: the list of commands to produce metrics for is now :ref:`configurable `. * ratelimit: added :ref:`disable_x_envoy_ratelimited_header ` option to disable `X-Envoy-RateLimited` header. 
diff --git a/source/common/upstream/BUILD b/source/common/upstream/BUILD index 2d0fb940cf003..9219e59d768df 100644 --- a/source/common/upstream/BUILD +++ b/source/common/upstream/BUILD @@ -202,12 +202,23 @@ envoy_cc_library( ], ) +envoy_cc_library( + name = "locality_endpoint_lib", + hdrs = ["locality_endpoint.h"], + deps = [ + "//source/common/protobuf:utility_lib", + "@envoy_api//envoy/config/core/v3:pkg_cc_proto", + "@envoy_api//envoy/config/endpoint/v3:pkg_cc_proto", + ], +) + envoy_cc_library( name = "health_discovery_service_lib", srcs = ["health_discovery_service.cc"], hdrs = ["health_discovery_service.h"], deps = [ ":health_checker_lib", + ":locality_endpoint_lib", ":upstream_includes", "//include/envoy/api:api_interface", "//include/envoy/event:dispatcher_interface", diff --git a/source/common/upstream/health_checker_base_impl.h b/source/common/upstream/health_checker_base_impl.h index b773ced03a376..c1e4bb7affff1 100644 --- a/source/common/upstream/health_checker_base_impl.h +++ b/source/common/upstream/health_checker_base_impl.h @@ -37,6 +37,23 @@ struct HealthCheckerStats { ALL_HEALTH_CHECKER_STATS(GENERATE_COUNTER_STRUCT, GENERATE_GAUGE_STRUCT) }; +/** + * HealthCheckerHash and HealthCheckerEqualTo are used to allow the HealthCheck proto to be used as + * a flat_hash_map key. + */ +struct HealthCheckerHash { + size_t operator()(const envoy::config::core::v3::HealthCheck& health_check) const { + return MessageUtil::hash(health_check); + } +}; + +struct HealthCheckerEqualTo { + bool operator()(const envoy::config::core::v3::HealthCheck& lhs, + const envoy::config::core::v3::HealthCheck& rhs) const { + return Protobuf::util::MessageDifferencer::Equals(lhs, rhs); + } +}; + /** * Base implementation for all health checkers. 
*/ diff --git a/source/common/upstream/health_discovery_service.cc b/source/common/upstream/health_discovery_service.cc index 0d0d5e600635c..12c92979f418e 100644 --- a/source/common/upstream/health_discovery_service.cc +++ b/source/common/upstream/health_discovery_service.cc @@ -46,8 +46,8 @@ HdsDelegate::HdsDelegate(Stats::Scope& scope, Grpc::RawAsyncClientPtr async_clie dispatcher_(dispatcher), runtime_(runtime), store_stats_(stats), ssl_context_manager_(ssl_context_manager), info_factory_(info_factory), access_log_manager_(access_log_manager), cm_(cm), local_info_(local_info), admin_(admin), - singleton_manager_(singleton_manager), tls_(tls), validation_visitor_(validation_visitor), - api_(api) { + singleton_manager_(singleton_manager), tls_(tls), specifier_hash_(0), + validation_visitor_(validation_visitor), api_(api) { health_check_request_.mutable_health_check_request()->mutable_node()->MergeFrom( local_info_.node()); backoff_strategy_ = std::make_unique( @@ -99,7 +99,6 @@ void HdsDelegate::handleFailure() { setHdsRetryTimer(); } -// TODO(lilika): Add support for the same endpoint in different clusters/ports envoy::service::health::v3::HealthCheckRequestOrEndpointHealthResponse HdsDelegate::sendResponse() { envoy::service::health::v3::HealthCheckRequestOrEndpointHealthResponse response; @@ -165,68 +164,139 @@ void HdsDelegate::onReceiveInitialMetadata(Http::ResponseHeaderMapPtr&& metadata UNREFERENCED_PARAMETER(metadata); } +envoy::config::cluster::v3::Cluster HdsDelegate::createClusterConfig( + const envoy::service::health::v3::ClusterHealthCheck& cluster_health_check) { + // Create HdsCluster config + envoy::config::cluster::v3::Cluster cluster_config; + + cluster_config.set_name(cluster_health_check.cluster_name()); + cluster_config.mutable_connect_timeout()->set_seconds(ClusterTimeoutSeconds); + cluster_config.mutable_per_connection_buffer_limit_bytes()->set_value( + ClusterConnectionBufferLimitBytes); + + // Add endpoints to cluster + for (const auto& 
locality_endpoints : cluster_health_check.locality_endpoints()) { + // add endpoint group by locality to config + auto* endpoints = cluster_config.mutable_load_assignment()->add_endpoints(); + // if this group contains locality information, save it. + if (locality_endpoints.has_locality()) { + endpoints->mutable_locality()->MergeFrom(locality_endpoints.locality()); + } + + // add all endpoints for this locality group to the config + for (const auto& endpoint : locality_endpoints.endpoints()) { + endpoints->add_lb_endpoints()->mutable_endpoint()->mutable_address()->MergeFrom( + endpoint.address()); + } + } + + // TODO(lilika): Add support for optional per-endpoint health checks + + // Add healthchecks to cluster + for (auto& health_check : cluster_health_check.health_checks()) { + cluster_config.add_health_checks()->MergeFrom(health_check); + } + + // Add transport_socket_match to cluster for use in host connections. + cluster_config.mutable_transport_socket_matches()->MergeFrom( + cluster_health_check.transport_socket_matches()); + + ENVOY_LOG(debug, "New HdsCluster config {} ", cluster_config.DebugString()); + + return cluster_config; +} + +void HdsDelegate::updateHdsCluster(HdsClusterPtr cluster, + const envoy::config::cluster::v3::Cluster& cluster_config) { + cluster->update(admin_, cluster_config, info_factory_, cm_, local_info_, dispatcher_, + singleton_manager_, tls_, validation_visitor_, api_, access_log_manager_, + runtime_); +} + +HdsClusterPtr +HdsDelegate::createHdsCluster(const envoy::config::cluster::v3::Cluster& cluster_config) { + static const envoy::config::core::v3::BindConfig bind_config; + + // Create HdsCluster. + auto new_cluster = std::make_shared( + admin_, runtime_, std::move(cluster_config), bind_config, store_stats_, ssl_context_manager_, + false, info_factory_, cm_, local_info_, dispatcher_, singleton_manager_, tls_, + validation_visitor_, api_); + + // Begin HCs in the background. 
+ new_cluster->initialize([] {}); + new_cluster->initHealthchecks(access_log_manager_, runtime_, dispatcher_, api_); + + return new_cluster; +} + void HdsDelegate::processMessage( std::unique_ptr&& message) { ENVOY_LOG(debug, "New health check response message {} ", message->DebugString()); ASSERT(message); + std::vector hds_clusters; + // Maps to replace the current member variable versions. + absl::flat_hash_map new_hds_clusters_name_map; for (const auto& cluster_health_check : message->cluster_health_checks()) { - // Create HdsCluster config - static const envoy::config::core::v3::BindConfig bind_config; - envoy::config::cluster::v3::Cluster cluster_config; - - cluster_config.set_name(cluster_health_check.cluster_name()); - cluster_config.mutable_connect_timeout()->set_seconds(ClusterTimeoutSeconds); - cluster_config.mutable_per_connection_buffer_limit_bytes()->set_value( - ClusterConnectionBufferLimitBytes); - - // Add endpoints to cluster - for (const auto& locality_endpoints : cluster_health_check.locality_endpoints()) { - // add endpoint group by locality to config - auto* endpoints = cluster_config.mutable_load_assignment()->add_endpoints(); - // if this group contains locality information, save it. - if (locality_endpoints.has_locality()) { - endpoints->mutable_locality()->MergeFrom(locality_endpoints.locality()); + if (!new_hds_clusters_name_map.contains(cluster_health_check.cluster_name())) { + HdsClusterPtr cluster_ptr; + + // Create a new configuration for a cluster based on our different or new config. + auto cluster_config = createClusterConfig(cluster_health_check); + + // If this particular cluster configuration happens to have a name, then it is possible + // this particular cluster exists in the name map. We check and if we found a match, + // attempt to update this cluster. If no match was found, either the cluster name is empty + // or we have not seen a cluster by this name before. In either case, create a new cluster. 
+ auto cluster_map_pair = hds_clusters_name_map_.find(cluster_health_check.cluster_name()); + if (cluster_map_pair != hds_clusters_name_map_.end()) { + // We have a previous cluster with this name, update. + cluster_ptr = cluster_map_pair->second; + updateHdsCluster(cluster_ptr, cluster_config); + } else { + // There is no cluster with this name previously or it's an empty string, so just create a + // new cluster. + cluster_ptr = createHdsCluster(cluster_config); } - // add all endpoints for this locality group to the config - for (const auto& endpoint : locality_endpoints.endpoints()) { - endpoints->add_lb_endpoints()->mutable_endpoint()->mutable_address()->MergeFrom( - endpoint.address()); + // If this cluster does not have a name, do not add it to the name map since cluster_name is + // an optional field, and reconstruct these clusters on every update. + if (!cluster_health_check.cluster_name().empty()) { + // Since this cluster has a name, add it to our by-name map so we can update it later. + new_hds_clusters_name_map.insert({cluster_health_check.cluster_name(), cluster_ptr}); + } else { + ENVOY_LOG(warn, + "HDS Cluster has no cluster_name, it will be recreated instead of updated on " + "every reconfiguration."); } - } - - // TODO(lilika): Add support for optional per-endpoint health checks - // Add healthchecks to cluster - for (auto& health_check : cluster_health_check.health_checks()) { - cluster_config.add_health_checks()->MergeFrom(health_check); + // Add this cluster to the flat list for health checking. + hds_clusters.push_back(cluster_ptr); + } else { + ENVOY_LOG(warn, "An HDS Cluster with this cluster_name has already been added, not using."); } + } - // Add transport_socket_match to cluster 
- cluster_config.mutable_transport_socket_matches()->MergeFrom( - cluster_health_check.transport_socket_matches()); - - ENVOY_LOG(debug, "New HdsCluster config {} ", cluster_config.DebugString()); - - // Create HdsCluster - hds_clusters_.emplace_back(new HdsCluster(admin_, runtime_, std::move(cluster_config), - bind_config, store_stats_, ssl_context_manager_, - false, info_factory_, cm_, local_info_, dispatcher_, - singleton_manager_, tls_, validation_visitor_, api_)); - hds_clusters_.back()->initialize([] {}); + // Overwrite our map data structures. + hds_clusters_name_map_ = std::move(new_hds_clusters_name_map); + hds_clusters_ = std::move(hds_clusters); - hds_clusters_.back()->startHealthchecks(access_log_manager_, runtime_, dispatcher_, api_); - } + // TODO: add stats reporting for number of clusters added, removed, and reused. } -// TODO(lilika): Add support for subsequent HealthCheckSpecifier messages that -// might modify the HdsClusters void HdsDelegate::onReceiveMessage( std::unique_ptr&& message) { stats_.requests_.inc(); ENVOY_LOG(debug, "New health check response message {} ", message->DebugString()); + const uint64_t hash = MessageUtil::hash(*message); + + if (hash == specifier_hash_) { + ENVOY_LOG(debug, "New health check specifier is unchanged, no action taken."); + return; + } + // Validate message fields try { MessageUtil::validate(*message, validation_visitor_); @@ -239,15 +309,17 @@ void HdsDelegate::onReceiveMessage( return; } - // Reset - hds_clusters_.clear(); - // Set response auto server_response_ms = PROTOBUF_GET_MS_OR_DEFAULT(*message, interval, 1000); // Process the HealthCheckSpecifier message. processMessage(std::move(message)); + stats_.updates_.inc(); + + // Update the stored hash. 
+ specifier_hash_ = hash; + if (server_response_ms_ != server_response_ms) { server_response_ms_ = server_response_ms; setHdsStreamResponseTimer(); @@ -276,9 +348,12 @@ HdsCluster::HdsCluster(Server::Admin& admin, Runtime::Loader& runtime, ProtobufMessage::ValidationVisitor& validation_visitor, Api::Api& api) : runtime_(runtime), cluster_(std::move(cluster)), bind_config_(bind_config), stats_(stats), ssl_context_manager_(ssl_context_manager), added_via_api_(added_via_api), - initial_hosts_(new HostVector()), validation_visitor_(validation_visitor) { + hosts_(new HostVector()), validation_visitor_(validation_visitor) { ENVOY_LOG(debug, "Creating an HdsCluster"); priority_set_.getOrCreateHostSet(0); + // Set initial hashes for possible delta updates. + config_hash_ = MessageUtil::hash(cluster_); + socket_match_hash_ = RepeatedPtrUtil::hash(cluster_.transport_socket_matches()); info_ = info_factory.createClusterInfo( {admin, runtime_, cluster_, bind_config_, stats_, ssl_context_manager_, added_via_api_, cm, @@ -296,6 +371,7 @@ HdsCluster::HdsCluster(Server::Admin& admin, Runtime::Loader& runtime, hosts_by_locality.back().reserve(locality_endpoints.lb_endpoints_size()); for (const auto& host : locality_endpoints.lb_endpoints()) { + const LocalityEndpointTuple endpoint_key = {locality_endpoints.locality(), host}; // Initialize an endpoint host object. HostSharedPtr endpoint = std::make_shared( info_, "", Network::Address::resolveProtoAddress(host.endpoint().address()), nullptr, 1, @@ -303,17 +379,154 @@ HdsCluster::HdsCluster(Server::Admin& admin, Runtime::Loader& runtime, envoy::config::endpoint::v3::Endpoint::HealthCheckConfig().default_instance(), 0, envoy::config::core::v3::UNKNOWN); // Add this host/endpoint pointer to our flat list of endpoints for health checking. - initial_hosts_->push_back(endpoint); + hosts_->push_back(endpoint); // Add this host/endpoint pointer to our structured list by locality so results can be // requested by locality. 
hosts_by_locality.back().push_back(endpoint); + // Add this host/endpoint pointer to our map so we can rebuild this later. + hosts_map_.insert({endpoint_key, endpoint}); } } // Create the HostsPerLocality. - initial_hosts_per_locality_ = + hosts_per_locality_ = std::make_shared(std::move(hosts_by_locality), false); } +void HdsCluster::update(Server::Admin& admin, envoy::config::cluster::v3::Cluster cluster, + ClusterInfoFactory& info_factory, ClusterManager& cm, + const LocalInfo::LocalInfo& local_info, Event::Dispatcher& dispatcher, + Singleton::Manager& singleton_manager, ThreadLocal::SlotAllocator& tls, + ProtobufMessage::ValidationVisitor& validation_visitor, Api::Api& api, + AccessLog::AccessLogManager& access_log_manager, Runtime::Loader& runtime) { + + // check to see if the config changed. If it did, update. + const uint64_t config_hash = MessageUtil::hash(cluster); + if (config_hash_ != config_hash) { + config_hash_ = config_hash; + cluster_ = std::move(cluster); + + // Check to see if our list of socket matches have changed. If they have, create a new matcher + // in info_. + bool update_cluster_info = false; + const uint64_t socket_match_hash = RepeatedPtrUtil::hash(cluster_.transport_socket_matches()); + if (socket_match_hash_ != socket_match_hash) { + socket_match_hash_ = socket_match_hash; + update_cluster_info = true; + info_ = info_factory.createClusterInfo( + {admin, runtime_, cluster_, bind_config_, stats_, ssl_context_manager_, added_via_api_, + cm, local_info, dispatcher, singleton_manager, tls, validation_visitor, api}); + } + + // Check to see if anything in the endpoints list has changed. + updateHosts(cluster_.load_assignment().endpoints(), update_cluster_info); + + // Check to see if any of the health checkers have changed. 
+ updateHealthchecks(cluster_.health_checks(), access_log_manager, runtime, dispatcher, api); + } +} + +void HdsCluster::updateHealthchecks( + const Protobuf::RepeatedPtrField& health_checks, + AccessLog::AccessLogManager& access_log_manager, Runtime::Loader& runtime, + Event::Dispatcher& dispatcher, Api::Api& api) { + std::vector health_checkers; + HealthCheckerMap health_checkers_map; + + for (const auto& health_check : health_checks) { + // Check to see if this exact same health_check config already has a health checker. + auto health_checker = health_checkers_map_.find(health_check); + if (health_checker != health_checkers_map_.end()) { + // If it does, use it. + health_checkers_map.insert({health_check, health_checker->second}); + health_checkers.push_back(health_checker->second); + } else { + // If it does not, create a new one. + auto new_health_checker = Upstream::HealthCheckerFactory::create( + health_check, *this, runtime, dispatcher, access_log_manager, validation_visitor_, api); + health_checkers_map.insert({health_check, new_health_checker}); + health_checkers.push_back(new_health_checker); + + // Start these health checks now because upstream assumes they already have been started. + new_health_checker->start(); + } + } + + // replace our member data structures with our newly created ones. + health_checkers_ = std::move(health_checkers); + health_checkers_map_ = std::move(health_checkers_map); + + // TODO: add stats reporting for number of health checkers added, removed, and reused. +} + +void HdsCluster::updateHosts( + const Protobuf::RepeatedPtrField& + locality_endpoints, + bool update_cluster_info) { + // Create the data structures needed for PrioritySet::update. + HostVectorSharedPtr hosts = std::make_shared>(); + std::vector hosts_added; + std::vector hosts_removed; + std::vector hosts_by_locality; + + // Use for delta update comparison. 
+ HostsMap hosts_map; + + for (auto& endpoints : locality_endpoints) { + hosts_by_locality.emplace_back(); + for (auto& endpoint : endpoints.lb_endpoints()) { + LocalityEndpointTuple endpoint_key = {endpoints.locality(), endpoint}; + + // Check to see if this exact Locality+Endpoint has been seen before. + // Also, if we made changes to our info, re-create all endpoints. + auto host_pair = hosts_map_.find(endpoint_key); + HostSharedPtr host; + if (!update_cluster_info && host_pair != hosts_map_.end()) { + // If we have this exact pair, save the shared pointer. + host = host_pair->second; + } else { + // We do not have this endpoint saved, so create a new one. + host = std::make_shared( + info_, "", Network::Address::resolveProtoAddress(endpoint.endpoint().address()), + nullptr, 1, endpoints.locality(), + envoy::config::endpoint::v3::Endpoint::HealthCheckConfig().default_instance(), 0, + envoy::config::core::v3::UNKNOWN); + + // Set the initial health status as in HdsCluster::initialize. + host->healthFlagSet(Host::HealthFlag::FAILED_ACTIVE_HC); + + // Add to our hosts added list and save the shared pointer. + hosts_added.push_back(host); + } + + // No matter if it is reused or new, always add to these data structures. + hosts_by_locality.back().push_back(host); + hosts->push_back(host); + hosts_map.insert({endpoint_key, host}); + } + } + + // Compare the old map to the new to find out which endpoints are going to be removed. + for (auto& host_pair : hosts_map_) { + if (!hosts_map.contains(host_pair.first)) { + hosts_removed.push_back(host_pair.second); + } + } + + // Update the member data structures. + hosts_ = std::move(hosts); + hosts_map_ = std::move(hosts_map); + + // TODO: add stats reporting for number of endpoints added, removed, and reused. + ENVOY_LOG(debug, "Hosts Added: {}, Removed: {}, Reused: {}", hosts_added.size(), + hosts_removed.size(), hosts_->size() - hosts_added.size()); + + // Update the priority set. 
+ hosts_per_locality_ = + std::make_shared(std::move(hosts_by_locality), false); + priority_set_.updateHosts(0, HostSetImpl::partitionHosts(hosts_, hosts_per_locality_), {}, + hosts_added, hosts_removed, absl::nullopt); +} + ClusterSharedPtr HdsCluster::create() { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; } ClusterInfoConstSharedPtr @@ -337,25 +550,34 @@ ProdClusterInfoFactory::createClusterInfo(const CreateClusterInfoParams& params) params.added_via_api_, factory_context); } -void HdsCluster::startHealthchecks(AccessLog::AccessLogManager& access_log_manager, - Runtime::Loader& runtime, Event::Dispatcher& dispatcher, - Api::Api& api) { +void HdsCluster::initHealthchecks(AccessLog::AccessLogManager& access_log_manager, + Runtime::Loader& runtime, Event::Dispatcher& dispatcher, + Api::Api& api) { for (auto& health_check : cluster_.health_checks()) { - health_checkers_.push_back(Upstream::HealthCheckerFactory::create( - health_check, *this, runtime, dispatcher, access_log_manager, validation_visitor_, api)); - health_checkers_.back()->start(); + auto health_checker = Upstream::HealthCheckerFactory::create( + health_check, *this, runtime, dispatcher, access_log_manager, validation_visitor_, api); + + health_checkers_.push_back(health_checker); + health_checkers_map_.insert({health_check, health_checker}); + health_checker->start(); } } void HdsCluster::initialize(std::function callback) { initialization_complete_callback_ = callback; - for (const auto& host : *initial_hosts_) { - host->healthFlagSet(Host::HealthFlag::FAILED_ACTIVE_HC); + + // If this function gets called again we do not want to touch the priority set again with the + // initial hosts, because the hosts may have changed. + if (!initialized_) { + for (const auto& host : *hosts_) { + host->healthFlagSet(Host::HealthFlag::FAILED_ACTIVE_HC); + } + // Use the ungrouped and grouped hosts lists to retain locality structure in the priority set. 
+ priority_set_.updateHosts(0, HostSetImpl::partitionHosts(hosts_, hosts_per_locality_), {}, + *hosts_, {}, absl::nullopt); + + initialized_ = true; } - // Use the ungrouped and grouped hosts lists to retain locality structure in the priority set. - priority_set_.updateHosts( - 0, HostSetImpl::partitionHosts(initial_hosts_, initial_hosts_per_locality_), {}, - *initial_hosts_, {}, absl::nullopt); } void HdsCluster::setOutlierDetector(const Outlier::DetectorSharedPtr&) { diff --git a/source/common/upstream/health_discovery_service.h b/source/common/upstream/health_discovery_service.h index 3818a80a3f064..a3ecbb7c428e2 100644 --- a/source/common/upstream/health_discovery_service.h +++ b/source/common/upstream/health_discovery_service.h @@ -18,15 +18,24 @@ #include "common/grpc/async_client_impl.h" #include "common/network/resolver_impl.h" #include "common/upstream/health_checker_impl.h" +#include "common/upstream/locality_endpoint.h" #include "common/upstream/upstream_impl.h" #include "server/transport_socket_config_impl.h" #include "extensions/transport_sockets/well_known_names.h" +#include "absl/container/flat_hash_map.h" + namespace Envoy { namespace Upstream { +using HostsMap = absl::flat_hash_map; +using HealthCheckerMap = + absl::flat_hash_map; + class ProdClusterInfoFactory : public ClusterInfoFactory, Logger::Loggable { public: ClusterInfoConstSharedPtr createClusterInfo(const CreateClusterInfoParams& params) override; @@ -60,12 +69,19 @@ class HdsCluster : public Cluster, Logger::Loggable { Outlier::Detector* outlierDetector() override { return outlier_detector_.get(); } const Outlier::Detector* outlierDetector() const override { return outlier_detector_.get(); } void initialize(std::function callback) override; - - // Creates and starts healthcheckers to its endpoints - void startHealthchecks(AccessLog::AccessLogManager& access_log_manager, Runtime::Loader& runtime, - Event::Dispatcher& dispatcher, Api::Api& api); + // Compare changes in the cluster proto, 
and update parts of the cluster as needed. + void update(Server::Admin& admin, envoy::config::cluster::v3::Cluster cluster, + ClusterInfoFactory& info_factory, ClusterManager& cm, + const LocalInfo::LocalInfo& local_info, Event::Dispatcher& dispatcher, + Singleton::Manager& singleton_manager, ThreadLocal::SlotAllocator& tls, + ProtobufMessage::ValidationVisitor& validation_visitor, Api::Api& api, + AccessLog::AccessLogManager& access_log_manager, Runtime::Loader& runtime); + // Creates healthcheckers and adds them to the list, then does initial start. + void initHealthchecks(AccessLog::AccessLogManager& access_log_manager, Runtime::Loader& runtime, + Event::Dispatcher& dispatcher, Api::Api& api); std::vector healthCheckers() { return health_checkers_; }; + std::vector hosts() { return *hosts_; }; protected: PrioritySetImpl priority_set_; @@ -76,17 +92,31 @@ class HdsCluster : public Cluster, Logger::Loggable { std::function initialization_complete_callback_; Runtime::Loader& runtime_; - const envoy::config::cluster::v3::Cluster cluster_; + envoy::config::cluster::v3::Cluster cluster_; const envoy::config::core::v3::BindConfig& bind_config_; Stats::Store& stats_; Ssl::ContextManager& ssl_context_manager_; bool added_via_api_; + bool initialized_ = false; + uint64_t config_hash_; + uint64_t socket_match_hash_; - HostVectorSharedPtr initial_hosts_; - HostsPerLocalitySharedPtr initial_hosts_per_locality_; + HostVectorSharedPtr hosts_; + HostsPerLocalitySharedPtr hosts_per_locality_; + HostsMap hosts_map_; ClusterInfoConstSharedPtr info_; std::vector health_checkers_; + HealthCheckerMap health_checkers_map_; ProtobufMessage::ValidationVisitor& validation_visitor_; + + void updateHealthchecks( + const Protobuf::RepeatedPtrField& health_checks, + AccessLog::AccessLogManager& access_log_manager, Runtime::Loader& runtime, + Event::Dispatcher& dispatcher, Api::Api& api); + void + updateHosts(const Protobuf::RepeatedPtrField& + locality_endpoints, + bool 
update_socket_matches); }; using HdsClusterPtr = std::shared_ptr; @@ -97,7 +127,8 @@ using HdsClusterPtr = std::shared_ptr; #define ALL_HDS_STATS(COUNTER) \ COUNTER(requests) \ COUNTER(responses) \ - COUNTER(errors) + COUNTER(errors) \ + COUNTER(updates) /** * Struct definition for all hds stats. @see stats_macros.h @@ -145,7 +176,11 @@ class HdsDelegate : Grpc::AsyncStreamCallbacks&& message); - + envoy::config::cluster::v3::Cluster + createClusterConfig(const envoy::service::health::v3::ClusterHealthCheck& cluster_health_check); + void updateHdsCluster(HdsClusterPtr cluster, + const envoy::config::cluster::v3::Cluster& cluster_health_check); + HdsClusterPtr createHdsCluster(const envoy::config::cluster::v3::Cluster& cluster_health_check); HdsDelegateStats stats_; const Protobuf::MethodDescriptor& service_method_; @@ -168,10 +203,11 @@ class HdsDelegate : Grpc::AsyncStreamCallbacks health_check_message_; + uint64_t specifier_hash_; std::vector clusters_; std::vector hds_clusters_; + absl::flat_hash_map hds_clusters_name_map_; Event::TimerPtr hds_stream_response_timer_; Event::TimerPtr hds_retry_timer_; diff --git a/source/common/upstream/locality_endpoint.h b/source/common/upstream/locality_endpoint.h new file mode 100644 index 0000000000000..a928cbde3ad0b --- /dev/null +++ b/source/common/upstream/locality_endpoint.h @@ -0,0 +1,29 @@ +#pragma once + +#include "envoy/config/core/v3/base.pb.h" +#include "envoy/config/endpoint/v3/endpoint_components.pb.h" + +#include "common/protobuf/utility.h" + +namespace Envoy { +namespace Upstream { + +using LocalityEndpointTuple = std::tuple; +struct LocalityEndpointHash { + size_t operator()(const LocalityEndpointTuple& values) const { + const auto locality_hash = MessageUtil::hash(std::get<0>(values)); + const auto endpoint_hash = MessageUtil::hash(std::get<1>(values)); + return locality_hash ^ endpoint_hash; + } +}; + +struct LocalityEndpointEqualTo { + bool operator()(const LocalityEndpointTuple& lhs, const 
LocalityEndpointTuple& rhs) const { + return Protobuf::util::MessageDifferencer::Equals(std::get<0>(lhs), std::get<0>(rhs)) && + Protobuf::util::MessageDifferencer::Equals(std::get<1>(lhs), std::get<1>(rhs)); + } +}; + +} // namespace Upstream +} // namespace Envoy diff --git a/test/common/upstream/hds_test.cc b/test/common/upstream/hds_test.cc index f2012e3d31d72..09804ccbb3ab6 100644 --- a/test/common/upstream/hds_test.cc +++ b/test/common/upstream/hds_test.cc @@ -67,6 +67,15 @@ class HdsTest : public testing::Test { node_.set_id("hds-node"); } + // Checks if the cluster counters are correct + void checkHdsCounters(int requests, int responses, int errors, int updates) { + auto stats = hds_delegate_friend_.getStats(*hds_delegate_); + EXPECT_EQ(requests, stats.requests_.value()); + EXPECT_LE(responses, stats.responses_.value()); + EXPECT_EQ(errors, stats.errors_.value()); + EXPECT_EQ(updates, stats.updates_.value()); + } + // Creates an HdsDelegate void createHdsDelegate() { InSequence s; @@ -91,6 +100,23 @@ class HdsTest : public testing::Test { singleton_manager_, tls_, validation_visitor_, *api_); } + void expectCreateClientConnection() { + // Create a new mock connection for each call to createClientConnection. + EXPECT_CALL(dispatcher_, createClientConnection_(_, _, _, _)) + .WillRepeatedly(Invoke( + [](Network::Address::InstanceConstSharedPtr, Network::Address::InstanceConstSharedPtr, + Network::TransportSocketPtr&, const Network::ConnectionSocket::OptionsSharedPtr&) { + Network::MockClientConnection* connection = + new NiceMock(); + + // pretend our endpoint was connected to. + connection->raiseEvent(Network::ConnectionEvent::Connected); + + // return this new, connected endpoint. 
+ return connection; + })); + } + // Creates a HealthCheckSpecifier message that contains one endpoint and one // healthcheck envoy::service::health::v3::HealthCheckSpecifier* createSimpleMessage() { @@ -175,6 +201,34 @@ class HdsTest : public testing::Test { return msg; } + void + addTransportSocketMatches(envoy::service::health::v3::ClusterHealthCheck* cluster_health_check, + std::string match, std::string criteria) { + // Add transport socket matches to specified cluster and its first health check. + const std::string match_yaml = absl::StrFormat( + R"EOF( +transport_socket_matches: +- name: "test_socket" + match: + %s: "true" + transport_socket: + name: "envoy.transport_sockets.raw_buffer" +)EOF", + match); + cluster_health_check->MergeFrom( + TestUtility::parseYaml(match_yaml)); + + // Add transport socket match criteria to our health check, for filtering matches. + const std::string criteria_yaml = absl::StrFormat( + R"EOF( +transport_socket_match_criteria: + %s: "true" +)EOF", + criteria); + cluster_health_check->mutable_health_checks(0)->MergeFrom( + TestUtility::parseYaml(criteria_yaml)); + } + Event::SimulatedTimeSystem time_system_; envoy::config::core::v3::Node node_; Event::MockDispatcher dispatcher_; @@ -402,19 +456,7 @@ TEST_F(HdsTest, TestSendResponseMultipleEndpoints) { // Create a new active connection on request, setting its status to connected // to mock a found endpoint. - EXPECT_CALL(dispatcher_, createClientConnection_(_, _, _, _)) - .WillRepeatedly(Invoke( - [](Network::Address::InstanceConstSharedPtr, Network::Address::InstanceConstSharedPtr, - Network::TransportSocketPtr&, const Network::ConnectionSocket::OptionsSharedPtr&) { - Network::MockClientConnection* connection = - new NiceMock(); - - // pretend our endpoint was connected to. - connection->raiseEvent(Network::ConnectionEvent::Connected); - - // return this new, connected endpoint. 
- return connection; - })); + expectCreateClientConnection(); EXPECT_CALL(*server_response_timer_, enableTimer(_, _)).Times(2); EXPECT_CALL(async_stream_, sendMessageRaw_(_, false)); @@ -494,31 +536,9 @@ TEST_F(HdsTest, TestSocketContext) { EXPECT_CALL(async_stream_, sendMessageRaw_(_, _)); createHdsDelegate(); - // Create Message. + // Create Message with transport sockets. message.reset(createSimpleMessage()); - - // Add transport socket matches to message. - const std::string match_yaml = absl::StrFormat( - R"EOF( -transport_socket_matches: -- name: "test_socket" - match: - test_match: "true" - transport_socket: - name: "envoy.transport_sockets.raw_buffer" -)EOF"); - auto* cluster_health_check = message->mutable_cluster_health_checks(0); - cluster_health_check->MergeFrom( - TestUtility::parseYaml(match_yaml)); - - // Add transport socket match criteria to our health check, for filtering matches. - const std::string criteria_yaml = absl::StrFormat( - R"EOF( -transport_socket_match_criteria: - test_match: "true" -)EOF"); - cluster_health_check->mutable_health_checks(0)->MergeFrom( - TestUtility::parseYaml(criteria_yaml)); + addTransportSocketMatches(message->mutable_cluster_health_checks(0), "test_match", "test_match"); Network::MockClientConnection* connection = new NiceMock(); EXPECT_CALL(dispatcher_, createClientConnection_(_, _, _, _)).WillRepeatedly(Return(connection)); @@ -679,5 +699,401 @@ TEST_F(HdsTest, TestSendResponseOneEndpointTimeout) { 1234); } +// Check to see if two of the same specifier does not get parsed twice in a row. +TEST_F(HdsTest, TestSameSpecifier) { + EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_)); + EXPECT_CALL(async_stream_, sendMessageRaw_(_, _)); + createHdsDelegate(); + + // Create Message + message.reset(createSimpleMessage()); + + // Create a new active connection on request, setting its status to connected + // to mock a found endpoint. 
+ expectCreateClientConnection(); + + EXPECT_CALL(*server_response_timer_, enableTimer(_, _)).Times(AtLeast(1)); + EXPECT_CALL(async_stream_, sendMessageRaw_(_, false)); + EXPECT_CALL(test_factory_, createClusterInfo(_)).WillRepeatedly(Return(cluster_info_)); + EXPECT_CALL(dispatcher_, deferredDelete_(_)).Times(AtLeast(1)); + hds_delegate_->onReceiveMessage(std::move(message)); + hds_delegate_->sendResponse(); + + // Try to change the specifier, but it is the same. + message.reset(createSimpleMessage()); + hds_delegate_->onReceiveMessage(std::move(message)); + + // Check to see that HDS got two requests, but only used the specifier one time. + checkHdsCounters(2, 0, 0, 1); + + // Try to change the specifier, but use a new specifier this time. + message = createComplexSpecifier(1, 1, 2); + hds_delegate_->onReceiveMessage(std::move(message)); + + // Check that both requests and updates increased, meaning we did an update. + checkHdsCounters(3, 0, 0, 2); +} + +// Test to see that if a cluster is added or removed, the ones that did not change are reused. +TEST_F(HdsTest, TestClusterChange) { + EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_)); + EXPECT_CALL(async_stream_, sendMessageRaw_(_, _)); + createHdsDelegate(); + + // Create Message + message = createComplexSpecifier(2, 1, 1); + + // Create a new active connection on request, setting its status to connected + // to mock a found endpoint. + expectCreateClientConnection(); + + EXPECT_CALL(*server_response_timer_, enableTimer(_, _)).Times(AtLeast(1)); + EXPECT_CALL(async_stream_, sendMessageRaw_(_, false)); + EXPECT_CALL(test_factory_, createClusterInfo(_)).WillRepeatedly(Return(cluster_info_)); + EXPECT_CALL(dispatcher_, deferredDelete_(_)).Times(AtLeast(1)); + // Process message + hds_delegate_->onReceiveMessage(std::move(message)); + hds_delegate_->sendResponse(); + + // Get cluster shared pointers to make sure they are the same memory addresses, that we reused + // them. 
+ auto original_clusters = hds_delegate_->hdsClusters(); + ASSERT_EQ(original_clusters.size(), 2); + + // Add a third cluster to the specifier. The first two should reuse pointers. + message = createComplexSpecifier(3, 1, 1); + hds_delegate_->onReceiveMessage(std::move(message)); + + // Get the new clusters list from HDS. + auto new_clusters = hds_delegate_->hdsClusters(); + ASSERT_EQ(new_clusters.size(), 3); + + // Make sure our first two clusters are at the same address in memory as before. + for (int i = 0; i < 2; i++) { + EXPECT_EQ(new_clusters[i], original_clusters[i]); + } + + message = createComplexSpecifier(3, 1, 1); + + // Remove the first element, change the order of the last two elements. + message->mutable_cluster_health_checks()->SwapElements(0, 2); + message->mutable_cluster_health_checks()->RemoveLast(); + // Sanity check. + ASSERT_EQ(message->cluster_health_checks_size(), 2); + + // Send this new specifier. + hds_delegate_->onReceiveMessage(std::move(message)); + + // Check to see that even if we changed the order, we get the expected pointers. + auto final_clusters = hds_delegate_->hdsClusters(); + ASSERT_EQ(final_clusters.size(), 2); + + // Compare first cluster in the new list is the same as the last in the previous list, + // and that the second cluster in the new list is the same as the second in the previous. + for (int i = 0; i < 2; i++) { + EXPECT_EQ(final_clusters[i], new_clusters[2 - i]); + } + + // Check to see that HDS got three requests, and updated three times with it. + checkHdsCounters(3, 0, 0, 3); +} + +// Edit one of two cluster's endpoints by adding and removing. +TEST_F(HdsTest, TestUpdateEndpoints) { + EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_)); + EXPECT_CALL(async_stream_, sendMessageRaw_(_, _)); + createHdsDelegate(); + + // Create Message, and later add/remove endpoints from the second cluster. 
+ message.reset(createSimpleMessage()); + message->MergeFrom(*createComplexSpecifier(1, 1, 2)); + + // Create a new active connection on request, setting its status to connected + // to mock a found endpoint. + expectCreateClientConnection(); + + EXPECT_CALL(*server_response_timer_, enableTimer(_, _)).Times(AtLeast(1)); + EXPECT_CALL(async_stream_, sendMessageRaw_(_, false)); + EXPECT_CALL(test_factory_, createClusterInfo(_)).WillRepeatedly(Return(cluster_info_)); + EXPECT_CALL(dispatcher_, deferredDelete_(_)).Times(AtLeast(1)); + // Process message + hds_delegate_->onReceiveMessage(std::move(message)); + hds_delegate_->sendResponse(); + + // Save list of hosts/endpoints for comparison later. + auto original_hosts = hds_delegate_->hdsClusters()[1]->hosts(); + ASSERT_EQ(original_hosts.size(), 2); + + // Add 3 endpoints to the specifier's second cluster. The first in the list should reuse pointers. + message.reset(createSimpleMessage()); + message->MergeFrom(*createComplexSpecifier(1, 1, 5)); + hds_delegate_->onReceiveMessage(std::move(message)); + + // Get the new clusters list from HDS. + auto new_hosts = hds_delegate_->hdsClusters()[1]->hosts(); + ASSERT_EQ(new_hosts.size(), 5); + + // Make sure our first two endpoints are at the same address in memory as before. + for (int i = 0; i < 2; i++) { + EXPECT_EQ(original_hosts[i], new_hosts[i]); + } + EXPECT_TRUE(original_hosts[0] != new_hosts[2]); + + // This time, have 4 endpoints, 2 each under 2 localities. + // The first locality will be reused, so its 2 endpoints will be as well. + // The second locality is new so we should be getting 2 new endpoints. + // Since the first locality had 5 but now has 2, we are removing 3. + // 2 ADDED, 3 REMOVED, 2 REUSED. + message.reset(createSimpleMessage()); + message->MergeFrom(*createComplexSpecifier(1, 2, 2)); + hds_delegate_->onReceiveMessage(std::move(message)); + + // Get this new list of hosts. 
+ auto final_hosts = hds_delegate_->hdsClusters()[1]->hosts();
+ ASSERT_EQ(final_hosts.size(), 4);
+
+ // Ensure the first two elements in the new list are reused.
+ for (int i = 0; i < 2; i++) {
+ EXPECT_EQ(new_hosts[i], final_hosts[i]);
+ }
+
+ // Ensure the last two elements in the new list are different from the previous list.
+ for (int i = 2; i < 4; i++) {
+ EXPECT_TRUE(new_hosts[i] != final_hosts[i]);
+ }
+
+ // Check to see that HDS got three requests, and updated three times with it.
+ checkHdsCounters(3, 0, 0, 3);
+}
+
+// Test adding, reusing, and removing health checks.
+TEST_F(HdsTest, TestUpdateHealthCheckers) {
+ EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_));
+ EXPECT_CALL(async_stream_, sendMessageRaw_(_, _));
+ createHdsDelegate();
+
+ // Create Message with two different health checkers.
+ message.reset(createSimpleMessage());
+ auto new_hc = message->mutable_cluster_health_checks(0)->add_health_checks();
+ new_hc->MergeFrom(message->mutable_cluster_health_checks(0)->health_checks(0));
+ new_hc->mutable_http_health_check()->set_path("/different_path");
+
+ // Create a new active connection on request, setting its status to connected
+ // to mock a found endpoint.
+ expectCreateClientConnection();
+
+ EXPECT_CALL(*server_response_timer_, enableTimer(_, _)).Times(AtLeast(1));
+ EXPECT_CALL(async_stream_, sendMessageRaw_(_, false));
+ EXPECT_CALL(test_factory_, createClusterInfo(_)).WillRepeatedly(Return(cluster_info_));
+ EXPECT_CALL(dispatcher_, deferredDelete_(_)).Times(AtLeast(1));
+ // Process message
+ hds_delegate_->onReceiveMessage(std::move(message));
+ hds_delegate_->sendResponse();
+
+ // Save list of health checkers for use later.
+ auto original_hcs = hds_delegate_->hdsClusters()[0]->healthCheckers();
+ ASSERT_EQ(original_hcs.size(), 2);
+
+ // Create a new specifier, but make the second health checker different and add a third.
+ // Then reverse the order so the first one is at the end, testing the hashing works as expected.
+ message.reset(createSimpleMessage());
+ auto new_hc0 = message->mutable_cluster_health_checks(0)->add_health_checks();
+ new_hc0->MergeFrom(message->mutable_cluster_health_checks(0)->health_checks(0));
+ new_hc0->mutable_http_health_check()->set_path("/path0");
+ auto new_hc1 = message->mutable_cluster_health_checks(0)->add_health_checks();
+ new_hc1->MergeFrom(message->mutable_cluster_health_checks(0)->health_checks(0));
+ new_hc1->mutable_http_health_check()->set_path("/path1");
+ message->mutable_cluster_health_checks(0)->mutable_health_checks()->SwapElements(0, 2);
+ hds_delegate_->onReceiveMessage(std::move(message));
+
+ // Get the new health check list from HDS.
+ auto new_hcs = hds_delegate_->hdsClusters()[0]->healthCheckers();
+ ASSERT_EQ(new_hcs.size(), 3);
+
+ // Make sure our first hc from the original list is the same as the third in the new list.
+ EXPECT_EQ(original_hcs[0], new_hcs[2]);
+ EXPECT_TRUE(original_hcs[1] != new_hcs[1]);
+
+ // Check to see that HDS got two requests, and updated two times with it.
+ checkHdsCounters(2, 0, 0, 2);
+}
+
+// Test to see that if clusters with an empty name get used, there are two clusters.
+// Also test to see that if two clusters with the same non-empty name are used, we only have
+// one cluster.
+TEST_F(HdsTest, TestClusterSameName) {
+ EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_));
+ EXPECT_CALL(async_stream_, sendMessageRaw_(_, _));
+ createHdsDelegate();
+
+ // Create Message
+ message = createComplexSpecifier(2, 1, 1);
+ // Set both clusters to have an empty name.
+ message->mutable_cluster_health_checks(0)->set_cluster_name("");
+ message->mutable_cluster_health_checks(1)->set_cluster_name("");
+
+ // Create a new active connection on request, setting its status to connected
+ // to mock a found endpoint.
+ expectCreateClientConnection();
+
+ EXPECT_CALL(*server_response_timer_, enableTimer(_, _)).Times(AtLeast(1));
+ EXPECT_CALL(async_stream_, sendMessageRaw_(_, false));
+ EXPECT_CALL(test_factory_, createClusterInfo(_)).WillRepeatedly(Return(cluster_info_));
+ EXPECT_CALL(dispatcher_, deferredDelete_(_)).Times(AtLeast(1));
+ // Process message
+ hds_delegate_->onReceiveMessage(std::move(message));
+ hds_delegate_->sendResponse();
+
+ // Get the clusters from HDS
+ auto original_clusters = hds_delegate_->hdsClusters();
+
+ // Make sure that even though they have the same name, since they are empty there are two and they
+ // do not point to the same thing.
+ ASSERT_EQ(original_clusters.size(), 2);
+ ASSERT_TRUE(original_clusters[0] != original_clusters[1]);
+
+ // Create message with 3 clusters this time so we force an update.
+ message = createComplexSpecifier(3, 1, 1);
+ // Set both clusters to have empty names.
+ message->mutable_cluster_health_checks(0)->set_cluster_name("");
+ message->mutable_cluster_health_checks(1)->set_cluster_name("");
+
+ // Test that we still get requested number of clusters, even with repeated names on update since
+ // they are empty.
+ hds_delegate_->onReceiveMessage(std::move(message));
+ auto new_clusters = hds_delegate_->hdsClusters();
+
+ // Check that since the names are empty, we do not reuse and just reconstruct.
+ ASSERT_EQ(new_clusters.size(), 3);
+ ASSERT_TRUE(original_clusters[0] != new_clusters[0]);
+ ASSERT_TRUE(original_clusters[1] != new_clusters[1]);
+
+ // Create a new message.
+ message = createComplexSpecifier(2, 1, 1);
+ // Set both clusters to have the same, non-empty name.
+ message->mutable_cluster_health_checks(0)->set_cluster_name("anna");
+ message->mutable_cluster_health_checks(1)->set_cluster_name("anna");
+
+ hds_delegate_->onReceiveMessage(std::move(message));
+
+ // Check that since they both have the same name, only one of them gets used.
+ auto final_clusters = hds_delegate_->hdsClusters(); + ASSERT_EQ(final_clusters.size(), 1); + + // Check to see that HDS got three requests, and updated three times with it. + checkHdsCounters(3, 0, 0, 3); +} + +// Test that a transport_socket_matches and transport_socket_match_criteria filter fail when not +// matching, and then after an update the same cluster is used but now matches. +TEST_F(HdsTest, TestUpdateSocketContext) { + EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_)); + EXPECT_CALL(async_stream_, sendMessageRaw_(_, _)); + createHdsDelegate(); + + // Create a new active connection on request, setting its status to connected + // to mock a found endpoint. + expectCreateClientConnection(); + + // Pull out socket_matcher object normally internal to createClusterInfo, to test that a matcher + // would match the expected socket. + std::vector> socket_matchers; + EXPECT_CALL(test_factory_, createClusterInfo(_)) + .WillRepeatedly(Invoke([&](const ClusterInfoFactory::CreateClusterInfoParams& params) { + // Build scope, factory_context as does ProdClusterInfoFactory. + Envoy::Stats::ScopePtr scope = + params.stats_.createScope(fmt::format("cluster.{}.", params.cluster_.name())); + Envoy::Server::Configuration::TransportSocketFactoryContextImpl factory_context( + params.admin_, params.ssl_context_manager_, *scope, params.cm_, params.local_info_, + params.dispatcher_, params.stats_, params.singleton_manager_, params.tls_, + params.validation_visitor_, params.api_); + + // Create a mock socket_factory for the scope of this unit test. + std::unique_ptr socket_factory = + std::make_unique(); + + // set socket_matcher object in test scope. + socket_matchers.push_back(std::make_unique( + params.cluster_.transport_socket_matches(), factory_context, socket_factory, *scope)); + + // But still use the fake cluster_info_. 
+ return cluster_info_;
+ }));
+ EXPECT_CALL(dispatcher_, deferredDelete_(_)).Times(AtLeast(1));
+ EXPECT_CALL(*server_response_timer_, enableTimer(_, _)).Times(AtLeast(1));
+
+ // Create Message, with a non-valid match and process.
+ message.reset(createSimpleMessage());
+ addTransportSocketMatches(message->mutable_cluster_health_checks(0), "bad_match", "test_match");
+ hds_delegate_->onReceiveMessage(std::move(message));
+
+ // Get our health checker to match against.
+ const auto first_clusters = hds_delegate_->hdsClusters();
+ ASSERT_EQ(first_clusters.size(), 1);
+ const auto first_hcs = first_clusters[0]->healthCheckers();
+ ASSERT_EQ(first_hcs.size(), 1);
+
+ // Check that our match fails so it uses default.
+ HealthCheckerImplBase* first_health_checker_base =
+ dynamic_cast(first_hcs[0].get());
+ const auto first_match =
+ socket_matchers[0]->resolve(first_health_checker_base->transportSocketMatchMetadata().get());
+ EXPECT_EQ(first_match.name_, "default");
+
+ // Create a new Message, this time with a good match.
+ message.reset(createSimpleMessage());
+ addTransportSocketMatches(message->mutable_cluster_health_checks(0), "test_match", "test_match");
+ hds_delegate_->onReceiveMessage(std::move(message));
+
+ // Get our new health checker to match against.
+ const auto second_clusters = hds_delegate_->hdsClusters();
+ ASSERT_EQ(second_clusters.size(), 1);
+ // Check that this new pointer is actually the same pointer to the first cluster.
+ ASSERT_EQ(second_clusters[0], first_clusters[0]);
+ const auto second_hcs = second_clusters[0]->healthCheckers();
+ ASSERT_EQ(second_hcs.size(), 1);
+
+ // Check that since we made no change to our health checkers, the pointer was reused.
+ EXPECT_EQ(first_hcs[0], second_hcs[0]);
+
+ // Check that our match hits.
+ HealthCheckerImplBase* second_health_checker_base =
+ dynamic_cast(second_hcs[0].get());
+ ASSERT_EQ(socket_matchers.size(), 2);
+ const auto second_match =
+ socket_matchers[1]->resolve(second_health_checker_base->transportSocketMatchMetadata().get());
+ EXPECT_EQ(second_match.name_, "test_socket");
+
+ // Create a new Message; this time we leave the transport socket the same but change the health
+ // check's filter. This means that the health checker changes but the transport_socket_matches in
+ // the ClusterHealthCheck does not.
+ message.reset(createSimpleMessage());
+ addTransportSocketMatches(message->mutable_cluster_health_checks(0), "test_match",
+ "something_new");
+
+ hds_delegate_->onReceiveMessage(std::move(message));
+ // Get our new health checker to match against.
+ const auto third_clusters = hds_delegate_->hdsClusters();
+ ASSERT_EQ(third_clusters.size(), 1);
+ // Check that this new pointer is actually the same pointer to the first cluster.
+ ASSERT_EQ(third_clusters[0], first_clusters[0]);
+ const auto third_hcs = third_clusters[0]->healthCheckers();
+ ASSERT_EQ(third_hcs.size(), 1);
+
+ // Check that since we made a change to our HC, it is a new pointer.
+ EXPECT_TRUE(first_hcs[0] != third_hcs[0]);
+
+ HealthCheckerImplBase* third_health_checker_base =
+ dynamic_cast(third_hcs[0].get());
+
+ // Check that our socket matchers list is still of size 2. This is because createClusterInfo(_) is
+ // never called again since there was no update to transportSocketMatches.
+ ASSERT_EQ(socket_matchers.size(), 2);
+ const auto third_match =
+ socket_matchers[1]->resolve(third_health_checker_base->transportSocketMatchMetadata().get());
+ // Since this again does not match, it uses default.
+ EXPECT_EQ(third_match.name_, "default"); +} + } // namespace Upstream } // namespace Envoy diff --git a/test/integration/hds_integration_test.cc b/test/integration/hds_integration_test.cc index effe4a86bb1bc..9f49ed87f0b2c 100644 --- a/test/integration/hds_integration_test.cc +++ b/test/integration/hds_integration_test.cc @@ -185,6 +185,31 @@ class HdsIntegrationTest : public Grpc::VersionedGrpcClientIntegrationParamTest, return server_health_check_specifier_; } + envoy::service::health::v3::ClusterHealthCheck createSecondCluster(std::string name) { + // Add endpoint + envoy::service::health::v3::ClusterHealthCheck health_check; + + health_check.set_cluster_name(name); + Network::Utility::addressToProtobufAddress( + *host2_upstream_->localAddress(), + *health_check.add_locality_endpoints()->add_endpoints()->mutable_address()); + health_check.mutable_locality_endpoints(0)->mutable_locality()->set_region("kounopetra"); + health_check.mutable_locality_endpoints(0)->mutable_locality()->set_zone("emplisi"); + health_check.mutable_locality_endpoints(0)->mutable_locality()->set_sub_zone("paris"); + + health_check.add_health_checks()->mutable_timeout()->set_seconds(MaxTimeout); + health_check.mutable_health_checks(0)->mutable_interval()->set_seconds(MaxTimeout); + health_check.mutable_health_checks(0)->mutable_unhealthy_threshold()->set_value(2); + health_check.mutable_health_checks(0)->mutable_healthy_threshold()->set_value(2); + health_check.mutable_health_checks(0)->mutable_grpc_health_check(); + health_check.mutable_health_checks(0) + ->mutable_http_health_check() + ->set_hidden_envoy_deprecated_use_http2(false); + health_check.mutable_health_checks(0)->mutable_http_health_check()->set_path("/healthcheck"); + + return health_check; + } + // Creates a basic HealthCheckSpecifier message containing one endpoint and // one TCP health_check envoy::service::health::v3::HealthCheckSpecifier makeTcpHealthCheckSpecifier() { @@ -694,26 +719,8 @@ TEST_P(HdsIntegrationTest, 
TwoEndpointsDifferentClusters) { server_health_check_specifier_ = makeHttpHealthCheckSpecifier(envoy::type::v3::CodecClientType::HTTP1, false); - // Add endpoint - auto* health_check = server_health_check_specifier_.add_cluster_health_checks(); - - health_check->set_cluster_name("cat"); - Network::Utility::addressToProtobufAddress( - *host2_upstream_->localAddress(), - *health_check->add_locality_endpoints()->add_endpoints()->mutable_address()); - health_check->mutable_locality_endpoints(0)->mutable_locality()->set_region("kounopetra"); - health_check->mutable_locality_endpoints(0)->mutable_locality()->set_zone("emplisi"); - health_check->mutable_locality_endpoints(0)->mutable_locality()->set_sub_zone("paris"); - - health_check->add_health_checks()->mutable_timeout()->set_seconds(MaxTimeout); - health_check->mutable_health_checks(0)->mutable_interval()->set_seconds(MaxTimeout); - health_check->mutable_health_checks(0)->mutable_unhealthy_threshold()->set_value(2); - health_check->mutable_health_checks(0)->mutable_healthy_threshold()->set_value(2); - health_check->mutable_health_checks(0)->mutable_grpc_health_check(); - health_check->mutable_health_checks(0) - ->mutable_http_health_check() - ->set_hidden_envoy_deprecated_use_http2(false); - health_check->mutable_health_checks(0)->mutable_http_health_check()->set_path("/healthcheck"); + // Add Second Cluster + server_health_check_specifier_.add_cluster_health_checks()->MergeFrom(createSecondCluster("cat")); // Server <--> Envoy waitForHdsStream(); @@ -1040,5 +1047,124 @@ TEST_P(HdsIntegrationTest, SingleEndpointUnhealthyTlsMissingSocketMatch) { cleanupHdsConnection(); } +TEST_P(HdsIntegrationTest, UpdateEndpoints) { + initialize(); + server_health_check_specifier_ = + makeHttpHealthCheckSpecifier(envoy::type::v3::CodecClientType::HTTP1, false); + + // Add Second Cluster. 
+ server_health_check_specifier_.add_cluster_health_checks()->MergeFrom(createSecondCluster("cat")); + + // Server <--> Envoy + waitForHdsStream(); + ASSERT_TRUE(hds_stream_->waitForGrpcMessage(*dispatcher_, envoy_msg_)); + + // Server asks for health checking + hds_stream_->startGrpcStream(); + hds_stream_->sendGrpcMessage(server_health_check_specifier_); + test_server_->waitForCounterGe("hds_delegate.requests", ++hds_requests_); + + // Envoy sends health check messages to two endpoints + healthcheckEndpoints("cat"); + + // Endpoint responds to the health check + host_stream_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "404"}}, false); + host_stream_->encodeData(1024, true); + host2_stream_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, false); + host2_stream_->encodeData(1024, true); + + // Receive updates until the one we expect arrives + ASSERT_TRUE(waitForClusterHealthResponse(envoy::config::core::v3::HEALTHY, + host2_upstream_->localAddress(), 1, 0, 0)); + + ASSERT_EQ(response_.endpoint_health_response().cluster_endpoints_health_size(), 2); + + // store cluster response info for easier reference. + const auto& cluster_resp0 = response_.endpoint_health_response().cluster_endpoints_health(0); + const auto& cluster_resp1 = response_.endpoint_health_response().cluster_endpoints_health(1); + + // check cluster info and sizes. + EXPECT_EQ(cluster_resp0.cluster_name(), "anna"); + ASSERT_EQ(cluster_resp0.locality_endpoints_health_size(), 1); + EXPECT_EQ(cluster_resp1.cluster_name(), "cat"); + ASSERT_EQ(cluster_resp1.locality_endpoints_health_size(), 1); + + // store locality response info for easier reference. + const auto& locality_resp0 = cluster_resp0.locality_endpoints_health(0); + const auto& locality_resp1 = cluster_resp1.locality_endpoints_health(0); + + // check locality info and sizes. 
+ EXPECT_EQ(locality_resp0.locality().sub_zone(), "hobbiton"); + ASSERT_EQ(locality_resp0.endpoints_health_size(), 1); + EXPECT_EQ(locality_resp1.locality().sub_zone(), "paris"); + ASSERT_EQ(locality_resp1.endpoints_health_size(), 1); + + // Check endpoints. + EXPECT_TRUE(checkEndpointHealthResponse(locality_resp0.endpoints_health(0), + envoy::config::core::v3::UNHEALTHY, + host_upstream_->localAddress())); + + checkCounters(1, 2, 0, 1); + EXPECT_EQ(1, test_server_->counter("cluster.cat.health_check.success")->value()); + EXPECT_EQ(0, test_server_->counter("cluster.cat.health_check.failure")->value()); + + // Create new specifier that removes the second cluster, and adds an endpoint to the first. + server_health_check_specifier_ = + makeHttpHealthCheckSpecifier(envoy::type::v3::CodecClientType::HTTP1, false); + Network::Utility::addressToProtobufAddress( + *host2_upstream_->localAddress(), + *server_health_check_specifier_.mutable_cluster_health_checks(0) + ->mutable_locality_endpoints(0) + ->add_endpoints() + ->mutable_address()); + + // Reset second endpoint for usage in our cluster. + ASSERT_TRUE(host2_fake_connection_->close()); + ASSERT_TRUE(host2_fake_connection_->waitForDisconnect()); + + // Send new specifier. + hds_stream_->sendGrpcMessage(server_health_check_specifier_); + // TODO: add stats reporting and verification for Clusters added/removed/reused and Endpoints + // added/removed/reused. + test_server_->waitForCounterGe("hds_delegate.requests", ++hds_requests_); + + // Set up second endpoint again. 
+ ASSERT_TRUE(host2_upstream_->waitForHttpConnection(*dispatcher_, host2_fake_connection_)); + ASSERT_TRUE(host2_fake_connection_->waitForNewStream(*dispatcher_, host2_stream_)); + ASSERT_TRUE(host2_stream_->waitForEndStream(*dispatcher_)); + EXPECT_EQ(host2_stream_->headers().getPathValue(), "/healthcheck"); + EXPECT_EQ(host2_stream_->headers().getMethodValue(), "GET"); + EXPECT_EQ(host2_stream_->headers().getHostValue(), "anna"); + + // Endpoints respond to the health check + host2_stream_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, false); + host2_stream_->encodeData(1024, true); + + // Receive updates until the one we expect arrives + ASSERT_TRUE(waitForClusterHealthResponse(envoy::config::core::v3::HEALTHY, + host2_upstream_->localAddress(), 0, 0, 1)); + + // Ensure we have at least one cluster before trying to read it. + ASSERT_EQ(response_.endpoint_health_response().cluster_endpoints_health_size(), 1); + + // store cluster response info for easier reference. + const auto& cluster_response = response_.endpoint_health_response().cluster_endpoints_health(0); + + // Check cluster has correct name and number of localities (1) + EXPECT_EQ(cluster_response.cluster_name(), "anna"); + ASSERT_EQ(cluster_response.locality_endpoints_health_size(), 1); + + // check the only locality and its endpoints. + const auto& locality_response = cluster_response.locality_endpoints_health(0); + EXPECT_EQ(locality_response.locality().sub_zone(), "hobbiton"); + ASSERT_EQ(locality_response.endpoints_health_size(), 2); + EXPECT_TRUE(checkEndpointHealthResponse(locality_response.endpoints_health(0), + envoy::config::core::v3::UNHEALTHY, + host_upstream_->localAddress())); + + cleanupHostConnections(); + cleanupHdsConnection(); +} + } // namespace } // namespace Envoy