Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 21 additions & 9 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ pub struct ApplicationConfig {
}

pub struct Application {
pub require_healthy: Option<bool>,
pub root_opts: RootOpts,
pub config: ApplicationConfig,
pub signals: SignalPair,
}
Expand All @@ -73,6 +73,7 @@ impl ApplicationConfig {
&config_paths,
opts.watch_config,
opts.require_healthy,
opts.allow_empty_config,
graceful_shutdown_duration,
signal_handler,
)
Expand Down Expand Up @@ -211,7 +212,7 @@ impl Application {
Ok((
runtime,
Self {
require_healthy: opts.root.require_healthy,
root_opts: opts.root,
config,
signals,
},
Expand All @@ -227,7 +228,7 @@ impl Application {
handle.spawn(heartbeat::heartbeat());

let Self {
require_healthy,
root_opts,
config,
signals,
} = self;
Expand All @@ -237,7 +238,7 @@ impl Application {
api_server: config.setup_api(handle),
topology: config.topology,
config_paths: config.config_paths.clone(),
require_healthy,
require_healthy: root_opts.require_healthy,
#[cfg(feature = "enterprise")]
enterprise_reporter: config.enterprise,
});
Expand All @@ -248,6 +249,7 @@ impl Application {
graceful_crash_receiver: config.graceful_crash_receiver,
signals,
topology_controller,
allow_empty_config: root_opts.allow_empty_config,
})
}
}
Expand All @@ -258,6 +260,7 @@ pub struct StartedApplication {
pub graceful_crash_receiver: ShutdownErrorReceiver,
pub signals: SignalPair,
pub topology_controller: SharedTopologyController,
pub allow_empty_config: bool,
}

impl StartedApplication {
Expand All @@ -272,6 +275,7 @@ impl StartedApplication {
signals,
topology_controller,
internal_topologies,
allow_empty_config,
} = self;

let mut graceful_crash = UnboundedReceiverStream::new(graceful_crash_receiver);
Expand All @@ -280,18 +284,20 @@ impl StartedApplication {
let mut signal_rx = signals.receiver;

let signal = loop {
let has_sources = !topology_controller.lock().await.topology.config.is_empty();
tokio::select! {
signal = signal_rx.recv() => if let Some(signal) = handle_signal(
signal,
&topology_controller,
&config_paths,
&mut signal_handler,
allow_empty_config,
).await {
break signal;
},
// Trigger graceful shutdown if a component crashed, or all sources have ended.
error = graceful_crash.next() => break SignalTo::Shutdown(error),
_ = TopologyController::sources_finished(topology_controller.clone()) => {
_ = TopologyController::sources_finished(topology_controller.clone()), if has_sources => {
info!("All sources have finished.");
break SignalTo::Shutdown(None)
} ,
Expand All @@ -313,6 +319,7 @@ async fn handle_signal(
topology_controller: &SharedTopologyController,
config_paths: &[ConfigPath],
signal_handler: &mut SignalHandler,
allow_empty_config: bool,
) -> Option<SignalTo> {
match signal {
Ok(SignalTo::ReloadFromConfigBuilder(config_builder)) => {
Expand All @@ -335,6 +342,7 @@ async fn handle_signal(
let new_config = config::load_from_paths_with_provider_and_secrets(
&topology_controller.config_paths,
signal_handler,
allow_empty_config,
)
.await
.map_err(handle_config_errors)
Expand Down Expand Up @@ -479,6 +487,7 @@ pub async fn load_configs(
config_paths: &[ConfigPath],
watch_config: bool,
require_healthy: Option<bool>,
allow_empty_config: bool,
graceful_shutdown_duration: Option<Duration>,
signal_handler: &mut SignalHandler,
) -> Result<Config, ExitCode> {
Expand All @@ -503,10 +512,13 @@ pub async fn load_configs(
#[cfg(not(feature = "enterprise-tests"))]
config::init_log_schema(&config_paths, true).map_err(handle_config_errors)?;

let mut config =
config::load_from_paths_with_provider_and_secrets(&config_paths, signal_handler)
.await
.map_err(handle_config_errors)?;
let mut config = config::load_from_paths_with_provider_and_secrets(
&config_paths,
signal_handler,
allow_empty_config,
)
.await
.map_err(handle_config_errors)?;

config::init_telemetry(config.global.telemetry.clone(), true);

Expand Down
7 changes: 7 additions & 0 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,13 @@ pub struct RootOpts {
/// default inherits the environment of the Vector process.
#[arg(long, env = "VECTOR_OPENSSL_NO_PROBE", default_value = "false")]
pub openssl_no_probe: bool,

/// Allow the configuration to run without any components. This is useful for loading in an
/// empty stub config that will later be replaced with actual components. Note that this is
/// likely not useful without also watching for config file changes as described in
/// `--watch-config`.
#[arg(long, env = "VECTOR_ALLOW_EMPTY_CONFIG", default_value = "false")]
pub allow_empty_config: bool,
}

impl RootOpts {
Expand Down
6 changes: 6 additions & 0 deletions src/config/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ pub struct ConfigBuilder {
#[serde(default, skip)]
#[doc(hidden)]
pub graceful_shutdown_duration: Option<Duration>,

/// Allow the configuration to be empty, resulting in a topology with no components.
#[serde(default, skip)]
#[doc(hidden)]
pub allow_empty: bool,
}

#[cfg(feature = "enterprise")]
Expand Down Expand Up @@ -232,6 +237,7 @@ impl From<Config> for ConfigBuilder {
tests,
secret,
graceful_shutdown_duration,
allow_empty: false,
}
}
}
Expand Down
1 change: 1 addition & 0 deletions src/config/compiler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ pub fn compile(mut builder: ConfigBuilder) -> Result<(Config, Vec<String>), Vec<
provider: _,
secret,
graceful_shutdown_duration,
allow_empty: _,
} = builder;

let graph = match Graph::new(&sources, &transforms, &sinks, schema) {
Expand Down
3 changes: 3 additions & 0 deletions src/config/loading/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ pub fn load_from_paths(config_paths: &[ConfigPath]) -> Result<Config, Vec<String
pub async fn load_from_paths_with_provider_and_secrets(
config_paths: &[ConfigPath],
signal_handler: &mut signal::SignalHandler,
allow_empty: bool,
) -> Result<Config, Vec<String>> {
// Load secret backends first
let (mut secrets_backends_loader, secrets_warning) =
Expand All @@ -149,6 +150,8 @@ pub async fn load_from_paths_with_provider_and_secrets(
load_builder_from_paths(config_paths)?
};

builder.allow_empty = allow_empty;

validation::check_provider(&builder)?;
signal_handler.clear();

Expand Down
4 changes: 4 additions & 0 deletions src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,10 @@ impl Config {
Default::default()
}

pub fn is_empty(&self) -> bool {
self.sources.is_empty()
}

pub fn sources(&self) -> impl Iterator<Item = (&ComponentKey, &SourceOuter)> {
self.sources.iter()
}
Expand Down
12 changes: 7 additions & 5 deletions src/config/validation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,14 @@ pub fn check_names<'a, I: Iterator<Item = &'a ComponentKey>>(names: I) -> Result
pub fn check_shape(config: &ConfigBuilder) -> Result<(), Vec<String>> {
let mut errors = vec![];

if config.sources.is_empty() {
errors.push("No sources defined in the config.".to_owned());
}
if !config.allow_empty {
if config.sources.is_empty() {
errors.push("No sources defined in the config.".to_owned());
}

if config.sinks.is_empty() {
errors.push("No sinks defined in the config.".to_owned());
if config.sinks.is_empty() {
errors.push("No sinks defined in the config.".to_owned());
}
}

// Helper for below
Expand Down
10 changes: 10 additions & 0 deletions website/cue/reference/cli.cue
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ cli: {
description: env_vars.VECTOR_OPENSSL_NO_PROBE.description
env_var: "VECTOR_OPENSSL_NO_PROBE"
}
"allow-empty-config": {
description: env_vars.VECTOR_ALLOW_EMPTY_CONFIG.description
env_var: "VECTOR_ALLOW_EMPTY_CONFIG"
}
}

_core_config_options: {
Expand Down Expand Up @@ -636,6 +640,12 @@ cli: {
"""
type: bool: default: false
}
VECTOR_ALLOW_EMPTY_CONFIG: {
description: """
Allow the configuration to run without any components. This is useful for loading in an empty stub config that will later be replaced with actual components. Note that this is likely not useful without also watching for config file changes as described in `--watch-config`.
"""
type: bool: default: false
}
}

// Helpers
Expand Down