Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ use tokio::sync::broadcast::error::RecvError;

pub struct ApplicationConfig {
pub config_paths: Vec<config::ConfigPath>,
pub internal: RunningTopology,
pub topology: RunningTopology,
pub graceful_crash_sender: mpsc::UnboundedSender<ShutdownError>,
pub graceful_crash_receiver: mpsc::UnboundedReceiver<ShutdownError>,
Expand All @@ -65,6 +66,24 @@ pub struct Application {
pub openssl_providers: Option<Vec<Provider>>,
}

const INTERNAL_CONFIG: &str = r#"
sources:
_internal_metrics:
type: internal_metrics
sinks:
_internal_console:
type: console
inputs: [_internal_metrics]
encoding:
codec: text
"#;

pub struct RunningConfig {
pub topology: RunningTopology,
pub graceful_crash_sender: mpsc::UnboundedSender<ShutdownError>,
pub graceful_crash_receiver: mpsc::UnboundedReceiver<ShutdownError>,
}

impl ApplicationConfig {
pub async fn from_opts(
opts: &RootOpts,
Expand Down Expand Up @@ -104,6 +123,19 @@ impl ApplicationConfig {
.await
.ok_or(exitcode::CONFIG)?;

// TODO: Cleanup duplication
let internal_config = config::load_from_str(INTERNAL_CONFIG, config::Format::Yaml)
.expect("Invalid internal config");
let internal_diff = config::ConfigDiff::initial(&internal_config);
let internal_pieces =
topology::build_or_log_errors(&internal_config, &internal_diff, HashMap::new())
.await
.ok_or(exitcode::CONFIG)?;
let (internal_topology, (_internal_crash_sender, _internal_crash_receiver)) =
topology::start_validated(internal_config, internal_diff, internal_pieces)
.await
.ok_or(exitcode::CONFIG)?;

#[cfg(feature = "api")]
let api = config.api;

Expand All @@ -113,6 +145,7 @@ impl ApplicationConfig {

Ok(Self {
config_paths,
internal: internal_topology,
topology,
graceful_crash_sender,
graceful_crash_receiver,
Expand Down Expand Up @@ -251,11 +284,21 @@ impl Application {
#[cfg(feature = "enterprise")]
enterprise_reporter: config.enterprise,
});
let internal_controller = SharedTopologyController::new(TopologyController {
#[cfg(feature = "api")]
api_server: None,
topology: config.internal,
config_paths: Vec::new(),
require_healthy: Some(true),
#[cfg(feature = "enterprise")]
enterprise_reporter: None,
});

Ok(StartedApplication {
config_paths: config.config_paths,
graceful_crash_receiver: config.graceful_crash_receiver,
signals,
internal_controller,
topology_controller,
openssl_providers,
})
Expand All @@ -266,6 +309,7 @@ pub struct StartedApplication {
pub config_paths: Vec<ConfigPath>,
pub graceful_crash_receiver: mpsc::UnboundedReceiver<ShutdownError>,
pub signals: SignalPair,
pub internal_controller: SharedTopologyController,
pub topology_controller: SharedTopologyController,
pub openssl_providers: Option<Vec<Provider>>,
}
Expand All @@ -280,6 +324,7 @@ impl StartedApplication {
config_paths,
graceful_crash_receiver,
signals,
internal_controller,
topology_controller,
openssl_providers,
} = self;
Expand Down Expand Up @@ -312,6 +357,7 @@ impl StartedApplication {
FinishedApplication {
signal,
signal_rx,
internal_controller,
topology_controller,
openssl_providers,
}
Expand Down Expand Up @@ -367,6 +413,7 @@ async fn handle_signal(
pub struct FinishedApplication {
pub signal: SignalTo,
pub signal_rx: SignalRx,
pub internal_controller: SharedTopologyController,
pub topology_controller: SharedTopologyController,
pub openssl_providers: Option<Vec<Provider>>,
}
Expand All @@ -376,6 +423,7 @@ impl FinishedApplication {
let FinishedApplication {
signal,
signal_rx,
internal_controller,
topology_controller,
openssl_providers,
} = self;
Expand All @@ -392,6 +440,13 @@ impl FinishedApplication {
SignalTo::Quit => Self::quit(),
_ => unreachable!(),
};

let internal_controller = internal_controller
.try_into_inner()
.expect("fail to unwrap internal controller")
.into_inner();
internal_controller.stop().await;

drop(openssl_providers);
status
}
Expand Down