Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion rust/otap-dataflow/crates/engine/src/pipeline_ctrl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,6 @@ impl<PData> PipelineCtrlMsgManager<PData> {
otel_debug!(
"pipeline.draining.ignored_start_timer",
node_id = node_id,
"Ignoring StartTimer during shutdown draining"
);
} else {
self.tick_timers.start(node_id, duration);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ impl RecordsetKqlProcessor {
otap_df_telemetry::otel_debug!(
"processor.processing_logs",
processor = "recordset_kql",
message = "Processing KQL query",
input_items = input_items,
"Processing KQL query"
);
self.process_logs(bytes, signal)
}
Expand Down Expand Up @@ -141,7 +141,7 @@ impl RecordsetKqlProcessor {
"processor.failure",
processor = "recordset_kql",
input_items = input_items,
message = message,
error = message,
);

effect_handler
Expand Down
4 changes: 2 additions & 2 deletions rust/otap-dataflow/crates/otap/src/fake_data_generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,15 +354,15 @@ impl local::Receiver<OtapPdata> for FakeGeneratorReceiver {
otel_debug!(
"rate_limit.sleep",
sleep_duration_ms = remaining_time.as_millis() as u64,
message = "Sleeping to maintain configured signal rate"
"Sleeping to maintain configured signal rate"
);
sleep(remaining_time).await;
}
// ToDo: Handle negative time, not able to keep up with specified rate limit
} else {
otel_debug!(
"rate_limit.uncapped",
message = "Rate limiting disabled, continuing immediately"
"Rate limiting disabled, continuing immediately"
);
}
}
Expand Down
2 changes: 1 addition & 1 deletion rust/otap-dataflow/crates/otap/src/otap_grpc/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -631,7 +631,7 @@ pub(crate) async fn connect_tcp_stream_with_proxy_config(
);
ProxyError::ProxyConnectionFailed(e)
})?;
otel_debug!("proxy.connected");
otel_debug!("proxy.connected", host = proxy_host, port = proxy_port);

// Apply socket options to the proxy connection
let stream = apply_socket_options(
Expand Down
7 changes: 2 additions & 5 deletions rust/otap-dataflow/crates/otap/src/otlp_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,10 +195,7 @@ impl Exporter<OtapPdata> for OTLPExporter {
msg
} else if inflight_exports.is_empty() {
let msg = msg_chan.recv().await?;
otel_debug!(
"exporter.receive",
message = "Received message from pipeline"
);
otel_debug!("exporter.receive", "Received message from pipeline");
msg
} else {
let completion_fut = inflight_exports.next_completion().fuse();
Expand All @@ -220,7 +217,7 @@ impl Exporter<OtapPdata> for OTLPExporter {
}
msg = recv_fut => {
let msg = msg?;
otel_debug!("exporter.receive", message = "Received message from pipeline");
otel_debug!("exporter.receive", "Received message from pipeline");
msg
},
}
Expand Down
2 changes: 1 addition & 1 deletion rust/otap-dataflow/crates/otap/src/otlp_http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -844,7 +844,7 @@ pub async fn serve(
tokio::select! {
res = &mut conn => {
if let Err(err) = res {
otap_df_telemetry::otel_debug!("HttpConnectionError", error = err.to_string());
otap_df_telemetry::otel_debug!("http.connection_error", error = err.to_string());
}
},
_ = shutdown.cancelled() => {
Expand Down
64 changes: 51 additions & 13 deletions rust/otap-dataflow/crates/otap/src/tls_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,12 @@ async fn add_system_trust_anchors_if_enabled(
let mut store = RootCertStore::empty();
// Best-effort: accept that some system certs might not parse.
let (added, ignored) = store.add_parsable_certificates(roots);
otel_debug!("Loaded system CA certificates", added, ignored,);
otel_debug!(
"loaded.system.ca.certificates",
added = added,
ignored = ignored,
"Loaded system CA certificates"
);

Ok(tls.trust_anchors(store.roots))
}
Expand Down Expand Up @@ -667,7 +672,7 @@ impl CaWatcherState {

/// Process a file system event, potentially triggering a reload.
fn process_event(&self, event: Event) {
otel_debug!("File watcher event", event = ?event);
otel_debug!("tls.file_watcher.event", event = ?event, "File watcher event");

// Filter out irrelevant event types early (before expensive path checks)
if matches!(event.kind, notify::EventKind::Access(_)) {
Expand All @@ -678,7 +683,10 @@ impl CaWatcherState {
return;
}

otel_debug!("Event matches our CA file, proceeding with reload check");
otel_debug!(
"tls.file_watcher.match",
"Event matches our CA file, proceeding with reload check"
);

// Small delay to allow filesystem operations to complete (e.g., atomic renames).
// This blocks the notify thread briefly, but is acceptable because:
Expand Down Expand Up @@ -708,9 +716,10 @@ impl CaWatcherState {

if !is_match {
otel_debug!(
"Event not for our file",
"tls.file_watcher.no_match",
event_paths = ?event.paths,
watched_path = ?self.watched_path,
"Event not for our file"
);
}

Expand All @@ -723,23 +732,34 @@ impl CaWatcherState {
let current_identity = match get_file_identity(&self.reload_path) {
Ok(id) => id,
Err(e) => {
otel_debug!("Failed to get file identity, skipping reload", error = ?e);
otel_debug!("tls.file_watcher.identity_error", error = ?e, "Failed to get file identity, skipping reload");
return false;
}
};

let prev_identity = self.last_identity.load(Ordering::Relaxed);
if current_identity == prev_identity {
otel_debug!("File identity unchanged, skipping reload");
otel_debug!(
"tls.file_watcher.identity_unchanged",
"File identity unchanged, skipping reload"
);
return false;
}
otel_debug!("File identity changed", prev_identity, current_identity);
otel_debug!(
"tls.file_watcher.identity_changed",
prev_identity = prev_identity,
current_identity = current_identity,
"File identity changed"
);

// Check debounce window
let now = current_timestamp();
let last = self.last_reload.load(Ordering::Relaxed);
if now.saturating_sub(last) < CA_RELOAD_DEBOUNCE_SECS {
otel_debug!("Debouncing CA file change event");
otel_debug!(
"tls.file_watcher.debounce",
"Debouncing CA file change event"
);
return false;
}

Expand All @@ -749,7 +769,10 @@ impl CaWatcherState {
.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
.is_err()
{
otel_debug!("CA reload already in progress, skipping");
otel_debug!(
"tls.file_watcher.reload_in_progress",
"CA reload already in progress, skipping"
);
return false;
}

Expand Down Expand Up @@ -889,7 +912,11 @@ impl ReloadableClientCaVerifier {
) -> Result<Arc<Self>, io::Error> {
// Initial load
let ca_pem = read_file_with_limit_sync(&ca_file_path)?;
otel_debug!("Initial CA PEM size", size_bytes = ca_pem.len());
otel_debug!(
"tls.ca.initial_load",
size_bytes = ca_pem.len(),
"Initial CA PEM size"
);
let verifier = build_webpki_verifier(&ca_pem, include_system_cas)?;

let inner = Arc::new(ArcSwap::from_pointee(verifier));
Expand Down Expand Up @@ -1191,7 +1218,11 @@ fn build_webpki_verifier(
));
}

otel_debug!("Built verifier with CA certificates", count);
otel_debug!(
"tls.ca.verifier_built",
count = count,
"Built verifier with CA certificates"
);

WebPkiClientVerifier::builder(roots.into())
.build()
Expand All @@ -1204,7 +1235,11 @@ fn reload_ca_verifier(
include_system_cas: bool,
) -> Result<Arc<dyn ClientCertVerifier>, io::Error> {
let ca_pem = read_file_with_limit_sync(ca_path)?;
otel_debug!("Reloaded CA PEM", size_bytes = ca_pem.len());
otel_debug!(
"tls.ca.reloaded",
size_bytes = ca_pem.len(),
"Reloaded CA PEM"
);
build_webpki_verifier(&ca_pem, include_system_cas)
}

Expand Down Expand Up @@ -1390,7 +1425,10 @@ async fn build_client_auth(

if !has_ca_file && !has_ca_pem && !include_system_cas {
// No client auth configured
otel_debug!("No client CA configured, disabling client authentication");
otel_debug!(
"tls.mtls.disabled",
"No client CA configured, disabling client authentication"
);
return Ok(builder.with_no_client_auth());
}

Expand Down
2 changes: 1 addition & 1 deletion rust/otap-dataflow/crates/otap/tests/mtls_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ async fn test_mtls_client_cert_verification() {
match tls_acceptor.accept(stream).await {
Ok(_tls_stream) => true,
Err(e) => {
otel_debug!("Server handshake failed", error = ?e);
otel_debug!("handshake.failed", error = ?e, "Server handshake failed");
false
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ async fn start_tls_logs_server() -> (
) {
if let Err(err) = rustls::crypto::ring::default_provider().install_default() {
// It's fine if the provider is already installed (e.g. by another test)
otel_debug!("rustls default provider installation failed in test", error = ?err);
otel_debug!("provider.installation.failed", error = ?err, "rustls default provider installation failed in test");
}

let (ca, ca_issuer) = new_ca();
Expand Down
97 changes: 94 additions & 3 deletions rust/otap-dataflow/crates/telemetry/src/internal_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,79 @@ macro_rules! otel_warn {
/// # Example:
/// ```ignore
/// use otap_df_telemetry::otel_debug;
/// otel_debug!("processing.batch", batch_size = 100);
/// otel_debug!("processing.batch", batch_size = 100, "Processing batch of items");
/// otel_debug!("processing.done", count = 5); // no message, just fields
/// otel_debug!("processing.start"); // event name only
/// ```
#[macro_export]
macro_rules! otel_debug {
($name:expr $(, $($fields:tt)*)?) => {
$crate::_private::debug!(name: $name, target: env!("CARGO_PKG_NAME"), { $($($fields)*)? }, "");
// Name only
($name:expr) => {
$crate::_private::debug!(name: $name, target: env!("CARGO_PKG_NAME"), { }, "");
};

// Name + bare literal message (no fields)
($name:expr, $message:literal) => {
$crate::_private::debug!(name: $name, target: env!("CARGO_PKG_NAME"), { }, $message);
};

// With fields (and optional trailing message) - delegate to helper
($name:expr, $($rest:tt)+) => {
$crate::__otel_debug_impl!(@munch [$name] [] $($rest)+)
};
}

/// Internal helper macro for otel_debug! to parse fields with optional trailing message.
#[doc(hidden)]
#[macro_export]
macro_rules! __otel_debug_impl {
// Terminal: trailing literal message only
(@munch [$name:expr] [$($fields:tt)*] $message:literal) => {
$crate::_private::debug!(name: $name, target: env!("CARGO_PKG_NAME"), { $($fields)* }, $message);
};

// Terminal: no more tokens (no trailing message)
(@munch [$name:expr] [$($fields:tt)*]) => {
$crate::_private::debug!(name: $name, target: env!("CARGO_PKG_NAME"), { $($fields)* }, "");
};

// Munch: key = ?value, ... (debug format with comma, more tokens follow)
(@munch [$name:expr] [$($acc:tt)*] $key:ident = ?$val:expr, $($rest:tt)+) => {
$crate::__otel_debug_impl!(@munch [$name] [$($acc)* $key = ?$val,] $($rest)+)
};
// Munch: key = ?value, (debug format with trailing comma, nothing follows)
(@munch [$name:expr] [$($acc:tt)*] $key:ident = ?$val:expr,) => {
$crate::__otel_debug_impl!(@munch [$name] [$($acc)* $key = ?$val,])
};
// Munch: key = ?value (debug format, last field, no comma)
(@munch [$name:expr] [$($acc:tt)*] $key:ident = ?$val:expr) => {
$crate::__otel_debug_impl!(@munch [$name] [$($acc)* $key = ?$val,])
};

// Munch: key = %value, ... (display format with comma, more tokens follow)
(@munch [$name:expr] [$($acc:tt)*] $key:ident = %$val:expr, $($rest:tt)+) => {
$crate::__otel_debug_impl!(@munch [$name] [$($acc)* $key = %$val,] $($rest)+)
};
// Munch: key = %value, (display format with trailing comma, nothing follows)
(@munch [$name:expr] [$($acc:tt)*] $key:ident = %$val:expr,) => {
$crate::__otel_debug_impl!(@munch [$name] [$($acc)* $key = %$val,])
};
// Munch: key = %value (display format, last field, no comma)
(@munch [$name:expr] [$($acc:tt)*] $key:ident = %$val:expr) => {
$crate::__otel_debug_impl!(@munch [$name] [$($acc)* $key = %$val,])
};

// Munch: key = value, ... (regular with comma, more tokens follow)
(@munch [$name:expr] [$($acc:tt)*] $key:ident = $val:expr, $($rest:tt)+) => {
$crate::__otel_debug_impl!(@munch [$name] [$($acc)* $key = $val,] $($rest)+)
};
// Munch: key = value, (regular with trailing comma, nothing follows)
(@munch [$name:expr] [$($acc:tt)*] $key:ident = $val:expr,) => {
$crate::__otel_debug_impl!(@munch [$name] [$($acc)* $key = $val,])
};
// Munch: key = value (regular, last field, no comma)
(@munch [$name:expr] [$($acc:tt)*] $key:ident = $val:expr) => {
$crate::__otel_debug_impl!(@munch [$name] [$($acc)* $key = $val,])
};
}

Expand Down Expand Up @@ -155,4 +222,28 @@ mod tests {
raw_error!("raw error message", error = ?err);
raw_error!("simple error message");
}

#[test]
fn test_otel_debug() {
otel_debug!("debug.event");
}

#[test]
fn test_otel_debug_with_attributes() {
otel_debug!("debug.event.with_attributes", value = 42);
}

#[test]
fn test_otel_debug_with_message() {
otel_debug!("debug.event.with_message", "This is a debug message");
}

#[test]
fn test_otel_debug_with_message_and_attributes() {
otel_debug!(
"debug.event.with_message_and_attributes",
value = 42,
"This is a debug message"
);
}
}
Loading
Loading