diff --git a/api/envoy/extensions/access_loggers/grpc/v3/BUILD b/api/envoy/extensions/access_loggers/grpc/v3/BUILD index 1c1a6f6b44235..25c7a9c3aed13 100644 --- a/api/envoy/extensions/access_loggers/grpc/v3/BUILD +++ b/api/envoy/extensions/access_loggers/grpc/v3/BUILD @@ -6,6 +6,7 @@ licenses(["notice"]) # Apache 2 api_proto_package( deps = [ + "//envoy/config/accesslog/v3:pkg", "//envoy/config/core/v3:pkg", "@com_github_cncf_udpa//udpa/annotations:pkg", ], diff --git a/api/envoy/extensions/access_loggers/grpc/v3/als.proto b/api/envoy/extensions/access_loggers/grpc/v3/als.proto index a671873580f31..d2c6fe1ba3eda 100644 --- a/api/envoy/extensions/access_loggers/grpc/v3/als.proto +++ b/api/envoy/extensions/access_loggers/grpc/v3/als.proto @@ -2,6 +2,7 @@ syntax = "proto3"; package envoy.extensions.access_loggers.grpc.v3; +import "envoy/config/accesslog/v3/accesslog.proto"; import "envoy/config/core/v3/base.proto"; import "envoy/config/core/v3/config_source.proto"; import "envoy/config/core/v3/grpc_service.proto"; @@ -55,7 +56,7 @@ message TcpGrpcAccessLogConfig { } // Common configuration for gRPC access logs. -// [#next-free-field: 8] +// [#next-free-field: 11] message CommonGrpcAccessLogConfig { option (udpa.annotations.versioning).previous_message_type = "envoy.config.accesslog.v2.CommonGrpcAccessLogConfig"; @@ -96,4 +97,20 @@ message CommonGrpcAccessLogConfig { // will be used in this configuration. This feature is used only when you are using // :ref:`Envoy gRPC client `. config.core.v3.RetryPolicy grpc_stream_retry_policy = 7; + + // Define the log condition for critical access logs. + // Logs that match the filter are not sent to `StreamAccessLogs`, + // but are sent to `CriticalAccessLogs`. + config.accesslog.v3.AccessLogFilter critical_buffer_log_filter = 8; + + // The time to wait for an ACK message. If no ACK message is returned after this time, + // the message is considered undeliverable and the failed transmission is buffered again. + // The re-buffered message will be sent again at the next time a log matching *critical_buffer_log_filter* is queued. + google.protobuf.Duration message_ack_timeout = 9 + [(validate.rules).duration = {gte {nanos: 1000000}}]; + + // Size limit (in bytes) of the buffer used to store messages during processing in a + // critical logger. A critical logger buffers messages until it receives an ACK from upstream. + // The default is 16384. + google.protobuf.UInt32Value max_pending_buffer_size_bytes = 10; } diff --git a/api/envoy/service/accesslog/v3/als.proto b/api/envoy/service/accesslog/v3/als.proto index 94a290ad4a325..4f594130c2d3b 100644 --- a/api/envoy/service/accesslog/v3/als.proto +++ b/api/envoy/service/accesslog/v3/als.proto @@ -21,12 +21,40 @@ option (udpa.annotations.file_status).package_version_status = ACTIVE; service AccessLogService { // Envoy will connect and send StreamAccessLogsMessage messages forever. It does not expect any // response to be sent as nothing would be done in the case of failure. The server should - // disconnect if it expects Envoy to reconnect. In the future we may decide to add a different - // API for "critical" access logs in which Envoy will buffer access logs for some period of time - // until it gets an ACK so it could then retry. This API is designed for high throughput with the + // disconnect if it expects Envoy to reconnect. This API is designed for high throughput with the // expectation that it might be lossy. rpc StreamAccessLogs(stream StreamAccessLogsMessage) returns (StreamAccessLogsResponse) { } + + // This endpoint provides acknowledgment of logs marked as requiring acknowledgment. + // The requirement for an acknowledgment can be set in + // :ref:`critical_buffer_log_filter `. + // Log messages that match this filter will be guaranteed delivery. In order to guarantee + // the arrival, this endpoint performs the following process. + // + // 1. A response message is returned for each log. The response message includes ACK/NACK status, + // and in case of NACK, the target log is not flushed but buffered by Envoy. + // 2. Timeout for response message is set and if no message is returned within a certain time, + // it will be considered as unreachable and buffered by Envoy without flushing the target log. This timeout is set by + // :ref:`message_ack_timeout `. + // + // On the ALS receiver side, ACK is expected to be returned to indicate that the log was saved properly, + // and NACK is expected to be returned when the log could not be saved due to some error. + // + // .. attention:: + // + // Buffers for guaranteed reachability can be extremely memory-intensive. Therefore, the following points + // should be considered when using this endpoint. + // + // 1. :ref:`critical_buffer_log_filter ` + // should be set strictly. A loose filter may encourage rapid buffer overwhelm and leading to OOM. + // 2. :ref:`max_pending_buffer_size_bytes ` + // should be set appropriately to prevent OOM. + // 3. Make sure that ALS receiver is implemented properly. If it is not implemented, all messages will + // be buffered, which may cause OOM soon. + rpc CriticalAccessLogs(stream CriticalAccessLogsMessage) + returns (stream CriticalAccessLogsResponse) { + } } // Empty response for the StreamAccessLogs API. Will never be sent. See below. @@ -35,6 +63,23 @@ message StreamAccessLogsResponse { "envoy.service.accesslog.v2.StreamAccessLogsResponse"; } +// Response received to identify undelivered or delivered messages in CriticalAccessLogs. +message CriticalAccessLogsResponse { + enum Status { + // Indicates that the message has been received. + ACK = 0; + + // Indicates that the message has not been received. + NACK = 1; + } + + // This field is used to indicate the arrival status. + Status status = 1; + + // Message ID that identifies a message. + uint64 id = 2; +} + // Stream message for the StreamAccessLogs API. Envoy will open a stream to the server and stream // access logs without ever expecting a response. message StreamAccessLogsMessage { @@ -85,3 +130,16 @@ message StreamAccessLogsMessage { TCPAccessLogEntries tcp_logs = 3; } } + +// Stream message for the CriticalAccessLogs API. +// Envoy opens a stream to the server and streams the access log, +// expecting a response. Each message sent is assigned an individual ID, +// and the state of the message is tracked based on the ID. +message CriticalAccessLogsMessage { + // The body of the log message sent to CriticalAccessLogs. + StreamAccessLogsMessage message = 1; + + // This is an ID to identify the message, and should be added to the Critical Endpoint + // response message to uniquely identify the message being ACK/NACKed. + uint64 id = 4; +} diff --git a/docs/root/version_history/current.rst b/docs/root/version_history/current.rst index 0d9dcd8fa1d65..bc38607690c46 100644 --- a/docs/root/version_history/current.rst +++ b/docs/root/version_history/current.rst @@ -49,6 +49,7 @@ Removed Config or Runtime New Features ------------ +* access log: added :ref:`critical log message ` to AccessLogService to guarantee log arrival. * access log: added :ref:`grpc_stream_retry_policy ` to the gRPC logger to reconnect when a connection fails to be established. * api: added support for *xds.type.v3.TypedStruct* in addition to the now-deprecated *udpa.type.v1.TypedStruct* proto message, which is a wrapper proto used to encode typed JSON data in a *google.protobuf.Any* field. * bootstrap: added :ref:`typed_dns_resolver_config ` in the bootstrap to support DNS resolver as an extension. diff --git a/source/extensions/access_loggers/common/BUILD b/source/extensions/access_loggers/common/BUILD index d7968b6e0560c..bf4c5d10669bf 100644 --- a/source/extensions/access_loggers/common/BUILD +++ b/source/extensions/access_loggers/common/BUILD @@ -59,5 +59,6 @@ envoy_cc_library( "//source/common/protobuf:utility_lib", "@com_google_absl//absl/types:optional", "@envoy_api//envoy/config/core/v3:pkg_cc_proto", + "@envoy_api//envoy/service/accesslog/v3:pkg_cc_proto", ], ) diff --git a/source/extensions/access_loggers/common/grpc_access_logger.h b/source/extensions/access_loggers/common/grpc_access_logger.h index 446dd2698f67a..844cfc74cf854 100644 --- a/source/extensions/access_loggers/common/grpc_access_logger.h +++ b/source/extensions/access_loggers/common/grpc_access_logger.h @@ -5,10 +5,12 @@ #include "envoy/config/core/v3/config_source.pb.h" #include "envoy/event/dispatcher.h" #include "envoy/grpc/async_client_manager.h" +#include "envoy/service/accesslog/v3/als.pb.h" #include "envoy/singleton/instance.h" #include "envoy/stats/scope.h" #include "envoy/thread_local/thread_local.h" +#include "source/common/access_log/access_log_impl.h" #include "source/common/common/assert.h" #include "source/common/grpc/typed_async_client.h" #include "source/common/http/utility.h" @@ -42,17 +44,28 @@ template class GrpcAccessLogger { virtual ~GrpcAccessLogger() = default; + /** + * Start interval log flusher. + * @param dispatcher Event dispatcher to create timer to flush log message. + * @param buffer_flush_interval_msec Buffer flush interval msec. + */ + virtual void + startIntervalFlushTimer(Event::Dispatcher& dispatcher, + const std::chrono::milliseconds buffer_flush_interval_msec) PURE; + /** * Log http access entry. * @param entry supplies the access log to send. + * @param is_critical determine whether log entry is critical or not. */ - virtual void log(HttpLogProto&& entry) PURE; + virtual void log(HttpLogProto&& entry, bool is_critical) PURE; /** * Log tcp access entry. * @param entry supplies the access log to send. + * @param is_critical determine whether log entry is critical or not. */ - virtual void log(TcpLogProto&& entry) PURE; + virtual void log(TcpLogProto&& entry, bool is_critical) PURE; }; /** @@ -172,24 +185,25 @@ class GrpcAccessLogger : public Detail::GrpcAccessLogger; - GrpcAccessLogger(const Grpc::RawAsyncClientSharedPtr& client, - std::chrono::milliseconds buffer_flush_interval_msec, - uint64_t max_buffer_size_bytes, Event::Dispatcher& dispatcher, + GrpcAccessLogger(const Grpc::RawAsyncClientSharedPtr& client, uint64_t max_buffer_size_bytes, Stats::Scope& scope, std::string access_log_prefix, const Protobuf::MethodDescriptor& service_method, const envoy::config::core::v3::RetryPolicy& retry_policy) : client_(client, service_method, retry_policy), - buffer_flush_interval_msec_(buffer_flush_interval_msec), - flush_timer_(dispatcher.createTimer([this]() { - flush(); - flush_timer_->enableTimer(buffer_flush_interval_msec_); - })), max_buffer_size_bytes_(max_buffer_size_bytes), - stats_({ALL_GRPC_ACCESS_LOGGER_STATS(POOL_COUNTER_PREFIX(scope, access_log_prefix))}) { - flush_timer_->enableTimer(buffer_flush_interval_msec_); + stats_({ALL_GRPC_ACCESS_LOGGER_STATS(POOL_COUNTER_PREFIX(scope, access_log_prefix))}) {} + + void + startIntervalFlushTimer(Event::Dispatcher& dispatcher, + const std::chrono::milliseconds buffer_flush_interval_msec) override { + flush_timer_ = dispatcher.createTimer([this, buffer_flush_interval_msec]() { + flush(); + flush_timer_->enableTimer(buffer_flush_interval_msec); + }); + flush_timer_->enableTimer(buffer_flush_interval_msec); } - void log(HttpLogProto&& entry) override { + void log(HttpLogProto&& entry, bool) override { if (!canLogMore()) { return; } @@ -200,7 +214,7 @@ class GrpcAccessLogger : public Detail::GrpcAccessLogger= max_buffer_size_bytes_) { @@ -209,16 +223,6 @@ class GrpcAccessLogger : public Detail::GrpcAccessLogger client_; - LogRequest message_; - -private: - virtual bool isEmpty() PURE; - virtual void initMessage() PURE; - virtual void addEntry(HttpLogProto&& entry) PURE; - virtual void addEntry(TcpLogProto&& entry) PURE; - virtual void clearMessage() { message_.Clear(); } - void flush() { if (isEmpty()) { // Nothing to flush. @@ -236,6 +240,17 @@ class GrpcAccessLogger : public Detail::GrpcAccessLogger client_; + LogRequest message_; + Event::TimerPtr flush_timer_; + +private: + virtual bool isEmpty() PURE; + virtual void initMessage() PURE; + virtual void addEntry(HttpLogProto&& entry) PURE; + virtual void addEntry(TcpLogProto&& entry) PURE; + virtual void clearMessage() { message_.Clear(); } + bool canLogMore() { if (max_buffer_size_bytes_ == 0 || approximate_message_size_bytes_ < max_buffer_size_bytes_) { stats_.logs_written_.inc(); @@ -250,8 +265,6 @@ class GrpcAccessLogger : public Detail::GrpcAccessLogger + #include "envoy/data/accesslog/v3/accesslog.pb.h" #include "envoy/extensions/access_loggers/grpc/v3/als.pb.h" #include "envoy/grpc/async_client_manager.h" @@ -8,8 +10,6 @@ #include "source/common/config/utility.h" #include "source/common/grpc/typed_async_client.h" -const char GRPC_LOG_STATS_PREFIX[] = "access_logs.grpc_access_log."; - namespace Envoy { namespace Extensions { namespace AccessLoggers { @@ -18,14 +18,21 @@ namespace GrpcCommon { GrpcAccessLoggerImpl::GrpcAccessLoggerImpl( const Grpc::RawAsyncClientSharedPtr& client, const envoy::extensions::access_loggers::grpc::v3::CommonGrpcAccessLogConfig& config, - std::chrono::milliseconds buffer_flush_interval_msec, uint64_t max_buffer_size_bytes, - Event::Dispatcher& dispatcher, const LocalInfo::LocalInfo& local_info, Stats::Scope& scope) - : GrpcAccessLogger(std::move(client), buffer_flush_interval_msec, max_buffer_size_bytes, - dispatcher, scope, GRPC_LOG_STATS_PREFIX, + uint64_t max_buffer_size_bytes, Event::Dispatcher& dispatcher, + const LocalInfo::LocalInfo& local_info, Stats::Scope& scope) + : GrpcAccessLogger(client, max_buffer_size_bytes, scope, GRPC_LOG_STATS_PREFIX.data(), *Protobuf::DescriptorPool::generated_pool()->FindMethodByName( "envoy.service.accesslog.v3.AccessLogService.StreamAccessLogs"), config.grpc_stream_retry_policy()), - log_name_(config.log_name()), local_info_(local_info) {} + log_name_(config.log_name()), local_info_(local_info) { + critical_log_client_ = std::make_unique( + client, + *Protobuf::DescriptorPool::generated_pool()->FindMethodByName( + "envoy.service.accesslog.v3.AccessLogService.CriticalAccessLogs"), + dispatcher, scope, local_info, log_name_, + PROTOBUF_GET_MS_OR_DEFAULT(config, message_ack_timeout, 5000), + PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, max_pending_buffer_size_bytes, 16384)); +} void GrpcAccessLoggerImpl::addEntry(envoy::data::accesslog::v3::HTTPAccessLogEntry&& entry) { message_.mutable_http_logs()->mutable_log_entry()->Add(std::move(entry)); @@ -35,6 +42,18 @@ void GrpcAccessLoggerImpl::addEntry(envoy::data::accesslog::v3::TCPAccessLogEntr message_.mutable_tcp_logs()->mutable_log_entry()->Add(std::move(entry)); } +void GrpcAccessLoggerImpl::addCriticalMessageEntry( + envoy::data::accesslog::v3::HTTPAccessLogEntry&& entry) { + critical_message_.mutable_message()->mutable_http_logs()->mutable_log_entry()->Add( + std::move(entry)); +} + +void GrpcAccessLoggerImpl::addCriticalMessageEntry( + envoy::data::accesslog::v3::TCPAccessLogEntry&& entry) { + critical_message_.mutable_message()->mutable_tcp_logs()->mutable_log_entry()->Add( + std::move(entry)); +} + bool GrpcAccessLoggerImpl::isEmpty() { return !message_.has_http_logs() && !message_.has_tcp_logs(); } @@ -45,6 +64,61 @@ void GrpcAccessLoggerImpl::initMessage() { identifier->set_log_name(log_name_); } +bool GrpcAccessLoggerImpl::isCriticalMessageEmpty() { + return !critical_message_.message().has_http_logs() && + !critical_message_.message().has_tcp_logs(); +} + +void GrpcAccessLoggerImpl::flushCriticalMessage() { + if (isCriticalMessageEmpty()) { + return; + } + + approximate_critical_message_size_bytes_ = 0; + critical_log_client_->flush(critical_message_); + clearCriticalMessage(); +} + +GrpcCriticalAccessLogClient::GrpcCriticalAccessLogClient( + const Grpc::RawAsyncClientSharedPtr& client, const Protobuf::MethodDescriptor& method, + Event::Dispatcher& dispatcher, Stats::Scope& scope, const LocalInfo::LocalInfo& local_info, + const std::string& log_name, uint64_t message_ack_timeout, + uint64_t max_pending_buffer_size_bytes) + : dispatcher_(dispatcher), message_ack_timeout_(message_ack_timeout), + stats_({CRITICAL_ACCESS_LOGGER_GRPC_CLIENT_STATS( + POOL_COUNTER_PREFIX(scope, GRPC_LOG_STATS_PREFIX.data()), + POOL_GAUGE_PREFIX(scope, GRPC_LOG_STATS_PREFIX.data()))}), + local_info_(local_info), log_name_(log_name), stream_callback_(*this) { + client_ = std::make_unique>( + max_pending_buffer_size_bytes, method, stream_callback_, client); +} + +void GrpcCriticalAccessLogClient::flush(GrpcCriticalAccessLogClient::RequestType& message) { + if (inflight_message_ttl_ == nullptr) { + inflight_message_ttl_ = std::make_unique( + dispatcher_, stats_, *client_, message_ack_timeout_); + } + + if (!client_->hasActiveStream()) { + setLogIdentifier(message); + } + + const auto message_id = client_->bufferMessage(message); + if (!message_id.has_value()) { + return; + } + + message.set_id(message_id.value()); + stats_.pending_critical_logs_.inc(); + inflight_message_ttl_->setDeadline(client_->sendBufferedMessages()); +} + +void GrpcCriticalAccessLogClient::setLogIdentifier(RequestType& message) { + auto* identifier = message.mutable_message()->mutable_identifier(); + *identifier->mutable_node() = local_info_.node(); + identifier->set_log_name(log_name_); +} + GrpcAccessLoggerCacheImpl::GrpcAccessLoggerCacheImpl(Grpc::AsyncClientManager& async_client_manager, Stats::Scope& scope, ThreadLocal::SlotAllocator& tls, @@ -56,9 +130,10 @@ GrpcAccessLoggerImpl::SharedPtr GrpcAccessLoggerCacheImpl::createLogger( const Grpc::RawAsyncClientSharedPtr& client, std::chrono::milliseconds buffer_flush_interval_msec, uint64_t max_buffer_size_bytes, Event::Dispatcher& dispatcher) { - return std::make_shared(client, config, buffer_flush_interval_msec, - max_buffer_size_bytes, dispatcher, local_info_, - scope_); + auto logger = std::make_shared(client, config, max_buffer_size_bytes, + dispatcher, local_info_, scope_); + logger->startIntervalFlushTimer(dispatcher, buffer_flush_interval_msec); + return logger; } } // namespace GrpcCommon diff --git a/source/extensions/access_loggers/grpc/grpc_access_log_impl.h b/source/extensions/access_loggers/grpc/grpc_access_log_impl.h index 34aab4cf6edc9..9f1691f9dc4ea 100644 --- a/source/extensions/access_loggers/grpc/grpc_access_log_impl.h +++ b/source/extensions/access_loggers/grpc/grpc_access_log_impl.h @@ -1,15 +1,20 @@ #pragma once +#include #include +#include "envoy/common/time.h" #include "envoy/data/accesslog/v3/accesslog.pb.h" #include "envoy/event/dispatcher.h" #include "envoy/extensions/access_loggers/grpc/v3/als.pb.h" #include "envoy/grpc/async_client_manager.h" #include "envoy/local_info/local_info.h" #include "envoy/service/accesslog/v3/als.pb.h" +#include "envoy/stats/stats_macros.h" #include "envoy/thread_local/thread_local.h" +#include "source/common/common/linked_object.h" +#include "source/common/grpc/buffered_async_client.h" #include "source/extensions/access_loggers/common/grpc_access_logger.h" namespace Envoy { @@ -17,25 +22,201 @@ namespace Extensions { namespace AccessLoggers { namespace GrpcCommon { +static constexpr absl::string_view GRPC_LOG_STATS_PREFIX = "access_logs.grpc_access_log."; + +#define CRITICAL_ACCESS_LOGGER_GRPC_CLIENT_STATS(COUNTER, GAUGE) \ + COUNTER(critical_logs_message_timeout) \ + COUNTER(critical_logs_nack_received) \ + COUNTER(critical_logs_ack_received) \ + GAUGE(pending_critical_logs, Accumulate) + +struct GrpcCriticalAccessLogClientGrpcClientStats { + CRITICAL_ACCESS_LOGGER_GRPC_CLIENT_STATS(GENERATE_COUNTER_STRUCT, GENERATE_GAUGE_STRUCT) +}; + +class GrpcCriticalAccessLogClient { +public: + using RequestType = envoy::service::accesslog::v3::CriticalAccessLogsMessage; + using ResponseType = envoy::service::accesslog::v3::CriticalAccessLogsResponse; + + struct CriticalLogStream : public Grpc::AsyncStreamCallbacks { + explicit CriticalLogStream(GrpcCriticalAccessLogClient& parent) : parent_(parent) {} + + // Grpc::AsyncStreamCallbacks + void onCreateInitialMetadata(Http::RequestHeaderMap&) override {} + void onReceiveInitialMetadata(Http::ResponseHeaderMapPtr&&) override {} + void onReceiveMessage(std::unique_ptr&& message) override { + const auto& id = message->id(); + + switch (message->status()) { + case envoy::service::accesslog::v3::CriticalAccessLogsResponse::ACK: + parent_.stats_.critical_logs_ack_received_.inc(); + parent_.stats_.pending_critical_logs_.dec(); + parent_.client_->onSuccess(id); + break; + case envoy::service::accesslog::v3::CriticalAccessLogsResponse::NACK: + parent_.stats_.critical_logs_nack_received_.inc(); + parent_.client_->onError(id); + break; + default: + return; + } + } + void onReceiveTrailingMetadata(Http::ResponseTrailerMapPtr&&) override {} + void onRemoteClose(Grpc::Status::GrpcStatus, const std::string&) override { + // TODO(shikugawa): should it be cleanuped? + } + + GrpcCriticalAccessLogClient& parent_; + }; + + class InflightMessageTtlManager { + public: + InflightMessageTtlManager(Event::Dispatcher& dispatcher, + GrpcCriticalAccessLogClientGrpcClientStats& stats, + Grpc::BufferedAsyncClient& client, + std::chrono::milliseconds message_ack_timeout) + : dispatcher_(dispatcher), message_ack_timeout_(message_ack_timeout), stats_(stats), + client_(client), timer_(dispatcher_.createTimer([this] { callback(); })) {} + + ~InflightMessageTtlManager() { timer_->disableTimer(); } + + void setDeadline(absl::flat_hash_set&& ids) { + const auto expires_at = dispatcher_.timeSource().monotonicTime() + message_ack_timeout_; + deadline_.emplace(expires_at, std::move(ids)); + + if (!timer_->enabled()) { + timer_->enableTimer(message_ack_timeout_); + } + } + + private: + void callback() { + const auto now = dispatcher_.timeSource().monotonicTime(); + + auto begin_it = deadline_.lower_bound(now); + const auto& message_buffer = client_.messageBuffer(); + + for (auto it = begin_it; it != deadline_.end(); ++it) { + for (auto&& id : it->second) { + const auto& message_it = message_buffer.find(id); + + if (message_it == message_buffer.end()) { + continue; + } + + // If the retrieved message is a PendingFlush, it means that the message + // has timed out. A timeout is treated as an error, and the callback will + // re-buffer the message. + if (message_it->second.first == Grpc::BufferState::PendingFlush) { + client_.onError(id); + stats_.critical_logs_message_timeout_.inc(); + } + } + } + + deadline_.erase(begin_it, deadline_.end()); + + if (!deadline_.empty()) { + const auto earliest_timepoint = deadline_.rbegin()->first; + timer_->enableTimer( + std::chrono::duration_cast(earliest_timepoint - now)); + } + } + + Event::Dispatcher& dispatcher_; + std::chrono::milliseconds message_ack_timeout_; + GrpcCriticalAccessLogClientGrpcClientStats& stats_; + Grpc::BufferedAsyncClient& client_; + Event::TimerPtr timer_; + std::map, std::greater<>> deadline_; + }; + + GrpcCriticalAccessLogClient(const Grpc::RawAsyncClientSharedPtr& client, + const Protobuf::MethodDescriptor& method, + Event::Dispatcher& dispatcher, Stats::Scope& scope, + const LocalInfo::LocalInfo& local_info, const std::string& log_name, + uint64_t message_ack_timeout, uint64_t max_pending_buffer_size_bytes); + + void flush(RequestType& message); + +private: + friend CriticalLogStream; + + void setLogIdentifier(RequestType& request); + + Event::Dispatcher& dispatcher_; + std::chrono::milliseconds message_ack_timeout_; + GrpcCriticalAccessLogClientGrpcClientStats stats_; + const LocalInfo::LocalInfo& local_info_; + const std::string log_name_; + CriticalLogStream stream_callback_; + Grpc::BufferedAsyncClientPtr client_; + std::unique_ptr inflight_message_ttl_; +}; + class GrpcAccessLoggerImpl : public Common::GrpcAccessLogger { public: + using TcpLogProto = envoy::data::accesslog::v3::TCPAccessLogEntry; + using HttpLogProto = envoy::data::accesslog::v3::HTTPAccessLogEntry; + using BaseLogger = + Common::GrpcAccessLogger; + GrpcAccessLoggerImpl( const Grpc::RawAsyncClientSharedPtr& client, const envoy::extensions::access_loggers::grpc::v3::CommonGrpcAccessLogConfig& config, - std::chrono::milliseconds buffer_flush_interval_msec, uint64_t max_buffer_size_bytes, - Event::Dispatcher& dispatcher, const LocalInfo::LocalInfo& local_info, Stats::Scope& scope); + uint64_t max_buffer_size_bytes, Event::Dispatcher& dispatcher, + const LocalInfo::LocalInfo& local_info, Stats::Scope& scope); + + void + startIntervalFlushTimer(Event::Dispatcher& dispatcher, + const std::chrono::milliseconds buffer_flush_interval_msec) override { + flush_timer_ = dispatcher.createTimer([this, buffer_flush_interval_msec]() { + flush(); + flushCriticalMessage(); + flush_timer_->enableTimer(buffer_flush_interval_msec); + }); + flush_timer_->enableTimer(buffer_flush_interval_msec); + } + + void log(HttpLogProto&& entry, bool is_critical) override { + if (is_critical) { + approximate_critical_message_size_bytes_ += entry.ByteSizeLong(); + addCriticalMessageEntry(std::move(entry)); + + if (approximate_critical_message_size_bytes_ >= max_critical_message_size_bytes_) { + flushCriticalMessage(); + } + return; + } + BaseLogger::log(std::move(entry), false); + } + + void log(TcpLogProto&& entry, bool) override { BaseLogger::log(std::move(entry), false); } private: + bool isCriticalMessageEmpty(); + void addCriticalMessageEntry(envoy::data::accesslog::v3::HTTPAccessLogEntry&& entry); + void addCriticalMessageEntry(envoy::data::accesslog::v3::TCPAccessLogEntry&& entry); + void flushCriticalMessage(); + void clearCriticalMessage() { critical_message_.Clear(); } + // Extensions::AccessLoggers::GrpcCommon::GrpcAccessLogger void addEntry(envoy::data::accesslog::v3::HTTPAccessLogEntry&& entry) override; void addEntry(envoy::data::accesslog::v3::TCPAccessLogEntry&& entry) override; bool isEmpty() override; void initMessage() override; + uint64_t approximate_critical_message_size_bytes_ = 0; + uint64_t max_critical_message_size_bytes_ = 0; + std::unique_ptr critical_log_client_; + envoy::service::accesslog::v3::CriticalAccessLogsMessage critical_message_; const std::string log_name_; const LocalInfo::LocalInfo& local_info_; }; diff --git a/source/extensions/access_loggers/grpc/http_config.cc b/source/extensions/access_loggers/grpc/http_config.cc index 5d3b795100670..da8f22fc87680 100644 --- a/source/extensions/access_loggers/grpc/http_config.cc +++ b/source/extensions/access_loggers/grpc/http_config.cc @@ -31,9 +31,9 @@ AccessLog::InstanceSharedPtr HttpGrpcAccessLogFactory::createAccessLogInstance( if (service_config.has_envoy_grpc()) { context.clusterManager().checkActiveStaticCluster(service_config.envoy_grpc().cluster_name()); } - return std::make_shared( - std::move(filter), proto_config, context.threadLocal(), - GrpcCommon::getGrpcAccessLoggerCacheSingleton(context)); + return std::make_shared(std::move(filter), proto_config, + GrpcCommon::getGrpcAccessLoggerCacheSingleton(context), + context); } ProtobufTypes::MessagePtr HttpGrpcAccessLogFactory::createEmptyConfigProto() { diff --git a/source/extensions/access_loggers/grpc/http_grpc_access_log_impl.cc b/source/extensions/access_loggers/grpc/http_grpc_access_log_impl.cc index e3de3291a40cf..5a7320186062a 100644 --- a/source/extensions/access_loggers/grpc/http_grpc_access_log_impl.cc +++ b/source/extensions/access_loggers/grpc/http_grpc_access_log_impl.cc @@ -1,5 +1,7 @@ #include "source/extensions/access_loggers/grpc/http_grpc_access_log_impl.h" +#include + #include "envoy/config/core/v3/base.pb.h" #include "envoy/data/accesslog/v3/accesslog.pb.h" #include "envoy/extensions/access_loggers/grpc/v3/als.pb.h" @@ -25,11 +27,12 @@ HttpGrpcAccessLog::ThreadLocalLogger::ThreadLocalLogger( HttpGrpcAccessLog::HttpGrpcAccessLog(AccessLog::FilterPtr&& filter, const HttpGrpcAccessLogConfig config, - ThreadLocal::SlotAllocator& tls, - GrpcCommon::GrpcAccessLoggerCacheSharedPtr access_logger_cache) - : Common::ImplBase(std::move(filter)), + GrpcCommon::GrpcAccessLoggerCacheSharedPtr access_logger_cache, + Server::Configuration::CommonFactoryContext& context) + : Common::ImplBase(std::move(filter)), /* scope_(context.scope()), */ config_(std::make_shared(std::move(config))), - tls_slot_(tls.allocateSlot()), access_logger_cache_(std::move(access_logger_cache)) { + tls_slot_(context.threadLocal().allocateSlot()), + access_logger_cache_(std::move(access_logger_cache)) { for (const auto& header : config_->additional_request_headers_to_log()) { request_headers_to_log_.emplace_back(header); } @@ -42,11 +45,18 @@ HttpGrpcAccessLog::HttpGrpcAccessLog(AccessLog::FilterPtr&& filter, response_trailers_to_log_.emplace_back(header); } Envoy::Config::Utility::checkTransportVersion(config_->common_config()); + tls_slot_->set( [config = config_, access_logger_cache = access_logger_cache_](Event::Dispatcher&) { return std::make_shared(access_logger_cache->getOrCreateLogger( config->common_config(), Common::GrpcAccessLoggerType::HTTP)); }); + + if (config_->has_common_config() && config_->common_config().has_critical_buffer_log_filter()) { + critical_log_filter_ = AccessLog::FilterFactory::fromProto( + config_->common_config().critical_buffer_log_filter(), context.runtime(), + context.api().randomGenerator(), context.messageValidationVisitor()); + } } void HttpGrpcAccessLog::emitLog(const Http::RequestHeaderMap& request_headers, @@ -161,7 +171,13 @@ void HttpGrpcAccessLog::emitLog(const Http::RequestHeaderMap& request_headers, } } - tls_slot_->getTyped().logger_->log(std::move(log_entry)); + bool is_critical = false; + if (critical_log_filter_) { + is_critical = critical_log_filter_->evaluate(stream_info, request_headers, response_headers, + response_trailers); + } + + tls_slot_->getTyped().logger_->log(std::move(log_entry), is_critical); } } // namespace HttpGrpc diff --git a/source/extensions/access_loggers/grpc/http_grpc_access_log_impl.h b/source/extensions/access_loggers/grpc/http_grpc_access_log_impl.h index 6cfaf97d56177..a9d31645d8903 100644 --- a/source/extensions/access_loggers/grpc/http_grpc_access_log_impl.h +++ b/source/extensions/access_loggers/grpc/http_grpc_access_log_impl.h @@ -30,8 +30,8 @@ using HttpGrpcAccessLogConfigConstSharedPtr = std::shared_ptr response_headers_to_log_; std::vector response_trailers_to_log_; std::vector filter_states_to_log_; + AccessLog::FilterPtr critical_log_filter_; }; using HttpGrpcAccessLogPtr = std::unique_ptr; diff --git a/source/extensions/access_loggers/grpc/tcp_grpc_access_log_impl.cc b/source/extensions/access_loggers/grpc/tcp_grpc_access_log_impl.cc index fb1a2a4d0bd2c..0b07fc9d2c74c 100644 --- a/source/extensions/access_loggers/grpc/tcp_grpc_access_log_impl.cc +++ b/source/extensions/access_loggers/grpc/tcp_grpc_access_log_impl.cc @@ -46,7 +46,8 @@ void TcpGrpcAccessLog::emitLog(const Http::RequestHeaderMap&, const Http::Respon connection_properties.set_sent_bytes(stream_info.bytesSent()); // request_properties->set_request_body_bytes(stream_info.bytesReceived()); - tls_slot_->getTyped().logger_->log(std::move(log_entry)); + // TODO(shikugawa): implement TCP gRPC access critical message buffering. + tls_slot_->getTyped().logger_->log(std::move(log_entry), false); } } // namespace TcpGrpc diff --git a/source/extensions/access_loggers/open_telemetry/access_log_impl.cc b/source/extensions/access_loggers/open_telemetry/access_log_impl.cc index 38b6a5d644b93..0f9a865cf6e64 100644 --- a/source/extensions/access_loggers/open_telemetry/access_log_impl.cc +++ b/source/extensions/access_loggers/open_telemetry/access_log_impl.cc @@ -73,7 +73,7 @@ void AccessLog::emitLog(const Http::RequestHeaderMap& request_headers, attributes); *log_entry.mutable_attributes() = attributes.values(); - tls_slot_->getTyped().logger_->log(std::move(log_entry)); + tls_slot_->getTyped().logger_->log(std::move(log_entry), false); } } // namespace OpenTelemetry diff --git a/source/extensions/access_loggers/open_telemetry/grpc_access_log_impl.cc b/source/extensions/access_loggers/open_telemetry/grpc_access_log_impl.cc index 0c08ce1194e11..8007cb1c28167 100644 --- a/source/extensions/access_loggers/open_telemetry/grpc_access_log_impl.cc +++ b/source/extensions/access_loggers/open_telemetry/grpc_access_log_impl.cc @@ -78,10 +78,12 @@ GrpcAccessLoggerImpl::SharedPtr GrpcAccessLoggerCacheImpl::createLogger( const envoy::extensions::access_loggers::grpc::v3::CommonGrpcAccessLogConfig& config, const Grpc::RawAsyncClientSharedPtr& client, std::chrono::milliseconds buffer_flush_interval_msec, uint64_t max_buffer_size_bytes, - Event::Dispatcher& dispatcher) { - return std::make_shared(client, config, buffer_flush_interval_msec, - max_buffer_size_bytes, dispatcher, local_info_, - scope_); + Event::Dispatcher& dispatcher, Stats::Scope& scope) { + auto logger = + std::make_shared(client, config.log_name(), buffer_flush_interval_msec, + max_buffer_size_bytes, local_info_, scope); + logger->startIntervalFlushTimer(dispatcher, buffer_flush_interval_msec); + return logger; } } // namespace OpenTelemetry diff --git a/test/common/grpc/grpc_client_integration.h b/test/common/grpc/grpc_client_integration.h index 372e0abd1d38a..069f17ca40c39 100644 --- a/test/common/grpc/grpc_client_integration.h +++ b/test/common/grpc/grpc_client_integration.h @@ -114,6 +114,7 @@ class DeltaSotwIntegrationParamTest #define GRPC_CLIENT_INTEGRATION_PARAMS \ testing::Combine(testing::ValuesIn(TestEnvironment::getIpVersionsForTest()), \ testing::ValuesIn(TestEnvironment::getsGrpcVersionsForTest())) + #define DELTA_SOTW_GRPC_CLIENT_INTEGRATION_PARAMS \ testing::Combine(testing::ValuesIn(TestEnvironment::getIpVersionsForTest()), \ testing::ValuesIn(TestEnvironment::getsGrpcVersionsForTest()), \ diff --git a/test/extensions/access_loggers/common/grpc_access_logger_test.cc b/test/extensions/access_loggers/common/grpc_access_logger_test.cc index 0a8bd0d819946..31b8de3f16ded 100644 --- a/test/extensions/access_loggers/common/grpc_access_logger_test.cc +++ b/test/extensions/access_loggers/common/grpc_access_logger_test.cc @@ -89,7 +89,6 @@ class MockGrpcAccessLoggerImpl } bool isEmpty() override { return message_.fields().empty(); } - void initMessage() override { ++num_inits_; } void clearMessage() override { @@ -164,7 +163,7 @@ TEST_F(GrpcAccessLogTest, BasicFlow) { expectStreamStart(stream, &callbacks); // Log an HTTP entry. expectFlushedLogEntriesCount(stream, MOCK_HTTP_LOG_FIELD_NAME, 1); - logger_->log(mockHttpEntry()); + logger_->log(mockHttpEntry(), false); EXPECT_EQ(1, logger_->numInits()); // Messages should be cleared after each flush. EXPECT_EQ(1, logger_->numClears()); @@ -173,7 +172,7 @@ TEST_F(GrpcAccessLogTest, BasicFlow) { // Log a TCP entry. expectFlushedLogEntriesCount(stream, MOCK_TCP_LOG_FIELD_NAME, 1); - logger_->log(ProtobufWkt::Empty()); + logger_->log(ProtobufWkt::Empty(), false); EXPECT_EQ(2, logger_->numClears()); // TCP logging doesn't change the logs_written counter. EXPECT_EQ(1, @@ -188,7 +187,7 @@ TEST_F(GrpcAccessLogTest, BasicFlow) { expectStreamStart(stream, &callbacks); // Log an HTTP entry. expectFlushedLogEntriesCount(stream, MOCK_HTTP_LOG_FIELD_NAME, 1); - logger_->log(mockHttpEntry()); + logger_->log(mockHttpEntry(), false); // Message should be initialized again. EXPECT_EQ(2, logger_->numInits()); EXPECT_EQ(3, logger_->numClears()); @@ -210,7 +209,7 @@ TEST_F(GrpcAccessLogTest, WatermarksOverrun) { // Fail to flush, so the log stays buffered up. EXPECT_CALL(stream, isAboveWriteBufferHighWatermark()).WillOnce(Return(true)); EXPECT_CALL(stream, sendMessageRaw_(_, false)).Times(0); - logger_->log(mockHttpEntry()); + logger_->log(mockHttpEntry(), false); // No entry was logged so no clear expected. EXPECT_EQ(0, logger_->numClears()); EXPECT_EQ(1, @@ -221,7 +220,7 @@ TEST_F(GrpcAccessLogTest, WatermarksOverrun) { // Now canLogMore will fail, and the next log will be dropped. EXPECT_CALL(stream, isAboveWriteBufferHighWatermark()).WillOnce(Return(true)); EXPECT_CALL(stream, sendMessageRaw_(_, _)).Times(0); - logger_->log(mockHttpEntry()); + logger_->log(mockHttpEntry(), false); EXPECT_EQ(1, logger_->numInits()); // Still no entry was logged so no clear expected. EXPECT_EQ(0, logger_->numClears()); @@ -236,7 +235,7 @@ TEST_F(GrpcAccessLogTest, WatermarksOverrun) { EXPECT_CALL(stream, sendMessageRaw_(_, _)); EXPECT_CALL(stream, isAboveWriteBufferHighWatermark()).WillOnce(Return(false)); EXPECT_CALL(stream, sendMessageRaw_(_, _)); - logger_->log(mockHttpEntry()); + logger_->log(mockHttpEntry(), false); // Now both entries were logged separately so we expect 2 clears. EXPECT_EQ(2, logger_->numClears()); EXPECT_EQ(2, @@ -256,7 +255,7 @@ TEST_F(GrpcAccessLogTest, StreamFailure) { callbacks.onRemoteClose(Grpc::Status::Internal, "bad"); return nullptr; })); - logger_->log(mockHttpEntry()); + logger_->log(mockHttpEntry(), false); EXPECT_EQ(1, logger_->numInits()); } @@ -291,9 +290,9 @@ TEST_F(GrpcAccessLogTest, Batching) { expectStreamStart(stream, &callbacks); expectFlushedLogEntriesCount(stream, MOCK_HTTP_LOG_FIELD_NAME, 3); - logger_->log(mockHttpEntry()); - logger_->log(mockHttpEntry()); - logger_->log(mockHttpEntry()); + logger_->log(mockHttpEntry(), false); + logger_->log(mockHttpEntry(), false); + logger_->log(mockHttpEntry(), false); EXPECT_EQ(1, logger_->numInits()); // The entries were batched and logged together so we expect a single clear. EXPECT_EQ(1, logger_->numClears()); @@ -303,7 +302,7 @@ TEST_F(GrpcAccessLogTest, Batching) { ProtobufWkt::Struct big_entry = mockHttpEntry(); const std::string big_key(max_buffer_size, 'a'); big_entry.mutable_fields()->insert({big_key, ProtobufWkt::Value()}); - logger_->log(std::move(big_entry)); + logger_->log(std::move(big_entry), false); EXPECT_EQ(2, logger_->numClears()); } @@ -316,7 +315,7 @@ TEST_F(GrpcAccessLogTest, Flushing) { timer_->invokeCallback(); // Not enough data yet to trigger flush on batch size. - logger_->log(mockHttpEntry()); + logger_->log(mockHttpEntry(), false); MockAccessLogStream stream; AccessLogCallbacks* callbacks; @@ -346,10 +345,12 @@ class MockGrpcAccessLoggerCache createLogger(const envoy::extensions::access_loggers::grpc::v3::CommonGrpcAccessLogConfig& config, const Grpc::RawAsyncClientSharedPtr& client, std::chrono::milliseconds buffer_flush_interval_msec, uint64_t max_buffer_size_bytes, - Event::Dispatcher& dispatcher) override { - return std::make_shared( - std::move(client), config, buffer_flush_interval_msec, max_buffer_size_bytes, dispatcher, - scope_, "mock_access_log_prefix.", mockMethodDescriptor()); + Event::Dispatcher& dispatcher, Stats::Scope& scope) override { + auto logger = std::make_shared( + std::move(client), config, buffer_flush_interval_msec, max_buffer_size_bytes, scope, + "mock_access_log_prefix.", mockMethodDescriptor()); + logger->startIntervalFlushTimer(dispatcher, buffer_flush_interval_msec); + return logger; } }; diff --git a/test/extensions/access_loggers/grpc/BUILD b/test/extensions/access_loggers/grpc/BUILD index 0484f14e7ddff..5e5a49711ff31 100644 --- a/test/extensions/access_loggers/grpc/BUILD +++ b/test/extensions/access_loggers/grpc/BUILD @@ -53,6 +53,7 @@ envoy_extension_cc_test( "//test/mocks/access_log:access_log_mocks", "//test/mocks/grpc:grpc_mocks", "//test/mocks/local_info:local_info_mocks", + "//test/mocks/server:factory_context_mocks", "//test/mocks/ssl:ssl_mocks", "//test/mocks/stream_info:stream_info_mocks", "//test/mocks/thread_local:thread_local_mocks", diff --git a/test/extensions/access_loggers/grpc/grpc_access_log_impl_test.cc b/test/extensions/access_loggers/grpc/grpc_access_log_impl_test.cc index 44f2d4f6f4d10..75937fdaad365 100644 --- a/test/extensions/access_loggers/grpc/grpc_access_log_impl_test.cc +++ b/test/extensions/access_loggers/grpc/grpc_access_log_impl_test.cc @@ -59,6 +59,24 @@ class GrpcAccessLoggerImplTestHelper { })); } + void expectStreamCriticalMessage(const std::string& expected_message_yaml) { + envoy::service::accesslog::v3::CriticalAccessLogsMessage expected_message; + TestUtility::loadFromYaml(expected_message_yaml, expected_message); + EXPECT_CALL(stream_, isAboveWriteBufferHighWatermark()).WillOnce(Return(false)); + EXPECT_CALL(stream_, sendMessageRaw_(_, false)) + .WillOnce(Invoke([expected_message](Buffer::InstancePtr& request, bool) { + envoy::service::accesslog::v3::CriticalAccessLogsMessage message; + Buffer::ZeroCopyInputStreamImpl request_stream(std::move(request)); + EXPECT_TRUE(message.ParseFromZeroCopyStream(&request_stream)); + message.set_id(0); + EXPECT_EQ(message.DebugString(), expected_message.DebugString()); + })); + } + + void expectStreamCriticalNoMessage() { + EXPECT_CALL(stream_, isAboveWriteBufferHighWatermark()).WillOnce(Return(false)); + } + private: MockAccessLogStream stream_; AccessLogCallbacks* callbacks_; @@ -72,8 +90,9 @@ class GrpcAccessLoggerImplTest : public testing::Test { EXPECT_CALL(*timer_, enableTimer(_, _)); *config_.mutable_log_name() = "test_log_name"; logger_ = std::make_unique(Grpc::RawAsyncClientPtr{async_client_}, - config_, FlushInterval, BUFFER_SIZE_BYTES, - dispatcher_, local_info_, stats_store_); + config_, BUFFER_SIZE_BYTES, dispatcher_, + local_info_, stats_store_); + logger_->startIntervalFlushTimer(dispatcher_, FlushInterval); } Grpc::MockAsyncClient* async_client_; @@ -102,7 +121,7 @@ TEST_F(GrpcAccessLoggerImplTest, LogHttp) { )EOF"); envoy::data::accesslog::v3::HTTPAccessLogEntry entry; entry.mutable_request()->set_path("/test/path1"); - logger_->log(envoy::data::accesslog::v3::HTTPAccessLogEntry(entry)); + logger_->log(envoy::data::accesslog::v3::HTTPAccessLogEntry(entry), false); } TEST_F(GrpcAccessLoggerImplTest, LogTcp) { @@ -121,7 +140,41 @@ TEST_F(GrpcAccessLoggerImplTest, LogTcp) { )EOF"); envoy::data::accesslog::v3::TCPAccessLogEntry tcp_entry; tcp_entry.mutable_common_properties()->set_sample_rate(1); - logger_->log(envoy::data::accesslog::v3::TCPAccessLogEntry(tcp_entry)); + logger_->log(envoy::data::accesslog::v3::TCPAccessLogEntry(tcp_entry), false); +} + +class CriticalGrpcAccessLoggerImplTest : public GrpcAccessLoggerImplTest { +public: + CriticalGrpcAccessLoggerImplTest() { + mock_buffer_timer_ = new Event::MockTimer(&dispatcher_); + EXPECT_CALL(*mock_buffer_timer_, enabled()); + EXPECT_CALL(*mock_buffer_timer_, enableTimer(_, _)); + EXPECT_CALL(*mock_buffer_timer_, disableTimer()); + } + +private: + Event::MockTimer* mock_buffer_timer_; +}; + +TEST_F(CriticalGrpcAccessLoggerImplTest, CriticalLogHttp) { + grpc_access_logger_impl_test_helper_.expectStreamCriticalMessage(R"EOF( +message: + identifier: + node: + id: node_name + cluster: cluster_name + locality: + zone: zone_name + log_name: test_log_name + http_logs: + log_entry: + request: + path: /test/path1 +id: 0 +)EOF"); + envoy::data::accesslog::v3::HTTPAccessLogEntry entry; + entry.mutable_request()->set_path("/test/path1"); + logger_->log(envoy::data::accesslog::v3::HTTPAccessLogEntry(entry), true); } class GrpcAccessLoggerCacheImplTest : public testing::Test { @@ -175,7 +228,7 @@ TEST_F(GrpcAccessLoggerCacheImplTest, LoggerCreation) { )EOF"); envoy::data::accesslog::v3::HTTPAccessLogEntry entry; entry.mutable_request()->set_path("/test/path1"); - logger->log(envoy::data::accesslog::v3::HTTPAccessLogEntry(entry)); + logger->log(envoy::data::accesslog::v3::HTTPAccessLogEntry(entry), false); } } // namespace diff --git a/test/extensions/access_loggers/grpc/http_grpc_access_log_impl_test.cc b/test/extensions/access_loggers/grpc/http_grpc_access_log_impl_test.cc index 481c249c779c2..10e28c1ba6c97 100644 --- a/test/extensions/access_loggers/grpc/http_grpc_access_log_impl_test.cc +++ b/test/extensions/access_loggers/grpc/http_grpc_access_log_impl_test.cc @@ -12,6 +12,7 @@ #include "test/mocks/access_log/mocks.h" #include "test/mocks/grpc/mocks.h" #include "test/mocks/local_info/mocks.h" +#include "test/mocks/server/factory_context.h" #include "test/mocks/ssl/mocks.h" #include "test/mocks/stream_info/mocks.h" #include "test/mocks/thread_local/mocks.h" @@ -19,6 +20,7 @@ using namespace std::chrono_literals; using testing::_; using testing::An; +using testing::Eq; using testing::InSequence; using testing::Invoke; using testing::NiceMock; @@ -36,8 +38,12 @@ using envoy::data::accesslog::v3::HTTPAccessLogEntry; class MockGrpcAccessLogger : public GrpcCommon::GrpcAccessLogger { public: // GrpcAccessLogger - MOCK_METHOD(void, log, (HTTPAccessLogEntry && entry)); - MOCK_METHOD(void, log, (envoy::data::accesslog::v3::TCPAccessLogEntry && entry)); + MOCK_METHOD(void, startIntervalFlushTimer, + (Event::Dispatcher & dispatcher, + const std::chrono::milliseconds buffer_flush_interval_msec)); + MOCK_METHOD(void, log, (HTTPAccessLogEntry && entry, bool is_critical)); + MOCK_METHOD(void, log, + (envoy::data::accesslog::v3::TCPAccessLogEntry && entry, bool is_critical)); }; class MockGrpcAccessLoggerCache : public GrpcCommon::GrpcAccessLoggerCache { @@ -56,6 +62,7 @@ TEST(HttpGrpcAccessLog, TlsLifetimeCheck) { tls.defer_data_ = true; { AccessLog::MockFilter* filter{new NiceMock()}; + NiceMock factory_context; envoy::extensions::access_loggers::grpc::v3::HttpGrpcAccessLogConfig config; config.mutable_common_config()->set_transport_api_version( envoy::config::core::v3::ApiVersion::V3); @@ -69,8 +76,8 @@ TEST(HttpGrpcAccessLog, TlsLifetimeCheck) { }); // Set tls callback in the HttpGrpcAccessLog constructor, // but it is not called yet since we have defer_data_ = true. - const auto access_log = std::make_unique(AccessLog::FilterPtr{filter}, - config, tls, logger_cache); + const auto access_log = std::make_unique( + AccessLog::FilterPtr{filter}, config, logger_cache, factory_context); // Intentionally make access_log die earlier in this scope to simulate the situation where the // creator has been deleted yet the tls callback is not called yet. } @@ -97,20 +104,20 @@ class HttpGrpcAccessLogTest : public testing::Test { EXPECT_EQ(Common::GrpcAccessLoggerType::HTTP, logger_type); return logger_; }); - access_log_ = std::make_unique(AccessLog::FilterPtr{filter_}, config_, tls_, - logger_cache_); + access_log_ = std::make_unique(AccessLog::FilterPtr{filter_}, config_, + logger_cache_, factory_context_); } - void expectLog(const std::string& expected_log_entry_yaml) { + void expectLog(const std::string& expected_log_entry_yaml, bool expect_critical = false) { if (access_log_ == nullptr) { init(); } HTTPAccessLogEntry expected_log_entry; TestUtility::loadFromYaml(expected_log_entry_yaml, expected_log_entry); - EXPECT_CALL(*logger_, log(An())) - .WillOnce( - Invoke([expected_log_entry](envoy::data::accesslog::v3::HTTPAccessLogEntry&& entry) { + EXPECT_CALL(*logger_, log(An(), Eq(expect_critical))) + .WillOnce(Invoke( + [expected_log_entry](envoy::data::accesslog::v3::HTTPAccessLogEntry&& entry, bool) { EXPECT_EQ(entry.DebugString(), expected_log_entry.DebugString()); })); } @@ -156,6 +163,7 @@ response: {{}} std::shared_ptr logger_{new MockGrpcAccessLogger()}; std::shared_ptr logger_cache_{new MockGrpcAccessLoggerCache()}; HttpGrpcAccessLogPtr access_log_; + NiceMock factory_context_; }; class TestSerializedFilterState : public StreamInfo::FilterState::Object { @@ -763,6 +771,65 @@ TEST_F(HttpGrpcAccessLogTest, LogWithRequestMethod) { expectLogRequestMethod("PATCH"); } +TEST_F(HttpGrpcAccessLogTest, BufferLogFilterTest) { + const std::string filter_yaml = R"EOF( +status_code_filter: + comparison: + op: EQ + value: + default_value: 200 + runtime_key: access_log.access_error.status + )EOF"; + + envoy::config::accesslog::v3::AccessLogFilter config; + TestUtility::loadFromYaml(filter_yaml, config); + *config_.mutable_common_config()->mutable_critical_buffer_log_filter() = config; + + init(); + + { + NiceMock stream_info; + stream_info.host_ = nullptr; + stream_info.start_time_ = SystemTime(1h); + stream_info.response_code_ = 200; + + Http::TestRequestHeaderMapImpl request_headers{ + {":scheme", "scheme_value"}, + {":authority", "authority_value"}, + {":path", "path_value"}, + {":method", "POST"}, + }; + + expectLog(R"EOF( +common_properties: + downstream_remote_address: + socket_address: + address: "127.0.0.1" + port_value: 0 + downstream_direct_remote_address: + socket_address: + address: "127.0.0.1" + port_value: 0 + downstream_local_address: + socket_address: + address: "127.0.0.2" + port_value: 0 + start_time: + seconds: 3600 +request: + scheme: "scheme_value" + authority: "authority_value" + path: "path_value" + request_method: "POST" + request_headers_bytes: 70 +response: + response_code: 200 +)EOF", + true); + access_log_->log(&request_headers, nullptr, nullptr, stream_info); + } +} + } // namespace } // namespace HttpGrpc } // namespace AccessLoggers diff --git a/test/extensions/access_loggers/grpc/http_grpc_access_log_integration_test.cc b/test/extensions/access_loggers/grpc/http_grpc_access_log_integration_test.cc index 75b064169115b..81a8744f99488 100644 --- a/test/extensions/access_loggers/grpc/http_grpc_access_log_integration_test.cc +++ b/test/extensions/access_loggers/grpc/http_grpc_access_log_integration_test.cc @@ -110,6 +110,244 @@ class AccessLogIntegrationTest : public Grpc::GrpcClientIntegrationParamTest, FakeStreamPtr access_log_request_; }; +class CriticalAccessLogIntegrationTest : public AccessLogIntegrationTest { +public: + void initialize() override { + config_helper_.addConfigModifier([](envoy::config::bootstrap::v3::Bootstrap& bootstrap) { + auto* accesslog_cluster = bootstrap.mutable_static_resources()->add_clusters(); + accesslog_cluster->MergeFrom(bootstrap.static_resources().clusters()[0]); + accesslog_cluster->set_name("accesslog"); + ConfigHelper::setHttp2(*accesslog_cluster); + }); + + config_helper_.addConfigModifier( + [this]( + envoy::extensions::filters::network::http_connection_manager::v3::HttpConnectionManager& + hcm) { + auto* access_log = hcm.add_access_log(); + access_log->set_name("grpc_accesslog"); + + envoy::extensions::access_loggers::grpc::v3::HttpGrpcAccessLogConfig config; + auto* common_config = config.mutable_common_config(); + common_config->set_log_name("foo"); + common_config->set_transport_api_version(envoy::config::core::v3::ApiVersion::V3); + setGrpcService(*common_config->mutable_grpc_service(), "accesslog", + fake_upstreams_.back()->localAddress()); + + const std::string filter_yaml = R"EOF( + status_code_filter: + comparison: + op: GE + value: + default_value: 400 + runtime_key: access_log.access_error.status + )EOF"; + + envoy::config::accesslog::v3::AccessLogFilter filter_config; + TestUtility::loadFromYaml(filter_yaml, filter_config); + *common_config->mutable_critical_buffer_log_filter() = filter_config; + + access_log->mutable_typed_config()->PackFrom(config); + }); + + HttpIntegrationTest::initialize(); + } + + ABSL_MUST_USE_RESULT + AssertionResult waitForCriticalAccessLogRequest(const std::string& expected_request_msg_yaml) { + envoy::service::accesslog::v3::CriticalAccessLogsMessage request_msg; + VERIFY_ASSERTION(access_log_request_->waitForGrpcMessage(*dispatcher_, request_msg)); + EXPECT_EQ("POST", access_log_request_->headers().getMethodValue()); + EXPECT_EQ("/envoy.service.accesslog.v3.AccessLogService/CriticalAccessLogs", + access_log_request_->headers().getPathValue()); + EXPECT_EQ("application/grpc", access_log_request_->headers().getContentTypeValue()); + + envoy::service::accesslog::v3::CriticalAccessLogsMessage expected_request_msg; + TestUtility::loadFromYaml(expected_request_msg_yaml, expected_request_msg); + + // Clear fields which are not deterministic. + auto* log_entry = request_msg.mutable_message()->mutable_http_logs()->mutable_log_entry(0); + log_entry->mutable_common_properties()->clear_downstream_remote_address(); + log_entry->mutable_common_properties()->clear_downstream_direct_remote_address(); + log_entry->mutable_common_properties()->clear_downstream_local_address(); + log_entry->mutable_common_properties()->clear_start_time(); + log_entry->mutable_common_properties()->clear_time_to_last_rx_byte(); + log_entry->mutable_common_properties()->clear_time_to_first_downstream_tx_byte(); + log_entry->mutable_common_properties()->clear_time_to_last_downstream_tx_byte(); + log_entry->mutable_request()->clear_request_id(); + if (request_msg.message().has_identifier()) { + auto* node = request_msg.mutable_message()->mutable_identifier()->mutable_node(); + node->clear_extensions(); + node->clear_user_agent_build_version(); + } + EXPECT_GE(request_msg.id(), 0); + pending_message_id_ = request_msg.id(); + request_msg.clear_id(); + EXPECT_THAT(request_msg, ProtoEq(expected_request_msg)); + return AssertionSuccess(); + } + + uint32_t pending_message_id_; +}; + +INSTANTIATE_TEST_SUITE_P(IpVersionsCientType, CriticalAccessLogIntegrationTest, + GRPC_CLIENT_INTEGRATION_PARAMS, + Grpc::GrpcClientIntegrationParamTest::protocolTestParamsToString); + +TEST_P(CriticalAccessLogIntegrationTest, BasicAckFlow) { + testRouterNotFound(); + ASSERT_TRUE(waitForAccessLogConnection()); + ASSERT_TRUE(waitForAccessLogStream()); + + ASSERT_TRUE(waitForCriticalAccessLogRequest(fmt::format(R"EOF( +message: + identifier: + node: + id: node_name + cluster: cluster_name + locality: + zone: zone_name + user_agent_name: "envoy" + log_name: foo + http_logs: + log_entry: + common_properties: + response_flags: + no_route_found: true + protocol_version: HTTP11 + request: + scheme: http + authority: host + path: /notfound + request_headers_bytes: 118 + request_method: GET + response: + response_code: + value: 404 + response_code_details: "route_not_found" + response_headers_bytes: 54 +)EOF"))); + + access_log_request_->startGrpcStream(); + envoy::service::accesslog::v3::CriticalAccessLogsResponse response_msg; + response_msg.set_id(pending_message_id_); + pending_message_id_ = 0; + response_msg.set_status(envoy::service::accesslog::v3::CriticalAccessLogsResponse::ACK); + access_log_request_->sendGrpcMessage(response_msg); + access_log_request_->finishGrpcStream(Grpc::Status::Ok); + switch (clientType()) { + case Grpc::ClientType::EnvoyGrpc: + test_server_->waitForGaugeEq("cluster.accesslog.upstream_rq_active", 0); + break; + case Grpc::ClientType::GoogleGrpc: + test_server_->waitForCounterGe("grpc.accesslog.streams_closed_0", 1); + break; + default: + NOT_REACHED_GCOVR_EXCL_LINE; + } + + test_server_->waitForCounterEq("access_logs.grpc_access_log.critical_logs_ack_received", 1); + test_server_->waitForGaugeEq("access_logs.grpc_access_log.pending_critical_logs", 0); + cleanup(); +} + +TEST_P(CriticalAccessLogIntegrationTest, BasicNackFlow) { + testRouterNotFound(); + ASSERT_TRUE(waitForAccessLogConnection()); + ASSERT_TRUE(waitForAccessLogStream()); + + ASSERT_TRUE(waitForCriticalAccessLogRequest(fmt::format(R"EOF( +message: + identifier: + node: + id: node_name + cluster: cluster_name + locality: + zone: zone_name + user_agent_name: "envoy" + log_name: foo + http_logs: + log_entry: + common_properties: + response_flags: + no_route_found: true + protocol_version: HTTP11 + request: + scheme: http + authority: host + path: /notfound + request_headers_bytes: 118 + request_method: GET + response: + response_code: + value: 404 + response_code_details: "route_not_found" + response_headers_bytes: 54 +)EOF"))); + + access_log_request_->startGrpcStream(); + envoy::service::accesslog::v3::CriticalAccessLogsResponse response_msg; + response_msg.set_id(pending_message_id_); + pending_message_id_ = 0; + response_msg.set_status(envoy::service::accesslog::v3::CriticalAccessLogsResponse::NACK); + access_log_request_->sendGrpcMessage(response_msg); + access_log_request_->finishGrpcStream(Grpc::Status::Ok); + switch (clientType()) { + case Grpc::ClientType::EnvoyGrpc: + test_server_->waitForGaugeEq("cluster.accesslog.upstream_rq_active", 0); + break; + case Grpc::ClientType::GoogleGrpc: + test_server_->waitForCounterGe("grpc.accesslog.streams_closed_0", 1); + break; + default: + NOT_REACHED_GCOVR_EXCL_LINE; + } + + test_server_->waitForCounterEq("access_logs.grpc_access_log.critical_logs_nack_received", 1); + test_server_->waitForGaugeEq("access_logs.grpc_access_log.pending_critical_logs", 1); + cleanup(); +} + +TEST_P(CriticalAccessLogIntegrationTest, NoResponseFlow) { + testRouterNotFound(); + ASSERT_TRUE(waitForAccessLogConnection()); + ASSERT_TRUE(waitForAccessLogStream()); + + ASSERT_TRUE(waitForCriticalAccessLogRequest(fmt::format(R"EOF( +message: + identifier: + node: + id: node_name + cluster: cluster_name + locality: + zone: zone_name + user_agent_name: "envoy" + log_name: foo + http_logs: + log_entry: + common_properties: + response_flags: + no_route_found: true + protocol_version: HTTP11 + request: + scheme: http + authority: host + path: /notfound + request_headers_bytes: 118 + request_method: GET + response: + response_code: + value: 404 + response_code_details: "route_not_found" + response_headers_bytes: 54 +)EOF"))); + + test_server_->notifyingStatsAllocator().waitForCounterFromStringEq( + "access_logs.grpc_access_log.critical_logs_message_timeout", 1); + test_server_->waitForGaugeEq("access_logs.grpc_access_log.pending_critical_logs", 1); + cleanup(); +} + INSTANTIATE_TEST_SUITE_P(IpVersionsCientType, AccessLogIntegrationTest, GRPC_CLIENT_INTEGRATION_PARAMS, Grpc::GrpcClientIntegrationParamTest::protocolTestParamsToString); diff --git a/test/extensions/access_loggers/open_telemetry/access_log_impl_test.cc b/test/extensions/access_loggers/open_telemetry/access_log_impl_test.cc index 8ddbe5b1d91b4..c271e16fa6dc1 100644 --- a/test/extensions/access_loggers/open_telemetry/access_log_impl_test.cc +++ b/test/extensions/access_loggers/open_telemetry/access_log_impl_test.cc @@ -29,6 +29,7 @@ using ::Envoy::AccessLog::MockFilter; using opentelemetry::proto::logs::v1::LogRecord; using testing::_; using testing::An; +using testing::Eq; using testing::InSequence; using testing::Invoke; using testing::NiceMock; @@ -43,8 +44,11 @@ namespace { class MockGrpcAccessLogger : public GrpcAccessLogger { public: // GrpcAccessLogger - MOCK_METHOD(void, log, (LogRecord && entry)); - MOCK_METHOD(void, log, (ProtobufWkt::Empty && entry)); + MOCK_METHOD(void, startIntervalFlushTimer, + (Event::Dispatcher & dispatcher, + std::chrono::milliseconds buffer_flush_interval_msec)); + MOCK_METHOD(void, log, (LogRecord && entry, bool)); + MOCK_METHOD(void, log, (ProtobufWkt::Empty && entry, bool)); }; class MockGrpcAccessLoggerCache : public GrpcAccessLoggerCache { @@ -101,8 +105,8 @@ string_value: "x-request-header: %REQ(x-request-header)%, protocol: %PROTOCOL%" LogRecord expected_log_entry; TestUtility::loadFromYaml(expected_log_entry_yaml, expected_log_entry); - EXPECT_CALL(*logger_, log(An())) - .WillOnce(Invoke([expected_log_entry](LogRecord&& entry) { + EXPECT_CALL(*logger_, log(An(), Eq(false))) + .WillOnce(Invoke([expected_log_entry](LogRecord&& entry, bool) { EXPECT_EQ(entry.DebugString(), expected_log_entry.DebugString()); })); } diff --git a/test/extensions/access_loggers/open_telemetry/grpc_access_log_impl_test.cc b/test/extensions/access_loggers/open_telemetry/grpc_access_log_impl_test.cc index 6fa166cdf5c3f..dc0f503e73d87 100644 --- a/test/extensions/access_loggers/open_telemetry/grpc_access_log_impl_test.cc +++ b/test/extensions/access_loggers/open_telemetry/grpc_access_log_impl_test.cc @@ -83,6 +83,7 @@ class GrpcAccessLoggerImplTest : public testing::Test { logger_ = std::make_unique(Grpc::RawAsyncClientPtr{async_client_}, config_, FlushInterval, BUFFER_SIZE_BYTES, dispatcher_, local_info_, stats_store_); + logger_->startIntervalFlushTimer(dispatcher_, FlushInterval); } Grpc::MockAsyncClient* async_client_; @@ -118,7 +119,7 @@ TEST_F(GrpcAccessLoggerImplTest, LogHttp) { )EOF"); opentelemetry::proto::logs::v1::LogRecord entry; entry.set_severity_text("test-severity-text"); - logger_->log(opentelemetry::proto::logs::v1::LogRecord(entry)); + logger_->log(opentelemetry::proto::logs::v1::LogRecord(entry), false); } TEST_F(GrpcAccessLoggerImplTest, LogTcp) { @@ -144,7 +145,7 @@ TEST_F(GrpcAccessLoggerImplTest, LogTcp) { )EOF"); opentelemetry::proto::logs::v1::LogRecord entry; entry.set_severity_text("test-severity-text"); - logger_->log(opentelemetry::proto::logs::v1::LogRecord(entry)); + logger_->log(opentelemetry::proto::logs::v1::LogRecord(entry), false); } class GrpcAccessLoggerCacheImplTest : public testing::Test { @@ -204,7 +205,7 @@ TEST_F(GrpcAccessLoggerCacheImplTest, LoggerCreation) { )EOF"); opentelemetry::proto::logs::v1::LogRecord entry; entry.set_severity_text("test-severity-text"); - logger->log(opentelemetry::proto::logs::v1::LogRecord(entry)); + logger->log(opentelemetry::proto::logs::v1::LogRecord(entry), false); } } // namespace diff --git a/tools/spelling/spelling_dictionary.txt b/tools/spelling/spelling_dictionary.txt index b0da4a180dc89..6ec29b8f879d0 100644 --- a/tools/spelling/spelling_dictionary.txt +++ b/tools/spelling/spelling_dictionary.txt @@ -999,6 +999,7 @@ ratelimited ratelimiter rawseti rc +reachability readded readonly readv