diff --git a/api/envoy/config/accesslog/v2/als.proto b/api/envoy/config/accesslog/v2/als.proto index c71fe70a8c855..a7291e4e9780e 100644 --- a/api/envoy/config/accesslog/v2/als.proto +++ b/api/envoy/config/accesslog/v2/als.proto @@ -9,6 +9,9 @@ option go_package = "v2"; import "envoy/api/v2/core/grpc_service.proto"; +import "google/protobuf/duration.proto"; +import "google/protobuf/wrappers.proto"; + import "validate/validate.proto"; // [#protodoc-title: gRPC Access Log Service (ALS)] @@ -49,4 +52,14 @@ message CommonGrpcAccessLogConfig { // The gRPC service for the access log service. envoy.api.v2.core.GrpcService grpc_service = 2 [(validate.rules).message.required = true]; + + // Interval for flushing access logs to the gRPC stream. Logger will flush requests every time + // this interval is elapsed, or when batch size limit is hit, whichever comes first. Defaults to + // 1 second. + google.protobuf.Duration buffer_flush_interval = 3 [(validate.rules).duration.gt = {}]; + + // Soft size limit in bytes for access log entries buffer. Logger will buffer requests until + // this limit it hit, or every time flush interval is elapsed, whichever comes first. Setting it + // to zero effectively disables the batching. Defaults to 16384. + google.protobuf.UInt32Value buffer_size_bytes = 4; } diff --git a/docs/root/intro/version_history.rst b/docs/root/intro/version_history.rst index 1ae489d86fefe..d13c8caecd40c 100644 --- a/docs/root/intro/version_history.rst +++ b/docs/root/intro/version_history.rst @@ -3,6 +3,7 @@ Version history 1.12.0 (pending) ================ +* access log: added :ref:`buffering ` and :ref:`periodical flushing ` support to gRPC access logger. Defaults to 16KB buffer and flushing every 1 second. * admin: added ability to configure listener :ref:`socket options `. * admin: added config dump support for Secret Discovery Service :ref:`SecretConfigDump `. * config: added access log :ref:`extension filter`. diff --git a/source/extensions/access_loggers/http_grpc/grpc_access_log_impl.cc b/source/extensions/access_loggers/http_grpc/grpc_access_log_impl.cc index 928d3699c6378..17a443881c5cd 100644 --- a/source/extensions/access_loggers/http_grpc/grpc_access_log_impl.cc +++ b/source/extensions/access_loggers/http_grpc/grpc_access_log_impl.cc @@ -43,13 +43,33 @@ void GrpcAccessLoggerImpl::LocalStream::onRemoteClose(Grpc::Status::GrpcStatus, } GrpcAccessLoggerImpl::GrpcAccessLoggerImpl(Grpc::RawAsyncClientPtr&& client, std::string log_name, + std::chrono::milliseconds buffer_flush_interval_msec, + uint64_t buffer_size_bytes, + Event::Dispatcher& dispatcher, const LocalInfo::LocalInfo& local_info) - : client_(std::move(client)), log_name_(std::move(log_name)), local_info_(local_info) {} + : client_(std::move(client)), log_name_(log_name), + buffer_flush_interval_msec_(buffer_flush_interval_msec), + flush_timer_(dispatcher.createTimer([this]() { + flush(); + flush_timer_->enableTimer(buffer_flush_interval_msec_); + })), + buffer_size_bytes_(buffer_size_bytes), local_info_(local_info) { + flush_timer_->enableTimer(buffer_flush_interval_msec_); +} void GrpcAccessLoggerImpl::log(envoy::data::accesslog::v2::HTTPAccessLogEntry&& entry) { - // TODO(euroelessar): Add batching and flushing. - envoy::service::accesslog::v2::StreamAccessLogsMessage message; - message.mutable_http_logs()->add_log_entry()->Swap(&entry); + approximate_message_size_bytes_ += entry.ByteSizeLong(); + message_.mutable_http_logs()->add_log_entry()->Swap(&entry); + if (approximate_message_size_bytes_ >= buffer_size_bytes_) { + flush(); + } +} + +void GrpcAccessLoggerImpl::flush() { + if (!message_.has_http_logs()) { + // Nothing to flush. + return; + } if (stream_ == absl::nullopt) { stream_.emplace(*this); @@ -61,17 +81,21 @@ void GrpcAccessLoggerImpl::log(envoy::data::accesslog::v2::HTTPAccessLogEntry&& "envoy.service.accesslog.v2.AccessLogService.StreamAccessLogs"), *stream_); - auto* identifier = message.mutable_identifier(); + auto* identifier = message_.mutable_identifier(); *identifier->mutable_node() = local_info_.node(); identifier->set_log_name(log_name_); } if (stream_->stream_ != nullptr) { - stream_->stream_->sendMessage(message, false); + stream_->stream_->sendMessage(message_, false); } else { // Clear out the stream data due to stream creation failure. stream_.reset(); } + + // Clear the message regardless of the success. + approximate_message_size_bytes_ = 0; + message_.Clear(); } GrpcAccessLoggerCacheImpl::GrpcAccessLoggerCacheImpl(Grpc::AsyncClientManager& async_client_manager, @@ -80,7 +104,8 @@ GrpcAccessLoggerCacheImpl::GrpcAccessLoggerCacheImpl(Grpc::AsyncClientManager& a const LocalInfo::LocalInfo& local_info) : async_client_manager_(async_client_manager), scope_(scope), tls_slot_(tls.allocateSlot()), local_info_(local_info) { - tls_slot_->set([](Event::Dispatcher&) { return std::make_shared(); }); + tls_slot_->set( + [](Event::Dispatcher& dispatcher) { return std::make_shared(dispatcher); }); } GrpcAccessLoggerSharedPtr GrpcAccessLoggerCacheImpl::getOrCreateLogger( @@ -94,8 +119,11 @@ GrpcAccessLoggerSharedPtr GrpcAccessLoggerCacheImpl::getOrCreateLogger( } const Grpc::AsyncClientFactoryPtr factory = async_client_manager_.factoryForGrpcService(config.grpc_service(), scope_, false); - const GrpcAccessLoggerSharedPtr logger = - std::make_shared(factory->create(), config.log_name(), local_info_); + const GrpcAccessLoggerSharedPtr logger = std::make_shared( + factory->create(), config.log_name(), + std::chrono::milliseconds(PROTOBUF_GET_MS_OR_DEFAULT(config, buffer_flush_interval, 1000)), + PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, buffer_size_bytes, 16384), cache.dispatcher_, + local_info_); cache.access_loggers_.emplace(cache_key, logger); return logger; } diff --git a/source/extensions/access_loggers/http_grpc/grpc_access_log_impl.h b/source/extensions/access_loggers/http_grpc/grpc_access_log_impl.h index c846be0ca171c..5fb8e152108c6 100644 --- a/source/extensions/access_loggers/http_grpc/grpc_access_log_impl.h +++ b/source/extensions/access_loggers/http_grpc/grpc_access_log_impl.h @@ -62,6 +62,8 @@ using GrpcAccessLoggerCacheSharedPtr = std::shared_ptr; class GrpcAccessLoggerImpl : public GrpcAccessLogger { public: GrpcAccessLoggerImpl(Grpc::RawAsyncClientPtr&& client, std::string log_name, + std::chrono::milliseconds buffer_flush_interval_msec, + uint64_t buffer_size_bytes, Event::Dispatcher& dispatcher, const LocalInfo::LocalInfo& local_info); void log(envoy::data::accesslog::v2::HTTPAccessLogEntry&& entry) override; @@ -83,10 +85,17 @@ class GrpcAccessLoggerImpl : public GrpcAccessLogger { Grpc::AsyncStream stream_{}; }; + void flush(); + Grpc::AsyncClient client_; const std::string log_name_; + const std::chrono::milliseconds buffer_flush_interval_msec_; + const Event::TimerPtr flush_timer_; + const uint64_t buffer_size_bytes_; + uint64_t approximate_message_size_bytes_ = 0; + envoy::service::accesslog::v2::StreamAccessLogsMessage message_; absl::optional stream_; const LocalInfo::LocalInfo& local_info_; }; @@ -105,6 +114,9 @@ class GrpcAccessLoggerCacheImpl : public Singleton::Instance, public GrpcAccessL * Per-thread cache. */ struct ThreadLocalCache : public ThreadLocal::ThreadLocalObject { + ThreadLocalCache(Event::Dispatcher& dispatcher) : dispatcher_(dispatcher) {} + + Event::Dispatcher& dispatcher_; // Access loggers indexed by the hash of logger's configuration. absl::flat_hash_map access_loggers_; }; diff --git a/test/extensions/access_loggers/http_grpc/grpc_access_log_impl_test.cc b/test/extensions/access_loggers/http_grpc/grpc_access_log_impl_test.cc index f78eae541e263..ecf1e048fd5d0 100644 --- a/test/extensions/access_loggers/http_grpc/grpc_access_log_impl_test.cc +++ b/test/extensions/access_loggers/http_grpc/grpc_access_log_impl_test.cc @@ -25,15 +25,20 @@ namespace AccessLoggers { namespace HttpGrpc { namespace { +constexpr std::chrono::milliseconds FlushInterval(10); + class GrpcAccessLoggerImplTest : public testing::Test { public: using MockAccessLogStream = Grpc::MockAsyncStream; using AccessLogCallbacks = Grpc::AsyncStreamCallbacks; - GrpcAccessLoggerImplTest() { + void initLogger(std::chrono::milliseconds buffer_flush_interval_msec, size_t buffer_size_bytes) { + timer_ = new Event::MockTimer(&dispatcher_); + EXPECT_CALL(*timer_, enableTimer(buffer_flush_interval_msec)); logger_ = std::make_unique(Grpc::RawAsyncClientPtr{async_client_}, - log_name_, local_info_); + log_name_, buffer_flush_interval_msec, + buffer_size_bytes, dispatcher_, local_info_); } void expectStreamStart(MockAccessLogStream& stream, AccessLogCallbacks** callbacks_to_set) { @@ -59,6 +64,8 @@ class GrpcAccessLoggerImplTest : public testing::Test { std::string log_name_ = "test_log_name"; LocalInfo::MockLocalInfo local_info_; + Event::MockTimer* timer_ = nullptr; + Event::MockDispatcher dispatcher_; Grpc::MockAsyncClient* async_client_{new Grpc::MockAsyncClient}; std::unique_ptr logger_; }; @@ -66,6 +73,7 @@ class GrpcAccessLoggerImplTest : public testing::Test { // Test basic stream logging flow. TEST_F(GrpcAccessLoggerImplTest, BasicFlow) { InSequence s; + initLogger(FlushInterval, 0); // Start a stream for the first log. MockAccessLogStream stream; @@ -126,6 +134,7 @@ TEST_F(GrpcAccessLoggerImplTest, BasicFlow) { // Test that stream failure is handled correctly. TEST_F(GrpcAccessLoggerImplTest, StreamFailure) { InSequence s; + initLogger(FlushInterval, 0); EXPECT_CALL(*async_client_, startRaw(_, _, _)) .WillOnce(Invoke( @@ -138,6 +147,95 @@ TEST_F(GrpcAccessLoggerImplTest, StreamFailure) { logger_->log(envoy::data::accesslog::v2::HTTPAccessLogEntry(entry)); } +// Test that log entries are batched. +TEST_F(GrpcAccessLoggerImplTest, Batching) { + InSequence s; + initLogger(FlushInterval, 100); + + MockAccessLogStream stream; + AccessLogCallbacks* callbacks; + expectStreamStart(stream, &callbacks); + EXPECT_CALL(local_info_, node()); + const std::string path1(30, '1'); + const std::string path2(30, '2'); + const std::string path3(80, '3'); + expectStreamMessage(stream, fmt::format(R"EOF( +identifier: + node: + id: node_name + cluster: cluster_name + locality: + zone: zone_name + log_name: test_log_name +http_logs: + log_entry: + - request: + path: "{}" + - request: + path: "{}" + - request: + path: "{}" +)EOF", + path1, path2, path3)); + envoy::data::accesslog::v2::HTTPAccessLogEntry entry; + entry.mutable_request()->set_path(path1); + logger_->log(envoy::data::accesslog::v2::HTTPAccessLogEntry(entry)); + entry.mutable_request()->set_path(path2); + logger_->log(envoy::data::accesslog::v2::HTTPAccessLogEntry(entry)); + entry.mutable_request()->set_path(path3); + logger_->log(envoy::data::accesslog::v2::HTTPAccessLogEntry(entry)); + + const std::string path4(120, '4'); + expectStreamMessage(stream, fmt::format(R"EOF( +http_logs: + log_entry: + request: + path: "{}" +)EOF", + path4)); + entry.mutable_request()->set_path(path4); + logger_->log(envoy::data::accesslog::v2::HTTPAccessLogEntry(entry)); +} + +// Test that log entries are flushed periodically. +TEST_F(GrpcAccessLoggerImplTest, Flushing) { + InSequence s; + initLogger(FlushInterval, 100); + + // Nothing to do yet. + EXPECT_CALL(*timer_, enableTimer(FlushInterval)); + timer_->invokeCallback(); + + envoy::data::accesslog::v2::HTTPAccessLogEntry entry; + // Not enough data yet to trigger flush on batch size. + entry.mutable_request()->set_path("/test/path1"); + logger_->log(envoy::data::accesslog::v2::HTTPAccessLogEntry(entry)); + + MockAccessLogStream stream; + AccessLogCallbacks* callbacks; + expectStreamStart(stream, &callbacks); + EXPECT_CALL(local_info_, node()); + expectStreamMessage(stream, fmt::format(R"EOF( + identifier: + node: + id: node_name + cluster: cluster_name + locality: + zone: zone_name + log_name: test_log_name + http_logs: + log_entry: + - request: + path: /test/path1 + )EOF")); + EXPECT_CALL(*timer_, enableTimer(FlushInterval)); + timer_->invokeCallback(); + + // Flush on empty message does nothing. + EXPECT_CALL(*timer_, enableTimer(FlushInterval)); + timer_->invokeCallback(); +} + class GrpcAccessLoggerCacheImplTest : public testing::Test { public: GrpcAccessLoggerCacheImplTest() {