diff --git a/rust/otap-dataflow/crates/controller/src/lib.rs b/rust/otap-dataflow/crates/controller/src/lib.rs index 052473746f..37dbea8781 100644 --- a/rust/otap-dataflow/crates/controller/src/lib.rs +++ b/rust/otap-dataflow/crates/controller/src/lib.rs @@ -92,7 +92,7 @@ impl Controller { // Start the metrics aggregation let telemetry_registry = telemetry_system.registry(); - let (internal_collector, otel_shutdown_handle) = telemetry_system.into_parts(); + let internal_collector = telemetry_system.collector(); let metrics_agg_handle = spawn_thread_local_task("metrics-aggregator", move |cancellation_token| { internal_collector.run(cancellation_token) @@ -256,7 +256,7 @@ impl Controller { handle.shutdown_and_join()?; } obs_state_join_handle.shutdown_and_join()?; - otel_shutdown_handle.shutdown()?; + telemetry_system.shutdown()?; Ok(()) } diff --git a/rust/otap-dataflow/crates/engine/src/testing/processor.rs b/rust/otap-dataflow/crates/engine/src/testing/processor.rs index 477a475eaf..9f7679ff6a 100644 --- a/rust/otap-dataflow/crates/engine/src/testing/processor.rs +++ b/rust/otap-dataflow/crates/engine/src/testing/processor.rs @@ -266,7 +266,7 @@ impl TestPhase { { let metrics_reporter = self.metrics_system.reporter(); // Spawn metrics collection loop - let (collector, _) = self.metrics_system.into_parts(); + let collector = self.metrics_system.collector(); let metrics_collection_handle = self.rt.spawn(collector.run_collection_loop()); // The entire scenario is run to completion before the validation phase diff --git a/rust/otap-dataflow/crates/otap/src/parquet_exporter.rs b/rust/otap-dataflow/crates/otap/src/parquet_exporter.rs index 76595c2da7..5e5394eae7 100644 --- a/rust/otap-dataflow/crates/otap/src/parquet_exporter.rs +++ b/rust/otap-dataflow/crates/otap/src/parquet_exporter.rs @@ -1507,7 +1507,7 @@ mod test { // Run everything on the local task set, including the metrics collector let _ = rt.block_on(local.run_until(async move { // Start collector in 
background - let (collector, _) = metrics_system.into_parts(); + let collector = metrics_system.collector(); let _handle = tokio::task::spawn_local(collector.run_collection_loop()); tokio::join!( diff --git a/rust/otap-dataflow/crates/otap/src/signal_type_router.rs b/rust/otap-dataflow/crates/otap/src/signal_type_router.rs index 0dd69fc4d7..8daa3ed238 100644 --- a/rust/otap-dataflow/crates/otap/src/signal_type_router.rs +++ b/rust/otap-dataflow/crates/otap/src/signal_type_router.rs @@ -397,7 +397,7 @@ mod tests { let telemetry_registry = telemetry.registry(); let reporter = telemetry.reporter(); let collector_task = tokio::task::spawn_local(async move { - let (collector, _) = telemetry.into_parts(); + let collector = telemetry.collector(); let _ = collector.run_collection_loop().await; }); (telemetry_registry, reporter, collector_task) diff --git a/rust/otap-dataflow/crates/telemetry/src/collector.rs b/rust/otap-dataflow/crates/telemetry/src/collector.rs index 3b59171b8f..d5c214df2e 100644 --- a/rust/otap-dataflow/crates/telemetry/src/collector.rs +++ b/rust/otap-dataflow/crates/telemetry/src/collector.rs @@ -3,6 +3,8 @@ //! Task periodically collecting the internal signals emitted by the engine and the pipelines. +use std::sync::Arc; + use otap_df_config::pipeline::service::telemetry::TelemetryConfig; use tokio_util::sync::CancellationToken; @@ -44,7 +46,7 @@ impl InternalCollector { /// Collects metrics from the reporting channel and aggregates them into the `registry`. /// The collection runs indefinitely until the metrics channel is closed. /// Returns the pipeline instance when the loop ends (or None if no pipeline was configured). 
- pub async fn run_collection_loop(self) -> Result<(), Error> { + pub async fn run_collection_loop(self: Arc<Self>) -> Result<(), Error> { loop { match self.metrics_receiver.recv_async().await { Ok(metrics) => { @@ -63,15 +65,17 @@ impl InternalCollector { /// /// This method starts the internal signal collection loop and listens for a shutdown signal. /// It returns when either the collection loop ends (Ok/Err) or the shutdown signal fires. - pub async fn run(self, cancel: CancellationToken) -> Result<(), Error> { + pub async fn run(self: Arc<Self>, cancel: CancellationToken) -> Result<(), Error> { tokio::select! { - res = self.run_collection_loop() => { - res - } + biased; + _ = cancel.cancelled() => { // Shutdown requested; cancel the collection loop by dropping its future. Ok(()) } + res = self.run_collection_loop() => { + res + } } } } @@ -224,7 +228,7 @@ mod tests { // Close immediately drop(_reporter); - collector.run_collection_loop().await.unwrap(); + Arc::new(collector).run_collection_loop().await.unwrap(); } #[tokio::test] @@ -239,7 +243,7 @@ mod tests { let (collector, reporter) = InternalCollector::new(&config, telemetry_registry.clone()); - let handle = tokio::spawn(async move { collector.run_collection_loop().await }); + let handle = tokio::spawn(async move { Arc::new(collector).run_collection_loop().await }); // Send two snapshots that should be accumulated: [10,20] + [5,15] => [15,35] reporter @@ -289,7 +293,7 @@ mod tests { let key = metric_set.key; let (collector, reporter) = InternalCollector::new(&config, telemetry_registry.clone()); - let handle = tokio::spawn(async move { collector.run_collection_loop().await }); + let handle = tokio::spawn(async move { Arc::new(collector).run_collection_loop().await }); reporter .report_snapshot(create_test_snapshot( diff --git a/rust/otap-dataflow/crates/telemetry/src/lib.rs b/rust/otap-dataflow/crates/telemetry/src/lib.rs index b78c867140..b1db218d3c 100644 --- a/rust/otap-dataflow/crates/telemetry/src/lib.rs +++ 
b/rust/otap-dataflow/crates/telemetry/src/lib.rs @@ -84,7 +84,7 @@ pub struct InternalTelemetrySystem { registry: TelemetryRegistryHandle, /// The process collecting and processing internal signals. - collector: collector::InternalCollector, + collector: Arc<collector::InternalCollector>, /// The process reporting metrics to an external system. reporter: reporter::MetricsReporter, @@ -131,7 +131,7 @@ impl InternalTelemetrySystem { Ok(Self { registry: telemetry_registry, - collector, + collector: Arc::new(collector), reporter, dispatcher, meter_provider, @@ -146,6 +146,12 @@ impl InternalTelemetrySystem { self.registry.clone() } + /// Returns a shareable/cloneable handle to the internal metrics collector. + #[must_use] + pub fn collector(&self) -> Arc<collector::InternalCollector> { + self.collector.clone() + } + /// Returns a shareable/cloneable handle to the metrics reporter. #[must_use] pub fn reporter(&self) -> reporter::MetricsReporter { @@ -158,26 +164,8 @@ impl InternalTelemetrySystem { self.dispatcher.clone() } - /// Splits the system into the collection loop and a shutdown handle. - /// - /// Returns a tuple of: - /// - The collector that runs the aggregation loop - /// - A shutdown handle for the OTel SDK providers - /// - /// This allows the caller to run the collector in a separate task while - /// retaining the ability to shutdown the OTel providers. - #[must_use] - pub fn into_parts(self) -> (collector::InternalCollector, ShutdownHandle) { - let shutdown_handle = ShutdownHandle { - meter_provider: self.meter_provider, - logger_provider: self.logger_provider, - _otel_runtime: self._otel_runtime, - }; - (self.collector, shutdown_handle) - } - /// Shuts down the OpenTelemetry SDK providers. 
- pub fn shutdown(&self) -> Result<(), Error> { + pub fn shutdown(self) -> Result<(), Error> { let meter_shutdown_result = self.meter_provider.shutdown(); let logger_shutdown_result = self.logger_provider.shutdown(); @@ -197,36 +185,3 @@ impl Default for InternalTelemetrySystem { Self::new(&TelemetryConfig::default()).expect("default telemetry config should be valid") } } - -/// Handle for shutting down telemetry providers. -/// -/// This handle is returned by [`InternalTelemetrySystem::into_parts`] and -/// holds ownership of the OTel SDK providers and runtime. Call [`shutdown`](Self::shutdown) -/// to gracefully shut down the providers before dropping. -pub struct ShutdownHandle { - /// OTel SDK meter provider for metrics export. - meter_provider: SdkMeterProvider, - - /// OTel SDK logger provider for logs export. - logger_provider: SdkLoggerProvider, - - /// Tokio runtime for OTLP exporters. - _otel_runtime: Option, -} - -impl ShutdownHandle { - /// Shuts down the OpenTelemetry SDK providers. - pub fn shutdown(&self) -> Result<(), Error> { - let meter_shutdown_result = self.meter_provider.shutdown(); - let logger_shutdown_result = self.logger_provider.shutdown(); - - if let Err(e) = meter_shutdown_result { - return Err(Error::ShutdownError(e.to_string())); - } - - if let Err(e) = logger_shutdown_result { - return Err(Error::ShutdownError(e.to_string())); - } - Ok(()) - } -}