Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 18 additions & 18 deletions src/client/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ use crate::bson::RawDocumentBuf;
use crate::bson::{doc, RawBsonRef, RawDocument, Timestamp};
#[cfg(feature = "in-use-encryption")]
use futures_core::future::BoxFuture;
#[cfg(feature = "opentelemetry")]
use opentelemetry::context::FutureExt;
use serde::de::DeserializeOwned;
use std::sync::LazyLock;

Expand All @@ -16,8 +14,6 @@ use std::{
};

use super::{options::ServerAddress, session::TransactionState, Client, ClientSession};
#[cfg(not(feature = "opentelemetry"))]
use crate::otel::OtelFutureStub as _;
use crate::{
bson::Document,
change_stream::{
Expand Down Expand Up @@ -105,19 +101,30 @@ impl Client {
.map(|details| details.output)
}

#[cfg(not(feature = "opentelemetry"))]
async fn execute_operation_with_details<T: Operation>(
&self,
op: &mut T,
session: impl Into<Option<&mut ClientSession>>,
) -> Result<ExecutionDetails<T>> {
self.execute_operation_with_details_inner(op, session.into())
.await
}

#[cfg(feature = "opentelemetry")]
async fn execute_operation_with_details<T: Operation>(
&self,
op: &mut T,
session: impl Into<Option<&mut ClientSession>>,
) -> Result<ExecutionDetails<T>> {
use crate::otel::FutureExt as _;

let session = session.into();
#[cfg(feature = "opentelemetry")]
let span = self.start_operation_span(op, session.as_deref());
let inner = self.execute_operation_with_details_inner(op, session);
#[cfg(feature = "opentelemetry")]
let inner = inner.with_context(span.context.clone());
let result = inner.await;
#[cfg(feature = "opentelemetry")]
let result = self
.execute_operation_with_details_inner(op, session)
.with_span(&span)
.await;
span.record_error(&result);

result
Expand Down Expand Up @@ -176,13 +183,7 @@ impl Client {
}
}

Box::pin(async {
self.execute_operation_with_retry(op, session)
.with_current_context()
.await
})
.with_current_context()
.await
Box::pin(async { self.execute_operation_with_retry(op, session).await }).await
}

/// Execute the given operation, returning the cursor created by the operation.
Expand Down Expand Up @@ -436,7 +437,6 @@ impl Client {
retryability,
effective_criteria,
)
.with_current_context()
.await
{
Ok(output) => ExecutionDetails {
Expand Down
5 changes: 0 additions & 5 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,6 @@ mod index;
mod operation;
#[cfg(feature = "opentelemetry")]
pub mod otel;
#[cfg(not(feature = "opentelemetry"))]
mod otel_stub;
pub mod results;
pub(crate) mod runtime;
mod sdam;
Expand All @@ -73,9 +71,6 @@ pub use bson2 as bson;
#[cfg(feature = "bson-3")]
pub use bson3 as bson;

#[cfg(not(feature = "opentelemetry"))]
pub(crate) use otel_stub as otel;

#[cfg(feature = "in-use-encryption")]
pub use crate::client::csfle::client_encryption;
pub use crate::{
Expand Down
16 changes: 14 additions & 2 deletions src/otel.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
//! Support for OpenTelemetry.

use std::sync::{Arc, LazyLock};
use std::{
future::Future,
sync::{Arc, LazyLock},
};

use derive_where::derive_where;

Expand Down Expand Up @@ -229,7 +232,7 @@ impl Client {
}

pub(crate) struct OpSpan {
pub(crate) context: Context,
context: Context,
enabled: bool,
}

Expand Down Expand Up @@ -443,3 +446,12 @@ impl<'a> From<&'a AggregateTarget> for OperationTarget<'a> {
}
}
}

pub(crate) trait FutureExt: Future + Sized {
fn with_span(self, span: &OpSpan) -> impl Future<Output = Self::Output> {
use opentelemetry::context::FutureExt;
self.with_context(span.context.clone())
}
}

impl<T: Future> FutureExt for T {}
7 changes: 0 additions & 7 deletions src/otel_stub.rs

This file was deleted.