diff --git a/DEPRECATED.md b/DEPRECATED.md index ad116f00af851..c9b1e285bd56b 100644 --- a/DEPRECATED.md +++ b/DEPRECATED.md @@ -10,6 +10,11 @@ The following features have been DEPRECATED and will be removed in the specified * DOWNSTREAM_ADDRESS log formatter is deprecated. Use DOWNSTREAM_REMOTE_ADDRESS_WITHOUT_PORT instead. * CLIENT_IP header formatter is deprecated. Use DOWNSTREAM_REMOTE_ADDRESS_WITHOUT_PORT instead. +* Rate limit service configuration via the `cluster_name` field is deprecated. Use `grpc_service` + instead. +* gRPC service configuration via the `cluster_names` field in `ApiConfigSource` is deprecated. Use + `grpc_services` instead. + ## Version 1.5.0 diff --git a/bazel/repositories.bzl b/bazel/repositories.bzl index 079f6bbbb7ecf..ad82244b56703 100644 --- a/bazel/repositories.bzl +++ b/bazel/repositories.bzl @@ -138,6 +138,7 @@ def _envoy_api_deps(): "cds", "discovery", "eds", + "grpc_service", "health_check", "lds", "metrics", diff --git a/include/envoy/grpc/BUILD b/include/envoy/grpc/BUILD index ba7ad2b13a62f..477ec0049e918 100644 --- a/include/envoy/grpc/BUILD +++ b/include/envoy/grpc/BUILD @@ -20,6 +20,16 @@ envoy_cc_library( ], ) +envoy_cc_library( + name = "async_client_manager_interface", + hdrs = ["async_client_manager.h"], + external_deps = ["envoy_grpc_service"], + deps = [ + ":async_client_interface", + "//include/envoy/stats:stats_interface", + ], +) + envoy_cc_library( name = "status", hdrs = ["status.h"], diff --git a/include/envoy/grpc/async_client_manager.h b/include/envoy/grpc/async_client_manager.h new file mode 100644 index 0000000000000..8291a3cbbf53e --- /dev/null +++ b/include/envoy/grpc/async_client_manager.h @@ -0,0 +1,47 @@ +#pragma once + +#include "envoy/grpc/async_client.h" +#include "envoy/stats/stats.h" + +#include "api/grpc_service.pb.h" + +namespace Envoy { +namespace Grpc { + +// Per-service factory for Grpc::AsyncClients. This factory is thread aware and will instantiate +// with thread local state. Clients will use ThreadLocal::Instance::dispatcher() for event handling. +class AsyncClientFactory { +public: + virtual ~AsyncClientFactory() {} + + /** + * Create a gRPC::AsyncClient. + * @return AsyncClientPtr async client. + */ + virtual AsyncClientPtr create() PURE; +}; + +typedef std::unique_ptr AsyncClientFactoryPtr; + +// Singleton gRPC client manager. Grpc::AsyncClientManager can be used to create per-service +// Grpc::AsyncClientFactory instances. +class AsyncClientManager { +public: + virtual ~AsyncClientManager() {} + + /** + * Create a Grpc::AsyncClients factory for a service. Validation of the service is performed and + * will raise an exception on failure. + * @param grpc_service envoy::api::v2::GrpcService configuration. + * @param scope stats scope. + * @return AsyncClientFactoryPtr factory for grpc_service. + * @throws EnvoyException when grpc_service validation fails. + */ + virtual AsyncClientFactoryPtr + factoryForGrpcService(const envoy::api::v2::GrpcService& grpc_service, Stats::Scope& scope) PURE; +}; + +typedef std::unique_ptr AsyncClientManagerPtr; + +} // namespace Grpc +} // namespace Envoy diff --git a/include/envoy/upstream/BUILD b/include/envoy/upstream/BUILD index 332562f4efb99..b54670667038f 100644 --- a/include/envoy/upstream/BUILD +++ b/include/envoy/upstream/BUILD @@ -21,6 +21,7 @@ envoy_cc_library( ":upstream_interface", "//include/envoy/access_log:access_log_interface", "//include/envoy/config:grpc_mux_interface", + "//include/envoy/grpc:async_client_manager_interface", "//include/envoy/http:async_client_interface", "//include/envoy/http:conn_pool_interface", "//include/envoy/local_info:local_info_interface", diff --git a/include/envoy/upstream/cluster_manager.h b/include/envoy/upstream/cluster_manager.h index 1599e36a5c6f8..8b73151201864 100644 --- a/include/envoy/upstream/cluster_manager.h +++ b/include/envoy/upstream/cluster_manager.h @@ -8,6 +8,7 @@ #include "envoy/access_log/access_log.h" #include "envoy/config/grpc_mux.h" +#include "envoy/grpc/async_client_manager.h" #include "envoy/http/async_client.h" #include "envoy/http/conn_pool.h" #include "envoy/local_info/local_info.h" @@ -128,6 +129,11 @@ class ClusterManager { */ virtual Config::GrpcMux& adsMux() PURE; + /** + * @return Grpc::AsyncClientManager& the cluster manager's gRPC client manager. + */ + virtual Grpc::AsyncClientManager& grpcAsyncClientManager() PURE; + /** * Return the current version info string for dynamic clusters, if CDS is setup. * diff --git a/source/common/access_log/BUILD b/source/common/access_log/BUILD index 763dbe0f96e2d..d116f7a441224 100644 --- a/source/common/access_log/BUILD +++ b/source/common/access_log/BUILD @@ -61,6 +61,7 @@ envoy_cc_library( deps = [ "//include/envoy/access_log:access_log_interface", "//include/envoy/grpc:async_client_interface", + "//include/envoy/grpc:async_client_manager_interface", "//include/envoy/singleton:instance_interface", "//include/envoy/thread_local:thread_local_interface", "//include/envoy/upstream:cluster_manager_interface", diff --git a/source/common/access_log/grpc_access_log_impl.cc b/source/common/access_log/grpc_access_log_impl.cc index cbc383850c620..b747efaf5e766 100644 --- a/source/common/access_log/grpc_access_log_impl.cc +++ b/source/common/access_log/grpc_access_log_impl.cc @@ -6,11 +6,10 @@ namespace Envoy { namespace AccessLog { -GrpcAccessLogStreamerImpl::GrpcAccessLogStreamerImpl(GrpcAccessLogClientFactoryPtr&& factory, +GrpcAccessLogStreamerImpl::GrpcAccessLogStreamerImpl(Grpc::AsyncClientFactoryPtr&& factory, ThreadLocal::SlotAllocator& tls, const LocalInfo::LocalInfo& local_info) : tls_slot_(tls.allocateSlot()) { - SharedStateSharedPtr shared_state = std::make_shared(std::move(factory), local_info); tls_slot_->set([shared_state](Event::Dispatcher&) { return ThreadLocal::ThreadLocalObjectSharedPtr{new ThreadLocalStreamer(shared_state)}; diff --git a/source/common/access_log/grpc_access_log_impl.h b/source/common/access_log/grpc_access_log_impl.h index e74f2667cdfe7..3d9d5ceac9469 100644 --- a/source/common/access_log/grpc_access_log_impl.h +++ b/source/common/access_log/grpc_access_log_impl.h @@ -4,6 +4,7 @@ #include "envoy/access_log/access_log.h" #include "envoy/grpc/async_client.h" +#include "envoy/grpc/async_client_manager.h" #include "envoy/local_info/local_info.h" #include "envoy/singleton/instance.h" #include "envoy/thread_local/thread_local.h" @@ -15,21 +16,6 @@ namespace AccessLog { // TODO(mattklein123): Stats -/** - * Factory for creating a gRPC access log streaming client. - */ -class GrpcAccessLogClientFactory { -public: - virtual ~GrpcAccessLogClientFactory() {} - - /** - * @return GrpcAccessLogClientPtr a new client. - */ - virtual Grpc::AsyncClientPtr create() PURE; -}; - -typedef std::unique_ptr GrpcAccessLogClientFactoryPtr; - /** * Interface for an access log streamer. The streamer deals with threading and sends access logs * on the correct stream. @@ -55,8 +41,7 @@ typedef std::shared_ptr GrpcAccessLogStreamerSharedPtr; */ class GrpcAccessLogStreamerImpl : public Singleton::Instance, public GrpcAccessLogStreamer { public: - GrpcAccessLogStreamerImpl(GrpcAccessLogClientFactoryPtr&& factory, - ThreadLocal::SlotAllocator& tls, + GrpcAccessLogStreamerImpl(Grpc::AsyncClientFactoryPtr&& factory, ThreadLocal::SlotAllocator& tls, const LocalInfo::LocalInfo& local_info); // GrpcAccessLogStreamer @@ -71,10 +56,10 @@ class GrpcAccessLogStreamerImpl : public Singleton::Instance, public GrpcAccessL * slot to be destroyed while the streamers hold onto the shared state. */ struct SharedState { - SharedState(GrpcAccessLogClientFactoryPtr&& factory, const LocalInfo::LocalInfo& local_info) + SharedState(Grpc::AsyncClientFactoryPtr&& factory, const LocalInfo::LocalInfo& local_info) : factory_(std::move(factory)), local_info_(local_info) {} - GrpcAccessLogClientFactoryPtr factory_; + Grpc::AsyncClientFactoryPtr factory_; const LocalInfo::LocalInfo& local_info_; }; diff --git a/source/common/config/BUILD b/source/common/config/BUILD index 9094cd22ae025..0173cd4620849 100644 --- a/source/common/config/BUILD +++ b/source/common/config/BUILD @@ -128,9 +128,9 @@ envoy_cc_library( ":utility_lib", "//include/envoy/config:grpc_mux_interface", "//include/envoy/config:subscription_interface", + "//include/envoy/grpc:async_client_interface", "//include/envoy/upstream:cluster_manager_interface", "//source/common/common:logger_lib", - "//source/common/grpc:async_client_lib", "//source/common/protobuf", ], ) @@ -158,6 +158,7 @@ envoy_cc_library( ":grpc_mux_subscription_lib", "//include/envoy/config:subscription_interface", "//include/envoy/event:dispatcher_interface", + "//include/envoy/grpc:async_client_interface", ], ) diff --git a/source/common/config/grpc_mux_impl.cc b/source/common/config/grpc_mux_impl.cc index f7f164dc9000e..9549d67cf8edf 100644 --- a/source/common/config/grpc_mux_impl.cc +++ b/source/common/config/grpc_mux_impl.cc @@ -15,14 +15,6 @@ GrpcMuxImpl::GrpcMuxImpl(const envoy::api::v2::Node& node, Grpc::AsyncClientPtr retry_timer_ = dispatcher.createTimer([this]() -> void { establishNewStream(); }); } -GrpcMuxImpl::GrpcMuxImpl(const envoy::api::v2::Node& node, - Upstream::ClusterManager& cluster_manager, - const std::string& remote_cluster_name, Event::Dispatcher& dispatcher, - const Protobuf::MethodDescriptor& service_method) - : GrpcMuxImpl(node, - std::make_unique(cluster_manager, remote_cluster_name), - dispatcher, service_method) {} - GrpcMuxImpl::~GrpcMuxImpl() { for (const auto& api_state : api_state_) { for (auto watch : api_state.second.watches_) { diff --git a/source/common/config/grpc_mux_impl.h b/source/common/config/grpc_mux_impl.h index 7f8433de453c7..d4f57982d8dcd 100644 --- a/source/common/config/grpc_mux_impl.h +++ b/source/common/config/grpc_mux_impl.h @@ -5,10 +5,10 @@ #include "envoy/config/grpc_mux.h" #include "envoy/config/subscription.h" #include "envoy/event/dispatcher.h" +#include "envoy/grpc/async_client.h" #include "envoy/upstream/cluster_manager.h" #include "common/common/logger.h" -#include "common/grpc/async_client_impl.h" #include "api/discovery.pb.h" @@ -22,9 +22,6 @@ class GrpcMuxImpl : public GrpcMux, Grpc::TypedAsyncStreamCallbacks, Logger::Loggable { public: - GrpcMuxImpl(const envoy::api::v2::Node& node, Upstream::ClusterManager& cluster_manager, - const std::string& remote_cluster_name, Event::Dispatcher& dispatcher, - const Protobuf::MethodDescriptor& service_method); GrpcMuxImpl(const envoy::api::v2::Node& node, Grpc::AsyncClientPtr async_client, Event::Dispatcher& dispatcher, const Protobuf::MethodDescriptor& service_method); ~GrpcMuxImpl(); diff --git a/source/common/config/grpc_subscription_impl.h b/source/common/config/grpc_subscription_impl.h index 73666d271ed3a..13ee5cc5e6ff2 100644 --- a/source/common/config/grpc_subscription_impl.h +++ b/source/common/config/grpc_subscription_impl.h @@ -2,6 +2,7 @@ #include "envoy/config/subscription.h" #include "envoy/event/dispatcher.h" +#include "envoy/grpc/async_client.h" #include "common/config/grpc_mux_impl.h" #include "common/config/grpc_mux_subscription_impl.h" @@ -14,12 +15,6 @@ namespace Config { template class GrpcSubscriptionImpl : public Config::Subscription { public: - GrpcSubscriptionImpl(const envoy::api::v2::Node& node, Upstream::ClusterManager& cm, - const std::string& remote_cluster_name, Event::Dispatcher& dispatcher, - const Protobuf::MethodDescriptor& service_method, SubscriptionStats stats) - : GrpcSubscriptionImpl(node, std::make_unique(cm, remote_cluster_name), - dispatcher, service_method, stats) {} - GrpcSubscriptionImpl(const envoy::api::v2::Node& node, Grpc::AsyncClientPtr async_client, Event::Dispatcher& dispatcher, const Protobuf::MethodDescriptor& service_method, SubscriptionStats stats) diff --git a/source/common/config/subscription_factory.h b/source/common/config/subscription_factory.h index d38bd551eb2af..7b7d2d05cd916 100644 --- a/source/common/config/subscription_factory.h +++ b/source/common/config/subscription_factory.h @@ -64,11 +64,16 @@ class SubscriptionFactory { Utility::apiConfigSourceRefreshDelay(api_config_source), *Protobuf::DescriptorPool::generated_pool()->FindMethodByName(rest_method), stats)); break; - case envoy::api::v2::ApiConfigSource::GRPC: + case envoy::api::v2::ApiConfigSource::GRPC: { result.reset(new GrpcSubscriptionImpl( - node, cm, cluster_name, dispatcher, - *Protobuf::DescriptorPool::generated_pool()->FindMethodByName(grpc_method), stats)); + node, + Config::Utility::factoryForApiConfigSource(cm.grpcAsyncClientManager(), + config.api_config_source(), scope) + ->create(), + dispatcher, *Protobuf::DescriptorPool::generated_pool()->FindMethodByName(grpc_method), + stats)); break; + } default: NOT_REACHED; } diff --git a/source/common/config/utility.cc b/source/common/config/utility.cc index 220da05cb87b3..ea7c6e417f645 100644 --- a/source/common/config/utility.cc +++ b/source/common/config/utility.cc @@ -198,5 +198,37 @@ void Utility::checkObjNameLength(const std::string& error_prefix, const std::str } } +Grpc::AsyncClientFactoryPtr +Utility::factoryForApiConfigSource(Grpc::AsyncClientManager& async_client_manager, + const envoy::api::v2::ApiConfigSource& api_config_source, + Stats::Scope& scope) { + ASSERT(api_config_source.api_type() == envoy::api::v2::ApiConfigSource::GRPC); + envoy::api::v2::GrpcService grpc_service; + if (api_config_source.cluster_names().empty()) { + if (api_config_source.grpc_services().empty()) { + throw EnvoyException( + fmt::format("Missing gRPC services in envoy::api::v2::ApiConfigSource: {}", + api_config_source.DebugString())); + } + // TODO(htuch): Implement multiple gRPC services. + if (api_config_source.grpc_services().size() != 1) { + throw EnvoyException(fmt::format( + "Only singleton gRPC service lists supported in envoy::api::v2::ApiConfigSource: {}", + api_config_source.DebugString())); + } + grpc_service.MergeFrom(api_config_source.grpc_services(0)); + } else { + // TODO(htuch): cluster_names is deprecated, remove after 1.6.0. + if (api_config_source.cluster_names().size() != 1) { + throw EnvoyException(fmt::format( + "Only singleton cluster name lists supported in envoy::api::v2::ApiConfigSource: {}", + api_config_source.DebugString())); + } + grpc_service.mutable_envoy_grpc()->set_cluster_name(api_config_source.cluster_names(0)); + } + + return async_client_manager.factoryForGrpcService(grpc_service, scope); +} + } // namespace Config } // namespace Envoy diff --git a/source/common/config/utility.h b/source/common/config/utility.h index c8aa124811dd9..72d6489c8d84d 100644 --- a/source/common/config/utility.h +++ b/source/common/config/utility.h @@ -245,6 +245,17 @@ class Utility { * @param name supplies the name to check for length limits. */ static void checkObjNameLength(const std::string& error_prefix, const std::string& name); + + /** + * Obtain gRPC async client factory from a envoy::api::v2::ApiConfigSource. + * @param async_client_manager gRPC async client manager. + * @param api_config_source envoy::api::v2::ApiConfigSource. Must have config type GRPC. + * @return Grpc::AsyncClientFactoryPtr gRPC async client factory. + */ + static Grpc::AsyncClientFactoryPtr + factoryForApiConfigSource(Grpc::AsyncClientManager& async_client_manager, + const envoy::api::v2::ApiConfigSource& api_config_source, + Stats::Scope& scope); }; } // namespace Config diff --git a/source/common/grpc/BUILD b/source/common/grpc/BUILD index ecc677613897e..bb0ccae947856 100644 --- a/source/common/grpc/BUILD +++ b/source/common/grpc/BUILD @@ -21,6 +21,19 @@ envoy_cc_library( ], ) +envoy_cc_library( + name = "async_client_manager_lib", + srcs = ["async_client_manager_impl.cc"], + hdrs = ["async_client_manager_impl.h"], + deps = [ + ":async_client_lib", + "//include/envoy/grpc:async_client_manager_interface", + "//include/envoy/singleton:manager_interface", + "//include/envoy/thread_local:thread_local_interface", + "//include/envoy/upstream:cluster_manager_interface", + ], +) + envoy_cc_library( name = "codec_lib", srcs = ["codec.cc"], diff --git a/source/common/grpc/async_client_manager_impl.cc b/source/common/grpc/async_client_manager_impl.cc new file mode 100644 index 0000000000000..408ff43bb3530 --- /dev/null +++ b/source/common/grpc/async_client_manager_impl.cc @@ -0,0 +1,44 @@ +#include "common/grpc/async_client_manager_impl.h" + +#include "common/grpc/async_client_impl.h" + +namespace Envoy { +namespace Grpc { + +AsyncClientFactoryImpl::AsyncClientFactoryImpl(Upstream::ClusterManager& cm, + const std::string& cluster_name) + : cm_(cm), cluster_name_(cluster_name) { + auto clusters = cm_.clusters(); + const auto& it = clusters.find(cluster_name_); + if (it == clusters.end()) { + throw EnvoyException(fmt::format("Unknown gRPC client cluster '{}'", cluster_name_)); + } + if (it->second.get().info()->addedViaApi()) { + throw EnvoyException(fmt::format("gRPC client cluster '{}' is not static", cluster_name_)); + } +} + +AsyncClientPtr AsyncClientFactoryImpl::create() { + return std::make_unique(cm_, cluster_name_); +} + +AsyncClientManagerImpl::AsyncClientManagerImpl(Upstream::ClusterManager& cm, + ThreadLocal::Instance& /*tls*/) + : cm_(cm) {} + +AsyncClientFactoryPtr +AsyncClientManagerImpl::factoryForGrpcService(const envoy::api::v2::GrpcService& grpc_service, + Stats::Scope& /*scope*/) { + switch (grpc_service.target_specifier_case()) { + case envoy::api::v2::GrpcService::kEnvoyGrpc: + return std::make_unique(cm_, grpc_service.envoy_grpc().cluster_name()); + case envoy::api::v2::GrpcService::kGoogleGrpc: + throw EnvoyException("Google C++ gRPC client is not implemented yet"); + default: + NOT_REACHED; + } + return nullptr; +} + +} // namespace Grpc +} // namespace Envoy diff --git a/source/common/grpc/async_client_manager_impl.h b/source/common/grpc/async_client_manager_impl.h new file mode 100644 index 0000000000000..db2b685be6f2b --- /dev/null +++ b/source/common/grpc/async_client_manager_impl.h @@ -0,0 +1,35 @@ +#pragma once + +#include "envoy/grpc/async_client_manager.h" +#include "envoy/singleton/manager.h" +#include "envoy/thread_local/thread_local.h" +#include "envoy/upstream/cluster_manager.h" + +namespace Envoy { +namespace Grpc { + +class AsyncClientFactoryImpl : public AsyncClientFactory { +public: + AsyncClientFactoryImpl(Upstream::ClusterManager& cm, const std::string& cluster_name); + + AsyncClientPtr create() override; + +private: + Upstream::ClusterManager& cm_; + const std::string cluster_name_; +}; + +class AsyncClientManagerImpl : public AsyncClientManager { +public: + AsyncClientManagerImpl(Upstream::ClusterManager& cm, ThreadLocal::Instance& tls); + + // Grpc::AsyncClientManager + AsyncClientFactoryPtr factoryForGrpcService(const envoy::api::v2::GrpcService& grpc_service, + Stats::Scope& scope) override; + +private: + Upstream::ClusterManager& cm_; +}; + +} // namespace Grpc +} // namespace Envoy diff --git a/source/common/ratelimit/BUILD b/source/common/ratelimit/BUILD index 785959be8a28e..fcc70e9fd3bf3 100644 --- a/source/common/ratelimit/BUILD +++ b/source/common/ratelimit/BUILD @@ -17,10 +17,10 @@ envoy_cc_library( deps = [ ":ratelimit_proto", "//include/envoy/grpc:async_client_interface", + "//include/envoy/grpc:async_client_manager_interface", "//include/envoy/ratelimit:ratelimit_interface", "//include/envoy/upstream:cluster_manager_interface", "//source/common/common:assert_lib", - "//source/common/grpc:async_client_lib", "//source/common/http:headers_lib", "//source/common/tracing:http_tracer_lib", ], diff --git a/source/common/ratelimit/ratelimit_impl.cc b/source/common/ratelimit/ratelimit_impl.cc index 07a9d005be4bf..70070d373143b 100644 --- a/source/common/ratelimit/ratelimit_impl.cc +++ b/source/common/ratelimit/ratelimit_impl.cc @@ -6,7 +6,6 @@ #include #include "common/common/assert.h" -#include "common/grpc/async_client_impl.h" #include "common/http/headers.h" #include "fmt/format.h" @@ -77,16 +76,19 @@ void GrpcClientImpl::onFailure(Grpc::Status::GrpcStatus status, const std::strin } GrpcFactoryImpl::GrpcFactoryImpl(const envoy::api::v2::RateLimitServiceConfig& config, - Upstream::ClusterManager& cm) - : cluster_name_(config.cluster_name()), cm_(cm) { - if (!cm_.get(cluster_name_)) { - throw EnvoyException(fmt::format("unknown rate limit service cluster '{}'", cluster_name_)); + Grpc::AsyncClientManager& async_client_manager, + Stats::Scope& scope) { + envoy::api::v2::GrpcService grpc_service; + grpc_service.MergeFrom(config.grpc_service()); + // TODO(htuch): cluster_name is deprecated, remove after 1.6.0. + if (config.service_specifier_case() == envoy::api::v2::RateLimitServiceConfig::kClusterName) { + grpc_service.mutable_envoy_grpc()->set_cluster_name(config.cluster_name()); } + async_client_factory_ = async_client_manager.factoryForGrpcService(grpc_service, scope); } ClientPtr GrpcFactoryImpl::create(const Optional& timeout) { - return std::make_unique( - std::make_unique(cm_, cluster_name_), timeout); + return std::make_unique(async_client_factory_->create(), timeout); } } // namespace RateLimit diff --git a/source/common/ratelimit/ratelimit_impl.h b/source/common/ratelimit/ratelimit_impl.h index 9c145c2d01f38..97a6a36d37123 100644 --- a/source/common/ratelimit/ratelimit_impl.h +++ b/source/common/ratelimit/ratelimit_impl.h @@ -6,6 +6,7 @@ #include #include "envoy/grpc/async_client.h" +#include "envoy/grpc/async_client_manager.h" #include "envoy/ratelimit/ratelimit.h" #include "envoy/tracing/http_tracer.h" #include "envoy/upstream/cluster_manager.h" @@ -65,14 +66,13 @@ class GrpcClientImpl : public Client, public RateLimitAsyncCallbacks { class GrpcFactoryImpl : public ClientFactory { public: GrpcFactoryImpl(const envoy::api::v2::RateLimitServiceConfig& config, - Upstream::ClusterManager& cm); + Grpc::AsyncClientManager& async_client_manager, Stats::Scope& scope); // RateLimit::ClientFactory ClientPtr create(const Optional& timeout) override; private: - const std::string cluster_name_; - Upstream::ClusterManager& cm_; + Grpc::AsyncClientFactoryPtr async_client_factory_; }; class NullClientImpl : public Client { diff --git a/source/common/stats/grpc_metrics_service_impl.cc b/source/common/stats/grpc_metrics_service_impl.cc index 719421316d591..e8f0bc86f5447 100644 --- a/source/common/stats/grpc_metrics_service_impl.cc +++ b/source/common/stats/grpc_metrics_service_impl.cc @@ -18,7 +18,7 @@ namespace Envoy { namespace Stats { namespace Metrics { -GrpcMetricsStreamerImpl::GrpcMetricsStreamerImpl(GrpcMetricsServiceClientFactoryPtr&& factory, +GrpcMetricsStreamerImpl::GrpcMetricsStreamerImpl(Grpc::AsyncClientFactoryPtr&& factory, ThreadLocal::SlotAllocator& tls, const LocalInfo::LocalInfo& local_info) : tls_slot_(tls.allocateSlot()) { diff --git a/source/common/stats/grpc_metrics_service_impl.h b/source/common/stats/grpc_metrics_service_impl.h index 5c950ae5f55d4..4cf823030933e 100644 --- a/source/common/stats/grpc_metrics_service_impl.h +++ b/source/common/stats/grpc_metrics_service_impl.h @@ -23,21 +23,6 @@ namespace Metrics { // TODO : Move the common code to a base class so that Accesslog and Metrics Service can reuse. -/** - * Factory for creating a gRPC metrics service streaming client. - */ -class GrpcMetricsServiceClientFactory { -public: - virtual ~GrpcMetricsServiceClientFactory() {} - - /** - * @return GrpcMetricsServiceClientPtr a new client. - */ - virtual Grpc::AsyncClientPtr create() PURE; -}; - -typedef std::unique_ptr GrpcMetricsServiceClientFactoryPtr; - /** * Interface for metrics streamer. The streamer deals with threading and sends * metrics on the correct stream. @@ -61,8 +46,8 @@ typedef std::shared_ptr GrpcMetricsStreamerSharedPtr; */ class GrpcMetricsStreamerImpl : public Singleton::Instance, public GrpcMetricsStreamer { public: - GrpcMetricsStreamerImpl(GrpcMetricsServiceClientFactoryPtr&& factory, - ThreadLocal::SlotAllocator& tls, const LocalInfo::LocalInfo& local_info); + GrpcMetricsStreamerImpl(Grpc::AsyncClientFactoryPtr&& factory, ThreadLocal::SlotAllocator& tls, + const LocalInfo::LocalInfo& local_info); // GrpcMetricsStreamer void send(envoy::api::v2::StreamMetricsMessage& message) override { @@ -75,11 +60,10 @@ class GrpcMetricsStreamerImpl : public Singleton::Instance, public GrpcMetricsSt * main streamer/TLS slot to be destroyed while the streamers hold onto the shared state. */ struct SharedState { - SharedState(GrpcMetricsServiceClientFactoryPtr&& factory, - const LocalInfo::LocalInfo& local_info) + SharedState(Grpc::AsyncClientFactoryPtr&& factory, const LocalInfo::LocalInfo& local_info) : factory_(std::move(factory)), local_info_(local_info) {} - GrpcMetricsServiceClientFactoryPtr factory_; + Grpc::AsyncClientFactoryPtr factory_; const LocalInfo::LocalInfo& local_info_; }; diff --git a/source/common/upstream/BUILD b/source/common/upstream/BUILD index ed3adf030e53d..2b9921d3d7518 100644 --- a/source/common/upstream/BUILD +++ b/source/common/upstream/BUILD @@ -74,6 +74,7 @@ envoy_cc_library( "//source/common/config:grpc_mux_lib", "//source/common/config:utility_lib", "//source/common/filesystem:filesystem_lib", + "//source/common/grpc:async_client_manager_lib", "//source/common/http:async_client_lib", "//source/common/http/http1:conn_pool_lib", "//source/common/http/http2:conn_pool_lib", diff --git a/source/common/upstream/cluster_manager_impl.cc b/source/common/upstream/cluster_manager_impl.cc index f9eedcfeeafa9..f0d1eb337dd20 100644 --- a/source/common/upstream/cluster_manager_impl.cc +++ b/source/common/upstream/cluster_manager_impl.cc @@ -16,6 +16,7 @@ #include "common/config/cds_json.h" #include "common/config/utility.h" #include "common/filesystem/filesystem_impl.h" +#include "common/grpc/async_client_manager_impl.h" #include "common/http/async_client_impl.h" #include "common/http/http1/conn_pool.h" #include "common/http/http2/conn_pool.h" @@ -164,7 +165,7 @@ void ClusterManagerInitHelper::setInitializedCb(std::function callback) ClusterManagerImpl::ClusterManagerImpl(const envoy::api::v2::Bootstrap& bootstrap, ClusterManagerFactory& factory, Stats::Store& stats, - ThreadLocal::SlotAllocator& tls, Runtime::Loader& runtime, + ThreadLocal::Instance& tls, Runtime::Loader& runtime, Runtime::RandomGenerator& random, const LocalInfo::LocalInfo& local_info, AccessLog::AccessLogManager& log_manager, @@ -172,22 +173,7 @@ ClusterManagerImpl::ClusterManagerImpl(const envoy::api::v2::Bootstrap& bootstra : factory_(factory), runtime_(runtime), stats_(stats), tls_(tls.allocateSlot()), random_(random), local_info_(local_info), cm_stats_(generateStats(stats)), init_helper_([this](Cluster& cluster) { onClusterInit(cluster); }) { - const auto& ads_config = bootstrap.dynamic_resources().ads_config(); - if (ads_config.cluster_names().empty()) { - ENVOY_LOG(debug, "No ADS clusters defined, ADS will not be initialized."); - ads_mux_.reset(new Config::NullGrpcMuxImpl()); - } else { - if (ads_config.cluster_names().size() != 1) { - // TODO(htuch): Add support for multiple clusters, #1170. - throw EnvoyException( - "envoy::api::v2::ApiConfigSource must have a singleton cluster name specified"); - } - ads_mux_.reset(new Config::GrpcMuxImpl( - bootstrap.node(), *this, ads_config.cluster_names()[0], primary_dispatcher, - *Protobuf::DescriptorPool::generated_pool()->FindMethodByName( - "envoy.api.v2.AggregatedDiscoveryService.StreamAggregatedResources"))); - } - + async_client_manager_ = std::make_unique(*this, tls); const auto& cm_config = bootstrap.cluster_manager(); if (cm_config.has_outlier_detection()) { const std::string event_log_file_path = cm_config.outlier_detection().event_log_path(); @@ -251,9 +237,6 @@ ClusterManagerImpl::ClusterManagerImpl(const envoy::api::v2::Bootstrap& bootstra "Missing config source specifier in envoy::api::v2::ConfigSource for SDS config"); } } - if (!ads_config.cluster_names().empty()) { - Config::Utility::checkApiConfigSourceSubscriptionBackingCluster(loaded_clusters, ads_config); - } Optional local_cluster_name; if (!cm_config.local_cluster_name().empty()) { @@ -272,6 +255,21 @@ ClusterManagerImpl::ClusterManagerImpl(const envoy::api::v2::Bootstrap& bootstra return std::make_shared(*this, dispatcher, local_cluster_name); }); + // Now setup ADS if needed, this might rely on a primary cluster and the + // thread local cluster manager. + if (bootstrap.dynamic_resources().has_ads_config()) { + ads_mux_.reset(new Config::GrpcMuxImpl( + bootstrap.node(), + Config::Utility::factoryForApiConfigSource( + *async_client_manager_, bootstrap.dynamic_resources().ads_config(), stats) + ->create(), + primary_dispatcher, + *Protobuf::DescriptorPool::generated_pool()->FindMethodByName( + "envoy.api.v2.AggregatedDiscoveryService.StreamAggregatedResources"))); + } else { + ads_mux_.reset(new Config::NullGrpcMuxImpl()); + } + // We can now potentially create the CDS API once the backing cluster exists. if (bootstrap.dynamic_resources().has_cds_config()) { cds_api_ = factory_.createCds(bootstrap.dynamic_resources().cds_config(), eds_config_, *this); @@ -295,13 +293,11 @@ ClusterManagerImpl::ClusterManagerImpl(const envoy::api::v2::Bootstrap& bootstra if (cm_config.has_load_stats_config()) { const auto& load_stats_config = cm_config.load_stats_config(); - if (load_stats_config.cluster_names().size() != 1) { - // TODO(htuch): Add support for multiple clusters, #1170. - throw EnvoyException( - "envoy::api::v2::ApiConfigSource must have a singleton cluster name specified"); - } load_stats_reporter_.reset(new LoadStatsReporter( - bootstrap.node(), *this, stats, load_stats_config.cluster_names()[0], primary_dispatcher)); + bootstrap.node(), *this, stats, + Config::Utility::factoryForApiConfigSource(*async_client_manager_, load_stats_config, stats) + ->create(), + primary_dispatcher)); } } diff --git a/source/common/upstream/cluster_manager_impl.h b/source/common/upstream/cluster_manager_impl.h index 03642f8b5e8ce..4f95a4a5dd045 100644 --- a/source/common/upstream/cluster_manager_impl.h +++ b/source/common/upstream/cluster_manager_impl.h @@ -147,7 +147,7 @@ struct ClusterManagerStats { class ClusterManagerImpl : public ClusterManager, Logger::Loggable { public: ClusterManagerImpl(const envoy::api::v2::Bootstrap& bootstrap, ClusterManagerFactory& factory, - Stats::Store& stats, ThreadLocal::SlotAllocator& tls, Runtime::Loader& runtime, + Stats::Store& stats, ThreadLocal::Instance& tls, Runtime::Loader& runtime, Runtime::RandomGenerator& random, const LocalInfo::LocalInfo& local_info, AccessLog::AccessLogManager& log_manager, Event::Dispatcher& primary_dispatcher); @@ -184,6 +184,7 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable(cluster_manager, remote_cluster_name), - dispatcher) {} - void LoadStatsReporter::setRetryTimer() { retry_timer_->enableTimer(std::chrono::milliseconds(RETRY_DELAY_MS)); } diff --git a/source/common/upstream/load_stats_reporter.h b/source/common/upstream/load_stats_reporter.h index cdc56ead72059..4630bc68d6c15 100644 --- a/source/common/upstream/load_stats_reporter.h +++ b/source/common/upstream/load_stats_reporter.h @@ -35,9 +35,6 @@ class LoadStatsReporter : Grpc::TypedAsyncStreamCallbacks(cluster_manager_, cluster_name_); - }; - - Upstream::ClusterManager& cluster_manager_; - const std::string cluster_name_; -}; - // Singleton registration via macro defined in envoy/singleton/manager.h SINGLETON_MANAGER_REGISTRATION(grpc_access_log_streamer); @@ -37,21 +22,13 @@ AccessLog::InstanceSharedPtr HttpGrpcAccessLogFactory::createAccessLogInstance( const Protobuf::Message& config, AccessLog::FilterPtr&& filter, FactoryContext& context) { const auto& proto_config = MessageUtil::downcastAndValidate< const envoy::api::v2::filter::accesslog::HttpGrpcAccessLogConfig&>(config); - - // TODO(htuch): Support Google gRPC client. - const auto cluster_name = proto_config.common_config().grpc_service().envoy_grpc().cluster_name(); - auto cluster = context.clusterManager().get(cluster_name); - if (cluster == nullptr || cluster->info()->addedViaApi()) { - throw EnvoyException(fmt::format( - "invalid access log cluster '{}'. Missing or not a static cluster.", cluster_name)); - } - std::shared_ptr grpc_access_log_streamer = context.singletonManager().getTyped( - SINGLETON_MANAGER_REGISTERED_NAME(grpc_access_log_streamer), [&context, cluster_name] { + SINGLETON_MANAGER_REGISTERED_NAME(grpc_access_log_streamer), + [&context, grpc_service = proto_config.common_config().grpc_service() ] { return std::make_shared( - std::make_unique(context.clusterManager(), - cluster_name), + context.clusterManager().grpcAsyncClientManager().factoryForGrpcService( + grpc_service, context.scope()), context.threadLocal(), context.localInfo()); }); diff --git a/source/server/config/stats/metrics_service.cc b/source/server/config/stats/metrics_service.cc index 1402b8e72b508..a9208568c10ed 100644 --- a/source/server/config/stats/metrics_service.cc +++ b/source/server/config/stats/metrics_service.cc @@ -16,32 +16,17 @@ namespace Envoy { namespace Server { namespace Configuration { -class GrpcMetricsServiceClientFactoryImpl : public Stats::Metrics::GrpcMetricsServiceClientFactory { -public: - GrpcMetricsServiceClientFactoryImpl(Upstream::ClusterManager& cluster_manager, - const std::string& cluster_name) - : cluster_manager_(cluster_manager), cluster_name_(cluster_name) {} - - // Metrics::GrpcMetricsServiceClientPtr - Grpc::AsyncClientPtr create() override { - return std::make_unique(cluster_manager_, cluster_name_); - }; - - Upstream::ClusterManager& cluster_manager_; - const std::string cluster_name_; -}; - Stats::SinkPtr MetricsServiceSinkFactory::createStatsSink(const Protobuf::Message& config, Server::Instance& server) { const auto& sink_config = MessageUtil::downcastAndValidate(config); - const std::string cluster_name = sink_config.grpc_service().envoy_grpc().cluster_name(); - ENVOY_LOG(debug, "Metrics Service cluster name: {}", cluster_name); + const auto& grpc_service = sink_config.grpc_service(); + ENVOY_LOG(debug, "Metrics Service gRPC service configuration: {}", grpc_service.DebugString()); std::shared_ptr grpc_metrics_streamer = std::make_shared( - std::make_unique(server.clusterManager(), - cluster_name), + server.clusterManager().grpcAsyncClientManager().factoryForGrpcService(grpc_service, + server.stats()), server.threadLocal(), server.localInfo()); return Stats::SinkPtr( diff --git a/source/server/configuration_impl.cc b/source/server/configuration_impl.cc index 24ea15eaac401..933d845b64bb8 100644 --- a/source/server/configuration_impl.cc +++ b/source/server/configuration_impl.cc @@ -72,7 +72,8 @@ void MainImpl::initialize(const envoy::api::v2::Bootstrap& bootstrap, Instance& if (bootstrap.has_rate_limit_service()) { ratelimit_client_factory_.reset( - new RateLimit::GrpcFactoryImpl(bootstrap.rate_limit_service(), *cluster_manager_)); + new RateLimit::GrpcFactoryImpl(bootstrap.rate_limit_service(), + cluster_manager_->grpcAsyncClientManager(), server.stats())); } else { ratelimit_client_factory_.reset(new RateLimit::NullFactoryImpl()); } diff --git a/test/common/access_log/grpc_access_log_impl_test.cc b/test/common/access_log/grpc_access_log_impl_test.cc index 072e9a35070c9..d06b739158e74 100644 --- a/test/common/access_log/grpc_access_log_impl_test.cc +++ b/test/common/access_log/grpc_access_log_impl_test.cc @@ -19,20 +19,21 @@ namespace AccessLog { class GrpcAccessLogStreamerImplTest : public testing::Test { public: - struct TestGrpcAccessLogClientFactory : public GrpcAccessLogClientFactory { - // AccessLog::GrpcAccessLogClientFactory - Grpc::AsyncClientPtr create() { return Grpc::AsyncClientPtr{async_client_}; } - - Grpc::MockAsyncClient* async_client_{new Grpc::MockAsyncClient()}; - }; - typedef Grpc::MockAsyncStream MockAccessLogStream; typedef Grpc::TypedAsyncStreamCallbacks< envoy::api::v2::filter::accesslog::StreamAccessLogsResponse> AccessLogCallbacks; + GrpcAccessLogStreamerImplTest() { + EXPECT_CALL(*factory_, create()).WillOnce(Invoke([this] { + return Grpc::AsyncClientPtr{async_client_}; + })); + streamer_ = std::make_unique(Grpc::AsyncClientFactoryPtr{factory_}, + tls_, local_info_); + } + void expectStreamStart(MockAccessLogStream& stream, AccessLogCallbacks** callbacks_to_set) { - EXPECT_CALL(*factory_->async_client_, start(_, _)) + EXPECT_CALL(*async_client_, start(_, _)) .WillOnce(Invoke([&stream, callbacks_to_set](const Protobuf::MethodDescriptor&, Grpc::AsyncStreamCallbacks& callbacks) { *callbacks_to_set = dynamic_cast(&callbacks); @@ -42,8 +43,9 @@ class GrpcAccessLogStreamerImplTest : public testing::Test { NiceMock tls_; LocalInfo::MockLocalInfo local_info_; - TestGrpcAccessLogClientFactory* factory_{new TestGrpcAccessLogClientFactory}; - GrpcAccessLogStreamerImpl streamer_{GrpcAccessLogClientFactoryPtr{factory_}, tls_, local_info_}; + Grpc::MockAsyncClient* async_client_{new Grpc::MockAsyncClient}; + Grpc::MockAsyncClientFactory* factory_{new Grpc::MockAsyncClientFactory}; + std::unique_ptr streamer_; }; // Test basic stream logging flow. @@ -57,11 +59,11 @@ TEST_F(GrpcAccessLogStreamerImplTest, BasicFlow) { EXPECT_CALL(local_info_, node()); EXPECT_CALL(stream1, sendMessage(_, false)); envoy::api::v2::filter::accesslog::StreamAccessLogsMessage message_log1; - streamer_.send(message_log1, "log1"); + streamer_->send(message_log1, "log1"); message_log1.Clear(); EXPECT_CALL(stream1, sendMessage(_, false)); - streamer_.send(message_log1, "log1"); + streamer_->send(message_log1, "log1"); // Start a stream for the second log. MockAccessLogStream stream2; @@ -70,7 +72,7 @@ TEST_F(GrpcAccessLogStreamerImplTest, BasicFlow) { EXPECT_CALL(local_info_, node()); EXPECT_CALL(stream2, sendMessage(_, false)); envoy::api::v2::filter::accesslog::StreamAccessLogsMessage message_log2; - streamer_.send(message_log2, "log2"); + streamer_->send(message_log2, "log2"); // Verify that sending an empty response message doesn't do anything bad. callbacks1->onReceiveMessage( @@ -81,14 +83,14 @@ TEST_F(GrpcAccessLogStreamerImplTest, BasicFlow) { expectStreamStart(stream2, &callbacks2); EXPECT_CALL(local_info_, node()); EXPECT_CALL(stream2, sendMessage(_, false)); - streamer_.send(message_log2, "log2"); + streamer_->send(message_log2, "log2"); } // Test that stream failure is handled correctly. TEST_F(GrpcAccessLogStreamerImplTest, StreamFailure) { InSequence s; - EXPECT_CALL(*factory_->async_client_, start(_, _)) + EXPECT_CALL(*async_client_, start(_, _)) .WillOnce( Invoke([](const Protobuf::MethodDescriptor&, Grpc::AsyncStreamCallbacks& callbacks) { callbacks.onRemoteClose(Grpc::Status::Internal, "bad"); @@ -96,7 +98,7 @@ TEST_F(GrpcAccessLogStreamerImplTest, StreamFailure) { })); EXPECT_CALL(local_info_, node()); envoy::api::v2::filter::accesslog::StreamAccessLogsMessage message_log1; - streamer_.send(message_log1, "log1"); + streamer_->send(message_log1, "log1"); } class MockGrpcAccessLogStreamer : public GrpcAccessLogStreamer { diff --git a/test/common/config/BUILD b/test/common/config/BUILD index 830ebada06f2f..75e9115bde61c 100644 --- a/test/common/config/BUILD +++ b/test/common/config/BUILD @@ -162,7 +162,9 @@ envoy_cc_test( "//source/common/config:utility_lib", "//source/common/config:well_known_names", "//source/common/stats:stats_lib", + "//test/mocks/grpc:grpc_mocks", "//test/mocks/local_info:local_info_mocks", + "//test/mocks/stats:stats_mocks", "//test/mocks/upstream:upstream_mocks", "//test/test_common:environment_lib", "//test/test_common:utility_lib", diff --git a/test/common/config/subscription_factory_test.cc b/test/common/config/subscription_factory_test.cc index 4183f6c1b35bd..a97ae2436d48e 100644 --- a/test/common/config/subscription_factory_test.cc +++ b/test/common/config/subscription_factory_test.cc @@ -142,24 +142,25 @@ TEST_F(SubscriptionFactoryTest, GrpcSubscription) { auto* api_config_source = config.mutable_api_config_source(); api_config_source->set_api_type(envoy::api::v2::ApiConfigSource::GRPC); api_config_source->add_cluster_names("eds_cluster"); + envoy::api::v2::GrpcService expected_grpc_service; + expected_grpc_service.mutable_envoy_grpc()->set_cluster_name("eds_cluster"); Upstream::ClusterManager::ClusterInfoMap cluster_map; Upstream::MockCluster cluster; cluster_map.emplace("eds_cluster", cluster); EXPECT_CALL(cm_, clusters()).WillOnce(Return(cluster_map)); EXPECT_CALL(cluster, info()).Times(2); EXPECT_CALL(*cluster.info_, addedViaApi()); + EXPECT_CALL(cm_, grpcAsyncClientManager()).WillOnce(ReturnRef(cm_.async_client_manager_)); + EXPECT_CALL(cm_.async_client_manager_, factoryForGrpcService(ProtoEq(expected_grpc_service), _)) + .WillOnce(Invoke([](const envoy::api::v2::GrpcService&, Stats::Scope&) { + auto async_client_factory = std::make_unique(); + EXPECT_CALL(*async_client_factory, create()).WillOnce(Invoke([] { + return std::make_unique>(); + })); + return async_client_factory; + })); EXPECT_CALL(dispatcher_, createTimer_(_)); - EXPECT_CALL(cm_, httpAsyncClientForCluster("eds_cluster")); - NiceMock stream; - EXPECT_CALL(cm_.async_client_, start(_, _, false)).WillOnce(Return(&stream)); - Http::TestHeaderMapImpl headers{ - {":method", "POST"}, - {":path", "/envoy.api.v2.EndpointDiscoveryService/StreamEndpoints"}, - {":authority", "eds_cluster"}, - {"content-type", "application/grpc"}, - {"te", "trailers"}}; - EXPECT_CALL(stream, sendHeaders(HeaderMapEqualRef(&headers), _)); - EXPECT_CALL(cm_.async_client_.dispatcher_, deferredDelete_(_)); + EXPECT_CALL(callbacks_, onConfigUpdateFailed(_)); subscriptionFromConfigSource(config)->start({"foo"}, callbacks_); } diff --git a/test/common/config/utility_test.cc b/test/common/config/utility_test.cc index baa08d7e779f8..3368e309b8ee6 100644 --- a/test/common/config/utility_test.cc +++ b/test/common/config/utility_test.cc @@ -8,7 +8,9 @@ #include "common/protobuf/protobuf.h" #include "common/stats/stats_impl.h" +#include "test/mocks/grpc/mocks.h" #include "test/mocks/local_info/mocks.h" +#include "test/mocks/stats/mocks.h" #include "test/mocks/upstream/mocks.h" #include "test/test_common/environment.h" #include "test/test_common/utility.h" @@ -19,6 +21,7 @@ #include "gtest/gtest.h" using testing::AtLeast; +using testing::Ref; using testing::Return; using testing::ReturnRef; @@ -283,5 +286,60 @@ TEST(UtilityTest, CheckApiConfigSourceSubscriptionBackingCluster) { Utility::checkApiConfigSourceSubscriptionBackingCluster(cluster_map, *api_config_source); } +TEST(UtilityTest, FactoryForApiConfigSource) { + Grpc::MockAsyncClientManager async_client_manager; + Stats::MockStore scope; + + { + envoy::api::v2::ApiConfigSource api_config_source; + api_config_source.set_api_type(envoy::api::v2::ApiConfigSource::GRPC); + EXPECT_THROW_WITH_REGEX( + Utility::factoryForApiConfigSource(async_client_manager, api_config_source, scope), + EnvoyException, "Missing gRPC services in envoy::api::v2::ApiConfigSource:"); + } + + { + envoy::api::v2::ApiConfigSource api_config_source; + api_config_source.set_api_type(envoy::api::v2::ApiConfigSource::GRPC); + api_config_source.add_grpc_services(); + api_config_source.add_grpc_services(); + EXPECT_THROW_WITH_REGEX( + Utility::factoryForApiConfigSource(async_client_manager, api_config_source, scope), + EnvoyException, + "Only singleton gRPC service lists supported in envoy::api::v2::ApiConfigSource:"); + } + + { + envoy::api::v2::ApiConfigSource api_config_source; + api_config_source.set_api_type(envoy::api::v2::ApiConfigSource::GRPC); + api_config_source.add_cluster_names(); + api_config_source.add_cluster_names(); + EXPECT_THROW_WITH_REGEX( + Utility::factoryForApiConfigSource(async_client_manager, api_config_source, scope), + EnvoyException, + "Only singleton cluster name lists supported in envoy::api::v2::ApiConfigSource:"); + } + + { + envoy::api::v2::ApiConfigSource api_config_source; + api_config_source.set_api_type(envoy::api::v2::ApiConfigSource::GRPC); + api_config_source.add_cluster_names("foo"); + envoy::api::v2::GrpcService expected_grpc_service; + expected_grpc_service.mutable_envoy_grpc()->set_cluster_name("foo"); + EXPECT_CALL(async_client_manager, + factoryForGrpcService(ProtoEq(expected_grpc_service), Ref(scope))); + Utility::factoryForApiConfigSource(async_client_manager, api_config_source, scope); + } + + { + envoy::api::v2::ApiConfigSource api_config_source; + api_config_source.set_api_type(envoy::api::v2::ApiConfigSource::GRPC); + api_config_source.add_grpc_services()->mutable_envoy_grpc()->set_cluster_name("foo"); + EXPECT_CALL(async_client_manager, + factoryForGrpcService(ProtoEq(api_config_source.grpc_services(0)), Ref(scope))); + Utility::factoryForApiConfigSource(async_client_manager, api_config_source, scope); + } +} + } // namespace Config } // namespace Envoy diff --git a/test/common/grpc/BUILD b/test/common/grpc/BUILD index c4cbb091bef6b..a589785a08855 100644 --- a/test/common/grpc/BUILD +++ b/test/common/grpc/BUILD @@ -26,6 +26,18 @@ envoy_cc_test( ], ) +envoy_cc_test( + name = "async_client_manager_impl_test", + srcs = ["async_client_manager_impl_test.cc"], + deps = [ + "//source/common/grpc:async_client_manager_lib", + "//test/mocks/stats:stats_mocks", + "//test/mocks/thread_local:thread_local_mocks", + "//test/mocks/upstream:upstream_mocks", + "//test/test_common:utility_lib", + ], +) + envoy_cc_test( name = "codec_test", srcs = ["codec_test.cc"], diff --git a/test/common/grpc/async_client_manager_impl_test.cc b/test/common/grpc/async_client_manager_impl_test.cc new file mode 100644 index 0000000000000..1c35fb63d5e5e --- /dev/null +++ b/test/common/grpc/async_client_manager_impl_test.cc @@ -0,0 +1,75 @@ +#include "common/grpc/async_client_manager_impl.h" + +#include "test/mocks/stats/mocks.h" +#include "test/mocks/thread_local/mocks.h" +#include "test/mocks/upstream/mocks.h" +#include "test/test_common/utility.h" + +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +using ::testing::Return; + +namespace Envoy { +namespace Grpc { +namespace { + +class AsyncClientManagerImplTest : public testing::Test { +public: + Upstream::MockClusterManager cm_; + ThreadLocal::MockInstance tls_; + Stats::MockStore scope_; +}; + +TEST_F(AsyncClientManagerImplTest, EnvoyGrpcOk) { + AsyncClientManagerImpl async_client_manager(cm_, tls_); + envoy::api::v2::GrpcService grpc_service; + grpc_service.mutable_envoy_grpc()->set_cluster_name("foo"); + + Upstream::ClusterManager::ClusterInfoMap cluster_map; + Upstream::MockCluster cluster; + cluster_map.emplace("foo", cluster); + EXPECT_CALL(cm_, clusters()).WillOnce(Return(cluster_map)); + EXPECT_CALL(cluster, info()); + EXPECT_CALL(*cluster.info_, addedViaApi()); + + async_client_manager.factoryForGrpcService(grpc_service, scope_); +} + +TEST_F(AsyncClientManagerImplTest, EnvoyGrpcUnknown) { + AsyncClientManagerImpl async_client_manager(cm_, tls_); + envoy::api::v2::GrpcService grpc_service; + grpc_service.mutable_envoy_grpc()->set_cluster_name("foo"); + + EXPECT_CALL(cm_, clusters()); + EXPECT_THROW_WITH_MESSAGE(async_client_manager.factoryForGrpcService(grpc_service, scope_), + EnvoyException, "Unknown gRPC client cluster 'foo'"); +} + +TEST_F(AsyncClientManagerImplTest, EnvoyGrpcDynamicCluster) { + AsyncClientManagerImpl async_client_manager(cm_, tls_); + envoy::api::v2::GrpcService grpc_service; + grpc_service.mutable_envoy_grpc()->set_cluster_name("foo"); + + Upstream::ClusterManager::ClusterInfoMap cluster_map; + Upstream::MockCluster cluster; + cluster_map.emplace("foo", cluster); + EXPECT_CALL(cm_, clusters()).WillOnce(Return(cluster_map)); + EXPECT_CALL(cluster, info()); + EXPECT_CALL(*cluster.info_, addedViaApi()).WillOnce(Return(true)); + EXPECT_THROW_WITH_MESSAGE(async_client_manager.factoryForGrpcService(grpc_service, scope_), + EnvoyException, "gRPC client cluster 'foo' is not static"); +} + +TEST_F(AsyncClientManagerImplTest, GoogleGrpc) { + AsyncClientManagerImpl async_client_manager(cm_, tls_); + envoy::api::v2::GrpcService grpc_service; + grpc_service.mutable_google_grpc(); + + EXPECT_THROW_WITH_MESSAGE(async_client_manager.factoryForGrpcService(grpc_service, scope_), + EnvoyException, "Google C++ gRPC client is not implemented yet"); +} + +} // namespace +} // namespace Grpc +} // namespace Envoy diff --git a/test/common/ratelimit/ratelimit_impl_test.cc b/test/common/ratelimit/ratelimit_impl_test.cc index cd9b21756bc67..5aa2aadfd6c17 100644 --- a/test/common/ratelimit/ratelimit_impl_test.cc +++ b/test/common/ratelimit/ratelimit_impl_test.cc @@ -119,22 +119,34 @@ TEST_F(RateLimitGrpcClientTest, Cancel) { client_.cancel(); } -TEST(RateLimitGrpcFactoryTest, NoCluster) { +TEST(RateLimitGrpcFactoryTest, Create) { envoy::api::v2::RateLimitServiceConfig config; - config.set_cluster_name("foo"); - Upstream::MockClusterManager cm; - - EXPECT_CALL(cm, get("foo")).WillOnce(Return(nullptr)); - EXPECT_THROW(GrpcFactoryImpl(config, cm), EnvoyException); + config.mutable_grpc_service()->mutable_envoy_grpc()->set_cluster_name("foo"); + Grpc::MockAsyncClientManager async_client_manager; + Stats::MockStore scope; + EXPECT_CALL(async_client_manager, + factoryForGrpcService(ProtoEq(config.grpc_service()), Ref(scope))) + .WillOnce(Invoke([](const envoy::api::v2::GrpcService&, Stats::Scope&) { + return std::make_unique>(); + })); + GrpcFactoryImpl factory(config, async_client_manager, scope); + factory.create(Optional()); } -TEST(RateLimitGrpcFactoryTest, Create) { +// TODO(htuch): cluster_name is deprecated, remove after 1.6.0. +TEST(RateLimitGrpcFactoryTest, CreateLegacy) { envoy::api::v2::RateLimitServiceConfig config; config.set_cluster_name("foo"); - Upstream::MockClusterManager cm; - - EXPECT_CALL(cm, get("foo")).Times(AtLeast(1)); - GrpcFactoryImpl factory(config, cm); + Grpc::MockAsyncClientManager async_client_manager; + Stats::MockStore scope; + envoy::api::v2::GrpcService expected_grpc_service; + expected_grpc_service.mutable_envoy_grpc()->set_cluster_name("foo"); + EXPECT_CALL(async_client_manager, + factoryForGrpcService(ProtoEq(expected_grpc_service), Ref(scope))) + .WillOnce(Invoke([](const envoy::api::v2::GrpcService&, Stats::Scope&) { + return std::make_unique>(); + })); + GrpcFactoryImpl factory(config, async_client_manager, scope); factory.create(Optional()); } diff --git a/test/common/stats/grpc_metrics_service_impl_test.cc b/test/common/stats/grpc_metrics_service_impl_test.cc index 1e2424859086f..a6315ab3ac53d 100644 --- a/test/common/stats/grpc_metrics_service_impl_test.cc +++ b/test/common/stats/grpc_metrics_service_impl_test.cc @@ -18,19 +18,20 @@ namespace Metrics { class GrpcMetricsStreamerImplTest : public testing::Test { public: - struct TestGrpcMetricsClientFactory : public GrpcMetricsServiceClientFactory { - - Grpc::AsyncClientPtr create() { return Grpc::AsyncClientPtr{async_client_}; } - - Grpc::MockAsyncClient* async_client_{new Grpc::MockAsyncClient()}; - }; - typedef Grpc::MockAsyncStream MockMetricsStream; typedef Grpc::TypedAsyncStreamCallbacks MetricsServiceCallbacks; + GrpcMetricsStreamerImplTest() { + EXPECT_CALL(*factory_, create()).WillOnce(Invoke([this] { + return Grpc::AsyncClientPtr{async_client_}; + })); + streamer_ = std::make_unique(Grpc::AsyncClientFactoryPtr{factory_}, + tls_, local_info_); + } + void expectStreamStart(MockMetricsStream& stream, MetricsServiceCallbacks** callbacks_to_set) { - EXPECT_CALL(*factory_->async_client_, start(_, _)) + EXPECT_CALL(*async_client_, start(_, _)) .WillOnce(Invoke([&stream, callbacks_to_set](const Protobuf::MethodDescriptor&, Grpc::AsyncStreamCallbacks& callbacks) { *callbacks_to_set = dynamic_cast(&callbacks); @@ -40,9 +41,9 @@ class GrpcMetricsStreamerImplTest : public testing::Test { NiceMock tls_; LocalInfo::MockLocalInfo local_info_; - TestGrpcMetricsClientFactory* factory_{new TestGrpcMetricsClientFactory}; - GrpcMetricsStreamerImpl streamer_{GrpcMetricsServiceClientFactoryPtr{factory_}, tls_, - local_info_}; + Grpc::MockAsyncClient* async_client_{new Grpc::MockAsyncClient}; + Grpc::MockAsyncClientFactory* factory_{new Grpc::MockAsyncClientFactory}; + std::unique_ptr streamer_; }; // Test basic metrics streaming flow. @@ -56,7 +57,7 @@ TEST_F(GrpcMetricsStreamerImplTest, BasicFlow) { EXPECT_CALL(local_info_, node()); EXPECT_CALL(stream1, sendMessage(_, false)); envoy::api::v2::StreamMetricsMessage message_metrics1; - streamer_.send(message_metrics1); + streamer_->send(message_metrics1); // Verify that sending an empty response message doesn't do anything bad. callbacks1->onReceiveMessage(std::make_unique()); } @@ -65,7 +66,7 @@ TEST_F(GrpcMetricsStreamerImplTest, BasicFlow) { TEST_F(GrpcMetricsStreamerImplTest, StreamFailure) { InSequence s; - EXPECT_CALL(*factory_->async_client_, start(_, _)) + EXPECT_CALL(*async_client_, start(_, _)) .WillOnce( Invoke([](const Protobuf::MethodDescriptor&, Grpc::AsyncStreamCallbacks& callbacks) { callbacks.onRemoteClose(Grpc::Status::Internal, "bad"); @@ -73,7 +74,7 @@ TEST_F(GrpcMetricsStreamerImplTest, StreamFailure) { })); EXPECT_CALL(local_info_, node()); envoy::api::v2::StreamMetricsMessage message_metrics1; - streamer_.send(message_metrics1); + streamer_->send(message_metrics1); } class MockGrpcMetricsStreamer : public GrpcMetricsStreamer { diff --git a/test/mocks/grpc/BUILD b/test/mocks/grpc/BUILD index 58b370e6dd376..3488a7fa294e5 100644 --- a/test/mocks/grpc/BUILD +++ b/test/mocks/grpc/BUILD @@ -14,5 +14,6 @@ envoy_cc_mock( hdrs = ["mocks.h"], deps = [ "//include/envoy/grpc:async_client_interface", + "//include/envoy/grpc:async_client_manager_interface", ], ) diff --git a/test/mocks/grpc/mocks.cc b/test/mocks/grpc/mocks.cc index 0b21c5cd5390f..c9b0ca0047d60 100644 --- a/test/mocks/grpc/mocks.cc +++ b/test/mocks/grpc/mocks.cc @@ -6,5 +6,11 @@ namespace Grpc { MockAsyncRequest::MockAsyncRequest() {} MockAsyncRequest::~MockAsyncRequest() {} +MockAsyncClientFactory::MockAsyncClientFactory() {} +MockAsyncClientFactory::~MockAsyncClientFactory() {} + +MockAsyncClientManager::MockAsyncClientManager() {} +MockAsyncClientManager::~MockAsyncClientManager() {} + } // namespace Grpc } // namespace Envoy diff --git a/test/mocks/grpc/mocks.h b/test/mocks/grpc/mocks.h index 76400c28cb08c..780171c629fb5 100644 --- a/test/mocks/grpc/mocks.h +++ b/test/mocks/grpc/mocks.h @@ -4,6 +4,7 @@ #include #include "envoy/grpc/async_client.h" +#include "envoy/grpc/async_client_manager.h" #include "gmock/gmock.h" @@ -68,5 +69,23 @@ class MockAsyncClient : public AsyncClient { AsyncStreamCallbacks& callbacks)); }; +class MockAsyncClientFactory : public AsyncClientFactory { +public: + MockAsyncClientFactory(); + ~MockAsyncClientFactory(); + + MOCK_METHOD0(create, AsyncClientPtr()); +}; + +class MockAsyncClientManager : public AsyncClientManager { +public: + MockAsyncClientManager(); + ~MockAsyncClientManager(); + + MOCK_METHOD2(factoryForGrpcService, + AsyncClientFactoryPtr(const envoy::api::v2::GrpcService& grpc_service, + Stats::Scope& scope)); +}; + } // namespace Grpc } // namespace Envoy diff --git a/test/mocks/upstream/BUILD b/test/mocks/upstream/BUILD index 5f3da16f2cee4..78c6efff3a148 100644 --- a/test/mocks/upstream/BUILD +++ b/test/mocks/upstream/BUILD @@ -45,6 +45,7 @@ envoy_cc_mock( "//include/envoy/upstream:load_balancer_interface", "//include/envoy/upstream:upstream_interface", "//test/mocks/config:config_mocks", + "//test/mocks/grpc:grpc_mocks", "//test/mocks/http:http_mocks", "//test/mocks/runtime:runtime_mocks", "//test/mocks/stats:stats_mocks", diff --git a/test/mocks/upstream/mocks.cc b/test/mocks/upstream/mocks.cc index 1b93411de32b9..a3f5bb52ff831 100644 --- a/test/mocks/upstream/mocks.cc +++ b/test/mocks/upstream/mocks.cc @@ -87,6 +87,7 @@ MockClusterManager::MockClusterManager() { ON_CALL(*this, httpAsyncClientForCluster(_)).WillByDefault((ReturnRef(async_client_))); ON_CALL(*this, sourceAddress()).WillByDefault(ReturnRef(source_address_)); ON_CALL(*this, adsMux()).WillByDefault(ReturnRef(ads_mux_)); + ON_CALL(*this, grpcAsyncClientManager()).WillByDefault(ReturnRef(async_client_manager_)); ON_CALL(*this, localClusterName()).WillByDefault((ReturnRef(local_cluster_name_))); // Matches are LIFO so "" will match first. diff --git a/test/mocks/upstream/mocks.h b/test/mocks/upstream/mocks.h index a1b9943be7a77..b2d2e02133a6a 100644 --- a/test/mocks/upstream/mocks.h +++ b/test/mocks/upstream/mocks.h @@ -14,6 +14,7 @@ #include "common/common/callback_impl.h" #include "test/mocks/config/mocks.h" +#include "test/mocks/grpc/mocks.h" #include "test/mocks/http/mocks.h" #include "test/mocks/runtime/mocks.h" #include "test/mocks/stats/mocks.h" @@ -165,6 +166,7 @@ class MockClusterManager : public ClusterManager { MOCK_METHOD0(shutdown, void()); MOCK_CONST_METHOD0(sourceAddress, const Network::Address::InstanceConstSharedPtr&()); MOCK_METHOD0(adsMux, Config::GrpcMux&()); + MOCK_METHOD0(grpcAsyncClientManager, Grpc::AsyncClientManager&()); MOCK_CONST_METHOD0(versionInfo, const std::string()); MOCK_CONST_METHOD0(localClusterName, const std::string&()); @@ -173,6 +175,7 @@ class MockClusterManager : public ClusterManager { NiceMock thread_local_cluster_; Network::Address::InstanceConstSharedPtr source_address_; NiceMock ads_mux_; + NiceMock async_client_manager_; std::string local_cluster_name_; }; diff --git a/test/server/config/access_log/config_test.cc b/test/server/config/access_log/config_test.cc index 6bdd0a98cc96c..223a65e877f9d 100644 --- a/test/server/config/access_log/config_test.cc +++ b/test/server/config/access_log/config_test.cc @@ -9,7 +9,9 @@ #include "gmock/gmock.h" #include "gtest/gtest.h" +using testing::Invoke; using testing::Return; +using testing::_; namespace Envoy { namespace Server { @@ -25,6 +27,11 @@ class HttpGrpcAccessLogConfigTest : public testing::Test { message_ = factory_->createEmptyConfigProto(); ASSERT_NE(nullptr, message_); + EXPECT_CALL(context_.cluster_manager_.async_client_manager_, factoryForGrpcService(_, _)) + .WillOnce(Invoke([](const envoy::api::v2::GrpcService&, Stats::Scope&) { + return std::make_unique>(); + })); + auto* common_config = http_grpc_access_log_.mutable_common_config(); common_config->set_log_name("foo"); common_config->mutable_grpc_service()->mutable_envoy_grpc()->set_cluster_name("bar"); @@ -46,23 +53,6 @@ TEST_F(HttpGrpcAccessLogConfigTest, Ok) { EXPECT_NE(nullptr, dynamic_cast(instance.get())); } -// Configuration with no matching cluster. -TEST_F(HttpGrpcAccessLogConfigTest, NoCluster) { - ON_CALL(context_.cluster_manager_, get("bar")).WillByDefault(Return(nullptr)); - EXPECT_THROW_WITH_MESSAGE( - factory_->createAccessLogInstance(*message_, std::move(filter_), context_), EnvoyException, - "invalid access log cluster 'bar'. Missing or not a static cluster."); -} - -// Configuration with cluster but not a static cluster. -TEST_F(HttpGrpcAccessLogConfigTest, ClusterAddedViaApi) { - ON_CALL(*context_.cluster_manager_.thread_local_cluster_.cluster_.info_, addedViaApi()) - .WillByDefault(Return(true)); - EXPECT_THROW_WITH_MESSAGE( - factory_->createAccessLogInstance(*message_, std::move(filter_), context_), EnvoyException, - "invalid access log cluster 'bar'. Missing or not a static cluster."); -} - } // namespace Configuration } // namespace Server } // namespace Envoy diff --git a/test/test_common/utility.h b/test/test_common/utility.h index 9ede3347e0434..9faa4ec4616b9 100644 --- a/test/test_common/utility.h +++ b/test/test_common/utility.h @@ -35,6 +35,14 @@ namespace Envoy { EXPECT_EQ(message, std::string(e.what())); \ } +#define EXPECT_THROW_WITH_REGEX(statement, expected_exception, regex_str) \ + try { \ + statement; \ + ADD_FAILURE() << "Exception should take place. It did not."; \ + } catch (expected_exception & e) { \ + EXPECT_THAT(e.what(), ::testing::ContainsRegex(regex_str)); \ + } + #define VERBOSE_EXPECT_NO_THROW(statement) \ try { \ statement; \