diff --git a/envoy/upstream/cluster_manager.h b/envoy/upstream/cluster_manager.h index 20ca0acca4df0..16f79117b674f 100644 --- a/envoy/upstream/cluster_manager.h +++ b/envoy/upstream/cluster_manager.h @@ -321,6 +321,12 @@ class ClusterManager { * Drain all connection pool connections owned by all clusters in the cluster manager. */ virtual void drainConnections() PURE; + + /** + * Check if the cluster is active and statically configured, and if not, throw excetion. + * @param cluster, the cluster to check. + */ + virtual void checkActiveStaticCluster(const std::string& cluster) PURE; }; using ClusterManagerPtr = std::unique_ptr; diff --git a/source/common/grpc/async_client_manager_impl.cc b/source/common/grpc/async_client_manager_impl.cc index 92c368ebb6f41..ff712fca14bf0 100644 --- a/source/common/grpc/async_client_manager_impl.cc +++ b/source/common/grpc/async_client_manager_impl.cc @@ -45,16 +45,7 @@ AsyncClientFactoryImpl::AsyncClientFactoryImpl(Upstream::ClusterManager& cm, if (skip_cluster_check) { return; } - - const std::string& cluster_name = config.envoy_grpc().cluster_name(); - auto all_clusters = cm_.clusters(); - const auto& it = all_clusters.active_clusters_.find(cluster_name); - if (it == all_clusters.active_clusters_.end()) { - throw EnvoyException(fmt::format("Unknown gRPC client cluster '{}'", cluster_name)); - } - if (it->second.get().info()->addedViaApi()) { - throw EnvoyException(fmt::format("gRPC client cluster '{}' is not static", cluster_name)); - } + cm_.checkActiveStaticCluster(config.envoy_grpc().cluster_name()); } AsyncClientManagerImpl::AsyncClientManagerImpl(Upstream::ClusterManager& cm, diff --git a/source/common/upstream/cluster_manager_impl.cc b/source/common/upstream/cluster_manager_impl.cc index cbe3507907f5d..b8ed8ec7210c4 100644 --- a/source/common/upstream/cluster_manager_impl.cc +++ b/source/common/upstream/cluster_manager_impl.cc @@ -971,6 +971,16 @@ void ClusterManagerImpl::drainConnections() { }); } +void ClusterManagerImpl::checkActiveStaticCluster(const std::string& cluster) { + const auto& it = active_clusters_.find(cluster); + if (it == active_clusters_.end()) { + throw EnvoyException(fmt::format("Unknown gRPC client cluster '{}'", cluster)); + } + if (it->second->added_via_api_) { + throw EnvoyException(fmt::format("gRPC client cluster '{}' is not static", cluster)); + } +} + void ClusterManagerImpl::postThreadLocalRemoveHosts(const Cluster& cluster, const HostVector& hosts_removed) { tls_.runOnAllThreads([name = cluster.info()->name(), diff --git a/source/common/upstream/cluster_manager_impl.h b/source/common/upstream/cluster_manager_impl.h index 1540defd97712..ce5480b1ea920 100644 --- a/source/common/upstream/cluster_manager_impl.h +++ b/source/common/upstream/cluster_manager_impl.h @@ -319,6 +319,8 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggablesecond; } + // We pass skip_cluster_check=true to factoryForGrpcService in order to avoid throwing + // exceptions in worker threads. Call sites of this getOrCreateLogger must check the cluster + // availability via ClusterManager::checkActiveStaticCluster beforehand, and throw exceptions in + // the main thread if necessary. + auto client = async_client_manager_.factoryForGrpcService(config.grpc_service(), scope_, true) + ->createUncachedRawAsyncClient(); const auto logger = createLogger( - config, - async_client_manager_.factoryForGrpcService(config.grpc_service(), scope_, false) - ->createUncachedRawAsyncClient(), + config, std::move(client), std::chrono::milliseconds(PROTOBUF_GET_MS_OR_DEFAULT(config, buffer_flush_interval, 1000)), PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, buffer_size_bytes, 16384), cache.dispatcher_, scope); diff --git a/source/extensions/access_loggers/grpc/http_config.cc b/source/extensions/access_loggers/grpc/http_config.cc index 4d333f6d91fdc..6692a73f1d6d6 100644 --- a/source/extensions/access_loggers/grpc/http_config.cc +++ b/source/extensions/access_loggers/grpc/http_config.cc @@ -27,6 +27,10 @@ AccessLog::InstanceSharedPtr HttpGrpcAccessLogFactory::createAccessLogInstance( const envoy::extensions::access_loggers::grpc::v3::HttpGrpcAccessLogConfig&>( config, context.messageValidationVisitor()); + const auto service_config = proto_config.common_config().grpc_service(); + if (service_config.has_envoy_grpc()) { + context.clusterManager().checkActiveStaticCluster(service_config.envoy_grpc().cluster_name()); + } return std::make_shared(std::move(filter), proto_config, context.threadLocal(), GrpcCommon::getGrpcAccessLoggerCacheSingleton(context), context.scope()); diff --git a/source/extensions/access_loggers/grpc/tcp_config.cc b/source/extensions/access_loggers/grpc/tcp_config.cc index 185a76e934d8a..e259a2c5f779e 100644 --- a/source/extensions/access_loggers/grpc/tcp_config.cc +++ b/source/extensions/access_loggers/grpc/tcp_config.cc @@ -27,6 +27,10 @@ AccessLog::InstanceSharedPtr TcpGrpcAccessLogFactory::createAccessLogInstance( const envoy::extensions::access_loggers::grpc::v3::TcpGrpcAccessLogConfig&>( config, context.messageValidationVisitor()); + const auto service_config = proto_config.common_config().grpc_service(); + if (service_config.has_envoy_grpc()) { + context.clusterManager().checkActiveStaticCluster(service_config.envoy_grpc().cluster_name()); + } return std::make_shared(std::move(filter), proto_config, context.threadLocal(), GrpcCommon::getGrpcAccessLoggerCacheSingleton(context), context.scope()); diff --git a/test/common/grpc/async_client_manager_impl_test.cc b/test/common/grpc/async_client_manager_impl_test.cc index fc8a365f98f9c..c0fe6baddae8d 100644 --- a/test/common/grpc/async_client_manager_impl_test.cc +++ b/test/common/grpc/async_client_manager_impl_test.cc @@ -39,14 +39,7 @@ class AsyncClientManagerImplTest : public testing::Test { TEST_F(AsyncClientManagerImplTest, EnvoyGrpcOk) { envoy::config::core::v3::GrpcService grpc_service; grpc_service.mutable_envoy_grpc()->set_cluster_name("foo"); - - Upstream::ClusterManager::ClusterInfoMaps cluster_maps; - Upstream::MockClusterMockPrioritySet cluster; - cluster_maps.active_clusters_.emplace("foo", cluster); - EXPECT_CALL(cm_, clusters()).WillOnce(Return(cluster_maps)); - EXPECT_CALL(cluster, info()); - EXPECT_CALL(*cluster.info_, addedViaApi()); - + EXPECT_CALL(cm_, checkActiveStaticCluster("foo")).WillOnce(Return()); async_client_manager_.factoryForGrpcService(grpc_service, scope_, false); } @@ -89,30 +82,15 @@ TEST_F(AsyncClientManagerImplTest, EnableRawAsyncClientCache) { EXPECT_NE(foo_client1.get(), bar_client.get()); } -TEST_F(AsyncClientManagerImplTest, EnvoyGrpcUnknown) { - envoy::config::core::v3::GrpcService grpc_service; - grpc_service.mutable_envoy_grpc()->set_cluster_name("foo"); - - EXPECT_CALL(cm_, clusters()); - EXPECT_THROW_WITH_MESSAGE( - async_client_manager_.factoryForGrpcService(grpc_service, scope_, false), EnvoyException, - "Unknown gRPC client cluster 'foo'"); -} - -TEST_F(AsyncClientManagerImplTest, EnvoyGrpcDynamicCluster) { +TEST_F(AsyncClientManagerImplTest, EnvoyGrpcInvalid) { envoy::config::core::v3::GrpcService grpc_service; grpc_service.mutable_envoy_grpc()->set_cluster_name("foo"); - - Upstream::ClusterManager::ClusterInfoMap cluster_map; - Upstream::MockClusterMockPrioritySet cluster; - cluster_map.emplace("foo", cluster); - EXPECT_CALL(cm_, clusters()) - .WillOnce(Return(Upstream::ClusterManager::ClusterInfoMaps{cluster_map, {}})); - EXPECT_CALL(cluster, info()); - EXPECT_CALL(*cluster.info_, addedViaApi()).WillOnce(Return(true)); + EXPECT_CALL(cm_, checkActiveStaticCluster("foo")).WillOnce(Invoke([](const std::string&) { + throw EnvoyException("fake exception"); + })); EXPECT_THROW_WITH_MESSAGE( async_client_manager_.factoryForGrpcService(grpc_service, scope_, false), EnvoyException, - "gRPC client cluster 'foo' is not static"); + "fake exception"); } TEST_F(AsyncClientManagerImplTest, GoogleGrpc) { @@ -187,11 +165,11 @@ TEST_F(AsyncClientManagerImplTest, GoogleGrpcIllegalCharsInValue) { #endif } -TEST_F(AsyncClientManagerImplTest, EnvoyGrpcUnknownOk) { +TEST_F(AsyncClientManagerImplTest, EnvoyGrpcUnknownSkipClusterCheck) { envoy::config::core::v3::GrpcService grpc_service; grpc_service.mutable_envoy_grpc()->set_cluster_name("foo"); - EXPECT_CALL(cm_, clusters()).Times(0); + EXPECT_CALL(cm_, checkActiveStaticCluster(_)).Times(0); ASSERT_NO_THROW(async_client_manager_.factoryForGrpcService(grpc_service, scope_, true)); } diff --git a/test/common/upstream/cluster_manager_impl_test.cc b/test/common/upstream/cluster_manager_impl_test.cc index 86f133ecef107..faaee472db0d5 100644 --- a/test/common/upstream/cluster_manager_impl_test.cc +++ b/test/common/upstream/cluster_manager_impl_test.cc @@ -4960,6 +4960,51 @@ TEST_F(ClusterManagerImplTest, ConnectionPoolPerDownstreamConnection) { Http::Protocol::Http11, &lb_context))); } +TEST_F(ClusterManagerImplTest, CheckActiveStaticCluster) { + const std::string yaml = R"EOF( + static_resources: + clusters: + - name: good + connect_timeout: 0.250s + lb_policy: ROUND_ROBIN + type: STATIC + load_assignment: + cluster_name: good + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: 127.0.0.1 + port_value: 11001 + )EOF"; + create(parseBootstrapFromV3Yaml(yaml)); + const std::string added_via_api_yaml = R"EOF( + name: added_via_api + connect_timeout: 0.250s + type: STATIC + lb_policy: ROUND_ROBIN + load_assignment: + cluster_name: added_via_api + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: 127.0.0.1 + port_value: 11001 + )EOF"; + EXPECT_TRUE( + cluster_manager_->addOrUpdateCluster(parseClusterFromV3Yaml(added_via_api_yaml), "v1")); + + EXPECT_EQ(2, cluster_manager_->clusters().active_clusters_.size()); + EXPECT_NO_THROW(cluster_manager_->checkActiveStaticCluster("good")); + EXPECT_THROW_WITH_MESSAGE(cluster_manager_->checkActiveStaticCluster("nonexist"), EnvoyException, + "Unknown gRPC client cluster 'nonexist'"); + EXPECT_THROW_WITH_MESSAGE(cluster_manager_->checkActiveStaticCluster("added_via_api"), + EnvoyException, "gRPC client cluster 'added_via_api' is not static"); +} + class PreconnectTest : public ClusterManagerImplTest { public: void initialize(float ratio) { diff --git a/test/extensions/access_loggers/common/grpc_access_logger_test.cc b/test/extensions/access_loggers/common/grpc_access_logger_test.cc index f2e125df17e06..168a749052163 100644 --- a/test/extensions/access_loggers/common/grpc_access_logger_test.cc +++ b/test/extensions/access_loggers/common/grpc_access_logger_test.cc @@ -336,7 +336,7 @@ class GrpcAccessLoggerCacheTest : public testing::Test { void expectClientCreation() { factory_ = new Grpc::MockAsyncClientFactory; async_client_ = new Grpc::MockAsyncClient; - EXPECT_CALL(async_client_manager_, factoryForGrpcService(_, _, false)) + EXPECT_CALL(async_client_manager_, factoryForGrpcService(_, _, true)) .WillOnce(Invoke([this](const envoy::config::core::v3::GrpcService&, Stats::Scope&, bool) { EXPECT_CALL(*factory_, createUncachedRawAsyncClient()).WillOnce(Invoke([this] { return Grpc::RawAsyncClientPtr{async_client_}; diff --git a/test/extensions/access_loggers/grpc/BUILD b/test/extensions/access_loggers/grpc/BUILD index 31616c4a41874..0484f14e7ddff 100644 --- a/test/extensions/access_loggers/grpc/BUILD +++ b/test/extensions/access_loggers/grpc/BUILD @@ -92,10 +92,22 @@ envoy_extension_cc_test( ], ) +envoy_extension_cc_test( + name = "tcp_config_test", + srcs = ["tcp_config_test.cc"], + extension_names = ["envoy.access_loggers.tcp_grpc"], + deps = [ + "//source/extensions/access_loggers/grpc:tcp_config", + "//test/mocks/server:factory_context_mocks", + "@envoy_api//envoy/config/core/v3:pkg_cc_proto", + "@envoy_api//envoy/extensions/access_loggers/grpc/v3:pkg_cc_proto", + ], +) + envoy_extension_cc_test( name = "tcp_grpc_access_log_integration_test", srcs = ["tcp_grpc_access_log_integration_test.cc"], - extension_names = ["envoy.access_loggers.http_grpc"], + extension_names = ["envoy.access_loggers.tcp_grpc"], deps = [ "//source/common/buffer:zero_copy_input_stream_lib", "//source/common/grpc:codec_lib", diff --git a/test/extensions/access_loggers/grpc/grpc_access_log_impl_test.cc b/test/extensions/access_loggers/grpc/grpc_access_log_impl_test.cc index 3e5e4f58f9008..737bbf3982f6f 100644 --- a/test/extensions/access_loggers/grpc/grpc_access_log_impl_test.cc +++ b/test/extensions/access_loggers/grpc/grpc_access_log_impl_test.cc @@ -128,7 +128,7 @@ class GrpcAccessLoggerCacheImplTest : public testing::Test { : async_client_(new Grpc::MockAsyncClient), factory_(new Grpc::MockAsyncClientFactory), logger_cache_(async_client_manager_, scope_, tls_, local_info_), grpc_access_logger_impl_test_helper_(local_info_, async_client_) { - EXPECT_CALL(async_client_manager_, factoryForGrpcService(_, _, false)) + EXPECT_CALL(async_client_manager_, factoryForGrpcService(_, _, true)) .WillOnce(Invoke([this](const envoy::config::core::v3::GrpcService&, Stats::Scope&, bool) { EXPECT_CALL(*factory_, createUncachedRawAsyncClient()).WillOnce(Invoke([this] { return Grpc::RawAsyncClientPtr{async_client_}; diff --git a/test/extensions/access_loggers/grpc/http_config_test.cc b/test/extensions/access_loggers/grpc/http_config_test.cc index c4d7db133a78a..933a4a69d967e 100644 --- a/test/extensions/access_loggers/grpc/http_config_test.cc +++ b/test/extensions/access_loggers/grpc/http_config_test.cc @@ -30,17 +30,37 @@ class HttpGrpcAccessLogConfigTest : public testing::Test { message_ = factory_->createEmptyConfigProto(); ASSERT_NE(nullptr, message_); + } - EXPECT_CALL(context_.cluster_manager_.async_client_manager_, factoryForGrpcService(_, _, _)) - .WillOnce(Invoke([](const envoy::config::core::v3::GrpcService&, Stats::Scope&, bool) { - return std::make_unique>(); + void run(const std::string cluster_name) { + const auto good_cluster = "good_cluster"; + EXPECT_CALL(context_.cluster_manager_, checkActiveStaticCluster(cluster_name)) + .WillOnce(Invoke([good_cluster](const std::string& cluster_name) { + if (cluster_name != good_cluster) { + throw EnvoyException("fake"); + } })); auto* common_config = http_grpc_access_log_.mutable_common_config(); common_config->set_log_name("foo"); - common_config->mutable_grpc_service()->mutable_envoy_grpc()->set_cluster_name("bar"); + common_config->mutable_grpc_service()->mutable_envoy_grpc()->set_cluster_name(cluster_name); common_config->set_transport_api_version(envoy::config::core::v3::ApiVersion::V3); TestUtility::jsonConvert(http_grpc_access_log_, *message_); + + if (cluster_name == good_cluster) { + EXPECT_CALL(context_.cluster_manager_.async_client_manager_, factoryForGrpcService(_, _, _)) + .WillOnce(Invoke([](const envoy::config::core::v3::GrpcService&, Stats::Scope&, bool) { + return std::make_unique>(); + })); + AccessLog::InstanceSharedPtr instance = + factory_->createAccessLogInstance(*message_, std::move(filter_), context_); + EXPECT_NE(nullptr, instance); + EXPECT_NE(nullptr, dynamic_cast(instance.get())); + } else { + EXPECT_THROW_WITH_MESSAGE( + factory_->createAccessLogInstance(*message_, std::move(filter_), context_), + EnvoyException, "fake"); + } } AccessLog::FilterPtr filter_; @@ -51,12 +71,10 @@ class HttpGrpcAccessLogConfigTest : public testing::Test { }; // Normal OK configuration. -TEST_F(HttpGrpcAccessLogConfigTest, Ok) { - AccessLog::InstanceSharedPtr instance = - factory_->createAccessLogInstance(*message_, std::move(filter_), context_); - EXPECT_NE(nullptr, instance); - EXPECT_NE(nullptr, dynamic_cast(instance.get())); -} +TEST_F(HttpGrpcAccessLogConfigTest, Ok) { run("good_cluster"); } + +// Wrong configuration with invalid clusters. +TEST_F(HttpGrpcAccessLogConfigTest, InvalidCluster) { run("invalid"); } } // namespace } // namespace HttpGrpc diff --git a/test/extensions/access_loggers/grpc/tcp_config_test.cc b/test/extensions/access_loggers/grpc/tcp_config_test.cc new file mode 100644 index 0000000000000..9889c337de476 --- /dev/null +++ b/test/extensions/access_loggers/grpc/tcp_config_test.cc @@ -0,0 +1,83 @@ +#include "envoy/config/core/v3/grpc_service.pb.h" +#include "envoy/extensions/access_loggers/grpc/v3/als.pb.h" +#include "envoy/registry/registry.h" +#include "envoy/server/access_log_config.h" +#include "envoy/stats/scope.h" + +#include "source/extensions/access_loggers/grpc/tcp_grpc_access_log_impl.h" + +#include "test/mocks/server/factory_context.h" + +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +using testing::_; +using testing::Invoke; + +namespace Envoy { +namespace Extensions { +namespace AccessLoggers { +namespace TcpGrpc { +namespace { + +class TcpGrpcAccessLogConfigTest : public testing::Test { +public: + void SetUp() override { + factory_ = + Registry::FactoryRegistry::getFactory( + "envoy.access_loggers.tcp_grpc"); + ASSERT_NE(nullptr, factory_); + + message_ = factory_->createEmptyConfigProto(); + ASSERT_NE(nullptr, message_); + } + + void run(const std::string cluster_name) { + const auto good_cluster = "good_cluster"; + EXPECT_CALL(context_.cluster_manager_, checkActiveStaticCluster(cluster_name)) + .WillOnce(Invoke([good_cluster](const std::string& cluster_name) { + if (cluster_name != good_cluster) { + throw EnvoyException("fake"); + } + })); + + auto* common_config = tcp_grpc_access_log_.mutable_common_config(); + common_config->set_log_name("foo"); + common_config->mutable_grpc_service()->mutable_envoy_grpc()->set_cluster_name(cluster_name); + common_config->set_transport_api_version(envoy::config::core::v3::ApiVersion::V3); + TestUtility::jsonConvert(tcp_grpc_access_log_, *message_); + + if (cluster_name == good_cluster) { + EXPECT_CALL(context_.cluster_manager_.async_client_manager_, factoryForGrpcService(_, _, _)) + .WillOnce(Invoke([](const envoy::config::core::v3::GrpcService&, Stats::Scope&, bool) { + return std::make_unique>(); + })); + AccessLog::InstanceSharedPtr instance = + factory_->createAccessLogInstance(*message_, std::move(filter_), context_); + EXPECT_NE(nullptr, instance); + EXPECT_NE(nullptr, dynamic_cast(instance.get())); + } else { + EXPECT_THROW_WITH_MESSAGE( + factory_->createAccessLogInstance(*message_, std::move(filter_), context_), + EnvoyException, "fake"); + } + } + + AccessLog::FilterPtr filter_; + NiceMock context_; + envoy::extensions::access_loggers::grpc::v3::TcpGrpcAccessLogConfig tcp_grpc_access_log_; + ProtobufTypes::MessagePtr message_; + Server::Configuration::AccessLogInstanceFactory* factory_{}; +}; + +// Normal OK configuration. +TEST_F(TcpGrpcAccessLogConfigTest, Ok) { run("good_cluster"); } + +// Wrong configuration with invalid clusters. +TEST_F(TcpGrpcAccessLogConfigTest, InvalidCluster) { run("invalid"); } + +} // namespace +} // namespace TcpGrpc +} // namespace AccessLoggers +} // namespace Extensions +} // namespace Envoy diff --git a/test/extensions/access_loggers/open_telemetry/grpc_access_log_impl_test.cc b/test/extensions/access_loggers/open_telemetry/grpc_access_log_impl_test.cc index 850ae1dfa4cdf..5736c94515d96 100644 --- a/test/extensions/access_loggers/open_telemetry/grpc_access_log_impl_test.cc +++ b/test/extensions/access_loggers/open_telemetry/grpc_access_log_impl_test.cc @@ -151,7 +151,7 @@ class GrpcAccessLoggerCacheImplTest : public testing::Test { : async_client_(new Grpc::MockAsyncClient), factory_(new Grpc::MockAsyncClientFactory), logger_cache_(async_client_manager_, scope_, tls_, local_info_), grpc_access_logger_impl_test_helper_(local_info_, async_client_) { - EXPECT_CALL(async_client_manager_, factoryForGrpcService(_, _, false)) + EXPECT_CALL(async_client_manager_, factoryForGrpcService(_, _, true)) .WillOnce(Invoke([this](const envoy::config::core::v3::GrpcService&, Stats::Scope&, bool) { EXPECT_CALL(*factory_, createUncachedRawAsyncClient()).WillOnce(Invoke([this] { return Grpc::RawAsyncClientPtr{async_client_}; diff --git a/test/mocks/upstream/cluster_manager.h b/test/mocks/upstream/cluster_manager.h index f8b43ddb76557..08f4c1c563283 100644 --- a/test/mocks/upstream/cluster_manager.h +++ b/test/mocks/upstream/cluster_manager.h @@ -70,6 +70,7 @@ class MockClusterManager : public ClusterManager { } MOCK_METHOD(void, drainConnections, (const std::string& cluster)); MOCK_METHOD(void, drainConnections, ()); + MOCK_METHOD(void, checkActiveStaticCluster, (const std::string& cluster)); NiceMock thread_local_cluster_; envoy::config::core::v3::BindConfig bind_config_;