Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions rust/otap-dataflow/crates/controller/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ impl<PData: 'static + Clone + Send + Sync + std::fmt::Debug> Controller<PData> {

// Start the metrics aggregation
let telemetry_registry = telemetry_system.registry();
let (internal_collector, otel_shutdown_handle) = telemetry_system.into_parts();
let internal_collector = telemetry_system.collector();
let metrics_agg_handle =
spawn_thread_local_task("metrics-aggregator", move |cancellation_token| {
internal_collector.run(cancellation_token)
Expand Down Expand Up @@ -256,7 +256,7 @@ impl<PData: 'static + Clone + Send + Sync + std::fmt::Debug> Controller<PData> {
handle.shutdown_and_join()?;
}
obs_state_join_handle.shutdown_and_join()?;
otel_shutdown_handle.shutdown()?;
telemetry_system.shutdown()?;

Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion rust/otap-dataflow/crates/engine/src/testing/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ impl<PData: Debug + 'static> TestPhase<PData> {
{
let metrics_reporter = self.metrics_system.reporter();
// Spawn metrics collection loop
let (collector, _) = self.metrics_system.into_parts();
let collector = self.metrics_system.collector();
let metrics_collection_handle = self.rt.spawn(collector.run_collection_loop());

// The entire scenario is run to completion before the validation phase
Expand Down
2 changes: 1 addition & 1 deletion rust/otap-dataflow/crates/otap/src/parquet_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1507,7 +1507,7 @@ mod test {
// Run everything on the local task set, including the metrics collector
let _ = rt.block_on(local.run_until(async move {
// Start collector in background
let (collector, _) = metrics_system.into_parts();
let collector = metrics_system.collector();
let _handle = tokio::task::spawn_local(collector.run_collection_loop());

tokio::join!(
Expand Down
2 changes: 1 addition & 1 deletion rust/otap-dataflow/crates/otap/src/signal_type_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ mod tests {
let telemetry_registry = telemetry.registry();
let reporter = telemetry.reporter();
let collector_task = tokio::task::spawn_local(async move {
let (collector, _) = telemetry.into_parts();
let collector = telemetry.collector();
let _ = collector.run_collection_loop().await;
});
(telemetry_registry, reporter, collector_task)
Expand Down
20 changes: 12 additions & 8 deletions rust/otap-dataflow/crates/telemetry/src/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

//! Task periodically collecting the internal signals emitted by the engine and the pipelines.

use std::sync::Arc;

use otap_df_config::pipeline::service::telemetry::TelemetryConfig;
use tokio_util::sync::CancellationToken;

Expand Down Expand Up @@ -44,7 +46,7 @@ impl InternalCollector {
/// Collects metrics from the reporting channel and aggregates them into the `registry`.
/// The collection runs indefinitely until the metrics channel is closed.
/// Resolves to a `Result<(), Error>` when the loop ends; no value is returned on success.
pub async fn run_collection_loop(self) -> Result<(), Error> {
pub async fn run_collection_loop(self: Arc<Self>) -> Result<(), Error> {
loop {
match self.metrics_receiver.recv_async().await {
Ok(metrics) => {
Expand All @@ -63,15 +65,17 @@ impl InternalCollector {
///
/// This method starts the internal signal collection loop and listens for a shutdown signal.
/// It returns when either the collection loop ends (Ok/Err) or the shutdown signal fires.
pub async fn run(self, cancel: CancellationToken) -> Result<(), Error> {
pub async fn run(self: Arc<Self>, cancel: CancellationToken) -> Result<(), Error> {
tokio::select! {
res = self.run_collection_loop() => {
res
}
biased;

_ = cancel.cancelled() => {
// Shutdown requested; cancel the collection loop by dropping its future.
Ok(())
}
res = self.run_collection_loop() => {
res
}
}
}
}
Expand Down Expand Up @@ -224,7 +228,7 @@ mod tests {

// Close immediately
drop(_reporter);
collector.run_collection_loop().await.unwrap();
Arc::new(collector).run_collection_loop().await.unwrap();
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool.

}

#[tokio::test]
Expand All @@ -239,7 +243,7 @@ mod tests {

let (collector, reporter) = InternalCollector::new(&config, telemetry_registry.clone());

let handle = tokio::spawn(async move { collector.run_collection_loop().await });
let handle = tokio::spawn(async move { Arc::new(collector).run_collection_loop().await });

// Send two snapshots that should be accumulated: [10,20] + [5,15] => [15,35]
reporter
Expand Down Expand Up @@ -289,7 +293,7 @@ mod tests {
let key = metric_set.key;

let (collector, reporter) = InternalCollector::new(&config, telemetry_registry.clone());
let handle = tokio::spawn(async move { collector.run_collection_loop().await });
let handle = tokio::spawn(async move { Arc::new(collector).run_collection_loop().await });

reporter
.report_snapshot(create_test_snapshot(
Expand Down
63 changes: 9 additions & 54 deletions rust/otap-dataflow/crates/telemetry/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ pub struct InternalTelemetrySystem {
registry: TelemetryRegistryHandle,

/// The process collecting and processing internal signals.
collector: collector::InternalCollector,
collector: Arc<collector::InternalCollector>,

/// The process reporting metrics to an external system.
reporter: reporter::MetricsReporter,
Expand Down Expand Up @@ -131,7 +131,7 @@ impl InternalTelemetrySystem {

Ok(Self {
registry: telemetry_registry,
collector,
collector: Arc::new(collector),
reporter,
dispatcher,
meter_provider,
Expand All @@ -146,6 +146,12 @@ impl InternalTelemetrySystem {
self.registry.clone()
}

/// Hands out another shared handle to the internal metrics collector.
///
/// The collector is stored behind an `Arc`, so this only bumps the reference
/// count; the returned handle points at the same collector instance.
#[must_use]
pub fn collector(&self) -> Arc<collector::InternalCollector> {
    Arc::clone(&self.collector)
}

/// Returns a shareable/cloneable handle to the metrics reporter.
#[must_use]
pub fn reporter(&self) -> reporter::MetricsReporter {
Expand All @@ -158,26 +164,8 @@ impl InternalTelemetrySystem {
self.dispatcher.clone()
}

/// Splits the system into the collection loop and a shutdown handle.
///
/// Returns a tuple of:
/// - The collector that runs the aggregation loop
/// - A shutdown handle for the OTel SDK providers
///
/// This allows the caller to run the collector in a separate task while
/// retaining the ability to shutdown the OTel providers.
#[must_use]
pub fn into_parts(self) -> (collector::InternalCollector, ShutdownHandle) {
let shutdown_handle = ShutdownHandle {
meter_provider: self.meter_provider,
logger_provider: self.logger_provider,
_otel_runtime: self._otel_runtime,
};
(self.collector, shutdown_handle)
}

/// Shuts down the OpenTelemetry SDK providers.
pub fn shutdown(&self) -> Result<(), Error> {
pub fn shutdown(self) -> Result<(), Error> {
let meter_shutdown_result = self.meter_provider.shutdown();
let logger_shutdown_result = self.logger_provider.shutdown();

Expand All @@ -197,36 +185,3 @@ impl Default for InternalTelemetrySystem {
Self::new(&TelemetryConfig::default()).expect("default telemetry config should be valid")
}
}

/// Handle for shutting down telemetry providers.
///
/// This handle is returned by [`InternalTelemetrySystem::into_parts`] and
/// holds ownership of the OTel SDK providers and runtime. Call [`shutdown`](Self::shutdown)
/// to gracefully shut down the providers before dropping.
///
/// All fields are private; `into_parts` fills them by moving the corresponding
/// fields out of the telemetry system.
pub struct ShutdownHandle {
/// OTel SDK meter provider for metrics export.
///
/// Shut down first by [`shutdown`](Self::shutdown).
meter_provider: SdkMeterProvider,

/// OTel SDK logger provider for logs export.
///
/// Shut down second by [`shutdown`](Self::shutdown), regardless of the meter result.
logger_provider: SdkLoggerProvider,

/// Tokio runtime for OTLP exporters.
///
/// Not read by [`shutdown`](Self::shutdown); it is only stored here. As the last
/// declared field it is dropped after both providers when the handle is dropped.
// NOTE(review): presumably kept so the exporters' runtime outlives the providers — confirm.
_otel_runtime: Option<tokio::runtime::Runtime>,
}

impl ShutdownHandle {
/// Shuts down the OpenTelemetry SDK providers.
pub fn shutdown(&self) -> Result<(), Error> {
let meter_shutdown_result = self.meter_provider.shutdown();
let logger_shutdown_result = self.logger_provider.shutdown();

if let Err(e) = meter_shutdown_result {
return Err(Error::ShutdownError(e.to_string()));
}

if let Err(e) = logger_shutdown_result {
return Err(Error::ShutdownError(e.to_string()));
}
Ok(())
}
}