diff --git a/api/envoy/api/v2/rds.proto b/api/envoy/api/v2/rds.proto index 5dd58d62d9dc2..9601868fbbf2a 100644 --- a/api/envoy/api/v2/rds.proto +++ b/api/envoy/api/v2/rds.proto @@ -70,7 +70,8 @@ message RouteConfiguration { string name = 1; // An array of virtual hosts that make up the route table. - repeated route.VirtualHost virtual_hosts = 2 [(gogoproto.nullable) = false]; + repeated route.VirtualHost virtual_hosts = 2; // [(gogoproto.nullable) = false]; + Vhds vhds = 9; // An array of virtual hosts will be dynamically loaded via the VHDS API. // Both *virtual_hosts* and *vhds* fields will be used when present. *virtual_hosts* can be used diff --git a/include/envoy/config/subscription.h b/include/envoy/config/subscription.h index 2897e9798befc..14ab435122eec 100644 --- a/include/envoy/config/subscription.h +++ b/include/envoy/config/subscription.h @@ -9,6 +9,7 @@ #include "envoy/stats/stats_macros.h" #include "common/protobuf/protobuf.h" +#include "common/common/assert.h" namespace Envoy { namespace Config { @@ -81,6 +82,8 @@ class Subscription { * @param resources vector of resource names to fetch. */ virtual void updateResources(const std::vector& resources) PURE; + + virtual void updateResourcesViaAliases(const std::vector&) { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; } }; /** diff --git a/include/envoy/http/filter.h b/include/envoy/http/filter.h index c24a07e928b17..5657dc15ee063 100644 --- a/include/envoy/http/filter.h +++ b/include/envoy/http/filter.h @@ -127,12 +127,14 @@ class StreamFilterCallbacks { */ virtual Router::RouteConstSharedPtr route() PURE; - /** - * Returns the clusterInfo for the cached route. - * This method is to avoid multiple look ups in the filter chain, it also provides a consistent - * view of clusterInfo after a route is picked/repicked. - * NOTE: Cached clusterInfo and route will be updated the same time. - */ + virtual bool requestRouteConfigUpdate(std::function cb) PURE; + + /** + * Returns the clusterInfo for the cached route. + * This method is to avoid multiple look ups in the filter chain, it also provides a consistent + * view of clusterInfo after a route is picked/repicked. + * NOTE: Cached clusterInfo and route will be updated the same time. + */ virtual Upstream::ClusterInfoConstSharedPtr clusterInfo() PURE; /** diff --git a/include/envoy/router/rds.h b/include/envoy/router/rds.h index 8ff43f213f4f5..d603ba585f730 100644 --- a/include/envoy/router/rds.h +++ b/include/envoy/router/rds.h @@ -43,6 +43,8 @@ class RouteConfigProvider { * @return the last time this RouteConfigProvider was updated. Used for config dumps. */ virtual SystemTime lastUpdated() const PURE; + + virtual bool requestConfigUpdate(const std::string for_domain, std::function cb) PURE; }; typedef std::unique_ptr RouteConfigProviderPtr; diff --git a/include/envoy/router/router.h b/include/envoy/router/router.h index e090c9caadc63..c257179a74e60 100644 --- a/include/envoy/router/router.h +++ b/include/envoy/router/router.h @@ -786,6 +786,8 @@ class Config { * @return const std::string the RouteConfiguration name. */ virtual const std::string& name() const PURE; + + virtual bool usesVhds() const PURE; }; typedef std::shared_ptr ConfigConstSharedPtr; diff --git a/source/common/config/delta_subscription_impl.h b/source/common/config/delta_subscription_impl.h index 5cd583f357a31..c36534c391a9b 100644 --- a/source/common/config/delta_subscription_impl.h +++ b/source/common/config/delta_subscription_impl.h @@ -77,6 +77,8 @@ class DeltaSubscriptionImpl return; // The unpause will send this request. } + request_.set_type_url(type_url_); + request_.mutable_node()->MergeFrom(local_info_.node()); request_.clear_resource_names_subscribe(); request_.clear_resource_names_unsubscribe(); std::copy(diff.added_.begin(), diff.added_.end(), @@ -215,6 +217,14 @@ class DeltaSubscriptionImpl stats_.update_attempt_.inc(); } + void updateResourcesViaAliases(const std::vector& aliases) override { + ResourceNameDiff diff; + std::copy(aliases.begin(), aliases.end(), std::inserter(diff.added_, diff.added_.begin())); + queueDiscoveryRequest(diff); + //sendDiscoveryRequest(diff); + stats_.update_attempt_.inc(); + } + private: void disableInitFetchTimeoutTimer() { if (init_fetch_timeout_timer_) { diff --git a/source/common/config/resources.h b/source/common/config/resources.h index 69ed2d91a46dc..9f35373b511e3 100644 --- a/source/common/config/resources.h +++ b/source/common/config/resources.h @@ -17,6 +17,7 @@ class TypeUrlValues { const std::string ClusterLoadAssignment{"type.googleapis.com/envoy.api.v2.ClusterLoadAssignment"}; const std::string Secret{"type.googleapis.com/envoy.api.v2.auth.Secret"}; const std::string RouteConfiguration{"type.googleapis.com/envoy.api.v2.RouteConfiguration"}; + const std::string VirtualHost{"type.googleapis.com/envoy.api.v2.route.VirtualHost"}; }; typedef ConstSingleton TypeUrl; diff --git a/source/common/http/async_client_impl.h b/source/common/http/async_client_impl.h index 35fffefc2c473..384c7dd43fc87 100644 --- a/source/common/http/async_client_impl.h +++ b/source/common/http/async_client_impl.h @@ -78,6 +78,8 @@ class AsyncStreamImpl : public AsyncClient::Stream, AsyncStreamImpl(AsyncClientImpl& parent, AsyncClient::StreamCallbacks& callbacks, const AsyncClient::StreamOptions& options); + bool requestRouteConfigUpdate(std::function) override { return false; } + // Http::AsyncClient::Stream void sendHeaders(HeaderMap& headers, bool end_stream) override; void sendData(Buffer::Instance& data, bool end_stream) override; @@ -172,6 +174,7 @@ class AsyncStreamImpl : public AsyncClient::Stream, } const std::string& name() const override { return EMPTY_STRING; } + bool usesVhds() const { return false; } static const std::list internal_only_headers_; }; @@ -370,6 +373,8 @@ class AsyncRequestImpl final : public AsyncClient::Request, AsyncRequestImpl(MessagePtr&& request, AsyncClientImpl& parent, AsyncClient::Callbacks& callbacks, const AsyncClient::RequestOptions& options); + bool requestRouteConfigUpdate(std::function) override { return false; } + // AsyncClient::Request virtual void cancel() override; diff --git a/source/common/http/conn_manager_impl.cc b/source/common/http/conn_manager_impl.cc index 9c23eb39b6d1a..0fcac7a04911f 100644 --- a/source/common/http/conn_manager_impl.cc +++ b/source/common/http/conn_manager_impl.cc @@ -397,6 +397,7 @@ void ConnectionManagerImpl::chargeTracingStats(const Tracing::Reason& tracing_re ConnectionManagerImpl::ActiveStream::ActiveStream(ConnectionManagerImpl& connection_manager) : connection_manager_(connection_manager), + route_config_provider_(connection_manager.config_.routeConfigProvider()), snapped_route_config_(connection_manager.config_.routeConfigProvider().config()), stream_id_(connection_manager.random_generator_.random()), request_response_timespan_(new Stats::Timespan( @@ -1063,7 +1064,7 @@ void ConnectionManagerImpl::startDrainSequence() { void ConnectionManagerImpl::ActiveStream::refreshCachedRoute() { Router::RouteConstSharedPtr route; if (request_headers_ != nullptr) { - route = snapped_route_config_->route(*request_headers_, stream_id_); + route = route_config_provider_.config()->route(*request_headers_, stream_id_); } stream_info_.route_entry_ = route ? route->routeEntry() : nullptr; cached_route_ = std::move(route); @@ -1076,6 +1077,12 @@ void ConnectionManagerImpl::ActiveStream::refreshCachedRoute() { } } +bool ConnectionManagerImpl::ActiveStream::requestRouteConfigUpdate(std::function cb) { + // TODO check for an empty header? + auto host_header = Http::LowerCaseString(request_headers_->Host()->value().c_str()).get(); + return route_config_provider_.requestConfigUpdate(host_header, cb); +} + void ConnectionManagerImpl::ActiveStream::sendLocalReply( bool is_grpc_request, Code code, absl::string_view body, const std::function& modify_headers, bool is_head_request, @@ -1710,13 +1717,17 @@ Upstream::ClusterInfoConstSharedPtr ConnectionManagerImpl::ActiveStreamFilterBas } Router::RouteConstSharedPtr ConnectionManagerImpl::ActiveStreamFilterBase::route() { - if (!parent_.cached_route_.has_value()) { + if (!parent_.cached_route_.has_value() || parent_.cached_route_.value() == nullptr) { parent_.refreshCachedRoute(); } return parent_.cached_route_.value(); } +bool ConnectionManagerImpl::ActiveStreamFilterBase::requestRouteConfigUpdate(std::function cb) { + return parent_.requestRouteConfigUpdate(cb); +} + void ConnectionManagerImpl::ActiveStreamFilterBase::clearRouteCache() { parent_.cached_route_ = absl::optional(); parent_.cached_cluster_info_ = absl::optional(); diff --git a/source/common/http/conn_manager_impl.h b/source/common/http/conn_manager_impl.h index 7b4f486144434..98222b216c8ca 100644 --- a/source/common/http/conn_manager_impl.h +++ b/source/common/http/conn_manager_impl.h @@ -121,6 +121,7 @@ class ConnectionManagerImpl : Logger::Loggable, Event::Dispatcher& dispatcher() override; void resetStream() override; Router::RouteConstSharedPtr route() override; + bool requestRouteConfigUpdate(std::function cb) override; Upstream::ClusterInfoConstSharedPtr clusterInfo() override; void clearRouteCache() override; uint64_t streamId() override; @@ -351,6 +352,7 @@ class ConnectionManagerImpl : Logger::Loggable, void traceRequest(); void refreshCachedRoute(); + bool requestRouteConfigUpdate(std::function cb); // Pass on watermark callbacks to watermark subscribers. This boils down to passing watermark // events for this stream and the downstream connection to the router filter. @@ -420,6 +422,7 @@ class ConnectionManagerImpl : Logger::Loggable, void onRequestTimeout(); ConnectionManagerImpl& connection_manager_; + Router::RouteConfigProvider& route_config_provider_; Router::ConfigConstSharedPtr snapped_route_config_; Tracing::SpanPtr active_span_; const uint64_t stream_id_; diff --git a/source/common/router/BUILD b/source/common/router/BUILD index 498babb17239e..b203e4257d3ce 100644 --- a/source/common/router/BUILD +++ b/source/common/router/BUILD @@ -160,6 +160,34 @@ envoy_cc_library( ], ) +envoy_cc_library( + name = "on_demand_update_lib", + srcs = ["on_demand_update.cc"], + hdrs = ["on_demand_update.h"], + deps = [ + "//include/envoy/event:dispatcher_interface", + "//include/envoy/event:timer_interface", + "//include/envoy/http:filter_interface", + "//include/envoy/server:filter_config_interface", + "//source/common/access_log:access_log_lib", + "//source/common/buffer:watermark_buffer_lib", + "//source/common/common:assert_lib", + "//source/common/common:empty_string", + "//source/common/common:enum_to_int", + "//source/common/common:hash_lib", + "//source/common/common:hex_lib", + "//source/common/common:minimal_logger_lib", + "//source/common/common:utility_lib", + "//source/common/grpc:common_lib", + "//source/common/http:codes_lib", + "//source/common/http:header_map_lib", + "//source/common/http:headers_lib", + "//source/common/http:message_lib", + "//source/common/http:utility_lib", + "@envoy_api//envoy/config/filter/http/router/v2:router_cc", + ], +) + envoy_cc_library( name = "router_ratelimit_lib", srcs = ["router_ratelimit.cc"], diff --git a/source/common/router/config_impl.cc b/source/common/router/config_impl.cc index 3cfdbc63f596c..83747a3b8aeda 100644 --- a/source/common/router/config_impl.cc +++ b/source/common/router/config_impl.cc @@ -1086,7 +1086,7 @@ VirtualHostImpl::virtualClusterFromEntries(const Http::HeaderMap& headers) const ConfigImpl::ConfigImpl(const envoy::api::v2::RouteConfiguration& config, Server::Configuration::FactoryContext& factory_context, bool validate_clusters_default) - : name_(config.name()) { + : name_(config.name()), uses_vhds_(config.has_vhds()) { route_matcher_ = std::make_unique( config, *this, factory_context, PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, validate_clusters, validate_clusters_default)); diff --git a/source/common/router/config_impl.h b/source/common/router/config_impl.h index c9fdeb40403e3..d98b193774e2e 100644 --- a/source/common/router/config_impl.h +++ b/source/common/router/config_impl.h @@ -768,12 +768,15 @@ class ConfigImpl : public Config { const std::string& name() const override { return name_; } + bool usesVhds() const override { return uses_vhds_; } + private: std::unique_ptr route_matcher_; std::list internal_only_headers_; HeaderParserPtr request_headers_parser_; HeaderParserPtr response_headers_parser_; const std::string name_; + const bool uses_vhds_; }; /** @@ -789,7 +792,7 @@ class NullConfigImpl : public Config { } const std::string& name() const override { return name_; } - + bool usesVhds() const override { return false; } private: std::list internal_only_headers_; const std::string name_; diff --git a/source/common/router/on_demand_update.cc b/source/common/router/on_demand_update.cc new file mode 100644 index 0000000000000..376e40c283323 --- /dev/null +++ b/source/common/router/on_demand_update.cc @@ -0,0 +1,49 @@ +#include "common/router/on_demand_update.h" + +#include "common/common/assert.h" +#include "common/common/enum_to_int.h" +#include "common/http/codes.h" + +#include "extensions/filters/http/well_known_names.h" + +namespace Envoy { +namespace Router { + +void OnDemandRouteUpdate::requestRouteConfigUpdate() { + if (callbacks_->route() != nullptr) { + filter_return_ = FilterReturn::ContinueDecoding; + } else { + auto configUpdateScheduled = callbacks_->requestRouteConfigUpdate([this]() -> void { onComplete(); }); + filter_return_ = configUpdateScheduled ? FilterReturn::StopDecoding : FilterReturn::ContinueDecoding; + } +} + +Http::FilterHeadersStatus OnDemandRouteUpdate::decodeHeaders(Http::HeaderMap&, bool) { + requestRouteConfigUpdate(); + return filter_return_ == FilterReturn::StopDecoding ? Http::FilterHeadersStatus::StopIteration + : Http::FilterHeadersStatus::Continue; +} + +Http::FilterDataStatus OnDemandRouteUpdate::decodeData(Buffer::Instance&, bool) { + return filter_return_ == FilterReturn::StopDecoding + ? Http::FilterDataStatus::StopIterationAndWatermark + : Http::FilterDataStatus::Continue; +} + +Http::FilterTrailersStatus OnDemandRouteUpdate::decodeTrailers(Http::HeaderMap&) { + return filter_return_ == FilterReturn::StopDecoding ? Http::FilterTrailersStatus::StopIteration + : Http::FilterTrailersStatus::Continue; +} + +void OnDemandRouteUpdate::setDecoderFilterCallbacks(Http::StreamDecoderFilterCallbacks& callbacks) { + callbacks_ = &callbacks; +} + +void OnDemandRouteUpdate::onComplete() { + filter_return_ = FilterReturn::ContinueDecoding; + // We got completion async. Let the filter chain continue. + callbacks_->continueDecoding(); +} + +} // namespace Router +} // namespace Envoy diff --git a/source/common/router/on_demand_update.h b/source/common/router/on_demand_update.h new file mode 100644 index 0000000000000..291bcd5553d27 --- /dev/null +++ b/source/common/router/on_demand_update.h @@ -0,0 +1,54 @@ +#pragma once + +#include +#include +#include +#include + +#include "envoy/http/filter.h" +#include "envoy/local_info/local_info.h" +#include "envoy/runtime/runtime.h" +#include "envoy/stats/scope.h" +#include "envoy/upstream/cluster_manager.h" + +#include "common/common/assert.h" +#include "common/common/logger.h" +#include "common/common/matchers.h" +#include "common/http/header_map_impl.h" + +namespace Envoy { +namespace Router { + +class OnDemandRouteUpdate : public Logger::Loggable, + public Http::StreamDecoderFilter { +public: + OnDemandRouteUpdate() {} + + void requestRouteConfigUpdate(); + void onComplete(); + + // Http::StreamDecoderFilter + Http::FilterHeadersStatus decodeHeaders(Http::HeaderMap& headers, bool end_stream) override; + Http::FilterDataStatus decodeData(Buffer::Instance& data, bool end_stream) override; + Http::FilterTrailersStatus decodeTrailers(Http::HeaderMap& trailers) override; + void setDecoderFilterCallbacks(Http::StreamDecoderFilterCallbacks& callbacks) override; + void onDestroy() override {} + +private: + // State of this filter's communication with the external authorization service. + // The filter has either not started calling the external service, in the middle of calling + // it or has completed. + enum class State { NotStarted, Calling, Complete }; + + // FilterReturn is used to capture what the return code should be to the filter chain. + // if this filter is either in the middle of calling the service or the result is denied then + // the filter chain should stop. Otherwise the filter chain can continue to the next filter. + enum class FilterReturn { ContinueDecoding, StopDecoding }; + + Http::StreamDecoderFilterCallbacks* callbacks_{}; + State state_{State::NotStarted}; + FilterReturn filter_return_{FilterReturn::ContinueDecoding}; +}; + +} // namespace Router +} // namespace Envoy diff --git a/source/common/router/rds_impl.cc b/source/common/router/rds_impl.cc index 2aa821ad838ee..dbc9cb35b450b 100644 --- a/source/common/router/rds_impl.cc +++ b/source/common/router/rds_impl.cc @@ -53,20 +53,20 @@ StaticRouteConfigProviderImpl::~StaticRouteConfigProviderImpl() { } // TODO(htuch): If support for multiple clusters is added per #1170 cluster_name_ -// initialization needs to be fixed. RdsRouteConfigSubscription::RdsRouteConfigSubscription( const envoy::config::filter::network::http_connection_manager::v2::Rds& rds, const uint64_t manager_identifier, Server::Configuration::FactoryContext& factory_context, const std::string& stat_prefix, Envoy::Router::RouteConfigProviderManagerImpl& route_config_provider_manager) - : route_config_name_(rds.route_config_name()), + : factory_context_(factory_context), + route_config_name_(rds.route_config_name()), init_target_(fmt::format("RdsRouteConfigSubscription {}", route_config_name_), - [this]() { subscription_->start({route_config_name_}, *this); }), + [this]() { subscription_->start({route_config_name_}, *this); }), scope_(factory_context.scope().createScope(stat_prefix + "rds." + route_config_name_ + ".")), - stats_({ALL_RDS_STATS(POOL_COUNTER(*scope_))}), + stat_prefix_(stat_prefix), stats_({ALL_RDS_STATS(POOL_COUNTER(*scope_))}), route_config_provider_manager_(route_config_provider_manager), manager_identifier_(manager_identifier), time_source_(factory_context.timeSource()), - last_updated_(factory_context.timeSource().systemTime()) { + last_updated_(factory_context.timeSource().systemTime()), uses_vhds_(false) { Envoy::Config::Utility::checkLocalInfo("rds", factory_context.localInfo()); subscription_ = Envoy::Config::SubscriptionFactory::subscriptionFromConfigSource( @@ -105,6 +105,7 @@ void RdsRouteConfigSubscription::onConfigUpdate( } auto route_config = MessageUtil::anyConvert(resources[0]); MessageUtil::validate(route_config); + // TODO: validate that either virtual_hosts or vhds is present. // TODO(PiotrSikora): Remove this hack once fixed internally. if (!(route_config.name() == route_config_name_)) { throw EnvoyException(fmt::format("Unexpected RDS configuration (expecting {}): {}", @@ -116,14 +117,22 @@ void RdsRouteConfigSubscription::onConfigUpdate( config_info_ = {new_hash, version_info}; route_config_proto_ = route_config; stats_.config_reload_.inc(); - ENVOY_LOG(debug, "rds: loading new configuration: config_name={} hash={}", route_config_name_, - new_hash); - for (auto* provider : route_config_providers_) { - provider->onConfigUpdate(); + + uses_vhds_ = route_config_proto_.has_vhds(); + if (!uses_vhds_) { + ENVOY_LOG(debug, "rds: loading new configuration: config_name={} hash={}", route_config_name_, new_hash); + for (auto* provider : route_config_providers_) { + provider->onConfigUpdate(); + } + vhds_subscription_.release(); + } else { + auto s = new VhdsSubscription(route_config_proto_, factory_context_, stat_prefix_, this); + s->registerInitTargetWithInitManager(factory_context_.initManager()); + vhds_subscription_.reset(s); } - } - init_target_.ready(); + init_target_.ready(); + } } void RdsRouteConfigSubscription::onConfigUpdateFailed(const EnvoyException*) { @@ -132,21 +141,43 @@ void RdsRouteConfigSubscription::onConfigUpdateFailed(const EnvoyException*) { init_target_.ready(); } +void RdsRouteConfigSubscription::ondemandUpdate(const std::vector& aliases) { + if (vhds_subscription_.get() == nullptr) return; + vhds_subscription_->ondemandUpdate(aliases); +} + +absl::optional RdsRouteConfigSubscription::configInfo() const { + return (uses_vhds_ ? vhds_subscription_->configInfo() : config_info_); +} + +envoy::api::v2::RouteConfiguration& RdsRouteConfigSubscription::routeConfiguration() { + return (uses_vhds_ ? vhds_subscription_->routeConfiguration() : route_config_proto_); +} + +SystemTime RdsRouteConfigSubscription::lastUpdated() const { + return (uses_vhds_ ? vhds_subscription_->lastUpdated() : last_updated_); +} + +// TODO: RdsRouteConfigSubscription should return correct config_info and route_config_proto_ when asked RdsRouteConfigProviderImpl::RdsRouteConfigProviderImpl( RdsRouteConfigSubscriptionSharedPtr&& subscription, Server::Configuration::FactoryContext& factory_context) : subscription_(std::move(subscription)), factory_context_(factory_context), - tls_(factory_context.threadLocal().allocateSlot()) { + tls_(factory_context.threadLocal().allocateSlot()), + config_update_callbacks_(factory_context.threadLocal().allocateSlot()) { ConfigConstSharedPtr initial_config; - if (subscription_->config_info_.has_value()) { + if (subscription_->configInfo().has_value()) { initial_config = - std::make_shared(subscription_->route_config_proto_, factory_context_, false); + std::make_shared(subscription_->routeConfiguration(), factory_context_, false); } else { initial_config = std::make_shared(); } tls_->set([initial_config](Event::Dispatcher&) -> ThreadLocal::ThreadLocalObjectSharedPtr { return std::make_shared(initial_config); }); + config_update_callbacks_->set([](Event::Dispatcher&) -> ThreadLocal::ThreadLocalObjectSharedPtr { + return std::make_shared(); + }); subscription_->route_config_providers_.insert(this); } @@ -159,21 +190,124 @@ Router::ConfigConstSharedPtr RdsRouteConfigProviderImpl::config() { } absl::optional RdsRouteConfigProviderImpl::configInfo() const { - if (!subscription_->config_info_) { + if (!subscription_->configInfo()) { return {}; } else { - return ConfigInfo{subscription_->route_config_proto_, - subscription_->config_info_.value().last_config_version_}; + return ConfigInfo{subscription_->routeConfiguration(), + subscription_->configInfo().value().last_config_version_}; } } void RdsRouteConfigProviderImpl::onConfigUpdate() { ConfigConstSharedPtr new_config( - new ConfigImpl(subscription_->route_config_proto_, factory_context_, false)); + new ConfigImpl(subscription_->routeConfiguration(), factory_context_, false)); tls_->runOnAllThreads( - [this, new_config]() -> void { tls_->getTyped().config_ = new_config; }); + [this, new_config]() -> void { + tls_->getTyped().config_ = new_config; + auto callbacks = config_update_callbacks_->getTyped().callbacks_; + if (!callbacks.empty()) { + auto cb = callbacks.front(); + callbacks.pop(); + cb(); + } + }); } +bool RdsRouteConfigProviderImpl::requestConfigUpdate(const std::string for_domain, std::function cb) { + if (!config()->usesVhds()) { return false; } + // TODO check for an empty header? + factory_context_.dispatcher().post([this, for_domain]() -> void { + subscription_->ondemandUpdate({for_domain}); + }); + config_update_callbacks_->getTyped().callbacks_.push(cb); + + return true; +} + +// TODO validate GRPC type is DELTA_GRPC +VhdsSubscription::VhdsSubscription( + const envoy::api::v2::RouteConfiguration& route_configuration, + Server::Configuration::FactoryContext& factory_context, + const std::string& stat_prefix, RdsRouteConfigSubscription* rds_subscription) + : route_config_proto_(route_configuration), + route_config_name_(route_configuration.name()), + init_target_(fmt::format("VhdsConfigSubscription {}", route_config_name_), + [this]() { subscription_->start({}, *this); }), + scope_(factory_context.scope().createScope(stat_prefix + "vhds." + route_config_name_ + ".")), + stats_({ALL_VHDS_STATS(POOL_COUNTER(*scope_))}), time_source_(factory_context.timeSource()), + last_updated_(factory_context.timeSource().systemTime()), rds_subscription_(rds_subscription) { + Envoy::Config::Utility::checkLocalInfo("vhds", factory_context.localInfo()); + + subscription_ = Envoy::Config::SubscriptionFactory::subscriptionFromConfigSource< + envoy::api::v2::route::VirtualHost>( + route_configuration.vhds().config_source(), factory_context.localInfo(), + factory_context.dispatcher(), factory_context.clusterManager(), factory_context.random(), *scope_, + "none", + "envoy.api.v2.VirtualHostDiscoveryService.DeltaVirtualHosts", factory_context.api()); +} + +void VhdsSubscription::ondemandUpdate(const std::vector& aliases) { + subscription_->updateResourcesViaAliases(aliases); +} + +void VhdsSubscription::onConfigUpdateFailed(const EnvoyException*) { + // We need to allow server startup to continue, even if we have a bad + // config. + init_target_.ready(); +} + +void VhdsSubscription::onConfigUpdate(const Protobuf::RepeatedPtrField& added_resources, + const Protobuf::RepeatedPtrField& removed_resources, + const std::string& version_info) { + last_updated_ = time_source_.systemTime(); + remove_vhosts(virtual_hosts_, removed_resources); + update_vhosts(virtual_hosts_, added_resources); + rebuild_route_config(virtual_hosts_, route_config_proto_); + + const uint64_t new_hash = MessageUtil::hash(route_config_proto_); + if (!config_info_ || new_hash != config_info_.value().last_config_hash_) { + config_info_ = {new_hash, version_info}; + stats_.config_reload_.inc(); + ENVOY_LOG(debug, "vhds: loading new configuration: config_name={} hash={}", route_config_name_, + new_hash); + for (auto* provider : rds_subscription_->route_config_providers()) { + provider->onConfigUpdate(); + } + } + + init_target_.ready(); +} + +void VhdsSubscription::remove_vhosts( + std::unordered_map& vhosts, + const Protobuf::RepeatedPtrField& removed_vhost_names) { + for(const auto vhost_name : removed_vhost_names) { + vhosts.erase(vhost_name); + } +} + +void VhdsSubscription::update_vhosts( + std::unordered_map& vhosts, + const Protobuf::RepeatedPtrField& added_resources) { + // TODO validate aaded_resources + for(const auto& resource : added_resources) { + envoy::api::v2::route::VirtualHost vhost = + MessageUtil::anyConvert(resource.resource()); + vhosts.emplace(vhost.name(), vhost); + } +} + +void VhdsSubscription::rebuild_route_config( + const std::unordered_map& vhosts, + envoy::api::v2::RouteConfiguration& route_config) { + + route_config.clear_virtual_hosts(); + for(const auto vhost : vhosts) { + route_config.mutable_virtual_hosts()->Add()->CopyFrom(vhost.second); + } +} + + RouteConfigProviderManagerImpl::RouteConfigProviderManagerImpl(Server::Admin& admin) { config_tracker_entry_ = admin.getConfigTracker().add("routes", [this] { return dumpRouteConfigs(); }); diff --git a/source/common/router/rds_impl.h b/source/common/router/rds_impl.h index a435e564bc5cf..9e341f01b4d11 100644 --- a/source/common/router/rds_impl.h +++ b/source/common/router/rds_impl.h @@ -5,6 +5,7 @@ #include #include #include +#include #include "envoy/admin/v2alpha/config_dump.pb.h" #include "envoy/api/v2/rds.pb.h" @@ -62,6 +63,7 @@ class StaticRouteConfigProviderImpl : public RouteConfigProvider { return ConfigInfo{route_config_proto_, ""}; } SystemTime lastUpdated() const override { return last_updated_; } + bool requestConfigUpdate(const std::string, std::function) override { return false; } private: ConfigConstSharedPtr config_; @@ -87,7 +89,25 @@ struct RdsStats { ALL_RDS_STATS(GENERATE_COUNTER_STRUCT) }; +// clang-format off +#define ALL_VHDS_STATS(COUNTER) \ + COUNTER(config_reload) \ + COUNTER(update_empty) + +// clang-format on + +struct VhdsStats { + ALL_VHDS_STATS(GENERATE_COUNTER_STRUCT) +}; + +struct LastConfigInfo { + uint64_t last_config_hash_; + std::string last_config_version_; +}; + class RdsRouteConfigProviderImpl; +class VhdsSubscription; +typedef std::unique_ptr VhdsSubscriptionPtr; /** * A class that fetches the route configuration dynamically using the RDS API and updates them to @@ -110,13 +130,14 @@ class RdsRouteConfigSubscription : Envoy::Config::SubscriptionCallbacks, std::string resourceName(const ProtobufWkt::Any& resource) override { return MessageUtil::anyConvert(resource).name(); } + // TODO: add provider registration call + void ondemandUpdate(const std::vector& aliases); + std::unordered_set& route_config_providers() { return route_config_providers_; } + absl::optional configInfo() const; + envoy::api::v2::RouteConfiguration& routeConfiguration(); + SystemTime lastUpdated() const; private: - struct LastConfigInfo { - uint64_t last_config_hash_; - std::string last_config_version_; - }; - RdsRouteConfigSubscription( const envoy::config::filter::network::http_connection_manager::v2::Rds& rds, const uint64_t manager_identifier, Server::Configuration::FactoryContext& factory_context, @@ -124,9 +145,11 @@ class RdsRouteConfigSubscription : Envoy::Config::SubscriptionCallbacks, RouteConfigProviderManagerImpl& route_config_provider_manager); std::unique_ptr subscription_; + Server::Configuration::FactoryContext& factory_context_; const std::string route_config_name_; Init::TargetImpl init_target_; Stats::ScopePtr scope_; + std::string stat_prefix_; RdsStats stats_; RouteConfigProviderManagerImpl& route_config_provider_manager_; const uint64_t manager_identifier_; @@ -135,6 +158,8 @@ class RdsRouteConfigSubscription : Envoy::Config::SubscriptionCallbacks, absl::optional config_info_; envoy::api::v2::RouteConfiguration route_config_proto_; std::unordered_set route_config_providers_; + VhdsSubscriptionPtr vhds_subscription_; + bool uses_vhds_; friend class RouteConfigProviderManagerImpl; friend class RdsRouteConfigProviderImpl; @@ -142,6 +167,62 @@ class RdsRouteConfigSubscription : Envoy::Config::SubscriptionCallbacks, typedef std::shared_ptr RdsRouteConfigSubscriptionSharedPtr; +class VhdsSubscription : Envoy::Config::SubscriptionCallbacks, + Logger::Loggable { +public: + VhdsSubscription( + const envoy::api::v2::RouteConfiguration& route_configuration, + Server::Configuration::FactoryContext& factory_context, + const std::string& stat_prefix, RdsRouteConfigSubscription* rds_subscription); + ~VhdsSubscription() { init_target_.ready(); } + + // Config::SubscriptionCallbacks + // TODO(fredlas) deduplicate + void onConfigUpdate(const ResourceVector&, const std::string&) override { + NOT_IMPLEMENTED_GCOVR_EXCL_LINE; + } + void onConfigUpdate(const Protobuf::RepeatedPtrField&, + const Protobuf::RepeatedPtrField&, const std::string&) override; + void onConfigUpdateFailed(const EnvoyException* e) override; + std::string resourceName(const ProtobufWkt::Any& resource) override { + return MessageUtil::anyConvert(resource).name(); + } + + void registerInitTargetWithInitManager(Init::Manager& m) { m.add(init_target_); } + void ondemandUpdate(const std::vector& aliases); + void remove_vhosts(std::unordered_map& vhosts, + const Protobuf::RepeatedPtrField& removed_vhost_names); + void update_vhosts(std::unordered_map& vhosts, + const Protobuf::RepeatedPtrField& added_resources); + void rebuild_route_config( + const std::unordered_map& vhosts, + envoy::api::v2::RouteConfiguration& route_config); + absl::optional configInfo() const { return config_info_; } + envoy::api::v2::RouteConfiguration& routeConfiguration() { return route_config_proto_; } + SystemTime lastUpdated() const { return last_updated_; } + + std::unique_ptr> subscription_; + envoy::api::v2::RouteConfiguration route_config_proto_; + const std::string route_config_name_; + Init::TargetImpl init_target_; + Stats::ScopePtr scope_; + VhdsStats stats_; + TimeSource& time_source_; + SystemTime last_updated_; + RdsRouteConfigSubscription* rds_subscription_; + std::unordered_map virtual_hosts_; + absl::optional config_info_; +}; + +struct ThreadLocalConfig : public ThreadLocal::ThreadLocalObject { + ThreadLocalConfig(ConfigConstSharedPtr initial_config) : config_(initial_config) {} + ConfigConstSharedPtr config_; +}; + +struct ThreadLocalCallbacks : public ThreadLocal::ThreadLocalObject { + std::queue> callbacks_; +}; + /** * Implementation of RouteConfigProvider that fetches the route configuration dynamically using * the subscription. @@ -151,27 +232,25 @@ class RdsRouteConfigProviderImpl : public RouteConfigProvider, public: ~RdsRouteConfigProviderImpl(); - RdsRouteConfigSubscription& subscription() { return *subscription_; } void onConfigUpdate(); // Router::RouteConfigProvider Router::ConfigConstSharedPtr config() override; absl::optional configInfo() const override; - SystemTime lastUpdated() const override { return subscription_->last_updated_; } + SystemTime lastUpdated() const override { return subscription_->lastUpdated(); } + bool requestConfigUpdate(const std::string for_domain, std::function cb) override; private: - struct ThreadLocalConfig : public ThreadLocal::ThreadLocalObject { - ThreadLocalConfig(ConfigConstSharedPtr initial_config) : config_(initial_config) {} - - ConfigConstSharedPtr config_; - }; - RdsRouteConfigProviderImpl(RdsRouteConfigSubscriptionSharedPtr&& subscription, Server::Configuration::FactoryContext& factory_context); + void addConfigUpdateCallback(std::function cb); + RdsRouteConfigSubscriptionSharedPtr subscription_; Server::Configuration::FactoryContext& factory_context_; + SystemTime last_updated_; ThreadLocal::SlotPtr tls_; + ThreadLocal::SlotPtr config_update_callbacks_; friend class RouteConfigProviderManagerImpl; }; diff --git a/source/extensions/filters/http/router/BUILD b/source/extensions/filters/http/router/BUILD index ddffe3458ebd9..ed67f3827f57c 100644 --- a/source/extensions/filters/http/router/BUILD +++ b/source/extensions/filters/http/router/BUILD @@ -20,6 +20,7 @@ envoy_cc_library( "//source/common/config:filter_json_lib", "//source/common/json:config_schemas_lib", "//source/common/router:router_lib", + "//source/common/router:on_demand_update_lib", "//source/common/router:shadow_writer_lib", "//source/extensions/filters/http:well_known_names", "//source/extensions/filters/http/common:factory_base_lib", diff --git a/source/extensions/filters/http/router/config.cc b/source/extensions/filters/http/router/config.cc index 3c7b97a37d4dc..4a4863cfaa05f 100644 --- a/source/extensions/filters/http/router/config.cc +++ b/source/extensions/filters/http/router/config.cc @@ -6,6 +6,7 @@ #include "common/config/filter_json.h" #include "common/json/config_schemas.h" #include "common/router/router.h" +#include "common/router/on_demand_update.h" #include "common/router/shadow_writer_impl.h" namespace Envoy { @@ -21,6 +22,7 @@ Http::FilterFactoryCb RouterFilterConfig::createFilterFactoryFromProtoTyped( proto_config)); return [filter_config](Http::FilterChainFactoryCallbacks& callbacks) -> void { + callbacks.addStreamDecoderFilter(std::make_shared()); callbacks.addStreamDecoderFilter(std::make_shared(*filter_config)); }; } diff --git a/source/server/http/admin.h b/source/server/http/admin.h index 31d8511a9c579..be36c5473e6fe 100644 --- a/source/server/http/admin.h +++ b/source/server/http/admin.h @@ -153,6 +153,7 @@ class AdminImpl : public Admin, Router::ConfigConstSharedPtr config() override { return config_; } absl::optional configInfo() const override { return {}; } SystemTime lastUpdated() const override { return time_source_.systemTime(); } + bool requestConfigUpdate(const std::string, std::function) { return false; } Router::ConfigConstSharedPtr config_; TimeSource& time_source_; diff --git a/test/integration/BUILD b/test/integration/BUILD index f7103c90ee9b9..fbd512635fefa 100644 --- a/test/integration/BUILD +++ b/test/integration/BUILD @@ -109,6 +109,27 @@ envoy_cc_test( ], ) +envoy_cc_test( + name = "vhds_integration_test", + srcs = ["vhds_integration_test.cc"], + data = [ + "//test/config/integration/certs", + ], + deps = [ + ":http_integration_lib", + "//source/common/config:protobuf_link_hacks", + "//source/common/config:resources_lib", + "//source/common/protobuf:utility_lib", + "//test/common/grpc:grpc_client_integration_lib", + "//test/mocks/runtime:runtime_mocks", + "//test/mocks/server:server_mocks", + "//test/test_common:network_utility_lib", + "//test/test_common:utility_lib", + "@envoy_api//envoy/api/v2:rds_cc", + "@envoy_api//envoy/api/v2:discovery_cc", + ], +) + exports_files(["test_utility.sh"]) envoy_sh_test( diff --git a/test/integration/http_integration.cc b/test/integration/http_integration.cc index d2d6e4d33fc78..eaef14da5de08 100644 --- a/test/integration/http_integration.cc +++ b/test/integration/http_integration.cc @@ -345,7 +345,8 @@ void HttpIntegrationTest::testRouterRequestAndResponseWithBody( IntegrationStreamDecoderPtr HttpIntegrationTest::makeHeaderOnlyRequest(ConnectionCreationFunction* create_connection, - int upstream_index, const std::string& path) { + int upstream_index, const std::string& path, + const std::string& authority) { // This is called multiple times per test in ads_integration_test. Only call // initialize() the first time. if (!initialized()) { @@ -356,15 +357,16 @@ HttpIntegrationTest::makeHeaderOnlyRequest(ConnectionCreationFunction* create_co Http::TestHeaderMapImpl request_headers{{":method", "GET"}, {":path", path}, {":scheme", "http"}, - {":authority", "host"}, + {":authority", authority}, {"x-lyft-user-id", "123"}}; return sendRequestAndWaitForResponse(request_headers, 0, default_response_headers_, 0, upstream_index); } void HttpIntegrationTest::testRouterHeaderOnlyRequestAndResponse( - ConnectionCreationFunction* create_connection, int upstream_index, const std::string& path) { - auto response = makeHeaderOnlyRequest(create_connection, upstream_index, path); + ConnectionCreationFunction* create_connection, int upstream_index, const std::string& path, + const std::string& authority) { + auto response = makeHeaderOnlyRequest(create_connection, upstream_index, path, authority); checkSimpleRequestSuccess(0U, 0U, response.get()); } diff --git a/test/integration/http_integration.h b/test/integration/http_integration.h index 3fb328ac2be56..f8051e9d67b5e 100644 --- a/test/integration/http_integration.h +++ b/test/integration/http_integration.h @@ -140,7 +140,8 @@ class HttpIntegrationTest : public BaseIntegrationTest { // Sends a simple header-only HTTP request, and waits for a response. IntegrationStreamDecoderPtr makeHeaderOnlyRequest(ConnectionCreationFunction* create_connection, int upstream_index, - const std::string& path = "/test/long/url"); + const std::string& path = "/test/long/url", + const std::string& authority = "host"); void testRouterNotFound(); void testRouterNotFoundWithBody(); @@ -149,7 +150,8 @@ class HttpIntegrationTest : public BaseIntegrationTest { ConnectionCreationFunction* creator = nullptr); void testRouterHeaderOnlyRequestAndResponse(ConnectionCreationFunction* creator = nullptr, int upstream_index = 0, - const std::string& path = "/test/long/url"); + const std::string& path = "/test/long/url", + const std::string& authority = "host"); void testRequestAndResponseShutdownWithActiveConnection(); // Disconnect tests diff --git a/test/integration/integration.cc b/test/integration/integration.cc index e38b97c209eea..f1f592fe8f043 100644 --- a/test/integration/integration.cc +++ b/test/integration/integration.cc @@ -530,9 +530,10 @@ AssertionResult BaseIntegrationTest::compareDeltaDiscoveryRequest( const std::string& expected_type_url, const std::vector& expected_resource_subscriptions, const std::vector& expected_resource_unsubscriptions, + FakeStreamPtr& xds_stream, const Protobuf::int32 expected_error_code, const std::string& expected_error_message) { envoy::api::v2::DeltaDiscoveryRequest request; - VERIFY_ASSERTION(xds_stream_->waitForGrpcMessage(*dispatcher_, request)); + VERIFY_ASSERTION(xds_stream->waitForGrpcMessage(*dispatcher_, request)); EXPECT_TRUE(request.has_node()); EXPECT_FALSE(request.node().id().empty()); diff --git a/test/integration/integration.h b/test/integration/integration.h index cb7e8c937b7f6..5fbfd5f414abe 100644 --- a/test/integration/integration.h +++ b/test/integration/integration.h @@ -216,16 +216,33 @@ class BaseIntegrationTest : Logger::Loggable { xds_stream_->sendGrpcMessage(discovery_response); } + AssertionResult compareDeltaDiscoveryRequest( + const std::string& expected_type_url, + const std::vector& expected_resource_subscriptions, + const std::vector& expected_resource_unsubscriptions, + const Protobuf::int32 expected_error_code = Grpc::Status::GrpcStatus::Ok, + const std::string& expected_error_message = "") { + return compareDeltaDiscoveryRequest(expected_type_url, expected_resource_subscriptions, + expected_resource_unsubscriptions, xds_stream_, expected_error_code, expected_error_message); + } + AssertionResult compareDeltaDiscoveryRequest( const std::string& expected_type_url, const std::vector& expected_resource_subscriptions, const std::vector& expected_resource_unsubscriptions, + FakeStreamPtr& stream, const Protobuf::int32 expected_error_code = Grpc::Status::GrpcStatus::Ok, const std::string& expected_error_message = ""); template void sendDeltaDiscoveryResponse(const std::vector& added_or_updated, const std::vector& removed, const std::string& version) { + sendDeltaDiscoveryResponse(added_or_updated, removed, version, xds_stream_); + } + template + void sendDeltaDiscoveryResponse(const std::vector& added_or_updated, + const std::vector& removed, + const std::string& version, FakeStreamPtr& stream) { envoy::api::v2::DeltaDiscoveryResponse response; response.set_system_version_info("system_version_info_this_is_a_test"); for (const auto& message : added_or_updated) { @@ -236,7 +253,7 @@ class BaseIntegrationTest : Logger::Loggable { } *response.mutable_removed_resources() = {removed.begin(), removed.end()}; response.set_nonce("noncense"); - xds_stream_->sendGrpcMessage(response); + stream->sendGrpcMessage(response); } private: diff --git a/test/integration/vhds_integration_test.cc b/test/integration/vhds_integration_test.cc new file mode 100644 index 0000000000000..8cd18bae9af35 --- /dev/null +++ b/test/integration/vhds_integration_test.cc @@ -0,0 +1,252 @@ +#include "envoy/api/v2/rds.pb.h" +#include "envoy/api/v2/discovery.pb.h" +#include "envoy/grpc/status.h" +#include "envoy/stats/scope.h" + +#include "common/config/protobuf_link_hacks.h" +#include "common/config/resources.h" +#include "common/protobuf/protobuf.h" +#include "common/protobuf/utility.h" + +#include "test/common/grpc/grpc_client_integration.h" +#include "test/integration/http_integration.h" +#include "test/integration/utility.h" +#include "test/mocks/server/mocks.h" +#include "test/test_common/network_utility.h" +#include "test/test_common/simulated_time_system.h" +#include "test/test_common/utility.h" + +#include "absl/synchronization/notification.h" +#include "gtest/gtest.h" + +using testing::AssertionFailure; +using testing::AssertionResult; +using testing::AssertionSuccess; +using testing::IsSubstring; + +namespace Envoy { +namespace { + +// TODO(fredlas) Move to test/config/utility.cc once there are other xDS tests that use gRPC. +const char Config[] = R"EOF( +admin: + access_log_path: /dev/null + address: + socket_address: + address: 127.0.0.1 + port_value: 0 +static_resources: + clusters: + - name: xds_cluster + type: STATIC + http2_protocol_options: {} + hosts: + socket_address: + address: 127.0.0.1 + port_value: 0 + - name: my_service + type: STATIC + http2_protocol_options: {} + hosts: + socket_address: + address: 127.0.0.1 + port_value: 0 + listeners: + - name: http + address: + socket_address: + address: 127.0.0.1 + port_value: 0 + filter_chains: + - filters: + - name: envoy.http_connection_manager + config: + stat_prefix: config_test + http_filters: + - name: envoy.router + codec_type: HTTP2 + rds: + route_config_name: my_route + config_source: + api_config_source: + api_type: GRPC + grpc_services: + envoy_grpc: + cluster_name: xds_cluster +)EOF"; + +const envoy::api::v2::RouteConfiguration RdsConfig = TestUtility::parseYaml(R"EOF( +name: my_route +vhds: + route_config_name: my_route + config_source: + api_config_source: + api_type: DELTA_GRPC + grpc_services: + envoy_grpc: + cluster_name: xds_cluster +)EOF"); + +const int UpstreamIndex = 2; + +class VhdsIntegrationTest : public HttpIntegrationTest, public Grpc::GrpcClientIntegrationParamTest { +public: + VhdsIntegrationTest() + : HttpIntegrationTest(Http::CodecClient::Type::HTTP2, ipVersion(), realTime(), Config) {} + + void TearDown() override { + cleanUpXdsConnection(); + test_server_.reset(); + fake_upstreams_.clear(); + } + + std::string virtualHostYaml(std::string name, std::string domain) { + return fmt::format(R"EOF( + name: {} + domains: [{}] + routes: + - match: {{ prefix: "/" }} + route: {{ cluster: "my_service" }} + )EOF", name, domain); + } + + envoy::api::v2::route::VirtualHost buildVirtualHost() { + return TestUtility::parseYaml(virtualHostYaml("vhost_0", "host")); + } + + std::vector buildVirtualHost1() { + return {TestUtility::parseYaml(virtualHostYaml("vhost_1", "vhost.first")), + TestUtility::parseYaml(virtualHostYaml("vhost_2", "vhost.second"))}; + } + + envoy::api::v2::route::VirtualHost buildVirtualHost2() { + return TestUtility::parseYaml(virtualHostYaml("vhost_1", "vhost.first")); + } + + // Overridden to insert this stuff into the initialize() at the very beginning of + // HttpIntegrationTest::testRouterRequestAndResponseWithBody(). + void initialize() override { + // Controls how many fake_upstreams_.emplace_back(new FakeUpstream) will happen in + // BaseIntegrationTest::createUpstreams() (which is part of initialize()). + // Make sure this number matches the size of the 'clusters' repeated field in the bootstrap + // config that you use! + setUpstreamCount(2); // the CDS cluster + setUpstreamProtocol(FakeHttpConnection::Type::HTTP2); // CDS uses gRPC uses HTTP2. + + // BaseIntegrationTest::initialize() does many things: + // 1) It appends to fake_upstreams_ as many as you asked for via setUpstreamCount(). + // 2) It updates your bootstrap config with the ports your fake upstreams are actually listening + // on (since you're supposed to leave them as 0). + // 3) It creates and starts an IntegrationTestServer - the thing that wraps the almost-actual + // Envoy used in the tests. + // 4) Bringing up the server usually entails waiting to ensure that any listeners specified in + // the bootstrap config have come up, and registering them in a port map (see lookupPort()). + // However, this test needs to defer all of that to later. + defer_listener_finalization_ = true; + HttpIntegrationTest::initialize(); + + // Create the regular (i.e. not an xDS server) upstream. We create it manually here after + // initialize() because finalize() expects all fake_upstreams_ to correspond to a static + // cluster in the bootstrap config - which we don't want since we're testing dynamic CDS! + +/* + fake_upstreams_.emplace_back(new FakeUpstream(0, FakeHttpConnection::Type::HTTP2, version_, + timeSystem(), enable_half_close_)); + fake_upstreams_[UpstreamIndex]->set_allow_unexpected_disconnects(false); +*/ + + // Now that the upstream has been created, process Envoy's request to discover it. + // (First, we have to let Envoy establish its connection to the CDS server.) + AssertionResult result = // xds_connection_ is filled with the new FakeHttpConnection. + fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, xds_connection_); + RELEASE_ASSERT(result, result.message()); + result = xds_connection_->waitForNewStream(*dispatcher_, xds_stream_); + RELEASE_ASSERT(result, result.message()); + xds_stream_->startGrpcStream(); + fake_upstreams_[0]->set_allow_unexpected_disconnects(true); + + EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().RouteConfiguration, "", {"my_route"})); + sendDiscoveryResponse(Config::TypeUrl::get().RouteConfiguration, {RdsConfig}, "1"); + + result = xds_connection_->waitForNewStream(*dispatcher_, vhds_stream_, true); + RELEASE_ASSERT(result, result.message()); + vhds_stream_->startGrpcStream(); + + EXPECT_TRUE(compareDeltaDiscoveryRequest(Config::TypeUrl::get().VirtualHost, {}, {}, vhds_stream_)); + sendDeltaDiscoveryResponse({buildVirtualHost()}, {}, "1", vhds_stream_); + + //EXPECT_TRUE(compareDeltaDiscoveryRequest(Config::TypeUrl::get().VirtualHost, {}, {})); + + // We can continue the test once we're sure that Envoy's ClusterManager has made use of + // the DiscoveryResponse describing cluster_0 that we sent. + // 2 because the statically specified CDS server itself counts as a cluster. + // FIXME use a correct gauge + // test_server_->waitForGaugeGe("listener_manager.listener_create_success", 1); + + // Wait for our statically specified listener to become ready, and register its port in the + // test framework's downstream listener port map. + test_server_->waitUntilListenersReady(); + registerTestServerPorts({"http"}); + } + + FakeStreamPtr vhds_stream_; +}; + +INSTANTIATE_TEST_CASE_P(IpVersionsClientType, VhdsIntegrationTest, GRPC_CLIENT_INTEGRATION_PARAMS); + +TEST_P(VhdsIntegrationTest, RdsVirtualHostAddUpdateRemove) { + //fake_upstreams_[1]->set_allow_unexpected_disconnects(true); + // Calls our initialize(), which includes establishing a listener, route, and cluster. + testRouterHeaderOnlyRequestAndResponse(nullptr, 1); + cleanupUpstreamAndDownstream(); + codec_client_->waitForDisconnect(); + + sendDeltaDiscoveryResponse(buildVirtualHost1(), {}, "2", vhds_stream_); + EXPECT_TRUE(compareDeltaDiscoveryRequest(Config::TypeUrl::get().VirtualHost, {}, {}, vhds_stream_)); + EXPECT_TRUE(compareDeltaDiscoveryRequest(Config::TypeUrl::get().VirtualHost, {}, {}, vhds_stream_)); + + testRouterHeaderOnlyRequestAndResponse(nullptr, 1, "/one", "vhost.first"); + cleanupUpstreamAndDownstream(); + codec_client_->waitForDisconnect(); + testRouterHeaderOnlyRequestAndResponse(nullptr, 1, "/two", "vhost.second"); + cleanupUpstreamAndDownstream(); + codec_client_->waitForDisconnect(); + + sendDeltaDiscoveryResponse({}, {"vhost_1", "vhost_2"}, "3", vhds_stream_); + EXPECT_TRUE(compareDeltaDiscoveryRequest(Config::TypeUrl::get().VirtualHost, {}, {}, vhds_stream_)); + EXPECT_TRUE(compareDeltaDiscoveryRequest(Config::TypeUrl::get().VirtualHost, {}, {}, vhds_stream_)); + + codec_client_ = makeHttpConnection(makeClientConnection((lookupPort("http")))); + Http::TestHeaderMapImpl request_headers{{":method", "GET"}, + {":path", "/one"}, + {":scheme", "http"}, + {":authority", "vhost.first"}, + {"x-lyft-user-id", "123"}}; + IntegrationStreamDecoderPtr response = codec_client_->makeHeaderOnlyRequest(request_headers); + EXPECT_TRUE(compareDeltaDiscoveryRequest(Config::TypeUrl::get().VirtualHost, {"vhost.first"}, {}, vhds_stream_)); + sendDeltaDiscoveryResponse({buildVirtualHost2()}, {}, "4", vhds_stream_); + + waitForNextUpstreamRequest(1); + // Send response headers, and end_stream if there is no response body. + upstream_request_->encodeHeaders(default_response_headers_, true); + + response->waitForHeaders(); + EXPECT_STREQ("200", response->headers().Status()->value().c_str()); + + /* + codec_client_ = makeHttpConnection(makeClientConnection((lookupPort("http")))); + Http::TestHeaderMapImpl request_headers{{":method", "GET"}, + {":path", "/one"}, + {":scheme", "http"}, + {":authority", "vhost.first"}, + {"x-lyft-user-id", "123"}}; + IntegrationStreamDecoderPtr response = codec_client_->makeHeaderOnlyRequest(request_headers); + response->waitForHeaders(); + EXPECT_STREQ("404", response->headers().Status()->value().c_str()); +*/ + + cleanupUpstreamAndDownstream(); +} + +} // namespace +} // namespace Envoy