diff --git a/lib/vector-common/src/shutdown.rs b/lib/vector-common/src/shutdown.rs index 79f58978bd6e9..d1b35be5c6fb7 100644 --- a/lib/vector-common/src/shutdown.rs +++ b/lib/vector-common/src/shutdown.rs @@ -107,9 +107,11 @@ impl ShutdownSignal { } } +type IsInternal = bool; + #[derive(Debug, Default)] pub struct SourceShutdownCoordinator { - shutdown_begun_triggers: HashMap, + shutdown_begun_triggers: HashMap, shutdown_force_triggers: HashMap, shutdown_complete_tripwires: HashMap, } @@ -121,13 +123,14 @@ impl SourceShutdownCoordinator { pub fn register_source( &mut self, id: &ComponentKey, + internal: bool, ) -> (ShutdownSignal, impl Future) { let (shutdown_begun_trigger, shutdown_begun_tripwire) = Tripwire::new(); let (force_shutdown_trigger, force_shutdown_tripwire) = Tripwire::new(); let (shutdown_complete_trigger, shutdown_complete_tripwire) = Tripwire::new(); self.shutdown_begun_triggers - .insert(id.clone(), shutdown_begun_trigger); + .insert(id.clone(), (internal, shutdown_begun_trigger)); self.shutdown_force_triggers .insert(id.clone(), force_shutdown_trigger); self.shutdown_complete_tripwires @@ -201,13 +204,14 @@ impl SourceShutdownCoordinator { /// Panics if this coordinator has had its triggers removed (ie /// has been taken over with `Self::takeover_source`). pub fn shutdown_all(self, deadline: Option) -> impl Future { - let mut complete_futures = Vec::new(); + let mut internal_sources_complete_futures = Vec::new(); + let mut external_sources_complete_futures = Vec::new(); let shutdown_begun_triggers = self.shutdown_begun_triggers; let mut shutdown_complete_tripwires = self.shutdown_complete_tripwires; let mut shutdown_force_triggers = self.shutdown_force_triggers; - for (id, trigger) in shutdown_begun_triggers { + for (id, (internal, trigger)) in shutdown_begun_triggers { trigger.cancel(); let shutdown_complete_tripwire = @@ -229,10 +233,16 @@ impl SourceShutdownCoordinator { deadline, ); - complete_futures.push(source_complete); + if internal { + internal_sources_complete_futures.push(source_complete); + } else { + external_sources_complete_futures.push(source_complete); + } } - futures::future::join_all(complete_futures).map(|_| ()) + futures::future::join_all(external_sources_complete_futures) + .then(|_| futures::future::join_all(internal_sources_complete_futures)) + .map(|_| ()) } /// Sends the signal to the given source to begin shutting down. Returns a future that resolves @@ -250,11 +260,12 @@ impl SourceShutdownCoordinator { id: &ComponentKey, deadline: Instant, ) -> impl Future { - let begin_shutdown_trigger = self.shutdown_begun_triggers.remove(id).unwrap_or_else(|| { - panic!( + let (_, begin_shutdown_trigger) = + self.shutdown_begun_triggers.remove(id).unwrap_or_else(|| { + panic!( "shutdown_begun_trigger for source \"{id}\" not found in the ShutdownCoordinator" ) - }); + }); // This is what actually triggers the source to begin shutting down. begin_shutdown_trigger.cancel(); @@ -336,7 +347,7 @@ mod test { let mut shutdown = SourceShutdownCoordinator::default(); let id = ComponentKey::from("test"); - let (shutdown_signal, _) = shutdown.register_source(&id); + let (shutdown_signal, _) = shutdown.register_source(&id, false); let deadline = Instant::now() + Duration::from_secs(1); let shutdown_complete = shutdown.shutdown_source(&id, deadline); @@ -352,7 +363,7 @@ mod test { let mut shutdown = SourceShutdownCoordinator::default(); let id = ComponentKey::from("test"); - let (_shutdown_signal, force_shutdown_tripwire) = shutdown.register_source(&id); + let (_shutdown_signal, force_shutdown_tripwire) = shutdown.register_source(&id, false); let deadline = Instant::now() + Duration::from_secs(1); let shutdown_complete = shutdown.shutdown_source(&id, deadline); diff --git a/src/config/source.rs b/src/config/source.rs index 1353c18c05dc4..5e53ebbad1725 100644 --- a/src/config/source.rs +++ b/src/config/source.rs @@ -143,7 +143,7 @@ impl SourceContext { out: SourceSender, ) -> (Self, crate::shutdown::SourceShutdownCoordinator) { let mut shutdown = crate::shutdown::SourceShutdownCoordinator::default(); - let (shutdown_signal, _) = shutdown.register_source(key); + let (shutdown_signal, _) = shutdown.register_source(key, false); ( Self { key: key.clone(), diff --git a/src/sources/socket/mod.rs b/src/sources/socket/mod.rs index 93366629f3420..73e5aa601b0c0 100644 --- a/src/sources/socket/mod.rs +++ b/src/sources/socket/mod.rs @@ -872,7 +872,7 @@ mod test { source_id: &ComponentKey, shutdown: &mut SourceShutdownCoordinator, ) -> (SocketAddr, JoinHandle>) { - let (shutdown_signal, _) = shutdown.register_source(source_id); + let (shutdown_signal, _) = shutdown.register_source(source_id, false); init_udp_inner(sender, source_id, shutdown_signal, None, false).await } diff --git a/src/sources/util/framestream.rs b/src/sources/util/framestream.rs index 194dd48432074..8bae468a85241 100644 --- a/src/sources/util/framestream.rs +++ b/src/sources/util/framestream.rs @@ -728,7 +728,7 @@ mod test { let source_id = ComponentKey::from(source_id); let socket_path = frame_handler.socket_path(); let mut shutdown = SourceShutdownCoordinator::default(); - let (shutdown_signal, _) = shutdown.register_source(&source_id); + let (shutdown_signal, _) = shutdown.register_source(&source_id, false); let server = build_framestream_unix_source(frame_handler, shutdown_signal, pipeline) .expect("Failed to build framestream unix source."); diff --git a/src/topology/builder.rs b/src/topology/builder.rs index ba0d2dda8a761..0b153bd7d89b8 100644 --- a/src/topology/builder.rs +++ b/src/topology/builder.rs @@ -70,6 +70,8 @@ static TRANSFORM_CONCURRENCY_LIMIT: Lazy = Lazy::new(|| { .unwrap_or_else(crate::num_threads) }); +const INTERNAL_SOURCES: [&str; 2] = ["internal_logs", "internal_metrics"]; + /// Builds only the new pieces, and doesn't check their topology. pub async fn build_pieces( config: &super::Config, @@ -315,8 +317,9 @@ impl<'a> Builder<'a> { let pipeline = builder.build(); - let (shutdown_signal, force_shutdown_tripwire) = - self.shutdown_coordinator.register_source(key); + let (shutdown_signal, force_shutdown_tripwire) = self + .shutdown_coordinator + .register_source(key, INTERNAL_SOURCES.contains(&typetag)); let context = SourceContext { key: key.clone(),