diff --git a/rust/otap-dataflow/crates/engine/src/pipeline_ctrl.rs b/rust/otap-dataflow/crates/engine/src/pipeline_ctrl.rs index 8a50976fd8..7a75934ed7 100644 --- a/rust/otap-dataflow/crates/engine/src/pipeline_ctrl.rs +++ b/rust/otap-dataflow/crates/engine/src/pipeline_ctrl.rs @@ -334,7 +334,6 @@ impl PipelineCtrlMsgManager { otel_debug!( "pipeline.draining.ignored_start_timer", node_id = node_id, - "Ignoring StartTimer during shutdown draining" ); } else { self.tick_timers.start(node_id, duration); diff --git a/rust/otap-dataflow/crates/otap/src/experimental/recordset_kql_processor/processor.rs b/rust/otap-dataflow/crates/otap/src/experimental/recordset_kql_processor/processor.rs index 133eb686b3..c4d7b83573 100644 --- a/rust/otap-dataflow/crates/otap/src/experimental/recordset_kql_processor/processor.rs +++ b/rust/otap-dataflow/crates/otap/src/experimental/recordset_kql_processor/processor.rs @@ -102,8 +102,8 @@ impl RecordsetKqlProcessor { otap_df_telemetry::otel_debug!( "processor.processing_logs", processor = "recordset_kql", - message = "Processing KQL query", input_items = input_items, + "Processing KQL query" ); self.process_logs(bytes, signal) } @@ -141,7 +141,7 @@ impl RecordsetKqlProcessor { "processor.failure", processor = "recordset_kql", input_items = input_items, - message = message, + error = message, ); effect_handler diff --git a/rust/otap-dataflow/crates/otap/src/fake_data_generator.rs b/rust/otap-dataflow/crates/otap/src/fake_data_generator.rs index b019079f68..75ec6832fd 100644 --- a/rust/otap-dataflow/crates/otap/src/fake_data_generator.rs +++ b/rust/otap-dataflow/crates/otap/src/fake_data_generator.rs @@ -354,7 +354,7 @@ impl local::Receiver for FakeGeneratorReceiver { otel_debug!( "rate_limit.sleep", sleep_duration_ms = remaining_time.as_millis() as u64, - message = "Sleeping to maintain configured signal rate" + "Sleeping to maintain configured signal rate" ); sleep(remaining_time).await; } @@ -362,7 +362,7 @@ impl local::Receiver for FakeGeneratorReceiver { } else { otel_debug!( "rate_limit.uncapped", - message = "Rate limiting disabled, continuing immediately" + "Rate limiting disabled, continuing immediately" ); } } diff --git a/rust/otap-dataflow/crates/otap/src/otap_grpc/proxy.rs b/rust/otap-dataflow/crates/otap/src/otap_grpc/proxy.rs index adb648ca8d..27df5c5774 100644 --- a/rust/otap-dataflow/crates/otap/src/otap_grpc/proxy.rs +++ b/rust/otap-dataflow/crates/otap/src/otap_grpc/proxy.rs @@ -631,7 +631,7 @@ pub(crate) async fn connect_tcp_stream_with_proxy_config( ); ProxyError::ProxyConnectionFailed(e) })?; - otel_debug!("proxy.connected"); + otel_debug!("proxy.connected", host = proxy_host, port = proxy_port); // Apply socket options to the proxy connection let stream = apply_socket_options( diff --git a/rust/otap-dataflow/crates/otap/src/otlp_exporter.rs b/rust/otap-dataflow/crates/otap/src/otlp_exporter.rs index b67b429994..559ec057c5 100644 --- a/rust/otap-dataflow/crates/otap/src/otlp_exporter.rs +++ b/rust/otap-dataflow/crates/otap/src/otlp_exporter.rs @@ -195,10 +195,7 @@ impl Exporter for OTLPExporter { msg } else if inflight_exports.is_empty() { let msg = msg_chan.recv().await?; - otel_debug!( - "exporter.receive", - message = "Received message from pipeline" - ); + otel_debug!("exporter.receive", "Received message from pipeline"); msg } else { let completion_fut = inflight_exports.next_completion().fuse(); @@ -220,7 +217,7 @@ impl Exporter for OTLPExporter { } msg = recv_fut => { let msg = msg?; - otel_debug!("exporter.receive", message = "Received message from pipeline"); + otel_debug!("exporter.receive", "Received message from pipeline"); msg }, } diff --git a/rust/otap-dataflow/crates/otap/src/otlp_http.rs b/rust/otap-dataflow/crates/otap/src/otlp_http.rs index 2900371d16..b2eb986772 100644 --- a/rust/otap-dataflow/crates/otap/src/otlp_http.rs +++ b/rust/otap-dataflow/crates/otap/src/otlp_http.rs @@ -844,7 +844,7 @@ pub async fn serve( tokio::select! { res = &mut conn => { if let Err(err) = res { - otap_df_telemetry::otel_debug!("HttpConnectionError", error = err.to_string()); + otap_df_telemetry::otel_debug!("http.connection_error", error = err.to_string()); } }, _ = shutdown.cancelled() => { diff --git a/rust/otap-dataflow/crates/otap/src/tls_utils.rs b/rust/otap-dataflow/crates/otap/src/tls_utils.rs index b00c73a0f5..104fa93a91 100644 --- a/rust/otap-dataflow/crates/otap/src/tls_utils.rs +++ b/rust/otap-dataflow/crates/otap/src/tls_utils.rs @@ -364,7 +364,12 @@ async fn add_system_trust_anchors_if_enabled( let mut store = RootCertStore::empty(); // Best-effort: accept that some system certs might not parse. let (added, ignored) = store.add_parsable_certificates(roots); - otel_debug!("Loaded system CA certificates", added, ignored,); + otel_debug!( + "loaded.system.ca.certificates", + added = added, + ignored = ignored, + "Loaded system CA certificates" + ); Ok(tls.trust_anchors(store.roots)) } @@ -667,7 +672,7 @@ impl CaWatcherState { /// Process a file system event, potentially triggering a reload. fn process_event(&self, event: Event) { - otel_debug!("File watcher event", event = ?event); + otel_debug!("tls.file_watcher.event", event = ?event, "File watcher event"); // Filter out irrelevant event types early (before expensive path checks) if matches!(event.kind, notify::EventKind::Access(_)) { @@ -678,7 +683,10 @@ impl CaWatcherState { return; } - otel_debug!("Event matches our CA file, proceeding with reload check"); + otel_debug!( + "tls.file_watcher.match", + "Event matches our CA file, proceeding with reload check" + ); // Small delay to allow filesystem operations to complete (e.g., atomic renames). // This blocks the notify thread briefly, but is acceptable because: @@ -708,9 +716,10 @@ impl CaWatcherState { if !is_match { otel_debug!( - "Event not for our file", + "tls.file_watcher.no_match", event_paths = ?event.paths, watched_path = ?self.watched_path, + "Event not for our file" ); } @@ -723,23 +732,34 @@ impl CaWatcherState { let current_identity = match get_file_identity(&self.reload_path) { Ok(id) => id, Err(e) => { - otel_debug!("Failed to get file identity, skipping reload", error = ?e); + otel_debug!("tls.file_watcher.identity_error", error = ?e, "Failed to get file identity, skipping reload"); return false; } }; let prev_identity = self.last_identity.load(Ordering::Relaxed); if current_identity == prev_identity { - otel_debug!("File identity unchanged, skipping reload"); + otel_debug!( + "tls.file_watcher.identity_unchanged", + "File identity unchanged, skipping reload" + ); return false; } - otel_debug!("File identity changed", prev_identity, current_identity); + otel_debug!( + "tls.file_watcher.identity_changed", + prev_identity = prev_identity, + current_identity = current_identity, + "File identity changed" + ); // Check debounce window let now = current_timestamp(); let last = self.last_reload.load(Ordering::Relaxed); if now.saturating_sub(last) < CA_RELOAD_DEBOUNCE_SECS { - otel_debug!("Debouncing CA file change event"); + otel_debug!( + "tls.file_watcher.debounce", + "Debouncing CA file change event" + ); return false; } @@ -749,7 +769,10 @@ impl CaWatcherState { .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed) .is_err() { - otel_debug!("CA reload already in progress, skipping"); + otel_debug!( + "tls.file_watcher.reload_in_progress", + "CA reload already in progress, skipping" + ); return false; } @@ -889,7 +912,11 @@ impl ReloadableClientCaVerifier { ) -> Result, io::Error> { // Initial load let ca_pem = read_file_with_limit_sync(&ca_file_path)?; - otel_debug!("Initial CA PEM size", size_bytes = ca_pem.len()); + otel_debug!( + "tls.ca.initial_load", + size_bytes = ca_pem.len(), + "Initial CA PEM size" + ); let verifier = build_webpki_verifier(&ca_pem, include_system_cas)?; let inner = Arc::new(ArcSwap::from_pointee(verifier)); @@ -1191,7 +1218,11 @@ fn build_webpki_verifier( )); } - otel_debug!("Built verifier with CA certificates", count); + otel_debug!( + "tls.ca.verifier_built", + count = count, + "Built verifier with CA certificates" + ); WebPkiClientVerifier::builder(roots.into()) .build() @@ -1204,7 +1235,11 @@ fn reload_ca_verifier( include_system_cas: bool, ) -> Result, io::Error> { let ca_pem = read_file_with_limit_sync(ca_path)?; - otel_debug!("Reloaded CA PEM", size_bytes = ca_pem.len()); + otel_debug!( + "tls.ca.reloaded", + size_bytes = ca_pem.len(), + "Reloaded CA PEM" + ); build_webpki_verifier(&ca_pem, include_system_cas) } @@ -1390,7 +1425,10 @@ async fn build_client_auth( if !has_ca_file && !has_ca_pem && !include_system_cas { // No client auth configured - otel_debug!("No client CA configured, disabling client authentication"); + otel_debug!( + "tls.mtls.disabled", + "No client CA configured, disabling client authentication" + ); return Ok(builder.with_no_client_auth()); } diff --git a/rust/otap-dataflow/crates/otap/tests/mtls_tests.rs b/rust/otap-dataflow/crates/otap/tests/mtls_tests.rs index 3013039005..fc6d1de5b6 100644 --- a/rust/otap-dataflow/crates/otap/tests/mtls_tests.rs +++ b/rust/otap-dataflow/crates/otap/tests/mtls_tests.rs @@ -107,7 +107,7 @@ async fn test_mtls_client_cert_verification() { match tls_acceptor.accept(stream).await { Ok(_tls_stream) => true, Err(e) => { - otel_debug!("Server handshake failed", error = ?e); + otel_debug!("handshake.failed", error = ?e, "Server handshake failed"); false } } diff --git a/rust/otap-dataflow/crates/otap/tests/otlp_exporter_proxy_tls.rs b/rust/otap-dataflow/crates/otap/tests/otlp_exporter_proxy_tls.rs index 5c0913d6c8..d581321161 100644 --- a/rust/otap-dataflow/crates/otap/tests/otlp_exporter_proxy_tls.rs +++ b/rust/otap-dataflow/crates/otap/tests/otlp_exporter_proxy_tls.rs @@ -193,7 +193,7 @@ async fn start_tls_logs_server() -> ( ) { if let Err(err) = rustls::crypto::ring::default_provider().install_default() { // It's fine if the provider is already installed (e.g. by another test) - otel_debug!("rustls default provider installation failed in test", error = ?err); + otel_debug!("provider.installation.failed", error = ?err, "rustls default provider installation failed in test"); } let (ca, ca_issuer) = new_ca(); diff --git a/rust/otap-dataflow/crates/telemetry/src/internal_events.rs b/rust/otap-dataflow/crates/telemetry/src/internal_events.rs index 1f5b2661cc..8dc472eace 100644 --- a/rust/otap-dataflow/crates/telemetry/src/internal_events.rs +++ b/rust/otap-dataflow/crates/telemetry/src/internal_events.rs @@ -69,12 +69,79 @@ macro_rules! otel_warn { /// # Example: /// ```ignore /// use otap_df_telemetry::otel_debug; -/// otel_debug!("processing.batch", batch_size = 100); +/// otel_debug!("processing.batch", batch_size = 100, "Processing batch of items"); +/// otel_debug!("processing.done", count = 5); // no message, just fields +/// otel_debug!("processing.start"); // event name only /// ``` #[macro_export] macro_rules! otel_debug { - ($name:expr $(, $($fields:tt)*)?) => { - $crate::_private::debug!(name: $name, target: env!("CARGO_PKG_NAME"), { $($($fields)*)? }, ""); + // Name only + ($name:expr) => { + $crate::_private::debug!(name: $name, target: env!("CARGO_PKG_NAME"), { }, ""); + }; + + // Name + bare literal message (no fields) + ($name:expr, $message:literal) => { + $crate::_private::debug!(name: $name, target: env!("CARGO_PKG_NAME"), { }, $message); + }; + + // With fields (and optional trailing message) - delegate to helper + ($name:expr, $($rest:tt)+) => { + $crate::__otel_debug_impl!(@munch [$name] [] $($rest)+) + }; +} + +/// Internal helper macro for otel_debug! to parse fields with optional trailing message. +#[doc(hidden)] +#[macro_export] +macro_rules! __otel_debug_impl { + // Terminal: trailing literal message only + (@munch [$name:expr] [$($fields:tt)*] $message:literal) => { + $crate::_private::debug!(name: $name, target: env!("CARGO_PKG_NAME"), { $($fields)* }, $message); + }; + + // Terminal: no more tokens (no trailing message) + (@munch [$name:expr] [$($fields:tt)*]) => { + $crate::_private::debug!(name: $name, target: env!("CARGO_PKG_NAME"), { $($fields)* }, ""); + }; + + // Munch: key = ?value, ... (debug format with comma, more tokens follow) + (@munch [$name:expr] [$($acc:tt)*] $key:ident = ?$val:expr, $($rest:tt)+) => { + $crate::__otel_debug_impl!(@munch [$name] [$($acc)* $key = ?$val,] $($rest)+) + }; + // Munch: key = ?value, (debug format with trailing comma, nothing follows) + (@munch [$name:expr] [$($acc:tt)*] $key:ident = ?$val:expr,) => { + $crate::__otel_debug_impl!(@munch [$name] [$($acc)* $key = ?$val,]) + }; + // Munch: key = ?value (debug format, last field, no comma) + (@munch [$name:expr] [$($acc:tt)*] $key:ident = ?$val:expr) => { + $crate::__otel_debug_impl!(@munch [$name] [$($acc)* $key = ?$val,]) + }; + + // Munch: key = %value, ... (display format with comma, more tokens follow) + (@munch [$name:expr] [$($acc:tt)*] $key:ident = %$val:expr, $($rest:tt)+) => { + $crate::__otel_debug_impl!(@munch [$name] [$($acc)* $key = %$val,] $($rest)+) + }; + // Munch: key = %value, (display format with trailing comma, nothing follows) + (@munch [$name:expr] [$($acc:tt)*] $key:ident = %$val:expr,) => { + $crate::__otel_debug_impl!(@munch [$name] [$($acc)* $key = %$val,]) + }; + // Munch: key = %value (display format, last field, no comma) + (@munch [$name:expr] [$($acc:tt)*] $key:ident = %$val:expr) => { + $crate::__otel_debug_impl!(@munch [$name] [$($acc)* $key = %$val,]) + }; + + // Munch: key = value, ... (regular with comma, more tokens follow) + (@munch [$name:expr] [$($acc:tt)*] $key:ident = $val:expr, $($rest:tt)+) => { + $crate::__otel_debug_impl!(@munch [$name] [$($acc)* $key = $val,] $($rest)+) + }; + // Munch: key = value, (regular with trailing comma, nothing follows) + (@munch [$name:expr] [$($acc:tt)*] $key:ident = $val:expr,) => { + $crate::__otel_debug_impl!(@munch [$name] [$($acc)* $key = $val,]) + }; + // Munch: key = value (regular, last field, no comma) + (@munch [$name:expr] [$($acc:tt)*] $key:ident = $val:expr) => { + $crate::__otel_debug_impl!(@munch [$name] [$($acc)* $key = $val,]) }; } @@ -155,4 +222,28 @@ mod tests { raw_error!("raw error message", error = ?err); raw_error!("simple error message"); } + + #[test] + fn test_otel_debug() { + otel_debug!("debug.event"); + } + + #[test] + fn test_otel_debug_with_attributes() { + otel_debug!("debug.event.with_attributes", value = 42); + } + + #[test] + fn test_otel_debug_with_message() { + otel_debug!("debug.event.with_message", "This is a debug message"); + } + + #[test] + fn test_otel_debug_with_message_and_attributes() { + otel_debug!( + "debug.event.with_message_and_attributes", + value = 42, + "This is a debug message" + ); + } } diff --git a/rust/otap-dataflow/crates/telemetry/src/tracing_init.rs b/rust/otap-dataflow/crates/telemetry/src/tracing_init.rs index cb31bf7c09..56060e79f6 100644 --- a/rust/otap-dataflow/crates/telemetry/src/tracing_init.rs +++ b/rust/otap-dataflow/crates/telemetry/src/tracing_init.rs @@ -206,7 +206,7 @@ mod tests { for level in ALL_LEVELS { let setup = test_setup(ProviderSetup::Noop, level); setup.with_subscriber(|| { - otel_debug!("debug"); + otel_debug!("debug", "debug message"); otel_info!("info"); otel_warn!("warn"); otel_error!("error"); @@ -227,7 +227,7 @@ mod tests { for level in ALL_LEVELS { let setup = test_setup(ProviderSetup::ConsoleDirect, level); setup.with_subscriber(|| { - otel_debug!("debug"); + otel_debug!("debug", "debug message"); otel_info!("info"); otel_warn!("warn"); otel_error!("error"); @@ -258,7 +258,7 @@ mod tests { let (reporter, receiver) = test_reporter(); let setup = test_setup(ProviderSetup::InternalAsync { reporter }, level); setup.with_subscriber(|| { - otel_debug!("debug"); + otel_debug!("debug", "debug message"); otel_info!("info"); otel_warn!("warn"); otel_error!("error"); @@ -283,7 +283,7 @@ mod tests { let setup = test_setup(ProviderSetup::InternalAsync { reporter }, LogLevel::Info); setup.with_subscriber(|| { - otel_debug!("filtered"); + otel_debug!("filtered", "debug message filtered out"); }); assert!( @@ -298,7 +298,7 @@ mod tests { let setup = test_setup(ProviderSetup::InternalAsync { reporter }, LogLevel::Warn); setup.with_subscriber(|| { - otel_debug!("filtered"); + otel_debug!("filtered", "debug message filtered out"); otel_info!("filtered"); otel_warn!("not_filtered"); }); @@ -315,7 +315,7 @@ mod tests { let setup = test_setup(ProviderSetup::InternalAsync { reporter }, LogLevel::Error); setup.with_subscriber(|| { - otel_debug!("filtered"); + otel_debug!("filtered", "debug message filtered out"); otel_info!("filtered"); otel_warn!("filtered"); otel_error!("not_filtered"); @@ -332,7 +332,7 @@ mod tests { let setup = test_setup(ProviderSetup::InternalAsync { reporter }, LogLevel::Off); setup.with_subscriber(|| { - otel_debug!("filtered"); + otel_debug!("filtered", "debug message filtered out"); otel_info!("filtered"); otel_warn!("filtered"); otel_error!("filtered"); @@ -347,7 +347,7 @@ mod tests { let setup = test_setup(ProviderSetup::InternalAsync { reporter }, LogLevel::Debug); setup.with_subscriber(|| { - otel_debug!("d"); + otel_debug!("d", "debug message"); otel_info!("i"); otel_warn!("w"); otel_error!("e"); @@ -403,7 +403,7 @@ mod tests { let setup = test_setup(ProviderSetup::InternalAsync { reporter }, LogLevel::Warn); setup.with_subscriber(|| { - otel_debug!("filtered"); + otel_debug!("filtered", "debug message filtered out"); otel_info!("filtered"); otel_warn!("not_filtered"); otel_error!("not_filtered");