Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions api/envoy/config/accesslog/v2/als.proto
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ option go_package = "v2";

import "envoy/api/v2/core/grpc_service.proto";

import "google/protobuf/duration.proto";
import "google/protobuf/wrappers.proto";

import "validate/validate.proto";

// [#protodoc-title: gRPC Access Log Service (ALS)]
Expand Down Expand Up @@ -49,4 +52,14 @@ message CommonGrpcAccessLogConfig {

// The gRPC service for the access log service.
envoy.api.v2.core.GrpcService grpc_service = 2 [(validate.rules).message.required = true];

// Interval for flushing access logs to the gRPC stream. Logger will flush requests every time
// this interval is elapsed, or when batch size limit is hit, whichever comes first. Defaults to
// 1 second.
google.protobuf.Duration buffer_flush_interval = 3 [(validate.rules).duration.gt = {}];

// Soft size limit in bytes for access log entries buffer. Logger will buffer requests until
// this limit it hit, or every time flush interval is elapsed, whichever comes first. Setting it
// to zero effectively disables the batching. Defaults to 16384.
google.protobuf.UInt32Value buffer_size_bytes = 4;
}
1 change: 1 addition & 0 deletions docs/root/intro/version_history.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ Version history

1.12.0 (pending)
================
* access log: added :ref:`buffering <envoy_api_field_config.accesslog.v2.CommonGrpcAccessLogConfig.buffer_size_bytes>` and :ref:`periodical flushing <envoy_api_field_config.accesslog.v2.CommonGrpcAccessLogConfig.buffer_flush_interval>` support to gRPC access logger. Defaults to 16KB buffer and flushing every 1 second.
* admin: added ability to configure listener :ref:`socket options <envoy_api_field_config.bootstrap.v2.Admin.socket_options>`.
* admin: added config dump support for Secret Discovery Service :ref:`SecretConfigDump <envoy_api_msg_admin.v2alpha.SecretsConfigDump>`.
* config: added access log :ref:`extension filter<envoy_api_field_config.filter.accesslog.v2.AccessLogFilter.extension_filter>`.
Expand Down
46 changes: 37 additions & 9 deletions source/extensions/access_loggers/http_grpc/grpc_access_log_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,33 @@ void GrpcAccessLoggerImpl::LocalStream::onRemoteClose(Grpc::Status::GrpcStatus,
}

GrpcAccessLoggerImpl::GrpcAccessLoggerImpl(Grpc::RawAsyncClientPtr&& client, std::string log_name,
std::chrono::milliseconds buffer_flush_interval_msec,
uint64_t buffer_size_bytes,
Event::Dispatcher& dispatcher,
const LocalInfo::LocalInfo& local_info)
: client_(std::move(client)), log_name_(std::move(log_name)), local_info_(local_info) {}
: client_(std::move(client)), log_name_(log_name),
buffer_flush_interval_msec_(buffer_flush_interval_msec),
flush_timer_(dispatcher.createTimer([this]() {
flush();
flush_timer_->enableTimer(buffer_flush_interval_msec_);
})),
buffer_size_bytes_(buffer_size_bytes), local_info_(local_info) {
flush_timer_->enableTimer(buffer_flush_interval_msec_);
}

void GrpcAccessLoggerImpl::log(envoy::data::accesslog::v2::HTTPAccessLogEntry&& entry) {
// TODO(euroelessar): Add batching and flushing.
envoy::service::accesslog::v2::StreamAccessLogsMessage message;
message.mutable_http_logs()->add_log_entry()->Swap(&entry);
approximate_message_size_bytes_ += entry.ByteSizeLong();
message_.mutable_http_logs()->add_log_entry()->Swap(&entry);
if (approximate_message_size_bytes_ >= buffer_size_bytes_) {
flush();
}
}

void GrpcAccessLoggerImpl::flush() {
if (!message_.has_http_logs()) {
// Nothing to flush.
return;
}

if (stream_ == absl::nullopt) {
stream_.emplace(*this);
Expand All @@ -61,17 +81,21 @@ void GrpcAccessLoggerImpl::log(envoy::data::accesslog::v2::HTTPAccessLogEntry&&
"envoy.service.accesslog.v2.AccessLogService.StreamAccessLogs"),
*stream_);

auto* identifier = message.mutable_identifier();
auto* identifier = message_.mutable_identifier();
*identifier->mutable_node() = local_info_.node();
identifier->set_log_name(log_name_);
}

if (stream_->stream_ != nullptr) {
stream_->stream_->sendMessage(message, false);
stream_->stream_->sendMessage(message_, false);
} else {
// Clear out the stream data due to stream creation failure.
stream_.reset();
}

// Clear the message regardless of the success.
approximate_message_size_bytes_ = 0;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

qq: could we potentially use the proto built-in cached size to avoid keeping track of it ourselves?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see how. ByteSize() (or ByteSizeLong()) method does not use cached sizes for children (presumably because CachedSize is not invalidated on message modification).
Which means that the only way to use built-in cache is to manually iterate over all log entries and sum result of GetCachedSize() calls, which would be less efficient and less obvious in terms of implementation.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK SGTM. Thanks for the explanation.

message_.Clear();
}

GrpcAccessLoggerCacheImpl::GrpcAccessLoggerCacheImpl(Grpc::AsyncClientManager& async_client_manager,
Expand All @@ -80,7 +104,8 @@ GrpcAccessLoggerCacheImpl::GrpcAccessLoggerCacheImpl(Grpc::AsyncClientManager& a
const LocalInfo::LocalInfo& local_info)
: async_client_manager_(async_client_manager), scope_(scope), tls_slot_(tls.allocateSlot()),
local_info_(local_info) {
tls_slot_->set([](Event::Dispatcher&) { return std::make_shared<ThreadLocalCache>(); });
tls_slot_->set(
[](Event::Dispatcher& dispatcher) { return std::make_shared<ThreadLocalCache>(dispatcher); });
}

GrpcAccessLoggerSharedPtr GrpcAccessLoggerCacheImpl::getOrCreateLogger(
Expand All @@ -94,8 +119,11 @@ GrpcAccessLoggerSharedPtr GrpcAccessLoggerCacheImpl::getOrCreateLogger(
}
const Grpc::AsyncClientFactoryPtr factory =
async_client_manager_.factoryForGrpcService(config.grpc_service(), scope_, false);
const GrpcAccessLoggerSharedPtr logger =
std::make_shared<GrpcAccessLoggerImpl>(factory->create(), config.log_name(), local_info_);
const GrpcAccessLoggerSharedPtr logger = std::make_shared<GrpcAccessLoggerImpl>(
factory->create(), config.log_name(),
std::chrono::milliseconds(PROTOBUF_GET_MS_OR_DEFAULT(config, buffer_flush_interval, 1000)),
PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, buffer_size_bytes, 16384), cache.dispatcher_,
local_info_);
cache.access_loggers_.emplace(cache_key, logger);
return logger;
}
Expand Down
12 changes: 12 additions & 0 deletions source/extensions/access_loggers/http_grpc/grpc_access_log_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ using GrpcAccessLoggerCacheSharedPtr = std::shared_ptr<GrpcAccessLoggerCache>;
class GrpcAccessLoggerImpl : public GrpcAccessLogger {
public:
GrpcAccessLoggerImpl(Grpc::RawAsyncClientPtr&& client, std::string log_name,
std::chrono::milliseconds buffer_flush_interval_msec,
uint64_t buffer_size_bytes, Event::Dispatcher& dispatcher,
const LocalInfo::LocalInfo& local_info);

void log(envoy::data::accesslog::v2::HTTPAccessLogEntry&& entry) override;
Expand All @@ -83,10 +85,17 @@ class GrpcAccessLoggerImpl : public GrpcAccessLogger {
Grpc::AsyncStream<envoy::service::accesslog::v2::StreamAccessLogsMessage> stream_{};
};

void flush();

Grpc::AsyncClient<envoy::service::accesslog::v2::StreamAccessLogsMessage,
envoy::service::accesslog::v2::StreamAccessLogsResponse>
client_;
const std::string log_name_;
const std::chrono::milliseconds buffer_flush_interval_msec_;
const Event::TimerPtr flush_timer_;
const uint64_t buffer_size_bytes_;
uint64_t approximate_message_size_bytes_ = 0;
envoy::service::accesslog::v2::StreamAccessLogsMessage message_;
absl::optional<LocalStream> stream_;
const LocalInfo::LocalInfo& local_info_;
};
Expand All @@ -105,6 +114,9 @@ class GrpcAccessLoggerCacheImpl : public Singleton::Instance, public GrpcAccessL
* Per-thread cache.
*/
struct ThreadLocalCache : public ThreadLocal::ThreadLocalObject {
ThreadLocalCache(Event::Dispatcher& dispatcher) : dispatcher_(dispatcher) {}

Event::Dispatcher& dispatcher_;
// Access loggers indexed by the hash of logger's configuration.
absl::flat_hash_map<std::size_t, GrpcAccessLoggerSharedPtr> access_loggers_;
};
Expand Down
102 changes: 100 additions & 2 deletions test/extensions/access_loggers/http_grpc/grpc_access_log_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,20 @@ namespace AccessLoggers {
namespace HttpGrpc {
namespace {

constexpr std::chrono::milliseconds FlushInterval(10);

class GrpcAccessLoggerImplTest : public testing::Test {
public:
using MockAccessLogStream = Grpc::MockAsyncStream;
using AccessLogCallbacks =
Grpc::AsyncStreamCallbacks<envoy::service::accesslog::v2::StreamAccessLogsResponse>;

GrpcAccessLoggerImplTest() {
void initLogger(std::chrono::milliseconds buffer_flush_interval_msec, size_t buffer_size_bytes) {
timer_ = new Event::MockTimer(&dispatcher_);
EXPECT_CALL(*timer_, enableTimer(buffer_flush_interval_msec));
logger_ = std::make_unique<GrpcAccessLoggerImpl>(Grpc::RawAsyncClientPtr{async_client_},
log_name_, local_info_);
log_name_, buffer_flush_interval_msec,
buffer_size_bytes, dispatcher_, local_info_);
}

void expectStreamStart(MockAccessLogStream& stream, AccessLogCallbacks** callbacks_to_set) {
Expand All @@ -59,13 +64,16 @@ class GrpcAccessLoggerImplTest : public testing::Test {

std::string log_name_ = "test_log_name";
LocalInfo::MockLocalInfo local_info_;
Event::MockTimer* timer_ = nullptr;
Event::MockDispatcher dispatcher_;
Grpc::MockAsyncClient* async_client_{new Grpc::MockAsyncClient};
std::unique_ptr<GrpcAccessLoggerImpl> logger_;
};

// Test basic stream logging flow.
TEST_F(GrpcAccessLoggerImplTest, BasicFlow) {
InSequence s;
initLogger(FlushInterval, 0);

// Start a stream for the first log.
MockAccessLogStream stream;
Expand Down Expand Up @@ -126,6 +134,7 @@ TEST_F(GrpcAccessLoggerImplTest, BasicFlow) {
// Test that stream failure is handled correctly.
TEST_F(GrpcAccessLoggerImplTest, StreamFailure) {
InSequence s;
initLogger(FlushInterval, 0);

EXPECT_CALL(*async_client_, startRaw(_, _, _))
.WillOnce(Invoke(
Expand All @@ -138,6 +147,95 @@ TEST_F(GrpcAccessLoggerImplTest, StreamFailure) {
logger_->log(envoy::data::accesslog::v2::HTTPAccessLogEntry(entry));
}

// Test that log entries are batched.
TEST_F(GrpcAccessLoggerImplTest, Batching) {
InSequence s;
initLogger(FlushInterval, 100);

MockAccessLogStream stream;
AccessLogCallbacks* callbacks;
expectStreamStart(stream, &callbacks);
EXPECT_CALL(local_info_, node());
const std::string path1(30, '1');
const std::string path2(30, '2');
const std::string path3(80, '3');
expectStreamMessage(stream, fmt::format(R"EOF(
identifier:
node:
id: node_name
cluster: cluster_name
locality:
zone: zone_name
log_name: test_log_name
http_logs:
log_entry:
- request:
path: "{}"
- request:
path: "{}"
- request:
path: "{}"
)EOF",
path1, path2, path3));
envoy::data::accesslog::v2::HTTPAccessLogEntry entry;
entry.mutable_request()->set_path(path1);
logger_->log(envoy::data::accesslog::v2::HTTPAccessLogEntry(entry));
entry.mutable_request()->set_path(path2);
logger_->log(envoy::data::accesslog::v2::HTTPAccessLogEntry(entry));
entry.mutable_request()->set_path(path3);
logger_->log(envoy::data::accesslog::v2::HTTPAccessLogEntry(entry));

const std::string path4(120, '4');
expectStreamMessage(stream, fmt::format(R"EOF(
http_logs:
log_entry:
request:
path: "{}"
)EOF",
path4));
entry.mutable_request()->set_path(path4);
logger_->log(envoy::data::accesslog::v2::HTTPAccessLogEntry(entry));
}

// Test that log entries are flushed periodically.
TEST_F(GrpcAccessLoggerImplTest, Flushing) {
InSequence s;
initLogger(FlushInterval, 100);

// Nothing to do yet.
EXPECT_CALL(*timer_, enableTimer(FlushInterval));
timer_->invokeCallback();

envoy::data::accesslog::v2::HTTPAccessLogEntry entry;
// Not enough data yet to trigger flush on batch size.
entry.mutable_request()->set_path("/test/path1");
logger_->log(envoy::data::accesslog::v2::HTTPAccessLogEntry(entry));

MockAccessLogStream stream;
AccessLogCallbacks* callbacks;
expectStreamStart(stream, &callbacks);
EXPECT_CALL(local_info_, node());
expectStreamMessage(stream, fmt::format(R"EOF(
identifier:
node:
id: node_name
cluster: cluster_name
locality:
zone: zone_name
log_name: test_log_name
http_logs:
log_entry:
- request:
path: /test/path1
)EOF"));
EXPECT_CALL(*timer_, enableTimer(FlushInterval));
timer_->invokeCallback();

// Flush on empty message does nothing.
EXPECT_CALL(*timer_, enableTimer(FlushInterval));
timer_->invokeCallback();
}

class GrpcAccessLoggerCacheImplTest : public testing::Test {
public:
GrpcAccessLoggerCacheImplTest() {
Expand Down