diff --git a/src/app.rs b/src/app.rs index 12658a01115d3..8d2cefc80bf38 100644 --- a/src/app.rs +++ b/src/app.rs @@ -465,12 +465,14 @@ pub async fn load_configs( paths = ?config_paths.iter().map(<&PathBuf>::from).collect::>() ); + // config::init_log_schema should be called before initializing sources. + #[cfg(not(feature = "enterprise-tests"))] + config::init_log_schema(&config_paths, true).map_err(handle_config_errors)?; + let mut config = config::load_from_paths_with_provider_and_secrets(&config_paths, signal_handler) .await .map_err(handle_config_errors)?; - #[cfg(not(feature = "enterprise-tests"))] - config::init_log_schema(config.global.log_schema.clone(), true); config::init_telemetry(config.global.telemetry.clone(), true); diff --git a/src/config/mod.rs b/src/config/mod.rs index b4591199ef886..40d0733f15cb0 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -63,9 +63,19 @@ pub use unit_test::{build_unit_tests, build_unit_tests_main, UnitTestResult}; pub use validation::warnings; pub use vars::{interpolate, ENVIRONMENT_VARIABLE_INTERPOLATION_REGEX}; pub use vector_core::config::{ - init_log_schema, init_telemetry, log_schema, proxy::ProxyConfig, telemetry, LogSchema, OutputId, + init_telemetry, log_schema, proxy::ProxyConfig, telemetry, LogSchema, OutputId, }; +/// Loads Log Schema from configurations and sets global schema. +/// Once this is done, configurations can be correctly loaded using +/// configured log schema defaults. +/// If deny is set, will panic if schema has already been set. +pub fn init_log_schema(config_paths: &[ConfigPath], deny_if_set: bool) -> Result<(), Vec> { + let (builder, _) = load_builder_from_paths(config_paths)?; + vector_core::config::init_log_schema(builder.global.log_schema, deny_if_set); + Ok(()) +} + #[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq)] pub enum ConfigPath { File(PathBuf, FormatHint), diff --git a/src/config/unit_test/mod.rs b/src/config/unit_test/mod.rs index 62b9dabcd70ad..3bfd52ad4b7ee 100644 --- a/src/config/unit_test/mod.rs +++ b/src/config/unit_test/mod.rs @@ -72,24 +72,11 @@ impl UnitTest { } } -/// Loads Log Schema from configurations and sets global schema. -/// Once this is done, configurations can be correctly loaded using -/// configured log schema defaults. -/// If deny is set, will panic if schema has already been set. -fn init_log_schema_from_paths( - config_paths: &[ConfigPath], - deny_if_set: bool, -) -> Result<(), Vec> { - let (builder, _) = config::loading::load_builder_from_paths(config_paths)?; - vector_core::config::init_log_schema(builder.global.log_schema, deny_if_set); - Ok(()) -} - pub async fn build_unit_tests_main( paths: &[ConfigPath], signal_handler: &mut signal::SignalHandler, ) -> Result, Vec> { - init_log_schema_from_paths(paths, false)?; + config::init_log_schema(paths, false)?; let (mut secrets_backends_loader, _) = loading::load_secret_backends_from_paths(paths)?; let (config_builder, _) = if secrets_backends_loader.has_secrets_to_retrieve() { let resolved_secrets = secrets_backends_loader diff --git a/src/validate.rs b/src/validate.rs index f11d41d1845f8..0803e11423a2a 100644 --- a/src/validate.rs +++ b/src/validate.rs @@ -135,10 +135,12 @@ pub fn validate_config(opts: &Opts, fmt: &mut Formatter) -> Option { fmt.title(format!("Failed to load {:?}", &paths_list)); fmt.sub_error(errors); }; + config::init_log_schema(&paths, true) + .map_err(&mut report_error) + .ok()?; let (builder, load_warnings) = config::load_builder_from_paths(&paths) .map_err(&mut report_error) .ok()?; - config::init_log_schema(builder.global.log_schema.clone(), true); // Build let (config, build_warnings) = builder diff --git a/tests/integration/shutdown.rs b/tests/integration/shutdown.rs index 1df88120bfb99..e4f0f75fa7739 100644 --- a/tests/integration/shutdown.rs +++ b/tests/integration/shutdown.rs @@ -1,4 +1,5 @@ use std::{ + fs::create_dir, fs::read_dir, io::Write, net::SocketAddr, @@ -200,6 +201,77 @@ fn log_schema() { assert_eq!(event["test_msg"], json!("42")); } +#[test] +fn log_schema_multiple_config_files() { + // Vector command + let mut cmd = Command::cargo_bin("vector").unwrap(); + + let config_dir = create_directory(); + + let sinks_config_dir = config_dir.join("sinks"); + create_dir(sinks_config_dir.clone()).unwrap(); + + let sources_config_dir = config_dir.join("sources"); + create_dir(sources_config_dir.clone()).unwrap(); + + let input_dir = create_directory(); + let input_file = input_dir.join("input_file"); + + overwrite_file( + config_dir.join("vector.toml"), + r#" + data_dir = "${VECTOR_DATA_DIR}" + log_schema.host_key = "test_host" + "#, + ); + + overwrite_file( + sources_config_dir.join("in_file.toml"), + r#" + type = "file" + include = ["${VECTOR_TEST_INPUT_FILE}"] + "#, + ); + + overwrite_file( + sinks_config_dir.join("out_console.toml"), + r#" + inputs = ["in_file"] + type = "console" + encoding.codec = "json" + "#, + ); + + overwrite_file( + input_file.clone(), + r#"42 + "#, + ); + + cmd.arg("--quiet") + .env("VECTOR_CONFIG_DIR", config_dir) + .env("VECTOR_DATA_DIR", create_directory()) + .env("VECTOR_TEST_INPUT_FILE", input_file.clone()); + + // Run vector + let vector = cmd.stdout(std::process::Stdio::piped()).spawn().unwrap(); + + // Give vector time to start. + sleep(STARTUP_TIME); + + // Signal shutdown + kill(Pid::from_raw(vector.id() as i32), Signal::SIGTERM).unwrap(); + + // Wait for shutdown + let output = vector.wait_with_output().unwrap(); + assert!(output.status.success(), "Vector didn't exit successfully."); + + // Output + let event: Value = serde_json::from_slice(output.stdout.as_slice()).unwrap(); + assert_eq!(event["message"], json!("42")); + assert_eq!(event["test_host"], json!("runner")); +} + #[test] fn configuration_path_recomputed() { // Directory with configuration files