diff --git a/rust/otap-dataflow/crates/otap/Cargo.toml b/rust/otap-dataflow/crates/otap/Cargo.toml index 0196003166..4bfa1380e8 100644 --- a/rust/otap-dataflow/crates/otap/Cargo.toml +++ b/rust/otap-dataflow/crates/otap/Cargo.toml @@ -38,9 +38,9 @@ tonic = { version = "0.13.1", default-features = false, features = [ "deflate", ] } sysinfo = "0.35.1" - byte-unit = "4" - fluke-hpack = "0.3.1" - serde = {version="1", features=["derive"]} +byte-unit = "4" +fluke-hpack = "0.3.1" +serde = {version="1", features=["derive"]} prost = "0.13.5" tokio-stream = "0.1.17" async-stream = "0.3.6" diff --git a/rust/otap-dataflow/crates/otap/src/otap_exporter.rs b/rust/otap-dataflow/crates/otap/src/otap_exporter.rs index 9deb4c7e39..7120231d08 100644 --- a/rust/otap-dataflow/crates/otap/src/otap_exporter.rs +++ b/rust/otap-dataflow/crates/otap/src/otap_exporter.rs @@ -3,7 +3,7 @@ //! Implementation of the OTAP exporter node //! //! ToDo: Handle Ack and Nack messages in the pipeline -//! ToDo: Handle configuratin changes +//! ToDo: Handle configuration changes //! ToDo: Implement proper deadline function for Shutdown ctrl msg use crate::LOCAL_EXPORTERS; diff --git a/rust/otap-dataflow/crates/otlp/Cargo.toml b/rust/otap-dataflow/crates/otlp/Cargo.toml index 0555ef7065..a1d3915cb0 100644 --- a/rust/otap-dataflow/crates/otlp/Cargo.toml +++ b/rust/otap-dataflow/crates/otlp/Cargo.toml @@ -25,6 +25,7 @@ prost = "0.13.5" prost-types = "0.13.5" hex = "0.4.3" linkme = "0.3.33" +serde = {version="1", features=["derive"]} # Workspace dependencies thiserror.workspace = true diff --git a/rust/otap-dataflow/crates/otlp/src/debug_exporter/README.md b/rust/otap-dataflow/crates/otlp/src/debug_exporter/README.md new file mode 100644 index 0000000000..2add21079c --- /dev/null +++ b/rust/otap-dataflow/crates/otlp/src/debug_exporter/README.md @@ -0,0 +1,326 @@ +# Debug Exporter + +Status: **WIP** + +This crate will contain the implementation of the debug exporter. + +## Example Output => Basic Verbosity + +```plaintext +Timer tick received +OTLP Metric objects received: 0 +OTLP Trace objects received: 0 +OTLP Profile objects received: 0 +OTLP Log objects received: 0 +Received 1 resource metrics +Received 5 metrics +Received 5 data points +Received 1 resource spans +Received 1 spans +Received 1 events +Received 1 links +Received 1 resource logs +Received 1 log records +Received 1 events +Received 1 resource profiles +Received 0 samples +Shutdown message received +Debug Exporter Summary: +OTLP Metric objects received: 1 +Received 1 Resource Metrics +Received 5 metrics +Received 5 datapoints +OTLP Trace objects received: 1 +Received 1 Resource Spans +Received 1 spans +Received 1 events +Received 1 links +OTLP Log objects received: 1 +Received 1 Resource logs +Received 1 log records +Received 1 log events +OTLP Profile objects received: 1 +Received 1 Resource profiles +Received 0 samples + +``` + +## Example Output => Normal Verbosity + +```plaintext +Timer tick received +OTLP Metric objects received: 0 +OTLP Trace objects received: 0 +OTLP Profile objects received: 0 +OTLP Log objects received: 0 +Received 1 resource metrics +Received 5 metrics +Received 5 data points +ResourceMetric #0, Schema:[http://schema.opentelemetry.io], Attributes: ip=192.168.0.2 + ScopeMetric #0, Name: library, Version: @v1, Schema: [http://schema.opentelemetry.io], Attributes: instrumentation_scope_k1=k1 value + system.cpu.time 0 + system.cpu.time freq=3GHz count=0 sum=56 min=12 max=100.1 + system.cpu.time freq=3GHz count=0 sum=56 min=12 max=100.1 le94.17542094619048=0 + system.cpu.time cpu_logical_processors=8 0 + system.cpu.time cpu_cores=4 count=0 sum=56 q0=0 + +Received 1 resource spans +Received 1 spans +Received 1 events +Received 1 links +ResourceSpan #0, Schema:[http://schema.opentelemetry.io], Attributes: ip=192.168.0.1 + ScopeSpan #0, Name: library, Version: @v1, Schema: [http://schema.opentelemetry.io], Attributes: hostname=host5.retailer.com + Name: user-account, Trace ID: 4327e52011a22f9662eac217d77d1ec0, Span ID: 7271ee06d7e5925f, Attributes: hostname=host4.gov + +Received 1 resource logs +Received 1 log records +Received 1 events +ResourceLog #0, Schema:[http://schema.opentelemetry.io], Attributes: version=2.0 + ScopeLog #0, Name: library, Version: @v1, Schema: [http://schema.opentelemetry.io], Attributes: hostname=host5.retailer.com + Body: Sint impedit non ut eligendi nisi neque harum maxime adipisci., Attributes: hostname=host3.thedomain.edu + +Received 1 resource profiles +Received 0 samples +ResourceProfile #0, Schema:[http://schema.opentelemetry.io], Attributes: hostname=host7.com + ScopeProfile #0, Name: library, Version: @v1, Schema: [http://schema.opentelemetry.io], Attributes: hostname=host5.retailer.com + +Shutdown message received +Debug Exporter Summary: +OTLP Metric objects received: 1 +Received 1 Resource Metrics +Received 5 metrics +Received 5 datapoints +OTLP Trace objects received: 1 +Received 1 Resource Spans +Received 1 spans +Received 1 events +Received 1 links +OTLP Log objects received: 1 +Received 1 Resource logs +Received 1 log records +Received 1 log events +OTLP Profile objects received: 1 +Received 1 Resource profiles +Received 0 samples +``` + +## Example Output => Detailed Verbosity + +```plaintext +Timer tick received +OTLP Metric objects received: 0 +OTLP Trace objects received: 0 +OTLP Profile objects received: 0 +OTLP Log objects received: 0 +Received 1 resource metrics +Received 5 metrics +Received 5 data points +ResourceMetric #0 + -> Resource SchemaURL: http://schema.opentelemetry.io + -> Resource Attributes: + -> ip: 192.168.0.2 + ScopeMetrics #0 + -> ScopeMetrics SchemaURL: http://schema.opentelemetry.io + -> Instrumentation Scope library @v1 + -> Instrumentation Scope Attributes: + -> instrumentation_scope_k1: k1 value + Metric #0 + -> Name: system.cpu.time + -> Description: time cpu has ran + -> Unit: s + -> DataType: Gauge + NumberDataPoints #0 + -> Attributes: + -> StartTimestamp: 1650499200000000100 + -> Timestamp: 1663718400000001400 + -> Value: 0 + Metric #1 + -> Name: system.cpu.time + -> Description: time cpu has ran + -> Unit: s + -> DataType: Exponential Histogram + -> AggregationTemporality: 4 + ExponentialHistogramDataPoints #0 + -> Attributes: + -> freq: 3GHz + -> StartTimestamp: 1650499200000000000 + -> Timestamp: 1663718400000001400 + -> Count: 0 + -> Sum: 56 + -> Min: 12 + -> Max: 100.1 + -> Bucket [-4.113250378782927, -1), Count: 0 + -> Bucket (1, 4.113250378782927], Count: 0 + -> Exemplars: + Exemplar #0 + -> Trace ID: 4327e52011a22f9662eac217d77d1ec0 + -> Span ID: 7271ee06d7e5925f + -> Timestamp: 1663718400000001400 + -> Value: 22.2 + -> FilteredAttributes: + -> cpu: 0 + Metric #2 + -> Name: system.cpu.time + -> Description: time cpu has ran + -> Unit: s + -> DataType: Histogram + -> AggregationTemporality: 4 + HistogramDataPoints #0 + -> Attributes: + -> freq: 3GHz + -> StartTimestamp: 1650499200000000000 + -> Timestamp: 1663718400000001400 + -> Count: 0 + -> Sum: 56 + -> Min: 12 + -> Max: 100.1 + -> ExplicitBound #0: 94.17542094619048 + -> ExplicitBound #1: 65.66722851519177 + -> Buckets #0, Count: 0 + -> Exemplars: + Exemplar #0 + -> Trace ID: 4327e52011a22f9662eac217d77d1ec0 + -> Span ID: 7271ee06d7e5925f + -> Timestamp: 1663718400000001400 + -> Value: 22.2 + -> FilteredAttributes: + -> cpu: 0 + Metric #3 + -> Name: system.cpu.time + -> Description: time cpu has ran + -> Unit: s + -> DataType: Sum + -> IsMonotonic: true + -> AggregationTemporality: 4 + NumberDataPoints #0 + -> Attributes: + -> cpu_logical_processors: 8 + -> StartTimestamp: 1650499200000000000 + -> Timestamp: 1663718400000001400 + -> Value: 0 + -> Exemplars: + Exemplar #0 + -> Trace ID: 4327e52011a22f9662eac217d77d1ec0 + -> Span ID: 7271ee06d7e5925f + -> Timestamp: 1663718400000001400 + -> Value: 22.2 + -> FilteredAttributes: + -> ************: true + Metric #4 + -> Name: system.cpu.time + -> Description: time cpu has ran + -> Unit: s + -> DataType: Summary + SummaryDataPoints #0 + -> Attributes: + -> cpu_cores: 4 + -> StartTimestamp: 1650499200000000100 + -> Timestamp: 1663718400000001400 + -> Count: 0 + -> Sum: 56 + -> QuantileValue #0: Quantile 0, Value 0 + +Received 1 resource spans +Received 1 spans +Received 1 events +Received 1 links +ResourceSpan #0 + -> Resource SchemaURL: http://schema.opentelemetry.io + -> Resource Attributes: + -> ip: 192.168.0.1 + ScopeSpans #0 + -> ScopeSpans SchemaURL: http://schema.opentelemetry.io + -> Instrumentation Scope library @v1 + -> Instrumentation Scope Attributes: + -> hostname: host5.retailer.com + Span #0 + -> Trace ID: 4327e52011a22f9662eac217d77d1ec0 + -> Parent ID: 7271ee06d7e5925f + -> ID: 7271ee06d7e5925f + -> Name: user-account + -> Kind: 4 + -> TraceState: ended + -> Start time: 1647648000000000106 + -> End time: 1647648000000000104 + -> Status code: 2 + -> Status message: Error + -> Attributes: + -> hostname: host4.gov + -> Events: + SpanEvent #0 + -> Name: message-receive + -> Timestamp: 1647648000000000108 + -> DroppedAttributesCount: 0 + -> Attributes: + -> hostname: host5.retailer.com + -> Links: + SpanLink: #0 + -> Trace ID: 4327e52011a22f9662eac217d77d1ec0 + -> Span ID: 7271ee06d7e5925f + -> TraceState: ended + -> DroppedAttributesCount: 0 + -> Attributes: + -> hostname: host2.org + +Received 1 resource logs +Received 1 log records +Received 1 events +ResourceLog #0 + -> Resource SchemaURL: http://schema.opentelemetry.io + -> Resource Attributes: + -> version: 2.0 + ScopeLogs #0 + -> ScopeLogs SchemaURL: http://schema.opentelemetry.io + -> Instrumentation Scope library @v1 + -> Instrumentation Scope Attributes: + -> hostname: host5.retailer.com + LogRecord #0 + -> ObservedTimestamp: 1663718400000001300 + -> Timestamp: 2000000000 + -> SeverityText: INFO + -> SeverityNumber: 2 + -> EventName: event1 + -> Body: Sint impedit non ut eligendi nisi neque harum maxime adipisci. + -> Attributes: + -> hostname: host3.thedomain.edu + -> Trace ID: 4327e52011a22f9662eac217d77d1ec0 + -> Span ID: 7271ee06d7e5925f + -> Flags: 8 + +Received 1 resource profiles +Received 0 samples +ResourceProfile #0 + -> Resource SchemaURL: http://schema.opentelemetry.io + -> Resource Attributes: + -> hostname: host7.com + ScopeProfiles #0 + -> ScopeProfiles SchemaURL: http://schema.opentelemetry.io + -> Instrumentation Scope library @v1 + -> Instrumentation Scope Attributes: + -> hostname: host5.retailer.com + Profile #0 + -> Profile ID: + -> Start time: 0 + -> Duration: 0 + -> Dropped attributes count: 0 + -> Location indices: [] + +Shutdown message received +Debug Exporter Summary: +OTLP Metric objects received: 1 +Received 1 Resource Metrics +Received 5 metrics +Received 5 datapoints +OTLP Trace objects received: 1 +Received 1 Resource Spans +Received 1 spans +Received 1 events +Received 1 links +OTLP Log objects received: 1 +Received 1 Resource logs +Received 1 log records +Received 1 log events +OTLP Profile objects received: 1 +Received 1 Resource profiles +Received 0 samples +``` diff --git a/rust/otap-dataflow/crates/otlp/src/debug_exporter/config.rs b/rust/otap-dataflow/crates/otlp/src/debug_exporter/config.rs new file mode 100644 index 0000000000..da3785f658 --- /dev/null +++ b/rust/otap-dataflow/crates/otlp/src/debug_exporter/config.rs @@ -0,0 +1,49 @@ +// SPDX-License-Identifier: Apache-2.0 + +//! Implementation of the configuration of the debug exporter +//! + +use serde::Deserialize; + +/// Enum that allows the user to specify how much information they want displayed +#[derive(Debug, Clone, Copy, PartialEq, Deserialize)] +pub enum Verbosity { + /// display the most detailed information available + Detailed, + /// display the basic amount of information available and some detail about each request + Normal, + /// just display number of logs, metrics, traces, profiles received with some additional detail about samples/datapoints + Basic, +} + +/// Defines the settings of the debug exporter, controls the level of verbosity the exporter outputs +#[derive(Debug, Clone, PartialEq, Deserialize)] +pub struct Config { + #[serde(default = "default_verbosity")] + verbosity: Verbosity, +} + +fn default_verbosity() -> Verbosity { + Verbosity::Normal +} + +impl Config { + /// Create a new Config object + #[must_use] + pub fn new(verbosity: Verbosity) -> Self { + Self { verbosity } + } + /// check the frequency interval + #[must_use] + pub fn verbosity(&self) -> Verbosity { + self.verbosity + } +} + +impl Default for Config { + fn default() -> Self { + Self { + verbosity: default_verbosity(), + } + } +} diff --git a/rust/otap-dataflow/crates/otlp/src/debug_exporter/counter.rs b/rust/otap-dataflow/crates/otlp/src/debug_exporter/counter.rs new file mode 100644 index 0000000000..2ade941173 --- /dev/null +++ b/rust/otap-dataflow/crates/otlp/src/debug_exporter/counter.rs @@ -0,0 +1,210 @@ +// SPDX-License-Identifier: Apache-2.0 + +//! Debug Counter which enables the debug exporter to keep track of stats to report on +//! + +use std::fmt::Write; + +/// Struct that has a counter for various data we want to track +#[derive(Default, Clone, Debug)] +pub struct DebugCounter { + // counter to count the signals we receive between timerticks + metric_signal_count: u64, + profile_signal_count: u64, + span_signal_count: u64, + log_signal_count: u64, + + // counters to keep total count + // counter to count the signals we receive + total_metric_signal_count: u64, + total_profile_signal_count: u64, + total_span_signal_count: u64, + total_log_signal_count: u64, + + // counter to count the resources of each signal + total_resource_metric_count: u64, + total_resource_span_count: u64, + total_resource_profile_count: u64, + total_resource_log_count: u64, + + // counter to count additional data we receive from the signals + total_metric_count: u64, // count the metrics + total_log_count: u64, // count the log records + total_span_count: u64, // count the spans + total_sample_count: u64, // count the samples we receive in profile signal + total_span_event_count: u64, // count the number of span events + total_span_link_count: u64, // count the number of span links + total_log_event_count: u64, // count the number of log records with event name + total_data_point_count: u64, // count the datapoints we receive in metric signal +} + +impl DebugCounter { + /// reset the signal counts after timertick + pub fn reset_signal_count(&mut self) { + self.metric_signal_count = 0; + self.profile_signal_count = 0; + self.span_signal_count = 0; + self.log_signal_count = 0; + } + /// increment counter for metric signal + pub fn increment_metric_signal_count(&mut self) { + self.total_metric_signal_count += 1; + self.metric_signal_count += 1; + } + /// increment counter for profile signal + pub fn increment_profile_signal_count(&mut self) { + self.total_profile_signal_count += 1; + self.profile_signal_count += 1; + } + /// increment counter for span signal + pub fn increment_span_signal_count(&mut self) { + self.total_span_signal_count += 1; + self.span_signal_count += 1; + } + /// increment counter for log signal + pub fn increment_log_signal_count(&mut self) { + self.total_log_signal_count += 1; + self.log_signal_count += 1; + } + + /// update the counters for metric data + pub fn update_metric_data(&mut self, resource_metrics: u64, metrics: u64, data_points: u64) { + self.total_resource_metric_count += resource_metrics; + self.total_metric_count += metrics; + self.total_data_point_count += data_points; + } + /// update the counters for span data + pub fn update_span_data(&mut self, resource_spans: u64, spans: u64, events: u64, links: u64) { + self.total_resource_span_count += resource_spans; + self.total_span_count += spans; + self.total_span_event_count += events; + self.total_span_link_count += links; + } + /// update the counters for log date + pub fn update_log_data(&mut self, resource_logs: u64, log_records: u64, events: u64) { + self.total_resource_log_count += resource_logs; + self.total_log_count += log_records; + self.total_log_event_count += events; + } + /// update the counters for profile data + pub fn update_profile_data(&mut self, resource_profiles: u64, samples: u64) { + self.total_resource_profile_count += resource_profiles; + self.total_sample_count += samples; + } + /// Generate report of total data received, output stats + #[must_use] + pub fn debug_report(&self) -> String { + let mut report = String::new(); + _ = writeln!(&mut report, "Debug Exporter Summary:"); + + _ = writeln!( + &mut report, + "OTLP Metric objects received: {metric_signal}", + metric_signal = self.total_metric_signal_count + ); + _ = writeln!( + &mut report, + "Received {resource_metric} Resource Metrics", + resource_metric = self.total_resource_metric_count + ); + _ = writeln!( + &mut report, + "Received {metric} metrics", + metric = self.total_metric_count + ); + _ = writeln!( + &mut report, + "Received {datapoint} datapoints", + datapoint = self.total_data_point_count + ); + _ = writeln!( + &mut report, + "OTLP Trace objects received: {span_signal}", + span_signal = self.total_span_signal_count + ); + _ = writeln!( + &mut report, + "Received {resource_spans} Resource Spans", + resource_spans = self.total_resource_span_count + ); + _ = writeln!( + &mut report, + "Received {spans} spans", + spans = self.total_span_count + ); + _ = writeln!( + &mut report, + "Received {events} events", + events = self.total_span_event_count + ); + _ = writeln!( + &mut report, + "Received {links} links", + links = self.total_span_link_count + ); + _ = writeln!( + &mut report, + "OTLP Log objects received: {log_signal}", + log_signal = self.total_log_signal_count + ); + _ = writeln!( + &mut report, + "Received {resource_log} Resource logs", + resource_log = self.total_resource_log_count + ); + _ = writeln!( + &mut report, + "Received {log_record} log records", + log_record = self.total_log_count + ); + _ = writeln!( + &mut report, + "Received {log_event} log events", + log_event = self.total_log_event_count + ); + _ = writeln!( + &mut report, + "OTLP Profile objects received: {profile_signal}", + profile_signal = self.total_profile_signal_count + ); + _ = writeln!( + &mut report, + "Received {resource_profile} Resource profiles", + resource_profile = self.total_resource_profile_count + ); + _ = writeln!( + &mut report, + "Received {samples} samples", + samples = self.total_sample_count + ); + + report + } + /// Generate report of objects received between timer ticks + #[must_use] + pub fn signals_count_report(&self) -> String { + let mut report = String::new(); + _ = writeln!( + &mut report, + "OTLP Metric objects received: {metric_count}", + metric_count = self.metric_signal_count + ); + _ = writeln!( + &mut report, + "OTLP Trace objects received: {span_count}", + span_count = self.span_signal_count + ); + _ = writeln!( + &mut report, + "OTLP Profile objects received: {profile_count}", + profile_count = self.profile_signal_count + ); + _ = writeln!( + &mut report, + "OTLP Log objects received: {log_count}", + log_count = self.log_signal_count + ); + + report + } +} diff --git a/rust/otap-dataflow/crates/otlp/src/debug_exporter/detailed_otlp_marshaler.rs b/rust/otap-dataflow/crates/otlp/src/debug_exporter/detailed_otlp_marshaler.rs new file mode 100644 index 0000000000..e9d2862f58 --- /dev/null +++ b/rust/otap-dataflow/crates/otlp/src/debug_exporter/detailed_otlp_marshaler.rs @@ -0,0 +1,1139 @@ +// SPDX-License-Identifier: Apache-2.0 + +//! Implementation of the OTLPMarshaler for converting OTLP messages to structured string reports. +//! + +use crate::debug_exporter::marshaler::OTLPMarshaler; +use crate::proto::opentelemetry::{ + collector::{ + logs::v1::ExportLogsServiceRequest, metrics::v1::ExportMetricsServiceRequest, + profiles::v1development::ExportProfilesServiceRequest, + trace::v1::ExportTraceServiceRequest, + }, + common::v1::{InstrumentationScope, KeyValue}, + metrics::v1::{ + Exemplar, ExponentialHistogramDataPoint, HistogramDataPoint, NumberDataPoint, + SummaryDataPoint, exemplar::Value as ExemplarValue, metric::Data, + number_data_point::Value as NumberValue, + }, +}; + +use std::fmt::Write; + +/// The Detailed Marshaler takes OTLP messages and converts them to a string by extracting their information +/// the finalized string will be the output for a detailed verbosity level +#[derive(Default)] +pub struct DetailedOTLPMarshaler; + +impl OTLPMarshaler for DetailedOTLPMarshaler { + fn marshal_logs(&self, logs: ExportLogsServiceRequest) -> String { + let mut report = String::new(); + for (resource_index, resource_log) in logs.resource_logs.iter().enumerate() { + _ = writeln!(&mut report, "ResourceLog #{resource_index}",); + _ = writeln!( + &mut report, + " -> Resource SchemaURL: {schema_url}", + schema_url = resource_log.schema_url + ); + if let Some(resource) = &resource_log.resource { + _ = writeln!( + &mut report, + " -> Resource Attributes:{attributes}", + attributes = attributes_string_detailed(&resource.attributes, " ->"), + ); + } + + for (scope_index, scope_log) in resource_log.scope_logs.iter().enumerate() { + _ = writeln!(&mut report, " ScopeLogs #{scope_index}",); + _ = writeln!( + &mut report, + " -> ScopeLogs SchemaURL: {schema_url}", + schema_url = scope_log.schema_url + ); + if let Some(scope) = &scope_log.scope { + write_instrumentation_scope(&mut report, scope); + } + + for (record_index, log_record) in scope_log.log_records.iter().enumerate() { + _ = writeln!(&mut report, " LogRecord #{record_index}",); + _ = writeln!( + &mut report, + " -> ObservedTimestamp: {timestamp}", + timestamp = log_record.observed_time_unix_nano + ); + _ = writeln!( + &mut report, + " -> Timestamp: {timestamp}", + timestamp = log_record.time_unix_nano + ); + _ = writeln!( + &mut report, + " -> SeverityText: {severity}", + severity = log_record.severity_text + ); + _ = writeln!( + &mut report, + " -> SeverityNumber: {severity_number}", + severity_number = log_record.severity_number + ); + + if !log_record.event_name.is_empty() { + _ = writeln!( + &mut report, + " -> EventName: {event_name}", + event_name = log_record.event_name + ); + } + if let Some(body) = &log_record.body { + _ = writeln!(&mut report, " -> Body: {body}"); + } + _ = writeln!( + &mut report, + " -> Attributes:{attributes}", + attributes = + attributes_string_detailed(&log_record.attributes, " ->"), + ); + if let Ok(trace_id) = std::str::from_utf8(&log_record.trace_id) { + _ = writeln!(&mut report, " -> Trace ID: {trace_id}",); + } + + if let Ok(span_id) = std::str::from_utf8(&log_record.span_id) { + _ = writeln!(&mut report, " -> Span ID: {span_id}",); + } + + _ = writeln!( + &mut report, + " -> Flags: {flags}", + flags = log_record.flags + ); + } + } + } + report + } + fn marshal_metrics(&self, metrics: ExportMetricsServiceRequest) -> String { + let mut report = String::new(); + for (resource_index, resource_metric) in metrics.resource_metrics.iter().enumerate() { + _ = writeln!(&mut report, "ResourceMetric #{resource_index}",); + _ = writeln!( + &mut report, + " -> Resource SchemaURL: {schema_url}", + schema_url = resource_metric.schema_url + ); + + if let Some(resource) = &resource_metric.resource { + _ = writeln!( + &mut report, + " -> Resource Attributes:{attributes}", + attributes = attributes_string_detailed(&resource.attributes, " ->"), + ); + } + for (scope_index, scope_metric) in resource_metric.scope_metrics.iter().enumerate() { + _ = writeln!(&mut report, " ScopeMetrics #{scope_index}",); + _ = writeln!( + &mut report, + " -> ScopeMetrics SchemaURL: {schema_url}", + schema_url = scope_metric.schema_url + ); + if let Some(scope) = &scope_metric.scope { + write_instrumentation_scope(&mut report, scope); + } + + for (metric_index, metric) in scope_metric.metrics.iter().enumerate() { + _ = writeln!(&mut report, " Metric #{metric_index}",); + _ = writeln!(&mut report, " -> Name: {name}", name = metric.name); + _ = writeln!( + &mut report, + " -> Description: {description}", + description = metric.description + ); + _ = writeln!(&mut report, " -> Unit: {unit}", unit = metric.unit); + if let Some(data) = &metric.data { + write_datapoints_detailed(&mut report, data); + } + } + } + } + report + } + fn marshal_traces(&self, traces: ExportTraceServiceRequest) -> String { + let mut report = String::new(); + for (resource_index, resource_span) in traces.resource_spans.iter().enumerate() { + _ = writeln!(&mut report, "ResourceSpan #{resource_index}",); + _ = writeln!( + &mut report, + " -> Resource SchemaURL: {schema_url}", + schema_url = resource_span.schema_url + ); + if let Some(resource) = &resource_span.resource { + _ = writeln!( + &mut report, + " -> Resource Attributes:{attributes}", + attributes = attributes_string_detailed(&resource.attributes, " ->"), + ); + } + for (scope_index, scope_span) in resource_span.scope_spans.iter().enumerate() { + _ = writeln!(&mut report, " ScopeSpans #{scope_index}",); + _ = writeln!( + &mut report, + " -> ScopeSpans SchemaURL: {schema_url}", + schema_url = scope_span.schema_url + ); + if let Some(scope) = &scope_span.scope { + write_instrumentation_scope(&mut report, scope); + } + + for (span_index, span) in scope_span.spans.iter().enumerate() { + _ = writeln!(&mut report, " Span #{span_index}",); + if let Ok(trace_id) = std::str::from_utf8(&span.trace_id) { + _ = writeln!(&mut report, " -> Trace ID: {trace_id}",); + } + if let Ok(parent_span_id) = std::str::from_utf8(&span.parent_span_id) { + _ = writeln!(&mut report, " -> Parent ID: {parent_span_id}",); + } + if let Ok(span_id) = std::str::from_utf8(&span.span_id) { + _ = writeln!(&mut report, " -> ID: {span_id}",); + } + + _ = writeln!(&mut report, " -> Name: {name}", name = span.name); + _ = writeln!(&mut report, " -> Kind: {kind}", kind = span.kind); + if !span.trace_state.is_empty() { + _ = writeln!( + &mut report, + " -> TraceState: {trace_state}", + trace_state = span.trace_state + ); + } + + _ = writeln!( + &mut report, + " -> Start time: {start_time}", + start_time = span.start_time_unix_nano + ); + _ = writeln!( + &mut report, + " -> End time: {end_time}", + end_time = span.end_time_unix_nano + ); + if let Some(status) = &span.status { + _ = writeln!( + &mut report, + " -> Status code: {status_code}", + status_code = status.code + ); + _ = writeln!( + &mut report, + " -> Status message: {status_message}", + status_message = status.message + ); + } + + _ = writeln!( + &mut report, + " -> Attributes:{attributes}", + attributes = attributes_string_detailed(&span.attributes, " ->"), + ); + + if !span.events.is_empty() { + _ = writeln!(&mut report, " -> Events:"); + for (event_index, event) in span.events.iter().enumerate() { + _ = writeln!(&mut report, " SpanEvent #{event_index}",); + _ = writeln!( + &mut report, + " -> Name: {name}", + name = event.name + ); + _ = writeln!( + &mut report, + " -> Timestamp: {timestamp}", + timestamp = event.time_unix_nano + ); + _ = writeln!( + &mut report, + " -> DroppedAttributesCount: {dropped_attributes_count}", + dropped_attributes_count = event.dropped_attributes_count + ); + _ = writeln!( + &mut report, + " -> Attributes:{attributes}", + attributes = attributes_string_detailed( + &event.attributes, + " ->" + ), + ); + } + } + + if !span.links.is_empty() { + _ = writeln!(&mut report, " -> Links:"); + for (index, link) in span.links.iter().enumerate() { + _ = writeln!(&mut report, " SpanLink: #{index}"); + if let Ok(trace_id) = std::str::from_utf8(&link.trace_id) { + _ = writeln!(&mut report, " -> Trace ID: {trace_id}"); + } + if let Ok(span_id) = std::str::from_utf8(&link.span_id) { + _ = writeln!(&mut report, " -> Span ID: {span_id}"); + } + + _ = writeln!( + &mut report, + " -> TraceState: {state}", + state = link.trace_state + ); + _ = writeln!( + &mut report, + " -> DroppedAttributesCount: {count}", + count = link.dropped_attributes_count + ); + _ = writeln!( + &mut report, + " -> Attributes:{attributes}", + attributes = attributes_string_detailed( + &link.attributes, + " ->" + ), + ); + } + } + } + } + } + report + } + fn marshal_profiles(&self, profiles: ExportProfilesServiceRequest) -> String { + let mut report = String::new(); + + // ToDo: Display profile mapping, profile location, profile functions, + for (resource_index, resource_profile) in profiles.resource_profiles.iter().enumerate() { + _ = writeln!(&mut report, "ResourceProfile #{resource_index}",); + _ = writeln!( + &mut report, + " -> Resource SchemaURL: {schema_url}", + schema_url = resource_profile.schema_url + ); + if let Some(resource) = &resource_profile.resource { + _ = writeln!( + &mut report, + " -> Resource Attributes:{attributes}", + attributes = attributes_string_detailed(&resource.attributes, " ->"), + ); + } + for (scope_index, scope_profile) in resource_profile.scope_profiles.iter().enumerate() { + _ = writeln!(&mut report, " ScopeProfiles #{scope_index}",); + _ = writeln!( + &mut report, + " -> ScopeProfiles SchemaURL: {schema_url}", + schema_url = scope_profile.schema_url + ); + if let Some(scope) = &scope_profile.scope { + write_instrumentation_scope(&mut report, scope); + } + + for (profile_index, profile) in scope_profile.profiles.iter().enumerate() { + _ = writeln!(&mut report, " Profile #{profile_index}",); + if let Ok(profile_id) = std::str::from_utf8(&profile.profile_id) { + _ = writeln!(&mut report, " -> Profile ID: {profile_id}"); + } + _ = writeln!( + &mut report, + " -> Start time: {profile_start_time}", + profile_start_time = profile.time_nanos + ); + _ = writeln!( + &mut report, + " -> Duration: {profile_duration}", + profile_duration = profile.duration_nanos + ); + _ = writeln!( + &mut report, + " -> Dropped attributes count: {profile_dropped_attributes_count}", + profile_dropped_attributes_count = profile.dropped_attributes_count + ); + + _ = writeln!( + &mut report, + " -> Location indices: {location_indices:?}", + location_indices = profile.location_indices + ); + + // ToDo: display profile samples + + if !profile.comment_strindices.is_empty() { + _ = writeln!(&mut report, " Comment:"); + for comment in profile.comment_strindices.iter() { + _ = writeln!(&mut report, " -> {comment}"); + } + } + } + } + } + report + } +} + +fn attributes_string_detailed(attributes: &[KeyValue], prefix: &str) -> String { + let mut attribute_string = String::new(); + for attribute in attributes.iter() { + if let Some(value) = &attribute.value { + _ = write!( + &mut attribute_string, + "\n{prefix} {key}: {value}", + key = attribute.key, + ); + } + } + + attribute_string +} + +fn write_datapoints_detailed(mut report: &mut String, data: &Data) { + match data { + Data::Gauge(gauge) => { + _ = writeln!(&mut report, " -> DataType: Gauge"); + write_number_datapoints_detailed(report, &gauge.data_points); + } + Data::Sum(sum) => { + _ = writeln!(&mut report, " -> DataType: Sum"); + _ = writeln!( + &mut report, + " -> IsMonotonic: {is_monotonic}", + is_monotonic = sum.is_monotonic + ); + _ = writeln!( + &mut report, + " -> AggregationTemporality: {aggregation_temporality}", + aggregation_temporality = sum.aggregation_temporality + ); + write_number_datapoints_detailed(report, &sum.data_points); + } + Data::Histogram(histogram) => { + _ = writeln!(&mut report, " -> DataType: Histogram"); + _ = writeln!( + &mut report, + " -> AggregationTemporality: {aggregation_temporality}", + aggregation_temporality = histogram.aggregation_temporality + ); + write_histogram_datapoints_detailed(report, &histogram.data_points); + } + Data::ExponentialHistogram(exponential_histogram) => { + _ = writeln!(&mut report, " -> DataType: Exponential Histogram"); + _ = writeln!( + &mut report, + " -> AggregationTemporality: {aggregation_temporality}", + aggregation_temporality = exponential_histogram.aggregation_temporality + ); + write_exponential_histogram_datapoints_detailed( + report, + &exponential_histogram.data_points, + ); + } + Data::Summary(summary) => { + _ = writeln!(&mut report, " -> DataType: Summary"); + write_summary_datapoints_detailed(report, &summary.data_points); + } + } +} + +fn write_number_datapoints_detailed(mut report: &mut String, datapoints: &[NumberDataPoint]) { + for (datapoint_index, datapoint) in datapoints.iter().enumerate() { + _ = writeln!( + &mut report, + " NumberDataPoints #{datapoint_index}", + ); + _ = writeln!( + &mut report, + " -> Attributes:{attributes}", + attributes = + attributes_string_detailed(&datapoint.attributes, " ->"), + ); + _ = writeln!( + &mut report, + " -> StartTimestamp: {timestamp}", + timestamp = datapoint.start_time_unix_nano + ); + _ = writeln!( + &mut report, + " -> Timestamp: {timestamp}", + timestamp = datapoint.time_unix_nano + ); + if let Some(value) = &datapoint.value { + match value { + NumberValue::AsInt(value) => { + _ = writeln!(&mut report, " -> Value: {value}"); + } + NumberValue::AsDouble(value) => { + _ = writeln!(&mut report, " -> Value: {value}",); + } + } + } + + write_exemplars(report, &datapoint.exemplars); + } +} + +fn write_histogram_datapoints_detailed(mut report: &mut String, datapoints: &[HistogramDataPoint]) { + for (index, datapoint) in datapoints.iter().enumerate() { + _ = writeln!(&mut report, " HistogramDataPoints #{index}"); + _ = writeln!( + &mut report, + " -> Attributes:{attributes}", + attributes = + attributes_string_detailed(&datapoint.attributes, " ->"), + ); + + _ = writeln!( + &mut report, + " -> StartTimestamp: {timestamp}", + timestamp = datapoint.start_time_unix_nano + ); + _ = writeln!( + &mut report, + " -> Timestamp: {timestamp}", + timestamp = datapoint.time_unix_nano + ); + _ = writeln!( + &mut report, + " -> Count: {count}", + count = datapoint.count + ); + + if let Some(sum) = &datapoint.sum { + _ = writeln!(&mut report, " -> Sum: {sum}"); + } + if let Some(min) = &datapoint.min { + _ = writeln!(&mut report, " -> Min: {min}"); + } + if let Some(max) = &datapoint.max { + _ = writeln!(&mut report, " -> Max: {max}"); + } + + for (index, bound) in datapoint.explicit_bounds.iter().enumerate() { + _ = writeln!( + &mut report, + " -> ExplicitBound #{index}: {bound}", + ); + } + for (index, count) in datapoint.bucket_counts.iter().enumerate() { + _ = writeln!( + &mut report, + " -> Buckets #{index}, Count: {count}", + ); + } + + write_exemplars(report, &datapoint.exemplars); + } +} + +fn write_exponential_histogram_datapoints_detailed( + mut report: &mut String, + datapoints: &[ExponentialHistogramDataPoint], +) { + for (datapoint_index, datapoint) in datapoints.iter().enumerate() { + _ = writeln!( + &mut report, + " ExponentialHistogramDataPoints #{datapoint_index}", + ); + _ = writeln!( + &mut report, + " -> Attributes:{attributes}", + attributes = + attributes_string_detailed(&datapoint.attributes, " ->"), + ); + _ = writeln!( + &mut report, + " -> StartTimestamp: {timestamp}", + timestamp = datapoint.start_time_unix_nano + ); + _ = writeln!( + &mut report, + " -> Timestamp: {timestamp}", + timestamp = datapoint.time_unix_nano + ); + _ = writeln!( + &mut report, + " -> Count: {count}", + count = datapoint.count + ); + if let Some(sum) = &datapoint.sum { + _ = writeln!(&mut report, " -> Sum: {sum}"); + } + if let Some(min) = &datapoint.min { + _ = writeln!(&mut report, " -> Min: {min}"); + } + if let Some(max) = &datapoint.max { + _ = writeln!(&mut report, " -> Max: {max}"); + } + + // calcualate the base -> 2^(2^(-scale)) -> e^(ln(2) * 2^(-scale)) + + let base: f64 = (std::f64::consts::LN_2 * 2.0_f64.powf(-datapoint.scale as f64)).exp(); + + if let Some(negative) = &datapoint.negative { + let num_buckets = negative.bucket_counts.len(); + for position in 0..num_buckets { + let updated_position = num_buckets - position - 1; + + let index: f64 = negative.offset as f64 + updated_position as f64; + // calculate lower bound base^index + let lower_bound = -(index * base).exp(); + // calculate upper bound base^(index + 1) + let upper_bound = -((index + 1.0) * base).exp(); + _ = writeln!( + report, + " -> Bucket [{upper_bound}, {lower_bound}), Count: {count}", + count = negative.bucket_counts[updated_position] + ); + } + } + if let Some(positive) = &datapoint.positive { + let num_buckets = positive.bucket_counts.len(); + + for position in 0..num_buckets { + let index: f64 = positive.offset as f64 + position as f64; + let lower_bound = (index * base).exp(); + let upper_bound = ((index + 1.0) * base).exp(); + _ = writeln!( + report, + " -> Bucket ({lower_bound}, {upper_bound}], Count: {count}", + count = positive.bucket_counts[position] + ); + } + } + + if datapoint.zero_count != 0 { + _ = writeln!( + &mut report, + " -> Bucket [0, 0], Count: {count}", + count = datapoint.zero_count + ); + } + + write_exemplars(report, &datapoint.exemplars); + } +} + +fn write_summary_datapoints_detailed(mut report: &mut String, datapoints: &[SummaryDataPoint]) { + for (datapoint_index, datapoint) in datapoints.iter().enumerate() { + _ = writeln!( + &mut report, + " SummaryDataPoints #{datapoint_index}", + ); + _ = writeln!( + &mut report, + " -> Attributes:{attributes}", + attributes = + attributes_string_detailed(&datapoint.attributes, " ->"), + ); + _ = writeln!( + &mut report, + " -> StartTimestamp: {timestamp}", + timestamp = datapoint.start_time_unix_nano + ); + _ = writeln!( + &mut report, + " -> Timestamp: {timestamp}", + timestamp = datapoint.time_unix_nano + ); + _ = writeln!( + &mut report, + " -> Count: {count}", + count = datapoint.count + ); + _ = writeln!( + &mut report, + " -> Sum: {sum}", + sum = datapoint.sum + ); + for (quantile_index, quantile) in datapoint.quantile_values.iter().enumerate() { + _ = writeln!( + &mut report, + " -> QuantileValue #{quantile_index}: Quantile {quantile}, Value {value}", + quantile = quantile.quantile, + value = quantile.value + ); + } + } +} + +fn write_exemplars(mut report: &mut String, exemplars: &[Exemplar]) { + if !exemplars.is_empty() { + _ = writeln!(&mut report, " -> Exemplars:"); + + for (exemplar_index, exemplar) in exemplars.iter().enumerate() { + _ = writeln!(&mut report, " Exemplar #{exemplar_index}",); + if let Ok(trace_id) = std::str::from_utf8(&exemplar.trace_id) { + _ = writeln!(&mut report, " -> Trace ID: {trace_id}",); + } + if let Ok(span_id) = std::str::from_utf8(&exemplar.span_id) { + _ = writeln!(&mut report, " -> Span ID: {span_id}",); + } + _ = writeln!( + &mut report, + " -> Timestamp: {timestamp}", + timestamp = exemplar.time_unix_nano + ); + if let Some(value) = &exemplar.value { + match value { + ExemplarValue::AsInt(value) => { + _ = writeln!(&mut report, " -> Value: {value}",); + } + ExemplarValue::AsDouble(value) => { + _ = writeln!(&mut report, " -> Value: {value}",); + } + } + } + _ = writeln!( + &mut report, + " -> FilteredAttributes:{attributes}", + attributes = attributes_string_detailed( + &exemplar.filtered_attributes, + " ->" + ) + ); + } + } +} + +fn write_instrumentation_scope(mut report: &mut String, scope: &InstrumentationScope) { + _ = writeln!( + &mut report, + " -> Instrumentation Scope {name} @{version}", + name = scope.name, + version = scope.version + ); + _ = writeln!( + &mut report, + " -> Instrumentation Scope Attributes:{attributes}", + attributes = attributes_string_detailed(&scope.attributes, " ->") + ); +} + +#[cfg(test)] +mod tests { + + use crate::debug_exporter::detailed_otlp_marshaler::DetailedOTLPMarshaler; + use crate::debug_exporter::marshaler::OTLPMarshaler; + use crate::mock::{ + create_otlp_log, create_otlp_metric, create_otlp_profile, create_otlp_trace, + }; + + #[test] + fn test_marshal_traces() { + let trace = create_otlp_trace(1, 1, 1, 1, 1); + let marshaler = DetailedOTLPMarshaler; + let marshaled_trace = marshaler.marshal_traces(trace); + let mut output_lines = Vec::new(); + for line in marshaled_trace.lines() { + output_lines.push(line); + } + assert_eq!(output_lines[0], "ResourceSpan #0"); + assert_eq!( + output_lines[1], + " -> Resource SchemaURL: http://schema.opentelemetry.io" + ); + assert_eq!(output_lines[2], " -> Resource Attributes:"); + assert_eq!(output_lines[3], " -> ip: 192.168.0.1"); + assert_eq!(output_lines[4], " ScopeSpans #0"); + assert_eq!( + output_lines[5], + " -> ScopeSpans SchemaURL: http://schema.opentelemetry.io" + ); + assert_eq!( + output_lines[6], + " -> Instrumentation Scope library @v1" + ); + assert_eq!( + output_lines[7], + " -> Instrumentation Scope Attributes:" + ); + assert_eq!(output_lines[8], " -> hostname: host5.retailer.com"); + assert_eq!(output_lines[9], " Span #0"); + assert_eq!( + output_lines[10], + " -> Trace ID: 4327e52011a22f9662eac217d77d1ec0" + ); + assert_eq!(output_lines[11], " -> Parent ID: 7271ee06d7e5925f"); + assert_eq!(output_lines[12], " -> ID: 7271ee06d7e5925f"); + assert_eq!(output_lines[13], " -> Name: user-account"); + assert_eq!(output_lines[14], " -> Kind: 4"); + assert_eq!(output_lines[15], " -> TraceState: ended"); + assert_eq!( + output_lines[16], + " -> Start time: 1647648000000000106" + ); + assert_eq!( + output_lines[17], + " -> End time: 1647648000000000104" + ); + assert_eq!(output_lines[18], " -> Status code: 2"); + assert_eq!(output_lines[19], " -> Status message: Error"); + assert_eq!(output_lines[20], " -> Attributes:"); + assert_eq!(output_lines[21], " -> hostname: host4.gov"); + assert_eq!(output_lines[22], " -> Events:"); + assert_eq!(output_lines[23], " SpanEvent #0"); + assert_eq!(output_lines[24], " -> Name: message-receive"); + assert_eq!( + output_lines[25], + " -> Timestamp: 1647648000000000108" + ); + assert_eq!( + output_lines[26], + " -> DroppedAttributesCount: 0" + ); + assert_eq!(output_lines[27], " -> Attributes:"); + assert_eq!( + output_lines[28], + " -> hostname: host5.retailer.com" + ); + assert_eq!(output_lines[29], " -> Links:"); + assert_eq!(output_lines[30], " SpanLink: #0"); + assert_eq!( + output_lines[31], + " -> Trace ID: 4327e52011a22f9662eac217d77d1ec0" + ); + assert_eq!( + output_lines[32], + " -> Span ID: 7271ee06d7e5925f" + ); + assert_eq!(output_lines[33], " -> TraceState: ended"); + assert_eq!( + output_lines[34], + " -> DroppedAttributesCount: 0" + ); + assert_eq!(output_lines[35], " -> Attributes:"); + assert_eq!(output_lines[36], " -> hostname: host2.org"); + } + + #[test] + fn test_marshal_metrics() { + let metric = create_otlp_metric(1, 1, 5, 1); + let marshaler = DetailedOTLPMarshaler; + let marshaled_metric = marshaler.marshal_metrics(metric); + let mut output_lines = Vec::new(); + for line in marshaled_metric.lines() { + output_lines.push(line); + } + assert_eq!(output_lines[0], "ResourceMetric #0"); + assert_eq!( + output_lines[1], + " -> Resource SchemaURL: http://schema.opentelemetry.io" + ); + assert_eq!(output_lines[2], " -> Resource Attributes:"); + assert_eq!(output_lines[3], " -> ip: 192.168.0.2"); + assert_eq!(output_lines[4], " ScopeMetrics #0"); + assert_eq!( + output_lines[5], + " -> ScopeMetrics SchemaURL: http://schema.opentelemetry.io" + ); + assert_eq!( + output_lines[6], + " -> Instrumentation Scope library @v1" + ); + assert_eq!( + output_lines[7], + " -> Instrumentation Scope Attributes:" + ); + assert_eq!( + output_lines[8], + " -> instrumentation_scope_k1: k1 value" + ); + assert_eq!(output_lines[9], " Metric #0"); + assert_eq!(output_lines[10], " -> Name: system.cpu.time"); + assert_eq!( + output_lines[11], + " -> Description: time cpu has ran" + ); + assert_eq!(output_lines[12], " -> Unit: s"); + assert_eq!(output_lines[13], " -> DataType: Gauge"); + assert_eq!(output_lines[14], " NumberDataPoints #0"); + assert_eq!(output_lines[15], " -> Attributes:"); + assert_eq!( + output_lines[16], + " -> StartTimestamp: 1650499200000000100" + ); + assert_eq!( + output_lines[17], + " -> Timestamp: 1663718400000001400" + ); + assert_eq!(output_lines[18], " -> Value: 0"); + assert_eq!(output_lines[19], " Metric #1"); + assert_eq!(output_lines[20], " -> Name: system.cpu.time"); + assert_eq!( + output_lines[21], + " -> Description: time cpu has ran" + ); + assert_eq!(output_lines[22], " -> Unit: s"); + assert_eq!( + output_lines[23], + " -> DataType: Exponential Histogram" + ); + assert_eq!(output_lines[24], " -> AggregationTemporality: 4"); + assert_eq!( + output_lines[25], + " ExponentialHistogramDataPoints #0" + ); + assert_eq!(output_lines[26], " -> Attributes:"); + assert_eq!(output_lines[27], " -> freq: 3GHz"); + assert_eq!( + output_lines[28], + " -> StartTimestamp: 1650499200000000000" + ); + assert_eq!( + output_lines[29], + " -> Timestamp: 1663718400000001400" + ); + assert_eq!(output_lines[30], " -> Count: 0"); + assert_eq!(output_lines[31], " -> Sum: 56"); + assert_eq!(output_lines[32], " -> Min: 12"); + assert_eq!(output_lines[33], " -> Max: 100.1"); + assert_eq!( + output_lines[34], + " -> Bucket [-4.113250378782927, -1), Count: 0" + ); + assert_eq!( + output_lines[35], + " -> Bucket (1, 4.113250378782927], Count: 0" + ); + assert_eq!(output_lines[36], " -> Exemplars:"); + assert_eq!(output_lines[37], " Exemplar #0"); + assert_eq!( + output_lines[38], + " -> Trace ID: 4327e52011a22f9662eac217d77d1ec0" + ); + assert_eq!( + output_lines[39], + " -> Span ID: 7271ee06d7e5925f" + ); + assert_eq!( + output_lines[40], + " -> Timestamp: 1663718400000001400" + ); + assert_eq!(output_lines[41], " -> Value: 22.2"); + assert_eq!( + output_lines[42], + " -> FilteredAttributes:" + ); + assert_eq!(output_lines[43], " -> cpu: 0"); + assert_eq!(output_lines[44], " Metric #2"); + assert_eq!(output_lines[45], " -> Name: system.cpu.time"); + assert_eq!( + output_lines[46], + " -> Description: time cpu has ran" + ); + assert_eq!(output_lines[47], " -> Unit: s"); + assert_eq!(output_lines[48], " -> DataType: Histogram"); + assert_eq!(output_lines[49], " -> AggregationTemporality: 4"); + assert_eq!(output_lines[50], " HistogramDataPoints #0"); + assert_eq!(output_lines[51], " -> Attributes:"); + assert_eq!(output_lines[52], " -> freq: 3GHz"); + assert_eq!( + output_lines[53], + " -> StartTimestamp: 1650499200000000000" + ); + assert_eq!( + output_lines[54], + " -> Timestamp: 1663718400000001400" + ); + assert_eq!(output_lines[55], " -> Count: 0"); + assert_eq!(output_lines[56], " -> Sum: 56"); + assert_eq!(output_lines[57], " -> Min: 12"); + assert_eq!(output_lines[58], " -> Max: 100.1"); + assert_eq!( + output_lines[59], + " -> ExplicitBound #0: 94.17542094619048" + ); + assert_eq!( + output_lines[60], + " -> ExplicitBound #1: 65.66722851519177" + ); + assert_eq!(output_lines[61], " -> Buckets #0, Count: 0"); + assert_eq!(output_lines[62], " -> Exemplars:"); + assert_eq!(output_lines[63], " Exemplar #0"); + assert_eq!( + output_lines[64], + " -> Trace ID: 4327e52011a22f9662eac217d77d1ec0" + ); + assert_eq!( + output_lines[65], + " -> Span ID: 7271ee06d7e5925f" + ); + assert_eq!( + output_lines[66], + " -> Timestamp: 1663718400000001400" + ); + assert_eq!(output_lines[67], " -> Value: 22.2"); + assert_eq!( + output_lines[68], + " -> FilteredAttributes:" + ); + assert_eq!(output_lines[69], " -> cpu: 0"); + assert_eq!(output_lines[70], " Metric #3"); + assert_eq!(output_lines[71], " -> Name: system.cpu.time"); + assert_eq!( + output_lines[72], + " -> Description: time cpu has ran" + ); + assert_eq!(output_lines[73], " -> Unit: s"); + assert_eq!(output_lines[74], " -> DataType: Sum"); + assert_eq!(output_lines[75], " -> IsMonotonic: true"); + assert_eq!(output_lines[76], " -> AggregationTemporality: 4"); + assert_eq!(output_lines[77], " NumberDataPoints #0"); + assert_eq!(output_lines[78], " -> Attributes:"); + assert_eq!( + output_lines[79], + " -> cpu_logical_processors: 8" + ); + assert_eq!( + output_lines[80], + " -> StartTimestamp: 1650499200000000000" + ); + assert_eq!( + output_lines[81], + " -> Timestamp: 1663718400000001400" + ); + assert_eq!(output_lines[82], " -> Value: 0"); + assert_eq!(output_lines[83], " -> Exemplars:"); + assert_eq!(output_lines[84], " Exemplar #0"); + assert_eq!( + output_lines[85], + " -> Trace ID: 4327e52011a22f9662eac217d77d1ec0" + ); + assert_eq!( + output_lines[86], + " -> Span ID: 7271ee06d7e5925f" + ); + assert_eq!( + output_lines[87], + " -> Timestamp: 1663718400000001400" + ); + assert_eq!(output_lines[88], " -> Value: 22.2"); + assert_eq!( + output_lines[89], + " -> FilteredAttributes:" + ); + assert_eq!( + output_lines[90], + " -> ************: true" + ); + assert_eq!(output_lines[91], " Metric #4"); + assert_eq!(output_lines[92], " -> Name: system.cpu.time"); + assert_eq!( + output_lines[93], + " -> Description: time cpu has ran" + ); + assert_eq!(output_lines[94], " -> Unit: s"); + assert_eq!(output_lines[95], " -> DataType: Summary"); + assert_eq!(output_lines[96], " SummaryDataPoints #0"); + assert_eq!(output_lines[97], " -> Attributes:"); + assert_eq!(output_lines[98], " -> cpu_cores: 4"); + assert_eq!( + output_lines[99], + " -> StartTimestamp: 1650499200000000100" + ); + assert_eq!( + output_lines[100], + " -> Timestamp: 1663718400000001400" + ); + assert_eq!(output_lines[101], " -> Count: 0"); + assert_eq!(output_lines[102], " -> Sum: 56"); + assert_eq!( + output_lines[103], + " -> QuantileValue #0: Quantile 0, Value 0" + ); + } + #[test] + fn test_marshal_logs() { + let logs = create_otlp_log(1, 1, 1); + let marshaler = DetailedOTLPMarshaler; + let marshaled_logs = marshaler.marshal_logs(logs); + let mut output_lines = Vec::new(); + for line in marshaled_logs.lines() { + output_lines.push(line); + } + + assert_eq!(output_lines[0], "ResourceLog #0"); + assert_eq!( + output_lines[1], + " -> Resource SchemaURL: http://schema.opentelemetry.io" + ); + assert_eq!(output_lines[2], " -> Resource Attributes:"); + assert_eq!(output_lines[3], " -> version: 2.0"); + assert_eq!(output_lines[4], " ScopeLogs #0"); + assert_eq!( + output_lines[5], + " -> ScopeLogs SchemaURL: http://schema.opentelemetry.io" + ); + assert_eq!( + output_lines[6], + " -> Instrumentation Scope library @v1" + ); + assert_eq!( + output_lines[7], + " -> Instrumentation Scope Attributes:" + ); + assert_eq!(output_lines[8], " -> hostname: host5.retailer.com"); + assert_eq!(output_lines[9], " LogRecord #0"); + assert_eq!( + output_lines[10], + " -> ObservedTimestamp: 1663718400000001300" + ); + assert_eq!(output_lines[11], " -> Timestamp: 2000000000"); + assert_eq!(output_lines[12], " -> SeverityText: INFO"); + assert_eq!(output_lines[13], " -> SeverityNumber: 2"); + assert_eq!(output_lines[14], " -> EventName: event1"); + assert_eq!( + output_lines[15], + " -> Body: Sint impedit non ut eligendi nisi neque harum maxime adipisci." + ); + assert_eq!(output_lines[16], " -> Attributes:"); + assert_eq!( + output_lines[17], + " -> hostname: host3.thedomain.edu" + ); + assert_eq!( + output_lines[18], + " -> Trace ID: 4327e52011a22f9662eac217d77d1ec0" + ); + assert_eq!(output_lines[19], " -> Span ID: 7271ee06d7e5925f"); + assert_eq!(output_lines[20], " -> Flags: 8"); + } + + #[test] + fn test_marshal_profiles() { + let profiles = create_otlp_profile(1, 1, 1); + let marshaler = DetailedOTLPMarshaler; + let marshaled_profiles = marshaler.marshal_profiles(profiles); + let mut output_lines = Vec::new(); + for line in marshaled_profiles.lines() { + output_lines.push(line); + } + + assert_eq!(output_lines[0], "ResourceProfile #0"); + assert_eq!( + output_lines[1], + " -> Resource SchemaURL: http://schema.opentelemetry.io" + ); + assert_eq!(output_lines[2], " -> Resource Attributes:"); + assert_eq!(output_lines[3], " -> hostname: host7.com"); + assert_eq!(output_lines[4], " ScopeProfiles #0"); + assert_eq!( + output_lines[5], + " -> ScopeProfiles SchemaURL: http://schema.opentelemetry.io" + ); + assert_eq!( + output_lines[6], + " -> Instrumentation Scope library @v1" + ); + assert_eq!( + output_lines[7], + " -> Instrumentation Scope Attributes:" + ); + assert_eq!(output_lines[8], " -> hostname: host5.retailer.com"); + assert_eq!(output_lines[9], " Profile #0"); + assert_eq!(output_lines[10], " -> Profile ID: "); + assert_eq!(output_lines[11], " -> Start time: 0"); + assert_eq!(output_lines[12], " -> Duration: 0"); + assert_eq!(output_lines[13], " -> Dropped attributes count: 0"); + assert_eq!(output_lines[14], " -> Location indices: []"); + } +} diff --git a/rust/otap-dataflow/crates/otlp/src/debug_exporter/exporter.rs b/rust/otap-dataflow/crates/otlp/src/debug_exporter/exporter.rs new file mode 100644 index 0000000000..044b5bb0b2 --- /dev/null +++ b/rust/otap-dataflow/crates/otlp/src/debug_exporter/exporter.rs @@ -0,0 +1,500 @@ +// SPDX-License-Identifier: Apache-2.0 + +//! Implementation of the OTLP Debug exporter node +//! +//! ToDo: Handle Ack and Nack messages in the pipeline +//! ToDo: Handle configuration changes +//! ToDo: Implement proper deadline function for Shutdown ctrl msg +//! ToDo: Use OTLP Views instead of the OTLP Request structs +//! + +use crate::LOCAL_EXPORTERS; +use crate::debug_exporter::{ + config::{Config, Verbosity}, + counter::DebugCounter, + detailed_otlp_marshaler::DetailedOTLPMarshaler, + marshaler::OTLPMarshaler, + normal_otlp_marshaler::NormalOTLPMarshaler, +}; +use crate::grpc::OTLPData; +use crate::proto::opentelemetry::{ + collector::{ + logs::v1::ExportLogsServiceRequest, metrics::v1::ExportMetricsServiceRequest, + profiles::v1development::ExportProfilesServiceRequest, + trace::v1::ExportTraceServiceRequest, + }, + metrics::v1::metric::Data, +}; +use async_trait::async_trait; +use linkme::distributed_slice; +use otap_df_engine::error::Error; +use otap_df_engine::local::{LocalExporterFactory, exporter as local}; +use otap_df_engine::message::{ControlMsg, Message, MessageChannel}; +use serde_json::Value; +use std::fs::OpenOptions; +use std::io::Write; + +/// Exporter that outputs all data received to stdout +pub struct DebugExporter { + config: Config, + output: Option, +} + +/// Declares the Debug exporter as a local exporter factory +/// +/// Unsafe code is temporarily used here to allow the use of `distributed_slice` macro +/// This macro is part of the `linkme` crate which is considered safe and well maintained. +#[allow(unsafe_code)] +#[distributed_slice(LOCAL_EXPORTERS)] +pub static DEBUG_EXPORTER: LocalExporterFactory = LocalExporterFactory { + name: "urn:otel:debug:exporter", + create: |config: &Value| Box::new(DebugExporter::from_config(config)), +}; + +impl DebugExporter { + /// Creates a new Debug exporter + #[must_use] + #[allow(dead_code)] + pub fn new(config: Config, output: Option) -> Self { + DebugExporter { config, output } + } + + /// Creates a new DebugExporter from a configuration object + #[must_use] + pub fn from_config(config: &Value) -> Self { + let config: Config = serde_json::from_value(config.clone()) + .unwrap_or_else(|_| Config::new(Verbosity::Normal)); + DebugExporter { + config, + output: None, + } + } +} + +/// Implement the local exporter trait for a OTAP Exporter +#[async_trait(?Send)] +impl local::Exporter for DebugExporter { + async fn start( + self: Box, + mut msg_chan: MessageChannel, + effect_handler: local::EffectHandler, + ) -> Result<(), Error> { + // counter to count number of objects received between timerticks + let mut counter = DebugCounter::default(); + + // create a marshaler to take the otlp objects and extract various data to report + let marshaler: Box = if self.config.verbosity() == Verbosity::Normal { + Box::new(NormalOTLPMarshaler) + } else { + Box::new(DetailedOTLPMarshaler) + }; + + // get a writer to write to stdout or to a file + let mut writer = get_writer(self.output); + // Loop until a Shutdown event is received. + loop { + match msg_chan.recv().await? { + // handle control messages + Message::Control(ControlMsg::TimerTick { .. }) => { + _ = writeln!(writer, "Timer tick received"); + + // output count of messages received since last timertick + _ = write!(writer, "{report}", report = counter.signals_count_report()); + + // reset counters after timertick + counter.reset_signal_count(); + } + Message::Control(ControlMsg::Config { .. }) => { + _ = writeln!(writer, "Config message received"); + } + // shutdown the exporter + Message::Control(ControlMsg::Shutdown { .. }) => { + // ToDo: add proper deadline function + _ = writeln!(writer, "Shutdown message received"); + _ = write!(writer, "{report}", report = counter.debug_report()); + break; + } + //send data + Message::PData(message) => { + match message { + // ToDo: Add Ack/Nack handling, send a signal that data has been exported + // ToDo: Use the views instead of OTLPData + + // match on OTLPData type and use the respective method to collect data about the received object + // increment the counters for each respective OTLP Datatype + OTLPData::Metrics(req) => { + push_metric( + &self.config.verbosity(), + req, + &*marshaler, + &mut writer, + &mut counter, + ); + counter.increment_metric_signal_count(); + } + OTLPData::Logs(req) => { + push_log( + &self.config.verbosity(), + req, + &*marshaler, + &mut writer, + &mut counter, + ); + counter.increment_log_signal_count(); + } + OTLPData::Traces(req) => { + push_trace( + &self.config.verbosity(), + req, + &*marshaler, + &mut writer, + &mut counter, + ); + counter.increment_span_signal_count(); + } + OTLPData::Profiles(req) => { + push_profile( + &self.config.verbosity(), + req, + &*marshaler, + &mut writer, + &mut counter, + ); + counter.increment_profile_signal_count(); + } + } + } + _ => { + return Err(Error::ExporterError { + exporter: effect_handler.exporter_name(), + error: "Unknown control message".to_owned(), + }); + } + } + } + Ok(()) + } +} + +/// determine if output goes to console or to a file +fn get_writer(output_file: Option) -> Box { + match output_file { + Some(file_name) => Box::new( + OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(file_name) + .expect("could not open output file"), + ), + None => Box::new(std::io::stdout()), + } +} + +/// Function to collect and report the data contained in a Metrics object received by the Debug exporter +fn push_metric( + verbosity: &Verbosity, + metric_request: ExportMetricsServiceRequest, + marshaler: &dyn OTLPMarshaler, + writer: &mut impl Write, + counter: &mut DebugCounter, +) { + // collect number of resource metrics + // collect number of metrics + // collect number of datapoints + let resource_metrics = metric_request.resource_metrics.len(); + let mut data_points = 0; + let mut metrics = 0; + for resource_metrics in &metric_request.resource_metrics { + for scope_metrics in &resource_metrics.scope_metrics { + metrics += scope_metrics.metrics.len(); + for metric in &scope_metrics.metrics { + if let Some(data) = &metric.data { + match data { + Data::Gauge(gauge) => { + data_points += gauge.data_points.len(); + } + Data::Sum(sum) => { + data_points += sum.data_points.len(); + } + Data::Histogram(histogram) => { + data_points += histogram.data_points.len(); + } + Data::ExponentialHistogram(exponential_histogram) => { + data_points += exponential_histogram.data_points.len(); + } + Data::Summary(summary) => { + data_points += summary.data_points.len(); + } + } + } + } + } + } + + _ = writeln!(writer, "Received {resource_metrics} resource metrics"); + _ = writeln!(writer, "Received {metrics} metrics"); + _ = writeln!(writer, "Received {data_points} data points"); + counter.update_metric_data(resource_metrics as u64, metrics as u64, data_points as u64); + // if verbosity is basic we don't report anymore information, if a higher verbosity is specified than we call the marshaler + if *verbosity == Verbosity::Basic { + return; + } + + let report = marshaler.marshal_metrics(metric_request); + _ = writeln!(writer, "{report}"); +} + +fn push_trace( + verbosity: &Verbosity, + trace_request: ExportTraceServiceRequest, + marshaler: &dyn OTLPMarshaler, + writer: &mut impl Write, + counter: &mut DebugCounter, +) { + // collect number of resource spans + // collect number of spans + let resource_spans = trace_request.resource_spans.len(); + let mut spans = 0; + let mut events = 0; + let mut links = 0; + for resource_span in &trace_request.resource_spans { + for scope_span in &resource_span.scope_spans { + spans += scope_span.spans.len(); + for span in &scope_span.spans { + events += span.events.len(); + links += span.links.len(); + } + } + } + + _ = writeln!(writer, "Received {resource_spans} resource spans"); + _ = writeln!(writer, "Received {spans} spans"); + _ = writeln!(writer, "Received {events} events"); + _ = writeln!(writer, "Received {links} links"); + counter.update_span_data( + resource_spans as u64, + spans as u64, + events as u64, + links as u64, + ); + // if verbosity is basic we don't report anymore information, if a higher verbosity is specified than we call the marshaler + if *verbosity == Verbosity::Basic { + return; + } + + let report = marshaler.marshal_traces(trace_request); + _ = writeln!(writer, "{report}"); +} + +fn push_log( + verbosity: &Verbosity, + log_request: ExportLogsServiceRequest, + marshaler: &dyn OTLPMarshaler, + writer: &mut impl Write, + counter: &mut DebugCounter, +) { + let resource_logs = log_request.resource_logs.len(); + let mut log_records = 0; + let mut events = 0; + for resource_log in &log_request.resource_logs { + for scope_log in &resource_log.scope_logs { + log_records += scope_log.log_records.len(); + for log_record in &scope_log.log_records { + if !log_record.event_name.is_empty() { + events += 1; + } + } + } + } + _ = writeln!(writer, "Received {resource_logs} resource logs"); + _ = writeln!(writer, "Received {log_records} log records"); + _ = writeln!(writer, "Received {events} events"); + counter.update_log_data(resource_logs as u64, log_records as u64, events as u64); + if *verbosity == Verbosity::Basic { + return; + } + + let report = marshaler.marshal_logs(log_request); + _ = writeln!(writer, "{report}"); +} + +fn push_profile( + verbosity: &Verbosity, + profile_request: ExportProfilesServiceRequest, + marshaler: &dyn OTLPMarshaler, + writer: &mut impl Write, + counter: &mut DebugCounter, +) { + // collect number of resource profiles + // collect number of sample records + let resource_profiles = profile_request.resource_profiles.len(); + let mut samples = 0; + for resource_profile in &profile_request.resource_profiles { + for scope_profile in &resource_profile.scope_profiles { + for profile in &scope_profile.profiles { + samples += profile.sample.len(); + } + } + } + + _ = writeln!(writer, "Received {resource_profiles} resource profiles"); + _ = writeln!(writer, "Received {samples} samples"); + counter.update_profile_data(resource_profiles as u64, samples as u64); + if *verbosity == Verbosity::Basic { + return; + } + + let report = marshaler.marshal_profiles(profile_request); + _ = writeln!(writer, "{report}"); +} + +#[cfg(test)] +mod tests { + + use crate::debug_exporter::config::{Config, Verbosity}; + use crate::debug_exporter::exporter::DebugExporter; + use crate::grpc::OTLPData; + use crate::mock::{ + create_otlp_log, create_otlp_metric, create_otlp_profile, create_otlp_trace, + }; + + use otap_df_engine::exporter::ExporterWrapper; + use otap_df_engine::testing::exporter::TestContext; + use otap_df_engine::testing::exporter::TestRuntime; + use tokio::time::{Duration, sleep}; + + use std::fs::{File, remove_file}; + use std::io::{BufReader, read_to_string}; + + /// Test closure that simulates a typical test scenario by sending timer ticks, config, + /// data message, and shutdown control messages. + /// + fn scenario() + -> impl FnOnce(TestContext) -> std::pin::Pin>> { + |ctx| { + Box::pin(async move { + // send some messages to the exporter to calculate pipeline statistics + // // Send a data message + ctx.send_pdata(OTLPData::Metrics(create_otlp_metric(1, 1, 5, 1))) + .await + .expect("Failed to send data message"); + ctx.send_pdata(OTLPData::Traces(create_otlp_trace(1, 1, 1, 1, 1))) + .await + .expect("Failed to send data message"); + ctx.send_pdata(OTLPData::Logs(create_otlp_log(1, 1, 1))) + .await + .expect("Failed to send data message"); + ctx.send_pdata(OTLPData::Profiles(create_otlp_profile(1, 1, 1))) + .await + .expect("Failed to send data message"); + + // TODO ADD DELAY BETWEEN HERE + _ = sleep(Duration::from_millis(5000)); + + // send timertick to generate the report + ctx.send_timer_tick() + .await + .expect("Failed to send TimerTick"); + + // Send shutdown + ctx.send_shutdown(Duration::from_millis(200), "test complete") + .await + .expect("Failed to send Shutdown"); + }) + } + } + + /// Validation closure that checks the expected counter values + fn validation_procedure( + output_file: String, + ) -> impl FnOnce(TestContext) -> std::pin::Pin>> { + |_| { + Box::pin(async move { + // get a file to read and validate the output + // open file + // read the output file + // assert each line accordingly + let file = File::open(output_file).expect("failed to open file"); + let reader = read_to_string(BufReader::new(file)).expect("failed to get string"); + + // check the the exporter has received the expected number of messages + assert!(reader.contains("Timer tick received")); + assert!(reader.contains("OTLP Metric objects received: 0")); + assert!(reader.contains("OTLP Trace objects received: 0")); + assert!(reader.contains("OTLP Profile objects received: 0")); + assert!(reader.contains("OTLP Log objects received: 0")); + assert!(reader.contains("Received 1 resource metrics")); + assert!(reader.contains("Received 5 metrics")); + assert!(reader.contains("Received 5 data points")); + assert!(reader.contains("Received 1 resource spans")); + assert!(reader.contains("Received 1 spans")); + assert!(reader.contains("Received 1 events")); + assert!(reader.contains("Received 1 links")); + assert!(reader.contains("Received 1 resource logs")); + assert!(reader.contains("Received 1 log records")); + assert!(reader.contains("Received 1 events")); + assert!(reader.contains("Received 1 resource profiles")); + assert!(reader.contains("Received 0 samples")); + assert!(reader.contains("Shutdown message received")); + }) + } + } + + #[test] + fn test_debug_exporter_basic_verbosity() { + let test_runtime = TestRuntime::new(); + let output_file = "debug_output_basic.txt".to_string(); + let config = Config::new(Verbosity::Basic); + let exporter = ExporterWrapper::local( + DebugExporter::new(config, Some(output_file.clone())), + test_runtime.config(), + ); + + test_runtime + .set_exporter(exporter) + .run_test(scenario()) + .run_validation(validation_procedure(output_file.clone())); + + // remove the created file, prevent accidental check in of report + remove_file(output_file).expect("Failed to remove file"); + } + + #[test] + fn test_debug_exporter_normal_verbosity() { + let test_runtime = TestRuntime::new(); + let output_file = "debug_output_normal.txt".to_string(); + let config = Config::new(Verbosity::Normal); + let exporter = ExporterWrapper::local( + DebugExporter::new(config, Some(output_file.clone())), + test_runtime.config(), + ); + + test_runtime + .set_exporter(exporter) + .run_test(scenario()) + .run_validation(validation_procedure(output_file.clone())); + + // remove the created file, prevent accidental check in of report + remove_file(output_file).expect("Failed to remove file"); + } + + #[test] + fn test_debug_exporter_detailed_verbosity() { + let test_runtime = TestRuntime::new(); + let output_file = "debug_output_detailed.txt".to_string(); + let config = Config::new(Verbosity::Detailed); + let exporter = ExporterWrapper::local( + DebugExporter::new(config, Some(output_file.clone())), + test_runtime.config(), + ); + + test_runtime + .set_exporter(exporter) + .run_test(scenario()) + .run_validation(validation_procedure(output_file.clone())); + + // remove the created file, prevent accidental check in of report + remove_file(output_file).expect("Failed to remove file"); + } +} diff --git a/rust/otap-dataflow/crates/otlp/src/debug_exporter/marshaler.rs b/rust/otap-dataflow/crates/otlp/src/debug_exporter/marshaler.rs new file mode 100644 index 0000000000..bf306f18de --- /dev/null +++ b/rust/otap-dataflow/crates/otlp/src/debug_exporter/marshaler.rs @@ -0,0 +1,75 @@ +// SPDX-License-Identifier: Apache-2.0 + +//! Implementation of the OTLPMarshaler for converting OTLP messages to structured string reports. +//! + +use crate::proto::opentelemetry::{ + collector::{ + logs::v1::ExportLogsServiceRequest, metrics::v1::ExportMetricsServiceRequest, + profiles::v1development::ExportProfilesServiceRequest, + trace::v1::ExportTraceServiceRequest, + }, + common::v1::{AnyValue, any_value::Value}, +}; +use std::fmt; +use std::fmt::Write; + +/// Trait that provides methods to take OTLP messages and extract information from them and generate a report +pub trait OTLPMarshaler { + /// extract data from logs and generate string report + fn marshal_logs(&self, logs: ExportLogsServiceRequest) -> String; + /// extract data from metricss and generate string report + fn marshal_metrics(&self, metrics: ExportMetricsServiceRequest) -> String; + /// extract data from traces and generate string report + fn marshal_traces(&self, traces: ExportTraceServiceRequest) -> String; + /// extract data from profiles and generate string report + fn marshal_profiles(&self, profiles: ExportProfilesServiceRequest) -> String; +} + +impl fmt::Display for AnyValue { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + if let Some(value) = &self.value { + match value { + Value::StringValue(string) => { + write!(f, "{string}")?; + } + Value::BoolValue(bool) => { + write!(f, "{bool}")?; + } + Value::IntValue(int) => { + write!(f, "{int}")?; + } + Value::DoubleValue(double) => { + write!(f, "{double}")?; + } + Value::ArrayValue(array) => { + let values = &array.values; + write!(f, "{values:?}")?; + } + Value::KvlistValue(kvlist) => { + let mut kv_string = String::new(); + for kv in kvlist.values.iter() { + if let Some(value) = &kv.value { + _ = write!( + &mut kv_string, + "{key}={value} ", + key = kv.key, + value = value + ); + } + } + write!(f, "{kv_string}")?; + } + Value::BytesValue(bytes) => { + if let Ok(byte_string) = String::from_utf8(bytes.to_vec()) { + write!(f, "{byte_string}")?; + } + write!(f, "")?; + } + } + } else { + write!(f, "")?; + } + Ok(()) + } +} diff --git a/rust/otap-dataflow/crates/otlp/src/debug_exporter/mod.rs b/rust/otap-dataflow/crates/otlp/src/debug_exporter/mod.rs new file mode 100644 index 0000000000..0d51289821 --- /dev/null +++ b/rust/otap-dataflow/crates/otlp/src/debug_exporter/mod.rs @@ -0,0 +1,16 @@ +// SPDX-License-Identifier: Apache-2.0 + +//! Implementation of the Debug Exporter node +//! +/// allows the user to configure their perf exporter +pub mod config; +/// implements the debug counter to allow the debug exporter to keep track of certain stats +pub mod counter; +/// implements the otlp marshaler trait for a detailed verbosity output +pub mod detailed_otlp_marshaler; +/// debug exporter implementation +pub mod exporter; +/// helps take otlp data and extract data to report on +pub mod marshaler; +/// implements the otlp marshaler trait for a normal verbosity output +pub mod normal_otlp_marshaler; diff --git a/rust/otap-dataflow/crates/otlp/src/debug_exporter/normal_otlp_marshaler.rs b/rust/otap-dataflow/crates/otlp/src/debug_exporter/normal_otlp_marshaler.rs new file mode 100644 index 0000000000..f9b1d8cc78 --- /dev/null +++ b/rust/otap-dataflow/crates/otlp/src/debug_exporter/normal_otlp_marshaler.rs @@ -0,0 +1,489 @@ +// SPDX-License-Identifier: Apache-2.0 + +//! Implementation of the OTLPMarshaler for converting OTLP messages to structured string reports. +//! + +use crate::debug_exporter::marshaler::OTLPMarshaler; +use crate::proto::opentelemetry::{ + collector::{ + logs::v1::ExportLogsServiceRequest, metrics::v1::ExportMetricsServiceRequest, + profiles::v1development::ExportProfilesServiceRequest, + trace::v1::ExportTraceServiceRequest, + }, + common::v1::KeyValue, + metrics::v1::{ + ExponentialHistogramDataPoint, HistogramDataPoint, Metric, NumberDataPoint, + SummaryDataPoint, metric::Data, number_data_point::Value as NumberValue, + }, +}; +use std::fmt::Write; + +/// The Normal Marshaler takes OTLP messages and converts them to a string by extracting their information +/// the finalized string will be the output for a normal verbosity level +#[derive(Default)] +pub struct NormalOTLPMarshaler; + +impl OTLPMarshaler for NormalOTLPMarshaler { + fn marshal_logs(&self, logs: ExportLogsServiceRequest) -> String { + let mut report = String::new(); + for (resource_index, resource_log) in logs.resource_logs.iter().enumerate() { + let mut resource_attributes = String::new(); + if let Some(resource) = &resource_log.resource { + resource_attributes = attributes_string_normal(&resource.attributes); + } + + _ = writeln!( + &mut report, + "ResourceLog #{resource_index}, Schema:[{schema}], Attributes: {attributes}", + schema = resource_log.schema_url, + attributes = resource_attributes + ); + + for (scope_index, scope_log) in resource_log.scope_logs.iter().enumerate() { + if let Some(scope) = &scope_log.scope { + _ = writeln!( + &mut report, + " ScopeLog #{scope_index}, Name: {name}, Version: @{version}, Schema: [{schema}], Attributes: {attributes}", + name = scope.name, + version = scope.version, + schema = scope_log.schema_url, + attributes = attributes_string_normal(&scope.attributes) + ); + } else { + _ = writeln!( + &mut report, + " ScopeLog #{scope_index}, Schema:, Schema: [{schema}]", + schema = scope_log.schema_url, + ); + } + + for log_record in scope_log.log_records.iter() { + if let Some(body) = &log_record.body { + _ = write!(&mut report, " Body: {body}, "); + } + // TODO + _ = writeln!( + &mut report, + "Attributes: {attributes}", + attributes = attributes_string_normal(&log_record.attributes) + ); + } + } + } + report + } + fn marshal_metrics(&self, metrics: ExportMetricsServiceRequest) -> String { + let mut report = String::new(); + for (resource_index, resource_metric) in metrics.resource_metrics.iter().enumerate() { + let mut resource_attributes = String::new(); + if let Some(resource) = &resource_metric.resource { + resource_attributes = attributes_string_normal(&resource.attributes); + } + + _ = writeln!( + &mut report, + "ResourceMetric #{resource_index}, Schema:[{schema}], Attributes: {attributes}", + schema = resource_metric.schema_url, + attributes = resource_attributes + ); + + for (scope_index, scope_metric) in resource_metric.scope_metrics.iter().enumerate() { + if let Some(scope) = &scope_metric.scope { + _ = writeln!( + &mut report, + " ScopeMetric #{scope_index}, Name: {name}, Version: @{version}, Schema: [{schema}], Attributes: {attributes}", + name = scope.name, + version = scope.version, + schema = scope_metric.schema_url, + attributes = attributes_string_normal(&scope.attributes) + ); + } else { + _ = writeln!( + &mut report, + " ScopeMetric #{scope_index}, Schema: [{schema}]", + schema = scope_metric.schema_url, + ); + } + + for metric in scope_metric.metrics.iter() { + if let Some(data) = &metric.data { + match data { + Data::Gauge(gauge) => write_number_datapoints_normal( + &mut report, + metric, + &gauge.data_points, + ), + Data::Sum(sum) => write_number_datapoints_normal( + &mut report, + metric, + &sum.data_points, + ), + Data::Histogram(histogram) => write_histogram_datapoints_normal( + &mut report, + metric, + &histogram.data_points, + ), + Data::ExponentialHistogram(exponential_histogram) => { + write_exponential_histogram_datapoints_normal( + &mut report, + metric, + &exponential_histogram.data_points, + ) + } + Data::Summary(summary) => write_summary_datapoints_normal( + &mut report, + metric, + &summary.data_points, + ), + } + } + } + } + } + report + } + fn marshal_traces(&self, traces: ExportTraceServiceRequest) -> String { + let mut report = String::new(); + for (resource_index, resource_span) in traces.resource_spans.iter().enumerate() { + let mut resource_attributes = String::new(); + if let Some(resource) = &resource_span.resource { + resource_attributes = attributes_string_normal(&resource.attributes); + } + _ = writeln!( + &mut report, + "ResourceSpan #{resource_index}, Schema:[{schema}], Attributes: {attributes}", + schema = resource_span.schema_url, + attributes = resource_attributes + ); + + for (scope_index, scope_span) in resource_span.scope_spans.iter().enumerate() { + if let Some(scope) = &scope_span.scope { + _ = writeln!( + &mut report, + " ScopeSpan #{scope_index}, Name: {name}, Version: @{version}, Schema: [{schema}], Attributes: {attributes}", + name = scope.name, + version = scope.version, + schema = scope_span.schema_url, + attributes = attributes_string_normal(&scope.attributes) + ); + } else { + _ = writeln!( + &mut report, + " ScopeSpan #{scope_index}, Schema: [{schema}]", + schema = scope_span.schema_url, + ); + } + + for span in scope_span.spans.iter() { + // write line " {name} {trace_id} {span_id} {attributes}" + _ = write!(&mut report, " Name: {name}, ", name = &span.name,); + if let Ok(trace_id) = String::from_utf8(span.trace_id.clone()) { + _ = write!(&mut report, "Trace ID: {trace_id}, "); + } + if let Ok(span_id) = String::from_utf8(span.span_id.clone()) { + _ = write!(&mut report, "Span ID: {span_id}, "); + } + + _ = writeln!( + &mut report, + "Attributes: {attributes}", + attributes = attributes_string_normal(&span.attributes) + ); + } + } + } + report + } + fn marshal_profiles(&self, profiles: ExportProfilesServiceRequest) -> String { + // marshal_profiles to string based on verbosity + let mut report = String::new(); + for (resource_index, resource_profile) in profiles.resource_profiles.iter().enumerate() { + let mut resource_attributes = String::new(); + if let Some(resource) = &resource_profile.resource { + resource_attributes = attributes_string_normal(&resource.attributes); + } + + _ = writeln!( + &mut report, + "ResourceProfile #{resource_index}, Schema:[{schema}], Attributes: {attributes}", + schema = resource_profile.schema_url, + attributes = resource_attributes + ); + for (scope_index, scope_profile) in resource_profile.scope_profiles.iter().enumerate() { + if let Some(scope) = &scope_profile.scope { + _ = writeln!( + &mut report, + " ScopeProfile #{scope_index}, Name: {name}, Version: @{version}, Schema: [{schema}], Attributes: {attributes}", + name = scope.name, + version = scope.version, + schema = scope_profile.schema_url, + attributes = attributes_string_normal(&scope.attributes) + ); + } else { + _ = writeln!( + &mut report, + " ScopeProfile #{scope_index}:, Schema: [{schema}]", + schema = scope_profile.schema_url, + ); + } + + for _ in scope_profile.profiles.iter() { + // Todo: use the attributes indicies from the profile object to get the attributes from the attribute table + } + } + } + report + } +} + +fn attributes_string_normal(attributes: &[KeyValue]) -> String { + let mut attribute_string = String::new(); + for attribute in attributes.iter() { + if let Some(value) = &attribute.value { + _ = write!(&mut attribute_string, "{key}={value} ", key = attribute.key,); + } + } + + attribute_string +} + +fn write_number_datapoints_normal( + mut report: &mut String, + metric: &Metric, + datapoints: &[NumberDataPoint], +) { + for datapoint in datapoints.iter() { + let datapoint_attributes = attributes_string_normal(&datapoint.attributes); + if let Some(value) = datapoint.value { + match value { + NumberValue::AsDouble(value) => { + _ = writeln!( + &mut report, + " {name} {attributes}{value}", + name = metric.name, + attributes = datapoint_attributes, + ); + } + NumberValue::AsInt(value) => { + _ = writeln!( + &mut report, + " {name} {attributes}{value}", + name = metric.name, + attributes = datapoint_attributes, + ); + } + } + } + } +} + +fn write_histogram_datapoints_normal( + mut report: &mut String, + metric: &Metric, + datapoints: &[HistogramDataPoint], +) { + for datapoint in datapoints.iter() { + let mut values = String::new(); + _ = write!(&mut values, "count={count} ", count = datapoint.count); + if let Some(sum) = datapoint.sum { + _ = write!(&mut values, "sum={sum} "); + } + if let Some(min) = datapoint.min { + _ = write!(&mut values, "min={min} "); + } + if let Some(max) = datapoint.max { + _ = write!(&mut values, "max={max} "); + } + + for (i, bucket) in datapoint.bucket_counts.iter().enumerate() { + let mut bucket_bound = String::new(); + if i < datapoint.explicit_bounds.len() { + bucket_bound = format!("le{bound}=", bound = datapoint.explicit_bounds[i]); + } + _ = write!(&mut values, "{bucket_bound}{bucket} "); + } + + _ = writeln!( + &mut report, + " {name} {attributes}{values}", + name = metric.name, + attributes = attributes_string_normal(&datapoint.attributes), + ); + } +} + +fn write_exponential_histogram_datapoints_normal( + mut report: &mut String, + metric: &Metric, + datapoints: &[ExponentialHistogramDataPoint], +) { + for datapoint in datapoints.iter() { + let mut values = String::new(); + _ = write!(&mut values, "count={count} ", count = datapoint.count); + + if let Some(sum) = datapoint.sum { + _ = write!(&mut values, "sum={sum} "); + } + if let Some(min) = datapoint.min { + _ = write!(&mut values, "min={min} "); + } + if let Some(max) = datapoint.max { + _ = write!(&mut values, "max={max} "); + } + + _ = writeln!( + &mut report, + " {name} {attributes}{values}", + name = metric.name, + attributes = attributes_string_normal(&datapoint.attributes), + ); + } +} + +fn write_summary_datapoints_normal( + mut report: &mut String, + metric: &Metric, + datapoints: &[SummaryDataPoint], +) { + for datapoint in datapoints.iter() { + let mut values = String::new(); + + _ = write!(&mut values, "count={count} ", count = datapoint.count); + _ = write!(&mut values, "sum={sum} ", sum = datapoint.sum); + + for quantile in datapoint.quantile_values.iter() { + write!( + &mut values, + "q{quantile}={value} ", + quantile = quantile.quantile, + value = quantile.value + ) + .unwrap(); + } + + _ = writeln!( + &mut report, + " {name} {attributes}{values}", + name = metric.name, + attributes = attributes_string_normal(&datapoint.attributes), + ); + } +} + +#[cfg(test)] +mod tests { + + use crate::debug_exporter::marshaler::OTLPMarshaler; + use crate::debug_exporter::normal_otlp_marshaler::NormalOTLPMarshaler; + use crate::mock::{ + create_otlp_log, create_otlp_metric, create_otlp_profile, create_otlp_trace, + }; + + #[test] + fn test_marshal_traces() { + let trace = create_otlp_trace(1, 1, 1, 1, 1); + + let marshaler = NormalOTLPMarshaler; + + let marshaled_trace = marshaler.marshal_traces(trace); + + let mut output_lines = Vec::new(); + for line in marshaled_trace.lines() { + output_lines.push(line); + } + + assert_eq!( + output_lines[0], + "ResourceSpan #0, Schema:[http://schema.opentelemetry.io], Attributes: ip=192.168.0.1 " + ); + assert_eq!( + output_lines[1], + " ScopeSpan #0, Name: library, Version: @v1, Schema: [http://schema.opentelemetry.io], Attributes: hostname=host5.retailer.com " + ); + assert_eq!( + output_lines[2], + " Name: user-account, Trace ID: 4327e52011a22f9662eac217d77d1ec0, Span ID: 7271ee06d7e5925f, Attributes: hostname=host4.gov " + ) + } + + #[test] + fn test_marshal_metrics() { + let metrics = create_otlp_metric(1, 1, 5, 1); + let marshaler = NormalOTLPMarshaler; + let marshaled_metrics = marshaler.marshal_metrics(metrics); + let mut output_lines = Vec::new(); + for line in marshaled_metrics.lines() { + output_lines.push(line); + } + + assert_eq!( + output_lines[0], + "ResourceMetric #0, Schema:[http://schema.opentelemetry.io], Attributes: ip=192.168.0.2 " + ); + assert_eq!( + output_lines[1], + " ScopeMetric #0, Name: library, Version: @v1, Schema: [http://schema.opentelemetry.io], Attributes: instrumentation_scope_k1=k1 value " + ); + assert_eq!(output_lines[2], " system.cpu.time 0"); + assert_eq!( + output_lines[3], + " system.cpu.time freq=3GHz count=0 sum=56 min=12 max=100.1 " + ); + assert_eq!( + output_lines[4], + " system.cpu.time freq=3GHz count=0 sum=56 min=12 max=100.1 le94.17542094619048=0 " + ); + assert_eq!( + output_lines[5], + " system.cpu.time cpu_logical_processors=8 0" + ); + assert_eq!( + output_lines[6], + " system.cpu.time cpu_cores=4 count=0 sum=56 q0=0 " + ); + } + + #[test] + fn test_marshal_logs() { + let logs = create_otlp_log(1, 1, 1); + let marshaler = NormalOTLPMarshaler; + let marshaled_logs = marshaler.marshal_logs(logs); + let mut output_lines = Vec::new(); + for line in marshaled_logs.lines() { + output_lines.push(line); + } + + assert_eq!( + output_lines[0], + "ResourceLog #0, Schema:[http://schema.opentelemetry.io], Attributes: version=2.0 " + ); + assert_eq!( + output_lines[1], + " ScopeLog #0, Name: library, Version: @v1, Schema: [http://schema.opentelemetry.io], Attributes: hostname=host5.retailer.com " + ); + assert_eq!( + output_lines[2], + " Body: Sint impedit non ut eligendi nisi neque harum maxime adipisci., Attributes: hostname=host3.thedomain.edu " + ); + } + + #[test] + fn test_marshal_profiles() { + let profiles = create_otlp_profile(1, 1, 1); + let marshaler = NormalOTLPMarshaler; + let marshaled_profiles = marshaler.marshal_profiles(profiles); + let mut output_lines = Vec::new(); + for line in marshaled_profiles.lines() { + output_lines.push(line); + } + + assert_eq!( + output_lines[0], + "ResourceProfile #0, Schema:[http://schema.opentelemetry.io], Attributes: hostname=host7.com " + ); + assert_eq!( + output_lines[1], + " ScopeProfile #0, Name: library, Version: @v1, Schema: [http://schema.opentelemetry.io], Attributes: hostname=host5.retailer.com " + ); + } +} diff --git a/rust/otap-dataflow/crates/otlp/src/lib.rs b/rust/otap-dataflow/crates/otlp/src/lib.rs index 0667749059..668e6b3b26 100644 --- a/rust/otap-dataflow/crates/otlp/src/lib.rs +++ b/rust/otap-dataflow/crates/otlp/src/lib.rs @@ -36,6 +36,7 @@ use otap_df_engine::shared::{ /// compression formats pub mod compression; +pub mod debug_exporter; /// gRPC service implementation pub mod grpc; /// otlp exporter implementation diff --git a/rust/otap-dataflow/crates/otlp/src/mock.rs b/rust/otap-dataflow/crates/otlp/src/mock.rs index 7251640879..5ddce59607 100644 --- a/rust/otap-dataflow/crates/otlp/src/mock.rs +++ b/rust/otap-dataflow/crates/otlp/src/mock.rs @@ -7,20 +7,38 @@ //! use crate::grpc::OTLPData; -use crate::proto::opentelemetry::collector::{ - logs::v1::{ - ExportLogsServiceRequest, ExportLogsServiceResponse, logs_service_server::LogsService, +use crate::proto::opentelemetry::{ + collector::{ + logs::v1::{ + ExportLogsServiceRequest, ExportLogsServiceResponse, logs_service_server::LogsService, + }, + metrics::v1::{ + ExportMetricsServiceRequest, ExportMetricsServiceResponse, + metrics_service_server::MetricsService, + }, + profiles::v1development::{ + ExportProfilesServiceRequest, ExportProfilesServiceResponse, + profiles_service_server::ProfilesService, + }, + trace::v1::{ + ExportTraceServiceRequest, ExportTraceServiceResponse, + trace_service_server::TraceService, + }, }, + common::v1::{AnyValue, InstrumentationScope, KeyValue, any_value::Value}, + logs::v1::{LogRecord, ResourceLogs, ScopeLogs}, metrics::v1::{ - ExportMetricsServiceRequest, ExportMetricsServiceResponse, - metrics_service_server::MetricsService, - }, - profiles::v1development::{ - ExportProfilesServiceRequest, ExportProfilesServiceResponse, - profiles_service_server::ProfilesService, + Exemplar, ExponentialHistogram, ExponentialHistogramDataPoint, Gauge, Histogram, + HistogramDataPoint, Metric, NumberDataPoint, ResourceMetrics, ScopeMetrics, Sum, Summary, + SummaryDataPoint, exemplar::Value as ExemplarValue, + exponential_histogram_data_point::Buckets, metric::Data, + number_data_point::Value as NumberValue, summary_data_point::ValueAtQuantile, }, + profiles::v1development::{Profile, ResourceProfiles, ScopeProfiles}, + resource::v1::Resource, trace::v1::{ - ExportTraceServiceRequest, ExportTraceServiceResponse, trace_service_server::TraceService, + ResourceSpans, ScopeSpans, Span, Status as SpanStatus, + span::{Event, Link}, }, }; use tokio::sync::mpsc::Sender; @@ -140,3 +158,478 @@ impl ProfilesService for ProfilesServiceMock { })) } } + +pub fn create_otlp_metric( + resource_metrics_count: usize, + scope_metrics_count: usize, + metric_count: usize, + datapoint_count: usize, +) -> ExportMetricsServiceRequest { + let mut resource_metrics: Vec = vec![]; + + for _ in 0..resource_metrics_count { + let mut scope_metrics: Vec = vec![]; + for _ in 0..scope_metrics_count { + let mut metrics: Vec = vec![]; + for metric_index in 0..metric_count { + let metric_data = if (metric_index + 1) % 5 == 0 { + // summary datapoint + let mut datapoints = vec![]; + for _ in 0..datapoint_count { + datapoints.push(SummaryDataPoint { + start_time_unix_nano: 1650499200000000100, + time_unix_nano: 1663718400000001400, + attributes: vec![KeyValue { + key: "cpu_cores".to_string(), + value: Some(AnyValue { + value: Some(Value::StringValue("4".to_string())), + }), + }], + sum: 56.0, + count: 0, + flags: 0, + quantile_values: vec![ValueAtQuantile { + quantile: 0.0, + value: 0.0, + }], + }); + } + Data::Summary(Summary { + data_points: datapoints.clone(), + }) + } else if (metric_index + 1) % 4 == 0 { + // sum datapoint + let mut datapoints = vec![]; + for datapoint in 0..datapoint_count { + datapoints.push(NumberDataPoint { + start_time_unix_nano: 1650499200000000000, + time_unix_nano: 1663718400000001400, + attributes: vec![KeyValue { + key: "cpu_logical_processors".to_string(), + value: Some(AnyValue { + value: Some(Value::StringValue("8".to_string())), + }), + }], + value: Some(NumberValue::AsInt(datapoint as i64)), + flags: 0, + exemplars: vec![Exemplar { + time_unix_nano: 1663718400000001400, + span_id: Vec::from("7271ee06d7e5925f".as_bytes()), + trace_id: Vec::from("4327e52011a22f9662eac217d77d1ec0".as_bytes()), + value: Some(ExemplarValue::AsDouble(22.2)), + filtered_attributes: vec![KeyValue { + key: "************".to_string(), + value: Some(AnyValue { + value: Some(Value::BoolValue(true)), + }), + }], + }], + }); + } + + Data::Sum(Sum { + data_points: datapoints.clone(), + aggregation_temporality: 4, // AGGREGATION_TEMPORALITY_DELTA + is_monotonic: true, + }) + } else if (metric_index + 1) % 3 == 0 { + // histogram datapoint + let mut datapoints = vec![]; + for _ in 0..datapoint_count { + datapoints.push(HistogramDataPoint { + attributes: vec![KeyValue { + key: "freq".to_string(), + value: Some(AnyValue { + value: Some(Value::StringValue("3GHz".to_string())), + }), + }], + start_time_unix_nano: 1650499200000000000, + time_unix_nano: 1663718400000001400, + explicit_bounds: vec![94.17542094619048, 65.66722851519177], + bucket_counts: vec![0], + sum: Some(56.0), + count: 0, + flags: 0, + min: Some(12.0), + max: Some(100.1), + exemplars: vec![Exemplar { + time_unix_nano: 1663718400000001400, + span_id: Vec::from("7271ee06d7e5925f".as_bytes()), + trace_id: Vec::from("4327e52011a22f9662eac217d77d1ec0".as_bytes()), + value: Some(ExemplarValue::AsDouble(22.2)), + filtered_attributes: vec![KeyValue { + key: "cpu".to_string(), + value: Some(AnyValue { + value: Some(Value::IntValue(0)), + }), + }], + }], + }); + } + + Data::Histogram(Histogram { + data_points: datapoints.clone(), + aggregation_temporality: 4, // AGGREGATION_TEMPORALITY_DELTA + }) + } else if (metric_index + 1) % 2 == 0 { + // exponential histogram datapoint + let mut datapoints = vec![]; + for _ in 0..datapoint_count { + datapoints.push(ExponentialHistogramDataPoint { + attributes: vec![KeyValue { + key: "freq".to_string(), + value: Some(AnyValue { + value: Some(Value::StringValue("3GHz".to_string())), + }), + }], + start_time_unix_nano: 1650499200000000000, + time_unix_nano: 1663718400000001400, + sum: Some(56.0), + count: 0, + flags: 0, + min: Some(12.0), + max: Some(100.1), + exemplars: vec![Exemplar { + time_unix_nano: 1663718400000001400, + span_id: Vec::from("7271ee06d7e5925f".as_bytes()), + trace_id: Vec::from("4327e52011a22f9662eac217d77d1ec0".as_bytes()), + value: Some(ExemplarValue::AsDouble(22.2)), + filtered_attributes: vec![KeyValue { + key: "cpu".to_string(), + value: Some(AnyValue { + value: Some(Value::IntValue(0)), + }), + }], + }], + scale: 1, + positive: Some(Buckets { + offset: 0, + bucket_counts: vec![0], + }), + negative: Some(Buckets { + offset: 0, + bucket_counts: vec![0], + }), + zero_threshold: 0.0, + zero_count: 0, + }); + } + + Data::ExponentialHistogram(ExponentialHistogram { + data_points: datapoints.clone(), + aggregation_temporality: 4, // AGGREGATION_TEMPORALITY_DELTA + }) + } else { + // gauge datapoint + let mut datapoints = vec![]; + for datapoint in 0..datapoint_count { + datapoints.push(NumberDataPoint { + start_time_unix_nano: 1650499200000000100, + time_unix_nano: 1663718400000001400, + attributes: vec![], + value: Some(NumberValue::AsInt(datapoint as i64)), + flags: 0, + exemplars: vec![], + }); + } + Data::Gauge(Gauge { + data_points: datapoints.clone(), + }) + }; + + metrics.push(Metric { + name: "system.cpu.time".to_string(), + description: "time cpu has ran".to_string(), + unit: "s".to_string(), + metadata: vec![], + data: Some(metric_data), + }); + } + scope_metrics.push(ScopeMetrics { + schema_url: "http://schema.opentelemetry.io".to_string(), + scope: Some(InstrumentationScope { + name: "library".to_string(), + version: "v1".to_string(), + attributes: vec![KeyValue { + key: "instrumentation_scope_k1".to_string(), + value: Some(AnyValue { + value: Some(Value::StringValue("k1 value".to_string())), + }), + }], + dropped_attributes_count: 5, + }), + metrics: metrics.clone(), + }); + } + + resource_metrics.push(ResourceMetrics { + schema_url: "http://schema.opentelemetry.io".to_string(), + resource: Some(Resource { + attributes: vec![KeyValue { + key: "ip".to_string(), + value: Some(AnyValue { + value: Some(Value::StringValue("192.168.0.2".to_string())), + }), + }], + dropped_attributes_count: 0, + entity_refs: vec![], + }), + scope_metrics: scope_metrics.clone(), + }); + } + + ExportMetricsServiceRequest { + resource_metrics: resource_metrics.clone(), + } +} + +pub fn create_otlp_trace( + resource_spans_count: usize, + scope_spans_count: usize, + span_count: usize, + event_count: usize, + link_count: usize, +) -> ExportTraceServiceRequest { + let mut resource_spans: Vec = vec![]; + + for _ in 0..resource_spans_count { + let mut scope_spans: Vec = vec![]; + for _ in 0..scope_spans_count { + let mut spans: Vec = vec![]; + for _ in 0..span_count { + let mut links: Vec = vec![]; + for _ in 0..link_count { + links.push(Link { + trace_id: Vec::from("4327e52011a22f9662eac217d77d1ec0".as_bytes()), + span_id: Vec::from("7271ee06d7e5925f".as_bytes()), + attributes: vec![KeyValue { + key: "hostname".to_string(), + value: Some(AnyValue { + value: Some(Value::StringValue("host2.org".to_string())), + }), + }], + trace_state: "ended".to_string(), + dropped_attributes_count: 0, + flags: 4, + }); + } + let mut events: Vec = vec![]; + for _ in 0..event_count { + events.push(Event { + time_unix_nano: 1647648000000000108, + name: "message-receive".to_string(), + attributes: vec![KeyValue { + key: "hostname".to_string(), + value: Some(AnyValue { + value: Some(Value::StringValue("host5.retailer.com".to_string())), + }), + }], + dropped_attributes_count: 0, + }); + } + spans.push(Span { + end_time_unix_nano: 1647648000000000104, + start_time_unix_nano: 1647648000000000106, + name: "user-account".to_string(), + kind: 4, + trace_state: "ended".to_string(), + status: Some(SpanStatus { + code: 2, + message: "Error".to_string(), + }), + links: links.clone(), + events: events.clone(), + attributes: vec![KeyValue { + key: "hostname".to_string(), + value: Some(AnyValue { + value: Some(Value::StringValue("host4.gov".to_string())), + }), + }], + trace_id: Vec::from("4327e52011a22f9662eac217d77d1ec0".as_bytes()), + span_id: Vec::from("7271ee06d7e5925f".as_bytes()), + parent_span_id: Vec::from("7271ee06d7e5925f".as_bytes()), + dropped_attributes_count: 0, + flags: 4, + dropped_events_count: 0, + dropped_links_count: 0, + }); + } + scope_spans.push(ScopeSpans { + schema_url: "http://schema.opentelemetry.io".to_string(), + scope: Some(InstrumentationScope { + name: "library".to_string(), + version: "v1".to_string(), + attributes: vec![KeyValue { + key: "hostname".to_string(), + value: Some(AnyValue { + value: Some(Value::StringValue("host5.retailer.com".to_string())), + }), + }], + dropped_attributes_count: 5, + }), + spans: spans.clone(), + }); + } + + resource_spans.push(ResourceSpans { + schema_url: "http://schema.opentelemetry.io".to_string(), + resource: Some(Resource { + attributes: vec![KeyValue { + key: "ip".to_string(), + value: Some(AnyValue { + value: Some(Value::StringValue("192.168.0.1".to_string())), + }), + }], + dropped_attributes_count: 0, + entity_refs: vec![], + }), + scope_spans: scope_spans.clone(), + }); + } + + ExportTraceServiceRequest { + resource_spans: resource_spans.clone(), + } +} + +pub fn create_otlp_log( + resource_logs_count: usize, + scope_logs_count: usize, + log_records_count: usize, +) -> ExportLogsServiceRequest { + let mut resource_logs: Vec = vec![]; + + for _ in 0..resource_logs_count { + let mut scope_logs: Vec = vec![]; + for _ in 0..scope_logs_count { + let mut log_records: Vec = vec![]; + for _ in 0..log_records_count { + log_records.push(LogRecord { + time_unix_nano: 2_000_000_000, + observed_time_unix_nano: 1663718400000001300, + severity_text: "INFO".to_string(), + severity_number: 2, + event_name: "event1".to_string(), + attributes: vec![KeyValue { + key: "hostname".to_string(), + value: Some(AnyValue { + value: Some(Value::StringValue("host3.thedomain.edu".to_string())), + }), + }], + trace_id: Vec::from("4327e52011a22f9662eac217d77d1ec0".as_bytes()), + span_id: Vec::from("7271ee06d7e5925f".as_bytes()), + body: Some(AnyValue { + value: Some(Value::StringValue( + "Sint impedit non ut eligendi nisi neque harum maxime adipisci." + .to_string(), + )), + }), + flags: 8, + dropped_attributes_count: 0, + }); + } + scope_logs.push(ScopeLogs { + schema_url: "http://schema.opentelemetry.io".to_string(), + scope: Some(InstrumentationScope { + name: "library".to_string(), + version: "v1".to_string(), + attributes: vec![KeyValue { + key: "hostname".to_string(), + value: Some(AnyValue { + value: Some(Value::StringValue("host5.retailer.com".to_string())), + }), + }], + dropped_attributes_count: 5, + }), + log_records: log_records.clone(), + }); + } + + resource_logs.push(ResourceLogs { + schema_url: "http://schema.opentelemetry.io".to_string(), + resource: Some(Resource { + attributes: vec![KeyValue { + key: "version".to_string(), + value: Some(AnyValue { + value: Some(Value::StringValue("2.0".to_string())), + }), + }], + dropped_attributes_count: 0, + entity_refs: vec![], + }), + scope_logs: scope_logs.clone(), + }); + } + + ExportLogsServiceRequest { + resource_logs: resource_logs.clone(), + } +} + +pub fn create_otlp_profile( + resource_profiles_count: usize, + scope_profiles_count: usize, + profile_count: usize, +) -> ExportProfilesServiceRequest { + let mut resource_profiles: Vec = vec![]; + + for _ in 0..resource_profiles_count { + let mut scope_profiles: Vec = vec![]; + for _ in 0..scope_profiles_count { + let mut profiles: Vec = vec![]; + for _ in 0..profile_count { + profiles.push(Profile { + sample_type: vec![], + sample: vec![], + location_indices: vec![], + time_nanos: 0, + duration_nanos: 0, + period_type: None, + period: 0, + comment_strindices: vec![], + default_sample_type_index: 0, + profile_id: vec![], + dropped_attributes_count: 0, + original_payload: vec![], + original_payload_format: "".to_string(), + attribute_indices: vec![], + }); + } + + scope_profiles.push(ScopeProfiles { + schema_url: "http://schema.opentelemetry.io".to_string(), + scope: Some(InstrumentationScope { + name: "library".to_string(), + version: "v1".to_string(), + attributes: vec![KeyValue { + key: "hostname".to_string(), + value: Some(AnyValue { + value: Some(Value::StringValue("host5.retailer.com".to_string())), + }), + }], + dropped_attributes_count: 5, + }), + profiles: profiles.clone(), + }); + } + + resource_profiles.push(ResourceProfiles { + schema_url: "http://schema.opentelemetry.io".to_string(), + resource: Some(Resource { + attributes: vec![KeyValue { + key: "hostname".to_string(), + value: Some(AnyValue { + value: Some(Value::StringValue("host7.com".to_string())), + }), + }], + dropped_attributes_count: 0, + entity_refs: vec![], + }), + scope_profiles: scope_profiles.clone(), + }); + } + + ExportProfilesServiceRequest { + resource_profiles: resource_profiles.clone(), + } +} diff --git a/rust/otap-dataflow/crates/otlp/src/otlp_exporter.rs b/rust/otap-dataflow/crates/otlp/src/otlp_exporter.rs index 1f2da14c60..5ee72b995c 100644 --- a/rust/otap-dataflow/crates/otlp/src/otlp_exporter.rs +++ b/rust/otap-dataflow/crates/otlp/src/otlp_exporter.rs @@ -3,7 +3,7 @@ //! Implementation of the OTLP exporter node //! //! ToDo: Handle Ack and Nack messages in the pipeline -//! ToDo: Handle configuratin changes +//! ToDo: Handle configuration changes //! ToDo: Implement proper deadline function for Shutdown ctrl msg use crate::LOCAL_EXPORTERS;