From d291b923fd2f9bd915cc2a5c5b67919764be64b6 Mon Sep 17 00:00:00 2001 From: Dominic Burkart Date: Fri, 23 Jun 2023 17:40:06 +0200 Subject: [PATCH 1/3] close internal sources after external ones --- lib/vector-common/src/shutdown.rs | 42 ++++++++++++++++++++++--------- src/config/source.rs | 2 +- src/sources/socket/mod.rs | 2 +- src/sources/util/framestream.rs | 2 +- src/topology/builder.rs | 7 ++++-- 5 files changed, 38 insertions(+), 17 deletions(-) diff --git a/lib/vector-common/src/shutdown.rs b/lib/vector-common/src/shutdown.rs index 79f58978bd6e9..fd133f12ea4af 100644 --- a/lib/vector-common/src/shutdown.rs +++ b/lib/vector-common/src/shutdown.rs @@ -6,11 +6,12 @@ use std::{ pin::Pin, sync::Arc, task::{ready, Context, Poll}, + time::Duration, }; use futures::{future, FutureExt}; use stream_cancel::{Trigger, Tripwire}; -use tokio::time::{timeout_at, Instant}; +use tokio::time::{sleep, timeout_at, Instant}; use crate::{config::ComponentKey, trigger::DisabledTrigger}; @@ -107,11 +108,14 @@ impl ShutdownSignal { } } +type IsInternal = bool; + #[derive(Debug, Default)] pub struct SourceShutdownCoordinator { - shutdown_begun_triggers: HashMap, + shutdown_begun_triggers: HashMap, shutdown_force_triggers: HashMap, shutdown_complete_tripwires: HashMap, + internal_telemetry_sources_shutdown_delay: Option, } impl SourceShutdownCoordinator { @@ -121,13 +125,14 @@ impl SourceShutdownCoordinator { pub fn register_source( &mut self, id: &ComponentKey, + internal: bool, ) -> (ShutdownSignal, impl Future) { let (shutdown_begun_trigger, shutdown_begun_tripwire) = Tripwire::new(); let (force_shutdown_trigger, force_shutdown_tripwire) = Tripwire::new(); let (shutdown_complete_trigger, shutdown_complete_tripwire) = Tripwire::new(); self.shutdown_begun_triggers - .insert(id.clone(), shutdown_begun_trigger); + .insert(id.clone(), (internal, shutdown_begun_trigger)); self.shutdown_force_triggers .insert(id.clone(), force_shutdown_trigger); self.shutdown_complete_tripwires @@ -201,13 +206,14 @@ impl SourceShutdownCoordinator { /// Panics if this coordinator has had its triggers removed (ie /// has been taken over with `Self::takeover_source`). pub fn shutdown_all(self, deadline: Option) -> impl Future { - let mut complete_futures = Vec::new(); + let mut internal_sources_complete_futures = Vec::new(); + let mut external_sources_complete_futures = Vec::new(); let shutdown_begun_triggers = self.shutdown_begun_triggers; let mut shutdown_complete_tripwires = self.shutdown_complete_tripwires; let mut shutdown_force_triggers = self.shutdown_force_triggers; - for (id, trigger) in shutdown_begun_triggers { + for (id, (internal, trigger)) in shutdown_begun_triggers { trigger.cancel(); let shutdown_complete_tripwire = @@ -229,10 +235,21 @@ impl SourceShutdownCoordinator { deadline, ); - complete_futures.push(source_complete); + if internal { + internal_sources_complete_futures.push(source_complete); + } else { + external_sources_complete_futures.push(source_complete); + } } - futures::future::join_all(complete_futures).map(|_| ()) + futures::future::join_all(external_sources_complete_futures) + .then(move |_| async move { + if let Some(delay) = self.internal_telemetry_sources_shutdown_delay { + sleep(delay).await + } + }) + .then(|_| futures::future::join_all(internal_sources_complete_futures)) + .map(|_| ()) } /// Sends the signal to the given source to begin shutting down. Returns a future that resolves @@ -250,11 +267,12 @@ impl SourceShutdownCoordinator { id: &ComponentKey, deadline: Instant, ) -> impl Future { - let begin_shutdown_trigger = self.shutdown_begun_triggers.remove(id).unwrap_or_else(|| { - panic!( + let (_, begin_shutdown_trigger) = + self.shutdown_begun_triggers.remove(id).unwrap_or_else(|| { + panic!( "shutdown_begun_trigger for source \"{id}\" not found in the ShutdownCoordinator" ) - }); + }); // This is what actually triggers the source to begin shutting down. begin_shutdown_trigger.cancel(); @@ -336,7 +354,7 @@ mod test { let mut shutdown = SourceShutdownCoordinator::default(); let id = ComponentKey::from("test"); - let (shutdown_signal, _) = shutdown.register_source(&id); + let (shutdown_signal, _) = shutdown.register_source(&id, false); let deadline = Instant::now() + Duration::from_secs(1); let shutdown_complete = shutdown.shutdown_source(&id, deadline); @@ -352,7 +370,7 @@ mod test { let mut shutdown = SourceShutdownCoordinator::default(); let id = ComponentKey::from("test"); - let (_shutdown_signal, force_shutdown_tripwire) = shutdown.register_source(&id); + let (_shutdown_signal, force_shutdown_tripwire) = shutdown.register_source(&id, false); let deadline = Instant::now() + Duration::from_secs(1); let shutdown_complete = shutdown.shutdown_source(&id, deadline); diff --git a/src/config/source.rs b/src/config/source.rs index 1353c18c05dc4..5e53ebbad1725 100644 --- a/src/config/source.rs +++ b/src/config/source.rs @@ -143,7 +143,7 @@ impl SourceContext { out: SourceSender, ) -> (Self, crate::shutdown::SourceShutdownCoordinator) { let mut shutdown = crate::shutdown::SourceShutdownCoordinator::default(); - let (shutdown_signal, _) = shutdown.register_source(key); + let (shutdown_signal, _) = shutdown.register_source(key, false); ( Self { key: key.clone(), diff --git a/src/sources/socket/mod.rs b/src/sources/socket/mod.rs index 93366629f3420..73e5aa601b0c0 100644 --- a/src/sources/socket/mod.rs +++ b/src/sources/socket/mod.rs @@ -872,7 +872,7 @@ mod test { source_id: &ComponentKey, shutdown: &mut SourceShutdownCoordinator, ) -> (SocketAddr, JoinHandle>) { - let (shutdown_signal, _) = shutdown.register_source(source_id); + let (shutdown_signal, _) = shutdown.register_source(source_id, false); init_udp_inner(sender, source_id, shutdown_signal, None, false).await } diff --git a/src/sources/util/framestream.rs b/src/sources/util/framestream.rs index 194dd48432074..8bae468a85241 100644 --- a/src/sources/util/framestream.rs +++ b/src/sources/util/framestream.rs @@ -728,7 +728,7 @@ mod test { let source_id = ComponentKey::from(source_id); let socket_path = frame_handler.socket_path(); let mut shutdown = SourceShutdownCoordinator::default(); - let (shutdown_signal, _) = shutdown.register_source(&source_id); + let (shutdown_signal, _) = shutdown.register_source(&source_id, false); let server = build_framestream_unix_source(frame_handler, shutdown_signal, pipeline) .expect("Failed to build framestream unix source."); diff --git a/src/topology/builder.rs b/src/topology/builder.rs index ba0d2dda8a761..9ab99c54799cf 100644 --- a/src/topology/builder.rs +++ b/src/topology/builder.rs @@ -70,6 +70,8 @@ static TRANSFORM_CONCURRENCY_LIMIT: Lazy = Lazy::new(|| { .unwrap_or_else(crate::num_threads) }); +const INTERNAL_SOURCES: [&str; 3] = ["internal_logs", "internal_metrics", "datadog_traces"]; + /// Builds only the new pieces, and doesn't check their topology. pub async fn build_pieces( config: &super::Config, @@ -315,8 +317,9 @@ impl<'a> Builder<'a> { let pipeline = builder.build(); - let (shutdown_signal, force_shutdown_tripwire) = - self.shutdown_coordinator.register_source(key); + let (shutdown_signal, force_shutdown_tripwire) = self + .shutdown_coordinator + .register_source(key, INTERNAL_SOURCES.contains(&typetag)); let context = SourceContext { key: key.clone(), From f092bf448563a0d6c55608a4447303724be0b81a Mon Sep 17 00:00:00 2001 From: Dominic Burkart Date: Fri, 23 Jun 2023 18:02:10 +0200 Subject: [PATCH 2/3] don't implement configurable internal source delay shutdown for now --- lib/vector-common/src/shutdown.rs | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/lib/vector-common/src/shutdown.rs b/lib/vector-common/src/shutdown.rs index fd133f12ea4af..d1b35be5c6fb7 100644 --- a/lib/vector-common/src/shutdown.rs +++ b/lib/vector-common/src/shutdown.rs @@ -6,12 +6,11 @@ use std::{ pin::Pin, sync::Arc, task::{ready, Context, Poll}, - time::Duration, }; use futures::{future, FutureExt}; use stream_cancel::{Trigger, Tripwire}; -use tokio::time::{sleep, timeout_at, Instant}; +use tokio::time::{timeout_at, Instant}; use crate::{config::ComponentKey, trigger::DisabledTrigger}; @@ -115,7 +114,6 @@ pub struct SourceShutdownCoordinator { shutdown_begun_triggers: HashMap, shutdown_force_triggers: HashMap, shutdown_complete_tripwires: HashMap, - internal_telemetry_sources_shutdown_delay: Option, } impl SourceShutdownCoordinator { @@ -243,11 +241,6 @@ impl SourceShutdownCoordinator { } futures::future::join_all(external_sources_complete_futures) - .then(move |_| async move { - if let Some(delay) = self.internal_telemetry_sources_shutdown_delay { - sleep(delay).await - } - }) .then(|_| futures::future::join_all(internal_sources_complete_futures)) .map(|_| ()) } From 62db883f551d2f0d3ee2439b7dda77848e9c534d Mon Sep 17 00:00:00 2001 From: Dominic Burkart Date: Mon, 26 Jun 2023 15:02:55 +0200 Subject: [PATCH 3/3] datadog traces are not internal --- src/topology/builder.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/topology/builder.rs b/src/topology/builder.rs index 9ab99c54799cf..0b153bd7d89b8 100644 --- a/src/topology/builder.rs +++ b/src/topology/builder.rs @@ -70,7 +70,7 @@ static TRANSFORM_CONCURRENCY_LIMIT: Lazy = Lazy::new(|| { .unwrap_or_else(crate::num_threads) }); -const INTERNAL_SOURCES: [&str; 3] = ["internal_logs", "internal_metrics", "datadog_traces"]; +const INTERNAL_SOURCES: [&str; 2] = ["internal_logs", "internal_metrics"]; /// Builds only the new pieces, and doesn't check their topology. pub async fn build_pieces(