From 5610013efac438252176a11bec79e274382d4063 Mon Sep 17 00:00:00 2001 From: Bruce Guenter Date: Thu, 19 Oct 2023 12:08:58 -0600 Subject: [PATCH] enhancement(topology): Add internal-only topology (DRAFT) --- src/app.rs | 55 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 55 insertions(+) diff --git a/src/app.rs b/src/app.rs index 18ac0c1798571..dfaa22f2eeddc 100644 --- a/src/app.rs +++ b/src/app.rs @@ -49,6 +49,7 @@ use tokio::sync::broadcast::error::RecvError; pub struct ApplicationConfig { pub config_paths: Vec, + pub internal: RunningTopology, pub topology: RunningTopology, pub graceful_crash_sender: mpsc::UnboundedSender, pub graceful_crash_receiver: mpsc::UnboundedReceiver, @@ -65,6 +66,24 @@ pub struct Application { pub openssl_providers: Option>, } +const INTERNAL_CONFIG: &str = r#" +sources: + _internal_metrics: + type: internal_metrics +sinks: + _internal_console: + type: console + inputs: [_internal_metrics] + encoding: + codec: text +"#; + +pub struct RunningConfig { + pub topology: RunningTopology, + pub graceful_crash_sender: mpsc::UnboundedSender, + pub graceful_crash_receiver: mpsc::UnboundedReceiver, +} + impl ApplicationConfig { pub async fn from_opts( opts: &RootOpts, @@ -104,6 +123,19 @@ impl ApplicationConfig { .await .ok_or(exitcode::CONFIG)?; + // TODO: Cleanup duplication + let internal_config = config::load_from_str(INTERNAL_CONFIG, config::Format::Yaml) + .expect("Invalid internal config"); + let internal_diff = config::ConfigDiff::initial(&internal_config); + let internal_pieces = + topology::build_or_log_errors(&internal_config, &internal_diff, HashMap::new()) + .await + .ok_or(exitcode::CONFIG)?; + let (internal_topology, (_internal_crash_sender, _internal_crash_receiver)) = + topology::start_validated(internal_config, internal_diff, internal_pieces) + .await + .ok_or(exitcode::CONFIG)?; + #[cfg(feature = "api")] let api = config.api; @@ -113,6 +145,7 @@ impl ApplicationConfig { Ok(Self { config_paths, + internal: internal_topology, topology, graceful_crash_sender, graceful_crash_receiver, @@ -251,11 +284,21 @@ impl Application { #[cfg(feature = "enterprise")] enterprise_reporter: config.enterprise, }); + let internal_controller = SharedTopologyController::new(TopologyController { + #[cfg(feature = "api")] + api_server: None, + topology: config.internal, + config_paths: Vec::new(), + require_healthy: Some(true), + #[cfg(feature = "enterprise")] + enterprise_reporter: None, + }); Ok(StartedApplication { config_paths: config.config_paths, graceful_crash_receiver: config.graceful_crash_receiver, signals, + internal_controller, topology_controller, openssl_providers, }) @@ -266,6 +309,7 @@ pub struct StartedApplication { pub config_paths: Vec, pub graceful_crash_receiver: mpsc::UnboundedReceiver, pub signals: SignalPair, + pub internal_controller: SharedTopologyController, pub topology_controller: SharedTopologyController, pub openssl_providers: Option>, } @@ -280,6 +324,7 @@ impl StartedApplication { config_paths, graceful_crash_receiver, signals, + internal_controller, topology_controller, openssl_providers, } = self; @@ -312,6 +357,7 @@ impl StartedApplication { FinishedApplication { signal, signal_rx, + internal_controller, topology_controller, openssl_providers, } @@ -367,6 +413,7 @@ async fn handle_signal( pub struct FinishedApplication { pub signal: SignalTo, pub signal_rx: SignalRx, + pub internal_controller: SharedTopologyController, pub topology_controller: SharedTopologyController, pub openssl_providers: Option>, } @@ -376,6 +423,7 @@ impl FinishedApplication { let FinishedApplication { signal, signal_rx, + internal_controller, topology_controller, openssl_providers, } = self; @@ -392,6 +440,13 @@ impl FinishedApplication { SignalTo::Quit => Self::quit(), _ => unreachable!(), }; + + let internal_controller = internal_controller + .try_into_inner() + .expect("fail to unwrap internal controller") + .into_inner(); + internal_controller.stop().await; + drop(openssl_providers); status }