diff --git a/rust/otap-dataflow/crates/admin/src/pipeline_group.rs b/rust/otap-dataflow/crates/admin/src/pipeline_group.rs index 719696cf58..b3a049a037 100644 --- a/rust/otap-dataflow/crates/admin/src/pipeline_group.rs +++ b/rust/otap-dataflow/crates/admin/src/pipeline_group.rs @@ -8,6 +8,16 @@ //! - Query parameters: //! - `wait` (bool, default: false) - if true, block until all pipelines have stopped //! - `timeout_secs` (u64, default: 60) - maximum seconds to wait when `wait=true` +//! +//! Example (fire-and-forget): +//! ```sh +//! curl -X POST http://localhost:8080/pipeline-groups/shutdown +//! ``` +//! Example (wait for graceful shutdown with 30s timeout): +//! ```sh +//! curl -X POST "http://localhost:8080/pipeline-groups/shutdown?wait=true&timeout_secs=30" +//! ``` +//! //! - 200 OK if `wait=true` and all pipelines stopped successfully //! - 202 Accepted if the stop request was accepted and is being processed (async operation) //! - 400 Bad Request if the pipeline is already stopped (ToDo) diff --git a/rust/otap-dataflow/crates/otap/src/otap_receiver.rs b/rust/otap-dataflow/crates/otap/src/otap_receiver.rs index 9bda5cc61f..d7cd201d1e 100644 --- a/rust/otap-dataflow/crates/otap/src/otap_receiver.rs +++ b/rust/otap-dataflow/crates/otap/src/otap_receiver.rs @@ -319,6 +319,7 @@ impl shared::Receiver for OTAPReceiver { loop { match ctrl_msg_recv.recv().await { Ok(NodeControlMsg::Shutdown { deadline, .. }) => { + otap_df_telemetry::otel_info!("otap.receiver.shutdown"); let snapshot = self.metrics.snapshot(); _ = telemetry_cancel_handle.cancel().await; return Ok(TerminalState::new(deadline, [snapshot])); diff --git a/rust/otap-dataflow/crates/otap/src/otlp_exporter.rs b/rust/otap-dataflow/crates/otap/src/otlp_exporter.rs index 559ec057c5..8427a2ef25 100644 --- a/rust/otap-dataflow/crates/otap/src/otlp_exporter.rs +++ b/rust/otap-dataflow/crates/otap/src/otlp_exporter.rs @@ -119,9 +119,8 @@ impl Exporter for OTLPExporter { effect_handler: EffectHandler, ) -> Result { otel_info!( - "exporter.start", - grpc_endpoint = self.config.grpc.grpc_endpoint.as_str(), - message = "Starting OTLP Exporter" + "otlp.exporter.grpc.start", + grpc_endpoint = self.config.grpc.grpc_endpoint.as_str() ); self.config.grpc.log_proxy_info(); @@ -195,7 +194,7 @@ impl Exporter for OTLPExporter { msg } else if inflight_exports.is_empty() { let msg = msg_chan.recv().await?; - otel_debug!("exporter.receive", "Received message from pipeline"); + otel_debug!("otlp.exporter.grpc.receive"); msg } else { let completion_fut = inflight_exports.next_completion().fuse(); @@ -217,7 +216,7 @@ impl Exporter for OTLPExporter { } msg = recv_fut => { let msg = msg?; - otel_debug!("exporter.receive", "Received message from pipeline"); + otel_debug!("otlp.exporter.grpc.receive"); msg }, } @@ -225,6 +224,7 @@ impl Exporter for OTLPExporter { match msg { Message::Control(NodeControlMsg::Shutdown { deadline, .. }) => { + otel_info!("otlp.exporter.shutdown"); debug_assert!( pending_msg.is_none(), "pending message should have been drained before shutdown" diff --git a/rust/otap-dataflow/crates/otap/src/otlp_receiver.rs b/rust/otap-dataflow/crates/otap/src/otlp_receiver.rs index 97d411e314..c7a187e676 100644 --- a/rust/otap-dataflow/crates/otap/src/otlp_receiver.rs +++ b/rust/otap-dataflow/crates/otap/src/otlp_receiver.rs @@ -383,6 +383,7 @@ impl OTLPReceiver { ) -> Result, Error> { match msg { NodeControlMsg::Shutdown { deadline, .. } => { + otap_df_telemetry::otel_info!("otlp.receiver.shutdown"); let snapshot = self.metrics.lock().snapshot(); if let Some(handle) = telemetry_cancel_handle.take() { _ = handle.cancel().await;