From 9b30aded3a8ad21dcb14aad065714fc1d0e93805 Mon Sep 17 00:00:00 2001 From: chesedo <pieter@chesedo.me> Date: Wed, 11 Jan 2023 10:08:21 +0200 Subject: [PATCH 01/13] feat: propagate next runtime --- deployer/src/deployment/deploy_layer.rs | 1 + deployer/src/deployment/queue.rs | 29 ++++++++++++++----------- deployer/src/deployment/run.rs | 3 +++ deployer/src/lib.rs | 1 + deployer/src/persistence/deployment.rs | 1 + deployer/src/persistence/mod.rs | 5 ++++- proto/Cargo.toml | 2 +- 7 files changed, 27 insertions(+), 15 deletions(-) diff --git a/deployer/src/deployment/deploy_layer.rs b/deployer/src/deployment/deploy_layer.rs index afe795a72..76146b521 100644 --- a/deployer/src/deployment/deploy_layer.rs +++ b/deployer/src/deployment/deploy_layer.rs @@ -833,6 +833,7 @@ mod tests { service_name: "run-test".to_string(), service_id: Uuid::new_v4(), tracing_context: Default::default(), + is_next: false, }) .await; diff --git a/deployer/src/deployment/queue.rs b/deployer/src/deployment/queue.rs index c3bcd1e52..21b354807 100644 --- a/deployer/src/deployment/queue.rs +++ b/deployer/src/deployment/queue.rs @@ -200,7 +200,7 @@ impl Queued { }); let project_path = project_path.canonicalize()?; - let so_path = build_deployment(self.id, &project_path, tx.clone()).await?; + let runtime = build_deployment(self.id, &project_path, tx.clone()).await?; if self.will_run_tests { info!( @@ -213,13 +213,14 @@ impl Queued { info!("Moving built library"); - store_lib(&storage_manager, so_path, &self.id).await?; + store_lib(&storage_manager, &runtime, &self.id).await?; let built = Built { id: self.id, service_name: self.service_name, service_id: self.service_id, tracing_context: Default::default(), + is_next: matches!(runtime, Runtime::Next(_)), }; Ok(built) @@ -310,15 +311,10 @@ async fn build_deployment( deployment_id: Uuid, project_path: &Path, tx: crossbeam_channel::Sender<Message>, -) -> Result<PathBuf> { - let runtime_path = build_crate(deployment_id, project_path, true, tx) +) -> Result<Runtime> { + build_crate(deployment_id, project_path, true, tx) .await - .map_err(|e| Error::Build(e.into()))?; - - match runtime_path { - Runtime::Legacy(so_path) => Ok(so_path), - Runtime::Next(_) => todo!(), - } + .map_err(|e| Error::Build(e.into())) } #[instrument(skip(project_path, tx))] @@ -381,12 +377,17 @@ async fn run_pre_deploy_tests( } /// Store 'so' file in the libs folder -#[instrument(skip(storage_manager, so_path, id))] +#[instrument(skip(storage_manager, runtime, id))] async fn store_lib( storage_manager: &ArtifactsStorageManager, - so_path: impl AsRef<Path>, + runtime: &Runtime, id: &Uuid, ) -> Result<()> { + let so_path = match runtime { + Runtime::Next(path) => path, + Runtime::Legacy(path) => path, + }; + let new_so_path = storage_manager.deployment_library_path(id)?; fs::rename(so_path, new_so_path).await?; @@ -399,6 +400,7 @@ mod tests { use std::{collections::BTreeMap, fs::File, io::Write, path::Path}; use shuttle_common::storage_manager::ArtifactsStorageManager; + use shuttle_service::loader::Runtime; use tempdir::TempDir; use tokio::fs; use uuid::Uuid; @@ -533,11 +535,12 @@ ff0e55bda1ff01000000000000000000e0079c01ff12a55500280000", let build_p = storage_manager.builds_path().unwrap(); let so_path = build_p.join("xyz.so"); + let runtime = Runtime::Legacy(so_path.clone()); let id = Uuid::new_v4(); fs::write(&so_path, "barfoo").await.unwrap(); - super::store_lib(&storage_manager, &so_path, &id) + super::store_lib(&storage_manager, &runtime, &id) .await .unwrap(); diff --git a/deployer/src/deployment/run.rs b/deployer/src/deployment/run.rs index 7b14a4e7b..58cb0773b 100644 --- a/deployer/src/deployment/run.rs +++ b/deployer/src/deployment/run.rs @@ -169,6 +169,7 @@ pub struct Built { pub service_name: String, pub service_id: Uuid, pub tracing_context: HashMap<String, String>, + pub is_next: bool, } impl Built { @@ -494,6 +495,7 @@ mod tests { service_name: "test".to_string(), service_id: Uuid::new_v4(), tracing_context: Default::default(), + is_next: false, }; let (_kill_send, kill_recv) = broadcast::channel(1); @@ -554,6 +556,7 @@ mod tests { service_name: crate_name.to_string(), service_id: Uuid::new_v4(), tracing_context: Default::default(), + is_next: false, }, storage_manager, ) diff --git a/deployer/src/lib.rs b/deployer/src/lib.rs index cb9b3bf0d..bd130a08d 100644 --- a/deployer/src/lib.rs +++ b/deployer/src/lib.rs @@ -44,6 +44,7 @@ pub async fn start(persistence: Persistence, runtime_client: RuntimeClient<Chann service_name: existing_deployment.service_name, service_id: existing_deployment.service_id, tracing_context: Default::default(), + is_next: existing_deployment.is_next, }; deployment_manager.run_push(built).await; } diff --git a/deployer/src/persistence/deployment.rs b/deployer/src/persistence/deployment.rs index 03d210066..2a3e0adb7 100644 --- a/deployer/src/persistence/deployment.rs +++ b/deployer/src/persistence/deployment.rs @@ -64,4 +64,5 @@ pub struct DeploymentRunnable { pub id: Uuid, pub service_name: String, pub service_id: Uuid, + pub is_next: bool, } diff --git a/deployer/src/persistence/mod.rs b/deployer/src/persistence/mod.rs index bc9ce7054..e9762b885 100644 --- a/deployer/src/persistence/mod.rs +++ b/deployer/src/persistence/mod.rs @@ -265,7 +265,7 @@ impl Persistence { pub async fn get_all_runnable_deployments(&self) -> Result<Vec<DeploymentRunnable>> { sqlx::query_as( - r#"SELECT d.id, service_id, s.name AS service_name + r#"SELECT d.id, service_id, s.name AS service_name, d.is_next FROM deployments AS d JOIN services AS s ON s.id = d.service_id WHERE state = ? @@ -731,16 +731,19 @@ mod tests { id: id_1, service_name: "foo".to_string(), service_id: foo_id, + is_next: false, }, DeploymentRunnable { id: id_2, service_name: "bar".to_string(), service_id: bar_id, + is_next: true, }, DeploymentRunnable { id: id_3, service_name: "foo".to_string(), service_id: foo_id, + is_next: false, }, ] ); diff --git a/proto/Cargo.toml b/proto/Cargo.toml index 1b7be9abb..e33f9cd77 100644 --- a/proto/Cargo.toml +++ b/proto/Cargo.toml @@ -18,7 +18,7 @@ uuid = { workspace = true, features = ["v4"] } [dependencies.shuttle-common] workspace = true -features = ["models"] +features = ["axum-wasm", "models"] [build-dependencies] tonic-build = "0.8.3" From 32c2b53f97396b0efb081df29e6638604f6790fc Mon Sep 17 00:00:00 2001 From: chesedo <pieter@chesedo.me> Date: Wed, 11 Jan 2023 12:34:06 +0200 Subject: [PATCH 02/13] feat: store is_next in DB --- deployer/migrations/0001_next.sql | 1 + deployer/src/deployment/deploy_layer.rs | 80 +++++++------------------ deployer/src/deployment/mod.rs | 22 +++++-- deployer/src/deployment/queue.rs | 25 ++++++-- deployer/src/deployment/run.rs | 39 ++++++++++-- deployer/src/handlers/mod.rs | 1 + deployer/src/lib.rs | 1 + deployer/src/persistence/deployment.rs | 16 ++++- deployer/src/persistence/mod.rs | 73 ++++++++++++++++++---- 9 files changed, 173 insertions(+), 85 deletions(-) create mode 100644 deployer/migrations/0001_next.sql diff --git a/deployer/migrations/0001_next.sql b/deployer/migrations/0001_next.sql new file mode 100644 index 000000000..338e8e56b --- /dev/null +++ b/deployer/migrations/0001_next.sql @@ -0,0 +1 @@ +ALTER TABLE deployments ADD is_next BOOLEAN DEFAULT 0 NOT NULL; diff --git a/deployer/src/deployment/deploy_layer.rs b/deployer/src/deployment/deploy_layer.rs index 76146b521..f86da76f8 100644 --- a/deployer/src/deployment/deploy_layer.rs +++ b/deployer/src/deployment/deploy_layer.rs @@ -23,8 +23,8 @@ use chrono::{DateTime, Utc}; use serde_json::json; use shuttle_common::{tracing::JsonVisitor, STATE_MESSAGE}; use shuttle_proto::runtime; -use std::{net::SocketAddr, str::FromStr, time::SystemTime}; -use tracing::{error, field::Visit, span, warn, Metadata, Subscriber}; +use std::{str::FromStr, time::SystemTime}; +use tracing::{field::Visit, span, warn, Metadata, Subscriber}; use tracing_subscriber::Layer; use uuid::Uuid; @@ -63,8 +63,6 @@ pub struct Log { pub fields: serde_json::Value, pub r#type: LogType, - - pub address: Option<String>, } impl From<Log> for persistence::Log { @@ -106,23 +104,10 @@ impl From<Log> for shuttle_common::LogItem { impl From<Log> for DeploymentState { fn from(log: Log) -> Self { - let address = if let Some(address_str) = log.address { - match SocketAddr::from_str(&address_str) { - Ok(address) => Some(address), - Err(err) => { - error!(error = %err, "failed to convert to [SocketAddr]"); - None - } - } - } else { - None - }; - Self { id: log.id, state: log.state, last_update: log.timestamp, - address, } } } @@ -139,7 +124,6 @@ impl From<runtime::LogItem> for Log { target: log.target, fields: serde_json::from_slice(&log.fields).unwrap(), r#type: LogType::Event, - address: None, } } } @@ -230,7 +214,6 @@ where .unwrap_or_else(|| metadata.target().to_string()), fields: serde_json::Value::Object(visitor.fields), r#type: LogType::Event, - address: None, }); break; } @@ -274,7 +257,6 @@ where target: metadata.target().to_string(), fields: Default::default(), r#type: LogType::State, - address: details.address.clone(), }); extensions.insert::<ScopeDetails>(details); @@ -286,7 +268,6 @@ where struct ScopeDetails { id: Uuid, state: State, - address: Option<String>, } impl From<&tracing::Level> for LogLevel { @@ -314,9 +295,6 @@ impl NewStateVisitor { /// Field containing the deployment state identifier const STATE_IDENT: &'static str = "state"; - /// Field containing the deployment address identifier - const ADDRESS_IDENT: &'static str = "address"; - fn is_valid(metadata: &Metadata) -> bool { metadata.is_span() && metadata.fields().field(Self::ID_IDENT).is_some() @@ -330,8 +308,6 @@ impl Visit for NewStateVisitor { self.details.state = State::from_str(&format!("{value:?}")).unwrap_or_default(); } else if field.name() == Self::ID_IDENT { self.details.id = Uuid::try_parse(&format!("{value:?}")).unwrap_or_default(); - } else if field.name() == Self::ADDRESS_IDENT { - self.details.address = Some(format!("{value:?}")); } } } @@ -340,11 +316,13 @@ impl Visit for NewStateVisitor { mod tests { use std::{ fs::read_dir, + net::SocketAddr, path::PathBuf, sync::{Arc, Mutex}, time::Duration, }; + use crate::persistence::DeploymentUpdater; use axum::body::Bytes; use ctor::ctor; use flate2::{write::GzEncoder, Compression}; @@ -383,7 +361,6 @@ mod tests { struct StateLog { id: Uuid, state: State, - has_address: bool, } impl From<Log> for StateLog { @@ -391,7 +368,6 @@ mod tests { Self { id: log.id, state: log.state, - has_address: log.address.is_some(), } } } @@ -449,6 +425,22 @@ mod tests { } } + #[derive(Clone)] + struct StubDeploymentUpdater; + + #[async_trait::async_trait] + impl DeploymentUpdater for StubDeploymentUpdater { + type Err = std::io::Error; + + async fn set_address(&self, _id: &Uuid, _address: &SocketAddr) -> Result<(), Self::Err> { + Ok(()) + } + + async fn set_is_next(&self, _id: &Uuid, _is_next: bool) -> Result<(), Self::Err> { + Ok(()) + } + } + #[derive(Clone)] struct StubActiveDeploymentGetter; @@ -527,27 +519,22 @@ mod tests { StateLog { id, state: State::Queued, - has_address: false, }, StateLog { id, state: State::Building, - has_address: false, }, StateLog { id, state: State::Built, - has_address: false, }, StateLog { id, state: State::Loading, - has_address: true, }, StateLog { id, state: State::Running, - has_address: true, }, ] ); @@ -577,32 +564,26 @@ mod tests { StateLog { id, state: State::Queued, - has_address: false, }, StateLog { id, state: State::Building, - has_address: false, }, StateLog { id, state: State::Built, - has_address: false, }, StateLog { id, state: State::Loading, - has_address: true, }, StateLog { id, state: State::Running, - has_address: true, }, StateLog { id, state: State::Stopped, - has_address: false, }, ] ); @@ -639,32 +620,26 @@ mod tests { StateLog { id, state: State::Queued, - has_address: false, }, StateLog { id, state: State::Building, - has_address: false, }, StateLog { id, state: State::Built, - has_address: false, }, StateLog { id, state: State::Loading, - has_address: true, }, StateLog { id, state: State::Running, - has_address: true, }, StateLog { id, state: State::Completed, - has_address: false, }, ] ); @@ -712,32 +687,26 @@ mod tests { StateLog { id, state: State::Queued, - has_address: false, }, StateLog { id, state: State::Building, - has_address: false, }, StateLog { id, state: State::Built, - has_address: false, }, StateLog { id, state: State::Loading, - has_address: true, }, StateLog { id, state: State::Running, - has_address: true, }, StateLog { id, state: State::Crashed, - has_address: false, }, ] ); @@ -785,27 +754,22 @@ mod tests { StateLog { id, state: State::Queued, - has_address: false, }, StateLog { id, state: State::Building, - has_address: false, }, StateLog { id, state: State::Built, - has_address: false, }, StateLog { id, state: State::Loading, - has_address: true, }, StateLog { id, state: State::Crashed, - has_address: false, }, ] ); @@ -855,17 +819,14 @@ mod tests { StateLog { id, state: State::Built, - has_address: false, }, StateLog { id, state: State::Loading, - has_address: true, }, StateLog { id, state: State::Crashed, - has_address: false, }, ] ); @@ -907,6 +868,7 @@ mod tests { .artifacts_path(PathBuf::from("/tmp")) .secret_getter(StubSecretGetter) .runtime(get_runtime_client().await) + .deployment_updater(StubDeploymentUpdater) .queue_client(StubBuildQueueClient) .build() } diff --git a/deployer/src/deployment/mod.rs b/deployer/src/deployment/mod.rs index 0048573ae..571328ee0 100644 --- a/deployer/src/deployment/mod.rs +++ b/deployer/src/deployment/mod.rs @@ -13,7 +13,7 @@ use tonic::transport::Channel; use tracing::{instrument, Span}; use tracing_opentelemetry::OpenTelemetrySpanExt; -use crate::persistence::{SecretGetter, SecretRecorder, State}; +use crate::persistence::{DeploymentUpdater, SecretGetter, SecretRecorder, State}; use tokio::sync::{broadcast, mpsc}; use uuid::Uuid; @@ -23,21 +23,23 @@ const QUEUE_BUFFER_SIZE: usize = 100; const RUN_BUFFER_SIZE: usize = 100; const KILL_BUFFER_SIZE: usize = 10; -pub struct DeploymentManagerBuilder<LR, SR, ADG, SG, QC> { +pub struct DeploymentManagerBuilder<LR, SR, ADG, DU, SG, QC> { build_log_recorder: Option<LR>, secret_recorder: Option<SR>, active_deployment_getter: Option<ADG>, artifacts_path: Option<PathBuf>, runtime_client: Option<RuntimeClient<Channel>>, + deployment_updater: Option<DU>, secret_getter: Option<SG>, queue_client: Option<QC>, } -impl<LR, SR, ADG, SG, QC> DeploymentManagerBuilder<LR, SR, ADG, SG, QC> +impl<LR, SR, ADG, DU, SG, QC> DeploymentManagerBuilder<LR, SR, ADG, DU, SG, QC> where LR: LogRecorder, SR: SecretRecorder, ADG: ActiveDeploymentsGetter, + DU: DeploymentUpdater, SG: SecretGetter, QC: BuildQueueClient, { @@ -83,6 +85,12 @@ where self } + pub fn deployment_updater(mut self, deployment_updater: DU) -> Self { + self.deployment_updater = Some(deployment_updater); + + self + } + /// Creates two Tokio tasks, one for building queued services, the other for /// executing/deploying built services. Two multi-producer, single consumer /// channels are also created which are for moving on-going service @@ -98,6 +106,9 @@ where let artifacts_path = self.artifacts_path.expect("artifacts path to be set"); let queue_client = self.queue_client.expect("a queue client to be set"); let runtime_client = self.runtime_client.expect("a runtime client to be set"); + let deployment_updater = self + .deployment_updater + .expect("a deployment updater to be set"); let secret_getter = self.secret_getter.expect("a secret getter to be set"); let (queue_send, queue_recv) = mpsc::channel(QUEUE_BUFFER_SIZE); @@ -110,6 +121,7 @@ where tokio::spawn(queue::task( queue_recv, run_send_clone, + deployment_updater.clone(), build_log_recorder, secret_recorder, storage_manager.clone(), @@ -118,6 +130,7 @@ where tokio::spawn(run::task( run_recv, runtime_client, + deployment_updater, kill_send.clone(), active_deployment_getter, secret_getter, @@ -158,13 +171,14 @@ pub struct DeploymentManager { impl DeploymentManager { /// Create a new deployment manager. Manages one or more 'pipelines' for /// processing service building, loading, and deployment. - pub fn builder<LR, SR, ADG, SG, QC>() -> DeploymentManagerBuilder<LR, SR, ADG, SG, QC> { + pub fn builder<LR, SR, ADG, DU, SG, QC>() -> DeploymentManagerBuilder<LR, SR, ADG, DU, SG, QC> { DeploymentManagerBuilder { build_log_recorder: None, secret_recorder: None, active_deployment_getter: None, artifacts_path: None, runtime_client: None, + deployment_updater: None, secret_getter: None, queue_client: None, } diff --git a/deployer/src/deployment/queue.rs b/deployer/src/deployment/queue.rs index 21b354807..9e871986c 100644 --- a/deployer/src/deployment/queue.rs +++ b/deployer/src/deployment/queue.rs @@ -2,7 +2,7 @@ use super::deploy_layer::{Log, LogRecorder, LogType}; use super::gateway_client::BuildQueueClient; use super::{Built, QueueReceiver, RunSender, State}; use crate::error::{Error, Result, TestError}; -use crate::persistence::{LogLevel, SecretRecorder}; +use crate::persistence::{DeploymentUpdater, LogLevel, SecretRecorder}; use shuttle_common::storage_manager::{ArtifactsStorageManager, StorageManager}; use cargo::util::interning::InternedString; @@ -34,6 +34,7 @@ use tokio::fs; pub async fn task( mut recv: QueueReceiver, run_send: RunSender, + deployment_updater: impl DeploymentUpdater, log_recorder: impl LogRecorder, secret_recorder: impl SecretRecorder, storage_manager: ArtifactsStorageManager, @@ -46,6 +47,7 @@ pub async fn task( info!("Queued deployment at the front of the queue: {id}"); + let deployment_updater = deployment_updater.clone(); let run_send_cloned = run_send.clone(); let log_recorder = log_recorder.clone(); let secret_recorder = secret_recorder.clone(); @@ -71,7 +73,12 @@ pub async fn task( } match queued - .handle(storage_manager, log_recorder, secret_recorder) + .handle( + storage_manager, + deployment_updater, + log_recorder, + secret_recorder, + ) .await { Ok(built) => promote_to_run(built, run_send_cloned).await, @@ -144,10 +151,11 @@ pub struct Queued { } impl Queued { - #[instrument(skip(self, storage_manager, log_recorder, secret_recorder), fields(id = %self.id, state = %State::Building))] + #[instrument(skip(self, storage_manager, deployment_updater, log_recorder, secret_recorder), fields(id = %self.id, state = %State::Building))] async fn handle( self, storage_manager: ArtifactsStorageManager, + deployment_updater: impl DeploymentUpdater, log_recorder: impl LogRecorder, secret_recorder: impl SecretRecorder, ) -> Result<Built> { @@ -180,7 +188,6 @@ impl Queued { target: String::new(), fields: json!({ "build_line": line }), r#type: LogType::Event, - address: None, }, message => Log { id, @@ -192,7 +199,6 @@ impl Queued { target: String::new(), fields: serde_json::to_value(message).unwrap(), r#type: LogType::Event, - address: None, }, }; log_recorder.record(log); @@ -215,12 +221,19 @@ impl Queued { store_lib(&storage_manager, &runtime, &self.id).await?; + let is_next = matches!(runtime, Runtime::Next(_)); + + deployment_updater + .set_is_next(&id, is_next) + .await + .map_err(|e| Error::Build(Box::new(e)))?; + let built = Built { id: self.id, service_name: self.service_name, service_id: self.service_id, tracing_context: Default::default(), - is_next: matches!(runtime, Runtime::Next(_)), + is_next, }; Ok(built) diff --git a/deployer/src/deployment/run.rs b/deployer/src/deployment/run.rs index 58cb0773b..da91c660c 100644 --- a/deployer/src/deployment/run.rs +++ b/deployer/src/deployment/run.rs @@ -21,7 +21,7 @@ use uuid::Uuid; use super::{KillReceiver, KillSender, RunReceiver, State}; use crate::{ error::{Error, Result}, - persistence::SecretGetter, + persistence::{DeploymentUpdater, SecretGetter}, }; /// Run a task which takes runnable deploys from a channel and starts them up on our runtime @@ -29,6 +29,7 @@ use crate::{ pub async fn task( mut recv: RunReceiver, runtime_client: RuntimeClient<Channel>, + deployment_updater: impl DeploymentUpdater, kill_send: KillSender, active_deployment_getter: impl ActiveDeploymentsGetter, secret_getter: impl SecretGetter, @@ -41,6 +42,7 @@ pub async fn task( info!("Built deployment at the front of run queue: {id}"); + let deployment_updater = deployment_updater.clone(); let kill_send = kill_send.clone(); let kill_recv = kill_send.subscribe(); let secret_getter = secret_getter.clone(); @@ -86,6 +88,7 @@ pub async fn task( storage_manager, secret_getter, runtime_client, + deployment_updater, kill_recv, old_deployments_killer, cleanup, @@ -173,13 +176,14 @@ pub struct Built { } impl Built { - #[instrument(skip(self, storage_manager, secret_getter, runtime_client, kill_recv, kill_old_deployments, cleanup), fields(id = %self.id, state = %State::Loading))] + #[instrument(skip(self, storage_manager, secret_getter, runtime_client, deployment_updater, kill_recv, kill_old_deployments, cleanup), fields(id = %self.id, state = %State::Loading))] #[allow(clippy::too_many_arguments)] async fn handle( self, storage_manager: ArtifactsStorageManager, secret_getter: impl SecretGetter, runtime_client: RuntimeClient<Channel>, + deployment_updater: impl DeploymentUpdater, kill_recv: KillReceiver, kill_old_deployments: impl futures::Future<Output = Result<()>>, cleanup: impl FnOnce(std::result::Result<std::result::Result<(), shuttle_service::Error>, JoinError>) @@ -216,6 +220,7 @@ impl Built { self.service_name, runtime_client, address, + deployment_updater, kill_recv, cleanup, )); @@ -257,17 +262,20 @@ async fn load( } } -#[instrument(skip(runtime_client, _kill_recv, _cleanup), fields(state = %State::Running))] +#[instrument(skip(runtime_client, deployment_updater, _kill_recv, _cleanup), fields(state = %State::Running))] async fn run( id: Uuid, service_name: String, mut runtime_client: RuntimeClient<Channel>, address: SocketAddr, + deployment_updater: impl DeploymentUpdater, _kill_recv: KillReceiver, _cleanup: impl FnOnce(std::result::Result<std::result::Result<(), shuttle_service::Error>, JoinError>) + Send + 'static, ) { + deployment_updater.set_address(&id, &address).await.unwrap(); + let start_request = tonic::Request::new(StartRequest { deployment_id: id.as_bytes().to_vec(), service_name, @@ -282,7 +290,7 @@ async fn run( #[cfg(test)] mod tests { - use std::{path::PathBuf, process::Command, time::Duration}; + use std::{net::SocketAddr, path::PathBuf, process::Command, time::Duration}; use async_trait::async_trait; use shuttle_common::storage_manager::ArtifactsStorageManager; @@ -298,7 +306,7 @@ mod tests { use crate::{ error::Error, - persistence::{Secret, SecretGetter}, + persistence::{DeploymentUpdater, Secret, SecretGetter}, }; use super::Built; @@ -338,6 +346,22 @@ mod tests { StubSecretGetter } + #[derive(Clone)] + struct StubDeploymentUpdater; + + #[async_trait] + impl DeploymentUpdater for StubDeploymentUpdater { + type Err = std::io::Error; + + async fn set_address(&self, _id: &Uuid, _address: &SocketAddr) -> Result<(), Self::Err> { + Ok(()) + } + + async fn set_is_next(&self, _id: &Uuid, _is_next: bool) -> Result<(), Self::Err> { + Ok(()) + } + } + // This test uses the kill signal to make sure a service does stop when asked to #[tokio::test] async fn can_be_killed() { @@ -364,6 +388,7 @@ mod tests { storage_manager, secret_getter, get_runtime_client().await, + StubDeploymentUpdater, kill_recv, kill_old_deployments(), handle_cleanup, @@ -409,6 +434,7 @@ mod tests { storage_manager, secret_getter, get_runtime_client().await, + StubDeploymentUpdater, kill_recv, kill_old_deployments(), handle_cleanup, @@ -448,6 +474,7 @@ mod tests { storage_manager, secret_getter, get_runtime_client().await, + StubDeploymentUpdater, kill_recv, kill_old_deployments(), handle_cleanup, @@ -475,6 +502,7 @@ mod tests { storage_manager, secret_getter, get_runtime_client().await, + StubDeploymentUpdater, kill_recv, kill_old_deployments(), handle_cleanup, @@ -508,6 +536,7 @@ mod tests { storage_manager, secret_getter, get_runtime_client().await, + StubDeploymentUpdater, kill_recv, kill_old_deployments(), handle_cleanup, diff --git a/deployer/src/handlers/mod.rs b/deployer/src/handlers/mod.rs index 848dbff7f..f3def2080 100644 --- a/deployer/src/handlers/mod.rs +++ b/deployer/src/handlers/mod.rs @@ -224,6 +224,7 @@ async fn post_service( state: State::Queued, last_update: Utc::now(), address: None, + is_next: false, }; let mut data = Vec::new(); diff --git a/deployer/src/lib.rs b/deployer/src/lib.rs index bd130a08d..ad3d2111e 100644 --- a/deployer/src/lib.rs +++ b/deployer/src/lib.rs @@ -30,6 +30,7 @@ pub async fn start(persistence: Persistence, runtime_client: RuntimeClient<Chann .active_deployment_getter(persistence.clone()) .artifacts_path(args.artifacts_path) .runtime(runtime_client) + .deployment_updater(persistence.clone()) .secret_getter(persistence.clone()) .queue_client(GatewayClient::new(args.gateway_uri)) .build(); diff --git a/deployer/src/persistence/deployment.rs b/deployer/src/persistence/deployment.rs index 2a3e0adb7..b7e50f8b8 100644 --- a/deployer/src/persistence/deployment.rs +++ b/deployer/src/persistence/deployment.rs @@ -1,5 +1,6 @@ use std::{net::SocketAddr, str::FromStr}; +use async_trait::async_trait; use chrono::{DateTime, Utc}; use sqlx::{sqlite::SqliteRow, FromRow, Row}; use tracing::error; @@ -14,6 +15,7 @@ pub struct Deployment { pub state: State, pub last_update: DateTime<Utc>, pub address: Option<SocketAddr>, + pub is_next: bool, } impl FromRow<'_, SqliteRow> for Deployment { @@ -36,6 +38,7 @@ impl FromRow<'_, SqliteRow> for Deployment { state: row.try_get("state")?, last_update: row.try_get("last_update")?, address, + is_next: row.try_get("is_next")?, }) } } @@ -51,12 +54,23 @@ impl From<Deployment> for shuttle_common::models::deployment::Response { } } +/// Update the details of a deployment +#[async_trait] +pub trait DeploymentUpdater: Clone + Send + Sync + 'static { + type Err: std::error::Error + Send; + + /// Set the address for a deployment + async fn set_address(&self, id: &Uuid, address: &SocketAddr) -> Result<(), Self::Err>; + + /// Set if a deployment is build on shuttle-next + async fn set_is_next(&self, id: &Uuid, is_next: bool) -> Result<(), Self::Err>; +} + #[derive(Debug, PartialEq, Eq)] pub struct DeploymentState { pub id: Uuid, pub state: State, pub last_update: DateTime<Utc>, - pub address: Option<SocketAddr>, } #[derive(sqlx::FromRow, Debug, PartialEq, Eq)] diff --git a/deployer/src/persistence/mod.rs b/deployer/src/persistence/mod.rs index e9762b885..96ab11349 100644 --- a/deployer/src/persistence/mod.rs +++ b/deployer/src/persistence/mod.rs @@ -27,7 +27,7 @@ use tracing::{error, info, instrument, trace}; use uuid::Uuid; use self::deployment::DeploymentRunnable; -pub use self::deployment::{Deployment, DeploymentState}; +pub use self::deployment::{Deployment, DeploymentState, DeploymentUpdater}; pub use self::error::Error as PersistenceError; pub use self::log::{Level as LogLevel, Log}; pub use self::resource::{Resource, ResourceRecorder, Type as ResourceType}; @@ -158,13 +158,14 @@ impl Persistence { let deployment = deployment.into(); sqlx::query( - "INSERT INTO deployments (id, service_id, state, last_update, address) VALUES (?, ?, ?, ?, ?)", + "INSERT INTO deployments (id, service_id, state, last_update, address, is_next) VALUES (?, ?, ?, ?, ?, ?)", ) .bind(deployment.id) .bind(deployment.service_id) .bind(deployment.state) .bind(deployment.last_update) .bind(deployment.address.map(|socket| socket.to_string())) + .bind(deployment.is_next) .execute(&self.pool) .await .map(|_| ()) @@ -304,12 +305,9 @@ impl Persistence { async fn update_deployment(pool: &SqlitePool, state: impl Into<DeploymentState>) -> Result<()> { let state = state.into(); - // TODO: Handle moving to 'active_deployments' table for State::Running. - - sqlx::query("UPDATE deployments SET state = ?, last_update = ?, address = ? WHERE id = ?") + sqlx::query("UPDATE deployments SET state = ?, last_update = ? WHERE id = ?") .bind(state.state) .bind(state.last_update) - .bind(state.address.map(|socket| socket.to_string())) .bind(state.id) .execute(pool) .await @@ -442,6 +440,31 @@ impl AddressGetter for Persistence { } } +#[async_trait::async_trait] +impl DeploymentUpdater for Persistence { + type Err = Error; + + async fn set_address(&self, id: &Uuid, address: &SocketAddr) -> Result<()> { + sqlx::query("UPDATE deployments SET address = ? WHERE id = ?") + .bind(address.to_string()) + .bind(id) + .execute(&self.pool) + .await + .map(|_| ()) + .map_err(Error::from) + } + + async fn set_is_next(&self, id: &Uuid, is_next: bool) -> Result<()> { + sqlx::query("UPDATE deployments SET is_next = ? WHERE id = ?") + .bind(is_next) + .bind(id) + .execute(&self.pool) + .await + .map(|_| ()) + .map_err(Error::from) + } +} + #[async_trait::async_trait] impl ActiveDeploymentsGetter for Persistence { type Err = Error; @@ -493,7 +516,9 @@ mod tests { state: State::Queued, last_update: Utc.with_ymd_and_hms(2022, 4, 25, 4, 43, 33).unwrap(), address: None, + is_next: false, }; + let address = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 12345); p.insert_deployment(deployment.clone()).await.unwrap(); assert_eq!(p.get_deployment(&id).await.unwrap().unwrap(), deployment); @@ -504,13 +529,18 @@ mod tests { id, state: State::Built, last_update: Utc::now(), - address: None, }, ) .await .unwrap(); + + p.set_address(&id, &address).await.unwrap(); + p.set_is_next(&id, true).await.unwrap(); + let update = p.get_deployment(&id).await.unwrap().unwrap(); assert_eq!(update.state, State::Built); + assert_eq!(update.address, Some(address)); + assert_eq!(update.is_next, true); assert_ne!( update.last_update, Utc.with_ymd_and_hms(2022, 4, 25, 4, 43, 33).unwrap() @@ -530,6 +560,7 @@ mod tests { state: State::Crashed, last_update: Utc.with_ymd_and_hms(2022, 4, 25, 7, 29, 35).unwrap(), address: None, + is_next: false, }; let deployment_stopped = Deployment { id: Uuid::new_v4(), @@ -537,6 +568,7 @@ mod tests { state: State::Stopped, last_update: Utc.with_ymd_and_hms(2022, 4, 25, 7, 49, 35).unwrap(), address: None, + is_next: false, }; let deployment_other = Deployment { id: Uuid::new_v4(), @@ -544,6 +576,7 @@ mod tests { state: State::Running, last_update: Utc.with_ymd_and_hms(2022, 4, 25, 7, 39, 39).unwrap(), address: None, + is_next: false, }; let deployment_running = Deployment { id: Uuid::new_v4(), @@ -551,6 +584,7 @@ mod tests { state: State::Running, last_update: Utc.with_ymd_and_hms(2022, 4, 25, 7, 48, 29).unwrap(), address: Some(SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 9876)), + is_next: true, }; for deployment in [ @@ -590,6 +624,7 @@ mod tests { state: State::Crashed, last_update: Utc::now(), address: None, + is_next: false, }; let deployment_stopped = Deployment { id: Uuid::new_v4(), @@ -597,6 +632,7 @@ mod tests { state: State::Stopped, last_update: Utc::now(), address: None, + is_next: false, }; let deployment_running = Deployment { id: Uuid::new_v4(), @@ -604,6 +640,7 @@ mod tests { state: State::Running, last_update: Utc::now(), address: Some(SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 9876)), + is_next: false, }; let deployment_queued = Deployment { id: queued_id, @@ -611,6 +648,7 @@ mod tests { state: State::Queued, last_update: Utc::now(), address: None, + is_next: false, }; let deployment_building = Deployment { id: building_id, @@ -618,6 +656,7 @@ mod tests { state: State::Building, last_update: Utc::now(), address: None, + is_next: false, }; let deployment_built = Deployment { id: built_id, @@ -625,6 +664,7 @@ mod tests { state: State::Built, last_update: Utc::now(), address: None, + is_next: true, }; let deployment_loading = Deployment { id: loading_id, @@ -632,6 +672,7 @@ mod tests { state: State::Loading, last_update: Utc::now(), address: None, + is_next: false, }; for deployment in [ @@ -690,6 +731,7 @@ mod tests { state: State::Built, last_update: Utc.with_ymd_and_hms(2022, 4, 25, 4, 29, 33).unwrap(), address: None, + is_next: false, }, Deployment { id: id_1, @@ -697,6 +739,7 @@ mod tests { state: State::Running, last_update: Utc.with_ymd_and_hms(2022, 4, 25, 4, 29, 44).unwrap(), address: None, + is_next: false, }, Deployment { id: id_2, @@ -704,6 +747,7 @@ mod tests { state: State::Running, last_update: Utc.with_ymd_and_hms(2022, 4, 25, 4, 33, 48).unwrap(), address: None, + is_next: true, }, Deployment { id: Uuid::new_v4(), @@ -711,6 +755,7 @@ mod tests { state: State::Crashed, last_update: Utc.with_ymd_and_hms(2022, 4, 25, 4, 38, 52).unwrap(), address: None, + is_next: true, }, Deployment { id: id_3, @@ -718,6 +763,7 @@ mod tests { state: State::Running, last_update: Utc.with_ymd_and_hms(2022, 4, 25, 4, 42, 32).unwrap(), address: None, + is_next: false, }, ] { p.insert_deployment(deployment).await.unwrap(); @@ -762,6 +808,7 @@ mod tests { state: State::Running, last_update: Utc::now(), address: None, + is_next: true, }, Deployment { id: Uuid::new_v4(), @@ -769,6 +816,7 @@ mod tests { state: State::Running, last_update: Utc::now(), address: None, + is_next: false, }, ]; @@ -878,7 +926,6 @@ mod tests { target: "tests::log_recorder_event".to_string(), fields: json!({"message": "job queued"}), r#type: deploy_layer::LogType::Event, - address: None, }; p.record(event); @@ -913,6 +960,7 @@ mod tests { state: State::Queued, // Should be different from the state recorded below last_update: Utc.with_ymd_and_hms(2022, 4, 29, 2, 39, 39).unwrap(), address: None, + is_next: false, }) .await .unwrap(); @@ -926,7 +974,6 @@ mod tests { target: String::new(), fields: serde_json::Value::Null, r#type: deploy_layer::LogType::State, - address: Some("127.0.0.1:12345".to_string()), }; p.record(state); @@ -952,7 +999,8 @@ mod tests { service_id, state: State::Running, last_update: Utc.with_ymd_and_hms(2022, 4, 29, 2, 39, 59).unwrap(), - address: Some(SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 12345)), + address: None, + is_next: false, } ); } @@ -1129,6 +1177,7 @@ mod tests { state: State::Built, last_update: Utc.with_ymd_and_hms(2022, 4, 25, 4, 29, 33).unwrap(), address: None, + is_next: false, }, Deployment { id: Uuid::new_v4(), @@ -1136,6 +1185,7 @@ mod tests { state: State::Stopped, last_update: Utc.with_ymd_and_hms(2022, 4, 25, 4, 29, 44).unwrap(), address: None, + is_next: false, }, Deployment { id: id_1, @@ -1143,6 +1193,7 @@ mod tests { state: State::Running, last_update: Utc.with_ymd_and_hms(2022, 4, 25, 4, 33, 48).unwrap(), address: None, + is_next: false, }, Deployment { id: Uuid::new_v4(), @@ -1150,6 +1201,7 @@ mod tests { state: State::Crashed, last_update: Utc.with_ymd_and_hms(2022, 4, 25, 4, 38, 52).unwrap(), address: None, + is_next: false, }, Deployment { id: id_2, @@ -1157,6 +1209,7 @@ mod tests { state: State::Running, last_update: Utc.with_ymd_and_hms(2022, 4, 25, 4, 42, 32).unwrap(), address: None, + is_next: true, }, ] { p.insert_deployment(deployment).await.unwrap(); From c15d07dfa7c926b3dcd0828b3b689fb201bdc559 Mon Sep 17 00:00:00 2001 From: chesedo <pieter@chesedo.me> Date: Wed, 11 Jan 2023 16:28:15 +0200 Subject: [PATCH 03/13] feat: runtime manager to allow deployer to start up both runtimes --- cargo-shuttle/src/lib.rs | 1 + deployer/src/deployment/deploy_layer.rs | 17 ++-- deployer/src/deployment/mod.rs | 26 +++--- deployer/src/deployment/run.rs | 46 +++++----- deployer/src/lib.rs | 15 ++-- deployer/src/main.rs | 36 ++------ deployer/src/runtime_manager.rs | 109 ++++++++++++++++++++++++ proto/src/lib.rs | 12 ++- runtime/src/args.rs | 4 + runtime/src/main.rs | 2 +- 10 files changed, 190 insertions(+), 78 deletions(-) create mode 100644 deployer/src/runtime_manager.rs diff --git a/cargo-shuttle/src/lib.rs b/cargo-shuttle/src/lib.rs index 56e4edea9..2ae96c5f5 100644 --- a/cargo-shuttle/src/lib.rs +++ b/cargo-shuttle/src/lib.rs @@ -416,6 +416,7 @@ impl Shuttle { is_wasm, runtime::StorageManagerType::WorkingDir(working_directory.to_path_buf()), &format!("http://localhost:{}", run_args.port + 1), + run_args.port + 2, ) .await .map_err(|err| { diff --git a/deployer/src/deployment/deploy_layer.rs b/deployer/src/deployment/deploy_layer.rs index f86da76f8..98ebab13e 100644 --- a/deployer/src/deployment/deploy_layer.rs +++ b/deployer/src/deployment/deploy_layer.rs @@ -322,13 +322,12 @@ mod tests { time::Duration, }; - use crate::persistence::DeploymentUpdater; + use crate::{persistence::DeploymentUpdater, RuntimeManager}; use axum::body::Bytes; use ctor::ctor; use flate2::{write::GzEncoder, Compression}; - use shuttle_proto::runtime::runtime_client::RuntimeClient; + use tempdir::TempDir; use tokio::{select, time::sleep}; - use tonic::transport::Channel; use tracing_subscriber::prelude::*; use uuid::Uuid; @@ -399,10 +398,12 @@ mod tests { } } - async fn get_runtime_client() -> RuntimeClient<Channel> { - RuntimeClient::connect("http://127.0.0.1:6001") - .await - .unwrap() + fn get_runtime_manager() -> Arc<tokio::sync::Mutex<RuntimeManager>> { + let tmp_dir = TempDir::new("shuttle_run_test").unwrap(); + let path = tmp_dir.into_path(); + let (tx, _rx) = crossbeam_channel::unbounded(); + + RuntimeManager::new(&[0u8; 8], path, "http://provisioner:8000".to_string(), tx) } #[async_trait::async_trait] @@ -867,7 +868,7 @@ mod tests { .active_deployment_getter(StubActiveDeploymentGetter) .artifacts_path(PathBuf::from("/tmp")) .secret_getter(StubSecretGetter) - .runtime(get_runtime_client().await) + .runtime(get_runtime_manager()) .deployment_updater(StubDeploymentUpdater) .queue_client(StubBuildQueueClient) .build() diff --git a/deployer/src/deployment/mod.rs b/deployer/src/deployment/mod.rs index 571328ee0..ce2e4e0d3 100644 --- a/deployer/src/deployment/mod.rs +++ b/deployer/src/deployment/mod.rs @@ -3,18 +3,19 @@ pub mod gateway_client; mod queue; mod run; -use std::path::PathBuf; +use std::{path::PathBuf, sync::Arc}; pub use queue::Queued; pub use run::{ActiveDeploymentsGetter, Built}; use shuttle_common::storage_manager::ArtifactsStorageManager; -use shuttle_proto::runtime::runtime_client::RuntimeClient; -use tonic::transport::Channel; use tracing::{instrument, Span}; use tracing_opentelemetry::OpenTelemetrySpanExt; -use crate::persistence::{DeploymentUpdater, SecretGetter, SecretRecorder, State}; -use tokio::sync::{broadcast, mpsc}; +use crate::{ + persistence::{DeploymentUpdater, SecretGetter, SecretRecorder, State}, + RuntimeManager, +}; +use tokio::sync::{broadcast, mpsc, Mutex}; use uuid::Uuid; use self::{deploy_layer::LogRecorder, gateway_client::BuildQueueClient}; @@ -28,7 +29,7 @@ pub struct DeploymentManagerBuilder<LR, SR, ADG, DU, SG, QC> { secret_recorder: Option<SR>, active_deployment_getter: Option<ADG>, artifacts_path: Option<PathBuf>, - runtime_client: Option<RuntimeClient<Channel>>, + runtime_manager: Option<Arc<Mutex<RuntimeManager>>>, deployment_updater: Option<DU>, secret_getter: Option<SG>, queue_client: Option<QC>, @@ -79,8 +80,8 @@ where self } - pub fn runtime(mut self, runtime_client: RuntimeClient<Channel>) -> Self { - self.runtime_client = Some(runtime_client); + pub fn runtime(mut self, runtime_manager: Arc<Mutex<RuntimeManager>>) -> Self { + self.runtime_manager = Some(runtime_manager); self } @@ -105,7 +106,7 @@ where .expect("an active deployment getter to be set"); let artifacts_path = self.artifacts_path.expect("artifacts path to be set"); let queue_client = self.queue_client.expect("a queue client to be set"); - let runtime_client = self.runtime_client.expect("a runtime client to be set"); + let runtime_manager = self.runtime_manager.expect("a runtime manager to be set"); let deployment_updater = self .deployment_updater .expect("a deployment updater to be set"); @@ -129,7 +130,7 @@ where )); tokio::spawn(run::task( run_recv, - runtime_client, + runtime_manager, deployment_updater, kill_send.clone(), active_deployment_getter, @@ -171,13 +172,14 @@ pub struct DeploymentManager { impl DeploymentManager { /// Create a new deployment manager. Manages one or more 'pipelines' for /// processing service building, loading, and deployment. - pub fn builder<LR, SR, ADG, DU, SG, QC>() -> DeploymentManagerBuilder<LR, SR, ADG, DU, SG, QC> { + pub fn builder<'a, LR, SR, ADG, DU, SG, QC>( + ) -> DeploymentManagerBuilder<LR, SR, ADG, DU, SG, QC> { DeploymentManagerBuilder { build_log_recorder: None, secret_recorder: None, active_deployment_getter: None, artifacts_path: None, - runtime_client: None, + runtime_manager: None, deployment_updater: None, secret_getter: None, queue_client: None, diff --git a/deployer/src/deployment/run.rs b/deployer/src/deployment/run.rs index da91c660c..21ce0b800 100644 --- a/deployer/src/deployment/run.rs +++ b/deployer/src/deployment/run.rs @@ -3,6 +3,7 @@ use std::{ net::{Ipv4Addr, SocketAddr}, path::PathBuf, str::FromStr, + sync::Arc, }; use async_trait::async_trait; @@ -12,7 +13,7 @@ use shuttle_common::project::ProjectName as ServiceName; use shuttle_common::storage_manager::ArtifactsStorageManager; use shuttle_proto::runtime::{runtime_client::RuntimeClient, LoadRequest, StartRequest}; -use tokio::task::JoinError; +use tokio::{sync::Mutex, task::JoinError}; use tonic::transport::Channel; use tracing::{debug_span, error, info, instrument, trace, Instrument}; use tracing_opentelemetry::OpenTelemetrySpanExt; @@ -22,13 +23,14 @@ use super::{KillReceiver, KillSender, RunReceiver, State}; use crate::{ error::{Error, Result}, persistence::{DeploymentUpdater, SecretGetter}, + RuntimeManager, }; /// Run a task which takes runnable deploys from a channel and starts them up on our runtime /// A deploy is killed when it receives a signal from the kill channel pub async fn task( mut recv: RunReceiver, - runtime_client: RuntimeClient<Channel>, + runtime_manager: Arc<Mutex<RuntimeManager>>, deployment_updater: impl DeploymentUpdater, kill_send: KillSender, active_deployment_getter: impl ActiveDeploymentsGetter, @@ -73,7 +75,7 @@ pub async fn task( Err(err) if err.is_cancelled() => stopped_cleanup(&id), Err(err) => start_crashed_cleanup(&id, err), }; - let runtime_client = runtime_client.clone(); + let runtime_manager = runtime_manager.clone(); tokio::spawn(async move { let parent_cx = global::get_text_map_propagator(|propagator| { @@ -87,7 +89,7 @@ pub async fn task( .handle( storage_manager, secret_getter, - runtime_client, + runtime_manager, deployment_updater, kill_recv, old_deployments_killer, @@ -176,13 +178,13 @@ pub struct Built { } impl Built { - #[instrument(skip(self, storage_manager, secret_getter, runtime_client, deployment_updater, kill_recv, kill_old_deployments, cleanup), fields(id = %self.id, state = %State::Loading))] + #[instrument(skip(self, storage_manager, secret_getter, runtime_manager, deployment_updater, kill_recv, kill_old_deployments, cleanup), fields(id = %self.id, state = %State::Loading))] #[allow(clippy::too_many_arguments)] async fn handle( self, storage_manager: ArtifactsStorageManager, secret_getter: impl SecretGetter, - runtime_client: RuntimeClient<Channel>, + runtime_manager: Arc<Mutex<RuntimeManager>>, deployment_updater: impl DeploymentUpdater, kill_recv: KillReceiver, kill_old_deployments: impl futures::Future<Output = Result<()>>, @@ -202,6 +204,11 @@ impl Built { }; let address = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), port); + let runtime_client = runtime_manager + .lock() + .await + .get_runtime_client(self.is_next) + .await; kill_old_deployments.await?; @@ -290,23 +297,22 @@ async fn run( #[cfg(test)] mod tests { - use std::{net::SocketAddr, path::PathBuf, process::Command, time::Duration}; + use std::{net::SocketAddr, path::PathBuf, process::Command, sync::Arc, time::Duration}; use async_trait::async_trait; use shuttle_common::storage_manager::ArtifactsStorageManager; - use shuttle_proto::runtime::runtime_client::RuntimeClient; use tempdir::TempDir; use tokio::{ - sync::{broadcast, oneshot}, + sync::{broadcast, oneshot, Mutex}, task::JoinError, time::sleep, }; - use tonic::transport::Channel; use uuid::Uuid; use crate::{ error::Error, persistence::{DeploymentUpdater, Secret, SecretGetter}, + RuntimeManager, }; use super::Built; @@ -324,10 +330,12 @@ mod tests { Ok(()) } - async fn get_runtime_client() -> RuntimeClient<Channel> { - RuntimeClient::connect("http://127.0.0.1:6001") - .await - .unwrap() + fn get_runtime_manager() -> Arc<Mutex<RuntimeManager>> { + let tmp_dir = TempDir::new("shuttle_run_test").unwrap(); + let path = tmp_dir.into_path(); + let (tx, _rx) = crossbeam_channel::unbounded(); + + RuntimeManager::new(&[0u8; 8], path, "http://provisioner:8000".to_string(), tx) } #[derive(Clone)] @@ -387,7 +395,7 @@ mod tests { .handle( storage_manager, secret_getter, - get_runtime_client().await, + get_runtime_manager(), StubDeploymentUpdater, kill_recv, kill_old_deployments(), @@ -433,7 +441,7 @@ mod tests { .handle( storage_manager, secret_getter, - get_runtime_client().await, + get_runtime_manager(), StubDeploymentUpdater, kill_recv, kill_old_deployments(), @@ -473,7 +481,7 @@ mod tests { .handle( storage_manager, secret_getter, - get_runtime_client().await, + get_runtime_manager(), StubDeploymentUpdater, kill_recv, kill_old_deployments(), @@ -501,7 +509,7 @@ mod tests { .handle( storage_manager, secret_getter, - get_runtime_client().await, + get_runtime_manager(), StubDeploymentUpdater, kill_recv, kill_old_deployments(), @@ -535,7 +543,7 @@ mod tests { .handle( storage_manager, secret_getter, - get_runtime_client().await, + get_runtime_manager(), StubDeploymentUpdater, kill_recv, kill_old_deployments(), diff --git a/deployer/src/lib.rs b/deployer/src/lib.rs index ad3d2111e..e5ae62450 100644 --- a/deployer/src/lib.rs +++ b/deployer/src/lib.rs @@ -1,4 +1,4 @@ -use std::{convert::Infallible, net::SocketAddr}; +use std::{convert::Infallible, net::SocketAddr, sync::Arc}; pub use args::Args; pub use deployment::deploy_layer::DeployLayer; @@ -10,8 +10,8 @@ use hyper::{ }; pub use persistence::Persistence; use proxy::AddressGetter; -use shuttle_proto::runtime::runtime_client::RuntimeClient; -use tonic::transport::Channel; +pub use runtime_manager::RuntimeManager; +use tokio::sync::Mutex; use tracing::{error, info}; use crate::deployment::gateway_client::GatewayClient; @@ -22,14 +22,19 @@ mod error; mod handlers; mod persistence; mod proxy; +mod runtime_manager; -pub async fn start(persistence: Persistence, runtime_client: RuntimeClient<Channel>, args: Args) { +pub async fn start( + persistence: Persistence, + runtime_manager: Arc<Mutex<RuntimeManager>>, + args: Args, +) { let deployment_manager = DeploymentManager::builder() .build_log_recorder(persistence.clone()) .secret_recorder(persistence.clone()) .active_deployment_getter(persistence.clone()) .artifacts_path(args.artifacts_path) - .runtime(runtime_client) + .runtime(runtime_manager) .deployment_updater(persistence.clone()) .secret_getter(persistence.clone()) .queue_client(GatewayClient::new(args.gateway_uri)) diff --git a/deployer/src/main.rs b/deployer/src/main.rs index 846f5c30c..74fa4bd3d 100644 --- a/deployer/src/main.rs +++ b/deployer/src/main.rs @@ -2,8 +2,7 @@ use std::process::exit; use clap::Parser; use opentelemetry::global; -use shuttle_deployer::{start, start_proxy, Args, DeployLayer, Persistence}; -use shuttle_proto::runtime::{self, SubscribeLogsRequest}; +use shuttle_deployer::{start, start_proxy, Args, DeployLayer, Persistence, RuntimeManager}; use tokio::select; use tracing::{error, trace}; use tracing_subscriber::prelude::*; @@ -40,41 +39,20 @@ async fn main() { .with(opentelemetry) .init(); - let (mut runtime, mut runtime_client) = runtime::start( + let runtime_manager = RuntimeManager::new( BINARY_BYTES, - false, - runtime::StorageManagerType::Artifacts(args.artifacts_path.clone()), - &args.provisioner_address.uri().to_string(), - ) - .await - .unwrap(); - - let sender = persistence.get_log_sender(); - let mut stream = runtime_client - .subscribe_logs(tonic::Request::new(SubscribeLogsRequest {})) - .await - .unwrap() - .into_inner(); - - let logs_task = tokio::spawn(async move { - while let Some(log) = stream.message().await.unwrap() { - sender.send(log.into()).expect("to send log to persistence"); - } - }); + args.artifacts_path.clone(), + args.provisioner_address.uri().to_string(), + persistence.get_log_sender(), + ); select! { _ = start_proxy(args.proxy_address, args.proxy_fqdn.clone(), persistence.clone()) => { error!("Proxy stopped.") }, - _ = start(persistence, runtime_client, args) => { + _ = start(persistence, runtime_manager, args) => { error!("Deployment service stopped.") }, - _ = runtime.wait() => { - error!("Legacy runtime stopped.") - }, - _ = logs_task => { - error!("Logs task stopped") - }, } exit(1); diff --git a/deployer/src/runtime_manager.rs b/deployer/src/runtime_manager.rs new file mode 100644 index 000000000..a21a773cd --- /dev/null +++ b/deployer/src/runtime_manager.rs @@ -0,0 +1,109 @@ +use std::{path::PathBuf, sync::Arc}; + +use shuttle_proto::runtime::{self, runtime_client::RuntimeClient, SubscribeLogsRequest}; +use tokio::sync::Mutex; +use tonic::transport::Channel; + +use crate::deployment::deploy_layer; + +#[derive(Clone)] +pub struct RuntimeManager { + legacy: Option<RuntimeClient<Channel>>, + next: Option<RuntimeClient<Channel>>, + binary_bytes: Vec<u8>, + artifacts_path: PathBuf, + provisioner_address: String, + log_sender: crossbeam_channel::Sender<deploy_layer::Log>, +} + +impl RuntimeManager { + pub fn new( + binary_bytes: &[u8], + artifacts_path: PathBuf, + provisioner_address: String, + log_sender: crossbeam_channel::Sender<deploy_layer::Log>, + ) -> Arc<Mutex<Self>> { + Arc::new(Mutex::new(Self { + legacy: None, + next: None, + binary_bytes: binary_bytes.to_vec(), + artifacts_path, + provisioner_address, + log_sender, + })) + } + + pub async fn get_runtime_client(&mut self, is_next: bool) -> RuntimeClient<Channel> { + if is_next { + self.get_next_runtime_client().await + } else { + self.get_legacy_runtime_client().await + } + } + + async fn get_legacy_runtime_client(&mut self) -> RuntimeClient<Channel> { + if let Some(ref runtime_client) = self.legacy { + runtime_client.clone() + } else { + let (_runtime, mut runtime_client) = runtime::start( + &self.binary_bytes, + false, + runtime::StorageManagerType::Artifacts(self.artifacts_path.clone()), + &self.provisioner_address, + 6001, + ) + .await + .unwrap(); + + let sender = self.log_sender.clone(); + let mut stream = runtime_client + .subscribe_logs(tonic::Request::new(SubscribeLogsRequest {})) + .await + .unwrap() + .into_inner(); + + tokio::spawn(async move { + while let Some(log) = stream.message().await.unwrap() { + sender.send(log.into()).expect("to send log to persistence"); + } + }); + + self.legacy = Some(runtime_client.clone()); + + runtime_client + } + } + + async fn get_next_runtime_client(&mut self) -> RuntimeClient<Channel> { + if let Some(ref runtime_client) = self.next { + runtime_client.clone() + } else { + let (_runtime, mut runtime_client) = runtime::start( + &self.binary_bytes, + true, + runtime::StorageManagerType::Artifacts(self.artifacts_path.clone()), + &self.provisioner_address, + 6002, + ) + .await + .unwrap(); + + let sender = self.log_sender.clone(); + let mut stream = runtime_client + .subscribe_logs(tonic::Request::new(SubscribeLogsRequest {})) + .await + .unwrap() + .into_inner(); + + tokio::spawn(async move { + while let Some(log) = stream.message().await.unwrap() { + sender.send(log.into()).expect("to send log to persistence"); + } + }); + + self.next = Some(runtime_client.clone()); + + runtime_client + } + } +} diff --git a/proto/src/lib.rs b/proto/src/lib.rs index ae78a8487..248d8cba6 100644 --- a/proto/src/lib.rs +++ b/proto/src/lib.rs @@ -244,6 +244,7 @@ pub mod runtime { wasm: bool, storage_manager_type: StorageManagerType, provisioner_address: &str, + port: u16, ) -> anyhow::Result<(process::Child, runtime_client::RuntimeClient<Channel>)> { let runtime_flag = if wasm { "--axum" } else { "--legacy" }; @@ -252,11 +253,13 @@ pub mod runtime { StorageManagerType::WorkingDir(path) => ("working-dir", path), }; - let runtime_executable = get_runtime_executable(binary_bytes); + let runtime_executable = get_runtime_executable(binary_bytes, runtime_flag); let runtime = process::Command::new(runtime_executable) .args([ runtime_flag, + "--port", + &port.to_string(), "--provisioner-address", provisioner_address, "--storage-manager-type", @@ -272,7 +275,7 @@ pub mod runtime { tokio::time::sleep(Duration::from_secs(2)).await; info!("connecting runtime client"); - let conn = Endpoint::new("http://127.0.0.1:6001") + let conn = Endpoint::new(format!("http://127.0.0.1:{port}")) .context("creating runtime client endpoint")? .connect_timeout(Duration::from_secs(5)); @@ -283,10 +286,11 @@ pub mod runtime { Ok((runtime, runtime_client)) } - fn get_runtime_executable(binary_bytes: &[u8]) -> PathBuf { + fn get_runtime_executable(binary_bytes: &[u8], variant: &str) -> PathBuf { let tmp_dir = temp_dir(); - let path = tmp_dir.join("shuttle-runtime"); + // Give it a unique name based on the variant to allow both variants to start up at the same time + let path = tmp_dir.join(format!("shuttle-runtime{variant}")); let mut open_options = OpenOptions::new(); open_options.write(true).create(true).truncate(true); diff --git a/runtime/src/args.rs b/runtime/src/args.rs index 016121d4b..57e78eced 100644 --- a/runtime/src/args.rs +++ b/runtime/src/args.rs @@ -5,6 +5,10 @@ use tonic::transport::Endpoint; #[derive(Parser, Debug)] pub struct Args { + /// Port to start runtime on + #[arg(long)] + pub port: u16, + /// Address to reach provisioner at #[arg(long, default_value = "http://localhost:5000")] pub provisioner_address: Endpoint, diff --git a/runtime/src/main.rs b/runtime/src/main.rs index b7d9f3c3e..8a3a5f803 100644 --- a/runtime/src/main.rs +++ b/runtime/src/main.rs @@ -27,7 +27,7 @@ async fn main() { trace!(args = ?args, "parsed args"); - let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 6001); + let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), args.port); let provisioner_address = args.provisioner_address; let mut server_builder = From 4d0701542f187c4ea6414a88c2dc4d9be066cb2c Mon Sep 17 00:00:00 2001 From: chesedo <pieter@chesedo.me> Date: Thu, 12 Jan 2023 16:01:03 +0200 Subject: [PATCH 04/13] feat: make sure tests run --- codegen/src/next/mod.rs | 1 + tmp/axum-wasm/Cargo.toml | 4 ++++ tmp/axum-wasm/src/lib.rs | 24 ++++++++++++++++++++++++ 3 files changed, 29 insertions(+) diff --git a/codegen/src/next/mod.rs b/codegen/src/next/mod.rs index f91c19928..c74f8b43a 100644 --- a/codegen/src/next/mod.rs +++ b/codegen/src/next/mod.rs @@ -259,6 +259,7 @@ pub(crate) fn wasi_bindings(app: App) -> proc_macro2::TokenStream { quote!( #app + #[cfg(not(test))] #[no_mangle] #[allow(non_snake_case)] pub extern "C" fn __SHUTTLE_Axum_call( diff --git a/tmp/axum-wasm/Cargo.toml b/tmp/axum-wasm/Cargo.toml index 2f5171163..9a4f79f08 100644 --- a/tmp/axum-wasm/Cargo.toml +++ b/tmp/axum-wasm/Cargo.toml @@ -27,3 +27,7 @@ version = "0.8.0" path = "../../common" features = ["axum-wasm"] version = "0.8.0" + +[dev-dependencies] +tokio = { version = "1.22.0", features = ["macros", "rt-multi-thread"] } +hyper = "0.14.23" diff --git a/tmp/axum-wasm/src/lib.rs b/tmp/axum-wasm/src/lib.rs index b4620386a..5efa55846 100644 --- a/tmp/axum-wasm/src/lib.rs +++ b/tmp/axum-wasm/src/lib.rs @@ -13,3 +13,27 @@ shuttle_codegen::app! { "Goodbye, World!" } } + +#[cfg(test)] +mod tests { + use crate::__app; + use http::Request; + use hyper::Method; + + #[tokio::test] + async fn hello() { + let request = Request::builder() + .uri("http://local.test/hello") + .method(Method::GET) + .body(axum::body::boxed(axum::body::Body::empty())) + .unwrap(); + + let response = __app(request).await; + + assert!(response.status().is_success()); + + let body = &hyper::body::to_bytes(response.into_body()).await.unwrap(); + + assert_eq!(body, "Hello, World!"); + } +} From 675727f8df67b61bc18ef189c13b8f7fc70321f3 Mon Sep 17 00:00:00 2001 From: chesedo <pieter@chesedo.me> Date: Thu, 12 Jan 2023 16:19:19 +0200 Subject: [PATCH 05/13] refactor: better migration query --- deployer/migrations/0001_next.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deployer/migrations/0001_next.sql b/deployer/migrations/0001_next.sql index 338e8e56b..42b89c217 100644 --- a/deployer/migrations/0001_next.sql +++ b/deployer/migrations/0001_next.sql @@ -1 +1 @@ -ALTER TABLE deployments ADD is_next BOOLEAN DEFAULT 0 NOT NULL; +ALTER TABLE deployments ADD COLUMN is_next BOOLEAN DEFAULT 0 NOT NULL; From 4bda4606e55a5caa0eb3b316533bd5ed49aba74f Mon Sep 17 00:00:00 2001 From: chesedo <pieter@chesedo.me> Date: Thu, 12 Jan 2023 16:19:47 +0200 Subject: [PATCH 06/13] refactor: handle runtime errors better --- deployer/src/deployment/run.rs | 3 +- deployer/src/error.rs | 2 + deployer/src/runtime_manager.rs | 82 +++++++++++++++------------------ 3 files changed, 42 insertions(+), 45 deletions(-) diff --git a/deployer/src/deployment/run.rs b/deployer/src/deployment/run.rs index 21ce0b800..2e1a4c665 100644 --- a/deployer/src/deployment/run.rs +++ b/deployer/src/deployment/run.rs @@ -208,7 +208,8 @@ impl Built { .lock() .await .get_runtime_client(self.is_next) - .await; + .await + .map_err(|e| Error::Runtime(e))?; kill_old_deployments.await?; diff --git a/deployer/src/error.rs b/deployer/src/error.rs index 1adba5ae2..cdc9c1ed3 100644 --- a/deployer/src/error.rs +++ b/deployer/src/error.rs @@ -30,6 +30,8 @@ pub enum Error { OldCleanup(#[source] Box<dyn StdError + Send>), #[error("Gateway client error: {0}")] GatewayClient(#[from] gateway_client::Error), + #[error("Failed to get runtime: {0}")] + Runtime(#[source] anyhow::Error), } #[derive(Error, Debug)] diff --git a/deployer/src/runtime_manager.rs b/deployer/src/runtime_manager.rs index a21a773cd..5a8749df1 100644 --- a/deployer/src/runtime_manager.rs +++ b/deployer/src/runtime_manager.rs @@ -1,5 +1,6 @@ use std::{path::PathBuf, sync::Arc}; +use anyhow::Context; use shuttle_proto::runtime::{self, runtime_client::RuntimeClient, SubscribeLogsRequest}; use tokio::sync::Mutex; use tonic::transport::Channel; @@ -33,66 +34,59 @@ impl RuntimeManager { })) } - pub async fn get_runtime_client(&mut self, is_next: bool) -> RuntimeClient<Channel> { + pub async fn get_runtime_client( + &mut self, + is_next: bool, + ) -> anyhow::Result<RuntimeClient<Channel>> { if is_next { - self.get_next_runtime_client().await - } else { - self.get_legacy_runtime_client().await - } - } - - async fn get_legacy_runtime_client(&mut self) -> RuntimeClient<Channel> { - if let Some(ref runtime_client) = self.legacy { - runtime_client.clone() - } else { - let (_runtime, mut runtime_client) = runtime::start( + Self::get_runtime_client_helper( + &mut self.next, + 6002, &self.binary_bytes, - false, - runtime::StorageManagerType::Artifacts(self.artifacts_path.clone()), + self.artifacts_path.clone(), &self.provisioner_address, + self.log_sender.clone(), + ) + .await + } else { + Self::get_runtime_client_helper( + &mut self.legacy, 6001, + &self.binary_bytes, + self.artifacts_path.clone(), + &self.provisioner_address, + self.log_sender.clone(), ) .await - .unwrap(); - - let sender = self.log_sender.clone(); - let mut stream = runtime_client - .subscribe_logs(tonic::Request::new(SubscribeLogsRequest {})) - .await - .unwrap() - .into_inner(); - - tokio::spawn(async move { - while let Some(log) = stream.message().await.unwrap() { - sender.send(log.into()).expect("to send log to persistence"); - } - }); - - self.legacy = Some(runtime_client.clone()); - - runtime_client } } - async fn get_next_runtime_client(&mut self) -> RuntimeClient<Channel> { - if let Some(ref runtime_client) = self.next { - runtime_client.clone() + async fn get_runtime_client_helper( + runtime_option: &mut Option<RuntimeClient<Channel>>, + port: u16, + binary_bytes: &[u8], + artifacts_path: PathBuf, + provisioner_address: &str, + log_sender: crossbeam_channel::Sender<deploy_layer::Log>, + ) -> anyhow::Result<RuntimeClient<Channel>> { + if let Some(ref runtime_client) = runtime_option { + Ok(runtime_client.clone()) } else { let (_runtime, mut runtime_client) = runtime::start( - &self.binary_bytes, + binary_bytes, true, - runtime::StorageManagerType::Artifacts(self.artifacts_path.clone()), - &self.provisioner_address, - 6002, + runtime::StorageManagerType::Artifacts(artifacts_path), + provisioner_address, + port, ) .await - .unwrap(); + .context("failed to start shuttle runtime")?; - let sender = self.log_sender.clone(); + let sender = log_sender; let mut stream = runtime_client .subscribe_logs(tonic::Request::new(SubscribeLogsRequest {})) .await - .unwrap() + .context("subscribing to runtime logs stream")? .into_inner(); tokio::spawn(async move { @@ -101,9 +95,9 @@ impl RuntimeManager { } }); - self.next = Some(runtime_client.clone()); + *runtime_option = Some(runtime_client.clone()); - runtime_client + Ok(runtime_client) } } } From 2db8cac3131b2727029792b2bd6ee8585a9dc9f7 Mon Sep 17 00:00:00 2001 From: chesedo <pieter@chesedo.me> Date: Fri, 13 Jan 2023 08:16:56 +0200 Subject: [PATCH 07/13] feat: shutdown runtimes --- deployer/src/runtime_manager.rs | 24 ++++++++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/deployer/src/runtime_manager.rs b/deployer/src/runtime_manager.rs index 5a8749df1..cfc8feb4f 100644 --- a/deployer/src/runtime_manager.rs +++ b/deployer/src/runtime_manager.rs @@ -2,7 +2,7 @@ use std::{path::PathBuf, sync::Arc}; use anyhow::Context; use shuttle_proto::runtime::{self, runtime_client::RuntimeClient, SubscribeLogsRequest}; -use tokio::sync::Mutex; +use tokio::{process, sync::Mutex}; use tonic::transport::Channel; use crate::deployment::deploy_layer; @@ -10,7 +10,9 @@ use crate::deployment::deploy_layer; #[derive(Clone)] pub struct RuntimeManager { legacy: Option<RuntimeClient<Channel>>, + legacy_process: Option<Arc<std::sync::Mutex<process::Child>>>, next: Option<RuntimeClient<Channel>>, + next_process: Option<Arc<std::sync::Mutex<process::Child>>>, binary_bytes: Vec<u8>, artifacts_path: PathBuf, provisioner_address: String, @@ -26,7 +28,9 @@ impl RuntimeManager { ) -> Arc<Mutex<Self>> { Arc::new(Mutex::new(Self { legacy: None, + legacy_process: None, next: None, + next_process: None, binary_bytes: binary_bytes.to_vec(), artifacts_path, provisioner_address, @@ -41,6 +45,7 @@ impl RuntimeManager { if is_next { Self::get_runtime_client_helper( &mut self.next, + &mut self.next_process, 6002, &self.binary_bytes, self.artifacts_path.clone(), @@ -51,6 +56,7 @@ impl RuntimeManager { } else { Self::get_runtime_client_helper( &mut self.legacy, + &mut self.legacy_process, 6001, &self.binary_bytes, self.artifacts_path.clone(), @@ -63,6 +69,7 @@ impl RuntimeManager { async fn get_runtime_client_helper( runtime_option: &mut Option<RuntimeClient<Channel>>, + process_option: &mut Option<Arc<std::sync::Mutex<process::Child>>>, port: u16, binary_bytes: &[u8], artifacts_path: PathBuf, @@ -72,7 +79,7 @@ impl RuntimeManager { if let Some(ref runtime_client) = runtime_option { Ok(runtime_client.clone()) } else { - let (_runtime, mut runtime_client) = runtime::start( + let (process, mut runtime_client) = runtime::start( binary_bytes, true, runtime::StorageManagerType::Artifacts(artifacts_path), @@ -96,8 +103,21 @@ impl RuntimeManager { }); *runtime_option = Some(runtime_client.clone()); + *process_option = Some(Arc::new(std::sync::Mutex::new(process))); Ok(runtime_client) } } } + +impl Drop for RuntimeManager { + fn drop(&mut self) { + if let Some(ref process) = self.legacy_process { + let _ = process.lock().unwrap().start_kill(); + } + + if let Some(ref process) = self.next_process { + let _ = process.lock().unwrap().start_kill(); + } + } +} From e1aa18f9c979614d53523c5320f1fed5b4a55dd2 Mon Sep 17 00:00:00 2001 From: chesedo <pieter@chesedo.me> Date: Fri, 13 Jan 2023 10:22:30 +0200 Subject: [PATCH 08/13] bug: missing so --- deployer/src/deployment/run.rs | 37 +++++++++++++++++------------- deployer/src/error.rs | 4 +--- runtime/src/legacy/error.rs | 7 ------ runtime/src/legacy/mod.rs | 41 +++++++++++++--------------------- 4 files changed, 39 insertions(+), 50 deletions(-) diff --git a/deployer/src/deployment/run.rs b/deployer/src/deployment/run.rs index 2e1a4c665..237f1ba51 100644 --- a/deployer/src/deployment/run.rs +++ b/deployer/src/deployment/run.rs @@ -2,20 +2,20 @@ use std::{ collections::HashMap, net::{Ipv4Addr, SocketAddr}, path::PathBuf, - str::FromStr, sync::Arc, }; use async_trait::async_trait; use opentelemetry::global; use portpicker::pick_unused_port; -use shuttle_common::project::ProjectName as ServiceName; use shuttle_common::storage_manager::ArtifactsStorageManager; -use shuttle_proto::runtime::{runtime_client::RuntimeClient, LoadRequest, StartRequest}; +use shuttle_proto::runtime::{ + runtime_client::RuntimeClient, LoadRequest, StartRequest, StopRequest, +}; -use tokio::{sync::Mutex, task::JoinError}; +use tokio::sync::Mutex; use tonic::transport::Channel; -use tracing::{debug_span, error, info, instrument, trace, Instrument}; +use tracing::{debug, debug_span, error, info, instrument, trace, Instrument}; use tracing_opentelemetry::OpenTelemetrySpanExt; use uuid::Uuid; @@ -222,7 +222,7 @@ impl Built { secret_getter, runtime_client.clone(), ) - .await; + .await?; tokio::spawn(run( self.id, self.service_name, @@ -243,7 +243,7 @@ async fn load( so_path: PathBuf, secret_getter: impl SecretGetter, mut runtime_client: RuntimeClient<Channel>, -) { +) -> Result<()> { info!( "loading project from: {}", so_path.clone().into_os_string().into_string().unwrap() @@ -262,11 +262,19 @@ async fn load( service_name: service_name.clone(), secrets, }); - info!("loading service"); + + debug!("loading service"); let response = runtime_client.load(load_request).await; - if let Err(e) = response { - info!("failed to load service: {}", e); + match response { + Ok(response) => { + info!(response = ?response.into_inner(), "loading response: "); + Ok(()) + } + Err(error) => { + error!(%error, "failed to load service"); + Err(Error::Load(error.to_string())) + } } } @@ -336,7 +344,9 @@ mod tests { let path = tmp_dir.into_path(); let (tx, _rx) = crossbeam_channel::unbounded(); - RuntimeManager::new(&[0u8; 8], path, "http://provisioner:8000".to_string(), tx) + let file = std::fs::read("../target/debug/shuttle-runtime").unwrap(); + + RuntimeManager::new(&file, path, "http://provisioner:8000".to_string(), tx) } #[derive(Clone)] @@ -553,10 +563,7 @@ mod tests { .await; assert!( - matches!( - result, - Err(Error::Load(shuttle_service::loader::LoaderError::Load(_))) - ), + matches!(result, Err(Error::Load(_))), "expected missing 'so' error: {:?}", result ); diff --git a/deployer/src/error.rs b/deployer/src/error.rs index cdc9c1ed3..0f9ad03cc 100644 --- a/deployer/src/error.rs +++ b/deployer/src/error.rs @@ -2,8 +2,6 @@ use std::error::Error as StdError; use std::io; use thiserror::Error; -use shuttle_service::loader::LoaderError; - use cargo::util::errors::CargoTestError; use crate::deployment::gateway_client; @@ -15,7 +13,7 @@ pub enum Error { #[error("Build error: {0}")] Build(#[source] Box<dyn StdError + Send>), #[error("Load error: {0}")] - Load(#[from] LoaderError), + Load(String), #[error("Prepare to run error: {0}")] PrepareRun(String), #[error("Run error: {0}")] diff --git a/runtime/src/legacy/error.rs b/runtime/src/legacy/error.rs index 9c57cd4e2..4fb96d813 100644 --- a/runtime/src/legacy/error.rs +++ b/runtime/src/legacy/error.rs @@ -1,14 +1,7 @@ -use shuttle_service::loader::LoaderError; use thiserror::Error; #[derive(Error, Debug)] pub enum Error { - #[error("Load error: {0}")] - Load(#[from] LoaderError), - #[error("Run error: {0}")] - Run(#[from] shuttle_service::Error), #[error("Start error: {0}")] Start(#[from] shuttle_service::error::CustomError), } - -pub type Result<T> = std::result::Result<T, Error>; diff --git a/runtime/src/legacy/mod.rs b/runtime/src/legacy/mod.rs index c3d98c44f..bb725a08b 100644 --- a/runtime/src/legacy/mod.rs +++ b/runtime/src/legacy/mod.rs @@ -3,7 +3,6 @@ use std::{ iter::FromIterator, net::{Ipv4Addr, SocketAddr}, ops::DerefMut, - path::PathBuf, str::FromStr, sync::Mutex, }; @@ -19,8 +18,8 @@ use shuttle_proto::{ }, }; use shuttle_service::{ - loader::{LoadedService, Loader}, - Factory, Logger, ServiceName, + loader::{LoadedService, Loader, LoaderError}, + Logger, ServiceName, }; use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender}; use tokio::sync::oneshot; @@ -38,7 +37,7 @@ where S: StorageManager, { // Mutexes are for interior mutability - so_path: Mutex<Option<PathBuf>>, + loader: Mutex<Option<Loader>>, logs_rx: Mutex<Option<UnboundedReceiver<LogItem>>>, logs_tx: Mutex<UnboundedSender<LogItem>>, provisioner_address: Endpoint, @@ -55,7 +54,7 @@ where let (tx, rx) = mpsc::unbounded_channel(); Self { - so_path: Mutex::new(None), + loader: Mutex::new(None), logs_rx: Mutex::new(Some(rx)), logs_tx: Mutex::new(tx), kill_tx: Mutex::new(None), @@ -75,8 +74,11 @@ where let LoadRequest { path, secrets, .. } = request.into_inner(); trace!(path, "loading"); - let so_path = PathBuf::from(path); - *self.so_path.lock().unwrap() = Some(so_path); + *self.loader.lock().unwrap() = + Some(Loader::from_so_file(path).map_err(|error| match error { + LoaderError::Load(error) => Status::not_found(error.to_string()), + LoaderError::GetEntrypoint(error) => Status::invalid_argument(error.to_string()), + })?); *self.secrets.lock().unwrap() = Some(BTreeMap::from_iter(secrets.into_iter())); @@ -93,16 +95,16 @@ where .expect("failed to connect to provisioner"); let abstract_factory = AbstractProvisionerFactory::new(provisioner_client); - let so_path = self - .so_path + let loader = self + .loader .lock() .unwrap() - .as_ref() + .take() .ok_or_else(|| -> error::Error { error::Error::Start(anyhow!("trying to start a service that was not loaded")) }) - .map_err(|err| Status::from_error(Box::new(err)))? - .clone(); + .map_err(|err| Status::from_error(Box::new(err)))?; + let secrets = self .secrets .lock() @@ -140,7 +142,8 @@ where let logger = Logger::new(logs_tx, deployment_id); trace!(%service_address, "starting"); - let service = load_service(service_address, so_path, &mut factory, logger) + let service = loader + .load(&mut factory, service_address, logger) .await .map_err(|error| Status::internal(error.to_string()))?; @@ -233,15 +236,3 @@ async fn run_until_stopped( library.close().unwrap(); }); } - -#[instrument(skip(addr, so_path, factory, logger))] -async fn load_service( - addr: SocketAddr, - so_path: PathBuf, - factory: &mut dyn Factory, - logger: Logger, -) -> error::Result<LoadedService> { - let loader = Loader::from_so_file(so_path)?; - - Ok(loader.load(factory, addr, logger).await?) -} From 888db0a3653d5b12c7583ce566716935af6ce34c Mon Sep 17 00:00:00 2001 From: chesedo <pieter@chesedo.me> Date: Fri, 13 Jan 2023 10:31:58 +0200 Subject: [PATCH 09/13] bug: stop services --- deployer/src/deployment/run.rs | 35 ++++++++++++++++++++++------------ 1 file changed, 23 insertions(+), 12 deletions(-) diff --git a/deployer/src/deployment/run.rs b/deployer/src/deployment/run.rs index 237f1ba51..41a0c0ace 100644 --- a/deployer/src/deployment/run.rs +++ b/deployer/src/deployment/run.rs @@ -50,14 +50,6 @@ pub async fn task( let secret_getter = secret_getter.clone(); let storage_manager = storage_manager.clone(); - let _service_name = match ServiceName::from_str(&built.service_name) { - Ok(name) => name, - Err(err) => { - start_crashed_cleanup(&id, err); - continue; - } - }; - let old_deployments_killer = kill_old_deployments( built.service_id, id, @@ -278,14 +270,14 @@ async fn load( } } -#[instrument(skip(runtime_client, deployment_updater, _kill_recv, _cleanup), fields(state = %State::Running))] +#[instrument(skip(runtime_client, deployment_updater, kill_recv, _cleanup), fields(state = %State::Running))] async fn run( id: Uuid, service_name: String, mut runtime_client: RuntimeClient<Channel>, address: SocketAddr, deployment_updater: impl DeploymentUpdater, - _kill_recv: KillReceiver, + mut kill_recv: KillReceiver, _cleanup: impl FnOnce(std::result::Result<std::result::Result<(), shuttle_service::Error>, JoinError>) + Send + 'static, @@ -294,14 +286,33 @@ async fn run( let start_request = tonic::Request::new(StartRequest { deployment_id: id.as_bytes().to_vec(), - service_name, + service_name: service_name.clone(), port: address.port() as u32, }); info!("starting service"); let response = runtime_client.start(start_request).await.unwrap(); - info!(response = ?response.into_inner(), "client response: "); + info!(response = ?response.into_inner(), "start client response: "); + + while let Ok(kill_id) = kill_recv.recv().await { + if kill_id == id { + let stop_request = tonic::Request::new(StopRequest { + deployment_id: id.as_bytes().to_vec(), + service_name, + }); + let response = runtime_client.stop(stop_request).await.unwrap(); + + info!(response = ?response.into_inner(), "stop client response: "); + + info!("deployment '{id}' killed"); + + stopped_cleanup(&id); + return; + } + } + + completed_cleanup(&id); } #[cfg(test)] From f6688bec8ae203cfc04d155d6e7fb4620b43fc8d Mon Sep 17 00:00:00 2001 From: chesedo <pieter@chesedo.me> Date: Mon, 16 Jan 2023 09:57:03 +0200 Subject: [PATCH 10/13] bug: ffi and runtime manager not living long enough --- deployer/src/deployment/run.rs | 37 +++++++++++++++----------- deployer/src/runtime_manager.rs | 37 ++++++++++++++++---------- runtime/src/legacy/error.rs | 7 +++++ runtime/src/legacy/mod.rs | 46 +++++++++++++++++++++------------ service/src/loader.rs | 2 ++ 5 files changed, 85 insertions(+), 44 deletions(-) diff --git a/deployer/src/deployment/run.rs b/deployer/src/deployment/run.rs index 41a0c0ace..7cdc8b8bf 100644 --- a/deployer/src/deployment/run.rs +++ b/deployer/src/deployment/run.rs @@ -196,9 +196,8 @@ impl Built { }; let address = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), port); + let mut runtime_manager = runtime_manager.lock().await.clone(); let runtime_client = runtime_manager - .lock() - .await .get_runtime_client(self.is_next) .await .map_err(|e| Error::Runtime(e))?; @@ -212,18 +211,26 @@ impl Built { self.service_id, so_path, secret_getter, - runtime_client.clone(), + runtime_client, ) .await?; - tokio::spawn(run( - self.id, - self.service_name, - runtime_client, - address, - deployment_updater, - kill_recv, - cleanup, - )); + + // Move runtime manager to this thread so that the runtime lives long enough + tokio::spawn(async move { + let runtime_client = runtime_manager + .get_runtime_client(self.is_next) + .await + .unwrap(); + run( + self.id, + self.service_name, + runtime_client, + address, + deployment_updater, + kill_recv, + ) + .await + }); Ok(()) } @@ -234,7 +241,7 @@ async fn load( service_id: Uuid, so_path: PathBuf, secret_getter: impl SecretGetter, - mut runtime_client: RuntimeClient<Channel>, + runtime_client: &mut RuntimeClient<Channel>, ) -> Result<()> { info!( "loading project from: {}", @@ -274,7 +281,7 @@ async fn load( async fn run( id: Uuid, service_name: String, - mut runtime_client: RuntimeClient<Channel>, + runtime_client: &mut RuntimeClient<Channel>, address: SocketAddr, deployment_updater: impl DeploymentUpdater, mut kill_recv: KillReceiver, @@ -357,7 +364,7 @@ mod tests { let file = std::fs::read("../target/debug/shuttle-runtime").unwrap(); - RuntimeManager::new(&file, path, "http://provisioner:8000".to_string(), tx) + RuntimeManager::new(&file, path, "http://localhost:5000".to_string(), tx) } #[derive(Clone)] diff --git a/deployer/src/runtime_manager.rs b/deployer/src/runtime_manager.rs index cfc8feb4f..964b460c9 100644 --- a/deployer/src/runtime_manager.rs +++ b/deployer/src/runtime_manager.rs @@ -4,6 +4,7 @@ use anyhow::Context; use shuttle_proto::runtime::{self, runtime_client::RuntimeClient, SubscribeLogsRequest}; use tokio::{process, sync::Mutex}; use tonic::transport::Channel; +use tracing::{info, instrument, trace}; use crate::deployment::deploy_layer; @@ -41,11 +42,12 @@ impl RuntimeManager { pub async fn get_runtime_client( &mut self, is_next: bool, - ) -> anyhow::Result<RuntimeClient<Channel>> { + ) -> anyhow::Result<&mut RuntimeClient<Channel>> { if is_next { Self::get_runtime_client_helper( &mut self.next, &mut self.next_process, + is_next, 6002, &self.binary_bytes, self.artifacts_path.clone(), @@ -57,6 +59,7 @@ impl RuntimeManager { Self::get_runtime_client_helper( &mut self.legacy, &mut self.legacy_process, + is_next, 6001, &self.binary_bytes, self.artifacts_path.clone(), @@ -67,21 +70,25 @@ impl RuntimeManager { } } - async fn get_runtime_client_helper( - runtime_option: &mut Option<RuntimeClient<Channel>>, + #[instrument(skip(runtime_option, process_option, binary_bytes, log_sender))] + async fn get_runtime_client_helper<'a>( + runtime_option: &'a mut Option<RuntimeClient<Channel>>, process_option: &mut Option<Arc<std::sync::Mutex<process::Child>>>, + is_next: bool, port: u16, binary_bytes: &[u8], artifacts_path: PathBuf, provisioner_address: &str, log_sender: crossbeam_channel::Sender<deploy_layer::Log>, - ) -> anyhow::Result<RuntimeClient<Channel>> { - if let Some(ref runtime_client) = runtime_option { - Ok(runtime_client.clone()) + ) -> anyhow::Result<&'a mut RuntimeClient<Channel>> { + if let Some(runtime_client) = runtime_option { + trace!("returning previous client"); + Ok(runtime_client) } else { - let (process, mut runtime_client) = runtime::start( + trace!("making new client"); + let (process, runtime_client) = runtime::start( binary_bytes, - true, + is_next, runtime::StorageManagerType::Artifacts(artifacts_path), provisioner_address, port, @@ -91,32 +98,36 @@ impl RuntimeManager { let sender = log_sender; let mut stream = runtime_client + .clone() .subscribe_logs(tonic::Request::new(SubscribeLogsRequest {})) .await .context("subscribing to runtime logs stream")? .into_inner(); tokio::spawn(async move { - while let Some(log) = stream.message().await.unwrap() { + while let Ok(Some(log)) = stream.message().await { sender.send(log.into()).expect("to send log to persistence"); } }); - *runtime_option = Some(runtime_client.clone()); + *runtime_option = Some(runtime_client); *process_option = Some(Arc::new(std::sync::Mutex::new(process))); - Ok(runtime_client) + // Safe to unwrap as it was just set + Ok(runtime_option.as_mut().unwrap()) } } } impl Drop for RuntimeManager { fn drop(&mut self) { - if let Some(ref process) = self.legacy_process { + info!("runtime manager shutting down"); + + if let Some(ref process) = self.legacy_process.take() { let _ = process.lock().unwrap().start_kill(); } - if let Some(ref process) = self.next_process { + if let Some(ref process) = self.next_process.take() { let _ = process.lock().unwrap().start_kill(); } } diff --git a/runtime/src/legacy/error.rs b/runtime/src/legacy/error.rs index 4fb96d813..9c57cd4e2 100644 --- a/runtime/src/legacy/error.rs +++ b/runtime/src/legacy/error.rs @@ -1,7 +1,14 @@ +use shuttle_service::loader::LoaderError; use thiserror::Error; #[derive(Error, Debug)] pub enum Error { + #[error("Load error: {0}")] + Load(#[from] LoaderError), + #[error("Run error: {0}")] + Run(#[from] shuttle_service::Error), #[error("Start error: {0}")] Start(#[from] shuttle_service::error::CustomError), } + +pub type Result<T> = std::result::Result<T, Error>; diff --git a/runtime/src/legacy/mod.rs b/runtime/src/legacy/mod.rs index bb725a08b..ba1b3fd57 100644 --- a/runtime/src/legacy/mod.rs +++ b/runtime/src/legacy/mod.rs @@ -3,6 +3,7 @@ use std::{ iter::FromIterator, net::{Ipv4Addr, SocketAddr}, ops::DerefMut, + path::PathBuf, str::FromStr, sync::Mutex, }; @@ -18,8 +19,8 @@ use shuttle_proto::{ }, }; use shuttle_service::{ - loader::{LoadedService, Loader, LoaderError}, - Logger, ServiceName, + loader::{LoadedService, Loader}, + Factory, Logger, ServiceName, }; use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender}; use tokio::sync::oneshot; @@ -37,7 +38,7 @@ where S: StorageManager, { // Mutexes are for interior mutability - loader: Mutex<Option<Loader>>, + so_path: Mutex<Option<PathBuf>>, logs_rx: Mutex<Option<UnboundedReceiver<LogItem>>>, logs_tx: Mutex<UnboundedSender<LogItem>>, provisioner_address: Endpoint, @@ -54,7 +55,7 @@ where let (tx, rx) = mpsc::unbounded_channel(); Self { - loader: Mutex::new(None), + so_path: Mutex::new(None), logs_rx: Mutex::new(Some(rx)), logs_tx: Mutex::new(tx), kill_tx: Mutex::new(None), @@ -74,11 +75,8 @@ where let LoadRequest { path, secrets, .. } = request.into_inner(); trace!(path, "loading"); - *self.loader.lock().unwrap() = - Some(Loader::from_so_file(path).map_err(|error| match error { - LoaderError::Load(error) => Status::not_found(error.to_string()), - LoaderError::GetEntrypoint(error) => Status::invalid_argument(error.to_string()), - })?); + let so_path = PathBuf::from(path); + *self.so_path.lock().unwrap() = Some(so_path); *self.secrets.lock().unwrap() = Some(BTreeMap::from_iter(secrets.into_iter())); @@ -90,21 +88,23 @@ where &self, request: Request<StartRequest>, ) -> Result<Response<StartResponse>, Status> { + trace!("legacy starting"); + let provisioner_client = ProvisionerClient::connect(self.provisioner_address.clone()) .await .expect("failed to connect to provisioner"); let abstract_factory = AbstractProvisionerFactory::new(provisioner_client); - let loader = self - .loader + let so_path = self + .so_path .lock() .unwrap() - .take() + .as_ref() .ok_or_else(|| -> error::Error { error::Error::Start(anyhow!("trying to start a service that was not loaded")) }) - .map_err(|err| Status::from_error(Box::new(err)))?; - + .map_err(|err| Status::from_error(Box::new(err)))? + .clone(); let secrets = self .secrets .lock() @@ -118,6 +118,8 @@ where .map_err(|err| Status::from_error(Box::new(err)))? .clone(); + trace!("prepare done"); + let StartRequest { deployment_id, service_name, @@ -136,14 +138,14 @@ where secrets, self.storage_manager.clone(), ); + trace!("got factory"); let logs_tx = self.logs_tx.lock().unwrap().clone(); let logger = Logger::new(logs_tx, deployment_id); trace!(%service_address, "starting"); - let service = loader - .load(&mut factory, service_address, logger) + let service = load_service(service_address, so_path, &mut factory, logger) .await .map_err(|error| Status::internal(error.to_string()))?; @@ -236,3 +238,15 @@ async fn run_until_stopped( library.close().unwrap(); }); } + +#[instrument(skip(addr, so_path, factory, logger))] +async fn load_service( + addr: SocketAddr, + so_path: PathBuf, + factory: &mut dyn Factory, + logger: Logger, +) -> error::Result<LoadedService> { + let loader = Loader::from_so_file(so_path)?; + + Ok(loader.load(factory, addr, logger).await?) +} diff --git a/service/src/loader.rs b/service/src/loader.rs index 19d1114c3..7deb5b5fc 100644 --- a/service/src/loader.rs +++ b/service/src/loader.rs @@ -72,6 +72,8 @@ impl Loader { addr: SocketAddr, logger: logger::Logger, ) -> Result<LoadedService, Error> { + trace!("loading service"); + let mut bootstrapper = self.bootstrapper; AssertUnwindSafe(bootstrapper.bootstrap(factory, logger)) From 0950fefe3d3701e3a06586c8c836420c3543c80c Mon Sep 17 00:00:00 2001 From: chesedo <pieter@chesedo.me> Date: Mon, 16 Jan 2023 10:03:55 +0200 Subject: [PATCH 11/13] bug: missing so error --- runtime/src/legacy/mod.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/runtime/src/legacy/mod.rs b/runtime/src/legacy/mod.rs index ba1b3fd57..9c2663f60 100644 --- a/runtime/src/legacy/mod.rs +++ b/runtime/src/legacy/mod.rs @@ -76,6 +76,11 @@ where trace!(path, "loading"); let so_path = PathBuf::from(path); + + if !so_path.exists() { + return Err(Status::not_found("'.so' to load does not exist")); + } + *self.so_path.lock().unwrap() = Some(so_path); *self.secrets.lock().unwrap() = Some(BTreeMap::from_iter(secrets.into_iter())); From 0a44b13885e4b8a7a13b28cd38a16052287502d3 Mon Sep 17 00:00:00 2001 From: chesedo <pieter@chesedo.me> Date: Mon, 16 Jan 2023 10:24:13 +0200 Subject: [PATCH 12/13] refactor: run cleanups --- deployer/src/deployment/run.rs | 91 ++++++++++++++-------------------- 1 file changed, 37 insertions(+), 54 deletions(-) diff --git a/deployer/src/deployment/run.rs b/deployer/src/deployment/run.rs index 7cdc8b8bf..03108f284 100644 --- a/deployer/src/deployment/run.rs +++ b/deployer/src/deployment/run.rs @@ -10,11 +10,11 @@ use opentelemetry::global; use portpicker::pick_unused_port; use shuttle_common::storage_manager::ArtifactsStorageManager; use shuttle_proto::runtime::{ - runtime_client::RuntimeClient, LoadRequest, StartRequest, StopRequest, + runtime_client::RuntimeClient, LoadRequest, StartRequest, StopRequest, StopResponse, }; use tokio::sync::Mutex; -use tonic::transport::Channel; +use tonic::{transport::Channel, Response, Status}; use tracing::{debug, debug_span, error, info, instrument, trace, Instrument}; use tracing_opentelemetry::OpenTelemetrySpanExt; use uuid::Uuid; @@ -56,16 +56,13 @@ pub async fn task( active_deployment_getter.clone(), kill_send, ); - let cleanup = move |result: std::result::Result< - std::result::Result<(), shuttle_service::Error>, - JoinError, - >| match result { - Ok(inner) => match inner { - Ok(()) => completed_cleanup(&id), + let cleanup = move |result: std::result::Result<Response<StopResponse>, Status>| { + info!(response = ?result, "stop client response: "); + + match result { + Ok(_) => completed_cleanup(&id), Err(err) => crashed_cleanup(&id, err), - }, - Err(err) if err.is_cancelled() => stopped_cleanup(&id), - Err(err) => start_crashed_cleanup(&id, err), + } }; let runtime_manager = runtime_manager.clone(); @@ -180,9 +177,7 @@ impl Built { deployment_updater: impl DeploymentUpdater, kill_recv: KillReceiver, kill_old_deployments: impl futures::Future<Output = Result<()>>, - cleanup: impl FnOnce(std::result::Result<std::result::Result<(), shuttle_service::Error>, JoinError>) - + Send - + 'static, + cleanup: impl FnOnce(std::result::Result<Response<StopResponse>, Status>) + Send + 'static, ) -> Result<()> { let so_path = storage_manager.deployment_library_path(&self.id)?; @@ -228,6 +223,7 @@ impl Built { address, deployment_updater, kill_recv, + cleanup, ) .await }); @@ -277,7 +273,7 @@ async fn load( } } -#[instrument(skip(runtime_client, deployment_updater, kill_recv, _cleanup), fields(state = %State::Running))] +#[instrument(skip(runtime_client, deployment_updater, kill_recv, cleanup), fields(state = %State::Running))] async fn run( id: Uuid, service_name: String, @@ -285,9 +281,7 @@ async fn run( address: SocketAddr, deployment_updater: impl DeploymentUpdater, mut kill_recv: KillReceiver, - _cleanup: impl FnOnce(std::result::Result<std::result::Result<(), shuttle_service::Error>, JoinError>) - + Send - + 'static, + cleanup: impl FnOnce(std::result::Result<Response<StopResponse>, Status>) + Send + 'static, ) { deployment_updater.set_address(&id, &address).await.unwrap(); @@ -302,24 +296,21 @@ async fn run( info!(response = ?response.into_inner(), "start client response: "); + let mut response = Err(Status::unknown("not stopped yet")); + while let Ok(kill_id) = kill_recv.recv().await { if kill_id == id { let stop_request = tonic::Request::new(StopRequest { deployment_id: id.as_bytes().to_vec(), - service_name, + service_name: service_name.clone(), }); - let response = runtime_client.stop(stop_request).await.unwrap(); - - info!(response = ?response.into_inner(), "stop client response: "); + response = runtime_client.stop(stop_request).await; - info!("deployment '{id}' killed"); - - stopped_cleanup(&id); - return; + break; } } - completed_cleanup(&id); + cleanup(response); } #[cfg(test)] @@ -328,12 +319,14 @@ mod tests { use async_trait::async_trait; use shuttle_common::storage_manager::ArtifactsStorageManager; + use shuttle_proto::runtime::StopResponse; use tempdir::TempDir; use tokio::{ sync::{broadcast, oneshot, Mutex}, task::JoinError, time::sleep, }; + use tonic::{Response, Status}; use uuid::Uuid; use crate::{ @@ -407,14 +400,10 @@ mod tests { let (kill_send, kill_recv) = broadcast::channel(1); let (cleanup_send, cleanup_recv) = oneshot::channel(); - let handle_cleanup = |result: std::result::Result< - std::result::Result<(), shuttle_service::Error>, - JoinError, - >| { + let handle_cleanup = |result: std::result::Result<Response<StopResponse>, Status>| { assert!( - matches!(result, Err(ref join_error) if join_error.is_cancelled()), - "handle should have been cancelled: {:?}", - result + result.unwrap().into_inner().success, + "handle should have been cancelled", ); cleanup_send.send(()).unwrap(); }; @@ -452,16 +441,13 @@ mod tests { let (_kill_send, kill_recv) = broadcast::channel(1); let (cleanup_send, cleanup_recv) = oneshot::channel(); - let handle_cleanup = |result: std::result::Result< - std::result::Result<(), shuttle_service::Error>, - JoinError, - >| { - let result = result.unwrap(); - assert!( - result.is_ok(), - "did not expect error from self stopping service: {}", - result.unwrap_err() - ); + let handle_cleanup = |result: std::result::Result<Response<StopResponse>, Status>| { + // let result = result.unwrap(); + // assert!( + // result.is_ok(), + // "did not expect error from self stopping service: {}", + // result.unwrap_err() + // ); cleanup_send.send(()).unwrap(); }; let secret_getter = get_secret_getter(); @@ -492,16 +478,13 @@ mod tests { let (_kill_send, kill_recv) = broadcast::channel(1); let (cleanup_send, cleanup_recv): (oneshot::Sender<()>, _) = oneshot::channel(); - let handle_cleanup = |result: std::result::Result< - std::result::Result<(), shuttle_service::Error>, - JoinError, - >| { - let result = result.unwrap(); - assert!( - matches!(result, Err(shuttle_service::Error::BindPanic(ref msg)) if msg == "panic in bind"), - "expected inner error from handle: {:?}", - result - ); + let handle_cleanup = |result: std::result::Result<Response<StopResponse>, Status>| { + // let result = result.unwrap(); + // assert!( + // matches!(result, Err(shuttle_service::Error::BindPanic(ref msg)) if msg == "panic in bind"), + // "expected inner error from handle: {:?}", + // result + // ); cleanup_send.send(()).unwrap(); }; let secret_getter = get_secret_getter(); From ada01d7f3e1831100d26d18685085b47584ba81e Mon Sep 17 00:00:00 2001 From: chesedo <pieter@chesedo.me> Date: Mon, 16 Jan 2023 12:12:42 +0200 Subject: [PATCH 13/13] refactor: clippy suggestions --- deployer/src/deployment/mod.rs | 3 +-- deployer/src/deployment/run.rs | 6 +++--- deployer/src/persistence/mod.rs | 2 +- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/deployer/src/deployment/mod.rs b/deployer/src/deployment/mod.rs index ce2e4e0d3..121a4c926 100644 --- a/deployer/src/deployment/mod.rs +++ b/deployer/src/deployment/mod.rs @@ -172,8 +172,7 @@ pub struct DeploymentManager { impl DeploymentManager { /// Create a new deployment manager. Manages one or more 'pipelines' for /// processing service building, loading, and deployment. - pub fn builder<'a, LR, SR, ADG, DU, SG, QC>( - ) -> DeploymentManagerBuilder<LR, SR, ADG, DU, SG, QC> { + pub fn builder<LR, SR, ADG, DU, SG, QC>() -> DeploymentManagerBuilder<LR, SR, ADG, DU, SG, QC> { DeploymentManagerBuilder { build_log_recorder: None, secret_recorder: None, diff --git a/deployer/src/deployment/run.rs b/deployer/src/deployment/run.rs index aa24dbe29..a9edebc1d 100644 --- a/deployer/src/deployment/run.rs +++ b/deployer/src/deployment/run.rs @@ -195,7 +195,7 @@ impl Built { let runtime_client = runtime_manager .get_runtime_client(self.is_next) .await - .map_err(|e| Error::Runtime(e))?; + .map_err(Error::Runtime)?; kill_old_deployments.await?; @@ -438,7 +438,7 @@ mod tests { let (_kill_send, kill_recv) = broadcast::channel(1); let (cleanup_send, cleanup_recv) = oneshot::channel(); - let handle_cleanup = |result: std::result::Result<Response<StopResponse>, Status>| { + let handle_cleanup = |_result: std::result::Result<Response<StopResponse>, Status>| { // let result = result.unwrap(); // assert!( // result.is_ok(), @@ -475,7 +475,7 @@ mod tests { let (_kill_send, kill_recv) = broadcast::channel(1); let (cleanup_send, cleanup_recv): (oneshot::Sender<()>, _) = oneshot::channel(); - let handle_cleanup = |result: std::result::Result<Response<StopResponse>, Status>| { + let handle_cleanup = |_result: std::result::Result<Response<StopResponse>, Status>| { // let result = result.unwrap(); // assert!( // matches!(result, Err(shuttle_service::Error::BindPanic(ref msg)) if msg == "panic in bind"), diff --git a/deployer/src/persistence/mod.rs b/deployer/src/persistence/mod.rs index 96ab11349..7ac9f3e2d 100644 --- a/deployer/src/persistence/mod.rs +++ b/deployer/src/persistence/mod.rs @@ -540,7 +540,7 @@ mod tests { let update = p.get_deployment(&id).await.unwrap().unwrap(); assert_eq!(update.state, State::Built); assert_eq!(update.address, Some(address)); - assert_eq!(update.is_next, true); + assert!(update.is_next); assert_ne!( update.last_update, Utc.with_ymd_and_hms(2022, 4, 25, 4, 43, 33).unwrap()