Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 10 additions & 11 deletions source/extensions/access_loggers/common/grpc_access_logger.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,8 @@ template <typename GrpcAccessLogger, typename ConfigProto> class GrpcAccessLogge
* @param config supplies the configuration for the logger.
* @return GrpcAccessLoggerSharedPtr ready for logging requests.
*/
virtual typename GrpcAccessLogger::SharedPtr getOrCreateLogger(const ConfigProto& config,
GrpcAccessLoggerType logger_type,
Stats::Scope& scope) PURE;
virtual typename GrpcAccessLogger::SharedPtr
getOrCreateLogger(const ConfigProto& config, GrpcAccessLoggerType logger_type) PURE;
};

template <typename LogRequest, typename LogResponse> class GrpcAccessLogClient {
Expand Down Expand Up @@ -252,15 +251,14 @@ class GrpcAccessLoggerCache : public Singleton::Instance,

GrpcAccessLoggerCache(Grpc::AsyncClientManager& async_client_manager, Stats::Scope& scope,
ThreadLocal::SlotAllocator& tls)
: async_client_manager_(async_client_manager), scope_(scope), tls_slot_(tls.allocateSlot()) {
: scope_(scope), async_client_manager_(async_client_manager), tls_slot_(tls.allocateSlot()) {
tls_slot_->set([](Event::Dispatcher& dispatcher) {
return std::make_shared<ThreadLocalCache>(dispatcher);
});
}

typename GrpcAccessLogger::SharedPtr getOrCreateLogger(const ConfigProto& config,
GrpcAccessLoggerType logger_type,
Stats::Scope& scope) override {
typename GrpcAccessLogger::SharedPtr
getOrCreateLogger(const ConfigProto& config, GrpcAccessLoggerType logger_type) override {
// TODO(euroelessar): Consider cleaning up loggers.
auto& cache = tls_slot_->getTyped<ThreadLocalCache>();
const auto cache_key = std::make_pair(MessageUtil::hash(config), logger_type);
Expand All @@ -273,12 +271,14 @@ class GrpcAccessLoggerCache : public Singleton::Instance,
async_client_manager_.factoryForGrpcService(config.grpc_service(), scope_, false)
->createUncachedRawAsyncClient(),
std::chrono::milliseconds(PROTOBUF_GET_MS_OR_DEFAULT(config, buffer_flush_interval, 1000)),
PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, buffer_size_bytes, 16384), cache.dispatcher_,
scope);
PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, buffer_size_bytes, 16384), cache.dispatcher_);
cache.access_loggers_.emplace(cache_key, logger);
return logger;
}

protected:
Stats::Scope& scope_;

private:
/**
* Per-thread cache.
Expand All @@ -297,10 +297,9 @@ class GrpcAccessLoggerCache : public Singleton::Instance,
virtual typename GrpcAccessLogger::SharedPtr
createLogger(const ConfigProto& config, const Grpc::RawAsyncClientSharedPtr& client,
std::chrono::milliseconds buffer_flush_interval_msec, uint64_t max_buffer_size_bytes,
Event::Dispatcher& dispatcher, Stats::Scope& scope) PURE;
Event::Dispatcher& dispatcher) PURE;

Grpc::AsyncClientManager& async_client_manager_;
Stats::Scope& scope_;
ThreadLocal::SlotPtr tls_slot_;
};

Expand Down
16 changes: 14 additions & 2 deletions source/extensions/access_loggers/grpc/config_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,21 @@ GrpcCommon::GrpcAccessLoggerCacheSharedPtr
getGrpcAccessLoggerCacheSingleton(Server::Configuration::CommonFactoryContext& context) {
return context.singletonManager().getTyped<GrpcCommon::GrpcAccessLoggerCacheImpl>(
SINGLETON_MANAGER_REGISTERED_NAME(grpc_access_logger_cache), [&context] {
auto* filter_factory_context =
Comment thread
lambdai marked this conversation as resolved.
Outdated
dynamic_cast<Server::Configuration::FactoryContext*>(&context);
// Note that the factory context can be server factory context. The life of the scope in
// server factory context is good.
FANCY_LOG(trace, "in {} access log cache is created from {}", __FUNCTION__,
filter_factory_context != nullptr ? "unsafe filter factory context"
: "safe server/listener factory context");
FANCY_LOG(trace, "maybe unsafe scope addr = {} ", static_cast<void*>(&context.scope()));

auto& scope = filter_factory_context == nullptr
? context.scope()
: filter_factory_context->getServerFactoryContext().scope();
return std::make_shared<GrpcCommon::GrpcAccessLoggerCacheImpl>(
context.clusterManager().grpcAsyncClientManager(), context.scope(),
context.threadLocal(), context.localInfo());
context.clusterManager().grpcAsyncClientManager(), scope, context.threadLocal(),
context.localInfo());
});
}
} // namespace GrpcCommon
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,10 @@ GrpcAccessLoggerImpl::SharedPtr GrpcAccessLoggerCacheImpl::createLogger(
const envoy::extensions::access_loggers::grpc::v3::CommonGrpcAccessLogConfig& config,
const Grpc::RawAsyncClientSharedPtr& client,
std::chrono::milliseconds buffer_flush_interval_msec, uint64_t max_buffer_size_bytes,
Event::Dispatcher& dispatcher, Stats::Scope& scope) {
Event::Dispatcher& dispatcher) {
return std::make_shared<GrpcAccessLoggerImpl>(client, config.log_name(),
buffer_flush_interval_msec, max_buffer_size_bytes,
dispatcher, local_info_, scope);
dispatcher, local_info_, scope_);
}

} // namespace GrpcCommon
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class GrpcAccessLoggerCacheImpl
createLogger(const envoy::extensions::access_loggers::grpc::v3::CommonGrpcAccessLogConfig& config,
const Grpc::RawAsyncClientSharedPtr& client,
std::chrono::milliseconds buffer_flush_interval_msec, uint64_t max_buffer_size_bytes,
Event::Dispatcher& dispatcher, Stats::Scope& scope) override;
Event::Dispatcher& dispatcher) override;

const LocalInfo::LocalInfo& local_info_;
};
Expand Down
6 changes: 3 additions & 3 deletions source/extensions/access_loggers/grpc/http_config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ AccessLog::InstanceSharedPtr HttpGrpcAccessLogFactory::createAccessLogInstance(
const envoy::extensions::access_loggers::grpc::v3::HttpGrpcAccessLogConfig&>(
config, context.messageValidationVisitor());

return std::make_shared<HttpGrpcAccessLog>(std::move(filter), proto_config, context.threadLocal(),
GrpcCommon::getGrpcAccessLoggerCacheSingleton(context),
context.scope());
return std::make_shared<HttpGrpcAccessLog>(
std::move(filter), proto_config, context.threadLocal(),
GrpcCommon::getGrpcAccessLoggerCacheSingleton(context));
}

ProtobufTypes::MessagePtr HttpGrpcAccessLogFactory::createEmptyConfigProto() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,8 @@ HttpGrpcAccessLog::ThreadLocalLogger::ThreadLocalLogger(
HttpGrpcAccessLog::HttpGrpcAccessLog(
AccessLog::FilterPtr&& filter,
envoy::extensions::access_loggers::grpc::v3::HttpGrpcAccessLogConfig config,
ThreadLocal::SlotAllocator& tls, GrpcCommon::GrpcAccessLoggerCacheSharedPtr access_logger_cache,
Stats::Scope& scope)
: Common::ImplBase(std::move(filter)), scope_(scope), config_(std::move(config)),
ThreadLocal::SlotAllocator& tls, GrpcCommon::GrpcAccessLoggerCacheSharedPtr access_logger_cache)
: Common::ImplBase(std::move(filter)), config_(std::move(config)),
tls_slot_(tls.allocateSlot()), access_logger_cache_(std::move(access_logger_cache)) {
for (const auto& header : config_.additional_request_headers_to_log()) {
request_headers_to_log_.emplace_back(header);
Expand All @@ -44,7 +43,7 @@ HttpGrpcAccessLog::HttpGrpcAccessLog(
Envoy::Config::Utility::checkTransportVersion(config_.common_config());
tls_slot_->set([this](Event::Dispatcher&) {
return std::make_shared<ThreadLocalLogger>(access_logger_cache_->getOrCreateLogger(
config_.common_config(), Common::GrpcAccessLoggerType::HTTP, scope_));
config_.common_config(), Common::GrpcAccessLoggerType::HTTP));
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ class HttpGrpcAccessLog : public Common::ImplBase {
HttpGrpcAccessLog(AccessLog::FilterPtr&& filter,
envoy::extensions::access_loggers::grpc::v3::HttpGrpcAccessLogConfig config,
ThreadLocal::SlotAllocator& tls,
GrpcCommon::GrpcAccessLoggerCacheSharedPtr access_logger_cache,
Stats::Scope& scope);
GrpcCommon::GrpcAccessLoggerCacheSharedPtr access_logger_cache);

private:
/**
Expand All @@ -48,7 +47,6 @@ class HttpGrpcAccessLog : public Common::ImplBase {
const Http::ResponseTrailerMap& response_trailers,
const StreamInfo::StreamInfo& stream_info) override;

Stats::Scope& scope_;
const envoy::extensions::access_loggers::grpc::v3::HttpGrpcAccessLogConfig config_;
const ThreadLocal::SlotPtr tls_slot_;
const GrpcCommon::GrpcAccessLoggerCacheSharedPtr access_logger_cache_;
Expand Down
3 changes: 1 addition & 2 deletions source/extensions/access_loggers/grpc/tcp_config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ AccessLog::InstanceSharedPtr TcpGrpcAccessLogFactory::createAccessLogInstance(
config, context.messageValidationVisitor());

return std::make_shared<TcpGrpcAccessLog>(std::move(filter), proto_config, context.threadLocal(),
GrpcCommon::getGrpcAccessLoggerCacheSingleton(context),
context.scope());
GrpcCommon::getGrpcAccessLoggerCacheSingleton(context));
}

ProtobufTypes::MessagePtr TcpGrpcAccessLogFactory::createEmptyConfigProto() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,13 @@ TcpGrpcAccessLog::ThreadLocalLogger::ThreadLocalLogger(GrpcCommon::GrpcAccessLog
TcpGrpcAccessLog::TcpGrpcAccessLog(
AccessLog::FilterPtr&& filter,
envoy::extensions::access_loggers::grpc::v3::TcpGrpcAccessLogConfig config,
ThreadLocal::SlotAllocator& tls, GrpcCommon::GrpcAccessLoggerCacheSharedPtr access_logger_cache,
Stats::Scope& scope)
: Common::ImplBase(std::move(filter)), scope_(scope), config_(std::move(config)),
ThreadLocal::SlotAllocator& tls, GrpcCommon::GrpcAccessLoggerCacheSharedPtr access_logger_cache)
: Common::ImplBase(std::move(filter)), config_(std::move(config)),
tls_slot_(tls.allocateSlot()), access_logger_cache_(std::move(access_logger_cache)) {
Config::Utility::checkTransportVersion(config_.common_config());
tls_slot_->set([this](Event::Dispatcher&) {
return std::make_shared<ThreadLocalLogger>(access_logger_cache_->getOrCreateLogger(
config_.common_config(), Common::GrpcAccessLoggerType::TCP, scope_));
config_.common_config(), Common::GrpcAccessLoggerType::TCP));
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ class TcpGrpcAccessLog : public Common::ImplBase {
TcpGrpcAccessLog(AccessLog::FilterPtr&& filter,
envoy::extensions::access_loggers::grpc::v3::TcpGrpcAccessLogConfig config,
ThreadLocal::SlotAllocator& tls,
GrpcCommon::GrpcAccessLoggerCacheSharedPtr access_logger_cache,
Stats::Scope& scope);
GrpcCommon::GrpcAccessLoggerCacheSharedPtr access_logger_cache);

private:
/**
Expand All @@ -47,7 +46,6 @@ class TcpGrpcAccessLog : public Common::ImplBase {
const Http::ResponseTrailerMap& response_trailers,
const StreamInfo::StreamInfo& stream_info) override;

Stats::Scope& scope_;
const envoy::extensions::access_loggers::grpc::v3::TcpGrpcAccessLogConfig config_;
const ThreadLocal::SlotPtr tls_slot_;
const GrpcCommon::GrpcAccessLoggerCacheSharedPtr access_logger_cache_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,14 @@ AccessLog::ThreadLocalLogger::ThreadLocalLogger(GrpcAccessLoggerSharedPtr logger
AccessLog::AccessLog(
::Envoy::AccessLog::FilterPtr&& filter,
envoy::extensions::access_loggers::open_telemetry::v3alpha::OpenTelemetryAccessLogConfig config,
ThreadLocal::SlotAllocator& tls, GrpcAccessLoggerCacheSharedPtr access_logger_cache,
Stats::Scope& scope)
: Common::ImplBase(std::move(filter)), scope_(scope), tls_slot_(tls.allocateSlot()),
ThreadLocal::SlotAllocator& tls, GrpcAccessLoggerCacheSharedPtr access_logger_cache)
: Common::ImplBase(std::move(filter)), tls_slot_(tls.allocateSlot()),
access_logger_cache_(std::move(access_logger_cache)) {

Envoy::Config::Utility::checkTransportVersion(config.common_config());
tls_slot_->set([this, config](Event::Dispatcher&) {
return std::make_shared<ThreadLocalLogger>(access_logger_cache_->getOrCreateLogger(
config.common_config(), Common::GrpcAccessLoggerType::HTTP, scope_));
config.common_config(), Common::GrpcAccessLoggerType::HTTP));
});

ProtobufWkt::Struct body_format;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ class AccessLog : public Common::ImplBase {
AccessLog(::Envoy::AccessLog::FilterPtr&& filter,
envoy::extensions::access_loggers::open_telemetry::v3alpha::OpenTelemetryAccessLogConfig
config,
ThreadLocal::SlotAllocator& tls, GrpcAccessLoggerCacheSharedPtr access_logger_cache,
Stats::Scope& scope);
ThreadLocal::SlotAllocator& tls, GrpcAccessLoggerCacheSharedPtr access_logger_cache);

private:
/**
Expand All @@ -55,7 +54,6 @@ class AccessLog : public Common::ImplBase {
const Http::ResponseTrailerMap& response_trailers,
const StreamInfo::StreamInfo& stream_info) override;

Stats::Scope& scope_;
const ThreadLocal::SlotPtr tls_slot_;
const GrpcAccessLoggerCacheSharedPtr access_logger_cache_;
std::unique_ptr<Formatter::StructFormatter> body_formatter_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ AccessLogFactory::createAccessLogInstance(const Protobuf::Message& config,
config, context.messageValidationVisitor());

return std::make_shared<AccessLog>(std::move(filter), proto_config, context.threadLocal(),
getAccessLoggerCacheSingleton(context), context.scope());
getAccessLoggerCacheSingleton(context));
}

ProtobufTypes::MessagePtr AccessLogFactory::createEmptyConfigProto() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,10 @@ GrpcAccessLoggerImpl::SharedPtr GrpcAccessLoggerCacheImpl::createLogger(
const envoy::extensions::access_loggers::grpc::v3::CommonGrpcAccessLogConfig& config,
const Grpc::RawAsyncClientSharedPtr& client,
std::chrono::milliseconds buffer_flush_interval_msec, uint64_t max_buffer_size_bytes,
Event::Dispatcher& dispatcher, Stats::Scope& scope) {
Event::Dispatcher& dispatcher) {
return std::make_shared<GrpcAccessLoggerImpl>(client, config.log_name(),
buffer_flush_interval_msec, max_buffer_size_bytes,
dispatcher, local_info_, scope);
dispatcher, local_info_, scope_);
}

} // namespace OpenTelemetry
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class GrpcAccessLoggerCacheImpl
createLogger(const envoy::extensions::access_loggers::grpc::v3::CommonGrpcAccessLogConfig& config,
const Grpc::RawAsyncClientSharedPtr& client,
std::chrono::milliseconds buffer_flush_interval_msec, uint64_t max_buffer_size_bytes,
Event::Dispatcher& dispatcher, Stats::Scope& scope) override;
Event::Dispatcher& dispatcher) override;

const LocalInfo::LocalInfo& local_info_;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,9 +322,9 @@ class MockGrpcAccessLoggerCache
createLogger(const envoy::extensions::access_loggers::grpc::v3::CommonGrpcAccessLogConfig&,
const Grpc::RawAsyncClientSharedPtr& client,
std::chrono::milliseconds buffer_flush_interval_msec, uint64_t max_buffer_size_bytes,
Event::Dispatcher& dispatcher, Stats::Scope& scope) override {
Event::Dispatcher& dispatcher) override {
return std::make_shared<MockGrpcAccessLoggerImpl>(
std::move(client), buffer_flush_interval_msec, max_buffer_size_bytes, dispatcher, scope,
std::move(client), buffer_flush_interval_msec, max_buffer_size_bytes, dispatcher, scope_,
"mock_access_log_prefix.", mockMethodDescriptor());
}
};
Expand Down Expand Up @@ -354,38 +354,31 @@ class GrpcAccessLoggerCacheTest : public testing::Test {
};

TEST_F(GrpcAccessLoggerCacheTest, Deduplication) {
Stats::IsolatedStoreImpl scope;

envoy::extensions::access_loggers::grpc::v3::CommonGrpcAccessLogConfig config;
config.set_log_name("log-1");
config.mutable_grpc_service()->mutable_envoy_grpc()->set_cluster_name("cluster-1");

expectClientCreation();
MockGrpcAccessLoggerImpl::SharedPtr logger1 =
logger_cache_.getOrCreateLogger(config, Common::GrpcAccessLoggerType::HTTP, scope);
EXPECT_EQ(logger1,
logger_cache_.getOrCreateLogger(config, Common::GrpcAccessLoggerType::HTTP, scope));
logger_cache_.getOrCreateLogger(config, Common::GrpcAccessLoggerType::HTTP);
EXPECT_EQ(logger1, logger_cache_.getOrCreateLogger(config, Common::GrpcAccessLoggerType::HTTP));

// Do not deduplicate different types of logger
expectClientCreation();
EXPECT_NE(logger1,
logger_cache_.getOrCreateLogger(config, Common::GrpcAccessLoggerType::TCP, scope));
EXPECT_NE(logger1, logger_cache_.getOrCreateLogger(config, Common::GrpcAccessLoggerType::TCP));

// Changing log name leads to another logger.
config.set_log_name("log-2");
expectClientCreation();
EXPECT_NE(logger1,
logger_cache_.getOrCreateLogger(config, Common::GrpcAccessLoggerType::HTTP, scope));
EXPECT_NE(logger1, logger_cache_.getOrCreateLogger(config, Common::GrpcAccessLoggerType::HTTP));

config.set_log_name("log-1");
EXPECT_EQ(logger1,
logger_cache_.getOrCreateLogger(config, Common::GrpcAccessLoggerType::HTTP, scope));
EXPECT_EQ(logger1, logger_cache_.getOrCreateLogger(config, Common::GrpcAccessLoggerType::HTTP));

// Changing cluster name leads to another logger.
config.mutable_grpc_service()->mutable_envoy_grpc()->set_cluster_name("cluster-2");
expectClientCreation();
EXPECT_NE(logger1,
logger_cache_.getOrCreateLogger(config, Common::GrpcAccessLoggerType::HTTP, scope));
EXPECT_NE(logger1, logger_cache_.getOrCreateLogger(config, Common::GrpcAccessLoggerType::HTTP));
}

} // namespace
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ TEST_F(GrpcAccessLoggerCacheImplTest, LoggerCreation) {
config.mutable_buffer_size_bytes()->set_value(BUFFER_SIZE_BYTES);

GrpcAccessLoggerSharedPtr logger =
logger_cache_.getOrCreateLogger(config, Common::GrpcAccessLoggerType::HTTP, scope_);
logger_cache_.getOrCreateLogger(config, Common::GrpcAccessLoggerType::HTTP);
// Note that the local info node() method is mocked, so the node is not really configurable.
grpc_access_logger_impl_test_helper_.expectStreamMessage(R"EOF(
identifier:
Expand Down
Loading