Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
4df069e
access logging: introduce critical ALS endpoint
Shikugawa Jul 26, 2021
daa2f41
wip
Shikugawa Jul 28, 2021
a63ec67
wip
Shikugawa Jul 29, 2021
37a1e18
timer
Shikugawa Jul 29, 2021
8f51ae2
wip
Shikugawa Aug 2, 2021
5ed729f
test
Shikugawa Aug 2, 2021
6fbaa22
tinyfix
Shikugawa Aug 2, 2021
4110cc7
fix api
Shikugawa Aug 4, 2021
c1d448a
add buffer limit
Shikugawa Aug 4, 2021
69773e4
fix
Shikugawa Aug 4, 2021
35dd187
refactor
Shikugawa Aug 4, 2021
e506fd6
fix build
Shikugawa Aug 5, 2021
d94bcee
fix
Shikugawa Aug 6, 2021
75eb0bd
add docs
Shikugawa Aug 16, 2021
e34e5ff
Merge branch 'main' into als-fatal
Shikugawa Aug 16, 2021
d432070
Merge branch 'main' of https://github.com/envoyproxy/envoy into als-f…
Shikugawa Aug 23, 2021
824aa2a
docs
Shikugawa Aug 23, 2021
9022520
fix
Shikugawa Aug 25, 2021
7e95cb6
fix
Shikugawa Aug 25, 2021
70c589a
not to generate v4alpha
Shikugawa Aug 27, 2021
444dd61
tmp
Shikugawa Sep 2, 2021
6c4cb60
api review
Shikugawa Sep 2, 2021
ee43245
fix
Shikugawa Sep 2, 2021
9d7e6d9
fix
Shikugawa Sep 2, 2021
1a72c4e
tmp
Shikugawa Sep 4, 2021
f6ce0f2
tmp
Shikugawa Sep 6, 2021
263c015
tmp
Shikugawa Sep 6, 2021
1b9991d
test: fix corner-case for waiting stats change utility on integration…
Shikugawa Sep 6, 2021
8bf0872
fix
Shikugawa Sep 6, 2021
76ee0ca
fix
Shikugawa Sep 6, 2021
9b88364
fix build
Shikugawa Sep 6, 2021
8ada51f
tidy
Shikugawa Sep 7, 2021
a400410
grpc: implement BufferedAsyncClient for bidi gRPC stream
Shikugawa Sep 15, 2021
a5a27a3
tmp
Shikugawa Sep 15, 2021
7cfdf0f
fix
Shikugawa Sep 15, 2021
0793deb
fix
Shikugawa Sep 15, 2021
599b05a
fix
Shikugawa Sep 15, 2021
6c43c6c
fix
Shikugawa Sep 15, 2021
44dd7ef
fix
Shikugawa Sep 15, 2021
b7c409e
fix
Shikugawa Sep 15, 2021
f6b9451
fix
Shikugawa Sep 16, 2021
6f673a1
fix
Shikugawa Sep 16, 2021
14e8ad7
fix
Shikugawa Sep 22, 2021
8591511
fix
Shikugawa Sep 22, 2021
1cf0c86
fix conflict
Shikugawa Nov 12, 2021
65e2d41
fix
Shikugawa Nov 12, 2021
3feffd8
fix
Shikugawa Nov 12, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions api/envoy/extensions/access_loggers/grpc/v3/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ licenses(["notice"]) # Apache 2

api_proto_package(
deps = [
"//envoy/config/accesslog/v3:pkg",
"//envoy/config/core/v3:pkg",
"@com_github_cncf_udpa//udpa/annotations:pkg",
],
Expand Down
19 changes: 18 additions & 1 deletion api/envoy/extensions/access_loggers/grpc/v3/als.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ syntax = "proto3";

package envoy.extensions.access_loggers.grpc.v3;

import "envoy/config/accesslog/v3/accesslog.proto";
import "envoy/config/core/v3/base.proto";
import "envoy/config/core/v3/config_source.proto";
import "envoy/config/core/v3/grpc_service.proto";
Expand Down Expand Up @@ -55,7 +56,7 @@ message TcpGrpcAccessLogConfig {
}

// Common configuration for gRPC access logs.
// [#next-free-field: 8]
// [#next-free-field: 11]
message CommonGrpcAccessLogConfig {
option (udpa.annotations.versioning).previous_message_type =
"envoy.config.accesslog.v2.CommonGrpcAccessLogConfig";
Expand Down Expand Up @@ -96,4 +97,20 @@ message CommonGrpcAccessLogConfig {
// will be used in this configuration. This feature is used only when you are using
// :ref:`Envoy gRPC client <envoy_v3_api_field_config.core.v3.GrpcService.envoy_grpc>`.
config.core.v3.RetryPolicy grpc_stream_retry_policy = 7;

// Define the log condition for critical access logs.
// Logs that match the filter are not sent to `StreamAccessLogs`,
// but are sent to `CriticalAccessLogs`.
config.accesslog.v3.AccessLogFilter critical_buffer_log_filter = 8;

// The time to wait for an ACK message. If no ACK message is returned after this time,
// the message is considered undeliverable and the failed transmission is buffered again.
Comment thread
htuch marked this conversation as resolved.
// The re-buffered message will be sent again at the next time a log matching *critical_buffer_log_filter* is queued.
google.protobuf.Duration message_ack_timeout = 9
[(validate.rules).duration = {gte {nanos: 1000000}}];

// Size limit (in bytes) of the buffer used to store messages during processing in a
// critical logger. A critical logger buffers messages until it receives an ACK from upstream.
// The default is 16384.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens when the buffer size is exceeded? Can you clarify in the comment, please?

google.protobuf.UInt32Value max_pending_buffer_size_bytes = 10;
}
64 changes: 61 additions & 3 deletions api/envoy/service/accesslog/v3/als.proto
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,40 @@ option (udpa.annotations.file_status).package_version_status = ACTIVE;
service AccessLogService {
// Envoy will connect and send StreamAccessLogsMessage messages forever. It does not expect any
// response to be sent as nothing would be done in the case of failure. The server should
// disconnect if it expects Envoy to reconnect. In the future we may decide to add a different
// API for "critical" access logs in which Envoy will buffer access logs for some period of time
// until it gets an ACK so it could then retry. This API is designed for high throughput with the
// disconnect if it expects Envoy to reconnect. This API is designed for high throughput with the
// expectation that it might be lossy.
rpc StreamAccessLogs(stream StreamAccessLogsMessage) returns (StreamAccessLogsResponse) {
}

// This endpoint provides acknowledgment of logs marked as requiring acknowledgment.
// The requirement for an acknowledgment can be set in
// :ref:`critical_buffer_log_filter <envoy_v3_api_msg_extensions.access_loggers.grpc.v3.CommonGrpcAccessLogConfig.critical_buffer_log_filter>`.
// Log messages that match this filter will be guaranteed delivery. In order to guarantee
// the arrival, this endpoint performs the following process.
//
// 1. A response message is returned for each log. The response message includes ACK/NACK status,
// and in case of NACK, the target log is not flushed but buffered by Envoy.
// 2. Timeout for response message is set and if no message is returned within a certain time,
// it will be considered as unreachable and buffered by Envoy without flushing the target log. This timeout is set by
// :ref:`message_ack_timeout <envoy_v3_api_msg_extensions.access_loggers.grpc.v3.CommonGrpcAccessLogConfig.message_ack_timeout>`.
//
// On the ALS receiver side, ACK is expected to be returned to indicate that the log was saved properly,
// and NACK is expected to be returned when the log could not be saved due to some error.
//
// .. attention::
//
// Buffers for guaranteed reachability can be extremely memory-intensive. Therefore, the following points
// should be considered when using this endpoint.
//
// 1. :ref:`critical_buffer_log_filter <envoy_v3_api_msg_extensions.access_loggers.grpc.v3.CommonGrpcAccessLogConfig.critical_buffer_log_filter>`
// should be set strictly. A loose filter may encourage rapid buffer overwhelm and leading to OOM.
// 2. :ref:`max_pending_buffer_size_bytes <envoy_v3_api_msg_extensions.access_loggers.grpc.v3.CommonGrpcAccessLogConfig.max_pending_buffer_size_bytes>`
// should be set appropriately to prevent OOM.
// 3. Make sure that ALS receiver is implemented properly. If it is not implemented, all messages will
// be buffered, which may cause OOM soon.
Comment on lines +53 to +54

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If 2 was set it shouldn't oom right?

@Shikugawa Shikugawa Sep 2, 2021

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. We can avoid OOM by setting pending_critical_buffer_size_bytes correctly.

rpc CriticalAccessLogs(stream CriticalAccessLogsMessage)
returns (stream CriticalAccessLogsResponse) {
}
}

// Empty response for the StreamAccessLogs API. Will never be sent. See below.
Expand All @@ -35,6 +63,23 @@ message StreamAccessLogsResponse {
"envoy.service.accesslog.v2.StreamAccessLogsResponse";
}

// Response received to identify undelivered or delivered messages in CriticalAccessLogs.
message CriticalAccessLogsResponse {
enum Status {
// Indicates that the message has been received.
ACK = 0;

// Indicates that the message has not been received.
NACK = 1;
}

// This field is used to indicate the arrival status.
Status status = 1;

// Message ID that identifies a message.
uint64 id = 2;
}

// Stream message for the StreamAccessLogs API. Envoy will open a stream to the server and stream
// access logs without ever expecting a response.
message StreamAccessLogsMessage {
Expand Down Expand Up @@ -85,3 +130,16 @@ message StreamAccessLogsMessage {
TCPAccessLogEntries tcp_logs = 3;
}
}

// Stream message for the CriticalAccessLogs API.
// Envoy opens a stream to the server and streams the access log,
// expecting a response. Each message sent is assigned an individual ID,
// and the state of the message is tracked based on the ID.
message CriticalAccessLogsMessage {
// The body of the log message sent to CriticalAccessLogs.
StreamAccessLogsMessage message = 1;

// This is an ID to identify the message, and should be added to the Critical Endpoint
// response message to uniquely identify the message being ACK/NACKed.
uint64 id = 4;
}
1 change: 1 addition & 0 deletions docs/root/version_history/current.rst
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ Removed Config or Runtime

New Features
------------
* access log: added :ref:`critical log message <envoy_v3_api_field_service.accesslog.v3.CriticalAccessLogsMessage.message>` to AccessLogService to guarantee log arrival.
* access log: added :ref:`grpc_stream_retry_policy <envoy_v3_api_field_extensions.access_loggers.grpc.v3.CommonGrpcAccessLogConfig.grpc_stream_retry_policy>` to the gRPC logger to reconnect when a connection fails to be established.
* api: added support for *xds.type.v3.TypedStruct* in addition to the now-deprecated *udpa.type.v1.TypedStruct* proto message, which is a wrapper proto used to encode typed JSON data in a *google.protobuf.Any* field.
* bootstrap: added :ref:`typed_dns_resolver_config <envoy_v3_api_field_config.bootstrap.v3.Bootstrap.typed_dns_resolver_config>` in the bootstrap to support DNS resolver as an extension.
Expand Down
1 change: 1 addition & 0 deletions source/extensions/access_loggers/common/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -59,5 +59,6 @@ envoy_cc_library(
"//source/common/protobuf:utility_lib",
"@com_google_absl//absl/types:optional",
"@envoy_api//envoy/config/core/v3:pkg_cc_proto",
"@envoy_api//envoy/service/accesslog/v3:pkg_cc_proto",
],
)
65 changes: 39 additions & 26 deletions source/extensions/access_loggers/common/grpc_access_logger.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@
#include "envoy/config/core/v3/config_source.pb.h"
#include "envoy/event/dispatcher.h"
#include "envoy/grpc/async_client_manager.h"
#include "envoy/service/accesslog/v3/als.pb.h"
#include "envoy/singleton/instance.h"
#include "envoy/stats/scope.h"
#include "envoy/thread_local/thread_local.h"

#include "source/common/access_log/access_log_impl.h"
#include "source/common/common/assert.h"
#include "source/common/grpc/typed_async_client.h"
#include "source/common/http/utility.h"
Expand Down Expand Up @@ -42,17 +44,28 @@ template <typename HttpLogProto, typename TcpLogProto> class GrpcAccessLogger {

virtual ~GrpcAccessLogger() = default;

/**
* Start interval log flusher.
* @param dispatcher Event dispatcher to create timer to flush log message.
* @param buffer_flush_interval_msec Buffer flush interval msec.
*/
virtual void
startIntervalFlushTimer(Event::Dispatcher& dispatcher,
const std::chrono::milliseconds buffer_flush_interval_msec) PURE;

/**
* Log http access entry.
* @param entry supplies the access log to send.
* @param is_critical determine whether log entry is critical or not.
*/
virtual void log(HttpLogProto&& entry) PURE;
virtual void log(HttpLogProto&& entry, bool is_critical) PURE;

/**
* Log tcp access entry.
* @param entry supplies the access log to send.
* @param is_critical determine whether log entry is critical or not.
*/
virtual void log(TcpLogProto&& entry) PURE;
virtual void log(TcpLogProto&& entry, bool is_critical) PURE;
};

/**
Expand Down Expand Up @@ -172,24 +185,25 @@ class GrpcAccessLogger : public Detail::GrpcAccessLogger<HttpLogProto, TcpLogPro
public:
using Interface = Detail::GrpcAccessLogger<HttpLogProto, TcpLogProto>;

GrpcAccessLogger(const Grpc::RawAsyncClientSharedPtr& client,
std::chrono::milliseconds buffer_flush_interval_msec,
uint64_t max_buffer_size_bytes, Event::Dispatcher& dispatcher,
GrpcAccessLogger(const Grpc::RawAsyncClientSharedPtr& client, uint64_t max_buffer_size_bytes,
Stats::Scope& scope, std::string access_log_prefix,
const Protobuf::MethodDescriptor& service_method,
const envoy::config::core::v3::RetryPolicy& retry_policy)
: client_(client, service_method, retry_policy),
buffer_flush_interval_msec_(buffer_flush_interval_msec),
flush_timer_(dispatcher.createTimer([this]() {
flush();
flush_timer_->enableTimer(buffer_flush_interval_msec_);
})),
max_buffer_size_bytes_(max_buffer_size_bytes),
stats_({ALL_GRPC_ACCESS_LOGGER_STATS(POOL_COUNTER_PREFIX(scope, access_log_prefix))}) {
flush_timer_->enableTimer(buffer_flush_interval_msec_);
stats_({ALL_GRPC_ACCESS_LOGGER_STATS(POOL_COUNTER_PREFIX(scope, access_log_prefix))}) {}

void
startIntervalFlushTimer(Event::Dispatcher& dispatcher,
const std::chrono::milliseconds buffer_flush_interval_msec) override {
flush_timer_ = dispatcher.createTimer([this, buffer_flush_interval_msec]() {
flush();
flush_timer_->enableTimer(buffer_flush_interval_msec);
});
flush_timer_->enableTimer(buffer_flush_interval_msec);
}

void log(HttpLogProto&& entry) override {
void log(HttpLogProto&& entry, bool) override {
if (!canLogMore()) {
return;
}
Expand All @@ -200,7 +214,7 @@ class GrpcAccessLogger : public Detail::GrpcAccessLogger<HttpLogProto, TcpLogPro
}
}

void log(TcpLogProto&& entry) override {
void log(TcpLogProto&& entry, bool) override {
approximate_message_size_bytes_ += entry.ByteSizeLong();
addEntry(std::move(entry));
if (approximate_message_size_bytes_ >= max_buffer_size_bytes_) {
Expand All @@ -209,16 +223,6 @@ class GrpcAccessLogger : public Detail::GrpcAccessLogger<HttpLogProto, TcpLogPro
}

protected:
Detail::GrpcAccessLogClient<LogRequest, LogResponse> client_;
LogRequest message_;

private:
virtual bool isEmpty() PURE;
virtual void initMessage() PURE;
virtual void addEntry(HttpLogProto&& entry) PURE;
virtual void addEntry(TcpLogProto&& entry) PURE;
virtual void clearMessage() { message_.Clear(); }

void flush() {
if (isEmpty()) {
// Nothing to flush.
Expand All @@ -236,6 +240,17 @@ class GrpcAccessLogger : public Detail::GrpcAccessLogger<HttpLogProto, TcpLogPro
}
}

Detail::GrpcAccessLogClient<LogRequest, LogResponse> client_;
LogRequest message_;
Event::TimerPtr flush_timer_;

private:
virtual bool isEmpty() PURE;
virtual void initMessage() PURE;
virtual void addEntry(HttpLogProto&& entry) PURE;
virtual void addEntry(TcpLogProto&& entry) PURE;
virtual void clearMessage() { message_.Clear(); }

bool canLogMore() {
if (max_buffer_size_bytes_ == 0 || approximate_message_size_bytes_ < max_buffer_size_bytes_) {
stats_.logs_written_.inc();
Expand All @@ -250,8 +265,6 @@ class GrpcAccessLogger : public Detail::GrpcAccessLogger<HttpLogProto, TcpLogPro
return false;
}

const std::chrono::milliseconds buffer_flush_interval_msec_;
const Event::TimerPtr flush_timer_;
const uint64_t max_buffer_size_bytes_;
uint64_t approximate_message_size_bytes_ = 0;
GrpcAccessLoggerStats stats_;
Expand Down
1 change: 1 addition & 0 deletions source/extensions/access_loggers/grpc/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ envoy_cc_library(
"//envoy/local_info:local_info_interface",
"//envoy/thread_local:thread_local_interface",
"//source/common/config:utility_lib",
"//source/common/grpc:buffered_async_client_lib",
"//source/common/grpc:typed_async_client_lib",
"//source/extensions/access_loggers/common:grpc_access_logger",
"@envoy_api//envoy/data/accesslog/v3:pkg_cc_proto",
Expand Down
Loading