diff --git a/rust/otap-dataflow/crates/controller/src/lib.rs b/rust/otap-dataflow/crates/controller/src/lib.rs index f279cbfd7f..0254a4b493 100644 --- a/rust/otap-dataflow/crates/controller/src/lib.rs +++ b/rust/otap-dataflow/crates/controller/src/lib.rs @@ -198,10 +198,16 @@ impl Controller { telemetry_config.logs.providers.uses_its_provider(), ) { (false, true) => { - otel_warn!("ITS provider requested yet internal pipeline nodes not defined") + otel_warn!( + "its.provider.missing_pipeline", + message = "ITS provider requested yet internal pipeline nodes not defined" + ) } (true, false) => { - otel_warn!("internal pipeline nodes defined yet ITS provider not requested") + otel_warn!( + "its.pipeline.missing_provider", + message = "Internal pipeline nodes defined yet ITS provider not requested" + ) } _ => {} }; diff --git a/rust/otap-dataflow/crates/otap/src/tls_utils.rs b/rust/otap-dataflow/crates/otap/src/tls_utils.rs index a8d0f91377..af7b9f336d 100644 --- a/rust/otap-dataflow/crates/otap/src/tls_utils.rs +++ b/rust/otap-dataflow/crates/otap/src/tls_utils.rs @@ -71,7 +71,7 @@ fn convert_native_certs_to_pem(cert_res: &rustls_native_certs::CertificateResult let mut pem_data = Vec::new(); for error in &cert_res.errors { - otel_warn!("Error loading native cert", error = ?error); + otel_warn!("tls.native_cert.load_error", error = ?error); } for cert in &cert_res.certs { @@ -111,11 +111,11 @@ pub async fn load_server_tls_config( ) { (Some(cert_file), Some(key_file), _, _) => { let cert = read_file_with_limit_async(cert_file).await.map_err(|e| { - otel_error!("Failed to read cert file", cert_file = ?cert_file, error = ?e); + otel_error!("tls.cert_file.read_error", cert_file = ?cert_file, error = ?e, message = "Failed to read cert file", ); e })?; let key = read_file_with_limit_async(key_file).await.map_err(|e| { - otel_error!("Failed to read key file", key_file = ?key_file, error = ?e); + otel_error!("tls.key_file.read_error", key_file = ?key_file, error = ?e, message = "Failed to read key file"); e })?; (cert, key) @@ -260,7 +260,7 @@ pub(crate) async fn load_client_tls_config( // Custom CA. if let Some(ca_file) = &config.ca_file { let ca_pem = read_file_with_limit_async(ca_file).await.map_err(|e| { - otel_error!("Failed to read CA file", ca_file = ?ca_file, error = ?e); + otel_error!("tls.ca_file.read_error", ca_file = ?ca_file, error = ?e, message = "Failed to read CA file"); e })?; tls = tls.ca_certificate(Certificate::from_pem(ca_pem)); @@ -292,25 +292,25 @@ pub(crate) async fn load_client_tls_config( ) { ((Some(cert_path), _), (Some(key_path), _)) => { let cert = read_file_with_limit_async(cert_path).await.map_err(|e| { - otel_error!("Failed to read client cert file", cert_path = ?cert_path, error = %e); + otel_error!("tls.client_cert_file.read_error", cert_path = ?cert_path, error = %e, message = "Failed to read client cert file"); e })?; let key = read_file_with_limit_async(key_path).await.map_err(|e| { - otel_error!("Failed to read client key file", key_path = ?key_path, error = ?e); + otel_error!("tls.client_key_file.read_error", key_path = ?key_path, error = ?e, message = "Failed to read client key file"); e })?; tls.identity(Identity::from_pem(cert, key)) } ((Some(cert_path), _), (None, Some(key_pem))) => { let cert = read_file_with_limit_async(cert_path).await.map_err(|e| { - otel_error!("Failed to read client cert file", cert_path = ?cert_path, error = ?e); + otel_error!("tls.client_cert_file.read_error", cert_path = ?cert_path, error = ?e, message = "Failed to read client cert file"); e })?; tls.identity(Identity::from_pem(cert, key_pem.as_bytes())) } ((None, Some(cert_pem)), (Some(key_path), _)) => { let key = read_file_with_limit_async(key_path).await.map_err(|e| { - otel_error!("Failed to read client key file", key_path = ?key_path, error = ?e); + otel_error!("tls.client_key_file.read_error", key_path = ?key_path, error = ?e, message = "Failed to read client key file"); e })?; tls.identity(Identity::from_pem(cert_pem.as_bytes(), key)) @@ -348,9 +348,10 @@ async fn add_system_trust_anchors_if_enabled( let native = load_native_certs(); if !native.errors.is_empty() { otel_warn!( - "Errors while loading native certificates", + "tls.native_cert.load_errors", count = native.errors.len(), first = ?native.errors.first(), + message = "Errors while loading native certificates", ); } native.certs @@ -368,7 +369,7 @@ async fn add_system_trust_anchors_if_enabled( "loaded.system.ca.certificates", added = added, ignored = ignored, - "Loaded system CA certificates" + message = "Loaded system CA certificates" ); Ok(tls.trust_anchors(store.roots)) @@ -412,11 +413,11 @@ where Ok(Ok(stream)) => Some(Ok::<_, io::Error>(stream)), Ok(Err(e)) => { // TLS handshake failed - log and continue - otel_warn!("TLS handshake failed", error = ?e); + otel_warn!("tls.handshake.failed", error = ?e, message = "TLS handshake failed"); None } Err(_) => { - otel_warn!("TLS handshake timed out"); + otel_warn!("tls.handshake.timeout", message = "TLS handshake timed out"); None } } @@ -526,7 +527,7 @@ impl LazyReloadableCertResolver { let current_cert_mtime = match get_mtime(&self.cert_path) { Ok(m) => m, Err(e) => { - otel_warn!("Failed to check cert mtime", error = ?e); + otel_warn!("tls.cert.mtime_check_failed", error = ?e, message = "Failed to check cert mtime"); return false; } }; @@ -534,7 +535,7 @@ impl LazyReloadableCertResolver { let current_key_mtime = match get_mtime(&self.key_path) { Ok(m) => m, Err(e) => { - otel_warn!("Failed to check key mtime", error = ?e); + otel_warn!("tls.key.mtime_check_failed", error = ?e, message = "Failed to check key mtime"); return false; } }; @@ -574,15 +575,17 @@ impl LazyReloadableCertResolver { cert_mtime.store(current_cert_mtime, Ordering::Relaxed); key_mtime.store(current_key_mtime, Ordering::Relaxed); otel_info!( - "TLS certificate reloaded asynchronously", + "tls.cert_reloaded", cert = ?cert_path, key = ?key_path, + message = "TLS certificate reloaded asynchronously", ); } Err(e) => { otel_error!( - "Failed to reload cert asynchronously (keeping current)", + "tls.cert_reload_failed", error = ?e, + message = "Failed to reload cert asynchronously (keeping current)", ); } } @@ -666,13 +669,15 @@ impl CaWatcherState { fn handle_event(&self, res: Result) { match res { Ok(event) => self.process_event(event), - Err(e) => otel_warn!("File watcher error", error = ?e), + Err(e) => { + otel_warn!("tls.file_watcher.error", error = ?e, message = "File watcher error") + } } } /// Process a file system event, potentially triggering a reload. fn process_event(&self, event: Event) { - otel_debug!("tls.file_watcher.event", event = ?event, "File watcher event"); + otel_debug!("tls.file_watcher.event", event = ?event, message = "File watcher event"); // Filter out irrelevant event types early (before expensive path checks) if matches!(event.kind, notify::EventKind::Access(_)) { @@ -685,7 +690,7 @@ impl CaWatcherState { otel_debug!( "tls.file_watcher.match", - "Event matches our CA file, proceeding with reload check" + message = "Event matches our CA file, proceeding with reload check" ); // Small delay to allow filesystem operations to complete (e.g., atomic renames). @@ -719,7 +724,7 @@ impl CaWatcherState { "tls.file_watcher.no_match", event_paths = ?event.paths, watched_path = ?self.watched_path, - "Event not for our file" + message = "Event not for our file" ); } @@ -732,7 +737,7 @@ impl CaWatcherState { let current_identity = match get_file_identity(&self.reload_path) { Ok(id) => id, Err(e) => { - otel_debug!("tls.file_watcher.identity_error", error = ?e, "Failed to get file identity, skipping reload"); + otel_debug!("tls.file_watcher.identity_error", error = ?e, message = "Failed to get file identity, skipping reload"); return false; } }; @@ -741,7 +746,7 @@ impl CaWatcherState { if current_identity == prev_identity { otel_debug!( "tls.file_watcher.identity_unchanged", - "File identity unchanged, skipping reload" + message = "File identity unchanged, skipping reload" ); return false; } @@ -749,7 +754,7 @@ impl CaWatcherState { "tls.file_watcher.identity_changed", prev_identity = prev_identity, current_identity = current_identity, - "File identity changed" + message = "File identity changed" ); // Check debounce window @@ -758,7 +763,7 @@ impl CaWatcherState { if now.saturating_sub(last) < CA_RELOAD_DEBOUNCE_SECS { otel_debug!( "tls.file_watcher.debounce", - "Debouncing CA file change event" + message = "Debouncing CA file change event" ); return false; } @@ -771,7 +776,7 @@ impl CaWatcherState { { otel_debug!( "tls.file_watcher.reload_in_progress", - "CA reload already in progress, skipping" + message = "CA reload already in progress, skipping" ); return false; } @@ -782,8 +787,9 @@ impl CaWatcherState { /// Perform the actual CA certificate reload. fn perform_reload(&self) { otel_info!( - "CA certificate file changed, reloading", + "tls.file_watcher.reload_start", path = ?self.reload_path, + message = "CA certificate file changed, reloading" ); // Note: There's a theoretical TOCTOU between getting identity and reading the file. @@ -798,12 +804,16 @@ impl CaWatcherState { self.last_identity .store(current_identity, Ordering::Relaxed); self.last_reload.store(now, Ordering::Relaxed); - otel_info!("Successfully reloaded client CA certificates"); + otel_info!( + "tls.file_watcher.reload_success", + message = "Successfully reloaded client CA certificates" + ); } Err(e) => { otel_error!( - "Failed to reload CA certificates (keeping previous)", + "tls.file_watcher.reload_failed", error = ?e, + message = "Failed to reload CA certificates (keeping previous)", ); } } @@ -915,7 +925,7 @@ impl ReloadableClientCaVerifier { otel_debug!( "tls.ca.initial_load", size_bytes = ca_pem.len(), - "Initial CA PEM size" + message = "Initial CA PEM size" ); let verifier = build_webpki_verifier(&ca_pem, include_system_cas)?; @@ -1028,9 +1038,10 @@ impl ReloadableClientCaVerifier { .map_err(io::Error::other)?; otel_info!( - "File watcher set up for CA certificates", + "tls.file_watcher.setup", ca_file = ?ca_file_path, watching_parent = ?parent_dir, + message = "File watcher set up for CA certificates" ); Ok(Box::new(watcher)) @@ -1066,19 +1077,21 @@ impl ReloadableClientCaVerifier { } otel_info!( - "CA certificate file changed (polling), reloading", + "tls.poll_watcher.event", ca_path = ?ca_path, + message = "CA certificate file changed (polling), reloading" ); match reload_ca_verifier(&ca_path, include_system_cas) { Ok(new_verifier) => { inner.store(Arc::new(new_verifier)); - otel_info!("Successfully reloaded client CA certificates"); + otel_info!("tls.poll_watcher.reload_success", message = "Successfully reloaded client CA certificates"); } Err(e) => { otel_error!( - "Failed to reload CA certificates (keeping previous)", + "tls.poll_watcher.reload_failed", error = ?e, + message = "Failed to reload CA certificates (keeping previous)" ); } } @@ -1086,7 +1099,7 @@ impl ReloadableClientCaVerifier { is_reloading.store(false, Ordering::Release); } Err(e) => { - otel_warn!("Poll watcher error", error = ?e); + otel_warn!("tls.poll_watcher.error", error = ?e, message = "Poll watcher error"); } }, config, @@ -1098,9 +1111,10 @@ impl ReloadableClientCaVerifier { .map_err(io::Error::other)?; otel_info!( - "Poll watcher set up for CA certificates", + "tls.poll_watcher.setup", ca_file = ?ca_file_path, interval = ?poll_interval, + message = "Poll watcher set up for CA certificates", ); Ok(Box::new(watcher)) @@ -1191,11 +1205,11 @@ fn build_webpki_verifier( if include_system_cas { let system_certs = load_native_certs(); for error in &system_certs.errors { - otel_warn!("Error loading native cert", error = ?error); + otel_warn!("tls.native_cert.load_error", error = ?error); } for cert in system_certs.certs { if let Err(e) = roots.add(cert) { - otel_warn!("Failed to add system", error = ?e); + otel_warn!("tls.native_cert.add_error", error = ?e); } } } @@ -1221,7 +1235,7 @@ fn build_webpki_verifier( otel_debug!( "tls.ca.verifier_built", count = count, - "Built verifier with CA certificates" + message = "Built verifier with CA certificates" ); WebPkiClientVerifier::builder(roots.into()) @@ -1238,7 +1252,7 @@ fn reload_ca_verifier( otel_debug!( "tls.ca.reloaded", size_bytes = ca_pem.len(), - "Reloaded CA PEM" + message = "Reloaded CA PEM" ); build_webpki_verifier(&ca_pem, include_system_cas) } @@ -1427,7 +1441,7 @@ async fn build_client_auth( // No client auth configured otel_debug!( "tls.mtls.disabled", - "No client CA configured, disabling client authentication" + message = "No client CA configured, disabling client authentication" ); return Ok(builder.with_no_client_auth()); } @@ -1437,15 +1451,17 @@ async fn build_client_auth( // File-based CA configuration let verifier = if watch_enabled { otel_info!( - "Configuring mTLS with file watching for CA certificates", + "tls.file_watcher.setup", ca_file = ?ca_file, + message = "Configuring mTLS with file watching for CA certificates" ); ReloadableClientCaVerifier::new_with_file_watch(ca_file.clone(), include_system_cas)? } else { otel_info!( - "Configuring mTLS with polling for CA certificates", + "tls.poll_watcher.setup", ca_file = ?ca_file, interval = ?check_interval, + message = "Configuring mTLS with polling for CA certificates" ); ReloadableClientCaVerifier::new_with_polling( ca_file.clone(), @@ -1457,7 +1473,10 @@ async fn build_client_auth( Ok(builder.with_client_cert_verifier(verifier)) } else if let Some(ca_pem) = &config.client_ca_pem { // PEM-based (static) CA configuration - otel_info!("Configuring mTLS with static PEM CA certificates"); + otel_info!( + "tls.mtls.configure_static_pem", + message = "Configuring mTLS with static PEM CA certificates" + ); // For PEM-based, we need to combine with system CAs if requested let mut combined_pem = Vec::new(); @@ -1476,7 +1495,10 @@ async fn build_client_auth( Ok(builder.with_client_cert_verifier(verifier)) } else if include_system_cas { // Only system CAs (no user-provided CA) - otel_info!("Configuring mTLS with system CA certificates only"); + otel_info!( + "tls.mtls.configure_system_cas", + message = "Configuring mTLS with system CA certificates only" + ); let cert_res = tokio::task::spawn_blocking(load_native_certs) .await