From c3c0ceda61cdb7f00135208385fdeb6461a2e1be Mon Sep 17 00:00:00 2001
From: Pieter <pieter@chesedo.me>
Date: Thu, 17 Nov 2022 09:03:07 +0200
Subject: [PATCH] bug: deployer freezes (#478)

* bug: reduce spawning to have deployer lock up less

* refactor: don't return cargo logs
---
 cargo-shuttle/src/lib.rs         |  4 +--
 deployer/src/deployment/queue.rs | 62 ++++++++++++++++----------------
 deployer/src/persistence/log.rs  |  1 -
 service/src/loader.rs            | 43 ++++++++++------------
 4 files changed, 51 insertions(+), 59 deletions(-)

diff --git a/cargo-shuttle/src/lib.rs b/cargo-shuttle/src/lib.rs
index 6b7ee7a9e..53838c344 100644
--- a/cargo-shuttle/src/lib.rs
+++ b/cargo-shuttle/src/lib.rs
@@ -266,7 +266,7 @@ impl Shuttle {
         trace!("starting a local run for a service: {run_args:?}");
 
         let (tx, rx): (crossbeam_channel::Sender<Message>, _) = crossbeam_channel::bounded(0);
-        tokio::spawn(async move {
+        tokio::task::spawn_blocking(move || {
             while let Ok(message) = rx.recv() {
                 match message {
                     Message::TextLine(line) => println!("{line}"),
@@ -332,7 +332,7 @@ impl Shuttle {
 
         handle.await??;
 
-        tokio::spawn(async move {
+        tokio::task::spawn_blocking(move || {
             trace!("closing so file");
             so.close().unwrap();
         });
diff --git a/deployer/src/deployment/queue.rs b/deployer/src/deployment/queue.rs
index 2eb3c3ea9..298e36eb8 100644
--- a/deployer/src/deployment/queue.rs
+++ b/deployer/src/deployment/queue.rs
@@ -133,7 +133,7 @@ impl Queued {
 
         let (tx, rx): (crossbeam_channel::Sender<Message>, _) = crossbeam_channel::bounded(0);
         let id = self.id;
-        tokio::spawn(async move {
+        tokio::task::spawn_blocking(move || {
             while let Ok(message) = rx.recv() {
                 trace!(?message, "received cargo message");
                 // TODO: change these to `info!(...)` as [valuable] support increases.
@@ -286,6 +286,8 @@ async fn build_deployment(
         .await
         .map_err(|e| Error::Build(e.into()))?;
 
+    trace!(?so_path, "got so path");
+
     Ok(so_path)
 }
 
@@ -297,36 +299,8 @@ async fn run_pre_deploy_tests(
     let (read, write) = pipe::pipe();
     let project_path = project_path.to_owned();
 
-    let handle = tokio::spawn(async move {
-        let config = get_config(write)?;
-        let manifest_path = project_path.join("Cargo.toml");
-
-        let ws = Workspace::new(&manifest_path, &config)?;
-
-        let mut compile_opts = CompileOptions::new(&config, CompileMode::Test)?;
-
-        compile_opts.build_config.message_format = MessageFormat::Json {
-            render_diagnostics: false,
-            short: false,
-            ansi: false,
-        };
-
-        let opts = TestOptions {
-            compile_opts,
-            no_run: false,
-            no_fail_fast: false,
-        };
-
-        let test_failures = cargo::ops::run_tests(&ws, &opts, &[])?;
-
-        match test_failures {
-            Some(failures) => Err(failures.into()),
-            None => Ok(()),
-        }
-    });
-
     // This needs to be on a separate thread, else deployer will block (reason currently unknown :D)
-    tokio::spawn(async move {
+    tokio::task::spawn_blocking(move || {
         for message in Message::parse_stream(read) {
             match message {
                 Ok(message) => {
@@ -341,7 +315,31 @@ async fn run_pre_deploy_tests(
         }
     });
 
-    handle.await?
+    let config = get_config(write)?;
+    let manifest_path = project_path.join("Cargo.toml");
+
+    let ws = Workspace::new(&manifest_path, &config)?;
+
+    let mut compile_opts = CompileOptions::new(&config, CompileMode::Test)?;
+
+    compile_opts.build_config.message_format = MessageFormat::Json {
+        render_diagnostics: false,
+        short: false,
+        ansi: false,
+    };
+
+    let opts = TestOptions {
+        compile_opts,
+        no_run: false,
+        no_fail_fast: false,
+    };
+
+    let test_failures = cargo::ops::run_tests(&ws, &opts, &[])?;
+
+    match test_failures {
+        Some(failures) => Err(failures.into()),
+        None => Ok(()),
+    }
 }
 
 /// Store 'so' file in the libs folder
@@ -462,7 +460,7 @@ ff0e55bda1ff01000000000000000000e0079c01ff12a55500280000",
         let root = Path::new(env!("CARGO_MANIFEST_DIR"));
         let (tx, rx) = crossbeam_channel::unbounded();
 
-        tokio::spawn(async move { while rx.recv().is_ok() {} });
+        tokio::task::spawn_blocking(move || while rx.recv().is_ok() {});
 
         let failure_project_path = root.join("tests/resources/tests-fail");
         assert!(matches!(
diff --git a/deployer/src/persistence/log.rs b/deployer/src/persistence/log.rs
index 822982bc7..f5c17e22a 100644
--- a/deployer/src/persistence/log.rs
+++ b/deployer/src/persistence/log.rs
@@ -107,7 +107,6 @@ fn extract_message(fields: &Value) -> Option<String> {
                         return Some(rendered.as_str()?.to_string());
                     }
                 }
-                Value::String(mes_str) => return Some(mes_str.to_string()),
                 _ => {}
             }
         }
diff --git a/service/src/loader.rs b/service/src/loader.rs
index f415805f3..223871251 100644
--- a/service/src/loader.rs
+++ b/service/src/loader.rs
@@ -50,7 +50,7 @@ impl Loader {
     /// function called `ENTRYPOINT_SYMBOL_NAME`, likely automatically generated
     /// using the [`shuttle_service::main`][crate::main] macro.
     pub fn from_so_file<P: AsRef<OsStr>>(so_path: P) -> Result<Self, LoaderError> {
-        trace!("loading {:?}", so_path.as_ref().to_str());
+        trace!(so_path = so_path.as_ref().to_str(), "loading .so path");
         unsafe {
             let lib = Library::new(so_path).map_err(LoaderError::Load)?;
 
@@ -110,29 +110,8 @@ pub async fn build_crate(
     let (read, write) = pipe::pipe();
     let project_path = project_path.to_owned();
 
-    let handle = tokio::spawn(async move {
-        trace!("started thread to build crate");
-        let config = get_config(write)?;
-        let manifest_path = project_path.join("Cargo.toml");
-        let mut ws = Workspace::new(&manifest_path, &config)?;
-
-        let current = ws.current_mut().map_err(|_| anyhow!("A Shuttle project cannot have a virtual manifest file - please ensure your Cargo.toml file specifies it as a library."))?;
-        let manifest = current.manifest_mut();
-        ensure_cdylib(manifest)?;
-
-        let summary = current.manifest_mut().summary_mut();
-        make_name_unique(summary, deployment_id);
-        check_version(summary)?;
-        check_no_panic(&ws)?;
-
-        let opts = get_compile_options(&config, release_mode)?;
-        let compilation = compile(&ws, &opts);
-
-        Ok(compilation?.cdylibs[0].path.clone())
-    });
-
     // This needs to be on a separate thread, else deployer will block (reason currently unknown :D)
-    tokio::spawn(async move {
+    tokio::task::spawn_blocking(move || {
         trace!("started thread to to capture build output stream");
         for message in Message::parse_stream(read) {
             trace!(?message, "parsed cargo message");
@@ -149,7 +128,23 @@ pub async fn build_crate(
         }
     });
 
-    handle.await?
+    let config = get_config(write)?;
+    let manifest_path = project_path.join("Cargo.toml");
+    let mut ws = Workspace::new(&manifest_path, &config)?;
+
+    let current = ws.current_mut().map_err(|_| anyhow!("A Shuttle project cannot have a virtual manifest file - please ensure your Cargo.toml file specifies it as a library."))?;
+    let manifest = current.manifest_mut();
+    ensure_cdylib(manifest)?;
+
+    let summary = current.manifest_mut().summary_mut();
+    make_name_unique(summary, deployment_id);
+    check_version(summary)?;
+    check_no_panic(&ws)?;
+
+    let opts = get_compile_options(&config, release_mode)?;
+    let compilation = compile(&ws, &opts);
+
+    Ok(compilation?.cdylibs[0].path.clone())
 }
 
 /// Get the default compile config with output redirected to writer