Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions envoy/upstream/cluster_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,12 @@ class ClusterManager {
* Drain all connection pool connections owned by all clusters in the cluster manager.
*/
virtual void drainConnections() PURE;

/**
* @param cluster, the cluster to check.
* @return true if the cluster is active and static, false otherwise.
*/
virtual bool checkActiveStaticCluster(const std::string& cluster) PURE;
Comment thread
mathetake marked this conversation as resolved.
Outdated
};

using ClusterManagerPtr = std::unique_ptr<ClusterManager>;
Expand Down
9 changes: 2 additions & 7 deletions source/common/grpc/async_client_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,8 @@ AsyncClientFactoryImpl::AsyncClientFactoryImpl(Upstream::ClusterManager& cm,
}

const std::string& cluster_name = config.envoy_grpc().cluster_name();
auto all_clusters = cm_.clusters();
const auto& it = all_clusters.active_clusters_.find(cluster_name);
if (it == all_clusters.active_clusters_.end()) {
throw EnvoyException(fmt::format("Unknown gRPC client cluster '{}'", cluster_name));
}
if (it->second.get().info()->addedViaApi()) {
throw EnvoyException(fmt::format("gRPC client cluster '{}' is not static", cluster_name));
if (!cm_.checkActiveStaticCluster(cluster_name)) {
throw EnvoyException(fmt::format("Cluster '{}' is unknown or not static", cluster_name));
}
}

Expand Down
5 changes: 5 additions & 0 deletions source/common/upstream/cluster_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -971,6 +971,11 @@ void ClusterManagerImpl::drainConnections() {
});
}

bool ClusterManagerImpl::checkActiveStaticCluster(const std::string& cluster) {
Comment thread
mathetake marked this conversation as resolved.
Outdated
const auto& it = active_clusters_.find(cluster);
Comment thread
mathetake marked this conversation as resolved.
return it != active_clusters_.end() && !it->second->added_via_api_;
}

void ClusterManagerImpl::postThreadLocalRemoveHosts(const Cluster& cluster,
const HostVector& hosts_removed) {
tls_.runOnAllThreads([name = cluster.info()->name(),
Expand Down
2 changes: 2 additions & 0 deletions source/common/upstream/cluster_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,8 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable<Logger::Id::u

void drainConnections() override;

bool checkActiveStaticCluster(const std::string& cluster) override;

protected:
virtual void postThreadLocalRemoveHosts(const Cluster& cluster, const HostVector& hosts_removed);

Expand Down
10 changes: 7 additions & 3 deletions source/extensions/access_loggers/common/grpc_access_logger.h
Original file line number Diff line number Diff line change
Expand Up @@ -268,10 +268,14 @@ class GrpcAccessLoggerCache : public Singleton::Instance,
if (it != cache.access_loggers_.end()) {
return it->second;
}
// We pass skip_cluster_check=true to factoryForGrpcService in order to avoid throwing
// exceptions in worker threads. Call sites of this getOrCreateLogger must check the cluster
// availability via ClusterManager::checkActiveStaticCluster beforehand, and throw exceptions in
// the main thread if necessary.
auto client = async_client_manager_.factoryForGrpcService(config.grpc_service(), scope_, true)
->createUncachedRawAsyncClient();
const auto logger = createLogger(
config,
async_client_manager_.factoryForGrpcService(config.grpc_service(), scope_, false)
->createUncachedRawAsyncClient(),
config, std::move(client),
std::chrono::milliseconds(PROTOBUF_GET_MS_OR_DEFAULT(config, buffer_flush_interval, 1000)),
PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, buffer_size_bytes, 16384), cache.dispatcher_,
scope);
Expand Down
7 changes: 7 additions & 0 deletions source/extensions/access_loggers/grpc/http_config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,13 @@ AccessLog::InstanceSharedPtr HttpGrpcAccessLogFactory::createAccessLogInstance(
const envoy::extensions::access_loggers::grpc::v3::HttpGrpcAccessLogConfig&>(
config, context.messageValidationVisitor());

const auto service_config = proto_config.common_config().grpc_service();
if (service_config.has_envoy_grpc()) {
const std::string& cluster_name = service_config.envoy_grpc().cluster_name();
if (!context.clusterManager().checkActiveStaticCluster(cluster_name)) {
throw EnvoyException(fmt::format("Cluster '{}' is unknown or not static", cluster_name));
};
}
return std::make_shared<HttpGrpcAccessLog>(std::move(filter), proto_config, context.threadLocal(),
GrpcCommon::getGrpcAccessLoggerCacheSingleton(context),
context.scope());
Expand Down
7 changes: 7 additions & 0 deletions source/extensions/access_loggers/grpc/tcp_config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,13 @@ AccessLog::InstanceSharedPtr TcpGrpcAccessLogFactory::createAccessLogInstance(
const envoy::extensions::access_loggers::grpc::v3::TcpGrpcAccessLogConfig&>(
config, context.messageValidationVisitor());

const auto service_config = proto_config.common_config().grpc_service();
if (service_config.has_envoy_grpc()) {
const std::string& cluster_name = service_config.envoy_grpc().cluster_name();
if (!context.clusterManager().checkActiveStaticCluster(cluster_name)) {
throw EnvoyException(fmt::format("Cluster '{}' is unknown or not static", cluster_name));
};
}
return std::make_shared<TcpGrpcAccessLog>(std::move(filter), proto_config, context.threadLocal(),
GrpcCommon::getGrpcAccessLoggerCacheSingleton(context),
context.scope());
Expand Down
36 changes: 6 additions & 30 deletions test/common/grpc/async_client_manager_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,7 @@ class AsyncClientManagerImplTest : public testing::Test {
TEST_F(AsyncClientManagerImplTest, EnvoyGrpcOk) {
envoy::config::core::v3::GrpcService grpc_service;
grpc_service.mutable_envoy_grpc()->set_cluster_name("foo");

Upstream::ClusterManager::ClusterInfoMaps cluster_maps;
Upstream::MockClusterMockPrioritySet cluster;
cluster_maps.active_clusters_.emplace("foo", cluster);
EXPECT_CALL(cm_, clusters()).WillOnce(Return(cluster_maps));
EXPECT_CALL(cluster, info());
EXPECT_CALL(*cluster.info_, addedViaApi());

EXPECT_CALL(cm_, checkActiveStaticCluster("foo")).WillOnce(Return(true));
async_client_manager_.factoryForGrpcService(grpc_service, scope_, false);
}

Expand Down Expand Up @@ -89,30 +82,13 @@ TEST_F(AsyncClientManagerImplTest, EnableRawAsyncClientCache) {
EXPECT_NE(foo_client1.get(), bar_client.get());
}

TEST_F(AsyncClientManagerImplTest, EnvoyGrpcUnknown) {
envoy::config::core::v3::GrpcService grpc_service;
grpc_service.mutable_envoy_grpc()->set_cluster_name("foo");

EXPECT_CALL(cm_, clusters());
EXPECT_THROW_WITH_MESSAGE(
async_client_manager_.factoryForGrpcService(grpc_service, scope_, false), EnvoyException,
"Unknown gRPC client cluster 'foo'");
}

TEST_F(AsyncClientManagerImplTest, EnvoyGrpcDynamicCluster) {
TEST_F(AsyncClientManagerImplTest, EnvoyGrpcInvalid) {
envoy::config::core::v3::GrpcService grpc_service;
grpc_service.mutable_envoy_grpc()->set_cluster_name("foo");

Upstream::ClusterManager::ClusterInfoMap cluster_map;
Upstream::MockClusterMockPrioritySet cluster;
cluster_map.emplace("foo", cluster);
EXPECT_CALL(cm_, clusters())
.WillOnce(Return(Upstream::ClusterManager::ClusterInfoMaps{cluster_map, {}}));
EXPECT_CALL(cluster, info());
EXPECT_CALL(*cluster.info_, addedViaApi()).WillOnce(Return(true));
EXPECT_CALL(cm_, checkActiveStaticCluster("foo")).WillOnce(Return(false));
EXPECT_THROW_WITH_MESSAGE(
async_client_manager_.factoryForGrpcService(grpc_service, scope_, false), EnvoyException,
"gRPC client cluster 'foo' is not static");
"Cluster 'foo' is unknown or not static");
}

TEST_F(AsyncClientManagerImplTest, GoogleGrpc) {
Expand Down Expand Up @@ -187,11 +163,11 @@ TEST_F(AsyncClientManagerImplTest, GoogleGrpcIllegalCharsInValue) {
#endif
}

TEST_F(AsyncClientManagerImplTest, EnvoyGrpcUnknownOk) {
TEST_F(AsyncClientManagerImplTest, EnvoyGrpcUnknownSkipClusterCheck) {
envoy::config::core::v3::GrpcService grpc_service;
grpc_service.mutable_envoy_grpc()->set_cluster_name("foo");

EXPECT_CALL(cm_, clusters()).Times(0);
EXPECT_CALL(cm_, checkActiveStaticCluster(_)).Times(0);
ASSERT_NO_THROW(async_client_manager_.factoryForGrpcService(grpc_service, scope_, true));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ class GrpcAccessLoggerCacheTest : public testing::Test {
void expectClientCreation() {
factory_ = new Grpc::MockAsyncClientFactory;
async_client_ = new Grpc::MockAsyncClient;
EXPECT_CALL(async_client_manager_, factoryForGrpcService(_, _, false))
EXPECT_CALL(async_client_manager_, factoryForGrpcService(_, _, true))
.WillOnce(Invoke([this](const envoy::config::core::v3::GrpcService&, Stats::Scope&, bool) {
EXPECT_CALL(*factory_, createUncachedRawAsyncClient()).WillOnce(Invoke([this] {
return Grpc::RawAsyncClientPtr{async_client_};
Expand Down
14 changes: 13 additions & 1 deletion test/extensions/access_loggers/grpc/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,22 @@ envoy_extension_cc_test(
],
)

envoy_extension_cc_test(
name = "tcp_config_test",
srcs = ["tcp_config_test.cc"],
extension_names = ["envoy.access_loggers.tcp_grpc"],
deps = [
"//source/extensions/access_loggers/grpc:tcp_config",
"//test/mocks/server:factory_context_mocks",
"@envoy_api//envoy/config/core/v3:pkg_cc_proto",
"@envoy_api//envoy/extensions/access_loggers/grpc/v3:pkg_cc_proto",
],
)

envoy_extension_cc_test(
name = "tcp_grpc_access_log_integration_test",
srcs = ["tcp_grpc_access_log_integration_test.cc"],
extension_names = ["envoy.access_loggers.http_grpc"],
extension_names = ["envoy.access_loggers.tcp_grpc"],
deps = [
"//source/common/buffer:zero_copy_input_stream_lib",
"//source/common/grpc:codec_lib",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ class GrpcAccessLoggerCacheImplTest : public testing::Test {
: async_client_(new Grpc::MockAsyncClient), factory_(new Grpc::MockAsyncClientFactory),
logger_cache_(async_client_manager_, scope_, tls_, local_info_),
grpc_access_logger_impl_test_helper_(local_info_, async_client_) {
EXPECT_CALL(async_client_manager_, factoryForGrpcService(_, _, false))
EXPECT_CALL(async_client_manager_, factoryForGrpcService(_, _, true))
.WillOnce(Invoke([this](const envoy::config::core::v3::GrpcService&, Stats::Scope&, bool) {
EXPECT_CALL(*factory_, createUncachedRawAsyncClient()).WillOnce(Invoke([this] {
return Grpc::RawAsyncClientPtr{async_client_};
Expand Down
36 changes: 25 additions & 11 deletions test/extensions/access_loggers/grpc/http_config_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,33 @@ class HttpGrpcAccessLogConfigTest : public testing::Test {

message_ = factory_->createEmptyConfigProto();
ASSERT_NE(nullptr, message_);
}

EXPECT_CALL(context_.cluster_manager_.async_client_manager_, factoryForGrpcService(_, _, _))
.WillOnce(Invoke([](const envoy::config::core::v3::GrpcService&, Stats::Scope&, bool) {
return std::make_unique<NiceMock<Grpc::MockAsyncClientFactory>>();
}));
void run(const std::string cluster_name) {
const auto fake_static = "fake_static";
EXPECT_CALL(context_.cluster_manager_, checkActiveStaticCluster(cluster_name))
.WillOnce(Return(fake_static == cluster_name));

auto* common_config = http_grpc_access_log_.mutable_common_config();
common_config->set_log_name("foo");
common_config->mutable_grpc_service()->mutable_envoy_grpc()->set_cluster_name("bar");
common_config->mutable_grpc_service()->mutable_envoy_grpc()->set_cluster_name(cluster_name);
common_config->set_transport_api_version(envoy::config::core::v3::ApiVersion::V3);
TestUtility::jsonConvert(http_grpc_access_log_, *message_);

if (cluster_name == fake_static) {
EXPECT_CALL(context_.cluster_manager_.async_client_manager_, factoryForGrpcService(_, _, _))
.WillOnce(Invoke([](const envoy::config::core::v3::GrpcService&, Stats::Scope&, bool) {
return std::make_unique<NiceMock<Grpc::MockAsyncClientFactory>>();
}));
AccessLog::InstanceSharedPtr instance =
factory_->createAccessLogInstance(*message_, std::move(filter_), context_);
EXPECT_NE(nullptr, instance);
EXPECT_NE(nullptr, dynamic_cast<HttpGrpcAccessLog*>(instance.get()));
} else {
EXPECT_THROW_WITH_MESSAGE(
factory_->createAccessLogInstance(*message_, std::move(filter_), context_),
EnvoyException, fmt::format("Cluster '{}' is unknown or not static", cluster_name));
}
}

AccessLog::FilterPtr filter_;
Expand All @@ -51,12 +67,10 @@ class HttpGrpcAccessLogConfigTest : public testing::Test {
};

// Normal OK configuration.
TEST_F(HttpGrpcAccessLogConfigTest, Ok) {
AccessLog::InstanceSharedPtr instance =
factory_->createAccessLogInstance(*message_, std::move(filter_), context_);
EXPECT_NE(nullptr, instance);
EXPECT_NE(nullptr, dynamic_cast<HttpGrpcAccessLog*>(instance.get()));
}
TEST_F(HttpGrpcAccessLogConfigTest, Ok) { run("fake_static"); }

// Wrong configuration with invalid clusters.
TEST_F(HttpGrpcAccessLogConfigTest, InvalidCluster) { run("invalid"); }

} // namespace
} // namespace HttpGrpc
Expand Down
79 changes: 79 additions & 0 deletions test/extensions/access_loggers/grpc/tcp_config_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
#include "envoy/config/core/v3/grpc_service.pb.h"
#include "envoy/extensions/access_loggers/grpc/v3/als.pb.h"
#include "envoy/registry/registry.h"
#include "envoy/server/access_log_config.h"
#include "envoy/stats/scope.h"

#include "source/extensions/access_loggers/grpc/tcp_grpc_access_log_impl.h"

#include "test/mocks/server/factory_context.h"

#include "gmock/gmock.h"
#include "gtest/gtest.h"

using testing::_;
using testing::Invoke;

namespace Envoy {
namespace Extensions {
namespace AccessLoggers {
namespace TcpGrpc {
namespace {

class TcpGrpcAccessLogConfigTest : public testing::Test {
public:
void SetUp() override {
factory_ =
Registry::FactoryRegistry<Server::Configuration::AccessLogInstanceFactory>::getFactory(
"envoy.access_loggers.tcp_grpc");
ASSERT_NE(nullptr, factory_);

message_ = factory_->createEmptyConfigProto();
ASSERT_NE(nullptr, message_);
}

void run(const std::string cluster_name) {
const auto fake_static = "fake_static";
EXPECT_CALL(context_.cluster_manager_, checkActiveStaticCluster(cluster_name))
.WillOnce(Return(fake_static == cluster_name));

auto* common_config = tcp_grpc_access_log_.mutable_common_config();
common_config->set_log_name("foo");
common_config->mutable_grpc_service()->mutable_envoy_grpc()->set_cluster_name(cluster_name);
common_config->set_transport_api_version(envoy::config::core::v3::ApiVersion::V3);
TestUtility::jsonConvert(tcp_grpc_access_log_, *message_);

if (cluster_name == fake_static) {
EXPECT_CALL(context_.cluster_manager_.async_client_manager_, factoryForGrpcService(_, _, _))
.WillOnce(Invoke([](const envoy::config::core::v3::GrpcService&, Stats::Scope&, bool) {
return std::make_unique<NiceMock<Grpc::MockAsyncClientFactory>>();
}));
AccessLog::InstanceSharedPtr instance =
factory_->createAccessLogInstance(*message_, std::move(filter_), context_);
EXPECT_NE(nullptr, instance);
EXPECT_NE(nullptr, dynamic_cast<TcpGrpcAccessLog*>(instance.get()));
} else {
EXPECT_THROW_WITH_MESSAGE(
factory_->createAccessLogInstance(*message_, std::move(filter_), context_),
EnvoyException, fmt::format("Cluster '{}' is unknown or not static", cluster_name));
}
}

AccessLog::FilterPtr filter_;
NiceMock<Server::Configuration::MockServerFactoryContext> context_;
envoy::extensions::access_loggers::grpc::v3::TcpGrpcAccessLogConfig tcp_grpc_access_log_;
ProtobufTypes::MessagePtr message_;
Server::Configuration::AccessLogInstanceFactory* factory_{};
};

// Normal OK configuration.
TEST_F(TcpGrpcAccessLogConfigTest, Ok) { run("fake_static"); }

// Wrong configuration with invalid clusters.
TEST_F(TcpGrpcAccessLogConfigTest, InvalidCluster) { run("invalid"); }

} // namespace
} // namespace TcpGrpc
} // namespace AccessLoggers
} // namespace Extensions
} // namespace Envoy
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ class GrpcAccessLoggerCacheImplTest : public testing::Test {
: async_client_(new Grpc::MockAsyncClient), factory_(new Grpc::MockAsyncClientFactory),
logger_cache_(async_client_manager_, scope_, tls_, local_info_),
grpc_access_logger_impl_test_helper_(local_info_, async_client_) {
EXPECT_CALL(async_client_manager_, factoryForGrpcService(_, _, false))
EXPECT_CALL(async_client_manager_, factoryForGrpcService(_, _, true))
.WillOnce(Invoke([this](const envoy::config::core::v3::GrpcService&, Stats::Scope&, bool) {
EXPECT_CALL(*factory_, createUncachedRawAsyncClient()).WillOnce(Invoke([this] {
return Grpc::RawAsyncClientPtr{async_client_};
Expand Down
1 change: 1 addition & 0 deletions test/mocks/upstream/cluster_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ class MockClusterManager : public ClusterManager {
}
MOCK_METHOD(void, drainConnections, (const std::string& cluster));
MOCK_METHOD(void, drainConnections, ());
MOCK_METHOD(bool, checkActiveStaticCluster, (const std::string& cluster));

NiceMock<MockThreadLocalCluster> thread_local_cluster_;
envoy::config::core::v3::BindConfig bind_config_;
Expand Down
2 changes: 1 addition & 1 deletion test/server/server_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1055,7 +1055,7 @@ TEST_P(ServerInstanceImplTest, BootstrapRtdsThroughAdsViaEdsFails) {
options_.service_cluster_name_ = "some_service";
options_.service_node_name_ = "some_node_name";
EXPECT_THROW_WITH_REGEX(initialize("test/server/test_data/server/runtime_bootstrap_ads_eds.yaml"),
EnvoyException, "Unknown gRPC client cluster");
EnvoyException, "Cluster 'ads_cluster' is unknown or not static");
}

// Validate invalid runtime in bootstrap is rejected.
Expand Down