diff --git a/api/envoy/extensions/access_loggers/open_telemetry/v3/BUILD b/api/envoy/extensions/access_loggers/open_telemetry/v3/BUILD index 39d57c5eab682..e13dc63106d3f 100644 --- a/api/envoy/extensions/access_loggers/open_telemetry/v3/BUILD +++ b/api/envoy/extensions/access_loggers/open_telemetry/v3/BUILD @@ -6,8 +6,10 @@ licenses(["notice"]) # Apache 2 api_proto_package( deps = [ + "//envoy/annotations:pkg", "//envoy/config/core/v3:pkg", "//envoy/extensions/access_loggers/grpc/v3:pkg", + "//envoy/type/tracing/v3:pkg", "@com_github_cncf_xds//udpa/annotations:pkg", "@opentelemetry_proto//:common_proto", ], diff --git a/api/envoy/extensions/access_loggers/open_telemetry/v3/logs_service.proto b/api/envoy/extensions/access_loggers/open_telemetry/v3/logs_service.proto index 641276a64bd62..ccb1ac479afca 100644 --- a/api/envoy/extensions/access_loggers/open_telemetry/v3/logs_service.proto +++ b/api/envoy/extensions/access_loggers/open_telemetry/v3/logs_service.proto @@ -3,12 +3,18 @@ syntax = "proto3"; package envoy.extensions.access_loggers.open_telemetry.v3; import "envoy/config/core/v3/extension.proto"; +import "envoy/config/core/v3/grpc_service.proto"; +import "envoy/config/core/v3/http_service.proto"; import "envoy/extensions/access_loggers/grpc/v3/als.proto"; +import "envoy/type/tracing/v3/custom_tag.proto"; + +import "google/protobuf/duration.proto"; +import "google/protobuf/wrappers.proto"; import "opentelemetry/proto/common/v1/common.proto"; +import "envoy/annotations/deprecation.proto"; import "udpa/annotations/status.proto"; -import "validate/validate.proto"; option java_package = "io.envoyproxy.envoy.extensions.access_loggers.open_telemetry.v3"; option java_outer_classname = "LogsServiceProto"; @@ -16,17 +22,37 @@ option java_multiple_files = true; option go_package = "github.com/envoyproxy/go-control-plane/envoy/extensions/access_loggers/open_telemetry/v3;open_telemetryv3"; option (udpa.annotations.file_status).package_version_status = ACTIVE; -// [#protodoc-title: OpenTelemetry (gRPC) Access Log] +// [#protodoc-title: OpenTelemetry Access Log] // Configuration for the built-in ``envoy.access_loggers.open_telemetry`` // :ref:`AccessLog `. This configuration will // populate `opentelemetry.proto.collector.v1.logs.ExportLogsServiceRequest.resource_logs `_. // In addition, the request start time is set in the dedicated field. // [#extension: envoy.access_loggers.open_telemetry] -// [#next-free-field: 8] +// [#next-free-field: 15] message OpenTelemetryAccessLogConfig { // [#comment:TODO(itamarkam): add 'filter_state_objects_to_log' to logs.] - grpc.v3.CommonGrpcAccessLogConfig common_config = 1 [(validate.rules).message = {required: true}]; + // Deprecated. Use ``grpc_service`` or ``http_service`` instead. + grpc.v3.CommonGrpcAccessLogConfig common_config = 1 + [deprecated = true, (envoy.annotations.deprecated_at_minor_version) = "3.0"]; + + // The upstream HTTP cluster that will receive OTLP logs via + // `OTLP/HTTP `_. + // Note: Only one of ``common_config``, ``grpc_service``, or ``http_service`` may be used. + // + // .. note:: + // + // The ``request_headers_to_add`` property in the OTLP HTTP exporter service + // does not support the :ref:`format specifier ` as used for + // :ref:`HTTP access logging `. + // The values configured are added as HTTP headers on the OTLP export request + // without any formatting applied. + config.core.v3.HttpService http_service = 8; + + // The upstream gRPC cluster that will receive OTLP logs. + // Note: Only one of ``common_config``, ``grpc_service``, or ``http_service`` may be used. + // This field is preferred over ``common_config.grpc_service``. + config.core.v3.GrpcService grpc_service = 9; // If specified, Envoy will not generate built-in resource labels // like ``log_name``, ``zone_name``, ``cluster_name``, ``node_name``. @@ -57,4 +83,19 @@ message OpenTelemetryAccessLogConfig { // See the formatters extensions documentation for details. // [#extension-category: envoy.formatter] repeated config.core.v3.TypedExtensionConfig formatters = 7; + + string log_name = 10; + + // The interval for flushing access logs to the transport. Default: 1 second. + google.protobuf.Duration buffer_flush_interval = 11; + + // Soft size limit in bytes for the access log buffer. When the buffer exceeds + // this limit, logs will be flushed. Default: 16KB. + google.protobuf.UInt32Value buffer_size_bytes = 12; + + // Additional filter state objects to log as attributes. + repeated string filter_state_objects_to_log = 13; + + // Custom tags to include as log attributes. + repeated type.tracing.v3.CustomTag custom_tags = 14; } diff --git a/changelogs/current.yaml b/changelogs/current.yaml index 2c6eef620f7bf..dd2d7e8c5d043 100644 --- a/changelogs/current.yaml +++ b/changelogs/current.yaml @@ -728,5 +728,19 @@ new_features: Adds ``%DOWNSTREAM_LOCAL_ADDRESS_ENDPOINT_ID%``, ``%DOWNSTREAM_DIRECT_LOCAL_ADDRESS_ENDPOINT_ID%``, and ``%UPSTREAM_REMOTE_ADDRESS_ENDPOINT_ID%`` access_log command operators to access the endpoint ID used to establish a connection to an internal listener. +- area: access_log + change: | + Added support for exporting OpenTelemetry access logs via HTTP. New top-level fields + ``http_service``, ``grpc_service``, ``log_name``, ``buffer_flush_interval``, ``buffer_size_bytes``, + ``filter_state_objects_to_log``, and ``custom_tags`` provide a cleaner configuration. + The ``common_config`` field is deprecated but remains functional for backward compatibility. + See :ref:`http_service ` + and :ref:`grpc_service `. deprecated: +- area: access_log + change: | + The ``common_config`` field in + :ref:`OpenTelemetryAccessLogConfig ` + is deprecated. Use ``http_service`` for HTTP transport, ``grpc_service`` for gRPC transport, + and ``log_name`` for the log identifier instead. diff --git a/configs/envoy-otel-http.yaml b/configs/envoy-otel-http.yaml new file mode 100644 index 0000000000000..dfb7459235697 --- /dev/null +++ b/configs/envoy-otel-http.yaml @@ -0,0 +1,141 @@ +# Example configuration for exporting OpenTelemetry logs and traces via HTTP. +# This demonstrates OTLP/HTTP transport for both access logs and tracing. +# +# Usage: +# 1. Start an OTLP HTTP collector on localhost:4318 (e.g., otel-tui) +# 2. Run: bazel-bin/source/exe/envoy-static -c configs/envoy-otel-http.yaml +# 3. Test: curl localhost:10080/get + +admin: + address: + socket_address: + protocol: TCP + address: 127.0.0.1 + port_value: 9901 + +static_resources: + listeners: + - name: listener_0 + address: + socket_address: + protocol: TCP + address: 0.0.0.0 + port_value: 10080 + filter_chains: + - filters: + - name: envoy.filters.network.http_connection_manager + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager + stat_prefix: ingress_http + generate_request_id: true + tracing: + spawn_upstream_span: true + random_sampling: + value: 100.0 + provider: + name: envoy.tracers.opentelemetry + typed_config: + "@type": type.googleapis.com/envoy.config.trace.v3.OpenTelemetryConfig + http_service: + http_uri: + uri: "http://localhost:4318/v1/traces" + cluster: otel_collector + timeout: 1s + request_headers_to_add: + - header: + key: "Authorization" + value: "Bearer fake" + service_name: "envoy-demo" + sampler: + name: envoy.tracers.opentelemetry.samplers.always_on + typed_config: + "@type": type.googleapis.com/envoy.extensions.tracers.opentelemetry.samplers.v3.AlwaysOnSamplerConfig + access_log: + - name: envoy.access_loggers.open_telemetry + typed_config: + "@type": type.googleapis.com/envoy.extensions.access_loggers.open_telemetry.v3.OpenTelemetryAccessLogConfig + log_name: envoy_access_log + # Flush immediately for testing (default is 16384 bytes). + buffer_size_bytes: 1 + http_service: + http_uri: + uri: "http://localhost:4318/v1/logs" + cluster: otel_collector + timeout: 1s + request_headers_to_add: + - header: + key: "Authorization" + value: "Bearer fake" + resource_attributes: + values: + - key: "service.name" + value: + string_value: "envoy-demo" + body: + string_value: "%REQ(:METHOD)% %REQ(:PATH)% %PROTOCOL% %RESPONSE_CODE%" + attributes: + values: + - key: "response_code_details" + value: + string_value: "%RESPONSE_CODE_DETAILS%" + - key: "upstream_host" + value: + string_value: "%UPSTREAM_HOST%" + - key: "upstream_cluster" + value: + string_value: "%UPSTREAM_CLUSTER%" + - key: "upstream_transport_failure_reason" + value: + string_value: "%UPSTREAM_TRANSPORT_FAILURE_REASON%" + - key: "response_flags" + value: + string_value: "%RESPONSE_FLAGS%" + - key: "connection_termination_details" + value: + string_value: "%CONNECTION_TERMINATION_DETAILS%" + route_config: + name: local_route + virtual_hosts: + - name: backend + domains: ["*"] + routes: + - match: + prefix: "/" + route: + host_rewrite_literal: httpbingo.org + cluster: httpbingo + http_filters: + - name: envoy.filters.http.router + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router + clusters: + - name: otel_collector + type: STRICT_DNS + lb_policy: ROUND_ROBIN + load_assignment: + cluster_name: otel_collector + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: localhost + port_value: 4318 + - name: httpbingo + type: LOGICAL_DNS + dns_lookup_family: V4_ONLY + lb_policy: ROUND_ROBIN + load_assignment: + cluster_name: httpbingo + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: httpbingo.org + port_value: 443 + transport_socket: + name: envoy.transport_sockets.tls + typed_config: + "@type": type.googleapis.com/envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext + sni: httpbingo.org diff --git a/contrib/golang/filters/http/test/test_data/dummy/go.mod b/contrib/golang/filters/http/test/test_data/dummy/go.mod index 25094545aa066..9e0d8fb7fe6b9 100644 --- a/contrib/golang/filters/http/test/test_data/dummy/go.mod +++ b/contrib/golang/filters/http/test/test_data/dummy/go.mod @@ -1,9 +1,9 @@ module example.com/dummy -go 1.22 +go 1.24.6 require github.com/envoyproxy/envoy v1.24.0 -require google.golang.org/protobuf v1.36.1 // indirect +require google.golang.org/protobuf v1.36.11 // indirect replace github.com/envoyproxy/envoy => ../../../../../../../ diff --git a/contrib/golang/filters/http/test/test_data/go.mod b/contrib/golang/filters/http/test/test_data/go.mod index 543cead99c755..8f90996861c06 100644 --- a/contrib/golang/filters/http/test/test_data/go.mod +++ b/contrib/golang/filters/http/test/test_data/go.mod @@ -11,8 +11,8 @@ require ( require ( cel.dev/expr v0.25.1 // indirect github.com/envoyproxy/protoc-gen-validate v1.2.1 // indirect - google.golang.org/genproto/googleapis/api v0.0.0-20240903143218-8af14fe29dc1 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20250728155136-f173205681a0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20250728155136-f173205681a0 // indirect ) replace github.com/envoyproxy/envoy => ../../../../../../ diff --git a/contrib/golang/filters/network/test/test_data/go.mod b/contrib/golang/filters/network/test/test_data/go.mod index 43a4950fe54eb..33197f5652810 100644 --- a/contrib/golang/filters/network/test/test_data/go.mod +++ b/contrib/golang/filters/network/test/test_data/go.mod @@ -1,9 +1,9 @@ module github.com/envoyproxy/envoy/contrib/golang/filters/network/test/test_data -go 1.22 +go 1.24.6 require github.com/envoyproxy/envoy v1.33.2 -require google.golang.org/protobuf v1.36.5 // indirect +require google.golang.org/protobuf v1.36.11 // indirect replace github.com/envoyproxy/envoy => ../../../../../../ diff --git a/source/extensions/access_loggers/open_telemetry/BUILD b/source/extensions/access_loggers/open_telemetry/BUILD index c790243b8c028..53d5c11996c27 100644 --- a/source/extensions/access_loggers/open_telemetry/BUILD +++ b/source/extensions/access_loggers/open_telemetry/BUILD @@ -9,11 +9,40 @@ licenses(["notice"]) # Apache 2 envoy_extension_package() +envoy_cc_library( + name = "otlp_log_utils_lib", + srcs = ["otlp_log_utils.cc"], + hdrs = ["otlp_log_utils.h"], + deps = [ + "//envoy/formatter:http_formatter_context_interface", + "//envoy/local_info:local_info_interface", + "//envoy/stats:stats_macros", + "//envoy/stream_info:filter_state_interface", + "//envoy/stream_info:stream_info_interface", + "//source/common/common:assert_lib", + "//source/common/common:hex_lib", + "//source/common/common:macros", + "//source/common/http:header_map_lib", + "//source/common/protobuf:utility_lib", + "//source/common/tracing:custom_tag_lib", + "//source/common/tracing:http_tracer_lib", + "//source/common/tracing:trace_context_lib", + "//source/common/version:version_lib", + "@com_google_absl//absl/strings", + "@envoy_api//envoy/data/accesslog/v3:pkg_cc_proto", + "@envoy_api//envoy/extensions/access_loggers/open_telemetry/v3:pkg_cc_proto", + "@opentelemetry_proto//:common_proto_cc", + "@opentelemetry_proto//:logs_proto_cc", + "@opentelemetry_proto//:logs_service_proto_cc", + ], +) + envoy_cc_library( name = "grpc_access_log_lib", srcs = ["grpc_access_log_impl.cc"], hdrs = ["grpc_access_log_impl.h"], deps = [ + ":otlp_log_utils_lib", "//envoy/event:dispatcher_interface", "//envoy/grpc:async_client_manager_interface", "//envoy/local_info:local_info_interface", @@ -23,6 +52,7 @@ envoy_cc_library( "//source/common/protobuf", "//source/extensions/access_loggers/common:grpc_access_logger", "//source/extensions/access_loggers/common:grpc_access_logger_clients_lib", + "//source/extensions/tracers/opentelemetry/resource_detectors:resource_detector_lib", "@envoy_api//envoy/extensions/access_loggers/grpc/v3:pkg_cc_proto", "@envoy_api//envoy/extensions/access_loggers/open_telemetry/v3:pkg_cc_proto", "@opentelemetry_proto//:logs_proto_cc", @@ -36,11 +66,14 @@ envoy_cc_library( hdrs = ["access_log_impl.h"], deps = [ ":grpc_access_log_lib", + ":otlp_log_utils_lib", ":substitution_formatter_lib", "//envoy/access_log:access_log_interface", "//envoy/protobuf:message_validator_interface", "//source/common/protobuf:utility_lib", + "//source/common/tracing:custom_tag_lib", "//source/extensions/access_loggers/common:access_log_base", + "//source/extensions/tracers/opentelemetry/resource_detectors:resource_detector_lib", "@envoy_api//envoy/config/core/v3:pkg_cc_proto", "@envoy_api//envoy/data/accesslog/v3:pkg_cc_proto", "@envoy_api//envoy/extensions/access_loggers/grpc/v3:pkg_cc_proto", @@ -50,6 +83,38 @@ envoy_cc_library( ], ) +envoy_cc_library( + name = "http_access_log_lib", + srcs = ["http_access_log_impl.cc"], + hdrs = ["http_access_log_impl.h"], + deps = [ + ":otlp_log_utils_lib", + ":substitution_formatter_lib", + "//envoy/access_log:access_log_interface", + "//envoy/event:dispatcher_interface", + "//envoy/local_info:local_info_interface", + "//envoy/singleton:instance_interface", + "//envoy/thread_local:thread_local_interface", + "//envoy/upstream:cluster_manager_interface", + "//source/common/config:utility_lib", + "//source/common/http:async_client_lib", + "//source/common/http:async_client_utility_lib", + "//source/common/http:header_map_lib", + "//source/common/http:message_lib", + "//source/common/http:utility_lib", + "//source/common/protobuf", + "//source/common/protobuf:utility_lib", + "//source/common/tracing:custom_tag_lib", + "//source/extensions/access_loggers/common:access_log_base", + "//source/extensions/tracers/opentelemetry/resource_detectors:resource_detector_lib", + "@envoy_api//envoy/config/core/v3:pkg_cc_proto", + "@envoy_api//envoy/data/accesslog/v3:pkg_cc_proto", + "@envoy_api//envoy/extensions/access_loggers/open_telemetry/v3:pkg_cc_proto", + "@opentelemetry_proto//:logs_proto_cc", + "@opentelemetry_proto//:logs_service_proto_cc", + ], +) + envoy_cc_library( name = "access_log_proto_descriptors_lib", srcs = ["access_log_proto_descriptors.cc"], @@ -72,6 +137,8 @@ envoy_cc_extension( "//source/extensions/access_loggers/open_telemetry:access_log_lib", "//source/extensions/access_loggers/open_telemetry:access_log_proto_descriptors_lib", "//source/extensions/access_loggers/open_telemetry:grpc_access_log_lib", + "//source/extensions/access_loggers/open_telemetry:http_access_log_lib", + "//source/extensions/tracers/opentelemetry/resource_detectors:resource_detector_lib", "@envoy_api//envoy/extensions/access_loggers/open_telemetry/v3:pkg_cc_proto", ], ) diff --git a/source/extensions/access_loggers/open_telemetry/access_log_impl.cc b/source/extensions/access_loggers/open_telemetry/access_log_impl.cc index 858e9259651d9..1a85580f45d35 100644 --- a/source/extensions/access_loggers/open_telemetry/access_log_impl.cc +++ b/source/extensions/access_loggers/open_telemetry/access_log_impl.cc @@ -7,7 +7,6 @@ #include "envoy/extensions/access_loggers/grpc/v3/als.pb.h" #include "envoy/extensions/access_loggers/open_telemetry/v3/logs_service.pb.h" -#include "source/common/common/assert.h" #include "source/common/config/utility.h" #include "source/common/formatter/substitution_formatter.h" #include "source/common/http/headers.h" @@ -15,6 +14,7 @@ #include "source/common/protobuf/message_validator_impl.h" #include "source/common/protobuf/utility.h" #include "source/common/stream_info/utility.h" +#include "source/extensions/access_loggers/open_telemetry/otlp_log_utils.h" #include "source/extensions/access_loggers/open_telemetry/substitution_formatter.h" #include "opentelemetry/proto/collector/logs/v1/logs_service.pb.h" @@ -22,34 +22,11 @@ #include "opentelemetry/proto/logs/v1/logs.pb.h" #include "opentelemetry/proto/resource/v1/resource.pb.h" -// Used to pack/unpack the body AnyValue to a KeyValueList. -const char BODY_KEY[] = "body"; - namespace Envoy { namespace Extensions { namespace AccessLoggers { namespace OpenTelemetry { -namespace { - -// Packing the body "AnyValue" to a "KeyValueList" with a single key and the body as value. -::opentelemetry::proto::common::v1::KeyValueList -packBody(const ::opentelemetry::proto::common::v1::AnyValue& body) { - ::opentelemetry::proto::common::v1::KeyValueList output; - auto* kv = output.add_values(); - kv->set_key(BODY_KEY); - *kv->mutable_value() = body; - return output; -} - -::opentelemetry::proto::common::v1::AnyValue -unpackBody(const ::opentelemetry::proto::common::v1::KeyValueList& value) { - ASSERT(value.values().size() == 1 && value.values(0).key() == BODY_KEY); - return value.values(0).value(); -} - -} // namespace - Http::RegisterCustomInlineHeader referer_handle(Http::CustomHeaders::get().Referer); @@ -62,7 +39,9 @@ AccessLog::AccessLog( ThreadLocal::SlotAllocator& tls, GrpcAccessLoggerCacheSharedPtr access_logger_cache, const std::vector& commands) : Common::ImplBase(std::move(filter)), tls_slot_(tls.allocateSlot()), - access_logger_cache_(std::move(access_logger_cache)) { + access_logger_cache_(std::move(access_logger_cache)), + filter_state_objects_to_log_(getFilterStateObjectsToLog(config)), + custom_tags_(getCustomTags(config)) { THROW_IF_NOT_OK(Envoy::Config::Utility::checkTransportVersion(config.common_config())); tls_slot_->set([this, config](Event::Dispatcher&) { @@ -85,7 +64,7 @@ void AccessLog::emitLog(const Formatter::Context& log_context, stream_info.startTime().time_since_epoch()) .count()); - // Unpacking the body "KeyValueList" to "AnyValue". + // Unpacks the body "KeyValueList" to "AnyValue". if (body_formatter_) { const auto formatted_body = unpackBody(body_formatter_->format(log_context, stream_info)); *log_entry.mutable_body() = formatted_body; @@ -93,23 +72,15 @@ void AccessLog::emitLog(const Formatter::Context& log_context, const auto formatted_attributes = attributes_formatter_->format(log_context, stream_info); *log_entry.mutable_attributes() = formatted_attributes.values(); - // Setting the trace id if available. - // OpenTelemetry trace id is a [16]byte array, backend(e.g. OTel-collector) will reject the - // request if the length is not 16. Some trace provider(e.g. zipkin) may return it as a 64-bit hex - // string. In this case, we need to convert it to a 128-bit hex string, padding left with zeros. - std::string trace_id_hex = + // Sets trace context (trace_id, span_id) if available. + const std::string trace_id_hex = log_context.activeSpan().has_value() ? log_context.activeSpan()->getTraceId() : ""; - if (trace_id_hex.size() == 32) { - *log_entry.mutable_trace_id() = absl::HexStringToBytes(trace_id_hex); - } else if (trace_id_hex.size() == 16) { - auto trace_id = absl::StrCat(Hex::uint64ToHex(0), trace_id_hex); - *log_entry.mutable_trace_id() = absl::HexStringToBytes(trace_id); - } - std::string span_id_hex = + const std::string span_id_hex = log_context.activeSpan().has_value() ? log_context.activeSpan()->getSpanId() : ""; - if (!span_id_hex.empty()) { - *log_entry.mutable_span_id() = absl::HexStringToBytes(span_id_hex); - } + populateTraceContext(log_entry, trace_id_hex, span_id_hex); + + addFilterStateToAttributes(stream_info, filter_state_objects_to_log_, log_entry); + addCustomTagsToAttributes(custom_tags_, log_context, stream_info, log_entry); tls_slot_->getTyped().logger_->log(std::move(log_entry)); } diff --git a/source/extensions/access_loggers/open_telemetry/access_log_impl.h b/source/extensions/access_loggers/open_telemetry/access_log_impl.h index d8ac11a362e15..e9ab37d56543f 100644 --- a/source/extensions/access_loggers/open_telemetry/access_log_impl.h +++ b/source/extensions/access_loggers/open_telemetry/access_log_impl.h @@ -12,6 +12,7 @@ #include "envoy/thread_local/thread_local.h" #include "source/common/grpc/typed_async_client.h" +#include "source/common/tracing/custom_tag_impl.h" #include "source/extensions/access_loggers/common/access_log_base.h" #include "source/extensions/access_loggers/open_telemetry/grpc_access_log_impl.h" #include "source/extensions/access_loggers/open_telemetry/substitution_formatter.h" @@ -56,6 +57,8 @@ class AccessLog : public Common::ImplBase { const GrpcAccessLoggerCacheSharedPtr access_logger_cache_; std::unique_ptr body_formatter_; std::unique_ptr attributes_formatter_; + const std::vector filter_state_objects_to_log_; + const std::vector custom_tags_; }; using AccessLogPtr = std::unique_ptr; diff --git a/source/extensions/access_loggers/open_telemetry/config.cc b/source/extensions/access_loggers/open_telemetry/config.cc index 7ca56443c4618..dc163359886ef 100644 --- a/source/extensions/access_loggers/open_telemetry/config.cc +++ b/source/extensions/access_loggers/open_telemetry/config.cc @@ -12,6 +12,7 @@ #include "source/common/protobuf/protobuf.h" #include "source/extensions/access_loggers/open_telemetry/access_log_impl.h" #include "source/extensions/access_loggers/open_telemetry/access_log_proto_descriptors.h" +#include "source/extensions/access_loggers/open_telemetry/http_access_log_impl.h" namespace Envoy { namespace Extensions { @@ -20,9 +21,10 @@ namespace OpenTelemetry { // Singleton registration via macro defined in envoy/singleton/manager.h SINGLETON_MANAGER_REGISTRATION(open_telemetry_access_logger_cache); +SINGLETON_MANAGER_REGISTRATION(open_telemetry_http_access_logger_cache); -GrpcAccessLoggerCacheSharedPtr -getAccessLoggerCacheSingleton(Server::Configuration::CommonFactoryContext& context) { +std::shared_ptr +getGrpcAccessLoggerCacheSingleton(Server::Configuration::CommonFactoryContext& context) { return context.singletonManager().getTyped( SINGLETON_MANAGER_REGISTERED_NAME(open_telemetry_access_logger_cache), [&context] { return std::make_shared( @@ -31,6 +33,16 @@ getAccessLoggerCacheSingleton(Server::Configuration::CommonFactoryContext& conte }); } +HttpAccessLoggerCacheSharedPtr +getHttpAccessLoggerCacheSingleton(Server::Configuration::CommonFactoryContext& context) { + return context.singletonManager().getTyped( + SINGLETON_MANAGER_REGISTERED_NAME(open_telemetry_http_access_logger_cache), [&context] { + return std::make_shared( + context.clusterManager(), context.serverScope(), context.threadLocal(), + context.localInfo()); + }); +} + ::Envoy::AccessLog::InstanceSharedPtr AccessLogFactory::createAccessLogInstance( const Protobuf::Message& config, ::Envoy::AccessLog::FilterPtr&& filter, Server::Configuration::GenericFactoryContext& context, @@ -41,14 +53,42 @@ ::Envoy::AccessLog::InstanceSharedPtr AccessLogFactory::createAccessLogInstance( const envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig&>( config, context.messageValidationVisitor()); + // Validate transport configuration: exactly one transport must be specified. + const bool has_grpc_service = proto_config.has_grpc_service(); + const bool has_http_service = proto_config.has_http_service(); + const bool has_common_config_grpc = + proto_config.has_common_config() && proto_config.common_config().has_grpc_service(); + + const int transport_count = + (has_grpc_service ? 1 : 0) + (has_http_service ? 1 : 0) + (has_common_config_grpc ? 1 : 0); + + if (transport_count == 0) { + throw EnvoyException( + "OpenTelemetry access logger requires one of: grpc_service, http_service, or " + "common_config.grpc_service to be configured."); + } + + if (transport_count > 1) { + throw EnvoyException( + "OpenTelemetry access logger can only have one transport configured. " + "Specify exactly one of: grpc_service, http_service, or common_config.grpc_service."); + } + auto commands = THROW_OR_RETURN_VALUE(Formatter::SubstitutionFormatStringUtils::parseFormatters( proto_config.formatters(), context, std::move(command_parsers)), std::vector); + // Create appropriate access log based on transport type. + if (has_http_service) { + return std::make_shared( + std::move(filter), proto_config, context.serverFactoryContext().threadLocal(), + getHttpAccessLoggerCacheSingleton(context.serverFactoryContext()), commands); + } + return std::make_shared( std::move(filter), proto_config, context.serverFactoryContext().threadLocal(), - getAccessLoggerCacheSingleton(context.serverFactoryContext()), commands); + getGrpcAccessLoggerCacheSingleton(context.serverFactoryContext()), commands); } ProtobufTypes::MessagePtr AccessLogFactory::createEmptyConfigProto() { diff --git a/source/extensions/access_loggers/open_telemetry/grpc_access_log_impl.cc b/source/extensions/access_loggers/open_telemetry/grpc_access_log_impl.cc index b05335225d8f3..46d45f5734598 100644 --- a/source/extensions/access_loggers/open_telemetry/grpc_access_log_impl.cc +++ b/source/extensions/access_loggers/open_telemetry/grpc_access_log_impl.cc @@ -7,15 +7,15 @@ #include "source/common/config/utility.h" #include "source/common/grpc/typed_async_client.h" +#include "source/common/protobuf/utility.h" #include "source/extensions/access_loggers/common/grpc_access_logger_clients.h" +#include "source/extensions/access_loggers/open_telemetry/otlp_log_utils.h" #include "opentelemetry/proto/collector/logs/v1/logs_service.pb.h" #include "opentelemetry/proto/common/v1/common.pb.h" #include "opentelemetry/proto/logs/v1/logs.pb.h" #include "opentelemetry/proto/resource/v1/resource.pb.h" -const char GRPC_LOG_STATS_PREFIX[] = "access_logs.open_telemetry_access_log."; - namespace Envoy { namespace Extensions { namespace AccessLoggers { @@ -24,15 +24,6 @@ namespace OpenTelemetry { namespace { using opentelemetry::proto::collector::logs::v1::ExportLogsServiceRequest; using opentelemetry::proto::collector::logs::v1::ExportLogsServiceResponse; - -opentelemetry::proto::common::v1::KeyValue getStringKeyValue(const std::string& key, - const std::string& value) { - opentelemetry::proto::common::v1::KeyValue keyValue; - keyValue.set_key(key); - keyValue.mutable_value()->set_string_value(value); - return keyValue; -} - } // namespace GrpcAccessLoggerImpl::GrpcAccessLoggerImpl( @@ -48,9 +39,9 @@ GrpcAccessLoggerImpl::GrpcAccessLoggerImpl( *Protobuf::DescriptorPool::generated_pool()->FindMethodByName( "opentelemetry.proto.collector.logs.v1.LogsService.Export"), GrpcCommon::optionalRetryPolicy(config.common_config()), genOTelCallbacksFactory())), - stats_({ALL_GRPC_ACCESS_LOGGER_STATS( - POOL_COUNTER_PREFIX(scope, absl::StrCat(GRPC_LOG_STATS_PREFIX, config.stat_prefix())))}) { - initMessageRoot(config, local_info); + stats_({ALL_GRPC_ACCESS_LOGGER_STATS(POOL_COUNTER_PREFIX( + scope, absl::StrCat(OtlpAccessLogStatsPrefix, config.stat_prefix())))}) { + root_ = initOtlpMessageRoot(message_, config, local_info); } std::function @@ -68,26 +59,6 @@ GrpcAccessLoggerImpl::genOTelCallbacksFactory() { return *ptr; }; } -// See comment about the structure of repeated fields in the header file. -void GrpcAccessLoggerImpl::initMessageRoot( - const envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig& - config, - const LocalInfo::LocalInfo& local_info) { - auto* resource_logs = message_.add_resource_logs(); - root_ = resource_logs->add_scope_logs(); - auto* resource = resource_logs->mutable_resource(); - if (!config.disable_builtin_labels()) { - *resource->add_attributes() = getStringKeyValue("log_name", config.common_config().log_name()); - *resource->add_attributes() = getStringKeyValue("zone_name", local_info.zoneName()); - *resource->add_attributes() = getStringKeyValue("cluster_name", local_info.clusterName()); - *resource->add_attributes() = getStringKeyValue("node_name", local_info.nodeName()); - } - - for (const auto& pair : config.resource_attributes().values()) { - *resource->add_attributes() = pair; - } -} - void GrpcAccessLoggerImpl::addEntry(opentelemetry::proto::logs::v1::LogRecord&& entry) { batched_log_entries_++; root_->mutable_log_records()->Add(std::move(entry)); @@ -114,8 +85,8 @@ GrpcAccessLoggerImpl::SharedPtr GrpcAccessLoggerCacheImpl::createLogger( // exceptions in worker threads. Call sites of this getOrCreateLogger must check the cluster // availability via ClusterManager::checkActiveStaticCluster beforehand, and throw exceptions in // the main thread if necessary to ensure it does not throw here. - auto factory_or_error = async_client_manager_.factoryForGrpcService( - config.common_config().grpc_service(), scope_, true); + auto factory_or_error = + async_client_manager_.factoryForGrpcService(getGrpcService(config), scope_, true); THROW_IF_NOT_OK_REF(factory_or_error.status()); auto client = THROW_OR_RETURN_VALUE(factory_or_error.value()->createUncachedRawAsyncClient(), Grpc::RawAsyncClientPtr); diff --git a/source/extensions/access_loggers/open_telemetry/grpc_access_log_impl.h b/source/extensions/access_loggers/open_telemetry/grpc_access_log_impl.h index 49e102a1bab86..3bb8432aed178 100644 --- a/source/extensions/access_loggers/open_telemetry/grpc_access_log_impl.h +++ b/source/extensions/access_loggers/open_telemetry/grpc_access_log_impl.h @@ -81,10 +81,6 @@ class GrpcAccessLoggerImpl std::function deletion_; }; - void initMessageRoot( - const envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig& - config, - const LocalInfo::LocalInfo& local_info); // Extensions::AccessLoggers::GrpcCommon::GrpcAccessLogger void addEntry(opentelemetry::proto::logs::v1::LogRecord&& entry) override; // Non used addEntry method (the above is used for both TCP and HTTP). diff --git a/source/extensions/access_loggers/open_telemetry/http_access_log_impl.cc b/source/extensions/access_loggers/open_telemetry/http_access_log_impl.cc new file mode 100644 index 0000000000000..a442b11a309f7 --- /dev/null +++ b/source/extensions/access_loggers/open_telemetry/http_access_log_impl.cc @@ -0,0 +1,239 @@ +#include "source/extensions/access_loggers/open_telemetry/http_access_log_impl.h" + +#include +#include +#include +#include + +#include "envoy/config/core/v3/base.pb.h" +#include "envoy/data/accesslog/v3/accesslog.pb.h" +#include "envoy/extensions/access_loggers/open_telemetry/v3/logs_service.pb.h" + +#include "source/common/common/enum_to_int.h" +#include "source/common/common/logger.h" +#include "source/common/config/utility.h" +#include "source/common/formatter/substitution_formatter.h" +#include "source/common/http/headers.h" +#include "source/common/network/utility.h" +#include "source/common/protobuf/message_validator_impl.h" +#include "source/common/protobuf/protobuf.h" +#include "source/common/protobuf/utility.h" +#include "source/common/stream_info/utility.h" +#include "source/extensions/access_loggers/open_telemetry/otlp_log_utils.h" +#include "source/extensions/access_loggers/open_telemetry/substitution_formatter.h" + +namespace Envoy { +namespace Extensions { +namespace AccessLoggers { +namespace OpenTelemetry { + +HttpAccessLoggerImpl::HttpAccessLoggerImpl( + Upstream::ClusterManager& cluster_manager, + const envoy::config::core::v3::HttpService& http_service, + const envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig& + config, + Event::Dispatcher& dispatcher, const LocalInfo::LocalInfo& local_info, Stats::Scope& scope) + : cluster_manager_(cluster_manager), http_service_(http_service), + buffer_flush_interval_(getBufferFlushInterval(config)), + max_buffer_size_bytes_(getBufferSizeBytes(config)), + stats_({ALL_OTLP_ACCESS_LOG_STATS(POOL_COUNTER_PREFIX( + scope, absl::StrCat(OtlpAccessLogStatsPrefix, config.stat_prefix())))}) { + + // Prepares and stores headers to be used later on each export request. + for (const auto& header_value_option : http_service_.request_headers_to_add()) { + parsed_headers_to_add_.push_back({Http::LowerCaseString(header_value_option.header().key()), + header_value_option.header().value()}); + } + + root_ = initOtlpMessageRoot(message_, config, local_info); + + // Sets up the flush timer. + flush_timer_ = dispatcher.createTimer([this]() { + flush(); + flush_timer_->enableTimer(buffer_flush_interval_); + }); + flush_timer_->enableTimer(buffer_flush_interval_); +} + +void HttpAccessLoggerImpl::log(opentelemetry::proto::logs::v1::LogRecord&& entry) { + approximate_message_size_bytes_ += entry.ByteSizeLong(); + batched_log_entries_++; + root_->mutable_log_records()->Add(std::move(entry)); + + if (approximate_message_size_bytes_ >= max_buffer_size_bytes_) { + flush(); + } +} + +void HttpAccessLoggerImpl::flush() { + if (root_->log_records().empty()) { + return; + } + + std::string request_body; + const auto ok = message_.SerializeToString(&request_body); + if (!ok) { + ENVOY_LOG(warn, "Error while serializing the binary proto ExportLogsServiceRequest."); + root_->clear_log_records(); + approximate_message_size_bytes_ = 0; + return; + } + + const auto thread_local_cluster = + cluster_manager_.getThreadLocalCluster(http_service_.http_uri().cluster()); + if (thread_local_cluster == nullptr) { + ENVOY_LOG(error, "OTLP HTTP access log exporter failed: [cluster = {}] is not configured", + http_service_.http_uri().cluster()); + root_->clear_log_records(); + approximate_message_size_bytes_ = 0; + return; + } + + Http::RequestMessagePtr message = Http::Utility::prepareHeaders(http_service_.http_uri()); + + // The request follows the OTLP HTTP specification: + // https://github.com/open-telemetry/opentelemetry-proto/blob/v1.9.0/docs/specification.md#otlphttp. + message->headers().setReferenceMethod(Http::Headers::get().MethodValues.Post); + message->headers().setReferenceContentType(Http::Headers::get().ContentTypeValues.Protobuf); + + // User-Agent header follows the OTLP specification. + message->headers().setReferenceUserAgent(getOtlpUserAgentHeader()); + + // Adds all custom headers to the request. + for (const auto& header_pair : parsed_headers_to_add_) { + message->headers().setReference(header_pair.first, header_pair.second); + } + message->body().add(request_body); + + const auto options = + Http::AsyncClient::RequestOptions() + .setTimeout(std::chrono::milliseconds( + DurationUtil::durationToMilliseconds(http_service_.http_uri().timeout()))) + .setDiscardResponseBody(true); + + Http::AsyncClient::Request* in_flight_request = + thread_local_cluster->httpAsyncClient().send(std::move(message), *this, options); + + if (in_flight_request != nullptr) { + active_requests_.add(*in_flight_request); + in_flight_log_entries_ = batched_log_entries_; + } else { + stats_.logs_dropped_.add(batched_log_entries_); + } + + root_->clear_log_records(); + approximate_message_size_bytes_ = 0; + batched_log_entries_ = 0; +} + +void HttpAccessLoggerImpl::onSuccess(const Http::AsyncClient::Request& request, + Http::ResponseMessagePtr&& http_response) { + active_requests_.remove(request); + const auto response_code = Http::Utility::getResponseStatus(http_response->headers()); + if (response_code == enumToInt(Http::Code::OK)) { + stats_.logs_written_.add(in_flight_log_entries_); + } else { + ENVOY_LOG(error, + "OTLP HTTP access log exporter received a non-success status code: {} while " + "exporting the OTLP message", + response_code); + stats_.logs_dropped_.add(in_flight_log_entries_); + } + in_flight_log_entries_ = 0; +} + +void HttpAccessLoggerImpl::onFailure(const Http::AsyncClient::Request& request, + Http::AsyncClient::FailureReason reason) { + active_requests_.remove(request); + ENVOY_LOG(warn, "OTLP HTTP access log export request failed. Failure reason: {}", + enumToInt(reason)); + stats_.logs_dropped_.add(in_flight_log_entries_); + in_flight_log_entries_ = 0; +} + +HttpAccessLoggerCacheImpl::HttpAccessLoggerCacheImpl(Upstream::ClusterManager& cluster_manager, + Stats::Scope& scope, + ThreadLocal::SlotAllocator& tls, + const LocalInfo::LocalInfo& local_info) + : cluster_manager_(cluster_manager), scope_(scope), tls_slot_(tls.allocateSlot()), + local_info_(local_info) { + tls_slot_->set( + [](Event::Dispatcher& dispatcher) { return std::make_shared(dispatcher); }); +} + +HttpAccessLoggerImpl::SharedPtr HttpAccessLoggerCacheImpl::getOrCreateLogger( + const envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig& + config, + const envoy::config::core::v3::HttpService& http_service) { + auto& cache = tls_slot_->getTyped(); + const std::size_t config_hash = MessageUtil::hash(config) ^ MessageUtil::hash(http_service); + + const auto it = cache.access_loggers_.find(config_hash); + if (it != cache.access_loggers_.end()) { + return it->second; + } + + auto logger = std::make_shared(cluster_manager_, http_service, config, + cache.dispatcher_, local_info_, scope_); + cache.access_loggers_.emplace(config_hash, logger); + return logger; +} + +HttpAccessLog::ThreadLocalLogger::ThreadLocalLogger(HttpAccessLoggerImpl::SharedPtr logger) + : logger_(std::move(logger)) {} + +HttpAccessLog::HttpAccessLog( + ::Envoy::AccessLog::FilterPtr&& filter, + envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig config, + ThreadLocal::SlotAllocator& tls, HttpAccessLoggerCacheSharedPtr access_logger_cache, + const std::vector& commands) + : Common::ImplBase(std::move(filter)), tls_slot_(tls.allocateSlot()), + access_logger_cache_(std::move(access_logger_cache)), http_service_(config.http_service()), + filter_state_objects_to_log_(getFilterStateObjectsToLog(config)), + custom_tags_(getCustomTags(config)) { + + tls_slot_->set([this, config](Event::Dispatcher&) { + return std::make_shared( + access_logger_cache_->getOrCreateLogger(config, http_service_)); + }); + + // Packs the body "AnyValue" to a "KeyValueList" only if it's not empty. Otherwise the + // formatter would fail to parse it. + if (config.body().value_case() != ::opentelemetry::proto::common::v1::AnyValue::VALUE_NOT_SET) { + body_formatter_ = std::make_unique(packBody(config.body()), commands); + } + attributes_formatter_ = std::make_unique(config.attributes(), commands); +} + +void HttpAccessLog::emitLog(const Formatter::Context& log_context, + const StreamInfo::StreamInfo& stream_info) { + opentelemetry::proto::logs::v1::LogRecord log_entry; + log_entry.set_time_unix_nano(std::chrono::duration_cast( + stream_info.startTime().time_since_epoch()) + .count()); + + // Unpacks the body "KeyValueList" to "AnyValue". + if (body_formatter_) { + const auto formatted_body = unpackBody(body_formatter_->format(log_context, stream_info)); + *log_entry.mutable_body() = formatted_body; + } + const auto formatted_attributes = attributes_formatter_->format(log_context, stream_info); + *log_entry.mutable_attributes() = formatted_attributes.values(); + + // Sets trace context (trace_id, span_id) if available. + const std::string trace_id_hex = + log_context.activeSpan().has_value() ? log_context.activeSpan()->getTraceId() : ""; + const std::string span_id_hex = + log_context.activeSpan().has_value() ? log_context.activeSpan()->getSpanId() : ""; + populateTraceContext(log_entry, trace_id_hex, span_id_hex); + + addFilterStateToAttributes(stream_info, filter_state_objects_to_log_, log_entry); + addCustomTagsToAttributes(custom_tags_, log_context, stream_info, log_entry); + + tls_slot_->getTyped().logger_->log(std::move(log_entry)); +} + +} // namespace OpenTelemetry +} // namespace AccessLoggers +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/access_loggers/open_telemetry/http_access_log_impl.h b/source/extensions/access_loggers/open_telemetry/http_access_log_impl.h new file mode 100644 index 0000000000000..f7e4013996928 --- /dev/null +++ b/source/extensions/access_loggers/open_telemetry/http_access_log_impl.h @@ -0,0 +1,155 @@ +#pragma once + +#include +#include + +#include "envoy/access_log/access_log.h" +#include "envoy/config/core/v3/http_service.pb.h" +#include "envoy/event/dispatcher.h" +#include "envoy/extensions/access_loggers/open_telemetry/v3/logs_service.pb.h" +#include "envoy/local_info/local_info.h" +#include "envoy/singleton/instance.h" +#include "envoy/thread_local/thread_local.h" +#include "envoy/upstream/cluster_manager.h" + +#include "source/common/common/logger.h" +#include "source/common/http/async_client_impl.h" +#include "source/common/http/async_client_utility.h" +#include "source/common/http/headers.h" +#include "source/common/http/message_impl.h" +#include "source/common/http/utility.h" +#include "source/common/protobuf/protobuf.h" +#include "source/common/tracing/custom_tag_impl.h" +#include "source/extensions/access_loggers/common/access_log_base.h" +#include "source/extensions/access_loggers/open_telemetry/otlp_log_utils.h" +#include "source/extensions/access_loggers/open_telemetry/substitution_formatter.h" + +#include "opentelemetry/proto/collector/logs/v1/logs_service.pb.h" +#include "opentelemetry/proto/common/v1/common.pb.h" +#include "opentelemetry/proto/logs/v1/logs.pb.h" +#include "opentelemetry/proto/resource/v1/resource.pb.h" + +namespace Envoy { +namespace Extensions { +namespace AccessLoggers { +namespace OpenTelemetry { + +/** + * HTTP access logger that exports OTLP logs over HTTP. + * Follows the same pattern as OpenTelemetryHttpTraceExporter. + */ +class HttpAccessLoggerImpl : public Logger::Loggable, + public Http::AsyncClient::Callbacks { +public: + HttpAccessLoggerImpl( + Upstream::ClusterManager& cluster_manager, + const envoy::config::core::v3::HttpService& http_service, + const envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig& + config, + Event::Dispatcher& dispatcher, const LocalInfo::LocalInfo& local_info, Stats::Scope& scope); + + using SharedPtr = std::shared_ptr; + + /** + * Log a single log entry. Batches entries and flushes periodically. + */ + void log(opentelemetry::proto::logs::v1::LogRecord&& entry); + + // Http::AsyncClient::Callbacks. + void onSuccess(const Http::AsyncClient::Request&, Http::ResponseMessagePtr&&) override; + void onFailure(const Http::AsyncClient::Request&, Http::AsyncClient::FailureReason) override; + void onBeforeFinalizeUpstreamSpan(Tracing::Span&, const Http::ResponseHeaderMap*) override {} + +private: + void flush(); + + Upstream::ClusterManager& cluster_manager_; + envoy::config::core::v3::HttpService http_service_; + // Track active HTTP requests to be able to cancel them on destruction. + Http::AsyncClientRequestTracker active_requests_; + std::vector> parsed_headers_to_add_; + + // Message structure: ExportLogsServiceRequest -> ResourceLogs -> ScopeLogs -> LogRecord. + opentelemetry::proto::collector::logs::v1::ExportLogsServiceRequest message_; + opentelemetry::proto::logs::v1::ScopeLogs* root_; + + // Batching timer. + Event::TimerPtr flush_timer_; + const std::chrono::milliseconds buffer_flush_interval_; + const uint64_t max_buffer_size_bytes_; + uint64_t approximate_message_size_bytes_ = 0; + + OtlpAccessLogStats stats_; + uint32_t batched_log_entries_ = 0; + uint32_t in_flight_log_entries_ = 0; +}; + +/** + * Cache for HTTP access loggers. Creates one logger per unique configuration. + */ +class HttpAccessLoggerCacheImpl : public Singleton::Instance, + public Logger::Loggable { +public: + HttpAccessLoggerCacheImpl(Upstream::ClusterManager& cluster_manager, Stats::Scope& scope, + ThreadLocal::SlotAllocator& tls, + const LocalInfo::LocalInfo& local_info); + + HttpAccessLoggerImpl::SharedPtr getOrCreateLogger( + const envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig& + config, + const envoy::config::core::v3::HttpService& http_service); + +private: + struct ThreadLocalCache : public ThreadLocal::ThreadLocalObject { + ThreadLocalCache(Event::Dispatcher& dispatcher) : dispatcher_(dispatcher) {} + Event::Dispatcher& dispatcher_; + absl::flat_hash_map access_loggers_; + }; + + Upstream::ClusterManager& cluster_manager_; + Stats::Scope& scope_; + ThreadLocal::SlotPtr tls_slot_; + const LocalInfo::LocalInfo& local_info_; +}; + +using HttpAccessLoggerCacheSharedPtr = std::shared_ptr; + +/** + * Access log instance that streams logs over HTTP. + */ +class HttpAccessLog : public Common::ImplBase { +public: + HttpAccessLog( + ::Envoy::AccessLog::FilterPtr&& filter, + envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig config, + ThreadLocal::SlotAllocator& tls, HttpAccessLoggerCacheSharedPtr access_logger_cache, + const std::vector& commands); + +private: + /** + * Per-thread cached logger. + */ + struct ThreadLocalLogger : public ThreadLocal::ThreadLocalObject { + ThreadLocalLogger(HttpAccessLoggerImpl::SharedPtr logger); + + const HttpAccessLoggerImpl::SharedPtr logger_; + }; + + // Common::ImplBase + void emitLog(const Formatter::Context& context, const StreamInfo::StreamInfo& info) override; + + const ThreadLocal::SlotPtr tls_slot_; + const HttpAccessLoggerCacheSharedPtr access_logger_cache_; + const envoy::config::core::v3::HttpService http_service_; + std::unique_ptr body_formatter_; + std::unique_ptr attributes_formatter_; + const std::vector filter_state_objects_to_log_; + const std::vector custom_tags_; +}; + +using HttpAccessLogPtr = std::unique_ptr; + +} // namespace OpenTelemetry +} // namespace AccessLoggers +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/access_loggers/open_telemetry/otlp_log_utils.cc b/source/extensions/access_loggers/open_telemetry/otlp_log_utils.cc new file mode 100644 index 0000000000000..29fd6c7d04454 --- /dev/null +++ b/source/extensions/access_loggers/open_telemetry/otlp_log_utils.cc @@ -0,0 +1,213 @@ +#include "source/extensions/access_loggers/open_telemetry/otlp_log_utils.h" + +#include +#include + +#include "envoy/data/accesslog/v3/accesslog.pb.h" + +#include "source/common/common/assert.h" +#include "source/common/common/macros.h" +#include "source/common/http/header_map_impl.h" +#include "source/common/protobuf/utility.h" +#include "source/common/tracing/custom_tag_impl.h" +#include "source/common/tracing/http_tracer_impl.h" +#include "source/common/version/version.h" + +namespace Envoy { +namespace Extensions { +namespace AccessLoggers { +namespace OpenTelemetry { + +opentelemetry::proto::common::v1::KeyValue getStringKeyValue(const std::string& key, + const std::string& value) { + opentelemetry::proto::common::v1::KeyValue keyValue; + keyValue.set_key(key); + keyValue.mutable_value()->set_string_value(value); + return keyValue; +} + +::opentelemetry::proto::common::v1::KeyValueList +packBody(const ::opentelemetry::proto::common::v1::AnyValue& body) { + ::opentelemetry::proto::common::v1::KeyValueList output; + auto* kv = output.add_values(); + kv->set_key(std::string(BodyKey)); + *kv->mutable_value() = body; + return output; +} + +::opentelemetry::proto::common::v1::AnyValue +unpackBody(const ::opentelemetry::proto::common::v1::KeyValueList& value) { + ASSERT(value.values().size() == 1 && value.values(0).key() == BodyKey); + return value.values(0).value(); +} + +// User-Agent header follows the OTLP specification: +// https://github.com/open-telemetry/opentelemetry-specification/blob/v1.52.0/specification/protocol/exporter.md#user-agent +const std::string& getOtlpUserAgentHeader() { + CONSTRUCT_ON_FIRST_USE(std::string, "OTel-OTLP-Exporter-Envoy/" + VersionInfo::version()); +} + +void populateTraceContext(opentelemetry::proto::logs::v1::LogRecord& log_entry, + const std::string& trace_id_hex, const std::string& span_id_hex) { + // Sets trace_id if available. OpenTelemetry trace_id is a 16-byte array, and backends + // (e.g. OTel-collector) will reject requests if the length is incorrect. Some trace + // providers (e.g. Zipkin) return a 64-bit hex string, which must be padded to 128-bit. + if (trace_id_hex.size() == TraceIdHexLength) { + *log_entry.mutable_trace_id() = absl::HexStringToBytes(trace_id_hex); + } else if (trace_id_hex.size() == ShortTraceIdHexLength) { + const auto trace_id = absl::StrCat(Hex::uint64ToHex(0), trace_id_hex); + *log_entry.mutable_trace_id() = absl::HexStringToBytes(trace_id); + } + // Sets span_id if available. + if (!span_id_hex.empty()) { + *log_entry.mutable_span_id() = absl::HexStringToBytes(span_id_hex); + } +} + +const std::string& getLogName( + const envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig& + config) { + // Prefer top-level log_name, fall back to common_config.log_name (deprecated). + if (!config.log_name().empty()) { + return config.log_name(); + } + return config.common_config().log_name(); +} + +const envoy::config::core::v3::GrpcService& getGrpcService( + const envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig& + config) { + // Prefer top-level grpc_service, fall back to common_config.grpc_service (deprecated). + if (config.has_grpc_service()) { + return config.grpc_service(); + } + return config.common_config().grpc_service(); +} + +std::chrono::milliseconds getBufferFlushInterval( + const envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig& + config) { + if (config.has_buffer_flush_interval()) { + return std::chrono::milliseconds( + DurationUtil::durationToMilliseconds(config.buffer_flush_interval())); + } + return DefaultBufferFlushInterval; +} + +uint64_t getBufferSizeBytes( + const envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig& + config) { + if (config.has_buffer_size_bytes()) { + return config.buffer_size_bytes().value(); + } + return DefaultMaxBufferSizeBytes; +} + +std::vector getFilterStateObjectsToLog( + const envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig& + config) { + return std::vector(config.filter_state_objects_to_log().begin(), + config.filter_state_objects_to_log().end()); +} + +std::vector getCustomTags( + const envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig& + config) { + std::vector custom_tags; + for (const auto& custom_tag : config.custom_tags()) { + custom_tags.push_back(Tracing::CustomTagUtility::createCustomTag(custom_tag)); + } + return custom_tags; +} + +void addFilterStateToAttributes(const StreamInfo::StreamInfo& stream_info, + const std::vector& filter_state_objects_to_log, + opentelemetry::proto::logs::v1::LogRecord& log_entry) { + for (const auto& key : filter_state_objects_to_log) { + const StreamInfo::FilterState* filter_state = &stream_info.filterState(); + // Check downstream filter state first, then upstream. + if (auto state = filter_state->getDataReadOnlyGeneric(key); state != nullptr) { + ProtobufTypes::MessagePtr serialized_proto = state->serializeAsProto(); + if (serialized_proto != nullptr) { + auto json_or_error = MessageUtil::getJsonStringFromMessage(*serialized_proto); + if (json_or_error.ok()) { + auto* attr = log_entry.add_attributes(); + attr->set_key(key); + attr->mutable_value()->set_string_value(json_or_error.value()); + } + } + } else if (stream_info.upstreamInfo().has_value() && + stream_info.upstreamInfo()->upstreamFilterState() != nullptr) { + if (auto upstream_state = + stream_info.upstreamInfo()->upstreamFilterState()->getDataReadOnlyGeneric(key); + upstream_state != nullptr) { + ProtobufTypes::MessagePtr serialized_proto = upstream_state->serializeAsProto(); + if (serialized_proto != nullptr) { + auto json_or_error = MessageUtil::getJsonStringFromMessage(*serialized_proto); + if (json_or_error.ok()) { + auto* attr = log_entry.add_attributes(); + attr->set_key(key); + attr->mutable_value()->set_string_value(json_or_error.value()); + } + } + } + } + } +} + +void addCustomTagsToAttributes(const std::vector& custom_tags, + const Formatter::Context& context, + const StreamInfo::StreamInfo& stream_info, + opentelemetry::proto::logs::v1::LogRecord& log_entry) { + if (custom_tags.empty()) { + return; + } + + // Create empty header map if request headers not available. + const Http::RequestHeaderMap* headers_ptr = + context.requestHeaders().has_value() + ? &static_cast(context.requestHeaders().value()) + : Http::StaticEmptyHeaders::get().request_headers.get(); + const Http::RequestHeaderMap& headers = *headers_ptr; + + Tracing::ReadOnlyHttpTraceContext trace_context(headers); + Tracing::CustomTagContext tag_context{trace_context, stream_info, context}; + + // Use a temporary AccessLogCommon to extract custom tag values via applyLog. + envoy::data::accesslog::v3::AccessLogCommon temp_log; + for (const auto& custom_tag : custom_tags) { + custom_tag->applyLog(temp_log, tag_context); + } + + // Copy custom tags to OTLP attributes. + for (const auto& [key, value] : temp_log.custom_tags()) { + auto* attr = log_entry.add_attributes(); + attr->set_key(key); + attr->mutable_value()->set_string_value(value); + } +} + +opentelemetry::proto::logs::v1::ScopeLogs* initOtlpMessageRoot( + opentelemetry::proto::collector::logs::v1::ExportLogsServiceRequest& message, + const envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig& + config, + const LocalInfo::LocalInfo& local_info) { + auto* resource_logs = message.add_resource_logs(); + auto* root = resource_logs->add_scope_logs(); + auto* resource = resource_logs->mutable_resource(); + if (!config.disable_builtin_labels()) { + *resource->add_attributes() = getStringKeyValue("log_name", getLogName(config)); + *resource->add_attributes() = getStringKeyValue("zone_name", local_info.zoneName()); + *resource->add_attributes() = getStringKeyValue("cluster_name", local_info.clusterName()); + *resource->add_attributes() = getStringKeyValue("node_name", local_info.nodeName()); + } + for (const auto& pair : config.resource_attributes().values()) { + *resource->add_attributes() = pair; + } + return root; +} + +} // namespace OpenTelemetry +} // namespace AccessLoggers +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/access_loggers/open_telemetry/otlp_log_utils.h b/source/extensions/access_loggers/open_telemetry/otlp_log_utils.h new file mode 100644 index 0000000000000..cf85f319d69ca --- /dev/null +++ b/source/extensions/access_loggers/open_telemetry/otlp_log_utils.h @@ -0,0 +1,118 @@ +#pragma once + +#include +#include +#include + +#include "envoy/extensions/access_loggers/open_telemetry/v3/logs_service.pb.h" +#include "envoy/formatter/http_formatter_context.h" +#include "envoy/local_info/local_info.h" +#include "envoy/stats/stats_macros.h" +#include "envoy/stream_info/filter_state.h" +#include "envoy/stream_info/stream_info.h" + +#include "source/common/common/hex.h" +#include "source/common/tracing/custom_tag_impl.h" + +#include "absl/strings/escaping.h" +#include "absl/strings/str_cat.h" +#include "absl/strings/string_view.h" +#include "opentelemetry/proto/collector/logs/v1/logs_service.pb.h" +#include "opentelemetry/proto/common/v1/common.pb.h" +#include "opentelemetry/proto/logs/v1/logs.pb.h" + +namespace Envoy { +namespace Extensions { +namespace AccessLoggers { +namespace OpenTelemetry { + +// Default buffer flush interval (1 second). +constexpr std::chrono::milliseconds DefaultBufferFlushInterval{1000}; +// Default max buffer size (16KB). +constexpr uint64_t DefaultMaxBufferSizeBytes = 16384; + +// Key used to pack/unpack the body AnyValue to a KeyValueList. +constexpr absl::string_view BodyKey = "body"; + +// OpenTelemetry trace ID length in hex (128-bit = 32 hex chars). +constexpr size_t TraceIdHexLength = 32; +// Zipkin-style trace ID length in hex (64-bit = 16 hex chars). +constexpr size_t ShortTraceIdHexLength = 16; + +constexpr absl::string_view OtlpAccessLogStatsPrefix = "access_logs.open_telemetry_access_log."; + +#define ALL_OTLP_ACCESS_LOG_STATS(COUNTER) \ + COUNTER(logs_written) \ + COUNTER(logs_dropped) + +struct OtlpAccessLogStats { + ALL_OTLP_ACCESS_LOG_STATS(GENERATE_COUNTER_STRUCT) +}; + +// Creates a KeyValue protobuf with a string value. +opentelemetry::proto::common::v1::KeyValue getStringKeyValue(const std::string& key, + const std::string& value); + +// Packs the body "AnyValue" to a "KeyValueList" with a single key. +::opentelemetry::proto::common::v1::KeyValueList +packBody(const ::opentelemetry::proto::common::v1::AnyValue& body); + +// Unpacks the body "AnyValue" from a "KeyValueList". +::opentelemetry::proto::common::v1::AnyValue +unpackBody(const ::opentelemetry::proto::common::v1::KeyValueList& value); + +// User-Agent header per OTLP specification. +const std::string& getOtlpUserAgentHeader(); + +// Populates trace context (trace_id, span_id) on a LogRecord. +// Handles 128-bit (32 hex chars) and 64-bit Zipkin-style (16 hex chars) trace IDs. +void populateTraceContext(opentelemetry::proto::logs::v1::LogRecord& log_entry, + const std::string& trace_id_hex, const std::string& span_id_hex); + +// Returns log_name, with fallback to common_config.log_name. +const std::string& getLogName( + const envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig& + config); + +// Returns grpc_service, with fallback to common_config.grpc_service. +const envoy::config::core::v3::GrpcService& getGrpcService( + const envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig& + config); + +std::chrono::milliseconds getBufferFlushInterval( + const envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig& + config); + +uint64_t getBufferSizeBytes( + const envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig& + config); + +std::vector getFilterStateObjectsToLog( + const envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig& + config); + +std::vector getCustomTags( + const envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig& + config); + +void addFilterStateToAttributes(const StreamInfo::StreamInfo& stream_info, + const std::vector& filter_state_objects_to_log, + opentelemetry::proto::logs::v1::LogRecord& log_entry); + +void addCustomTagsToAttributes(const std::vector& custom_tags, + const Formatter::Context& context, + const StreamInfo::StreamInfo& stream_info, + opentelemetry::proto::logs::v1::LogRecord& log_entry); + +// Initializes the OTLP message root structure with resource attributes. +// Returns a pointer to the ScopeLogs where log records should be added. +opentelemetry::proto::logs::v1::ScopeLogs* initOtlpMessageRoot( + opentelemetry::proto::collector::logs::v1::ExportLogsServiceRequest& message, + const envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig& + config, + const LocalInfo::LocalInfo& local_info); + +} // namespace OpenTelemetry +} // namespace AccessLoggers +} // namespace Extensions +} // namespace Envoy diff --git a/test/coverage.yaml b/test/coverage.yaml index 2c3f99d63343c..28757821053ca 100644 --- a/test/coverage.yaml +++ b/test/coverage.yaml @@ -27,6 +27,7 @@ directories: source/exe: 94.4 # increased by #32346, need coverage for terminate_handler and hot restart failures source/extensions/api_listeners: 55.0 # Many IS_ENVOY_BUG are not covered. source/extensions/api_listeners/default_api_listener: 67.8 # Many IS_ENVOY_BUG are not covered. + source/extensions/access_loggers/open_telemetry: 96.4 # Serialization failure path is untestable source/extensions/bootstrap/reverse_tunnel/downstream_socket_interface: 96.3 source/extensions/common/aws: 98.5 source/extensions/common/aws/credential_providers: 100.0 diff --git a/test/extensions/access_loggers/open_telemetry/BUILD b/test/extensions/access_loggers/open_telemetry/BUILD index f480f03c84b53..e079ff07c1ff3 100644 --- a/test/extensions/access_loggers/open_telemetry/BUILD +++ b/test/extensions/access_loggers/open_telemetry/BUILD @@ -13,6 +13,23 @@ licenses(["notice"]) # Apache 2 envoy_package() +envoy_extension_cc_test( + name = "otlp_log_utils_test", + srcs = ["otlp_log_utils_test.cc"], + extension_names = ["envoy.access_loggers.open_telemetry"], + deps = [ + "//source/common/http:header_map_lib", + "//source/common/router:string_accessor_lib", + "//source/common/stream_info:filter_state_lib", + "//source/extensions/access_loggers/open_telemetry:otlp_log_utils_lib", + "//test/mocks/local_info:local_info_mocks", + "//test/mocks/stream_info:stream_info_mocks", + "//test/test_common:utility_lib", + "@opentelemetry_proto//:logs_proto_cc", + "@opentelemetry_proto//:logs_service_proto_cc", + ], +) + envoy_extension_cc_test( name = "grpc_access_log_impl_test", srcs = ["grpc_access_log_impl_test.cc"], @@ -33,6 +50,22 @@ envoy_extension_cc_test( ], ) +envoy_extension_cc_test( + name = "http_access_log_impl_test", + srcs = ["http_access_log_impl_test.cc"], + extension_names = ["envoy.access_loggers.open_telemetry"], + rbe_pool = "6gig", + deps = [ + "//source/extensions/access_loggers/open_telemetry:http_access_log_lib", + "//test/mocks/event:event_mocks", + "//test/mocks/local_info:local_info_mocks", + "//test/mocks/stats:stats_mocks", + "//test/mocks/thread_local:thread_local_mocks", + "//test/mocks/upstream:cluster_manager_mocks", + "//test/test_common:utility_lib", + ], +) + envoy_extension_cc_test( name = "access_log_impl_test", srcs = ["access_log_impl_test.cc"], diff --git a/test/extensions/access_loggers/open_telemetry/access_log_integration_test.cc b/test/extensions/access_loggers/open_telemetry/access_log_integration_test.cc index 6cfa3d8e29529..6af5b78688434 100644 --- a/test/extensions/access_loggers/open_telemetry/access_log_integration_test.cc +++ b/test/extensions/access_loggers/open_telemetry/access_log_integration_test.cc @@ -12,6 +12,7 @@ #include "test/integration/http_integration.h" #include "test/test_common/utility.h" +#include "absl/strings/match.h" #include "gtest/gtest.h" #include "opentelemetry/proto/collector/logs/v1/logs_service.pb.h" @@ -46,16 +47,37 @@ constexpr char EXPECTED_REQUEST_MESSAGE[] = R"EOF( namespace Envoy { namespace { -class AccessLogIntegrationTest : public Grpc::GrpcClientIntegrationParamTest, - public HttpIntegrationTest { +enum class ExporterType { GRPC, HTTP }; + +struct TransportDriver { + std::function + configureExporter; + std::function + waitForRequest; + std::function sendResponse; +}; + +class AccessLogIntegrationTest + : public Grpc::BaseGrpcClientIntegrationParamTest, + public testing::TestWithParam< + std::tuple>, + public HttpIntegrationTest { + TransportDriver driver_; + public: - AccessLogIntegrationTest() : HttpIntegrationTest(Http::CodecType::HTTP1, ipVersion()) { - // TODO(ggreenway): add tag extraction rules. - // Missing stat tag-extraction rule for stat 'grpc.accesslog.streams_closed_1' and stat_prefix - // 'accesslog'. + AccessLogIntegrationTest() + : HttpIntegrationTest(Http::CodecType::HTTP1, std::get<0>(GetParam())) { skip_tag_extraction_rule_check_ = true; + driver_ = (std::get<2>(GetParam()) == ExporterType::GRPC) ? makeGrpcDriver() : makeHttpDriver(); } + Network::Address::IpVersion ipVersion() const override { return std::get<0>(GetParam()); } + Grpc::ClientType clientType() const override { return std::get<1>(GetParam()); } + void createUpstreams() override { HttpIntegrationTest::createUpstreams(); addFakeUpstream(Http::CodecType::HTTP2); @@ -74,15 +96,12 @@ class AccessLogIntegrationTest : public Grpc::GrpcClientIntegrationParamTest, envoy::extensions::filters::network::http_connection_manager::v3::HttpConnectionManager& hcm) { auto* access_log = hcm.add_access_log(); - access_log->set_name("grpc_accesslog"); + access_log->set_name("otel_accesslog"); envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig config; - auto* common_config = config.mutable_common_config(); - common_config->set_log_name("foo"); - common_config->set_transport_api_version(envoy::config::core::v3::ApiVersion::V3); - setGrpcService(*common_config->mutable_grpc_service(), "accesslog", - fake_upstreams_.back()->localAddress()); + config.set_log_name("foo"); + driver_.configureExporter(config, fake_upstreams_.back()->localAddress()); auto* body_config = config.mutable_body(); body_config->set_string_value("%REQ(:METHOD)% %PROTOCOL% %RESPONSE_CODE%"); auto* attr_config = config.mutable_attributes(); @@ -108,11 +127,7 @@ class AccessLogIntegrationTest : public Grpc::GrpcClientIntegrationParamTest, ABSL_MUST_USE_RESULT AssertionResult waitForAccessLogRequest(const std::string& expected_request_msg_yaml) { opentelemetry::proto::collector::logs::v1::ExportLogsServiceRequest request_msg; - VERIFY_ASSERTION(access_log_request_->waitForGrpcMessage(*dispatcher_, request_msg)); - EXPECT_EQ("POST", access_log_request_->headers().getMethodValue()); - EXPECT_EQ("/opentelemetry.proto.collector.logs.v1.LogsService/Export", - access_log_request_->headers().getPathValue()); - EXPECT_EQ("application/grpc", access_log_request_->headers().getContentTypeValue()); + VERIFY_ASSERTION(driver_.waitForRequest(access_log_request_, *dispatcher_, request_msg)); opentelemetry::proto::collector::logs::v1::ExportLogsServiceRequest expected_request_msg; TestUtility::loadFromYaml(expected_request_msg_yaml, expected_request_msg); @@ -124,10 +139,7 @@ class AccessLogIntegrationTest : public Grpc::GrpcClientIntegrationParamTest, EXPECT_TRUE(TestUtility::protoEqual(request_msg, expected_request_msg, /*ignore_repeated_field_ordering=*/false)); - opentelemetry::proto::collector::logs::v1::ExportLogsServiceResponse response; - access_log_request_->startGrpcStream(); - access_log_request_->sendGrpcMessage(response); - access_log_request_->finishGrpcStream(Grpc::Status::Ok); + driver_.sendResponse(access_log_request_); return AssertionSuccess(); } @@ -140,13 +152,66 @@ class AccessLogIntegrationTest : public Grpc::GrpcClientIntegrationParamTest, } } +private: + TransportDriver makeGrpcDriver() { + return {[this](auto& config, auto addr) { + setGrpcService(*config.mutable_grpc_service(), "accesslog", addr); + }, + [](auto& stream, auto& dispatcher, auto& request) -> AssertionResult { + VERIFY_ASSERTION(stream->waitForGrpcMessage(dispatcher, request)); + EXPECT_EQ("POST", stream->headers().getMethodValue()); + EXPECT_EQ("/opentelemetry.proto.collector.logs.v1.LogsService/Export", + stream->headers().getPathValue()); + EXPECT_EQ("application/grpc", stream->headers().getContentTypeValue()); + return AssertionSuccess(); + }, + [](auto& stream) { + opentelemetry::proto::collector::logs::v1::ExportLogsServiceResponse response; + stream->startGrpcStream(); + stream->sendGrpcMessage(response); + stream->finishGrpcStream(Grpc::Status::Ok); + }}; + } + + TransportDriver makeHttpDriver() { + return {[this](auto& config, auto addr) { + auto* http = config.mutable_http_service(); + http->mutable_http_uri()->set_uri(fmt::format( + "http://{}:{}/v1/logs", Network::Test::getLoopbackAddressUrlString(ipVersion()), + addr->ip()->port())); + http->mutable_http_uri()->set_cluster("accesslog"); + http->mutable_http_uri()->mutable_timeout()->set_seconds(1); + }, + [](auto& stream, auto& dispatcher, auto& request) -> AssertionResult { + VERIFY_ASSERTION(stream->waitForEndStream(dispatcher)); + EXPECT_EQ("POST", stream->headers().getMethodValue()); + EXPECT_EQ("/v1/logs", stream->headers().getPathValue()); + EXPECT_EQ("application/x-protobuf", stream->headers().getContentTypeValue()); + EXPECT_TRUE(absl::StartsWith(stream->headers().getUserAgentValue(), + "OTel-OTLP-Exporter-Envoy/")); + EXPECT_TRUE(request.ParseFromString(stream->body().toString())); + return AssertionSuccess(); + }, + [](auto& stream) { + stream->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, true); + }}; + } + FakeHttpConnectionPtr fake_access_log_connection_; FakeStreamPtr access_log_request_; }; -INSTANTIATE_TEST_SUITE_P(IpVersionsCientType, AccessLogIntegrationTest, - GRPC_CLIENT_INTEGRATION_PARAMS, - Grpc::GrpcClientIntegrationParamTest::protocolTestParamsToString); +INSTANTIATE_TEST_SUITE_P( + IpVersionsClientTypeExporterType, AccessLogIntegrationTest, + testing::Combine(testing::ValuesIn(TestEnvironment::getIpVersionsForTest()), + testing::ValuesIn(TestEnvironment::getsGrpcVersionsForTest()), + testing::Values(ExporterType::GRPC, ExporterType::HTTP)), + [](const auto& info) { + return fmt::format("{}_{}_{}", TestUtility::ipVersionToString(std::get<0>(info.param)), + std::get<1>(info.param) == Grpc::ClientType::GoogleGrpc ? "GoogleGrpc" + : "EnvoyGrpc", + std::get<2>(info.param) == ExporterType::GRPC ? "gRPC" : "HTTP"); + }); // Test a basic full access logging flow. TEST_P(AccessLogIntegrationTest, BasicAccessLogFlow) { @@ -165,6 +230,7 @@ TEST_P(AccessLogIntegrationTest, BasicAccessLogFlow) { cleanup(); } +// Tests that access logger stats persist across listener updates. TEST_P(AccessLogIntegrationTest, AccessLoggerStatsAreIndependentOfListener) { const std::string expected_access_log_results = R"EOF( resource_logs: @@ -201,8 +267,7 @@ TEST_P(AccessLogIntegrationTest, AccessLoggerStatsAreIndependentOfListener) { ASSERT_TRUE(waitForAccessLogRequest(expected_access_log_results)); // LDS update to modify the listener and corresponding drain. - // The config has the same GRPC access logger so it is not removed from the - // cache. + // The config has the same access logger so it is not removed from the cache. { ConfigHelper new_config_helper(version_, config_helper_.bootstrap()); new_config_helper.addConfigModifier( @@ -215,7 +280,7 @@ TEST_P(AccessLogIntegrationTest, AccessLoggerStatsAreIndependentOfListener) { test_server_->waitForGaugeEq("listener_manager.total_listeners_active", 1); } - // Make another request, the existing grpc access logger should be used. + // Make another request, the existing access logger should be used. auto codec_client_ = makeHttpConnection(lookupPort("http")); auto response2 = codec_client_->makeHeaderOnlyRequest(default_request_headers_); ASSERT_TRUE(response2->waitForEndStream()); diff --git a/test/extensions/access_loggers/open_telemetry/config_test.cc b/test/extensions/access_loggers/open_telemetry/config_test.cc index 36ad90e731c5a..e2112d42d4eb2 100644 --- a/test/extensions/access_loggers/open_telemetry/config_test.cc +++ b/test/extensions/access_loggers/open_telemetry/config_test.cc @@ -30,17 +30,18 @@ class OpenTelemetryAccessLogConfigTest : public testing::Test { message_ = factory_->createEmptyConfigProto(); ASSERT_NE(nullptr, message_); + } + // Helper to set up gRPC config and expectations using top-level fields. + void setupGrpcConfig() { EXPECT_CALL(context_.server_factory_context_.cluster_manager_.async_client_manager_, factoryForGrpcService(_, _, _)) .WillOnce(Invoke([](const envoy::config::core::v3::GrpcService&, Stats::Scope&, bool) { return std::make_unique>(); })); - auto* common_config = access_log_config_.mutable_common_config(); - common_config->set_log_name("foo"); - common_config->mutable_grpc_service()->mutable_envoy_grpc()->set_cluster_name("bar"); - common_config->set_transport_api_version(envoy::config::core::v3::ApiVersion::V3); + access_log_config_.mutable_grpc_service()->mutable_envoy_grpc()->set_cluster_name("bar"); + access_log_config_.set_log_name("foo"); TestUtility::jsonConvert(access_log_config_, *message_); } @@ -52,14 +53,90 @@ class OpenTelemetryAccessLogConfigTest : public testing::Test { Envoy::AccessLog::AccessLogInstanceFactory* factory_{}; }; -// Normal OK configuration. -TEST_F(OpenTelemetryAccessLogConfigTest, Ok) { +// Verifies gRPC transport configuration creates a valid access log instance. +TEST_F(OpenTelemetryAccessLogConfigTest, GrpcConfigOk) { + setupGrpcConfig(); ::Envoy::AccessLog::InstanceSharedPtr instance = factory_->createAccessLogInstance(*message_, std::move(filter_), context_); EXPECT_NE(nullptr, instance); EXPECT_NE(nullptr, dynamic_cast(instance.get())); } +// Verifies HTTP transport configuration creates a valid access log instance. +TEST_F(OpenTelemetryAccessLogConfigTest, HttpConfigOk) { + envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig config; + auto* http_service = config.mutable_http_service(); + http_service->mutable_http_uri()->set_uri("http://localhost:4318/v1/logs"); + http_service->mutable_http_uri()->set_cluster("otel_collector"); + http_service->mutable_http_uri()->mutable_timeout()->set_seconds(1); + + ProtobufTypes::MessagePtr http_message = factory_->createEmptyConfigProto(); + TestUtility::jsonConvert(config, *http_message); + + ::Envoy::AccessLog::InstanceSharedPtr instance = + factory_->createAccessLogInstance(*http_message, std::move(filter_), context_); + EXPECT_NE(nullptr, instance); +} + +// Verifies top-level grpc_service configuration creates a valid access log instance. +TEST_F(OpenTelemetryAccessLogConfigTest, TopLevelGrpcServiceConfigOk) { + EXPECT_CALL(context_.server_factory_context_.cluster_manager_.async_client_manager_, + factoryForGrpcService(_, _, _)) + .WillOnce(Invoke([](const envoy::config::core::v3::GrpcService&, Stats::Scope&, bool) { + return std::make_unique>(); + })); + + envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig config; + config.mutable_grpc_service()->mutable_envoy_grpc()->set_cluster_name("otel_collector"); + config.set_log_name("my_access_log"); + + ProtobufTypes::MessagePtr grpc_message = factory_->createEmptyConfigProto(); + TestUtility::jsonConvert(config, *grpc_message); + + ::Envoy::AccessLog::InstanceSharedPtr instance = + factory_->createAccessLogInstance(*grpc_message, std::move(filter_), context_); + EXPECT_NE(nullptr, instance); + EXPECT_NE(nullptr, dynamic_cast(instance.get())); +} + +// Verifies that configuring both gRPC and HTTP transport throws an exception. +TEST_F(OpenTelemetryAccessLogConfigTest, BothGrpcAndHttpConfigFails) { + // Set up gRPC config using top-level field. + access_log_config_.mutable_grpc_service()->mutable_envoy_grpc()->set_cluster_name("bar"); + access_log_config_.set_log_name("foo"); + + // Also add HTTP config - this should cause rejection. + auto* http_service = access_log_config_.mutable_http_service(); + http_service->mutable_http_uri()->set_uri("http://localhost:4318/v1/logs"); + http_service->mutable_http_uri()->set_cluster("otel_collector"); + http_service->mutable_http_uri()->mutable_timeout()->set_seconds(1); + + ProtobufTypes::MessagePtr both_message = factory_->createEmptyConfigProto(); + TestUtility::jsonConvert(access_log_config_, *both_message); + + EXPECT_THROW_WITH_MESSAGE( + factory_->createAccessLogInstance(*both_message, std::move(filter_), context_), + EnvoyException, + "OpenTelemetry access logger can only have one transport configured. " + "Specify exactly one of: grpc_service, http_service, or common_config.grpc_service."); +} + +// Verifies that missing transport config throws an exception. +TEST_F(OpenTelemetryAccessLogConfigTest, NoTransportConfigFails) { + envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig config; + config.set_log_name("my_access_log"); + // No transport configured. + + ProtobufTypes::MessagePtr no_transport_message = factory_->createEmptyConfigProto(); + TestUtility::jsonConvert(config, *no_transport_message); + + EXPECT_THROW_WITH_MESSAGE( + factory_->createAccessLogInstance(*no_transport_message, std::move(filter_), context_), + EnvoyException, + "OpenTelemetry access logger requires one of: grpc_service, http_service, or " + "common_config.grpc_service to be configured."); +} + } // namespace } // namespace OpenTelemetry } // namespace AccessLoggers diff --git a/test/extensions/access_loggers/open_telemetry/grpc_access_log_impl_test.cc b/test/extensions/access_loggers/open_telemetry/grpc_access_log_impl_test.cc index 0013291aed670..1e4b909f08c26 100644 --- a/test/extensions/access_loggers/open_telemetry/grpc_access_log_impl_test.cc +++ b/test/extensions/access_loggers/open_telemetry/grpc_access_log_impl_test.cc @@ -475,6 +475,77 @@ TEST_F(GrpcAccessLoggerDisableBuiltinImplTest, WithResourceAttributes) { 1); } +// Test that top-level log_name is preferred over common_config.log_name. +class GrpcAccessLoggerTopLevelLogNameTest : public testing::Test { +public: + GrpcAccessLoggerTopLevelLogNameTest() + : async_client_(new Grpc::MockAsyncClient), factory_(new Grpc::MockAsyncClientFactory), + logger_cache_(async_client_manager_, scope_, tls_, local_info_), + grpc_access_logger_impl_test_helper_(local_info_, async_client_, true) { + EXPECT_CALL(async_client_manager_, factoryForGrpcService(_, _, true)) + .WillOnce(Invoke([this](const envoy::config::core::v3::GrpcService&, Stats::Scope&, bool) { + EXPECT_CALL(*factory_, createUncachedRawAsyncClient()).WillOnce(Invoke([this] { + return Grpc::RawAsyncClientPtr{async_client_}; + })); + return Grpc::AsyncClientFactoryPtr{factory_}; + })); + } + + Grpc::MockAsyncClient* async_client_; + Grpc::MockAsyncClientFactory* factory_; + Grpc::MockAsyncClientManager async_client_manager_; + LocalInfo::MockLocalInfo local_info_; + NiceMock stats_store_; + Stats::Scope& scope_{*stats_store_.rootScope()}; + NiceMock tls_; + GrpcAccessLoggerCacheImpl logger_cache_; + GrpcAccessLoggerImplTestHelper grpc_access_logger_impl_test_helper_; +}; + +// Verifies that top-level log_name takes precedence over common_config.log_name. +TEST_F(GrpcAccessLoggerTopLevelLogNameTest, TopLevelLogNamePreferred) { + envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig config; + // Set both top-level and common_config log_name. + config.set_log_name("top_level_log_name"); + config.mutable_common_config()->set_log_name("common_config_log_name"); + config.mutable_common_config()->set_transport_api_version( + envoy::config::core::v3::ApiVersion::V3); + // Force a flush for every log entry. + config.mutable_common_config()->mutable_buffer_size_bytes()->set_value(BUFFER_SIZE_BYTES); + + GrpcAccessLoggerSharedPtr logger = + logger_cache_.getOrCreateLogger(config, Common::GrpcAccessLoggerType::HTTP); + // Verify that top_level_log_name is used, not common_config_log_name. + grpc_access_logger_impl_test_helper_.expectSentMessage(R"EOF( + resource_logs: + resource: + attributes: + - key: "log_name" + value: + string_value: "top_level_log_name" + - key: "zone_name" + value: + string_value: "zone_name" + - key: "cluster_name" + value: + string_value: "cluster_name" + - key: "node_name" + value: + string_value: "node_name" + scope_logs: + - log_records: + - severity_text: "test-severity-text" + )EOF"); + opentelemetry::proto::logs::v1::LogRecord entry; + entry.set_severity_text("test-severity-text"); + logger->log(opentelemetry::proto::logs::v1::LogRecord(entry)); + EXPECT_EQ(stats_store_.findCounterByString("access_logs.open_telemetry_access_log.logs_written") + .value() + .get() + .value(), + 1); +} + } // namespace } // namespace OpenTelemetry } // namespace AccessLoggers diff --git a/test/extensions/access_loggers/open_telemetry/http_access_log_impl_test.cc b/test/extensions/access_loggers/open_telemetry/http_access_log_impl_test.cc new file mode 100644 index 0000000000000..d4d2dfa4c770b --- /dev/null +++ b/test/extensions/access_loggers/open_telemetry/http_access_log_impl_test.cc @@ -0,0 +1,370 @@ +#include "source/extensions/access_loggers/open_telemetry/http_access_log_impl.h" + +#include "test/mocks/event/mocks.h" +#include "test/mocks/local_info/mocks.h" +#include "test/mocks/stats/mocks.h" +#include "test/mocks/thread_local/mocks.h" +#include "test/mocks/upstream/cluster_manager.h" +#include "test/test_common/utility.h" + +#include "absl/strings/match.h" +#include "gmock/gmock.h" +#include "gtest/gtest.h" +#include "opentelemetry/proto/collector/logs/v1/logs_service.pb.h" + +namespace Envoy { +namespace Extensions { +namespace AccessLoggers { +namespace OpenTelemetry { + +using testing::_; +using testing::Invoke; +using testing::Return; +using testing::ReturnRef; + +const std::string ZONE_NAME = "test_zone"; +const std::string CLUSTER_NAME = "test_cluster"; +const std::string NODE_NAME = "test_node"; + +class HttpAccessLoggerImplTest : public testing::Test { +public: + HttpAccessLoggerImplTest() : timer_(new Event::MockTimer(&dispatcher_)) { + EXPECT_CALL(*timer_, enableTimer(_, _)).Times(testing::AnyNumber()); + } + + void setup(envoy::config::core::v3::HttpService http_service) { + envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig config; + setupWithConfig(http_service, config); + } + + void setupWithConfig( + envoy::config::core::v3::HttpService http_service, + envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig config) { + cluster_manager_.thread_local_cluster_.cluster_.info_->name_ = "my_o11y_backend"; + cluster_manager_.initializeThreadLocalClusters({"my_o11y_backend"}); + ON_CALL(cluster_manager_.thread_local_cluster_, httpAsyncClient()) + .WillByDefault(ReturnRef(cluster_manager_.thread_local_cluster_.async_client_)); + + cluster_manager_.initializeClusters({"my_o11y_backend"}, {}); + + ON_CALL(local_info_, zoneName()).WillByDefault(ReturnRef(ZONE_NAME)); + ON_CALL(local_info_, clusterName()).WillByDefault(ReturnRef(CLUSTER_NAME)); + ON_CALL(local_info_, nodeName()).WillByDefault(ReturnRef(NODE_NAME)); + + http_access_logger_ = + std::make_unique(cluster_manager_, http_service, config, dispatcher_, + local_info_, *stats_store_.rootScope()); + } + +protected: + NiceMock cluster_manager_; + NiceMock dispatcher_; + Event::MockTimer* timer_; + NiceMock local_info_; + NiceMock stats_store_; + std::unique_ptr http_access_logger_; +}; + +// Verifies OTLP HTTP export with custom headers, proper method, content-type, and user-agent. +TEST_F(HttpAccessLoggerImplTest, CreateExporterAndExportLog) { + std::string yaml_string = R"EOF( + http_uri: + uri: "https://some-o11y.com/otlp/v1/logs" + cluster: "my_o11y_backend" + timeout: 0.250s + request_headers_to_add: + - header: + key: "Authorization" + value: "auth-token" + - header: + key: "x-custom-header" + value: "custom-value" + )EOF"; + + envoy::config::core::v3::HttpService http_service; + TestUtility::loadFromYaml(yaml_string, http_service); + setup(http_service); + + Http::MockAsyncClientRequest request(&cluster_manager_.thread_local_cluster_.async_client_); + Http::AsyncClient::Callbacks* callback; + + EXPECT_CALL(cluster_manager_.thread_local_cluster_.async_client_, + send_(_, _, + Http::AsyncClient::RequestOptions() + .setTimeout(std::chrono::milliseconds(250)) + .setDiscardResponseBody(true))) + .WillOnce( + Invoke([&](Http::RequestMessagePtr& message, Http::AsyncClient::Callbacks& callbacks, + const Http::AsyncClient::RequestOptions&) -> Http::AsyncClient::Request* { + callback = &callbacks; + + // Verify OTLP HTTP spec compliance: POST method and protobuf content-type. + EXPECT_EQ(Http::Headers::get().MethodValues.Post, message->headers().getMethodValue()); + EXPECT_EQ(Http::Headers::get().ContentTypeValues.Protobuf, + message->headers().getContentTypeValue()); + + EXPECT_EQ("/otlp/v1/logs", message->headers().getPathValue()); + EXPECT_EQ("some-o11y.com", message->headers().getHostValue()); + + // Verify User-Agent follows OTLP spec. + EXPECT_TRUE(absl::StartsWith(message->headers().getUserAgentValue(), + "OTel-OTLP-Exporter-Envoy/")); + + // Custom headers provided in the configuration. + EXPECT_EQ("auth-token", message->headers() + .get(Http::LowerCaseString("authorization"))[0] + ->value() + .getStringView()); + EXPECT_EQ("custom-value", message->headers() + .get(Http::LowerCaseString("x-custom-header"))[0] + ->value() + .getStringView()); + + return &request; + })); + + opentelemetry::proto::logs::v1::LogRecord log_record; + log_record.set_severity_number(opentelemetry::proto::logs::v1::SEVERITY_NUMBER_INFO); + log_record.mutable_body()->set_string_value("test log message"); + http_access_logger_->log(std::move(log_record)); + + // Trigger flush via timer callback. + timer_->invokeCallback(); + + Http::ResponseMessagePtr msg(new Http::ResponseMessageImpl( + Http::ResponseHeaderMapPtr{new Http::TestResponseHeaderMapImpl{{":status", "200"}}})); + + // onBeforeFinalizeUpstreamSpan is a no-op, included for coverage. + Tracing::NullSpan null_span; + callback->onBeforeFinalizeUpstreamSpan(null_span, nullptr); + + callback->onSuccess(request, std::move(msg)); +} + +// Verifies that export is aborted gracefully when the cluster is not found. +TEST_F(HttpAccessLoggerImplTest, UnsuccessfulLogWithoutThreadLocalCluster) { + std::string yaml_string = R"EOF( + http_uri: + uri: "https://some-o11y.com/otlp/v1/logs" + cluster: "my_o11y_backend" + timeout: 10s + )EOF"; + + envoy::config::core::v3::HttpService http_service; + TestUtility::loadFromYaml(yaml_string, http_service); + setup(http_service); + + ON_CALL(cluster_manager_, getThreadLocalCluster(absl::string_view("my_o11y_backend"))) + .WillByDefault(Return(nullptr)); + + opentelemetry::proto::logs::v1::LogRecord log_record; + log_record.set_severity_number(opentelemetry::proto::logs::v1::SEVERITY_NUMBER_INFO); + log_record.mutable_body()->set_string_value("test log message"); + http_access_logger_->log(std::move(log_record)); + + // Trigger flush via timer callback - the log should be dropped since cluster is not available. + timer_->invokeCallback(); +} + +// Verifies that non-success HTTP status codes (e.g., 503) are handled gracefully. +TEST_F(HttpAccessLoggerImplTest, ExportLogsNonSuccessStatusCode) { + std::string yaml_string = R"EOF( + http_uri: + uri: "https://some-o11y.com/otlp/v1/logs" + cluster: "my_o11y_backend" + timeout: 0.250s + )EOF"; + + envoy::config::core::v3::HttpService http_service; + TestUtility::loadFromYaml(yaml_string, http_service); + setup(http_service); + + Http::MockAsyncClientRequest request(&cluster_manager_.thread_local_cluster_.async_client_); + Http::AsyncClient::Callbacks* callback; + + EXPECT_CALL(cluster_manager_.thread_local_cluster_.async_client_, send_(_, _, _)) + .WillOnce( + Invoke([&](Http::RequestMessagePtr&, Http::AsyncClient::Callbacks& callbacks, + const Http::AsyncClient::RequestOptions&) -> Http::AsyncClient::Request* { + callback = &callbacks; + return &request; + })); + + opentelemetry::proto::logs::v1::LogRecord log_record; + log_record.set_severity_number(opentelemetry::proto::logs::v1::SEVERITY_NUMBER_ERROR); + log_record.mutable_body()->set_string_value("error log message"); + http_access_logger_->log(std::move(log_record)); + + // Trigger flush via timer callback. + timer_->invokeCallback(); + + // Simulate a 503 response. + Http::ResponseMessagePtr msg(new Http::ResponseMessageImpl( + Http::ResponseHeaderMapPtr{new Http::TestResponseHeaderMapImpl{{":status", "503"}}})); + callback->onSuccess(request, std::move(msg)); +} + +// Verifies that HTTP request failures (e.g., connection reset) are handled gracefully. +TEST_F(HttpAccessLoggerImplTest, ExportLogsHttpFailure) { + std::string yaml_string = R"EOF( + http_uri: + uri: "https://some-o11y.com/otlp/v1/logs" + cluster: "my_o11y_backend" + timeout: 0.250s + )EOF"; + + envoy::config::core::v3::HttpService http_service; + TestUtility::loadFromYaml(yaml_string, http_service); + setup(http_service); + + Http::MockAsyncClientRequest request(&cluster_manager_.thread_local_cluster_.async_client_); + Http::AsyncClient::Callbacks* callback; + + EXPECT_CALL(cluster_manager_.thread_local_cluster_.async_client_, send_(_, _, _)) + .WillOnce( + Invoke([&](Http::RequestMessagePtr&, Http::AsyncClient::Callbacks& callbacks, + const Http::AsyncClient::RequestOptions&) -> Http::AsyncClient::Request* { + callback = &callbacks; + return &request; + })); + + opentelemetry::proto::logs::v1::LogRecord log_record; + log_record.set_severity_number(opentelemetry::proto::logs::v1::SEVERITY_NUMBER_INFO); + log_record.mutable_body()->set_string_value("test log message"); + http_access_logger_->log(std::move(log_record)); + + // Trigger flush via timer callback. + timer_->invokeCallback(); + + callback->onFailure(request, Http::AsyncClient::FailureReason::Reset); +} + +// Verifies that flush with no log records is a no-op (doesn't send a request). +TEST_F(HttpAccessLoggerImplTest, FlushWithNoLogRecordsIsNoOp) { + std::string yaml_string = R"EOF( + http_uri: + uri: "https://some-o11y.com/otlp/v1/logs" + cluster: "my_o11y_backend" + timeout: 0.250s + )EOF"; + + envoy::config::core::v3::HttpService http_service; + TestUtility::loadFromYaml(yaml_string, http_service); + setup(http_service); + + // No send call should be made since there are no logs. + EXPECT_CALL(cluster_manager_.thread_local_cluster_.async_client_, send_(_, _, _)).Times(0); + + // Trigger flush via timer callback with no logs buffered. + timer_->invokeCallback(); +} + +// Verifies that when send_ returns nullptr, we don't track the request. +TEST_F(HttpAccessLoggerImplTest, SendReturnsNullptr) { + std::string yaml_string = R"EOF( + http_uri: + uri: "https://some-o11y.com/otlp/v1/logs" + cluster: "my_o11y_backend" + timeout: 0.250s + )EOF"; + + envoy::config::core::v3::HttpService http_service; + TestUtility::loadFromYaml(yaml_string, http_service); + setup(http_service); + + // send_ returns nullptr (simulating immediate failure). + EXPECT_CALL(cluster_manager_.thread_local_cluster_.async_client_, send_(_, _, _)) + .WillOnce(Return(nullptr)); + + opentelemetry::proto::logs::v1::LogRecord log_record; + log_record.set_severity_number(opentelemetry::proto::logs::v1::SEVERITY_NUMBER_INFO); + log_record.mutable_body()->set_string_value("test log message"); + http_access_logger_->log(std::move(log_record)); + + // Trigger flush via timer callback - should handle nullptr return gracefully. + timer_->invokeCallback(); +} + +// Verifies that buffer overflow triggers immediate flush. +TEST_F(HttpAccessLoggerImplTest, BufferOverflowTriggersFlush) { + std::string yaml_string = R"EOF( + http_uri: + uri: "https://some-o11y.com/otlp/v1/logs" + cluster: "my_o11y_backend" + timeout: 0.250s + )EOF"; + + envoy::config::core::v3::HttpService http_service; + TestUtility::loadFromYaml(yaml_string, http_service); + + envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig config; + // Set a very small buffer size to trigger overflow. + config.mutable_buffer_size_bytes()->set_value(1); + setupWithConfig(http_service, config); + + Http::MockAsyncClientRequest request(&cluster_manager_.thread_local_cluster_.async_client_); + Http::AsyncClient::Callbacks* callback; + + // Expect a flush triggered by buffer overflow (not timer). + EXPECT_CALL(cluster_manager_.thread_local_cluster_.async_client_, send_(_, _, _)) + .WillOnce( + Invoke([&](Http::RequestMessagePtr&, Http::AsyncClient::Callbacks& callbacks, + const Http::AsyncClient::RequestOptions&) -> Http::AsyncClient::Request* { + callback = &callbacks; + return &request; + })); + + opentelemetry::proto::logs::v1::LogRecord log_record; + log_record.set_severity_number(opentelemetry::proto::logs::v1::SEVERITY_NUMBER_INFO); + log_record.mutable_body()->set_string_value("test log message that exceeds buffer"); + // This should trigger immediate flush due to buffer overflow. + http_access_logger_->log(std::move(log_record)); + + // Complete the request. + Http::ResponseMessagePtr msg(new Http::ResponseMessageImpl( + Http::ResponseHeaderMapPtr{new Http::TestResponseHeaderMapImpl{{":status", "200"}}})); + callback->onSuccess(request, std::move(msg)); +} + +// Verifies that getOrCreateLogger returns the same logger instance for identical config. +TEST(HttpAccessLoggerCacheTest, CacheHitReturnsSameLogger) { + std::string yaml_string = R"EOF( + http_uri: + uri: "https://some-o11y.com/otlp/v1/logs" + cluster: "my_o11y_backend" + timeout: 0.250s + )EOF"; + + envoy::config::core::v3::HttpService http_service; + TestUtility::loadFromYaml(yaml_string, http_service); + + envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig config; + config.set_log_name("test_log"); + + NiceMock cluster_manager; + cluster_manager.thread_local_cluster_.cluster_.info_->name_ = "my_o11y_backend"; + cluster_manager.initializeThreadLocalClusters({"my_o11y_backend"}); + cluster_manager.initializeClusters({"my_o11y_backend"}, {}); + + NiceMock stats_store; + NiceMock tls; + NiceMock local_info; + + ON_CALL(local_info, zoneName()).WillByDefault(ReturnRef(ZONE_NAME)); + ON_CALL(local_info, clusterName()).WillByDefault(ReturnRef(CLUSTER_NAME)); + ON_CALL(local_info, nodeName()).WillByDefault(ReturnRef(NODE_NAME)); + + auto cache = std::make_shared( + cluster_manager, *stats_store.rootScope(), tls, local_info); + + auto logger1 = cache->getOrCreateLogger(config, http_service); + ASSERT_NE(nullptr, logger1); + + auto logger2 = cache->getOrCreateLogger(config, http_service); + EXPECT_EQ(logger1.get(), logger2.get()); +} + +} // namespace OpenTelemetry +} // namespace AccessLoggers +} // namespace Extensions +} // namespace Envoy diff --git a/test/extensions/access_loggers/open_telemetry/otlp_log_utils_test.cc b/test/extensions/access_loggers/open_telemetry/otlp_log_utils_test.cc new file mode 100644 index 0000000000000..c252da6bf73f9 --- /dev/null +++ b/test/extensions/access_loggers/open_telemetry/otlp_log_utils_test.cc @@ -0,0 +1,468 @@ +#include "source/common/router/string_accessor_impl.h" +#include "source/common/stream_info/filter_state_impl.h" +#include "source/extensions/access_loggers/open_telemetry/otlp_log_utils.h" + +#include "test/mocks/local_info/mocks.h" +#include "test/mocks/stream_info/mocks.h" +#include "test/test_common/utility.h" + +#include "opentelemetry/proto/collector/logs/v1/logs_service.pb.h" +#include "opentelemetry/proto/resource/v1/resource.pb.h" + +namespace Envoy { +namespace Extensions { +namespace AccessLoggers { +namespace OpenTelemetry { +namespace { + +using testing::NiceMock; +using testing::Return; +using testing::ReturnRef; + +const std::string kTestZone = "test_zone"; +const std::string kTestCluster = "test_cluster"; +const std::string kTestNode = "test_node"; + +TEST(OtlpLogUtilsTest, GetStringKeyValue) { + auto kv = getStringKeyValue("test_key", "test_value"); + EXPECT_EQ("test_key", kv.key()); + EXPECT_EQ("test_value", kv.value().string_value()); +} + +TEST(OtlpLogUtilsTest, PackUnpackBody) { + ::opentelemetry::proto::common::v1::AnyValue body; + body.set_string_value("test body content"); + + auto packed = packBody(body); + ASSERT_EQ(1, packed.values().size()); + EXPECT_EQ(BodyKey, packed.values(0).key()); + + auto unpacked = unpackBody(packed); + EXPECT_EQ("test body content", unpacked.string_value()); +} + +TEST(OtlpLogUtilsTest, GetOtlpUserAgentHeader) { + const auto& header = getOtlpUserAgentHeader(); + EXPECT_TRUE(absl::StartsWith(header, "OTel-OTLP-Exporter-Envoy/")); + // Should return the same instance each time. + EXPECT_EQ(&header, &getOtlpUserAgentHeader()); +} + +TEST(OtlpLogUtilsTest, PopulateTraceContextFullTraceId) { + opentelemetry::proto::logs::v1::LogRecord log_entry; + // 32-char (128-bit) trace ID. + const std::string trace_id_hex = "0123456789abcdef0123456789abcdef"; + const std::string span_id_hex = "0123456789abcdef"; + + populateTraceContext(log_entry, trace_id_hex, span_id_hex); + + EXPECT_EQ(16, log_entry.trace_id().size()); + EXPECT_EQ(8, log_entry.span_id().size()); + // Verify the hex conversion is correct. + EXPECT_EQ(absl::HexStringToBytes(trace_id_hex), log_entry.trace_id()); + EXPECT_EQ(absl::HexStringToBytes(span_id_hex), log_entry.span_id()); +} + +TEST(OtlpLogUtilsTest, PopulateTraceContextShortTraceId) { + opentelemetry::proto::logs::v1::LogRecord log_entry; + // 16-char (64-bit, Zipkin-style) trace ID. + const std::string short_trace_id_hex = "0123456789abcdef"; + const std::string span_id_hex = "fedcba9876543210"; + + populateTraceContext(log_entry, short_trace_id_hex, span_id_hex); + + EXPECT_EQ(16, log_entry.trace_id().size()); + EXPECT_EQ(8, log_entry.span_id().size()); + // Should be padded with zeros on the left. + const std::string expected_trace_id = "0000000000000000" + short_trace_id_hex; + EXPECT_EQ(absl::HexStringToBytes(expected_trace_id), log_entry.trace_id()); +} + +TEST(OtlpLogUtilsTest, PopulateTraceContextEmptyIds) { + opentelemetry::proto::logs::v1::LogRecord log_entry; + + populateTraceContext(log_entry, "", ""); + + EXPECT_TRUE(log_entry.trace_id().empty()); + EXPECT_TRUE(log_entry.span_id().empty()); +} + +TEST(OtlpLogUtilsTest, PopulateTraceContextInvalidTraceIdLength) { + opentelemetry::proto::logs::v1::LogRecord log_entry; + // Invalid length (not 16 or 32 chars). + const std::string invalid_trace_id = "0123456789"; + const std::string span_id_hex = "0123456789abcdef"; + + populateTraceContext(log_entry, invalid_trace_id, span_id_hex); + + // Trace ID should not be set for invalid length. + EXPECT_TRUE(log_entry.trace_id().empty()); + // Span ID should still be set. + EXPECT_EQ(8, log_entry.span_id().size()); +} + +// Tests for config helper functions with fallback to deprecated common_config. + +// Verifies that top-level log_name takes precedence over common_config.log_name. +TEST(OtlpLogUtilsTest, GetLogNamePrefersTopLevel) { + envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig config; + config.set_log_name("top_level_log"); + config.mutable_common_config()->set_log_name("common_config_log"); + + EXPECT_EQ("top_level_log", getLogName(config)); +} + +// Verifies fallback to common_config.log_name when top-level is not set. +TEST(OtlpLogUtilsTest, GetLogNameFallsBackToCommonConfig) { + envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig config; + config.mutable_common_config()->set_log_name("common_config_log"); + + EXPECT_EQ("common_config_log", getLogName(config)); +} + +// Verifies that an empty string is returned when neither is set. +TEST(OtlpLogUtilsTest, GetLogNameReturnsEmptyWhenNotSet) { + envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig config; + + EXPECT_TRUE(getLogName(config).empty()); +} + +// Verifies that top-level grpc_service takes precedence over common_config.grpc_service. +TEST(OtlpLogUtilsTest, GetGrpcServicePrefersTopLevel) { + envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig config; + config.mutable_grpc_service()->mutable_envoy_grpc()->set_cluster_name("top_level_cluster"); + config.mutable_common_config()->mutable_grpc_service()->mutable_envoy_grpc()->set_cluster_name( + "common_config_cluster"); + + const auto& grpc_service = getGrpcService(config); + EXPECT_EQ("top_level_cluster", grpc_service.envoy_grpc().cluster_name()); +} + +// Verifies fallback to common_config.grpc_service when top-level is not set. +TEST(OtlpLogUtilsTest, GetGrpcServiceFallsBackToCommonConfig) { + envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig config; + config.mutable_common_config()->mutable_grpc_service()->mutable_envoy_grpc()->set_cluster_name( + "common_config_cluster"); + + const auto& grpc_service = getGrpcService(config); + EXPECT_EQ("common_config_cluster", grpc_service.envoy_grpc().cluster_name()); +} + +// Tests for buffer_flush_interval. + +// Verifies that buffer_flush_interval is read from config. +TEST(OtlpLogUtilsTest, GetBufferFlushIntervalFromConfig) { + envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig config; + config.mutable_buffer_flush_interval()->set_seconds(5); + + EXPECT_EQ(std::chrono::milliseconds(5000), getBufferFlushInterval(config)); +} + +// Verifies that the default (1 second) is returned when not set. +TEST(OtlpLogUtilsTest, GetBufferFlushIntervalReturnsDefaultWhenNotSet) { + envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig config; + + EXPECT_EQ(DefaultBufferFlushInterval, getBufferFlushInterval(config)); +} + +// Tests for buffer_size_bytes. + +// Verifies that buffer_size_bytes is read from config. +TEST(OtlpLogUtilsTest, GetBufferSizeBytesFromConfig) { + envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig config; + config.mutable_buffer_size_bytes()->set_value(32768); + + EXPECT_EQ(32768, getBufferSizeBytes(config)); +} + +// Verifies that the default (16KB) is returned when not set. +TEST(OtlpLogUtilsTest, GetBufferSizeBytesReturnsDefaultWhenNotSet) { + envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig config; + + EXPECT_EQ(DefaultMaxBufferSizeBytes, getBufferSizeBytes(config)); +} + +// Tests for filter_state_objects_to_log. + +// Verifies that filter_state_objects_to_log is read from config. +TEST(OtlpLogUtilsTest, GetFilterStateObjectsToLogFromConfig) { + envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig config; + config.add_filter_state_objects_to_log("obj1"); + config.add_filter_state_objects_to_log("obj2"); + + auto result = getFilterStateObjectsToLog(config); + ASSERT_EQ(2, result.size()); + EXPECT_EQ("obj1", result[0]); + EXPECT_EQ("obj2", result[1]); +} + +// Verifies that an empty vector is returned when not set. +TEST(OtlpLogUtilsTest, GetFilterStateObjectsToLogReturnsEmptyWhenNotSet) { + envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig config; + + auto result = getFilterStateObjectsToLog(config); + EXPECT_TRUE(result.empty()); +} + +// Tests for custom_tags. + +// Verifies that custom_tags is read from config. +TEST(OtlpLogUtilsTest, GetCustomTagsFromConfig) { + envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig config; + auto* tag1 = config.add_custom_tags(); + tag1->set_tag("tag1"); + tag1->mutable_literal()->set_value("value1"); + + auto result = getCustomTags(config); + ASSERT_EQ(1, result.size()); + EXPECT_EQ("tag1", result[0]->tag()); +} + +// Verifies that an empty vector is returned when not set. +TEST(OtlpLogUtilsTest, GetCustomTagsReturnsEmptyWhenNotSet) { + envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig config; + + auto result = getCustomTags(config); + EXPECT_TRUE(result.empty()); +} + +// Tests for addFilterStateToAttributes. + +// Verifies that filter state from downstream is added to log attributes. +TEST(OtlpLogUtilsTest, AddFilterStateToAttributesFromDownstream) { + NiceMock stream_info; + opentelemetry::proto::logs::v1::LogRecord log_entry; + + stream_info.filter_state_->setData( + "downstream_key", std::make_unique("downstream_value"), + StreamInfo::FilterState::StateType::ReadOnly, StreamInfo::FilterState::LifeSpan::FilterChain); + + std::vector filter_state_objects = {"downstream_key"}; + addFilterStateToAttributes(stream_info, filter_state_objects, log_entry); + + ASSERT_EQ(1, log_entry.attributes_size()); + EXPECT_EQ("downstream_key", log_entry.attributes(0).key()); + // The value is JSON-serialized from the protobuf. + EXPECT_FALSE(log_entry.attributes(0).value().string_value().empty()); +} + +// Verifies that filter state from upstream is added when not found in downstream. +TEST(OtlpLogUtilsTest, AddFilterStateToAttributesFromUpstream) { + NiceMock stream_info; + opentelemetry::proto::logs::v1::LogRecord log_entry; + + auto upstream_filter_state = + std::make_shared(StreamInfo::FilterState::LifeSpan::FilterChain); + upstream_filter_state->setData( + "upstream_key", std::make_unique("upstream_value"), + StreamInfo::FilterState::StateType::ReadOnly, StreamInfo::FilterState::LifeSpan::FilterChain); + stream_info.upstreamInfo()->setUpstreamFilterState(upstream_filter_state); + + std::vector filter_state_objects = {"upstream_key"}; + addFilterStateToAttributes(stream_info, filter_state_objects, log_entry); + + ASSERT_EQ(1, log_entry.attributes_size()); + EXPECT_EQ("upstream_key", log_entry.attributes(0).key()); + EXPECT_FALSE(log_entry.attributes(0).value().string_value().empty()); +} + +// Verifies that downstream takes precedence when the same key exists in both. +TEST(OtlpLogUtilsTest, AddFilterStateToAttributesDownstreamPrecedence) { + NiceMock stream_info; + opentelemetry::proto::logs::v1::LogRecord log_entry; + + // Add to downstream. + stream_info.filter_state_->setData( + "same_key", std::make_unique("downstream_wins"), + StreamInfo::FilterState::StateType::ReadOnly, StreamInfo::FilterState::LifeSpan::FilterChain); + + // Add to upstream. + auto upstream_filter_state = + std::make_shared(StreamInfo::FilterState::LifeSpan::FilterChain); + upstream_filter_state->setData( + "same_key", std::make_unique("upstream_loses"), + StreamInfo::FilterState::StateType::ReadOnly, StreamInfo::FilterState::LifeSpan::FilterChain); + stream_info.upstreamInfo()->setUpstreamFilterState(upstream_filter_state); + + std::vector filter_state_objects = {"same_key"}; + addFilterStateToAttributes(stream_info, filter_state_objects, log_entry); + + // Should only have one attribute (from downstream, not both). + ASSERT_EQ(1, log_entry.attributes_size()); + EXPECT_EQ("same_key", log_entry.attributes(0).key()); + // Value should be non-empty (from downstream filter state). + EXPECT_FALSE(log_entry.attributes(0).value().string_value().empty()); +} + +// Verifies that missing filter state keys are silently ignored. +TEST(OtlpLogUtilsTest, AddFilterStateToAttributesMissingKey) { + NiceMock stream_info; + opentelemetry::proto::logs::v1::LogRecord log_entry; + + std::vector filter_state_objects = {"nonexistent_key"}; + addFilterStateToAttributes(stream_info, filter_state_objects, log_entry); + + // No attributes should be added. + EXPECT_EQ(0, log_entry.attributes_size()); +} + +// Verifies that empty filter state objects list results in no attributes. +TEST(OtlpLogUtilsTest, AddFilterStateToAttributesEmptyList) { + NiceMock stream_info; + opentelemetry::proto::logs::v1::LogRecord log_entry; + + std::vector filter_state_objects; + addFilterStateToAttributes(stream_info, filter_state_objects, log_entry); + + EXPECT_EQ(0, log_entry.attributes_size()); +} + +// Tests for addCustomTagsToAttributes. + +// Verifies that custom tags with literal values are added to attributes. +TEST(OtlpLogUtilsTest, AddCustomTagsToAttributesWithLiteralTags) { + NiceMock stream_info; + opentelemetry::proto::logs::v1::LogRecord log_entry; + + // Create custom tags. + envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig config; + auto* tag = config.add_custom_tags(); + tag->set_tag("literal_tag"); + tag->mutable_literal()->set_value("literal_value"); + auto custom_tags = getCustomTags(config); + + // Create formatter context with request headers. + Http::TestRequestHeaderMapImpl request_headers; + Formatter::Context context(&request_headers); + + addCustomTagsToAttributes(custom_tags, context, stream_info, log_entry); + + opentelemetry::proto::logs::v1::LogRecord expected; + auto* attr = expected.add_attributes(); + attr->set_key("literal_tag"); + attr->mutable_value()->set_string_value("literal_value"); + + EXPECT_TRUE(TestUtility::protoEqual(log_entry, expected)); +} + +// Verifies that empty custom tags list is a no-op. +TEST(OtlpLogUtilsTest, AddCustomTagsToAttributesEmptyTags) { + NiceMock stream_info; + opentelemetry::proto::logs::v1::LogRecord log_entry; + + std::vector empty_tags; + + Http::TestRequestHeaderMapImpl request_headers; + Formatter::Context context(&request_headers); + + addCustomTagsToAttributes(empty_tags, context, stream_info, log_entry); + + opentelemetry::proto::logs::v1::LogRecord expected; + EXPECT_TRUE(TestUtility::protoEqual(log_entry, expected)); +} + +// Verifies that custom tags work when request headers are not available. +TEST(OtlpLogUtilsTest, AddCustomTagsToAttributesWithoutRequestHeaders) { + NiceMock stream_info; + opentelemetry::proto::logs::v1::LogRecord log_entry; + + // Create custom tags. + envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig config; + auto* tag = config.add_custom_tags(); + tag->set_tag("env_tag"); + tag->mutable_literal()->set_value("env_value"); + auto custom_tags = getCustomTags(config); + + // Create context without request headers (simulating TCP connection). + Formatter::Context context; + + addCustomTagsToAttributes(custom_tags, context, stream_info, log_entry); + + opentelemetry::proto::logs::v1::LogRecord expected; + auto* attr = expected.add_attributes(); + attr->set_key("env_tag"); + attr->mutable_value()->set_string_value("env_value"); + + EXPECT_TRUE(TestUtility::protoEqual(log_entry, expected)); +} + +// Tests for initOtlpMessageRoot. + +// Verifies that builtin labels (log_name, zone, cluster, node) are added when not disabled. +TEST(OtlpLogUtilsTest, InitOtlpMessageRootWithBuiltinLabels) { + opentelemetry::proto::collector::logs::v1::ExportLogsServiceRequest message; + envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig config; + config.set_log_name("test_log"); + + NiceMock local_info; + ON_CALL(local_info, zoneName()).WillByDefault(ReturnRef(kTestZone)); + ON_CALL(local_info, clusterName()).WillByDefault(ReturnRef(kTestCluster)); + ON_CALL(local_info, nodeName()).WillByDefault(ReturnRef(kTestNode)); + + auto* root = initOtlpMessageRoot(message, config, local_info); + + ASSERT_NE(nullptr, root); + ASSERT_EQ(1, message.resource_logs_size()); + + opentelemetry::proto::resource::v1::Resource expected_resource; + auto* attr = expected_resource.add_attributes(); + attr->set_key("log_name"); + attr->mutable_value()->set_string_value("test_log"); + attr = expected_resource.add_attributes(); + attr->set_key("zone_name"); + attr->mutable_value()->set_string_value(kTestZone); + attr = expected_resource.add_attributes(); + attr->set_key("cluster_name"); + attr->mutable_value()->set_string_value(kTestCluster); + attr = expected_resource.add_attributes(); + attr->set_key("node_name"); + attr->mutable_value()->set_string_value(kTestNode); + + EXPECT_TRUE(TestUtility::protoEqual(message.resource_logs(0).resource(), expected_resource)); +} + +// Verifies that no builtin labels are added when disable_builtin_labels is true. +TEST(OtlpLogUtilsTest, InitOtlpMessageRootDisableBuiltinLabels) { + opentelemetry::proto::collector::logs::v1::ExportLogsServiceRequest message; + envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig config; + config.set_disable_builtin_labels(true); + + NiceMock local_info; + + auto* root = initOtlpMessageRoot(message, config, local_info); + + ASSERT_NE(nullptr, root); + ASSERT_EQ(1, message.resource_logs_size()); + + opentelemetry::proto::resource::v1::Resource expected_resource; + EXPECT_TRUE(TestUtility::protoEqual(message.resource_logs(0).resource(), expected_resource)); +} + +// Verifies that custom resource_attributes are added to the resource. +TEST(OtlpLogUtilsTest, InitOtlpMessageRootWithResourceAttributes) { + opentelemetry::proto::collector::logs::v1::ExportLogsServiceRequest message; + envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig config; + config.set_disable_builtin_labels(true); + auto* kv = config.mutable_resource_attributes()->add_values(); + kv->set_key("custom_key"); + kv->mutable_value()->set_string_value("custom_value"); + + NiceMock local_info; + + auto* root = initOtlpMessageRoot(message, config, local_info); + + ASSERT_NE(nullptr, root); + + opentelemetry::proto::resource::v1::Resource expected_resource; + auto* attr = expected_resource.add_attributes(); + attr->set_key("custom_key"); + attr->mutable_value()->set_string_value("custom_value"); + + EXPECT_TRUE(TestUtility::protoEqual(message.resource_logs(0).resource(), expected_resource)); +} + +} // namespace +} // namespace OpenTelemetry +} // namespace AccessLoggers +} // namespace Extensions +} // namespace Envoy diff --git a/test/extensions/access_loggers/open_telemetry/substitution_formatter_test.cc b/test/extensions/access_loggers/open_telemetry/substitution_formatter_test.cc index f22fd74a01749..d36425d95e050 100644 --- a/test/extensions/access_loggers/open_telemetry/substitution_formatter_test.cc +++ b/test/extensions/access_loggers/open_telemetry/substitution_formatter_test.cc @@ -114,6 +114,29 @@ void verifyOpenTelemetryOutput(KeyValueList output, OpenTelemetryFormatMap expec } } +// Verifies that unsupported top-level value types (e.g., bool_value) throw an exception. +TEST(SubstitutionFormatterTest, UnsupportedValueTypeThrows) { + KeyValueList key_mapping; + auto* kv = key_mapping.add_values(); + kv->set_key("test"); + kv->mutable_value()->set_bool_value(true); + + std::vector commands; + EXPECT_THROW(OpenTelemetryFormatter(key_mapping, commands), EnvoyException); +} + +// Verifies that unsupported array element types (e.g., int_value) throw an exception. +TEST(SubstitutionFormatterTest, UnsupportedArrayValueTypeThrows) { + KeyValueList key_mapping; + auto* kv = key_mapping.add_values(); + kv->set_key("test"); + auto* array = kv->mutable_value()->mutable_array_value(); + array->add_values()->set_int_value(42); + + std::vector commands; + EXPECT_THROW(OpenTelemetryFormatter(key_mapping, commands), EnvoyException); +} + TEST(SubstitutionFormatterTest, OpenTelemetryFormatterPlainStringTest) { StreamInfo::MockStreamInfo stream_info;