Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions envoy/upstream/cluster_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,12 @@ class ClusterManager {
* Drain all connection pool connections owned by all clusters in the cluster manager.
*/
virtual void drainConnections() PURE;

/**
* Check if the cluster is active and statically configured, and if not, throw excetion.
* @param cluster, the cluster to check.
*/
virtual void checkActiveStaticCluster(const std::string& cluster) PURE;
};

using ClusterManagerPtr = std::unique_ptr<ClusterManager>;
Expand Down
11 changes: 1 addition & 10 deletions source/common/grpc/async_client_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,7 @@ AsyncClientFactoryImpl::AsyncClientFactoryImpl(Upstream::ClusterManager& cm,
if (skip_cluster_check) {
return;
}

const std::string& cluster_name = config.envoy_grpc().cluster_name();
auto all_clusters = cm_.clusters();
const auto& it = all_clusters.active_clusters_.find(cluster_name);
if (it == all_clusters.active_clusters_.end()) {
throw EnvoyException(fmt::format("Unknown gRPC client cluster '{}'", cluster_name));
}
if (it->second.get().info()->addedViaApi()) {
throw EnvoyException(fmt::format("gRPC client cluster '{}' is not static", cluster_name));
}
cm_.checkActiveStaticCluster(config.envoy_grpc().cluster_name());
}

AsyncClientManagerImpl::AsyncClientManagerImpl(Upstream::ClusterManager& cm,
Expand Down
10 changes: 10 additions & 0 deletions source/common/upstream/cluster_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -971,6 +971,16 @@ void ClusterManagerImpl::drainConnections() {
});
}

void ClusterManagerImpl::checkActiveStaticCluster(const std::string& cluster) {
const auto& it = active_clusters_.find(cluster);
if (it == active_clusters_.end()) {
throw EnvoyException(fmt::format("Unknown gRPC client cluster '{}'", cluster));
}
if (it->second->added_via_api_) {
throw EnvoyException(fmt::format("gRPC client cluster '{}' is not static", cluster));
}
}

void ClusterManagerImpl::postThreadLocalRemoveHosts(const Cluster& cluster,
const HostVector& hosts_removed) {
tls_.runOnAllThreads([name = cluster.info()->name(),
Expand Down
2 changes: 2 additions & 0 deletions source/common/upstream/cluster_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,8 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable<Logger::Id::u

void drainConnections() override;

void checkActiveStaticCluster(const std::string& cluster) override;

protected:
virtual void postThreadLocalRemoveHosts(const Cluster& cluster, const HostVector& hosts_removed);

Expand Down
10 changes: 7 additions & 3 deletions source/extensions/access_loggers/common/grpc_access_logger.h
Original file line number Diff line number Diff line change
Expand Up @@ -268,10 +268,14 @@ class GrpcAccessLoggerCache : public Singleton::Instance,
if (it != cache.access_loggers_.end()) {
return it->second;
}
// We pass skip_cluster_check=true to factoryForGrpcService in order to avoid throwing
// exceptions in worker threads. Call sites of this getOrCreateLogger must check the cluster
// availability via ClusterManager::checkActiveStaticCluster beforehand, and throw exceptions in
// the main thread if necessary.
auto client = async_client_manager_.factoryForGrpcService(config.grpc_service(), scope_, true)
->createUncachedRawAsyncClient();
const auto logger = createLogger(
config,
async_client_manager_.factoryForGrpcService(config.grpc_service(), scope_, false)
->createUncachedRawAsyncClient(),
config, std::move(client),
std::chrono::milliseconds(PROTOBUF_GET_MS_OR_DEFAULT(config, buffer_flush_interval, 1000)),
PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, buffer_size_bytes, 16384), cache.dispatcher_,
scope);
Expand Down
4 changes: 4 additions & 0 deletions source/extensions/access_loggers/grpc/http_config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ AccessLog::InstanceSharedPtr HttpGrpcAccessLogFactory::createAccessLogInstance(
const envoy::extensions::access_loggers::grpc::v3::HttpGrpcAccessLogConfig&>(
config, context.messageValidationVisitor());

const auto service_config = proto_config.common_config().grpc_service();
if (service_config.has_envoy_grpc()) {
context.clusterManager().checkActiveStaticCluster(service_config.envoy_grpc().cluster_name());
}
return std::make_shared<HttpGrpcAccessLog>(std::move(filter), proto_config, context.threadLocal(),
GrpcCommon::getGrpcAccessLoggerCacheSingleton(context),
context.scope());
Expand Down
4 changes: 4 additions & 0 deletions source/extensions/access_loggers/grpc/tcp_config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ AccessLog::InstanceSharedPtr TcpGrpcAccessLogFactory::createAccessLogInstance(
const envoy::extensions::access_loggers::grpc::v3::TcpGrpcAccessLogConfig&>(
config, context.messageValidationVisitor());

const auto service_config = proto_config.common_config().grpc_service();
if (service_config.has_envoy_grpc()) {
context.clusterManager().checkActiveStaticCluster(service_config.envoy_grpc().cluster_name());
}
return std::make_shared<TcpGrpcAccessLog>(std::move(filter), proto_config, context.threadLocal(),
GrpcCommon::getGrpcAccessLoggerCacheSingleton(context),
context.scope());
Expand Down
38 changes: 8 additions & 30 deletions test/common/grpc/async_client_manager_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,7 @@ class AsyncClientManagerImplTest : public testing::Test {
TEST_F(AsyncClientManagerImplTest, EnvoyGrpcOk) {
envoy::config::core::v3::GrpcService grpc_service;
grpc_service.mutable_envoy_grpc()->set_cluster_name("foo");

Upstream::ClusterManager::ClusterInfoMaps cluster_maps;
Upstream::MockClusterMockPrioritySet cluster;
cluster_maps.active_clusters_.emplace("foo", cluster);
EXPECT_CALL(cm_, clusters()).WillOnce(Return(cluster_maps));
EXPECT_CALL(cluster, info());
EXPECT_CALL(*cluster.info_, addedViaApi());

EXPECT_CALL(cm_, checkActiveStaticCluster("foo")).WillOnce(Return());
async_client_manager_.factoryForGrpcService(grpc_service, scope_, false);
}

Expand Down Expand Up @@ -89,30 +82,15 @@ TEST_F(AsyncClientManagerImplTest, EnableRawAsyncClientCache) {
EXPECT_NE(foo_client1.get(), bar_client.get());
}

TEST_F(AsyncClientManagerImplTest, EnvoyGrpcUnknown) {
envoy::config::core::v3::GrpcService grpc_service;
grpc_service.mutable_envoy_grpc()->set_cluster_name("foo");

EXPECT_CALL(cm_, clusters());
EXPECT_THROW_WITH_MESSAGE(
async_client_manager_.factoryForGrpcService(grpc_service, scope_, false), EnvoyException,
"Unknown gRPC client cluster 'foo'");
}

TEST_F(AsyncClientManagerImplTest, EnvoyGrpcDynamicCluster) {
TEST_F(AsyncClientManagerImplTest, EnvoyGrpcInvalid) {
envoy::config::core::v3::GrpcService grpc_service;
grpc_service.mutable_envoy_grpc()->set_cluster_name("foo");

Upstream::ClusterManager::ClusterInfoMap cluster_map;
Upstream::MockClusterMockPrioritySet cluster;
cluster_map.emplace("foo", cluster);
EXPECT_CALL(cm_, clusters())
.WillOnce(Return(Upstream::ClusterManager::ClusterInfoMaps{cluster_map, {}}));
EXPECT_CALL(cluster, info());
EXPECT_CALL(*cluster.info_, addedViaApi()).WillOnce(Return(true));
EXPECT_CALL(cm_, checkActiveStaticCluster("foo")).WillOnce(Invoke([](const std::string&) {
throw EnvoyException("fake exception");
}));
EXPECT_THROW_WITH_MESSAGE(
async_client_manager_.factoryForGrpcService(grpc_service, scope_, false), EnvoyException,
"gRPC client cluster 'foo' is not static");
"fake exception");
}

TEST_F(AsyncClientManagerImplTest, GoogleGrpc) {
Expand Down Expand Up @@ -187,11 +165,11 @@ TEST_F(AsyncClientManagerImplTest, GoogleGrpcIllegalCharsInValue) {
#endif
}

TEST_F(AsyncClientManagerImplTest, EnvoyGrpcUnknownOk) {
TEST_F(AsyncClientManagerImplTest, EnvoyGrpcUnknownSkipClusterCheck) {
envoy::config::core::v3::GrpcService grpc_service;
grpc_service.mutable_envoy_grpc()->set_cluster_name("foo");

EXPECT_CALL(cm_, clusters()).Times(0);
EXPECT_CALL(cm_, checkActiveStaticCluster(_)).Times(0);
ASSERT_NO_THROW(async_client_manager_.factoryForGrpcService(grpc_service, scope_, true));
}

Expand Down
45 changes: 45 additions & 0 deletions test/common/upstream/cluster_manager_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4960,6 +4960,51 @@ TEST_F(ClusterManagerImplTest, ConnectionPoolPerDownstreamConnection) {
Http::Protocol::Http11, &lb_context)));
}

TEST_F(ClusterManagerImplTest, CheckActiveStaticCluster) {
const std::string yaml = R"EOF(
static_resources:
clusters:
- name: good
connect_timeout: 0.250s
lb_policy: ROUND_ROBIN
type: STATIC
load_assignment:
cluster_name: good
endpoints:
- lb_endpoints:
- endpoint:
address:
socket_address:
address: 127.0.0.1
port_value: 11001
)EOF";
create(parseBootstrapFromV3Yaml(yaml));
const std::string added_via_api_yaml = R"EOF(
name: added_via_api
connect_timeout: 0.250s
type: STATIC
lb_policy: ROUND_ROBIN
load_assignment:
cluster_name: added_via_api
endpoints:
- lb_endpoints:
- endpoint:
address:
socket_address:
address: 127.0.0.1
port_value: 11001
)EOF";
EXPECT_TRUE(
cluster_manager_->addOrUpdateCluster(parseClusterFromV3Yaml(added_via_api_yaml), "v1"));

EXPECT_EQ(2, cluster_manager_->clusters().active_clusters_.size());
EXPECT_NO_THROW(cluster_manager_->checkActiveStaticCluster("good"));
EXPECT_THROW_WITH_MESSAGE(cluster_manager_->checkActiveStaticCluster("nonexist"), EnvoyException,
"Unknown gRPC client cluster 'nonexist'");
EXPECT_THROW_WITH_MESSAGE(cluster_manager_->checkActiveStaticCluster("added_via_api"),
EnvoyException, "gRPC client cluster 'added_via_api' is not static");
}

class PreconnectTest : public ClusterManagerImplTest {
public:
void initialize(float ratio) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ class GrpcAccessLoggerCacheTest : public testing::Test {
void expectClientCreation() {
factory_ = new Grpc::MockAsyncClientFactory;
async_client_ = new Grpc::MockAsyncClient;
EXPECT_CALL(async_client_manager_, factoryForGrpcService(_, _, false))
EXPECT_CALL(async_client_manager_, factoryForGrpcService(_, _, true))
.WillOnce(Invoke([this](const envoy::config::core::v3::GrpcService&, Stats::Scope&, bool) {
EXPECT_CALL(*factory_, createUncachedRawAsyncClient()).WillOnce(Invoke([this] {
return Grpc::RawAsyncClientPtr{async_client_};
Expand Down
14 changes: 13 additions & 1 deletion test/extensions/access_loggers/grpc/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,22 @@ envoy_extension_cc_test(
],
)

envoy_extension_cc_test(
name = "tcp_config_test",
srcs = ["tcp_config_test.cc"],
extension_names = ["envoy.access_loggers.tcp_grpc"],
deps = [
"//source/extensions/access_loggers/grpc:tcp_config",
"//test/mocks/server:factory_context_mocks",
"@envoy_api//envoy/config/core/v3:pkg_cc_proto",
"@envoy_api//envoy/extensions/access_loggers/grpc/v3:pkg_cc_proto",
],
)

envoy_extension_cc_test(
name = "tcp_grpc_access_log_integration_test",
srcs = ["tcp_grpc_access_log_integration_test.cc"],
extension_names = ["envoy.access_loggers.http_grpc"],
extension_names = ["envoy.access_loggers.tcp_grpc"],
deps = [
"//source/common/buffer:zero_copy_input_stream_lib",
"//source/common/grpc:codec_lib",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ class GrpcAccessLoggerCacheImplTest : public testing::Test {
: async_client_(new Grpc::MockAsyncClient), factory_(new Grpc::MockAsyncClientFactory),
logger_cache_(async_client_manager_, scope_, tls_, local_info_),
grpc_access_logger_impl_test_helper_(local_info_, async_client_) {
EXPECT_CALL(async_client_manager_, factoryForGrpcService(_, _, false))
EXPECT_CALL(async_client_manager_, factoryForGrpcService(_, _, true))
.WillOnce(Invoke([this](const envoy::config::core::v3::GrpcService&, Stats::Scope&, bool) {
EXPECT_CALL(*factory_, createUncachedRawAsyncClient()).WillOnce(Invoke([this] {
return Grpc::RawAsyncClientPtr{async_client_};
Expand Down
38 changes: 28 additions & 10 deletions test/extensions/access_loggers/grpc/http_config_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,37 @@ class HttpGrpcAccessLogConfigTest : public testing::Test {

message_ = factory_->createEmptyConfigProto();
ASSERT_NE(nullptr, message_);
}

EXPECT_CALL(context_.cluster_manager_.async_client_manager_, factoryForGrpcService(_, _, _))
.WillOnce(Invoke([](const envoy::config::core::v3::GrpcService&, Stats::Scope&, bool) {
return std::make_unique<NiceMock<Grpc::MockAsyncClientFactory>>();
void run(const std::string cluster_name) {
const auto good_cluster = "good_cluster";
EXPECT_CALL(context_.cluster_manager_, checkActiveStaticCluster(cluster_name))
.WillOnce(Invoke([good_cluster](const std::string& cluster_name) {
if (cluster_name != good_cluster) {
throw EnvoyException("fake");
}
}));

auto* common_config = http_grpc_access_log_.mutable_common_config();
common_config->set_log_name("foo");
common_config->mutable_grpc_service()->mutable_envoy_grpc()->set_cluster_name("bar");
common_config->mutable_grpc_service()->mutable_envoy_grpc()->set_cluster_name(cluster_name);
common_config->set_transport_api_version(envoy::config::core::v3::ApiVersion::V3);
TestUtility::jsonConvert(http_grpc_access_log_, *message_);

if (cluster_name == good_cluster) {
EXPECT_CALL(context_.cluster_manager_.async_client_manager_, factoryForGrpcService(_, _, _))
.WillOnce(Invoke([](const envoy::config::core::v3::GrpcService&, Stats::Scope&, bool) {
return std::make_unique<NiceMock<Grpc::MockAsyncClientFactory>>();
}));
AccessLog::InstanceSharedPtr instance =
factory_->createAccessLogInstance(*message_, std::move(filter_), context_);
EXPECT_NE(nullptr, instance);
EXPECT_NE(nullptr, dynamic_cast<HttpGrpcAccessLog*>(instance.get()));
} else {
EXPECT_THROW_WITH_MESSAGE(
factory_->createAccessLogInstance(*message_, std::move(filter_), context_),
EnvoyException, "fake");
}
}

AccessLog::FilterPtr filter_;
Expand All @@ -51,12 +71,10 @@ class HttpGrpcAccessLogConfigTest : public testing::Test {
};

// Normal OK configuration.
TEST_F(HttpGrpcAccessLogConfigTest, Ok) {
AccessLog::InstanceSharedPtr instance =
factory_->createAccessLogInstance(*message_, std::move(filter_), context_);
EXPECT_NE(nullptr, instance);
EXPECT_NE(nullptr, dynamic_cast<HttpGrpcAccessLog*>(instance.get()));
}
TEST_F(HttpGrpcAccessLogConfigTest, Ok) { run("good_cluster"); }

// Wrong configuration with invalid clusters.
TEST_F(HttpGrpcAccessLogConfigTest, InvalidCluster) { run("invalid"); }

} // namespace
} // namespace HttpGrpc
Expand Down
83 changes: 83 additions & 0 deletions test/extensions/access_loggers/grpc/tcp_config_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
#include "envoy/config/core/v3/grpc_service.pb.h"
#include "envoy/extensions/access_loggers/grpc/v3/als.pb.h"
#include "envoy/registry/registry.h"
#include "envoy/server/access_log_config.h"
#include "envoy/stats/scope.h"

#include "source/extensions/access_loggers/grpc/tcp_grpc_access_log_impl.h"

#include "test/mocks/server/factory_context.h"

#include "gmock/gmock.h"
#include "gtest/gtest.h"

using testing::_;
using testing::Invoke;

namespace Envoy {
namespace Extensions {
namespace AccessLoggers {
namespace TcpGrpc {
namespace {

class TcpGrpcAccessLogConfigTest : public testing::Test {
public:
void SetUp() override {
factory_ =
Registry::FactoryRegistry<Server::Configuration::AccessLogInstanceFactory>::getFactory(
"envoy.access_loggers.tcp_grpc");
ASSERT_NE(nullptr, factory_);

message_ = factory_->createEmptyConfigProto();
ASSERT_NE(nullptr, message_);
}

void run(const std::string cluster_name) {
const auto good_cluster = "good_cluster";
EXPECT_CALL(context_.cluster_manager_, checkActiveStaticCluster(cluster_name))
.WillOnce(Invoke([good_cluster](const std::string& cluster_name) {
if (cluster_name != good_cluster) {
throw EnvoyException("fake");
}
}));

auto* common_config = tcp_grpc_access_log_.mutable_common_config();
common_config->set_log_name("foo");
common_config->mutable_grpc_service()->mutable_envoy_grpc()->set_cluster_name(cluster_name);
common_config->set_transport_api_version(envoy::config::core::v3::ApiVersion::V3);
TestUtility::jsonConvert(tcp_grpc_access_log_, *message_);

if (cluster_name == good_cluster) {
EXPECT_CALL(context_.cluster_manager_.async_client_manager_, factoryForGrpcService(_, _, _))
.WillOnce(Invoke([](const envoy::config::core::v3::GrpcService&, Stats::Scope&, bool) {
return std::make_unique<NiceMock<Grpc::MockAsyncClientFactory>>();
}));
AccessLog::InstanceSharedPtr instance =
factory_->createAccessLogInstance(*message_, std::move(filter_), context_);
EXPECT_NE(nullptr, instance);
EXPECT_NE(nullptr, dynamic_cast<TcpGrpcAccessLog*>(instance.get()));
} else {
EXPECT_THROW_WITH_MESSAGE(
factory_->createAccessLogInstance(*message_, std::move(filter_), context_),
EnvoyException, "fake");
}
}

AccessLog::FilterPtr filter_;
NiceMock<Server::Configuration::MockServerFactoryContext> context_;
envoy::extensions::access_loggers::grpc::v3::TcpGrpcAccessLogConfig tcp_grpc_access_log_;
ProtobufTypes::MessagePtr message_;
Server::Configuration::AccessLogInstanceFactory* factory_{};
};

// Normal OK configuration.
TEST_F(TcpGrpcAccessLogConfigTest, Ok) { run("good_cluster"); }

// Wrong configuration with invalid clusters.
TEST_F(TcpGrpcAccessLogConfigTest, InvalidCluster) { run("invalid"); }

} // namespace
} // namespace TcpGrpc
} // namespace AccessLoggers
} // namespace Extensions
} // namespace Envoy
Loading