Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions rust/otap-dataflow/crates/otap/src/console_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,28 +131,34 @@ impl ConsoleExporter {
self.formatter.print_logs_data(&logs_view).await;
}
Err(e) => {
otel_error!("Failed to create OTLP logs view", error = ?e);
otel_error!("console.logs_view.otlp_create_failed", error = ?e, message = "Failed to create OTLP logs view");
}
},
OtapPayload::OtapArrowRecords(records) => match OtapLogsView::try_from(records) {
Ok(logs_view) => {
self.formatter.print_logs_data(&logs_view).await;
}
Err(e) => {
otel_error!("Failed to create OTAP logs view", error = ?e);
otel_error!("console.logs_view.otap_create_failed", error = ?e, message = "Failed to create OTAP logs view");
}
},
}
}

async fn export_traces(&self, _payload: &OtapPayload) {
// TODO: Implement traces formatting.
otel_error!("Traces formatting not yet implemented");
otel_error!(
"console.traces.not_implemented",
message = "Traces formatting not yet implemented"
);
}

async fn export_metrics(&self, _payload: &OtapPayload) {
// TODO: Implement metrics formatting.
otel_error!("Metrics formatting not yet implemented");
otel_error!(
"console.metrics.not_implemented",
message = "Metrics formatting not yet implemented"
);
}
}

Expand Down Expand Up @@ -209,7 +215,7 @@ impl HierarchicalFormatter {
use tokio::io::AsyncWriteExt;

if let Err(err) = tokio::io::stdout().write_all(&output).await {
otel_error!("could not write to console", error = ?err);
otel_error!("console.write_failed", error = ?err, message = "Could not write to console");
}
}

Expand Down
22 changes: 14 additions & 8 deletions rust/otap-dataflow/crates/otap/src/otap_grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ where
{
let batch_id = batch.batch_id;
let batch = consumer.consume_bar(&mut batch).map_err(|e| {
otel_error!("Error decoding OTAP Batch. Closing stream", error = ?e);
otel_error!("otap.batch.decode_failed", error = ?e, message = "Error decoding OTAP Batch. Closing stream");
})?;

let batch = from_record_messages::<T>(batch);
Expand All @@ -305,15 +305,18 @@ where
match guard_result {
Ok(mut guard) => guard.allocate(|| oneshot::channel()),
Err(_) => {
otel_error!("Mutex poisoned");
otel_error!("otap.mutex.poisoned", message = "Mutex poisoned");
return Err(());
}
}
}; // MutexGuard is dropped here

let (key, rx) = match allocation_result {
None => {
otel_error!("Too many concurrent requests");
otel_error!(
"otap.request.concurrency_limit",
message = "Too many concurrent requests"
);

// Send backpressure response
tx.send(Ok(BatchStatus {
Expand All @@ -326,7 +329,7 @@ where
}))
.await
.map_err(|e| {
otel_error!("Error sending BatchStatus response", error = ?e);
otel_error!("otap.response.send_failed", error = ?e, message = "Error sending BatchStatus response");
})?;

return Ok(());
Expand All @@ -352,7 +355,7 @@ where
{
Ok(_) => {}
Err(e) => {
otel_error!("Failed to send to pipeline", error = ?e);
otel_error!("otap.pipeline.send_failed", error = ?e, message = "Failed to send to pipeline");
return Err(());
}
};
Expand All @@ -378,13 +381,16 @@ where
}))
.await
.map_err(|e| {
otel_error!("Error sending BatchStatus response", error = ?e);
otel_error!("otap.response.send_failed", error = ?e, message = "Error sending BatchStatus response");
})?;

return Ok(());
}
Err(_) => {
otel_error!("Response channel closed unexpectedly");
otel_error!(
"otap.response.channel_closed",
message = "Response channel closed unexpectedly"
);
return Err(());
}
}
Expand All @@ -397,7 +403,7 @@ where
}))
.await
.map_err(|e| {
otel_error!("Error sending BatchStatus response", error = ?e);
otel_error!("otap.response.send_failed", error = ?e, message = "Error sending BatchStatus response");
})
}

Expand Down
Loading