Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 22 additions & 11 deletions lib/vector-common/src/shutdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,11 @@ impl ShutdownSignal {
}
}

type IsInternal = bool;

#[derive(Debug, Default)]
pub struct SourceShutdownCoordinator {
shutdown_begun_triggers: HashMap<ComponentKey, Trigger>,
shutdown_begun_triggers: HashMap<ComponentKey, (IsInternal, Trigger)>,
shutdown_force_triggers: HashMap<ComponentKey, Trigger>,
shutdown_complete_tripwires: HashMap<ComponentKey, Tripwire>,
}
Expand All @@ -121,13 +123,14 @@ impl SourceShutdownCoordinator {
pub fn register_source(
&mut self,
id: &ComponentKey,
internal: bool,
) -> (ShutdownSignal, impl Future<Output = ()>) {
let (shutdown_begun_trigger, shutdown_begun_tripwire) = Tripwire::new();
let (force_shutdown_trigger, force_shutdown_tripwire) = Tripwire::new();
let (shutdown_complete_trigger, shutdown_complete_tripwire) = Tripwire::new();

self.shutdown_begun_triggers
.insert(id.clone(), shutdown_begun_trigger);
.insert(id.clone(), (internal, shutdown_begun_trigger));
self.shutdown_force_triggers
.insert(id.clone(), force_shutdown_trigger);
self.shutdown_complete_tripwires
Expand Down Expand Up @@ -201,13 +204,14 @@ impl SourceShutdownCoordinator {
/// Panics if this coordinator has had its triggers removed (ie
/// has been taken over with `Self::takeover_source`).
pub fn shutdown_all(self, deadline: Option<Instant>) -> impl Future<Output = ()> {
let mut complete_futures = Vec::new();
let mut internal_sources_complete_futures = Vec::new();
let mut external_sources_complete_futures = Vec::new();

let shutdown_begun_triggers = self.shutdown_begun_triggers;
let mut shutdown_complete_tripwires = self.shutdown_complete_tripwires;
let mut shutdown_force_triggers = self.shutdown_force_triggers;

for (id, trigger) in shutdown_begun_triggers {
for (id, (internal, trigger)) in shutdown_begun_triggers {
trigger.cancel();

let shutdown_complete_tripwire =
Expand All @@ -229,10 +233,16 @@ impl SourceShutdownCoordinator {
deadline,
);

complete_futures.push(source_complete);
if internal {
internal_sources_complete_futures.push(source_complete);
} else {
external_sources_complete_futures.push(source_complete);
}
}

futures::future::join_all(complete_futures).map(|_| ())
futures::future::join_all(external_sources_complete_futures)
.then(|_| futures::future::join_all(internal_sources_complete_futures))
.map(|_| ())
}

/// Sends the signal to the given source to begin shutting down. Returns a future that resolves
Expand All @@ -250,11 +260,12 @@ impl SourceShutdownCoordinator {
id: &ComponentKey,
deadline: Instant,
) -> impl Future<Output = bool> {
let begin_shutdown_trigger = self.shutdown_begun_triggers.remove(id).unwrap_or_else(|| {
panic!(
let (_, begin_shutdown_trigger) =
self.shutdown_begun_triggers.remove(id).unwrap_or_else(|| {
panic!(
"shutdown_begun_trigger for source \"{id}\" not found in the ShutdownCoordinator"
)
});
});
// This is what actually triggers the source to begin shutting down.
begin_shutdown_trigger.cancel();

Expand Down Expand Up @@ -336,7 +347,7 @@ mod test {
let mut shutdown = SourceShutdownCoordinator::default();
let id = ComponentKey::from("test");

let (shutdown_signal, _) = shutdown.register_source(&id);
let (shutdown_signal, _) = shutdown.register_source(&id, false);
Comment on lines -339 to +350
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would there be a good way of testing that one set shuts down before the other does?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could do so if we refactored shutdown_all to provide better testing observability! I tried some approaches with the existing API, but I couldn't find a way. I will write a commit for this tomorrow and we can decide if it's worth it 🚀


let deadline = Instant::now() + Duration::from_secs(1);
let shutdown_complete = shutdown.shutdown_source(&id, deadline);
Expand All @@ -352,7 +363,7 @@ mod test {
let mut shutdown = SourceShutdownCoordinator::default();
let id = ComponentKey::from("test");

let (_shutdown_signal, force_shutdown_tripwire) = shutdown.register_source(&id);
let (_shutdown_signal, force_shutdown_tripwire) = shutdown.register_source(&id, false);

let deadline = Instant::now() + Duration::from_secs(1);
let shutdown_complete = shutdown.shutdown_source(&id, deadline);
Expand Down
2 changes: 1 addition & 1 deletion src/config/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ impl SourceContext {
out: SourceSender,
) -> (Self, crate::shutdown::SourceShutdownCoordinator) {
let mut shutdown = crate::shutdown::SourceShutdownCoordinator::default();
let (shutdown_signal, _) = shutdown.register_source(key);
let (shutdown_signal, _) = shutdown.register_source(key, false);
(
Self {
key: key.clone(),
Expand Down
2 changes: 1 addition & 1 deletion src/sources/socket/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -872,7 +872,7 @@ mod test {
source_id: &ComponentKey,
shutdown: &mut SourceShutdownCoordinator,
) -> (SocketAddr, JoinHandle<Result<(), ()>>) {
let (shutdown_signal, _) = shutdown.register_source(source_id);
let (shutdown_signal, _) = shutdown.register_source(source_id, false);
init_udp_inner(sender, source_id, shutdown_signal, None, false).await
}

Expand Down
2 changes: 1 addition & 1 deletion src/sources/util/framestream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -728,7 +728,7 @@ mod test {
let source_id = ComponentKey::from(source_id);
let socket_path = frame_handler.socket_path();
let mut shutdown = SourceShutdownCoordinator::default();
let (shutdown_signal, _) = shutdown.register_source(&source_id);
let (shutdown_signal, _) = shutdown.register_source(&source_id, false);
let server = build_framestream_unix_source(frame_handler, shutdown_signal, pipeline)
.expect("Failed to build framestream unix source.");

Expand Down
7 changes: 5 additions & 2 deletions src/topology/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ static TRANSFORM_CONCURRENCY_LIMIT: Lazy<usize> = Lazy::new(|| {
.unwrap_or_else(crate::num_threads)
});

const INTERNAL_SOURCES: [&str; 2] = ["internal_logs", "internal_metrics"];

/// Builds only the new pieces, and doesn't check their topology.
pub async fn build_pieces(
config: &super::Config,
Expand Down Expand Up @@ -315,8 +317,9 @@ impl<'a> Builder<'a> {

let pipeline = builder.build();

let (shutdown_signal, force_shutdown_tripwire) =
self.shutdown_coordinator.register_source(key);
let (shutdown_signal, force_shutdown_tripwire) = self
.shutdown_coordinator
.register_source(key, INTERNAL_SOURCES.contains(&typetag));

let context = SourceContext {
key: key.clone(),
Expand Down