Skip to content
92 changes: 67 additions & 25 deletions src/app.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,13 @@
#![allow(missing_docs)]
use std::{
collections::HashMap, num::NonZeroUsize, path::PathBuf, process::ExitStatus, time::Duration,
};
use std::{num::NonZeroUsize, path::PathBuf, process::ExitStatus, time::Duration};

use exitcode::ExitCode;
use futures::StreamExt;
#[cfg(feature = "enterprise")]
use futures_util::future::BoxFuture;
use once_cell::race::OnceNonZeroUsize;
use openssl::provider::Provider;
use tokio::{
runtime::{self, Runtime},
sync::mpsc,
};
use tokio::runtime::{self, Runtime};
use tokio_stream::wrappers::UnboundedReceiverStream;

#[cfg(feature = "enterprise")]
Expand All @@ -28,9 +23,10 @@ use crate::{
cli::{handle_config_errors, LogFormat, Opts, RootOpts},
config::{self, Config, ConfigPath},
heartbeat,
signal::{ShutdownError, SignalHandler, SignalPair, SignalRx, SignalTo},
signal::{SignalHandler, SignalPair, SignalRx, SignalTo},
topology::{
self, ReloadOutcome, RunningTopology, SharedTopologyController, TopologyController,
ReloadOutcome, RunningTopology, SharedTopologyController, ShutdownErrorReceiver,
TopologyController,
},
trace,
};
Expand All @@ -50,8 +46,8 @@ use tokio::sync::broadcast::error::RecvError;
pub struct ApplicationConfig {
pub config_paths: Vec<config::ConfigPath>,
pub topology: RunningTopology,
pub graceful_crash_sender: mpsc::UnboundedSender<ShutdownError>,
pub graceful_crash_receiver: mpsc::UnboundedReceiver<ShutdownError>,
pub graceful_crash_receiver: ShutdownErrorReceiver,
pub internal_topologies: Vec<RunningTopology>,
#[cfg(feature = "api")]
pub api: config::api::Options,
#[cfg(feature = "enterprise")]
Expand Down Expand Up @@ -99,30 +95,33 @@ impl ApplicationConfig {
#[cfg(feature = "enterprise")]
let enterprise = build_enterprise(&mut config, config_paths.clone())?;

let diff = config::ConfigDiff::initial(&config);
let pieces = topology::build_or_log_errors(&config, &diff, HashMap::new())
.await
.ok_or(exitcode::CONFIG)?;

#[cfg(feature = "api")]
let api = config.api;

let result = topology::start_validated(config, diff, pieces).await;
let (topology, (graceful_crash_sender, graceful_crash_receiver)) =
result.ok_or(exitcode::CONFIG)?;
let (topology, graceful_crash_receiver) = RunningTopology::start_init_validated(config)
.await
.ok_or(exitcode::CONFIG)?;

Ok(Self {
config_paths,
topology,
graceful_crash_sender,
graceful_crash_receiver,
internal_topologies: Vec::new(),
#[cfg(feature = "api")]
api,
#[cfg(feature = "enterprise")]
enterprise,
})
}

pub async fn add_internal_config(&mut self, config: Config) -> Result<(), ExitCode> {
let Some((topology, _)) = RunningTopology::start_init_validated(config).await else {
return Err(exitcode::CONFIG);
};
self.internal_topologies.push(topology);
Ok(())
}

/// Configure the API server, if applicable
#[cfg(feature = "api")]
pub fn setup_api(&self, handle: &Handle) -> Option<api::Server> {
Expand All @@ -145,8 +144,9 @@ impl ApplicationConfig {
let error = error.to_string();
error!("An error occurred that Vector couldn't handle: {}.", error);
_ = self
.graceful_crash_sender
.send(ShutdownError::ApiFailed { error });
.topology
.abort_tx
.send(crate::signal::ShutdownError::ApiFailed { error });
None
}
}
Expand All @@ -157,6 +157,30 @@ impl ApplicationConfig {
}
}

#[cfg(all(feature = "sources-internal_metrics", feature = "sinks-console"))]
fn internal_config() -> Config {
use crate::sinks::console;
let mut config = crate::config::ConfigBuilder::default();
config.add_source(
"_internal_metrics",
crate::sources::internal_metrics::InternalMetricsConfig::default(),
);
config.add_sink(
"_internal_console",
&["_internal_metrics"],
console::ConsoleSinkConfig {
target: console::Target::Stdout,
encoding: crate::codecs::EncodingConfigWithFraming::new(
None,
codecs::encoding::SerializerConfig::Text(Default::default()),
Default::default(),
),
acknowledgements: None.into(),
},
);
config.build().expect("Could not build internal config")
}

impl Application {
pub fn run() -> ExitStatus {
let (runtime, app) = Self::prepare_start().unwrap_or_else(|code| std::process::exit(code));
Expand All @@ -165,8 +189,15 @@ impl Application {
}

pub fn prepare_start() -> Result<(Runtime, StartedApplication), ExitCode> {
Self::prepare()
.and_then(|(runtime, app)| app.start(runtime.handle()).map(|app| (runtime, app)))
#[allow(unused_mut)]
Self::prepare().and_then(|(runtime, mut app)| {
#[cfg(all(feature = "sources-internal_metrics", feature = "sinks-console"))]
{
let config = internal_config();
runtime.block_on(app.config.add_internal_config(config))?;
}
app.start(runtime.handle()).map(|app| (runtime, app))
})
}

pub fn prepare() -> Result<(Runtime, Self), ExitCode> {
Expand Down Expand Up @@ -254,6 +285,7 @@ impl Application {

Ok(StartedApplication {
config_paths: config.config_paths,
internal_topologies: config.internal_topologies,
graceful_crash_receiver: config.graceful_crash_receiver,
signals,
topology_controller,
Expand All @@ -264,7 +296,8 @@ impl Application {

pub struct StartedApplication {
pub config_paths: Vec<ConfigPath>,
pub graceful_crash_receiver: mpsc::UnboundedReceiver<ShutdownError>,
pub internal_topologies: Vec<RunningTopology>,
pub graceful_crash_receiver: ShutdownErrorReceiver,
pub signals: SignalPair,
pub topology_controller: SharedTopologyController,
pub openssl_providers: Option<Vec<Provider>>,
Expand All @@ -282,6 +315,7 @@ impl StartedApplication {
signals,
topology_controller,
openssl_providers,
internal_topologies,
} = self;

let mut graceful_crash = UnboundedReceiverStream::new(graceful_crash_receiver);
Expand Down Expand Up @@ -314,6 +348,7 @@ impl StartedApplication {
signal_rx,
topology_controller,
openssl_providers,
internal_topologies,
}
}
}
Expand Down Expand Up @@ -369,6 +404,7 @@ pub struct FinishedApplication {
pub signal_rx: SignalRx,
pub topology_controller: SharedTopologyController,
pub openssl_providers: Option<Vec<Provider>>,
pub internal_topologies: Vec<RunningTopology>,
}

impl FinishedApplication {
Expand All @@ -378,6 +414,7 @@ impl FinishedApplication {
signal_rx,
topology_controller,
openssl_providers,
internal_topologies,
} = self;

// At this point, we'll have the only reference to the shared topology controller and can
Expand All @@ -392,6 +429,11 @@ impl FinishedApplication {
SignalTo::Quit => Self::quit(),
_ => unreachable!(),
};

for topology in internal_topologies {
topology.stop().await;
}

drop(openssl_providers);
status
}
Expand Down
14 changes: 4 additions & 10 deletions src/components/validation/runner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ use vector_core::{event::Event, EstimatedJsonEncodedSizeOf};
use crate::{
codecs::Encoder,
components::validation::{RunnerMetrics, TestCase},
config::{ConfigBuilder, ConfigDiff},
topology,
config::ConfigBuilder,
topology::RunningTopology,
};

use super::{
Expand Down Expand Up @@ -471,7 +471,6 @@ fn spawn_component_topology(
.build()
.expect("config should not have any errors");
config.healthchecks.set_require_healthy(Some(true));
let config_diff = ConfigDiff::initial(&config);

_ = std::thread::spawn(move || {
let test_runtime = Builder::new_current_thread()
Expand All @@ -482,13 +481,8 @@ fn spawn_component_topology(
test_runtime.block_on(async move {
debug!("Building component topology...");

let pieces = topology::build_or_log_errors(&config, &config_diff, HashMap::new())
.await
.unwrap();
let (topology, (_, mut crash_rx)) =
topology::start_validated(config, config_diff, pieces)
.await
.unwrap();
let (topology, mut crash_rx) =
RunningTopology::start_init_validated(config).await.unwrap();

debug!("Component topology built and spawned.");
topology_started.mark_as_done();
Expand Down
2 changes: 1 addition & 1 deletion src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -566,7 +566,7 @@ mod tests {
let c2 = config::load_from_str(config, format).unwrap();
match (
config::warnings(&c2),
topology::builder::build_pieces(&c, &diff, HashMap::new()).await,
topology::TopologyPieces::build(&c, &diff, HashMap::new()).await,
) {
(warnings, Ok(_pieces)) => Ok(warnings),
(_, Err(errors)) => Err(errors),
Expand Down
11 changes: 4 additions & 7 deletions src/config/unit_test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,13 @@ use crate::{
},
event::{Event, LogEvent, Value},
signal,
topology::{
self,
builder::{self, Pieces},
},
topology::{builder::TopologyPieces, RunningTopology},
};

pub struct UnitTest {
pub name: String,
config: Config,
pieces: Pieces,
pieces: TopologyPieces,
test_result_rxs: Vec<Receiver<UnitTestSinkResult>>,
}

Expand All @@ -49,7 +46,7 @@ pub struct UnitTestResult {
impl UnitTest {
pub async fn run(self) -> UnitTestResult {
let diff = config::ConfigDiff::initial(&self.config);
let (topology, _) = topology::start_validated(self.config, diff, self.pieces)
let (topology, _) = RunningTopology::start_validated(self.config, diff, self.pieces)
.await
.unwrap();
topology.sources_finished().await;
Expand Down Expand Up @@ -422,7 +419,7 @@ async fn build_unit_test(
}
let config = config_builder.build()?;
let diff = config::ConfigDiff::initial(&config);
let pieces = builder::build_pieces(&config, &diff, HashMap::new()).await?;
let pieces = TopologyPieces::build(&config, &diff, HashMap::new()).await?;

Ok(UnitTest {
name: test.name,
Expand Down
11 changes: 2 additions & 9 deletions src/sinks/datadog/traces/apm_stats/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,10 @@ use tokio::time::{sleep, Duration};

use crate::{
config::ConfigBuilder,
signal::ShutdownError,
sinks::datadog::traces::{apm_stats::StatsPayload, DatadogTracesConfig},
sources::datadog_agent::DatadogAgentConfig,
test_util::{start_topology, trace_init},
topology::RunningTopology,
topology::{RunningTopology, ShutdownErrorReceiver},
};

/// The port on which the Agent will send traces to vector, and vector `datadog_agent` source will
Expand Down Expand Up @@ -320,13 +319,7 @@ fn validate_stats(agent_stats: &StatsPayload, vector_stats: &StatsPayload) {
/// This creates a scenario where the stats payload that is output by the sink after processing the
/// *second* batch of events (the second event) *should* contain the aggregated statistics of both
/// of the trace events. i.e , the hit count for that bucket should be equal to "2" , not "1".
async fn start_vector() -> (
RunningTopology,
(
tokio::sync::mpsc::UnboundedSender<ShutdownError>,
tokio::sync::mpsc::UnboundedReceiver<ShutdownError>,
),
) {
async fn start_vector() -> (RunningTopology, ShutdownErrorReceiver) {
let dd_agent_address = format!("0.0.0.0:{}", vector_receive_port());

let source_config = toml::from_str::<DatadogAgentConfig>(&format!(
Expand Down
21 changes: 4 additions & 17 deletions src/test_util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,8 @@ use vector_core::event::{BatchNotifier, Event, EventArray, LogEvent};
use zstd::Decoder as ZstdDecoder;

use crate::{
config::{Config, ConfigDiff, GenerateConfig},
signal::ShutdownError,
topology::{self, RunningTopology},
config::{Config, GenerateConfig},
topology::{RunningTopology, ShutdownErrorReceiver},
trace,
};

Expand Down Expand Up @@ -681,21 +680,9 @@ impl CountReceiver<Event> {
pub async fn start_topology(
mut config: Config,
require_healthy: impl Into<Option<bool>>,
) -> (
RunningTopology,
(
tokio::sync::mpsc::UnboundedSender<ShutdownError>,
tokio::sync::mpsc::UnboundedReceiver<ShutdownError>,
),
) {
) -> (RunningTopology, ShutdownErrorReceiver) {
config.healthchecks.set_require_healthy(require_healthy);
let diff = ConfigDiff::initial(&config);
let pieces = topology::build_or_log_errors(&config, &diff, HashMap::new())
.await
.unwrap();
topology::start_validated(config, diff, pieces)
.await
.unwrap()
RunningTopology::start_init_validated(config).await.unwrap()
}

/// Collect the first `n` events from a stream while a future is spawned
Expand Down
Loading