[next] refactor: update runtime manager (#711)
* refactor: allow multiple runtimes to run at the same time

This is needed so that a new deployment can enter the loading state
without killing currently running runtimes.

* refactor: extra comment
chesedo authored Mar 14, 2023
1 parent 9f73d61 commit add6a8e
Showing 2 changed files with 97 additions and 153 deletions.
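
At its core, the commit swaps the manager's fixed pair of per-flavour runtime slots (legacy and next) for a map keyed by deployment id. Below is a minimal, dependency-free sketch of that shape; DummyChild, DummyClient, OldManager and NewManager are illustrative stand-ins invented for this note, not types from the codebase:

    use std::collections::HashMap;
    use std::sync::{Arc, Mutex};

    // Illustrative stand-ins for tokio::process::Child and RuntimeClient<Channel>.
    struct DummyChild;
    #[derive(Clone)]
    struct DummyClient;

    // Before: one slot per runtime flavour, so a new deployment of the same
    // flavour had to displace (and kill) the one already running.
    #[allow(dead_code)]
    struct OldManager {
        legacy: Option<DummyClient>,
        next: Option<DummyClient>,
    }

    // After: one entry per deployment id, so any number of runtimes can be up
    // at once and each one can be stopped on its own.
    struct NewManager {
        runtimes: Arc<Mutex<HashMap<u64, (DummyChild, DummyClient)>>>,
    }

    impl NewManager {
        fn insert(&self, id: u64, process: DummyChild, client: DummyClient) {
            self.runtimes.lock().unwrap().insert(id, (process, client));
        }

        // Removing by id leaves every other runtime untouched.
        fn kill(&self, id: u64) -> bool {
            self.runtimes.lock().unwrap().remove(&id).is_some()
        }
    }

    fn main() {
        let manager = NewManager {
            runtimes: Arc::new(Mutex::new(HashMap::new())),
        };
        manager.insert(1, DummyChild, DummyClient); // currently active deployment
        manager.insert(2, DummyChild, DummyClient); // new deployment entering loading
        assert!(manager.kill(2)); // stopping one does not disturb the other
        assert_eq!(manager.runtimes.lock().unwrap().len(), 1);
    }

In the real struct the key is a Uuid, and the map also stores the child process handle so that both kill() and Drop can signal the process.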
deployer/src/deployment/run.rs (1 addition, 1 deletion)
@@ -208,7 +208,7 @@ impl Built {
         let runtime_client = runtime_manager
             .lock()
             .await
-            .get_runtime_client(legacy_runtime_path.clone())
+            .get_runtime_client(self.id, legacy_runtime_path.clone())
             .await
             .map_err(Error::Runtime)?;
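
On the deployer side, the only change is threading the deployment id into the manager, since that id is what keys the new runtimes map. A rough sketch of the call-site locking pattern under that assumption; DummyClient and the trimmed-down RuntimeManager here are illustrative, and the tokio dependency is assumed:

    // Assumed dependency: tokio = { version = "1", features = ["macros", "rt-multi-thread", "sync"] }
    use std::{collections::HashMap, path::PathBuf, sync::Arc};

    use tokio::sync::Mutex;

    // Illustrative stand-in for RuntimeClient<Channel>.
    #[derive(Clone)]
    struct DummyClient;

    // Trimmed-down manager: just enough to show the new signature.
    struct RuntimeManager {
        runtimes: HashMap<u64, DummyClient>,
    }

    impl RuntimeManager {
        // The id now keys the runtimes map; without it the manager could not
        // tell deployments apart when later asked to stop one of them.
        async fn get_runtime_client(
            &mut self,
            id: u64,
            _legacy_runtime_path: Option<PathBuf>,
        ) -> DummyClient {
            self.runtimes.entry(id).or_insert(DummyClient).clone()
        }
    }

    #[tokio::main]
    async fn main() {
        let manager = Arc::new(Mutex::new(RuntimeManager {
            runtimes: HashMap::new(),
        }));

        // Mirrors the changed call site in run.rs: lock the shared manager,
        // then request a client for this specific deployment.
        let _client = manager.lock().await.get_runtime_client(42, None).await;
    }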
deployer/src/runtime_manager.rs (96 additions, 152 deletions)
@@ -1,24 +1,23 @@
-use std::{convert::TryInto, path::PathBuf, sync::Arc};
+use std::{collections::HashMap, convert::TryInto, path::PathBuf, sync::Arc};
 
 use anyhow::Context;
 use shuttle_proto::runtime::{
     self, runtime_client::RuntimeClient, StopRequest, SubscribeLogsRequest,
 };
 use tokio::{process, sync::Mutex};
 use tonic::transport::Channel;
-use tracing::{debug, info, instrument, trace};
+use tracing::{debug, info, trace};
 use uuid::Uuid;
 
 use crate::deployment::deploy_layer;
 
 const MANIFEST_DIR: &str = env!("CARGO_MANIFEST_DIR");
 
+/// Manager that can start up multiple runtimes. This is needed so that two runtimes can be up when a new deployment is made:
+/// one runtime for the new deployment being loaded; another for the currently active deployment.
 #[derive(Clone)]
 pub struct RuntimeManager {
-    legacy: Option<RuntimeClient<Channel>>,
-    legacy_process: Option<Arc<std::sync::Mutex<process::Child>>>,
-    next: Option<RuntimeClient<Channel>>,
-    next_process: Option<Arc<std::sync::Mutex<process::Child>>>,
+    runtimes: Arc<std::sync::Mutex<HashMap<Uuid, (process::Child, RuntimeClient<Channel>)>>>,
     artifacts_path: PathBuf,
     provisioner_address: String,
     log_sender: crossbeam_channel::Sender<deploy_layer::Log>,
@@ -31,10 +30,7 @@ impl RuntimeManager {
         log_sender: crossbeam_channel::Sender<deploy_layer::Log>,
     ) -> Arc<Mutex<Self>> {
         Arc::new(Mutex::new(Self {
-            legacy: None,
-            legacy_process: None,
-            next: None,
-            next_process: None,
+            runtimes: Arc::new(std::sync::Mutex::new(HashMap::new())),
             artifacts_path,
             provisioner_address,
             log_sender,
@@ -43,157 +39,109 @@ impl RuntimeManager {
 
     pub async fn get_runtime_client(
         &mut self,
+        id: Uuid,
         legacy_runtime_path: Option<PathBuf>,
     ) -> anyhow::Result<RuntimeClient<Channel>> {
-        if legacy_runtime_path.is_none() {
-            debug!("Getting shuttle-next runtime client");
-
-            Self::get_runtime_client_helper(
-                &mut self.next,
-                &mut self.next_process,
-                None,
-                self.artifacts_path.clone(),
-                &self.provisioner_address,
-                self.log_sender.clone(),
-            )
-            .await
-        } else {
-            debug!("Getting legacy runtime client");
-
-            Self::get_runtime_client_helper(
-                &mut self.legacy,
-                &mut self.legacy_process,
-                legacy_runtime_path,
-                self.artifacts_path.clone(),
-                &self.provisioner_address,
-                self.log_sender.clone(),
-            )
-            .await
-        }
-    }
-
-    /// Send a kill / stop signal for a deployment to any runtimes currently running
-    pub async fn kill(&mut self, id: &Uuid) -> bool {
-        let success_legacy = if let Some(legacy_client) = &mut self.legacy {
-            trace!(%id, "sending stop signal to legacy for deployment");
-
-            let stop_request = tonic::Request::new(StopRequest {});
-            let response = legacy_client.stop(stop_request).await.unwrap();
-
-            response.into_inner().success
-        } else {
-            trace!("no legacy client running");
-            true
-        };
-
-        let success_next = if let Some(next_client) = &mut self.next {
-            trace!(%id, "sending stop signal to next for deployment");
-
-            let stop_request = tonic::Request::new(StopRequest {});
-            let response = next_client.stop(stop_request).await.unwrap();
-
-            response.into_inner().success
-        } else {
-            trace!("no next client running");
-            true
-        };
-
-        success_legacy && success_next
-    }
-
-    #[instrument(skip(runtime_option, process_option, log_sender))]
-    async fn get_runtime_client_helper(
-        runtime_option: &mut Option<RuntimeClient<Channel>>,
-        process_option: &mut Option<Arc<std::sync::Mutex<process::Child>>>,
-        legacy_runtime_path: Option<PathBuf>,
-        artifacts_path: PathBuf,
-        provisioner_address: &str,
-        log_sender: crossbeam_channel::Sender<deploy_layer::Log>,
-    ) -> anyhow::Result<RuntimeClient<Channel>> {
-        if let Some(runtime_client) = runtime_option {
-            trace!("returning previous client");
-            Ok(runtime_client.clone())
-        } else {
-            trace!("making new client");
-
-            let port = portpicker::pick_unused_port().context("failed to find available port")?;
-            let is_next = legacy_runtime_path.is_none();
-
-            let get_runtime_executable = || {
-                if let Some(legacy_runtime) = legacy_runtime_path {
-                    debug!(
-                        "Starting legacy runtime at: {}",
-                        legacy_runtime
-                            .clone()
-                            .into_os_string()
-                            .into_string()
-                            .unwrap_or_default()
-                    );
-                    legacy_runtime
-                } else {
-                    if cfg!(debug_assertions) {
-                        debug!("Installing shuttle-next runtime in debug mode from local source");
-                        // If we're running deployer natively, install shuttle-runtime using the
-                        // version of runtime from the calling repo.
-                        let path = std::fs::canonicalize(format!("{MANIFEST_DIR}/../runtime"));
-
-                        // The path will not be valid if we are in a deployer container, in which
-                        // case we don't try to install and use the one installed in deploy.sh.
-                        if let Ok(path) = path {
-                            std::process::Command::new("cargo")
-                                .arg("install")
-                                .arg("shuttle-runtime")
-                                .arg("--path")
-                                .arg(path)
-                                .arg("--bin")
-                                .arg("shuttle-next")
-                                .arg("--features")
-                                .arg("next")
-                                .output()
-                                .expect("failed to install the local version of shuttle-runtime");
-                        }
-                    }
-
-                    debug!("Returning path to shuttle-next runtime");
-                    // If we're in a deployer built with the containerfile, the runtime will have
-                    // been installed in deploy.sh.
-                    home::cargo_home()
-                        .expect("failed to find path to cargo home")
-                        .join("bin/shuttle-next")
-                }
-            };
-
-            let (process, runtime_client) = runtime::start(
-                is_next,
-                runtime::StorageManagerType::Artifacts(artifacts_path),
-                provisioner_address,
-                port,
-                get_runtime_executable,
-            )
-            .await
-            .context("failed to start shuttle runtime")?;
-
-            let sender = log_sender;
-            let mut stream = runtime_client
-                .clone()
-                .subscribe_logs(tonic::Request::new(SubscribeLogsRequest {}))
-                .await
-                .context("subscribing to runtime logs stream")?
-                .into_inner();
-
-            tokio::spawn(async move {
-                while let Ok(Some(log)) = stream.message().await {
-                    if let Ok(log) = log.try_into() {
-                        sender.send(log).expect("to send log to persistence");
-                    }
-                }
-            });
-
-            *runtime_option = Some(runtime_client.clone());
-            *process_option = Some(Arc::new(std::sync::Mutex::new(process)));
-
-            // Safe to unwrap as it was just set
-            Ok(runtime_client)
-        }
-    }
+        trace!("making new client");
+
+        let port = portpicker::pick_unused_port().context("failed to find available port")?;
+        let is_next = legacy_runtime_path.is_none();
+
+        let get_runtime_executable = || {
+            if let Some(legacy_runtime) = legacy_runtime_path {
+                debug!(
+                    "Starting legacy runtime at: {}",
+                    legacy_runtime
+                        .clone()
+                        .into_os_string()
+                        .into_string()
+                        .unwrap_or_default()
+                );
+                legacy_runtime
+            } else {
+                if cfg!(debug_assertions) {
+                    debug!("Installing shuttle-next runtime in debug mode from local source");
+                    // If we're running deployer natively, install shuttle-runtime using the
+                    // version of runtime from the calling repo.
+                    let path = std::fs::canonicalize(format!("{MANIFEST_DIR}/../runtime"));
+
+                    // The path will not be valid if we are in a deployer container, in which
+                    // case we don't try to install and use the one installed in deploy.sh.
+                    if let Ok(path) = path {
+                        std::process::Command::new("cargo")
+                            .arg("install")
+                            .arg("shuttle-runtime")
+                            .arg("--path")
+                            .arg(path)
+                            .arg("--bin")
+                            .arg("shuttle-next")
+                            .arg("--features")
+                            .arg("next")
+                            .output()
+                            .expect("failed to install the local version of shuttle-runtime");
+                    }
+                }
+
+                debug!("Returning path to shuttle-next runtime");
+                // If we're in a deployer built with the containerfile, the runtime will have
+                // been installed in deploy.sh.
+                home::cargo_home()
+                    .expect("failed to find path to cargo home")
+                    .join("bin/shuttle-next")
+            }
+        };
+
+        let (process, runtime_client) = runtime::start(
+            is_next,
+            runtime::StorageManagerType::Artifacts(self.artifacts_path.clone()),
+            &self.provisioner_address,
+            port,
+            get_runtime_executable,
+        )
+        .await
+        .context("failed to start shuttle runtime")?;
+
+        let sender = self.log_sender.clone();
+        let mut stream = runtime_client
+            .clone()
+            .subscribe_logs(tonic::Request::new(SubscribeLogsRequest {}))
+            .await
+            .context("subscribing to runtime logs stream")?
+            .into_inner();
+
+        tokio::spawn(async move {
+            while let Ok(Some(log)) = stream.message().await {
+                if let Ok(log) = log.try_into() {
+                    sender.send(log).expect("to send log to persistence");
+                }
+            }
+        });
+
+        self.runtimes
+            .lock()
+            .unwrap()
+            .insert(id, (process, runtime_client.clone()));
+
+        Ok(runtime_client)
+    }
+
+    /// Send a kill / stop signal for a deployment to its running runtime
+    pub async fn kill(&mut self, id: &Uuid) -> bool {
+        let value = self.runtimes.lock().unwrap().remove(id);
+
+        if let Some((mut process, mut runtime_client)) = value {
+            trace!(%id, "sending stop signal for deployment");
+
+            let stop_request = tonic::Request::new(StopRequest {});
+            let response = runtime_client.stop(stop_request).await.unwrap();
+
+            let result = response.into_inner().success;
+            let _ = process.start_kill();
+
+            result
+        } else {
+            trace!("no client running");
+            true
+        }
+    }
 }
@@ -202,12 +150,8 @@ impl Drop for RuntimeManager {
     fn drop(&mut self) {
         info!("runtime manager shutting down");
 
-        if let Some(ref process) = self.legacy_process.take() {
-            let _ = process.lock().unwrap().start_kill();
-        }
-
-        if let Some(ref process) = self.next_process.take() {
-            let _ = process.lock().unwrap().start_kill();
+        for (process, _runtime_client) in self.runtimes.lock().unwrap().values_mut() {
+            let _ = process.start_kill();
         }
     }
 }
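
After this commit, kill() and Drop split the clean-up work: kill() stops exactly one deployment, while Drop sweeps up whatever is still running when the manager itself goes away. A condensed, runnable sketch of that lifecycle; Process is an illustrative stand-in for tokio::process::Child, and the real kill() also sends a gRPC StopRequest before signalling the process:

    use std::collections::HashMap;

    // Illustrative stand-in for tokio::process::Child.
    struct Process(&'static str);

    impl Process {
        // Mirrors Child::start_kill, which sends the kill signal without
        // waiting for the child to exit.
        fn start_kill(&mut self) {
            println!("killing {}", self.0);
        }
    }

    struct Manager {
        runtimes: HashMap<u64, Process>,
    }

    impl Manager {
        // Removes exactly one deployment's process. In the source, the Some
        // branch returns the stop response's success flag instead of true.
        fn kill(&mut self, id: &u64) -> bool {
            if let Some(mut process) = self.runtimes.remove(id) {
                process.start_kill();
                true
            } else {
                // No runtime for this deployment counts as success, as in the source.
                true
            }
        }
    }

    // Drop signals every runtime that is still registered.
    impl Drop for Manager {
        fn drop(&mut self) {
            for process in self.runtimes.values_mut() {
                process.start_kill();
            }
        }
    }

    fn main() {
        let mut manager = Manager {
            runtimes: HashMap::new(),
        };
        manager.runtimes.insert(1, Process("deployment-1"));
        manager.runtimes.insert(2, Process("deployment-2"));
        manager.kill(&1); // prints: killing deployment-1
        drop(manager); // prints: killing deployment-2
    }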
