From 698b61e23ae6df0760d1d9d773b9e2dca2940415 Mon Sep 17 00:00:00 2001 From: Stephen Wakely Date: Mon, 3 Apr 2023 15:17:12 +0100 Subject: [PATCH 1/5] Split build_pieces up Signed-off-by: Stephen Wakely --- src/topology/builder.rs | 672 ++++++++++++++++++++++------------------ 1 file changed, 369 insertions(+), 303 deletions(-) diff --git a/src/topology/builder.rs b/src/topology/builder.rs index 820d4a2bb5636..32cdeecd2948c 100644 --- a/src/topology/builder.rs +++ b/src/topology/builder.rs @@ -142,12 +142,11 @@ pub struct Pieces { pub async fn build_pieces( config: &super::Config, diff: &ConfigDiff, - mut buffers: HashMap, + buffers: HashMap, ) -> Result> { let mut inputs = HashMap::new(); let mut outputs = HashMap::new(); let mut tasks = HashMap::new(); - let mut source_tasks = HashMap::new(); let mut healthchecks = HashMap::new(); let mut shutdown_coordinator = SourceShutdownCoordinator::default(); let mut detach_triggers = HashMap::new(); @@ -157,183 +156,245 @@ pub async fn build_pieces( let (enrichment_tables, enrichment_errors) = load_enrichment_tables(config, diff).await; errors.extend(enrichment_errors); - // Build sources - for (key, source) in config - .sources() - .filter(|(key, _)| diff.sources.contains_new(key)) - { - debug!(component = %key, "Building new source."); + let source_tasks = build_sources( + config, + diff, + &mut shutdown_coordinator, + &mut errors, + &mut outputs, + &mut tasks, + ) + .await; + + build_transforms( + config, + diff, + enrichment_tables, + &mut errors, + &mut inputs, + &mut outputs, + &mut tasks, + ) + .await; + + build_sinks( + config, + diff, + &mut errors, + buffers, + &mut inputs, + &mut healthchecks, + &mut tasks, + &mut detach_triggers, + ) + .await; - let typetag = source.inner.get_component_name(); - let source_outputs = source.inner.outputs(config.schema.log_namespace()); + // We should have all the data for the enrichment tables loaded now, so switch them over to + // readonly. + enrichment_tables.finish_load(); - let span = error_span!( - "source", - component_kind = "source", - component_id = %key.id(), - component_type = %source.inner.get_component_name(), - // maintained for compatibility - component_name = %key.id(), - ); - let _entered_span = span.enter(); + let mut finalized_outputs = HashMap::new(); + for (id, output) in outputs { + let entry = finalized_outputs + .entry(id.component) + .or_insert_with(HashMap::new); + entry.insert(id.port, output); + } - let task_name = format!( - ">> {} ({}, pump) >>", - source.inner.get_component_name(), - key.id() - ); + if errors.is_empty() { + let pieces = Pieces { + inputs, + outputs: finalized_outputs, + tasks, + source_tasks, + healthchecks, + shutdown_coordinator, + detach_triggers, + }; - let mut builder = SourceSender::builder().with_buffer(*SOURCE_SENDER_BUFFER_SIZE); - let mut pumps = Vec::new(); - let mut controls = HashMap::new(); - let mut schema_definitions = HashMap::with_capacity(source_outputs.len()); + Ok(pieces) + } else { + Err(errors) + } +} - for output in source_outputs.into_iter() { - let mut rx = builder.add_source_output(output.clone()); +async fn build_sinks( + config: &crate::config::Config, + diff: &ConfigDiff, + errors: &mut Vec, + mut buffers: HashMap, + inputs: &mut HashMap, Inputs)>, + healthchecks: &mut HashMap, + tasks: &mut HashMap, + detach_triggers: &mut HashMap, +) { + // Build sinks + for (key, sink) in config + .sinks() + .filter(|(key, _)| diff.sinks.contains_new(key)) + { + debug!(component = %key, "Building new sink."); - let (mut fanout, control) = Fanout::new(); - let pump = async move { - debug!("Source pump starting."); + let sink_inputs = &sink.inputs; + let healthcheck = sink.healthcheck(); + let enable_healthcheck = healthcheck.enabled && config.healthchecks.enabled; - while let Some(array) = rx.next().await { - fanout.send(array).await.map_err(|e| { - debug!("Source pump finished with an error."); - TaskError::wrapped(e) - })?; - } + let typetag = sink.inner.get_component_name(); + let input_type = sink.inner.input().data_type(); - debug!("Source pump finished normally."); - Ok(TaskOutput::Source) - }; + // At this point, we've validated that all transforms are valid, including any + // transform that mutates the schema provided by their sources. We can now validate the + // schema expectations of each individual sink. + if let Err(mut err) = schema::validate_sink_expectations(key, sink, config) { + errors.append(&mut err); + }; - pumps.push(pump.instrument(span.clone())); - controls.insert( - OutputId { - component: key.clone(), - port: output.port.clone(), - }, - control, + let (tx, rx) = if let Some(buffer) = buffers.remove(key) { + buffer + } else { + let buffer_type = match sink.buffer.stages().first().expect("cant ever be empty") { + BufferType::Memory { .. } => "memory", + BufferType::DiskV2 { .. } => "disk", + }; + let buffer_span = error_span!( + "sink", + component_kind = "sink", + component_id = %key.id(), + component_type = typetag, + component_name = %key.id(), + buffer_type, ); - - let port = output.port.clone(); - if let Some(definition) = output.schema_definition(config.schema.enabled) { - schema_definitions.insert(port, definition); - } - } - - let (pump_error_tx, mut pump_error_rx) = oneshot::channel(); - let pump = async move { - debug!("Source pump supervisor starting."); - - // Spawn all of the per-output pumps and then await their completion. - // - // If any of the pumps complete with an error, or panic/are cancelled, we return - // immediately. - let mut handles = FuturesUnordered::new(); - for pump in pumps { - handles.push(spawn_named(pump, task_name.as_ref())); - } - - let mut had_pump_error = false; - while let Some(output) = handles.try_next().await? { - if let Err(e) = output { - // Immediately send the error to the source's wrapper future, but ignore any - // errors during the send, since nested errors wouldn't make any sense here. - let _ = pump_error_tx.send(e); - had_pump_error = true; - break; + let buffer = sink + .buffer + .build(config.global.data_dir.clone(), key.to_string(), buffer_span) + .await; + match buffer { + Err(error) => { + errors.push(format!("Sink \"{}\": {}", key, error)); + continue; } + Ok((tx, rx)) => (tx, Arc::new(Mutex::new(Some(rx.into_stream())))), } - - if had_pump_error { - debug!("Source pump supervisor task finished with an error."); - } else { - debug!("Source pump supervisor task finished normally."); - } - Ok(TaskOutput::Source) }; - let pump = Task::new(key.clone(), typetag, pump); - let pipeline = builder.build(); - - let (shutdown_signal, force_shutdown_tripwire) = shutdown_coordinator.register_source(key); - - let context = SourceContext { - key: key.clone(), + let cx = SinkContext { + healthcheck, globals: config.global.clone(), - shutdown: shutdown_signal, - out: pipeline, - proxy: ProxyConfig::merge_with_env(&config.global.proxy, &source.proxy), - acknowledgements: source.sink_acknowledgements, - schema_definitions, + proxy: ProxyConfig::merge_with_env(&config.global.proxy, sink.proxy()), schema: config.schema, }; - let source = source.inner.build(context).await; - let server = match source { + + let (sink, healthcheck) = match sink.inner.build(cx).await { Err(error) => { - errors.push(format!("Source \"{}\": {}", key, error)); + errors.push(format!("Sink \"{}\": {}", key, error)); continue; } - Ok(server) => server, + Ok(built) => built, }; - // Build a wrapper future that drives the actual source future, but returns early if we've - // been signalled to forcefully shutdown, or if the source pump encounters an error. - // - // The forceful shutdown will only resolve if the source itself doesn't shutdown gracefully - // within the alloted time window. This can occur normally for certain sources, like stdin, - // where the I/O is blocking (in a separate thread) and won't wake up to check if it's time - // to shutdown unless some input is given. - let server = async move { - debug!("Source starting."); - - let mut result = select! { - biased; - - // We've been told that we must forcefully shut down. - _ = force_shutdown_tripwire => Ok(()), + let (trigger, tripwire) = Tripwire::new(); - // The source pump encountered an error, which we're now bubbling up here to stop - // the source as well, since the source running makes no sense without the pump. - // - // We only match receiving a message, not the error of the sender being dropped, - // just to keep things simpler. - Ok(e) = &mut pump_error_rx => Err(e), + let sink = async move { + debug!("Sink starting."); - // The source finished normally. - result = server => result.map_err(|_| TaskError::Opaque), - }; + // Why is this Arc>> needed you ask. + // In case when this function build_pieces errors + // this future won't be run so this rx won't be taken + // which will enable us to reuse rx to rebuild + // old configuration by passing this Arc>> + // yet again. + let rx = rx + .lock() + .unwrap() + .take() + .expect("Task started but input has been taken."); - // Even though we already tried to receive any pump task error above, we may have exited - // on the source itself returning an error due to task scheduling, where the pump task - // encountered an error, sent it over the oneshot, but we were polling the source - // already and hit an error trying to send to the now-shutdown pump task. - // - // Since the error from the source is opaque at the moment (i.e. `()`), we try a final - // time to see if the pump task encountered an error, using _that_ instead if so, to - // propagate the true error that caused the source to have to stop. - if let Ok(e) = pump_error_rx.try_recv() { - result = Err(e); - } + let mut rx = wrap(rx); - match result { - Ok(()) => { - debug!("Source finished normally."); - Ok(TaskOutput::Source) - } - Err(e) => { - debug!("Source finished with an error."); - Err(e) - } - } + let events_received = register!(EventsReceived); + sink.run( + rx.by_ref() + .filter(|events: &EventArray| ready(filter_events_type(events, input_type))) + .inspect(|events| { + events_received.emit(CountByteSize( + events.len(), + events.estimated_json_encoded_size_of(), + )) + }) + .take_until_if(tripwire), + ) + .await + .map(|_| { + debug!("Sink finished normally."); + TaskOutput::Sink(rx) + }) + .map_err(|_| { + debug!("Sink finished with an error."); + TaskError::Opaque + }) }; - let server = Task::new(key.clone(), typetag, server); - outputs.extend(controls); - tasks.insert(key.clone(), pump); - source_tasks.insert(key.clone(), server); + let task = Task::new(key.clone(), typetag, sink); + + let component_key = key.clone(); + let healthcheck_task = async move { + if enable_healthcheck { + let duration = Duration::from_secs(10); + timeout(duration, healthcheck) + .map(|result| match result { + Ok(Ok(_)) => { + info!("Healthcheck passed."); + Ok(TaskOutput::Healthcheck) + } + Ok(Err(error)) => { + error!( + msg = "Healthcheck failed.", + %error, + component_kind = "sink", + component_type = typetag, + component_id = %component_key.id(), + // maintained for compatibility + component_name = %component_key.id(), + ); + Err(TaskError::wrapped(error)) + } + Err(e) => { + error!( + msg = "Healthcheck timed out.", + component_kind = "sink", + component_type = typetag, + component_id = %component_key.id(), + // maintained for compatibility + component_name = %component_key.id(), + ); + Err(TaskError::wrapped(Box::new(e))) + } + }) + .await + } else { + info!("Healthcheck disabled."); + Ok(TaskOutput::Healthcheck) + } + }; + + let healthcheck_task = Task::new(key.clone(), typetag, healthcheck_task); + + inputs.insert(key.clone(), (tx, sink_inputs.clone())); + healthchecks.insert(key.clone(), healthcheck_task); + tasks.insert(key.clone(), task); + detach_triggers.insert(key.clone(), trigger); } +} +async fn build_transforms( + config: &crate::config::Config, + diff: &ConfigDiff, + enrichment_tables: &enrichment::TableRegistry, + errors: &mut Vec, + inputs: &mut HashMap, Inputs)>, + outputs: &mut HashMap>, + tasks: &mut HashMap, +) { let mut definition_cache = HashMap::default(); // Build transforms @@ -412,191 +473,196 @@ pub async fn build_pieces( outputs.extend(transform_outputs); tasks.insert(key.clone(), transform_task); } +} - // Build sinks - for (key, sink) in config - .sinks() - .filter(|(key, _)| diff.sinks.contains_new(key)) +async fn build_sources( + config: &crate::config::Config, + diff: &ConfigDiff, + shutdown_coordinator: &mut SourceShutdownCoordinator, + errors: &mut Vec, + outputs: &mut HashMap>, + tasks: &mut HashMap, +) -> HashMap { + let mut source_tasks = HashMap::new(); + + // Build sources + for (key, source) in config + .sources() + .filter(|(key, _)| diff.sources.contains_new(key)) { - debug!(component = %key, "Building new sink."); + debug!(component = %key, "Building new source."); - let sink_inputs = &sink.inputs; - let healthcheck = sink.healthcheck(); - let enable_healthcheck = healthcheck.enabled && config.healthchecks.enabled; + let typetag = source.inner.get_component_name(); + let source_outputs = source.inner.outputs(config.schema.log_namespace()); - let typetag = sink.inner.get_component_name(); - let input_type = sink.inner.input().data_type(); + let span = error_span!( + "source", + component_kind = "source", + component_id = %key.id(), + component_type = %source.inner.get_component_name(), + // maintained for compatibility + component_name = %key.id(), + ); + let _entered_span = span.enter(); - // At this point, we've validated that all transforms are valid, including any - // transform that mutates the schema provided by their sources. We can now validate the - // schema expectations of each individual sink. - if let Err(mut err) = schema::validate_sink_expectations(key, sink, config) { - errors.append(&mut err); - }; + let task_name = format!( + ">> {} ({}, pump) >>", + source.inner.get_component_name(), + key.id() + ); - let (tx, rx) = if let Some(buffer) = buffers.remove(key) { - buffer - } else { - let buffer_type = match sink.buffer.stages().first().expect("cant ever be empty") { - BufferType::Memory { .. } => "memory", - BufferType::DiskV2 { .. } => "disk", + let mut builder = SourceSender::builder().with_buffer(*SOURCE_SENDER_BUFFER_SIZE); + let mut pumps = Vec::new(); + let mut controls = HashMap::new(); + let mut schema_definitions = HashMap::with_capacity(source_outputs.len()); + + for output in source_outputs.into_iter() { + let mut rx = builder.add_source_output(output.clone()); + + let (mut fanout, control) = Fanout::new(); + let pump = async move { + debug!("Source pump starting."); + + while let Some(array) = rx.next().await { + fanout.send(array).await.map_err(|e| { + debug!("Source pump finished with an error."); + TaskError::wrapped(e) + })?; + } + + debug!("Source pump finished normally."); + Ok(TaskOutput::Source) }; - let buffer_span = error_span!( - "sink", - component_kind = "sink", - component_id = %key.id(), - component_type = typetag, - component_name = %key.id(), - buffer_type, + + pumps.push(pump.instrument(span.clone())); + controls.insert( + OutputId { + component: key.clone(), + port: output.port.clone(), + }, + control, ); - let buffer = sink - .buffer - .build(config.global.data_dir.clone(), key.to_string(), buffer_span) - .await; - match buffer { - Err(error) => { - errors.push(format!("Sink \"{}\": {}", key, error)); - continue; + + let port = output.port.clone(); + if let Some(definition) = output.schema_definition(config.schema.enabled) { + schema_definitions.insert(port, definition); + } + } + + let (pump_error_tx, mut pump_error_rx) = oneshot::channel(); + let pump = async move { + debug!("Source pump supervisor starting."); + + // Spawn all of the per-output pumps and then await their completion. + // + // If any of the pumps complete with an error, or panic/are cancelled, we return + // immediately. + let mut handles = FuturesUnordered::new(); + for pump in pumps { + handles.push(spawn_named(pump, task_name.as_ref())); + } + + let mut had_pump_error = false; + while let Some(output) = handles.try_next().await? { + if let Err(e) = output { + // Immediately send the error to the source's wrapper future, but ignore any + // errors during the send, since nested errors wouldn't make any sense here. + let _ = pump_error_tx.send(e); + had_pump_error = true; + break; } - Ok((tx, rx)) => (tx, Arc::new(Mutex::new(Some(rx.into_stream())))), } + + if had_pump_error { + debug!("Source pump supervisor task finished with an error."); + } else { + debug!("Source pump supervisor task finished normally."); + } + Ok(TaskOutput::Source) }; + let pump = Task::new(key.clone(), typetag, pump); - let cx = SinkContext { - healthcheck, + let pipeline = builder.build(); + + let (shutdown_signal, force_shutdown_tripwire) = shutdown_coordinator.register_source(key); + + let context = SourceContext { + key: key.clone(), globals: config.global.clone(), - proxy: ProxyConfig::merge_with_env(&config.global.proxy, sink.proxy()), + shutdown: shutdown_signal, + out: pipeline, + proxy: ProxyConfig::merge_with_env(&config.global.proxy, &source.proxy), + acknowledgements: source.sink_acknowledgements, + schema_definitions, schema: config.schema, }; - - let (sink, healthcheck) = match sink.inner.build(cx).await { + let source = source.inner.build(context).await; + let server = match source { Err(error) => { - errors.push(format!("Sink \"{}\": {}", key, error)); + errors.push(format!("Source \"{}\": {}", key, error)); continue; } - Ok(built) => built, + Ok(server) => server, }; - let (trigger, tripwire) = Tripwire::new(); + // Build a wrapper future that drives the actual source future, but returns early if we've + // been signalled to forcefully shutdown, or if the source pump encounters an error. + // + // The forceful shutdown will only resolve if the source itself doesn't shutdown gracefully + // within the alloted time window. This can occur normally for certain sources, like stdin, + // where the I/O is blocking (in a separate thread) and won't wake up to check if it's time + // to shutdown unless some input is given. + let server = async move { + debug!("Source starting."); - let sink = async move { - debug!("Sink starting."); + let mut result = select! { + biased; - // Why is this Arc>> needed you ask. - // In case when this function build_pieces errors - // this future won't be run so this rx won't be taken - // which will enable us to reuse rx to rebuild - // old configuration by passing this Arc>> - // yet again. - let rx = rx - .lock() - .unwrap() - .take() - .expect("Task started but input has been taken."); + // We've been told that we must forcefully shut down. + _ = force_shutdown_tripwire => Ok(()), - let mut rx = wrap(rx); + // The source pump encountered an error, which we're now bubbling up here to stop + // the source as well, since the source running makes no sense without the pump. + // + // We only match receiving a message, not the error of the sender being dropped, + // just to keep things simpler. + Ok(e) = &mut pump_error_rx => Err(e), - let events_received = register!(EventsReceived); - sink.run( - rx.by_ref() - .filter(|events: &EventArray| ready(filter_events_type(events, input_type))) - .inspect(|events| { - events_received.emit(CountByteSize( - events.len(), - events.estimated_json_encoded_size_of(), - )) - }) - .take_until_if(tripwire), - ) - .await - .map(|_| { - debug!("Sink finished normally."); - TaskOutput::Sink(rx) - }) - .map_err(|_| { - debug!("Sink finished with an error."); - TaskError::Opaque - }) - }; + // The source finished normally. + result = server => result.map_err(|_| TaskError::Opaque), + }; - let task = Task::new(key.clone(), typetag, sink); + // Even though we already tried to receive any pump task error above, we may have exited + // on the source itself returning an error due to task scheduling, where the pump task + // encountered an error, sent it over the oneshot, but we were polling the source + // already and hit an error trying to send to the now-shutdown pump task. + // + // Since the error from the source is opaque at the moment (i.e. `()`), we try a final + // time to see if the pump task encountered an error, using _that_ instead if so, to + // propagate the true error that caused the source to have to stop. + if let Ok(e) = pump_error_rx.try_recv() { + result = Err(e); + } - let component_key = key.clone(); - let healthcheck_task = async move { - if enable_healthcheck { - let duration = Duration::from_secs(10); - timeout(duration, healthcheck) - .map(|result| match result { - Ok(Ok(_)) => { - info!("Healthcheck passed."); - Ok(TaskOutput::Healthcheck) - } - Ok(Err(error)) => { - error!( - msg = "Healthcheck failed.", - %error, - component_kind = "sink", - component_type = typetag, - component_id = %component_key.id(), - // maintained for compatibility - component_name = %component_key.id(), - ); - Err(TaskError::wrapped(error)) - } - Err(e) => { - error!( - msg = "Healthcheck timed out.", - component_kind = "sink", - component_type = typetag, - component_id = %component_key.id(), - // maintained for compatibility - component_name = %component_key.id(), - ); - Err(TaskError::wrapped(Box::new(e))) - } - }) - .await - } else { - info!("Healthcheck disabled."); - Ok(TaskOutput::Healthcheck) + match result { + Ok(()) => { + debug!("Source finished normally."); + Ok(TaskOutput::Source) + } + Err(e) => { + debug!("Source finished with an error."); + Err(e) + } } }; + let server = Task::new(key.clone(), typetag, server); - let healthcheck_task = Task::new(key.clone(), typetag, healthcheck_task); - - inputs.insert(key.clone(), (tx, sink_inputs.clone())); - healthchecks.insert(key.clone(), healthcheck_task); - tasks.insert(key.clone(), task); - detach_triggers.insert(key.clone(), trigger); - } - - // We should have all the data for the enrichment tables loaded now, so switch them over to - // readonly. - enrichment_tables.finish_load(); - - let mut finalized_outputs = HashMap::new(); - for (id, output) in outputs { - let entry = finalized_outputs - .entry(id.component) - .or_insert_with(HashMap::new); - entry.insert(id.port, output); + outputs.extend(controls); + tasks.insert(key.clone(), pump); + source_tasks.insert(key.clone(), server); } - if errors.is_empty() { - let pieces = Pieces { - inputs, - outputs: finalized_outputs, - tasks, - source_tasks, - healthchecks, - shutdown_coordinator, - detach_triggers, - }; - - Ok(pieces) - } else { - Err(errors) - } + source_tasks } const fn filter_events_type(events: &EventArray, data_type: DataType) -> bool { From 38e808a1ffc689b781d39e8679ced69d24533b46 Mon Sep 17 00:00:00 2001 From: Stephen Wakely Date: Mon, 3 Apr 2023 15:26:36 +0100 Subject: [PATCH 2/5] Reorder functions Signed-off-by: Stephen Wakely --- src/topology/builder.rs | 582 ++++++++++++++++++++-------------------- 1 file changed, 291 insertions(+), 291 deletions(-) diff --git a/src/topology/builder.rs b/src/topology/builder.rs index 32cdeecd2948c..4523bec9ce842 100644 --- a/src/topology/builder.rs +++ b/src/topology/builder.rs @@ -218,172 +218,194 @@ pub async fn build_pieces( } } -async fn build_sinks( +async fn build_sources( config: &crate::config::Config, diff: &ConfigDiff, + shutdown_coordinator: &mut SourceShutdownCoordinator, errors: &mut Vec, - mut buffers: HashMap, - inputs: &mut HashMap, Inputs)>, - healthchecks: &mut HashMap, + outputs: &mut HashMap>, tasks: &mut HashMap, - detach_triggers: &mut HashMap, -) { - // Build sinks - for (key, sink) in config - .sinks() - .filter(|(key, _)| diff.sinks.contains_new(key)) +) -> HashMap { + let mut source_tasks = HashMap::new(); + + // Build sources + for (key, source) in config + .sources() + .filter(|(key, _)| diff.sources.contains_new(key)) { - debug!(component = %key, "Building new sink."); + debug!(component = %key, "Building new source."); - let sink_inputs = &sink.inputs; - let healthcheck = sink.healthcheck(); - let enable_healthcheck = healthcheck.enabled && config.healthchecks.enabled; + let typetag = source.inner.get_component_name(); + let source_outputs = source.inner.outputs(config.schema.log_namespace()); - let typetag = sink.inner.get_component_name(); - let input_type = sink.inner.input().data_type(); + let span = error_span!( + "source", + component_kind = "source", + component_id = %key.id(), + component_type = %source.inner.get_component_name(), + // maintained for compatibility + component_name = %key.id(), + ); + let _entered_span = span.enter(); - // At this point, we've validated that all transforms are valid, including any - // transform that mutates the schema provided by their sources. We can now validate the - // schema expectations of each individual sink. - if let Err(mut err) = schema::validate_sink_expectations(key, sink, config) { - errors.append(&mut err); - }; + let task_name = format!( + ">> {} ({}, pump) >>", + source.inner.get_component_name(), + key.id() + ); - let (tx, rx) = if let Some(buffer) = buffers.remove(key) { - buffer - } else { - let buffer_type = match sink.buffer.stages().first().expect("cant ever be empty") { - BufferType::Memory { .. } => "memory", - BufferType::DiskV2 { .. } => "disk", + let mut builder = SourceSender::builder().with_buffer(*SOURCE_SENDER_BUFFER_SIZE); + let mut pumps = Vec::new(); + let mut controls = HashMap::new(); + let mut schema_definitions = HashMap::with_capacity(source_outputs.len()); + + for output in source_outputs.into_iter() { + let mut rx = builder.add_source_output(output.clone()); + + let (mut fanout, control) = Fanout::new(); + let pump = async move { + debug!("Source pump starting."); + + while let Some(array) = rx.next().await { + fanout.send(array).await.map_err(|e| { + debug!("Source pump finished with an error."); + TaskError::wrapped(e) + })?; + } + + debug!("Source pump finished normally."); + Ok(TaskOutput::Source) }; - let buffer_span = error_span!( - "sink", - component_kind = "sink", - component_id = %key.id(), - component_type = typetag, - component_name = %key.id(), - buffer_type, + + pumps.push(pump.instrument(span.clone())); + controls.insert( + OutputId { + component: key.clone(), + port: output.port.clone(), + }, + control, ); - let buffer = sink - .buffer - .build(config.global.data_dir.clone(), key.to_string(), buffer_span) - .await; - match buffer { - Err(error) => { - errors.push(format!("Sink \"{}\": {}", key, error)); - continue; + + let port = output.port.clone(); + if let Some(definition) = output.schema_definition(config.schema.enabled) { + schema_definitions.insert(port, definition); + } + } + + let (pump_error_tx, mut pump_error_rx) = oneshot::channel(); + let pump = async move { + debug!("Source pump supervisor starting."); + + // Spawn all of the per-output pumps and then await their completion. + // + // If any of the pumps complete with an error, or panic/are cancelled, we return + // immediately. + let mut handles = FuturesUnordered::new(); + for pump in pumps { + handles.push(spawn_named(pump, task_name.as_ref())); + } + + let mut had_pump_error = false; + while let Some(output) = handles.try_next().await? { + if let Err(e) = output { + // Immediately send the error to the source's wrapper future, but ignore any + // errors during the send, since nested errors wouldn't make any sense here. + let _ = pump_error_tx.send(e); + had_pump_error = true; + break; } - Ok((tx, rx)) => (tx, Arc::new(Mutex::new(Some(rx.into_stream())))), } + + if had_pump_error { + debug!("Source pump supervisor task finished with an error."); + } else { + debug!("Source pump supervisor task finished normally."); + } + Ok(TaskOutput::Source) }; + let pump = Task::new(key.clone(), typetag, pump); - let cx = SinkContext { - healthcheck, + let pipeline = builder.build(); + + let (shutdown_signal, force_shutdown_tripwire) = shutdown_coordinator.register_source(key); + + let context = SourceContext { + key: key.clone(), globals: config.global.clone(), - proxy: ProxyConfig::merge_with_env(&config.global.proxy, sink.proxy()), + shutdown: shutdown_signal, + out: pipeline, + proxy: ProxyConfig::merge_with_env(&config.global.proxy, &source.proxy), + acknowledgements: source.sink_acknowledgements, + schema_definitions, schema: config.schema, }; - - let (sink, healthcheck) = match sink.inner.build(cx).await { + let source = source.inner.build(context).await; + let server = match source { Err(error) => { - errors.push(format!("Sink \"{}\": {}", key, error)); + errors.push(format!("Source \"{}\": {}", key, error)); continue; } - Ok(built) => built, + Ok(server) => server, }; - let (trigger, tripwire) = Tripwire::new(); + // Build a wrapper future that drives the actual source future, but returns early if we've + // been signalled to forcefully shutdown, or if the source pump encounters an error. + // + // The forceful shutdown will only resolve if the source itself doesn't shutdown gracefully + // within the alloted time window. This can occur normally for certain sources, like stdin, + // where the I/O is blocking (in a separate thread) and won't wake up to check if it's time + // to shutdown unless some input is given. + let server = async move { + debug!("Source starting."); - let sink = async move { - debug!("Sink starting."); + let mut result = select! { + biased; - // Why is this Arc>> needed you ask. - // In case when this function build_pieces errors - // this future won't be run so this rx won't be taken - // which will enable us to reuse rx to rebuild - // old configuration by passing this Arc>> - // yet again. - let rx = rx - .lock() - .unwrap() - .take() - .expect("Task started but input has been taken."); + // We've been told that we must forcefully shut down. + _ = force_shutdown_tripwire => Ok(()), - let mut rx = wrap(rx); + // The source pump encountered an error, which we're now bubbling up here to stop + // the source as well, since the source running makes no sense without the pump. + // + // We only match receiving a message, not the error of the sender being dropped, + // just to keep things simpler. + Ok(e) = &mut pump_error_rx => Err(e), - let events_received = register!(EventsReceived); - sink.run( - rx.by_ref() - .filter(|events: &EventArray| ready(filter_events_type(events, input_type))) - .inspect(|events| { - events_received.emit(CountByteSize( - events.len(), - events.estimated_json_encoded_size_of(), - )) - }) - .take_until_if(tripwire), - ) - .await - .map(|_| { - debug!("Sink finished normally."); - TaskOutput::Sink(rx) - }) - .map_err(|_| { - debug!("Sink finished with an error."); - TaskError::Opaque - }) + // The source finished normally. + result = server => result.map_err(|_| TaskError::Opaque), + }; + + // Even though we already tried to receive any pump task error above, we may have exited + // on the source itself returning an error due to task scheduling, where the pump task + // encountered an error, sent it over the oneshot, but we were polling the source + // already and hit an error trying to send to the now-shutdown pump task. + // + // Since the error from the source is opaque at the moment (i.e. `()`), we try a final + // time to see if the pump task encountered an error, using _that_ instead if so, to + // propagate the true error that caused the source to have to stop. + if let Ok(e) = pump_error_rx.try_recv() { + result = Err(e); + } + + match result { + Ok(()) => { + debug!("Source finished normally."); + Ok(TaskOutput::Source) + } + Err(e) => { + debug!("Source finished with an error."); + Err(e) + } + } }; + let server = Task::new(key.clone(), typetag, server); - let task = Task::new(key.clone(), typetag, sink); + outputs.extend(controls); + tasks.insert(key.clone(), pump); + source_tasks.insert(key.clone(), server); + } - let component_key = key.clone(); - let healthcheck_task = async move { - if enable_healthcheck { - let duration = Duration::from_secs(10); - timeout(duration, healthcheck) - .map(|result| match result { - Ok(Ok(_)) => { - info!("Healthcheck passed."); - Ok(TaskOutput::Healthcheck) - } - Ok(Err(error)) => { - error!( - msg = "Healthcheck failed.", - %error, - component_kind = "sink", - component_type = typetag, - component_id = %component_key.id(), - // maintained for compatibility - component_name = %component_key.id(), - ); - Err(TaskError::wrapped(error)) - } - Err(e) => { - error!( - msg = "Healthcheck timed out.", - component_kind = "sink", - component_type = typetag, - component_id = %component_key.id(), - // maintained for compatibility - component_name = %component_key.id(), - ); - Err(TaskError::wrapped(Box::new(e))) - } - }) - .await - } else { - info!("Healthcheck disabled."); - Ok(TaskOutput::Healthcheck) - } - }; - - let healthcheck_task = Task::new(key.clone(), typetag, healthcheck_task); - - inputs.insert(key.clone(), (tx, sink_inputs.clone())); - healthchecks.insert(key.clone(), healthcheck_task); - tasks.insert(key.clone(), task); - detach_triggers.insert(key.clone(), trigger); - } + source_tasks } async fn build_transforms( @@ -475,194 +497,172 @@ async fn build_transforms( } } -async fn build_sources( +async fn build_sinks( config: &crate::config::Config, diff: &ConfigDiff, - shutdown_coordinator: &mut SourceShutdownCoordinator, errors: &mut Vec, - outputs: &mut HashMap>, + mut buffers: HashMap, + inputs: &mut HashMap, Inputs)>, + healthchecks: &mut HashMap, tasks: &mut HashMap, -) -> HashMap { - let mut source_tasks = HashMap::new(); - - // Build sources - for (key, source) in config - .sources() - .filter(|(key, _)| diff.sources.contains_new(key)) + detach_triggers: &mut HashMap, +) { + // Build sinks + for (key, sink) in config + .sinks() + .filter(|(key, _)| diff.sinks.contains_new(key)) { - debug!(component = %key, "Building new source."); - - let typetag = source.inner.get_component_name(); - let source_outputs = source.inner.outputs(config.schema.log_namespace()); - - let span = error_span!( - "source", - component_kind = "source", - component_id = %key.id(), - component_type = %source.inner.get_component_name(), - // maintained for compatibility - component_name = %key.id(), - ); - let _entered_span = span.enter(); - - let task_name = format!( - ">> {} ({}, pump) >>", - source.inner.get_component_name(), - key.id() - ); - - let mut builder = SourceSender::builder().with_buffer(*SOURCE_SENDER_BUFFER_SIZE); - let mut pumps = Vec::new(); - let mut controls = HashMap::new(); - let mut schema_definitions = HashMap::with_capacity(source_outputs.len()); + debug!(component = %key, "Building new sink."); - for output in source_outputs.into_iter() { - let mut rx = builder.add_source_output(output.clone()); + let sink_inputs = &sink.inputs; + let healthcheck = sink.healthcheck(); + let enable_healthcheck = healthcheck.enabled && config.healthchecks.enabled; - let (mut fanout, control) = Fanout::new(); - let pump = async move { - debug!("Source pump starting."); + let typetag = sink.inner.get_component_name(); + let input_type = sink.inner.input().data_type(); - while let Some(array) = rx.next().await { - fanout.send(array).await.map_err(|e| { - debug!("Source pump finished with an error."); - TaskError::wrapped(e) - })?; - } + // At this point, we've validated that all transforms are valid, including any + // transform that mutates the schema provided by their sources. We can now validate the + // schema expectations of each individual sink. + if let Err(mut err) = schema::validate_sink_expectations(key, sink, config) { + errors.append(&mut err); + }; - debug!("Source pump finished normally."); - Ok(TaskOutput::Source) + let (tx, rx) = if let Some(buffer) = buffers.remove(key) { + buffer + } else { + let buffer_type = match sink.buffer.stages().first().expect("cant ever be empty") { + BufferType::Memory { .. } => "memory", + BufferType::DiskV2 { .. } => "disk", }; - - pumps.push(pump.instrument(span.clone())); - controls.insert( - OutputId { - component: key.clone(), - port: output.port.clone(), - }, - control, + let buffer_span = error_span!( + "sink", + component_kind = "sink", + component_id = %key.id(), + component_type = typetag, + component_name = %key.id(), + buffer_type, ); - - let port = output.port.clone(); - if let Some(definition) = output.schema_definition(config.schema.enabled) { - schema_definitions.insert(port, definition); - } - } - - let (pump_error_tx, mut pump_error_rx) = oneshot::channel(); - let pump = async move { - debug!("Source pump supervisor starting."); - - // Spawn all of the per-output pumps and then await their completion. - // - // If any of the pumps complete with an error, or panic/are cancelled, we return - // immediately. - let mut handles = FuturesUnordered::new(); - for pump in pumps { - handles.push(spawn_named(pump, task_name.as_ref())); - } - - let mut had_pump_error = false; - while let Some(output) = handles.try_next().await? { - if let Err(e) = output { - // Immediately send the error to the source's wrapper future, but ignore any - // errors during the send, since nested errors wouldn't make any sense here. - let _ = pump_error_tx.send(e); - had_pump_error = true; - break; + let buffer = sink + .buffer + .build(config.global.data_dir.clone(), key.to_string(), buffer_span) + .await; + match buffer { + Err(error) => { + errors.push(format!("Sink \"{}\": {}", key, error)); + continue; } + Ok((tx, rx)) => (tx, Arc::new(Mutex::new(Some(rx.into_stream())))), } - - if had_pump_error { - debug!("Source pump supervisor task finished with an error."); - } else { - debug!("Source pump supervisor task finished normally."); - } - Ok(TaskOutput::Source) }; - let pump = Task::new(key.clone(), typetag, pump); - let pipeline = builder.build(); - - let (shutdown_signal, force_shutdown_tripwire) = shutdown_coordinator.register_source(key); - - let context = SourceContext { - key: key.clone(), + let cx = SinkContext { + healthcheck, globals: config.global.clone(), - shutdown: shutdown_signal, - out: pipeline, - proxy: ProxyConfig::merge_with_env(&config.global.proxy, &source.proxy), - acknowledgements: source.sink_acknowledgements, - schema_definitions, + proxy: ProxyConfig::merge_with_env(&config.global.proxy, sink.proxy()), schema: config.schema, }; - let source = source.inner.build(context).await; - let server = match source { + + let (sink, healthcheck) = match sink.inner.build(cx).await { Err(error) => { - errors.push(format!("Source \"{}\": {}", key, error)); + errors.push(format!("Sink \"{}\": {}", key, error)); continue; } - Ok(server) => server, + Ok(built) => built, }; - // Build a wrapper future that drives the actual source future, but returns early if we've - // been signalled to forcefully shutdown, or if the source pump encounters an error. - // - // The forceful shutdown will only resolve if the source itself doesn't shutdown gracefully - // within the alloted time window. This can occur normally for certain sources, like stdin, - // where the I/O is blocking (in a separate thread) and won't wake up to check if it's time - // to shutdown unless some input is given. - let server = async move { - debug!("Source starting."); + let (trigger, tripwire) = Tripwire::new(); - let mut result = select! { - biased; + let sink = async move { + debug!("Sink starting."); - // We've been told that we must forcefully shut down. - _ = force_shutdown_tripwire => Ok(()), + // Why is this Arc>> needed you ask. + // In case when this function build_pieces errors + // this future won't be run so this rx won't be taken + // which will enable us to reuse rx to rebuild + // old configuration by passing this Arc>> + // yet again. + let rx = rx + .lock() + .unwrap() + .take() + .expect("Task started but input has been taken."); - // The source pump encountered an error, which we're now bubbling up here to stop - // the source as well, since the source running makes no sense without the pump. - // - // We only match receiving a message, not the error of the sender being dropped, - // just to keep things simpler. - Ok(e) = &mut pump_error_rx => Err(e), + let mut rx = wrap(rx); - // The source finished normally. - result = server => result.map_err(|_| TaskError::Opaque), - }; + let events_received = register!(EventsReceived); + sink.run( + rx.by_ref() + .filter(|events: &EventArray| ready(filter_events_type(events, input_type))) + .inspect(|events| { + events_received.emit(CountByteSize( + events.len(), + events.estimated_json_encoded_size_of(), + )) + }) + .take_until_if(tripwire), + ) + .await + .map(|_| { + debug!("Sink finished normally."); + TaskOutput::Sink(rx) + }) + .map_err(|_| { + debug!("Sink finished with an error."); + TaskError::Opaque + }) + }; - // Even though we already tried to receive any pump task error above, we may have exited - // on the source itself returning an error due to task scheduling, where the pump task - // encountered an error, sent it over the oneshot, but we were polling the source - // already and hit an error trying to send to the now-shutdown pump task. - // - // Since the error from the source is opaque at the moment (i.e. `()`), we try a final - // time to see if the pump task encountered an error, using _that_ instead if so, to - // propagate the true error that caused the source to have to stop. - if let Ok(e) = pump_error_rx.try_recv() { - result = Err(e); - } + let task = Task::new(key.clone(), typetag, sink); - match result { - Ok(()) => { - debug!("Source finished normally."); - Ok(TaskOutput::Source) - } - Err(e) => { - debug!("Source finished with an error."); - Err(e) - } + let component_key = key.clone(); + let healthcheck_task = async move { + if enable_healthcheck { + let duration = Duration::from_secs(10); + timeout(duration, healthcheck) + .map(|result| match result { + Ok(Ok(_)) => { + info!("Healthcheck passed."); + Ok(TaskOutput::Healthcheck) + } + Ok(Err(error)) => { + error!( + msg = "Healthcheck failed.", + %error, + component_kind = "sink", + component_type = typetag, + component_id = %component_key.id(), + // maintained for compatibility + component_name = %component_key.id(), + ); + Err(TaskError::wrapped(error)) + } + Err(e) => { + error!( + msg = "Healthcheck timed out.", + component_kind = "sink", + component_type = typetag, + component_id = %component_key.id(), + // maintained for compatibility + component_name = %component_key.id(), + ); + Err(TaskError::wrapped(Box::new(e))) + } + }) + .await + } else { + info!("Healthcheck disabled."); + Ok(TaskOutput::Healthcheck) } }; - let server = Task::new(key.clone(), typetag, server); - outputs.extend(controls); - tasks.insert(key.clone(), pump); - source_tasks.insert(key.clone(), server); - } + let healthcheck_task = Task::new(key.clone(), typetag, healthcheck_task); - source_tasks + inputs.insert(key.clone(), (tx, sink_inputs.clone())); + healthchecks.insert(key.clone(), healthcheck_task); + tasks.insert(key.clone(), task); + detach_triggers.insert(key.clone(), trigger); + } } const fn filter_events_type(events: &EventArray, data_type: DataType) -> bool { From bc0c9acfb707a52d82ccb197b12ab150b357f3c6 Mon Sep 17 00:00:00 2001 From: Stephen Wakely Date: Mon, 3 Apr 2023 15:30:36 +0100 Subject: [PATCH 3/5] Tidy up imports Signed-off-by: Stephen Wakely --- src/topology/builder.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/topology/builder.rs b/src/topology/builder.rs index 4523bec9ce842..404eb4d40e1a5 100644 --- a/src/topology/builder.rs +++ b/src/topology/builder.rs @@ -12,7 +12,7 @@ use once_cell::sync::Lazy; use stream_cancel::{StreamExt as StreamCancelExt, Trigger, Tripwire}; use tokio::{ select, - sync::oneshot, + sync::{mpsc::UnboundedSender, oneshot}, time::{timeout, Duration}, }; use tracing::Instrument; @@ -219,11 +219,11 @@ pub async fn build_pieces( } async fn build_sources( - config: &crate::config::Config, + config: &super::Config, diff: &ConfigDiff, shutdown_coordinator: &mut SourceShutdownCoordinator, errors: &mut Vec, - outputs: &mut HashMap>, + outputs: &mut HashMap>, tasks: &mut HashMap, ) -> HashMap { let mut source_tasks = HashMap::new(); @@ -409,12 +409,12 @@ async fn build_sources( } async fn build_transforms( - config: &crate::config::Config, + config: &super::Config, diff: &ConfigDiff, enrichment_tables: &enrichment::TableRegistry, errors: &mut Vec, inputs: &mut HashMap, Inputs)>, - outputs: &mut HashMap>, + outputs: &mut HashMap>, tasks: &mut HashMap, ) { let mut definition_cache = HashMap::default(); @@ -498,7 +498,7 @@ async fn build_transforms( } async fn build_sinks( - config: &crate::config::Config, + config: &super::Config, diff: &ConfigDiff, errors: &mut Vec, mut buffers: HashMap, From 83e6cc06830ada9ebe7691dd4bfceeb9ca28ad92 Mon Sep 17 00:00:00 2001 From: Stephen Wakely Date: Mon, 3 Apr 2023 16:47:51 +0100 Subject: [PATCH 4/5] Allow clippy too many arguments Signed-off-by: Stephen Wakely --- src/topology/builder.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/topology/builder.rs b/src/topology/builder.rs index 404eb4d40e1a5..de85848d6ed16 100644 --- a/src/topology/builder.rs +++ b/src/topology/builder.rs @@ -497,6 +497,7 @@ async fn build_transforms( } } +#[allow(clippy::too_many_arguments)] async fn build_sinks( config: &super::Config, diff: &ConfigDiff, From 735cfd6c91c88d70800034732219660ae79e96aa Mon Sep 17 00:00:00 2001 From: Stephen Wakely Date: Wed, 5 Apr 2023 12:50:26 +0100 Subject: [PATCH 5/5] Move build_* functions into a struct to reduce parameter numbers Signed-off-by: Stephen Wakely --- src/topology/builder.rs | 1054 +++++++++++++++++++-------------------- 1 file changed, 519 insertions(+), 535 deletions(-) diff --git a/src/topology/builder.rs b/src/topology/builder.rs index de85848d6ed16..0984f7bb58292 100644 --- a/src/topology/builder.rs +++ b/src/topology/builder.rs @@ -72,600 +72,584 @@ static TRANSFORM_CONCURRENCY_LIMIT: Lazy = Lazy::new(|| { .unwrap_or_else(crate::num_threads) }); -pub(self) async fn load_enrichment_tables<'a>( +/// Builds only the new pieces, and doesn't check their topology. +pub async fn build_pieces( + config: &super::Config, + diff: &ConfigDiff, + buffers: HashMap, +) -> Result> { + Builder::new(config, diff, buffers).build().await +} + +struct Builder<'a> { config: &'a super::Config, diff: &'a ConfigDiff, -) -> (&'static enrichment::TableRegistry, Vec) { - let mut enrichment_tables = HashMap::new(); - - let mut errors = vec![]; - - // Build enrichment tables - 'tables: for (name, table) in config.enrichment_tables.iter() { - let table_name = name.to_string(); - if ENRICHMENT_TABLES.needs_reload(&table_name) { - let indexes = if !diff.enrichment_tables.is_added(name) { - // If this is an existing enrichment table, we need to store the indexes to reapply - // them again post load. - Some(ENRICHMENT_TABLES.index_fields(&table_name)) - } else { - None - }; + shutdown_coordinator: SourceShutdownCoordinator, + errors: Vec, + outputs: HashMap>, + tasks: HashMap, + buffers: HashMap, + inputs: HashMap, Inputs)>, + healthchecks: HashMap, + detach_triggers: HashMap, +} - let mut table = match table.inner.build(&config.global).await { - Ok(table) => table, - Err(error) => { - errors.push(format!("Enrichment Table \"{}\": {}", name, error)); - continue; - } - }; +impl<'a> Builder<'a> { + fn new( + config: &'a super::Config, + diff: &'a ConfigDiff, + buffers: HashMap, + ) -> Self { + Self { + config, + diff, + buffers, + shutdown_coordinator: SourceShutdownCoordinator::default(), + errors: vec![], + outputs: HashMap::new(), + tasks: HashMap::new(), + inputs: HashMap::new(), + healthchecks: HashMap::new(), + detach_triggers: HashMap::new(), + } + } + + /// Builds the new pieces of the topology found in `self.diff`. + async fn build(mut self) -> Result> { + let enrichment_tables = self.load_enrichment_tables().await; + let source_tasks = self.build_sources().await; + self.build_transforms(enrichment_tables).await; + self.build_sinks().await; + + // We should have all the data for the enrichment tables loaded now, so switch them over to + // readonly. + enrichment_tables.finish_load(); + + if self.errors.is_empty() { + Ok(Pieces { + inputs: self.inputs, + outputs: Self::finalize_outputs(self.outputs), + tasks: self.tasks, + source_tasks, + healthchecks: self.healthchecks, + shutdown_coordinator: self.shutdown_coordinator, + detach_triggers: self.detach_triggers, + }) + } else { + Err(self.errors) + } + } - if let Some(indexes) = indexes { - for (case, index) in indexes { - match table - .add_index(case, &index.iter().map(|s| s.as_ref()).collect::>()) - { - Ok(_) => (), - Err(error) => { - // If there is an error adding an index we do not want to use the reloaded - // data, the previously loaded data will still need to be used. - // Just report the error and continue. - error!(message = "Unable to add index to reloaded enrichment table.", + fn finalize_outputs( + outputs: HashMap>, + ) -> HashMap, UnboundedSender>> + { + let mut finalized_outputs = HashMap::new(); + for (id, output) in outputs { + let entry = finalized_outputs + .entry(id.component) + .or_insert_with(HashMap::new); + entry.insert(id.port, output); + } + + finalized_outputs + } + + /// Loads, or reloads the enrichment tables. + /// The tables are stored in the `ENRICHMENT_TABLES` global variable. + async fn load_enrichment_tables(&mut self) -> &'static enrichment::TableRegistry { + let mut enrichment_tables = HashMap::new(); + + // Build enrichment tables + 'tables: for (name, table) in self.config.enrichment_tables.iter() { + let table_name = name.to_string(); + if ENRICHMENT_TABLES.needs_reload(&table_name) { + let indexes = if !self.diff.enrichment_tables.is_added(name) { + // If this is an existing enrichment table, we need to store the indexes to reapply + // them again post load. + Some(ENRICHMENT_TABLES.index_fields(&table_name)) + } else { + None + }; + + let mut table = match table.inner.build(&self.config.global).await { + Ok(table) => table, + Err(error) => { + self.errors + .push(format!("Enrichment Table \"{}\": {}", name, error)); + continue; + } + }; + + if let Some(indexes) = indexes { + for (case, index) in indexes { + match table + .add_index(case, &index.iter().map(|s| s.as_ref()).collect::>()) + { + Ok(_) => (), + Err(error) => { + // If there is an error adding an index we do not want to use the reloaded + // data, the previously loaded data will still need to be used. + // Just report the error and continue. + error!(message = "Unable to add index to reloaded enrichment table.", table = ?name.to_string(), %error); - continue 'tables; + continue 'tables; + } } } } - } - enrichment_tables.insert(table_name, table); + enrichment_tables.insert(table_name, table); + } } - } - - ENRICHMENT_TABLES.load(enrichment_tables); - - (&ENRICHMENT_TABLES, errors) -} -pub struct Pieces { - pub(super) inputs: HashMap, Inputs)>, - pub(crate) outputs: HashMap, fanout::ControlChannel>>, - pub(super) tasks: HashMap, - pub(crate) source_tasks: HashMap, - pub(super) healthchecks: HashMap, - pub(crate) shutdown_coordinator: SourceShutdownCoordinator, - pub(crate) detach_triggers: HashMap, -} + ENRICHMENT_TABLES.load(enrichment_tables); -/// Builds only the new pieces, and doesn't check their topology. -pub async fn build_pieces( - config: &super::Config, - diff: &ConfigDiff, - buffers: HashMap, -) -> Result> { - let mut inputs = HashMap::new(); - let mut outputs = HashMap::new(); - let mut tasks = HashMap::new(); - let mut healthchecks = HashMap::new(); - let mut shutdown_coordinator = SourceShutdownCoordinator::default(); - let mut detach_triggers = HashMap::new(); - - let mut errors = vec![]; - - let (enrichment_tables, enrichment_errors) = load_enrichment_tables(config, diff).await; - errors.extend(enrichment_errors); - - let source_tasks = build_sources( - config, - diff, - &mut shutdown_coordinator, - &mut errors, - &mut outputs, - &mut tasks, - ) - .await; - - build_transforms( - config, - diff, - enrichment_tables, - &mut errors, - &mut inputs, - &mut outputs, - &mut tasks, - ) - .await; - - build_sinks( - config, - diff, - &mut errors, - buffers, - &mut inputs, - &mut healthchecks, - &mut tasks, - &mut detach_triggers, - ) - .await; - - // We should have all the data for the enrichment tables loaded now, so switch them over to - // readonly. - enrichment_tables.finish_load(); - - let mut finalized_outputs = HashMap::new(); - for (id, output) in outputs { - let entry = finalized_outputs - .entry(id.component) - .or_insert_with(HashMap::new); - entry.insert(id.port, output); + &ENRICHMENT_TABLES } - if errors.is_empty() { - let pieces = Pieces { - inputs, - outputs: finalized_outputs, - tasks, - source_tasks, - healthchecks, - shutdown_coordinator, - detach_triggers, - }; - - Ok(pieces) - } else { - Err(errors) - } -} + async fn build_sources(&mut self) -> HashMap { + let mut source_tasks = HashMap::new(); -async fn build_sources( - config: &super::Config, - diff: &ConfigDiff, - shutdown_coordinator: &mut SourceShutdownCoordinator, - errors: &mut Vec, - outputs: &mut HashMap>, - tasks: &mut HashMap, -) -> HashMap { - let mut source_tasks = HashMap::new(); - - // Build sources - for (key, source) in config - .sources() - .filter(|(key, _)| diff.sources.contains_new(key)) - { - debug!(component = %key, "Building new source."); - - let typetag = source.inner.get_component_name(); - let source_outputs = source.inner.outputs(config.schema.log_namespace()); - - let span = error_span!( - "source", - component_kind = "source", - component_id = %key.id(), - component_type = %source.inner.get_component_name(), - // maintained for compatibility - component_name = %key.id(), - ); - let _entered_span = span.enter(); - - let task_name = format!( - ">> {} ({}, pump) >>", - source.inner.get_component_name(), - key.id() - ); - - let mut builder = SourceSender::builder().with_buffer(*SOURCE_SENDER_BUFFER_SIZE); - let mut pumps = Vec::new(); - let mut controls = HashMap::new(); - let mut schema_definitions = HashMap::with_capacity(source_outputs.len()); - - for output in source_outputs.into_iter() { - let mut rx = builder.add_source_output(output.clone()); - - let (mut fanout, control) = Fanout::new(); - let pump = async move { - debug!("Source pump starting."); + for (key, source) in self + .config + .sources() + .filter(|(key, _)| self.diff.sources.contains_new(key)) + { + debug!(component = %key, "Building new source."); - while let Some(array) = rx.next().await { - fanout.send(array).await.map_err(|e| { - debug!("Source pump finished with an error."); - TaskError::wrapped(e) - })?; - } + let typetag = source.inner.get_component_name(); + let source_outputs = source.inner.outputs(self.config.schema.log_namespace()); - debug!("Source pump finished normally."); - Ok(TaskOutput::Source) - }; + let span = error_span!( + "source", + component_kind = "source", + component_id = %key.id(), + component_type = %source.inner.get_component_name(), + // maintained for compatibility + component_name = %key.id(), + ); + let _entered_span = span.enter(); - pumps.push(pump.instrument(span.clone())); - controls.insert( - OutputId { - component: key.clone(), - port: output.port.clone(), - }, - control, + let task_name = format!( + ">> {} ({}, pump) >>", + source.inner.get_component_name(), + key.id() ); - let port = output.port.clone(); - if let Some(definition) = output.schema_definition(config.schema.enabled) { - schema_definitions.insert(port, definition); - } - } + let mut builder = SourceSender::builder().with_buffer(*SOURCE_SENDER_BUFFER_SIZE); + let mut pumps = Vec::new(); + let mut controls = HashMap::new(); + let mut schema_definitions = HashMap::with_capacity(source_outputs.len()); - let (pump_error_tx, mut pump_error_rx) = oneshot::channel(); - let pump = async move { - debug!("Source pump supervisor starting."); + for output in source_outputs.into_iter() { + let mut rx = builder.add_source_output(output.clone()); - // Spawn all of the per-output pumps and then await their completion. - // - // If any of the pumps complete with an error, or panic/are cancelled, we return - // immediately. - let mut handles = FuturesUnordered::new(); - for pump in pumps { - handles.push(spawn_named(pump, task_name.as_ref())); - } + let (mut fanout, control) = Fanout::new(); + let pump = async move { + debug!("Source pump starting."); - let mut had_pump_error = false; - while let Some(output) = handles.try_next().await? { - if let Err(e) = output { - // Immediately send the error to the source's wrapper future, but ignore any - // errors during the send, since nested errors wouldn't make any sense here. - let _ = pump_error_tx.send(e); - had_pump_error = true; - break; - } - } + while let Some(array) = rx.next().await { + fanout.send(array).await.map_err(|e| { + debug!("Source pump finished with an error."); + TaskError::wrapped(e) + })?; + } - if had_pump_error { - debug!("Source pump supervisor task finished with an error."); - } else { - debug!("Source pump supervisor task finished normally."); - } - Ok(TaskOutput::Source) - }; - let pump = Task::new(key.clone(), typetag, pump); - - let pipeline = builder.build(); - - let (shutdown_signal, force_shutdown_tripwire) = shutdown_coordinator.register_source(key); - - let context = SourceContext { - key: key.clone(), - globals: config.global.clone(), - shutdown: shutdown_signal, - out: pipeline, - proxy: ProxyConfig::merge_with_env(&config.global.proxy, &source.proxy), - acknowledgements: source.sink_acknowledgements, - schema_definitions, - schema: config.schema, - }; - let source = source.inner.build(context).await; - let server = match source { - Err(error) => { - errors.push(format!("Source \"{}\": {}", key, error)); - continue; + debug!("Source pump finished normally."); + Ok(TaskOutput::Source) + }; + + pumps.push(pump.instrument(span.clone())); + controls.insert( + OutputId { + component: key.clone(), + port: output.port.clone(), + }, + control, + ); + + let port = output.port.clone(); + if let Some(definition) = output.schema_definition(self.config.schema.enabled) { + schema_definitions.insert(port, definition); + } } - Ok(server) => server, - }; - - // Build a wrapper future that drives the actual source future, but returns early if we've - // been signalled to forcefully shutdown, or if the source pump encounters an error. - // - // The forceful shutdown will only resolve if the source itself doesn't shutdown gracefully - // within the alloted time window. This can occur normally for certain sources, like stdin, - // where the I/O is blocking (in a separate thread) and won't wake up to check if it's time - // to shutdown unless some input is given. - let server = async move { - debug!("Source starting."); - - let mut result = select! { - biased; - // We've been told that we must forcefully shut down. - _ = force_shutdown_tripwire => Ok(()), + let (pump_error_tx, mut pump_error_rx) = oneshot::channel(); + let pump = async move { + debug!("Source pump supervisor starting."); - // The source pump encountered an error, which we're now bubbling up here to stop - // the source as well, since the source running makes no sense without the pump. + // Spawn all of the per-output pumps and then await their completion. // - // We only match receiving a message, not the error of the sender being dropped, - // just to keep things simpler. - Ok(e) = &mut pump_error_rx => Err(e), + // If any of the pumps complete with an error, or panic/are cancelled, we return + // immediately. + let mut handles = FuturesUnordered::new(); + for pump in pumps { + handles.push(spawn_named(pump, task_name.as_ref())); + } - // The source finished normally. - result = server => result.map_err(|_| TaskError::Opaque), + let mut had_pump_error = false; + while let Some(output) = handles.try_next().await? { + if let Err(e) = output { + // Immediately send the error to the source's wrapper future, but ignore any + // errors during the send, since nested errors wouldn't make any sense here. + let _ = pump_error_tx.send(e); + had_pump_error = true; + break; + } + } + + if had_pump_error { + debug!("Source pump supervisor task finished with an error."); + } else { + debug!("Source pump supervisor task finished normally."); + } + Ok(TaskOutput::Source) + }; + let pump = Task::new(key.clone(), typetag, pump); + + let pipeline = builder.build(); + + let (shutdown_signal, force_shutdown_tripwire) = + self.shutdown_coordinator.register_source(key); + + let context = SourceContext { + key: key.clone(), + globals: self.config.global.clone(), + shutdown: shutdown_signal, + out: pipeline, + proxy: ProxyConfig::merge_with_env(&self.config.global.proxy, &source.proxy), + acknowledgements: source.sink_acknowledgements, + schema_definitions, + schema: self.config.schema, + }; + let source = source.inner.build(context).await; + let server = match source { + Err(error) => { + self.errors.push(format!("Source \"{}\": {}", key, error)); + continue; + } + Ok(server) => server, }; - // Even though we already tried to receive any pump task error above, we may have exited - // on the source itself returning an error due to task scheduling, where the pump task - // encountered an error, sent it over the oneshot, but we were polling the source - // already and hit an error trying to send to the now-shutdown pump task. + // Build a wrapper future that drives the actual source future, but returns early if we've + // been signalled to forcefully shutdown, or if the source pump encounters an error. // - // Since the error from the source is opaque at the moment (i.e. `()`), we try a final - // time to see if the pump task encountered an error, using _that_ instead if so, to - // propagate the true error that caused the source to have to stop. - if let Ok(e) = pump_error_rx.try_recv() { - result = Err(e); - } - - match result { - Ok(()) => { - debug!("Source finished normally."); - Ok(TaskOutput::Source) + // The forceful shutdown will only resolve if the source itself doesn't shutdown gracefully + // within the alloted time window. This can occur normally for certain sources, like stdin, + // where the I/O is blocking (in a separate thread) and won't wake up to check if it's time + // to shutdown unless some input is given. + let server = async move { + debug!("Source starting."); + + let mut result = select! { + biased; + + // We've been told that we must forcefully shut down. + _ = force_shutdown_tripwire => Ok(()), + + // The source pump encountered an error, which we're now bubbling up here to stop + // the source as well, since the source running makes no sense without the pump. + // + // We only match receiving a message, not the error of the sender being dropped, + // just to keep things simpler. + Ok(e) = &mut pump_error_rx => Err(e), + + // The source finished normally. + result = server => result.map_err(|_| TaskError::Opaque), + }; + + // Even though we already tried to receive any pump task error above, we may have exited + // on the source itself returning an error due to task scheduling, where the pump task + // encountered an error, sent it over the oneshot, but we were polling the source + // already and hit an error trying to send to the now-shutdown pump task. + // + // Since the error from the source is opaque at the moment (i.e. `()`), we try a final + // time to see if the pump task encountered an error, using _that_ instead if so, to + // propagate the true error that caused the source to have to stop. + if let Ok(e) = pump_error_rx.try_recv() { + result = Err(e); } - Err(e) => { - debug!("Source finished with an error."); - Err(e) + + match result { + Ok(()) => { + debug!("Source finished normally."); + Ok(TaskOutput::Source) + } + Err(e) => { + debug!("Source finished with an error."); + Err(e) + } } - } - }; - let server = Task::new(key.clone(), typetag, server); + }; + let server = Task::new(key.clone(), typetag, server); - outputs.extend(controls); - tasks.insert(key.clone(), pump); - source_tasks.insert(key.clone(), server); + self.outputs.extend(controls); + self.tasks.insert(key.clone(), pump); + source_tasks.insert(key.clone(), server); + } + + source_tasks } - source_tasks -} + async fn build_transforms(&mut self, enrichment_tables: &enrichment::TableRegistry) { + let mut definition_cache = HashMap::default(); -async fn build_transforms( - config: &super::Config, - diff: &ConfigDiff, - enrichment_tables: &enrichment::TableRegistry, - errors: &mut Vec, - inputs: &mut HashMap, Inputs)>, - outputs: &mut HashMap>, - tasks: &mut HashMap, -) { - let mut definition_cache = HashMap::default(); - - // Build transforms - for (key, transform) in config - .transforms() - .filter(|(key, _)| diff.transforms.contains_new(key)) - { - debug!(component = %key, "Building new transform."); - - let input_definitions = - schema::input_definitions(&transform.inputs, config, &mut definition_cache); - - let merged_definition: Definition = input_definitions - .iter() - .map(|(_output_id, definition)| definition.clone()) - .reduce(Definition::merge) - // We may not have any definitions if all the inputs are from metrics sources. - .unwrap_or_else(Definition::any); - - let span = error_span!( - "transform", - component_kind = "transform", - component_id = %key.id(), - component_type = %transform.inner.get_component_name(), - // maintained for compatibility - component_name = %key.id(), - ); - - // Create a map of the outputs to the list of possible definitions from those outputs. - let schema_definitions = transform - .inner - .outputs(&input_definitions, config.schema.log_namespace()) - .into_iter() - .map(|output| (output.port, output.log_schema_definitions)) - .collect::>(); - - let context = TransformContext { - key: Some(key.clone()), - globals: config.global.clone(), - enrichment_tables: enrichment_tables.clone(), - schema_definitions, - merged_schema_definition: merged_definition.clone(), - schema: config.schema, - }; - - let node = TransformNode::from_parts( - key.clone(), - transform, - &input_definitions, - config.schema.log_namespace(), - ); - - let transform = match transform - .inner - .build(&context) - .instrument(span.clone()) - .await + for (key, transform) in self + .config + .transforms() + .filter(|(key, _)| self.diff.transforms.contains_new(key)) { - Err(error) => { - errors.push(format!("Transform \"{}\": {}", key, error)); - continue; - } - Ok(transform) => transform, - }; + debug!(component = %key, "Building new transform."); - let (input_tx, input_rx) = - TopologyBuilder::standalone_memory(TOPOLOGY_BUFFER_SIZE, WhenFull::Block).await; + let input_definitions = + schema::input_definitions(&transform.inputs, self.config, &mut definition_cache); - inputs.insert(key.clone(), (input_tx, node.inputs.clone())); + let merged_definition: Definition = input_definitions + .iter() + .map(|(_output_id, definition)| definition.clone()) + .reduce(Definition::merge) + // We may not have any definitions if all the inputs are from metrics sources. + .unwrap_or_else(Definition::any); - let (transform_task, transform_outputs) = { - let _span = span.enter(); - build_transform(transform, node, input_rx) - }; + let span = error_span!( + "transform", + component_kind = "transform", + component_id = %key.id(), + component_type = %transform.inner.get_component_name(), + // maintained for compatibility + component_name = %key.id(), + ); + + // Create a map of the outputs to the list of possible definitions from those outputs. + let schema_definitions = transform + .inner + .outputs(&input_definitions, self.config.schema.log_namespace()) + .into_iter() + .map(|output| (output.port, output.log_schema_definitions)) + .collect::>(); + + let context = TransformContext { + key: Some(key.clone()), + globals: self.config.global.clone(), + enrichment_tables: enrichment_tables.clone(), + schema_definitions, + merged_schema_definition: merged_definition.clone(), + schema: self.config.schema, + }; + + let node = TransformNode::from_parts( + key.clone(), + transform, + &input_definitions, + self.config.schema.log_namespace(), + ); + + let transform = match transform + .inner + .build(&context) + .instrument(span.clone()) + .await + { + Err(error) => { + self.errors + .push(format!("Transform \"{}\": {}", key, error)); + continue; + } + Ok(transform) => transform, + }; + + let (input_tx, input_rx) = + TopologyBuilder::standalone_memory(TOPOLOGY_BUFFER_SIZE, WhenFull::Block).await; + + self.inputs + .insert(key.clone(), (input_tx, node.inputs.clone())); - outputs.extend(transform_outputs); - tasks.insert(key.clone(), transform_task); + let (transform_task, transform_outputs) = { + let _span = span.enter(); + build_transform(transform, node, input_rx) + }; + + self.outputs.extend(transform_outputs); + self.tasks.insert(key.clone(), transform_task); + } } -} -#[allow(clippy::too_many_arguments)] -async fn build_sinks( - config: &super::Config, - diff: &ConfigDiff, - errors: &mut Vec, - mut buffers: HashMap, - inputs: &mut HashMap, Inputs)>, - healthchecks: &mut HashMap, - tasks: &mut HashMap, - detach_triggers: &mut HashMap, -) { - // Build sinks - for (key, sink) in config - .sinks() - .filter(|(key, _)| diff.sinks.contains_new(key)) - { - debug!(component = %key, "Building new sink."); + async fn build_sinks(&mut self) { + for (key, sink) in self + .config + .sinks() + .filter(|(key, _)| self.diff.sinks.contains_new(key)) + { + debug!(component = %key, "Building new sink."); - let sink_inputs = &sink.inputs; - let healthcheck = sink.healthcheck(); - let enable_healthcheck = healthcheck.enabled && config.healthchecks.enabled; + let sink_inputs = &sink.inputs; + let healthcheck = sink.healthcheck(); + let enable_healthcheck = healthcheck.enabled && self.config.healthchecks.enabled; - let typetag = sink.inner.get_component_name(); - let input_type = sink.inner.input().data_type(); + let typetag = sink.inner.get_component_name(); + let input_type = sink.inner.input().data_type(); - // At this point, we've validated that all transforms are valid, including any - // transform that mutates the schema provided by their sources. We can now validate the - // schema expectations of each individual sink. - if let Err(mut err) = schema::validate_sink_expectations(key, sink, config) { - errors.append(&mut err); - }; + // At this point, we've validated that all transforms are valid, including any + // transform that mutates the schema provided by their sources. We can now validate the + // schema expectations of each individual sink. + if let Err(mut err) = schema::validate_sink_expectations(key, sink, self.config) { + self.errors.append(&mut err); + }; - let (tx, rx) = if let Some(buffer) = buffers.remove(key) { - buffer - } else { - let buffer_type = match sink.buffer.stages().first().expect("cant ever be empty") { - BufferType::Memory { .. } => "memory", - BufferType::DiskV2 { .. } => "disk", + let (tx, rx) = if let Some(buffer) = self.buffers.remove(key) { + buffer + } else { + let buffer_type = match sink.buffer.stages().first().expect("cant ever be empty") { + BufferType::Memory { .. } => "memory", + BufferType::DiskV2 { .. } => "disk", + }; + let buffer_span = error_span!( + "sink", + component_kind = "sink", + component_id = %key.id(), + component_type = typetag, + component_name = %key.id(), + buffer_type, + ); + let buffer = sink + .buffer + .build( + self.config.global.data_dir.clone(), + key.to_string(), + buffer_span, + ) + .await; + match buffer { + Err(error) => { + self.errors.push(format!("Sink \"{}\": {}", key, error)); + continue; + } + Ok((tx, rx)) => (tx, Arc::new(Mutex::new(Some(rx.into_stream())))), + } }; - let buffer_span = error_span!( - "sink", - component_kind = "sink", - component_id = %key.id(), - component_type = typetag, - component_name = %key.id(), - buffer_type, - ); - let buffer = sink - .buffer - .build(config.global.data_dir.clone(), key.to_string(), buffer_span) - .await; - match buffer { + + let cx = SinkContext { + healthcheck, + globals: self.config.global.clone(), + proxy: ProxyConfig::merge_with_env(&self.config.global.proxy, sink.proxy()), + schema: self.config.schema, + }; + + let (sink, healthcheck) = match sink.inner.build(cx).await { Err(error) => { - errors.push(format!("Sink \"{}\": {}", key, error)); + self.errors.push(format!("Sink \"{}\": {}", key, error)); continue; } - Ok((tx, rx)) => (tx, Arc::new(Mutex::new(Some(rx.into_stream())))), - } - }; - - let cx = SinkContext { - healthcheck, - globals: config.global.clone(), - proxy: ProxyConfig::merge_with_env(&config.global.proxy, sink.proxy()), - schema: config.schema, - }; - - let (sink, healthcheck) = match sink.inner.build(cx).await { - Err(error) => { - errors.push(format!("Sink \"{}\": {}", key, error)); - continue; - } - Ok(built) => built, - }; - - let (trigger, tripwire) = Tripwire::new(); - - let sink = async move { - debug!("Sink starting."); - - // Why is this Arc>> needed you ask. - // In case when this function build_pieces errors - // this future won't be run so this rx won't be taken - // which will enable us to reuse rx to rebuild - // old configuration by passing this Arc>> - // yet again. - let rx = rx - .lock() - .unwrap() - .take() - .expect("Task started but input has been taken."); - - let mut rx = wrap(rx); - - let events_received = register!(EventsReceived); - sink.run( - rx.by_ref() - .filter(|events: &EventArray| ready(filter_events_type(events, input_type))) - .inspect(|events| { - events_received.emit(CountByteSize( - events.len(), - events.estimated_json_encoded_size_of(), - )) - }) - .take_until_if(tripwire), - ) - .await - .map(|_| { - debug!("Sink finished normally."); - TaskOutput::Sink(rx) - }) - .map_err(|_| { - debug!("Sink finished with an error."); - TaskError::Opaque - }) - }; - - let task = Task::new(key.clone(), typetag, sink); - - let component_key = key.clone(); - let healthcheck_task = async move { - if enable_healthcheck { - let duration = Duration::from_secs(10); - timeout(duration, healthcheck) - .map(|result| match result { - Ok(Ok(_)) => { - info!("Healthcheck passed."); - Ok(TaskOutput::Healthcheck) - } - Ok(Err(error)) => { - error!( - msg = "Healthcheck failed.", - %error, - component_kind = "sink", - component_type = typetag, - component_id = %component_key.id(), - // maintained for compatibility - component_name = %component_key.id(), - ); - Err(TaskError::wrapped(error)) - } - Err(e) => { - error!( - msg = "Healthcheck timed out.", - component_kind = "sink", - component_type = typetag, - component_id = %component_key.id(), - // maintained for compatibility - component_name = %component_key.id(), - ); - Err(TaskError::wrapped(Box::new(e))) - } - }) - .await - } else { - info!("Healthcheck disabled."); - Ok(TaskOutput::Healthcheck) - } - }; + Ok(built) => built, + }; + + let (trigger, tripwire) = Tripwire::new(); + + let sink = async move { + debug!("Sink starting."); + + // Why is this Arc>> needed you ask. + // In case when this function build_pieces errors + // this future won't be run so this rx won't be taken + // which will enable us to reuse rx to rebuild + // old configuration by passing this Arc>> + // yet again. + let rx = rx + .lock() + .unwrap() + .take() + .expect("Task started but input has been taken."); + + let mut rx = wrap(rx); + + let events_received = register!(EventsReceived); + sink.run( + rx.by_ref() + .filter(|events: &EventArray| ready(filter_events_type(events, input_type))) + .inspect(|events| { + events_received.emit(CountByteSize( + events.len(), + events.estimated_json_encoded_size_of(), + )) + }) + .take_until_if(tripwire), + ) + .await + .map(|_| { + debug!("Sink finished normally."); + TaskOutput::Sink(rx) + }) + .map_err(|_| { + debug!("Sink finished with an error."); + TaskError::Opaque + }) + }; + + let task = Task::new(key.clone(), typetag, sink); + + let component_key = key.clone(); + let healthcheck_task = async move { + if enable_healthcheck { + let duration = Duration::from_secs(10); + timeout(duration, healthcheck) + .map(|result| match result { + Ok(Ok(_)) => { + info!("Healthcheck passed."); + Ok(TaskOutput::Healthcheck) + } + Ok(Err(error)) => { + error!( + msg = "Healthcheck failed.", + %error, + component_kind = "sink", + component_type = typetag, + component_id = %component_key.id(), + // maintained for compatibility + component_name = %component_key.id(), + ); + Err(TaskError::wrapped(error)) + } + Err(e) => { + error!( + msg = "Healthcheck timed out.", + component_kind = "sink", + component_type = typetag, + component_id = %component_key.id(), + // maintained for compatibility + component_name = %component_key.id(), + ); + Err(TaskError::wrapped(Box::new(e))) + } + }) + .await + } else { + info!("Healthcheck disabled."); + Ok(TaskOutput::Healthcheck) + } + }; - let healthcheck_task = Task::new(key.clone(), typetag, healthcheck_task); + let healthcheck_task = Task::new(key.clone(), typetag, healthcheck_task); - inputs.insert(key.clone(), (tx, sink_inputs.clone())); - healthchecks.insert(key.clone(), healthcheck_task); - tasks.insert(key.clone(), task); - detach_triggers.insert(key.clone(), trigger); + self.inputs.insert(key.clone(), (tx, sink_inputs.clone())); + self.healthchecks.insert(key.clone(), healthcheck_task); + self.tasks.insert(key.clone(), task); + self.detach_triggers.insert(key.clone(), trigger); + } } } +pub struct Pieces { + pub(super) inputs: HashMap, Inputs)>, + pub(crate) outputs: HashMap, fanout::ControlChannel>>, + pub(super) tasks: HashMap, + pub(crate) source_tasks: HashMap, + pub(super) healthchecks: HashMap, + pub(crate) shutdown_coordinator: SourceShutdownCoordinator, + pub(crate) detach_triggers: HashMap, +} + const fn filter_events_type(events: &EventArray, data_type: DataType) -> bool { match events { EventArray::Logs(_) => data_type.contains(DataType::Log),