Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions source/extensions/access_loggers/common/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,22 @@ envoy_cc_library(
"//source/common/singleton:const_singleton",
],
)

envoy_cc_library(
name = "grpc_access_logger",
hdrs = ["grpc_access_logger.h"],
deps = [
"//include/envoy/event:dispatcher_interface",
"//include/envoy/grpc:async_client_manager_interface",
"//include/envoy/local_info:local_info_interface",
"//include/envoy/singleton:instance_interface",
"//include/envoy/stats:stats_interface",
"//include/envoy/thread_local:thread_local_interface",
"//source/common/common:assert_lib",
"//source/common/grpc:typed_async_client_lib",
"//source/common/protobuf:utility_lib",
"@com_google_absl//absl/container:flat_hash_map",
"@com_google_absl//absl/types:optional",
"@envoy_api//envoy/config/core/v3:pkg_cc_proto",
],
)
232 changes: 232 additions & 0 deletions source/extensions/access_loggers/common/grpc_access_logger.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,232 @@
#pragma once

#include <memory>

#include "envoy/config/core/v3/config_source.pb.h"
#include "envoy/event/dispatcher.h"
#include "envoy/grpc/async_client_manager.h"
#include "envoy/local_info/local_info.h"
#include "envoy/singleton/instance.h"
#include "envoy/stats/scope.h"
#include "envoy/thread_local/thread_local.h"

#include "common/common/assert.h"
#include "common/grpc/typed_async_client.h"
#include "common/protobuf/utility.h"

#include "absl/container/flat_hash_map.h"
#include "absl/types/optional.h"

namespace Envoy {
namespace Extensions {
namespace AccessLoggers {
namespace Common {

enum class GrpcAccessLoggerType { TCP, HTTP };

namespace Detail {

/**
* Fully specialized types of the interfaces below are available through the
* `Common::GrpcAccessLogger::Interface` and `Common::GrpcAccessLoggerCache::interface`
* aliases.
*/

/**
* Interface for an access logger. The logger provides abstraction on top of gRPC stream, deals with
* reconnects and performs batching.
*/
template <typename HttpLogProto, typename TcpLogProto> class GrpcAccessLogger {

@itamarkam itamarkam Nov 25, 2020

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't mean to hijack this review, I was just curious to see how you templated all of this -
Does it make sense to also split the TCP and HTTP logging?
There's nothing specific to HTTP/TCP in this interface and it would allow creating a logger that does only one of the two.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can not split it as such but we can provide partial specialization for the GrpcAccessLogger that has only one log method, instead of two. There is currently no use case for it now, but it can be added as part of adding OLTP logger if it only supports logging one type of log entry.

template <typename LogProto> class GrpcAccessLogger<LogProto, void> {
public:
  using SharedPtr = std::shared_ptr<GrpcAccessLogger>;

  virtual ~GrpcAccessLogger() = default;

  /**
   * Log access entry.
   * @param entry supplies the access log to send.
   */
  virtual void log(LogProto&& entry) PURE;
};

public:
using SharedPtr = std::shared_ptr<GrpcAccessLogger>;

virtual ~GrpcAccessLogger() = default;

/**
* Log http access entry.
* @param entry supplies the access log to send.
*/
virtual void log(HttpLogProto&& entry) PURE;

/**
* Log tcp access entry.
* @param entry supplies the access log to send.
*/
virtual void log(TcpLogProto&& entry) PURE;
};

/**
* Interface for an access logger cache. The cache deals with threading and de-duplicates loggers
* for the same configuration.
*/
template <typename GrpcAccessLogger, typename ConfigProto> class GrpcAccessLoggerCache {
public:
using SharedPtr = std::shared_ptr<GrpcAccessLoggerCache>;
virtual ~GrpcAccessLoggerCache() = default;

/**
* Get existing logger or create a new one for the given configuration.
* @param config supplies the configuration for the logger.
* @return GrpcAccessLoggerSharedPtr ready for logging requests.
*/
virtual typename GrpcAccessLogger::SharedPtr getOrCreateLogger(const ConfigProto& config,
GrpcAccessLoggerType logger_type,
Stats::Scope& scope) PURE;
};

template <typename LogRequest, typename LogResponse> class GrpcAccessLogClient {
public:
GrpcAccessLogClient(Grpc::RawAsyncClientPtr&& client,
const Protobuf::MethodDescriptor& service_method)
: GrpcAccessLogClient(std::move(client), service_method, absl::nullopt) {}
GrpcAccessLogClient(Grpc::RawAsyncClientPtr&& client,
const Protobuf::MethodDescriptor& service_method,
envoy::config::core::v3::ApiVersion transport_api_version)
: client_(std::move(client)), service_method_(service_method),
transport_api_version_(transport_api_version) {}

public:
struct LocalStream : public Grpc::AsyncStreamCallbacks<LogResponse> {
LocalStream(GrpcAccessLogClient& parent) : parent_(parent) {}

// Grpc::AsyncStreamCallbacks
void onCreateInitialMetadata(Http::RequestHeaderMap&) override {}
void onReceiveInitialMetadata(Http::ResponseHeaderMapPtr&&) override {}
void onReceiveMessage(std::unique_ptr<LogResponse>&&) override {}
void onReceiveTrailingMetadata(Http::ResponseTrailerMapPtr&&) override {}
void onRemoteClose(Grpc::Status::GrpcStatus, const std::string&) override {
ASSERT(parent_.stream_ != nullptr);
if (parent_.stream_->stream_ != nullptr) {
// Only reset if we have a stream. Otherwise we had an inline failure and we will clear the
// stream data in send().
parent_.stream_.reset();
}
}

GrpcAccessLogClient& parent_;
Grpc::AsyncStream<LogRequest> stream_{};
};

bool isStreamStarted() { return stream_ != nullptr && stream_->stream_ != nullptr; }

bool log(const LogRequest& request) {
if (!stream_) {
stream_ = std::make_unique<LocalStream>(*this);
}

if (stream_->stream_ == nullptr) {
stream_->stream_ =
client_->start(service_method_, *stream_, Http::AsyncClient::StreamOptions());
}

if (stream_->stream_ != nullptr) {
if (stream_->stream_->isAboveWriteBufferHighWatermark()) {
return false;
}
if (transport_api_version_.has_value()) {
stream_->stream_->sendMessage(request, transport_api_version_.value(), false);
} else {
stream_->stream_->sendMessage(request, false);
}
} else {
// Clear out the stream data due to stream creation failure.
stream_.reset();
}
return true;
}

Grpc::AsyncClient<LogRequest, LogResponse> client_;
std::unique_ptr<LocalStream> stream_;
const Protobuf::MethodDescriptor& service_method_;
const absl::optional<envoy::config::core::v3::ApiVersion> transport_api_version_;
};

} // namespace Detail

/**
* Base class for defining a gRPC logger with the `HttpLogProto` and `TcpLogProto` access log
* entries and `LogRequest` and `LogResponse` gRPC messages.
* The log entries and messages are distinct types to support batching of multiple access log
* entries in a single gRPC messages that go on the wire.
*/
template <typename HttpLogProto, typename TcpLogProto, typename LogRequest, typename LogResponse>
class GrpcAccessLogger : public Detail::GrpcAccessLogger<HttpLogProto, TcpLogProto> {
public:
using Interface = Detail::GrpcAccessLogger<HttpLogProto, TcpLogProto>;

GrpcAccessLogger(Grpc::RawAsyncClientPtr&& client,
const Protobuf::MethodDescriptor& service_method)
: GrpcAccessLogger(std::move(client), service_method, absl::nullopt) {}
GrpcAccessLogger(Grpc::RawAsyncClientPtr&& client,
const Protobuf::MethodDescriptor& service_method,
envoy::config::core::v3::ApiVersion transport_api_version)
: client_(std::move(client), service_method, transport_api_version) {}

protected:
Detail::GrpcAccessLogClient<LogRequest, LogResponse> client_;
};

/**
* Class for defining logger cache with the `GrpcAccessLogger` interface and
* `ConfigProto` configuration.
*/
template <typename GrpcAccessLogger, typename ConfigProto>
class GrpcAccessLoggerCache : public Singleton::Instance,
public Detail::GrpcAccessLoggerCache<GrpcAccessLogger, ConfigProto> {
public:
using Interface = Detail::GrpcAccessLoggerCache<GrpcAccessLogger, ConfigProto>;

GrpcAccessLoggerCache(Grpc::AsyncClientManager& async_client_manager, Stats::Scope& scope,
ThreadLocal::SlotAllocator& tls, const LocalInfo::LocalInfo& local_info)
: async_client_manager_(async_client_manager), scope_(scope), tls_slot_(tls.allocateSlot()),
local_info_(local_info) {
tls_slot_->set([](Event::Dispatcher& dispatcher) {
return std::make_shared<ThreadLocalCache>(dispatcher);
});
}

typename GrpcAccessLogger::SharedPtr getOrCreateLogger(const ConfigProto& config,
GrpcAccessLoggerType logger_type,
Stats::Scope& scope) override {
// TODO(euroelessar): Consider cleaning up loggers.
auto& cache = tls_slot_->getTyped<ThreadLocalCache>();
const auto cache_key = std::make_pair(MessageUtil::hash(config), logger_type);
const auto it = cache.access_loggers_.find(cache_key);
if (it != cache.access_loggers_.end()) {
return it->second;
}
const Grpc::AsyncClientFactoryPtr factory =
async_client_manager_.factoryForGrpcService(config.grpc_service(), scope_, false);
const auto logger = std::make_shared<GrpcAccessLogger>(
factory->create(), config.log_name(),
std::chrono::milliseconds(PROTOBUF_GET_MS_OR_DEFAULT(config, buffer_flush_interval, 1000)),
PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, buffer_size_bytes, 16384), cache.dispatcher_,
local_info_, scope, config.transport_api_version());
cache.access_loggers_.emplace(cache_key, logger);
return logger;
}

private:
/**
* Per-thread cache.
*/
struct ThreadLocalCache : public ThreadLocal::ThreadLocalObject {
ThreadLocalCache(Event::Dispatcher& dispatcher) : dispatcher_(dispatcher) {}

Event::Dispatcher& dispatcher_;
// Access loggers indexed by the hash of logger's configuration and logger type.
absl::flat_hash_map<std::pair<std::size_t, Common::GrpcAccessLoggerType>,
typename GrpcAccessLogger::SharedPtr>
access_loggers_;
};

Grpc::AsyncClientManager& async_client_manager_;
Stats::Scope& scope_;
ThreadLocal::SlotPtr tls_slot_;
const LocalInfo::LocalInfo& local_info_;
};

} // namespace Common
} // namespace AccessLoggers
} // namespace Extensions
} // namespace Envoy
1 change: 1 addition & 0 deletions source/extensions/access_loggers/grpc/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ envoy_cc_library(
"//source/common/grpc:typed_async_client_lib",
"//source/common/runtime:runtime_features_lib",
"//source/extensions/access_loggers/common:access_log_base",
"//source/extensions/access_loggers/common:grpc_access_logger",
"@envoy_api//envoy/data/accesslog/v3:pkg_cc_proto",
"@envoy_api//envoy/extensions/access_loggers/grpc/v3:pkg_cc_proto",
"@envoy_api//envoy/service/accesslog/v3:pkg_cc_proto",
Expand Down
2 changes: 1 addition & 1 deletion source/extensions/access_loggers/grpc/config_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ SINGLETON_MANAGER_REGISTRATION(grpc_access_logger_cache);

GrpcCommon::GrpcAccessLoggerCacheSharedPtr
getGrpcAccessLoggerCacheSingleton(Server::Configuration::FactoryContext& context) {
return context.singletonManager().getTyped<GrpcCommon::GrpcAccessLoggerCache>(
return context.singletonManager().getTyped<GrpcCommon::GrpcAccessLoggerCacheImpl>(
SINGLETON_MANAGER_REGISTERED_NAME(grpc_access_logger_cache), [&context] {
return std::make_shared<GrpcCommon::GrpcAccessLoggerCacheImpl>(
context.clusterManager().grpcAsyncClientManager(), context.scope(),
Expand Down
84 changes: 14 additions & 70 deletions source/extensions/access_loggers/grpc/grpc_access_log_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,35 +15,25 @@ namespace Extensions {
namespace AccessLoggers {
namespace GrpcCommon {

void GrpcAccessLoggerImpl::LocalStream::onRemoteClose(Grpc::Status::GrpcStatus,
const std::string&) {
ASSERT(parent_.stream_ != absl::nullopt);
if (parent_.stream_->stream_ != nullptr) {
// Only reset if we have a stream. Otherwise we had an inline failure and we will clear the
// stream data in send().
parent_.stream_.reset();
}
}

GrpcAccessLoggerImpl::GrpcAccessLoggerImpl(
Grpc::RawAsyncClientPtr&& client, std::string log_name,
std::chrono::milliseconds buffer_flush_interval_msec, uint64_t max_buffer_size_bytes,
Event::Dispatcher& dispatcher, const LocalInfo::LocalInfo& local_info, Stats::Scope& scope,
envoy::config::core::v3::ApiVersion transport_api_version)
: stats_({ALL_GRPC_ACCESS_LOGGER_STATS(
: GrpcAccessLogger(
std::move(client),
Grpc::VersionedMethods("envoy.service.accesslog.v3.AccessLogService.StreamAccessLogs",
"envoy.service.accesslog.v2.AccessLogService.StreamAccessLogs")
.getMethodDescriptorForVersion(transport_api_version),
transport_api_version),
stats_({ALL_GRPC_ACCESS_LOGGER_STATS(
POOL_COUNTER_PREFIX(scope, "access_logs.grpc_access_log."))}),
client_(std::move(client)), log_name_(log_name),
buffer_flush_interval_msec_(buffer_flush_interval_msec),
log_name_(log_name), buffer_flush_interval_msec_(buffer_flush_interval_msec),
flush_timer_(dispatcher.createTimer([this]() {
flush();
flush_timer_->enableTimer(buffer_flush_interval_msec_);
})),
max_buffer_size_bytes_(max_buffer_size_bytes), local_info_(local_info),
service_method_(
Grpc::VersionedMethods("envoy.service.accesslog.v3.AccessLogService.StreamAccessLogs",
"envoy.service.accesslog.v2.AccessLogService.StreamAccessLogs")
.getMethodDescriptorForVersion(transport_api_version)),
transport_api_version_(transport_api_version) {
max_buffer_size_bytes_(max_buffer_size_bytes), local_info_(local_info) {
flush_timer_->enableTimer(buffer_flush_interval_msec_);
}

Expand Down Expand Up @@ -90,63 +80,17 @@ void GrpcAccessLoggerImpl::flush() {
return;
}

if (stream_ == absl::nullopt) {
stream_.emplace(*this);
}

if (stream_->stream_ == nullptr) {
stream_->stream_ =
client_->start(service_method_, *stream_, Http::AsyncClient::StreamOptions());

if (!client_.isStreamStarted()) {
auto* identifier = message_.mutable_identifier();
*identifier->mutable_node() = local_info_.node();
identifier->set_log_name(log_name_);
}

if (stream_->stream_ != nullptr) {
if (stream_->stream_->isAboveWriteBufferHighWatermark()) {
return;
}
stream_->stream_->sendMessage(message_, transport_api_version_, false);
} else {
// Clear out the stream data due to stream creation failure.
stream_.reset();
}

// Clear the message regardless of the success.
approximate_message_size_bytes_ = 0;
message_.Clear();
}

GrpcAccessLoggerCacheImpl::GrpcAccessLoggerCacheImpl(Grpc::AsyncClientManager& async_client_manager,
Stats::Scope& scope,
ThreadLocal::SlotAllocator& tls,
const LocalInfo::LocalInfo& local_info)
: async_client_manager_(async_client_manager), scope_(scope), tls_slot_(tls.allocateSlot()),
local_info_(local_info) {
tls_slot_->set(
[](Event::Dispatcher& dispatcher) { return std::make_shared<ThreadLocalCache>(dispatcher); });
}

GrpcAccessLoggerSharedPtr GrpcAccessLoggerCacheImpl::getOrCreateLogger(
const envoy::extensions::access_loggers::grpc::v3::CommonGrpcAccessLogConfig& config,
GrpcAccessLoggerType logger_type, Stats::Scope& scope) {
// TODO(euroelessar): Consider cleaning up loggers.
auto& cache = tls_slot_->getTyped<ThreadLocalCache>();
const auto cache_key = std::make_pair(MessageUtil::hash(config), logger_type);
const auto it = cache.access_loggers_.find(cache_key);
if (it != cache.access_loggers_.end()) {
return it->second;
if (client_.log(message_)) {
// Clear the message regardless of the success.
approximate_message_size_bytes_ = 0;
message_.Clear();
}
const Grpc::AsyncClientFactoryPtr factory =
async_client_manager_.factoryForGrpcService(config.grpc_service(), scope_, false);
const GrpcAccessLoggerSharedPtr logger = std::make_shared<GrpcAccessLoggerImpl>(
factory->create(), config.log_name(),
std::chrono::milliseconds(PROTOBUF_GET_MS_OR_DEFAULT(config, buffer_flush_interval, 1000)),
PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, buffer_size_bytes, 16384), cache.dispatcher_,
local_info_, scope, config.transport_api_version());
cache.access_loggers_.emplace(cache_key, logger);
return logger;
}

} // namespace GrpcCommon
Expand Down
Loading