Skip to content

Commit

Permalink
feat: Refine runtime trait (#2641)
Browse files Browse the repository at this point in the history
  • Loading branch information
martintmk authored Feb 27, 2025
1 parent 367e484 commit fb74565
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 70 deletions.
6 changes: 4 additions & 2 deletions opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@

## vNext

- **Breaking**: The `Runtime` trait has been simplified and refined. See the [#2641](https://github.com/open-telemetry/opentelemetry-rust/pull/2641)
for the changes.
- Calls to `MeterProviderBuilder::with_resource`, `TracerProviderBuilder::with_resource`,
`LoggerProviderBuilder::with_resource` are now additive ([#2677](https://github.com/open-telemetry/opentelemetry-rust/pull/2677)).
- Moved `ExportError` trait from `opentelemetry::trace::ExportError` to `opentelemetry_sdk::export::ExportError`
- Moved `TraceError` enum from `opentelemetry::trace::TraceError` to `opentelemetry_sdk::trace::TraceError`
- Moved `TraceResult` type alias from `opentelemetry::trace::TraceResult` to `opentelemetry_sdk::trace::TraceResult`
- *Breaking*: Make `force_flush()` in `PushMetricExporter` synchronous
- **Breaking Change:** Updated the `SpanExporter` trait method signature:
- **Breaking**: Make `force_flush()` in `PushMetricExporter` synchronous
- **Breaking**: Updated the `SpanExporter` trait method signature:

```rust
fn export(&mut self, batch: Vec<SpanData>) -> BoxFuture<'static, OTelSdkResult>;
Expand Down
10 changes: 5 additions & 5 deletions opentelemetry-sdk/src/logs/log_processor_with_async_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::{
};

use super::{BatchConfig, LogProcessor};
use crate::runtime::{RuntimeChannel, TrySend};
use crate::runtime::{to_interval_stream, RuntimeChannel, TrySend};
use futures_channel::oneshot;
use futures_util::{
future::{self, Either},
Expand Down Expand Up @@ -126,13 +126,13 @@ impl<R: RuntimeChannel> BatchLogProcessor<R> {
let inner_runtime = runtime.clone();

// Spawn worker process via user-defined spawn function.
runtime.spawn(Box::pin(async move {
runtime.spawn(async move {
// Timer will take a reference to the current runtime, so its important we do this within the
// runtime.spawn()
let ticker = inner_runtime
.interval(config.scheduled_delay)
let ticker = to_interval_stream(inner_runtime.clone(), config.scheduled_delay)
.skip(1) // The ticker is fired immediately, so we should skip the first one to align with the interval.
.map(|_| BatchMessage::Flush(None));

let timeout_runtime = inner_runtime.clone();
let mut logs = Vec::new();
let mut messages = Box::pin(stream::select(message_receiver, ticker));
Expand Down Expand Up @@ -204,7 +204,7 @@ impl<R: RuntimeChannel> BatchLogProcessor<R> {
}
}
}
}));
});
// Return batch processor with link to worker
BatchLogProcessor {
message_sender,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use futures_util::{
};
use opentelemetry::{otel_debug, otel_error};

use crate::runtime::Runtime;
use crate::runtime::{to_interval_stream, Runtime};
use crate::{
error::{OTelSdkError, OTelSdkResult},
metrics::{exporter::PushMetricExporter, reader::SdkProducer, MetricError, MetricResult},
Expand Down Expand Up @@ -109,9 +109,8 @@ where
let worker = move |reader: &PeriodicReader<E>| {
let runtime = self.runtime.clone();
let reader = reader.clone();
self.runtime.spawn(Box::pin(async move {
let ticker = runtime
.interval(self.interval)
self.runtime.spawn(async move {
let ticker = to_interval_stream(runtime.clone(), self.interval)
.skip(1) // The ticker is fired immediately, so we should skip the first one to align with the interval.
.map(|_| Message::Export);
let messages = Box::pin(stream::select(message_receiver, ticker));
Expand All @@ -126,7 +125,7 @@ where
}
.run(messages)
.await
}));
});
};

otel_debug!(
Expand Down
101 changes: 52 additions & 49 deletions opentelemetry-sdk/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,42 +6,57 @@
//! [Tokio]: https://crates.io/crates/tokio
//! [async-std]: https://crates.io/crates/async-std
use futures_util::{future::BoxFuture, stream::Stream};
use futures_util::stream::{unfold, Stream};
use std::{fmt::Debug, future::Future, time::Duration};
use thiserror::Error;

/// A runtime is an abstraction of an async runtime like [Tokio] or [async-std]. It allows
/// OpenTelemetry to work with any current and hopefully future runtime implementation.
/// OpenTelemetry to work with any current and hopefully future runtime implementations.
///
/// [Tokio]: https://crates.io/crates/tokio
/// [async-std]: https://crates.io/crates/async-std
///
/// # Note
///
/// OpenTelemetry expects a *multithreaded* runtime because its types can move across threads.
/// For this reason, this trait requires the `Send` and `Sync` bounds. Single-threaded runtimes
/// can implement this trait in a way that spawns the tasks on the same thread as the calling code.
#[cfg(feature = "experimental_async_runtime")]
pub trait Runtime: Clone + Send + Sync + 'static {
/// A future stream, which returns items in a previously specified interval. The item type is
/// not important.
type Interval: Stream + Send;

/// A future, which resolves after a previously specified amount of time. The output type is
/// not important.
type Delay: Future + Send + Unpin;

/// Create a [futures_util::stream::Stream], which returns a new item every
/// [std::time::Duration].
fn interval(&self, duration: Duration) -> Self::Interval;

/// Spawn a new task or thread, which executes the given future.
///
/// # Note
///
/// This is mainly used to run batch span processing in the background. Note, that the function
/// does not return a handle. OpenTelemetry will use a different way to wait for the future to
/// finish when TracerProvider gets shutdown. At the moment this happens by blocking the
/// finish when the caller shuts down.
///
/// At the moment, the shutdown happens by blocking the
/// current thread. This means runtime implementations need to make sure they can still execute
/// the given future even if the main thread is blocked.
fn spawn(&self, future: BoxFuture<'static, ()>);
fn spawn<F>(&self, future: F)
where
F: Future<Output = ()> + Send + 'static;

/// Return a future that resolves after the specified [Duration].
fn delay(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static;
}

/// Return a new future, which resolves after the specified [std::time::Duration].
fn delay(&self, duration: Duration) -> Self::Delay;
/// Uses the given runtime to produce an interval stream.
#[cfg(feature = "experimental_async_runtime")]
#[allow(dead_code)]
pub(crate) fn to_interval_stream<T: Runtime>(
runtime: T,
interval: Duration,
) -> impl Stream<Item = ()> {
unfold((), move |_| {
let runtime_cloned = runtime.clone();

async move {
runtime_cloned.delay(interval).await;
Some(((), ()))
}
})
}

/// Runtime implementation, which works with Tokio's multi thread runtime.
Expand All @@ -59,21 +74,17 @@ pub struct Tokio;
doc(cfg(all(feature = "experimental_async_runtime", feature = "rt-tokio")))
)]
impl Runtime for Tokio {
type Interval = tokio_stream::wrappers::IntervalStream;
type Delay = ::std::pin::Pin<Box<tokio::time::Sleep>>;

fn interval(&self, duration: Duration) -> Self::Interval {
crate::util::tokio_interval_stream(duration)
}

fn spawn(&self, future: BoxFuture<'static, ()>) {
fn spawn<F>(&self, future: F)
where
F: Future<Output = ()> + Send + 'static,
{
#[allow(clippy::let_underscore_future)]
// we don't have to await on the returned future to execute
let _ = tokio::spawn(future);
}

fn delay(&self, duration: Duration) -> Self::Delay {
Box::pin(tokio::time::sleep(duration))
fn delay(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static {
tokio::time::sleep(duration)
}
}

Expand Down Expand Up @@ -104,14 +115,10 @@ pub struct TokioCurrentThread;
)))
)]
impl Runtime for TokioCurrentThread {
type Interval = tokio_stream::wrappers::IntervalStream;
type Delay = ::std::pin::Pin<Box<tokio::time::Sleep>>;

fn interval(&self, duration: Duration) -> Self::Interval {
crate::util::tokio_interval_stream(duration)
}

fn spawn(&self, future: BoxFuture<'static, ()>) {
fn spawn<F>(&self, future: F)
where
F: Future<Output = ()> + Send + 'static,
{
// We cannot force push tracing in current thread tokio scheduler because we rely on
// BatchSpanProcessor to export spans in a background task, meanwhile we need to block the
// shutdown function so that the runtime will not finish the blocked task and kill any
Expand All @@ -127,8 +134,8 @@ impl Runtime for TokioCurrentThread {
});
}

fn delay(&self, duration: Duration) -> Self::Delay {
Box::pin(tokio::time::sleep(duration))
fn delay(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static {
tokio::time::sleep(duration)
}
}

Expand All @@ -147,20 +154,16 @@ pub struct AsyncStd;
doc(cfg(all(feature = "experimental_async_runtime", feature = "rt-async-std")))
)]
impl Runtime for AsyncStd {
type Interval = async_std::stream::Interval;
type Delay = BoxFuture<'static, ()>;

fn interval(&self, duration: Duration) -> Self::Interval {
async_std::stream::interval(duration)
}

fn spawn(&self, future: BoxFuture<'static, ()>) {
fn spawn<F>(&self, future: F)
where
F: Future<Output = ()> + Send + 'static,
{
#[allow(clippy::let_underscore_future)]
let _ = async_std::task::spawn(future);
}

fn delay(&self, duration: Duration) -> Self::Delay {
Box::pin(async_std::task::sleep(duration))
fn delay(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static {
async_std::task::sleep(duration)
}
}

Expand Down Expand Up @@ -193,7 +196,7 @@ pub enum TrySendError {
/// Send failed due to the channel being closed.
#[error("cannot send message to batch processor as the channel is closed")]
ChannelClosed,
/// Any other send error that isnt covered above.
/// Any other send error that isn't covered above.
#[error(transparent)]
Other(#[from] Box<dyn std::error::Error + Send + Sync + 'static>),
}
Expand Down
9 changes: 5 additions & 4 deletions opentelemetry-sdk/src/trace/sampler/jaeger_remote/sampler.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::runtime::RuntimeChannel;
use crate::runtime::{to_interval_stream, RuntimeChannel};
use crate::trace::error::TraceError;
use crate::trace::sampler::jaeger_remote::remote::SamplingStrategyResponse;
use crate::trace::sampler::jaeger_remote::sampling_strategy::Inner;
Expand Down Expand Up @@ -190,8 +190,9 @@ impl JaegerRemoteSampler {
C: HttpClient + 'static,
{
// todo: review if we need 'static here
let interval = runtime.interval(update_timeout);
runtime.spawn(Box::pin(async move {
let interval = to_interval_stream(runtime.clone(), update_timeout);

runtime.spawn(async move {
// either update or shutdown
let mut update = Box::pin(stream::select(
shutdown.map(|_| false),
Expand All @@ -217,7 +218,7 @@ impl JaegerRemoteSampler {
break;
}
}
}));
});
}

async fn request_new_strategy<C>(
Expand Down
10 changes: 5 additions & 5 deletions opentelemetry-sdk/src/trace/span_processor_with_async_runtime.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::error::{OTelSdkError, OTelSdkResult};
use crate::resource::Resource;
use crate::runtime::{RuntimeChannel, TrySend};
use crate::runtime::{to_interval_stream, RuntimeChannel, TrySend};
use crate::trace::BatchConfig;
use crate::trace::Span;
use crate::trace::SpanProcessor;
Expand Down Expand Up @@ -309,6 +309,7 @@ impl<E: SpanExporter, R: RuntimeChannel> BatchSpanProcessorInternal<E, R> {
let export = self.exporter.export(self.spans.split_off(0));
let timeout = self.runtime.delay(self.config.max_export_timeout);
let time_out = self.config.max_export_timeout;

pin_mut!(export);
pin_mut!(timeout);

Expand Down Expand Up @@ -353,11 +354,10 @@ impl<R: RuntimeChannel> BatchSpanProcessor<R> {

let inner_runtime = runtime.clone();
// Spawn worker process via user-defined spawn function.
runtime.spawn(Box::pin(async move {
runtime.spawn(async move {
// Timer will take a reference to the current runtime, so its important we do this within the
// runtime.spawn()
let ticker = inner_runtime
.interval(config.scheduled_delay)
let ticker = to_interval_stream(inner_runtime.clone(), config.scheduled_delay)
.skip(1) // The ticker is fired immediately, so we should skip the first one to align with the interval.
.map(|_| BatchMessage::Flush(None));
let timeout_runtime = inner_runtime.clone();
Expand All @@ -372,7 +372,7 @@ impl<R: RuntimeChannel> BatchSpanProcessor<R> {
};

processor.run(messages).await
}));
});

// Return batch processor with link to worker
BatchSpanProcessor {
Expand Down

0 comments on commit fb74565

Please sign in to comment.