diff --git a/rust/otap-dataflow/crates/otap/src/console_exporter.rs b/rust/otap-dataflow/crates/otap/src/console_exporter.rs index b4cb55868f..a5cbac8f94 100644 --- a/rust/otap-dataflow/crates/otap/src/console_exporter.rs +++ b/rust/otap-dataflow/crates/otap/src/console_exporter.rs @@ -131,7 +131,7 @@ impl ConsoleExporter { self.formatter.print_logs_data(&logs_view).await; } Err(e) => { - otel_error!("Failed to create OTLP logs view", error = ?e); + otel_error!("console.logs_view.otlp_create_failed", error = ?e, message = "Failed to create OTLP logs view"); } }, OtapPayload::OtapArrowRecords(records) => match OtapLogsView::try_from(records) { @@ -139,7 +139,7 @@ impl ConsoleExporter { self.formatter.print_logs_data(&logs_view).await; } Err(e) => { - otel_error!("Failed to create OTAP logs view", error = ?e); + otel_error!("console.logs_view.otap_create_failed", error = ?e, message = "Failed to create OTAP logs view"); } }, } @@ -147,12 +147,18 @@ impl ConsoleExporter { async fn export_traces(&self, _payload: &OtapPayload) { // TODO: Implement traces formatting. - otel_error!("Traces formatting not yet implemented"); + otel_error!( + "console.traces.not_implemented", + message = "Traces formatting not yet implemented" + ); } async fn export_metrics(&self, _payload: &OtapPayload) { // TODO: Implement metrics formatting. - otel_error!("Metrics formatting not yet implemented"); + otel_error!( + "console.metrics.not_implemented", + message = "Metrics formatting not yet implemented" + ); } } @@ -209,7 +215,7 @@ impl HierarchicalFormatter { use tokio::io::AsyncWriteExt; if let Err(err) = tokio::io::stdout().write_all(&output).await { - otel_error!("could not write to console", error = ?err); + otel_error!("console.write_failed", error = ?err, message = "Could not write to console"); } } diff --git a/rust/otap-dataflow/crates/otap/src/otap_grpc.rs b/rust/otap-dataflow/crates/otap/src/otap_grpc.rs index b5d9253979..e73c6af6c2 100644 --- a/rust/otap-dataflow/crates/otap/src/otap_grpc.rs +++ b/rust/otap-dataflow/crates/otap/src/otap_grpc.rs @@ -290,7 +290,7 @@ where { let batch_id = batch.batch_id; let batch = consumer.consume_bar(&mut batch).map_err(|e| { - otel_error!("Error decoding OTAP Batch. Closing stream", error = ?e); + otel_error!("otap.batch.decode_failed", error = ?e, message = "Error decoding OTAP Batch. Closing stream"); })?; let batch = from_record_messages::(batch); @@ -305,7 +305,7 @@ where match guard_result { Ok(mut guard) => guard.allocate(|| oneshot::channel()), Err(_) => { - otel_error!("Mutex poisoned"); + otel_error!("otap.mutex.poisoned", message = "Mutex poisoned"); return Err(()); } } @@ -313,7 +313,10 @@ where let (key, rx) = match allocation_result { None => { - otel_error!("Too many concurrent requests"); + otel_error!( + "otap.request.concurrency_limit", + message = "Too many concurrent requests" + ); // Send backpressure response tx.send(Ok(BatchStatus { @@ -326,7 +329,7 @@ where })) .await .map_err(|e| { - otel_error!("Error sending BatchStatus response", error = ?e); + otel_error!("otap.response.send_failed", error = ?e, message = "Error sending BatchStatus response"); })?; return Ok(()); @@ -352,7 +355,7 @@ where { Ok(_) => {} Err(e) => { - otel_error!("Failed to send to pipeline", error = ?e); + otel_error!("otap.pipeline.send_failed", error = ?e, message = "Failed to send to pipeline"); return Err(()); } }; @@ -378,13 +381,16 @@ where })) .await .map_err(|e| { - otel_error!("Error sending BatchStatus response", error = ?e); + otel_error!("otap.response.send_failed", error = ?e, message = "Error sending BatchStatus response"); })?; return Ok(()); } Err(_) => { - otel_error!("Response channel closed unexpectedly"); + otel_error!( + "otap.response.channel_closed", + message = "Response channel closed unexpectedly" + ); return Err(()); } } @@ -397,7 +403,7 @@ where })) .await .map_err(|e| { - otel_error!("Error sending BatchStatus response", error = ?e); + otel_error!("otap.response.send_failed", error = ?e, message = "Error sending BatchStatus response"); }) }