Skip to content
Closed
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
4df069e
access logging: introduce critical ALS endpoint
Shikugawa Jul 26, 2021
daa2f41
wip
Shikugawa Jul 28, 2021
a63ec67
wip
Shikugawa Jul 29, 2021
37a1e18
timer
Shikugawa Jul 29, 2021
8f51ae2
wip
Shikugawa Aug 2, 2021
5ed729f
test
Shikugawa Aug 2, 2021
6fbaa22
tinyfix
Shikugawa Aug 2, 2021
4110cc7
fix api
Shikugawa Aug 4, 2021
c1d448a
add buffer limit
Shikugawa Aug 4, 2021
69773e4
fix
Shikugawa Aug 4, 2021
35dd187
refactor
Shikugawa Aug 4, 2021
e506fd6
fix build
Shikugawa Aug 5, 2021
d94bcee
fix
Shikugawa Aug 6, 2021
75eb0bd
add docs
Shikugawa Aug 16, 2021
e34e5ff
Merge branch 'main' into als-fatal
Shikugawa Aug 16, 2021
d432070
Merge branch 'main' of https://github.com/envoyproxy/envoy into als-f…
Shikugawa Aug 23, 2021
824aa2a
docs
Shikugawa Aug 23, 2021
9022520
fix
Shikugawa Aug 25, 2021
7e95cb6
fix
Shikugawa Aug 25, 2021
70c589a
not to generate v4alpha
Shikugawa Aug 27, 2021
444dd61
tmp
Shikugawa Sep 2, 2021
6c4cb60
api review
Shikugawa Sep 2, 2021
ee43245
fix
Shikugawa Sep 2, 2021
9d7e6d9
fix
Shikugawa Sep 2, 2021
1a72c4e
tmp
Shikugawa Sep 4, 2021
f6ce0f2
tmp
Shikugawa Sep 6, 2021
263c015
tmp
Shikugawa Sep 6, 2021
1b9991d
test: fix corner-case for waiting stats change utility on integration…
Shikugawa Sep 6, 2021
8bf0872
fix
Shikugawa Sep 6, 2021
76ee0ca
fix
Shikugawa Sep 6, 2021
9b88364
fix build
Shikugawa Sep 6, 2021
8ada51f
tidy
Shikugawa Sep 7, 2021
a400410
grpc: implement BufferedAsyncClient for bidi gRPC stream
Shikugawa Sep 15, 2021
a5a27a3
tmp
Shikugawa Sep 15, 2021
7cfdf0f
fix
Shikugawa Sep 15, 2021
0793deb
fix
Shikugawa Sep 15, 2021
599b05a
fix
Shikugawa Sep 15, 2021
6c43c6c
fix
Shikugawa Sep 15, 2021
44dd7ef
fix
Shikugawa Sep 15, 2021
b7c409e
fix
Shikugawa Sep 15, 2021
f6b9451
fix
Shikugawa Sep 16, 2021
6f673a1
fix
Shikugawa Sep 16, 2021
14e8ad7
fix
Shikugawa Sep 22, 2021
8591511
fix
Shikugawa Sep 22, 2021
1cf0c86
fix conflict
Shikugawa Nov 12, 2021
65e2d41
fix
Shikugawa Nov 12, 2021
3feffd8
fix
Shikugawa Nov 12, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions api/envoy/extensions/access_loggers/grpc/v3/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ licenses(["notice"]) # Apache 2

api_proto_package(
deps = [
"//envoy/config/accesslog/v3:pkg",
"//envoy/config/core/v3:pkg",
"@com_github_cncf_udpa//udpa/annotations:pkg",
],
Expand Down
19 changes: 18 additions & 1 deletion api/envoy/extensions/access_loggers/grpc/v3/als.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ syntax = "proto3";

package envoy.extensions.access_loggers.grpc.v3;

import "envoy/config/accesslog/v3/accesslog.proto";
import "envoy/config/core/v3/config_source.proto";
import "envoy/config/core/v3/grpc_service.proto";

Expand Down Expand Up @@ -54,7 +55,7 @@ message TcpGrpcAccessLogConfig {
}

// Common configuration for gRPC access logs.
// [#next-free-field: 7]
// [#next-free-field: 10]
message CommonGrpcAccessLogConfig {
option (udpa.annotations.versioning).previous_message_type =
"envoy.config.accesslog.v2.CommonGrpcAccessLogConfig";
Expand Down Expand Up @@ -86,4 +87,20 @@ message CommonGrpcAccessLogConfig {
// <envoy_v3_api_field_data.accesslog.v3.AccessLogCommon.filter_state_objects>`.
// Logger will call `FilterState::Object::serializeAsProto` to serialize the filter state object.
repeated string filter_state_objects_to_log = 5;

// Define the log condition for critical access logs.
// Logs that match the filter are not forwarded to the normal endpoint,
Comment thread
Shikugawa marked this conversation as resolved.
Outdated
// but are sent to a special endpoint.
Comment thread
Shikugawa marked this conversation as resolved.
Outdated
config.accesslog.v3.AccessLogFilter critical_buffer_log_filter = 7;

// The time to wait for an ACK message. If no ACK message is returned after this time,
// the message is considered undeliverable and the failed transmission is buffered again.
Comment thread
htuch marked this conversation as resolved.
// The re-buffered message will be sent again the next time a log matching *critical_buffer_log_filter* is queued.
google.protobuf.Duration message_ack_timeout = 8
[(validate.rules).duration = {gte {nanos: 1000000}}];

// Size limit (in bytes) of the buffer used to store messages during processing in a
// critical logger. A critical logger buffers messages until it receives an ACK from upstream.
// The default is 16384.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens when the buffer size is exceeded? Can you clarify in the comment, please?

google.protobuf.UInt32Value max_pending_buffer_size_bytes = 9;
}
64 changes: 61 additions & 3 deletions api/envoy/service/accesslog/v3/als.proto
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,40 @@ option (udpa.annotations.file_status).package_version_status = ACTIVE;
service AccessLogService {
// Envoy will connect and send StreamAccessLogsMessage messages forever. It does not expect any
// response to be sent as nothing would be done in the case of failure. The server should
// disconnect if it expects Envoy to reconnect. In the future we may decide to add a different
// API for "critical" access logs in which Envoy will buffer access logs for some period of time
// until it gets an ACK so it could then retry. This API is designed for high throughput with the
// disconnect if it expects Envoy to reconnect. This API is designed for high throughput with the
// expectation that it might be lossy.
rpc StreamAccessLogs(stream StreamAccessLogsMessage) returns (StreamAccessLogsResponse) {
}

// This endpoint provides acknowledgment of logs marked as requiring acknowledgment.
// The requirement for an acknowledgment can be set in
// :ref:`critical_buffer_log_filter <envoy_v3_api_msg_extensions.access_loggers.grpc.v3.CommonGrpcAccessLogConfig.critical_buffer_log_filter>`.
// Log messages that match this filter will be guaranteed delivery. In order to guarantee
// the arrival, this endpoint performs the following process.
//
// 1. A response message is returned for each log. The response message carries an ACK/NACK status;
// on NACK, the target log is not flushed and remains buffered by Envoy.
// 2. A timeout is set for each response message. If no response is returned within that time,
// the log is considered undelivered and remains buffered by Envoy without being flushed. This timeout is set by
// :ref:`message_ack_timeout <envoy_v3_api_msg_extensions.access_loggers.grpc.v3.CommonGrpcAccessLogConfig.message_ack_timeout>`.
//
// On the ALS receiver side, ACK is expected to be returned to indicate that the log was saved properly,
// and NACK is expected to be returned when the log could not be saved due to some error.
//
// .. attention::
//
// Buffers for guaranteed reachability can be extremely memory-intensive. Therefore, the following points
// should be considered when using this endpoint.
//
// 1. :ref:`critical_buffer_log_filter <envoy_v3_api_msg_extensions.access_loggers.grpc.v3.CommonGrpcAccessLogConfig.critical_buffer_log_filter>`
// should be set strictly. A loose filter may quickly overwhelm the buffer and lead to OOM.
// 2. :ref:`max_pending_buffer_size_bytes <envoy_v3_api_msg_extensions.access_loggers.grpc.v3.CommonGrpcAccessLogConfig.max_pending_buffer_size_bytes>`
// should be set appropriately to prevent OOM.
// 3. Make sure that the ALS receiver is implemented properly. If it is not, all messages will
// be buffered, which may quickly cause OOM.
Comment on lines +53 to +54

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If 2 was set it shouldn't oom right?

@Shikugawa Shikugawa Sep 2, 2021

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. We can avoid OOM by setting pending_critical_buffer_size_bytes correctly.

rpc CriticalAccessLogs(stream CriticalAccessLogsMessage)
returns (stream CriticalAccessLogsResponse) {
}
}

// Empty response for the StreamAccessLogs API. Will never be sent. See below.
Expand All @@ -35,6 +63,23 @@ message StreamAccessLogsResponse {
"envoy.service.accesslog.v2.StreamAccessLogsResponse";
}

// Response for the CriticalAccessLogs API. The receiver returns one response per log message so
// that Envoy can tell delivered messages apart from undelivered ones.
message CriticalAccessLogsResponse {
enum Status {
// Indicates that the message has been received, i.e. the receiver saved the log properly.
// NOTE(review): ACK is the zero value, so a response that leaves *status* unset reads as ACK;
// confirm this is intended.
ACK = 0;

// Indicates that the message has not been received, i.e. the receiver could not save the log
// due to some error. Envoy keeps the corresponding log buffered instead of flushing it.
NACK = 1;
}

// The arrival status (ACK or NACK) of the message identified by *id*.
Status status = 1;

// ID of the message being acknowledged. It is expected to carry the *id* of the
// CriticalAccessLogsMessage that this response refers to.
uint64 id = 2;
}

// Stream message for the StreamAccessLogs API. Envoy will open a stream to the server and stream
// access logs without ever expecting a response.
message StreamAccessLogsMessage {
Expand Down Expand Up @@ -85,3 +130,16 @@ message StreamAccessLogsMessage {
TCPAccessLogEntries tcp_logs = 3;
}
}

// Stream message for the CriticalAccessLogs API.
// Envoy opens a stream to the server and streams access logs, expecting a response for each
// message. Every message sent is assigned an individual ID, and the state of the message is
// tracked based on that ID.
message CriticalAccessLogsMessage {
// The body of the log message sent to CriticalAccessLogs.
StreamAccessLogsMessage message = 1;

// ID that identifies this message. The receiver should copy it into the CriticalAccessLogs
// response so that the message being ACKed/NACKed can be uniquely identified.
// NOTE(review): field numbers 2 and 3 are unused in this message; confirm that the gap is
// intentional.
uint64 id = 4;
}
1 change: 1 addition & 0 deletions docs/root/version_history/current.rst
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ Removed Config or Runtime

New Features
------------
* access_log: added :ref:`critical log message <envoy_v3_api_field_service.accesslog.v3.CriticalAccessLogsMessage.message>` to AccessLogService to guarantee log arrival.
* access_log: added :ref:`METADATA<envoy_v3_api_msg_extensions.formatter.metadata.v3.Metadata>` token to handle all types of metadata (DYNAMIC, CLUSTER, ROUTE).
* bootstrap: added :ref:`inline_headers <envoy_v3_api_field_config.bootstrap.v3.Bootstrap.inline_headers>` in the bootstrap to make custom inline headers bootstrap configurable.
* contrib: added new :ref:`contrib images <install_contrib>` which contain contrib extensions.
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

64 changes: 61 additions & 3 deletions generated_api_shadow/envoy/service/accesslog/v3/als.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions source/extensions/access_loggers/common/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -59,5 +59,6 @@ envoy_cc_library(
"//source/common/protobuf:utility_lib",
"@com_google_absl//absl/types:optional",
"@envoy_api//envoy/config/core/v3:pkg_cc_proto",
"@envoy_api//envoy/service/accesslog/v3:pkg_cc_proto",
],
)
39 changes: 35 additions & 4 deletions source/extensions/access_loggers/common/grpc_access_logger.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@
#include "envoy/config/core/v3/config_source.pb.h"
#include "envoy/event/dispatcher.h"
#include "envoy/grpc/async_client_manager.h"
#include "envoy/service/accesslog/v3/als.pb.h"
#include "envoy/singleton/instance.h"
#include "envoy/stats/scope.h"
#include "envoy/thread_local/thread_local.h"

#include "source/common/access_log/access_log_impl.h"
#include "source/common/common/assert.h"
#include "source/common/grpc/typed_async_client.h"
#include "source/common/protobuf/utility.h"
Expand Down Expand Up @@ -44,14 +46,16 @@ template <typename HttpLogProto, typename TcpLogProto> class GrpcAccessLogger {
/**
* Log http access entry.
* @param entry supplies the access log to send.
* @param is_critical whether the log entry is critical.
*/
virtual void log(HttpLogProto&& entry) PURE;
virtual void log(HttpLogProto&& entry, bool is_critical) PURE;

/**
* Log tcp access entry.
* @param entry supplies the access log to send.
* @param is_critical whether the log entry is critical.
*/
virtual void log(TcpLogProto&& entry) PURE;
virtual void log(TcpLogProto&& entry, bool is_critical) PURE;
};

/**
Expand Down Expand Up @@ -146,6 +150,25 @@ struct GrpcAccessLoggerStats {
ALL_GRPC_ACCESS_LOGGER_STATS(GENERATE_COUNTER_STRUCT)
};

template <class RequestType> class CriticalAccessLoggerGrpcClient {
public:
virtual ~CriticalAccessLoggerGrpcClient() = default;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think if you added virtual uint64_t id(const ResponseType& r) PURE and virtual Status status(const ResponseType& r) PURE you could have this be generic over the response type as well and not be tied to access logging at all

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think what we need is to generalize the gRPC client with a buffer. Is it enough as follows?

template <class RequestType>
class BufferedGrpcClient {
  // It adds message to internal buffer by copy.
  virtual void addMessage(RequestType) PURE;
  // Send buffered messages.
  virtual void sendAll() PURE;
}

class BufferedGrpcClientImpl : public BufferedGrpcClient {
  void addMessage(RequestType) override;
  void sendAll() override;

  map<uint32_t, State> state_map_;
  map<uint32_t, RequestType> msg_map_;
}


/**
* Flush a critical message.
* @param message supplies the critical message to flush.
*/
virtual void flush(RequestType message) PURE;

/**
* @return whether this client has an active stream.
*/
virtual bool isStreamStarted() PURE;
};

// Owning pointer (std::unique_ptr) to a CriticalAccessLoggerGrpcClient for the given RequestType.
template <class RequestType>
using CriticalAccessLoggerGrpcClientPtr =
std::unique_ptr<CriticalAccessLoggerGrpcClient<RequestType>>;

/**
* Base class for defining a gRPC logger with the `HttpLogProto` and `TcpLogProto` access log
* entries and `LogRequest` and `LogResponse` gRPC messages.
Expand All @@ -165,14 +188,20 @@ class GrpcAccessLogger : public Detail::GrpcAccessLogger<HttpLogProto, TcpLogPro
: client_(client, service_method), buffer_flush_interval_msec_(buffer_flush_interval_msec),
flush_timer_(dispatcher.createTimer([this]() {
flush();
flushCriticalMessage();
flush_timer_->enableTimer(buffer_flush_interval_msec_);
})),
max_buffer_size_bytes_(max_buffer_size_bytes),
stats_({ALL_GRPC_ACCESS_LOGGER_STATS(POOL_COUNTER_PREFIX(scope, access_log_prefix))}) {
flush_timer_->enableTimer(buffer_flush_interval_msec_);
}

void log(HttpLogProto&& entry) {
void log(HttpLogProto&& entry, bool is_critical) {
if (is_critical) {
logCritical(std::move(entry));
return;
}

if (!canLogMore()) {
return;
}
Expand All @@ -183,7 +212,7 @@ class GrpcAccessLogger : public Detail::GrpcAccessLogger<HttpLogProto, TcpLogPro
}
}

void log(TcpLogProto&& entry) {
void log(TcpLogProto&& entry, bool) {
approximate_message_size_bytes_ += entry.ByteSizeLong();
addEntry(std::move(entry));
if (approximate_message_size_bytes_ >= max_buffer_size_bytes_) {
Expand All @@ -201,6 +230,8 @@ class GrpcAccessLogger : public Detail::GrpcAccessLogger<HttpLogProto, TcpLogPro
virtual void addEntry(HttpLogProto&& entry) PURE;
virtual void addEntry(TcpLogProto&& entry) PURE;
virtual void clearMessage() { message_.Clear(); }
virtual void flushCriticalMessage() {}
virtual void logCritical(HttpLogProto&&) {}

void flush() {
if (isEmpty()) {
Expand Down
Loading