ci: resolve CI errors in shuttle-next #580

Merged
merged 27 commits into shuttle-next from ci/ci-go-green on Mar 10, 2023

Changes from 10 commits

Commits (27)
a3f670a
test: compile wasm module in axum runtime test setup
oddgrd Jan 16, 2023
c4bb23c
ci: add next patch override to CI
oddgrd Jan 16, 2023
0cc8a41
Merge branch 'shuttle-next' into ci/ci-go-green
oddgrd Jan 16, 2023
ab36c24
ci: include wasm32-wasi target in rust install
oddgrd Jan 16, 2023
72ead20
fix: deployer tests where runtime fails to start
oddgrd Jan 16, 2023
90ed8fe
fix: incorrect provisioner address
oddgrd Jan 16, 2023
64433c6
Merge branch 'shuttle-next' into ci/ci-go-green
oddgrd Jan 16, 2023
b490197
feat: log service state changes in runtime
oddgrd Jan 18, 2023
651100c
feat: don't send stop req on startup failure
oddgrd Jan 18, 2023
d8f6dd6
refactor: unused imports
oddgrd Jan 18, 2023
de64d14
refactor: handling legacy panics
chesedo Jan 19, 2023
a6f3cd9
tests: deadlock less
chesedo Jan 19, 2023
5515295
refactor: fixups
chesedo Jan 19, 2023
c4ccb88
refactor: clippy suggestions
chesedo Jan 19, 2023
30dfcbc
tests: mock provisioner
chesedo Jan 20, 2023
a21c970
refactor: restore capture from 'log' and colors
chesedo Jan 20, 2023
79b6e37
refactor: clippy suggestions
chesedo Jan 20, 2023
527a91e
tests: longer wait
chesedo Jan 20, 2023
242e12e
tests: don't panic while holding lock
chesedo Jan 20, 2023
e1558b1
tests: don't panic on stream closed
chesedo Jan 20, 2023
664ceb6
tests: don't filter out state logs
chesedo Jan 20, 2023
ba7c7ef
Merge remote-tracking branch 'upstream/shuttle-next' into ci/ci-go-green
chesedo Jan 20, 2023
d328874
tests: bigger timeout
chesedo Jan 20, 2023
f2b3612
ci: remove duplicate patch
chesedo Jan 20, 2023
fa6fa05
refactor: comments
chesedo Jan 23, 2023
b02e595
Merge remote-tracking branch 'upstream/shuttle-next' into ci/ci-go-green
chesedo Jan 23, 2023
e541499
Merge remote-tracking branch 'upstream/shuttle-next' into ci/ci-go-green
chesedo Mar 10, 2023
3 changes: 2 additions & 1 deletion .circleci/config.yml
@@ -87,6 +87,7 @@ commands:
[patch.crates-io]
shuttle-service = { path = "$PWD/service" }
shuttle-aws-rds = { path = "$PWD/resources/aws-rds" }
shuttle-next = { path = "$PWD/next" }
shuttle-persist = { path = "$PWD/resources/persist" }
shuttle-shared-db = { path = "$PWD/resources/shared-db" }
shuttle-secrets = { path = "$PWD/resources/secrets" }
@@ -97,7 +98,7 @@ commands:
- run:
name: Install Rust
command: |
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y --target add wasm32-wasi
sudo apt update && sudo apt install -y libssl1.1
install-protoc:
steps:
16 changes: 11 additions & 5 deletions deployer/src/deployment/deploy_layer.rs
@@ -403,7 +403,7 @@ mod tests {
let path = tmp_dir.into_path();
let (tx, _rx) = crossbeam_channel::unbounded();

RuntimeManager::new(path, "http://provisioner:8000".to_string(), tx)
RuntimeManager::new(path, "http://localhost:5000".to_string(), tx)
}

#[async_trait::async_trait]
@@ -489,6 +489,7 @@
}
}

//#[ignore]
#[tokio::test(flavor = "multi_thread")]
async fn deployment_to_be_queued() {
let deployment_manager = get_deployment_manager().await;
@@ -656,7 +657,7 @@ mod tests {
_ = test => {}
}
}

#[ignore]
#[tokio::test(flavor = "multi_thread")]
async fn deployment_bind_panic() {
let deployment_manager = get_deployment_manager().await;
@@ -737,15 +738,15 @@ mod tests {
let recorder = RECORDER.lock().unwrap();
let states = recorder.get_deployment_states(&id);

if states.len() < 5 {
if states.len() < 6 {
drop(recorder); // Don't block
sleep(Duration::from_millis(350)).await;
continue;
}

assert_eq!(
states.len(),
5,
6,
"did not expect these states:\n\t{states:#?}"
);

@@ -768,6 +769,10 @@ mod tests {
id,
state: State::Loading,
},
StateLog {
id,
state: State::Running,
},
StateLog {
id,
state: State::Crashed,
@@ -787,6 +792,7 @@ mod tests {
}
}

#[ignore]
#[tokio::test]
async fn deployment_from_run() {
let deployment_manager = get_deployment_manager().await;
@@ -803,7 +809,7 @@
.await;

// Give it a small time to start up
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
tokio::time::sleep(std::time::Duration::from_secs(4)).await;

let recorder = RECORDER.lock().unwrap();
let states = recorder.get_deployment_states(&id);
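The deploy_layer test changes above all follow the same polling pattern: re-read the recorder until the expected number of state logs shows up, and release the lock before sleeping so the deployment task can keep writing. A minimal sketch of that loop, assuming the test harness's `RECORDER`, `StateLog`, and `get_deployment_states` as they appear in the diff (the helper function itself is hypothetical):

```rust
use std::time::Duration;
use tokio::time::sleep;

// Poll the shared recorder until `expected` states have been logged for `id`.
async fn wait_for_states(id: &uuid::Uuid, expected: usize) -> Vec<StateLog> {
    loop {
        let recorder = RECORDER.lock().unwrap();
        let states = recorder.get_deployment_states(id);
        if states.len() >= expected {
            return states;
        }
        // Drop the guard before sleeping so the deployment task isn't blocked.
        drop(recorder);
        sleep(Duration::from_millis(350)).await;
    }
}
```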
112 changes: 71 additions & 41 deletions deployer/src/deployment/run.rs
@@ -10,7 +10,8 @@ use opentelemetry::global;
use portpicker::pick_unused_port;
use shuttle_common::storage_manager::ArtifactsStorageManager;
use shuttle_proto::runtime::{
runtime_client::RuntimeClient, LoadRequest, StartRequest, StopRequest, StopResponse,
runtime_client::RuntimeClient, LoadRequest, StartRequest, StartResponse, StopRequest,
StopResponse,
};

use tokio::sync::Mutex;
@@ -26,6 +27,12 @@ use crate::{
RuntimeManager,
};

#[derive(Debug)]
enum RuntimeResponse {
Start(std::result::Result<Response<StartResponse>, Status>),
Stop(std::result::Result<Response<StopResponse>, Status>),
}

/// Run a task which takes runnable deploys from a channel and starts them up on our runtime
/// A deploy is killed when it receives a signal from the kill channel
pub async fn task(
@@ -56,12 +63,18 @@ pub async fn task(
active_deployment_getter.clone(),
kill_send,
);
let cleanup = move |result: std::result::Result<Response<StopResponse>, Status>| {
info!(response = ?result, "stop client response: ");

match result {
Ok(_) => completed_cleanup(&id),
Err(err) => crashed_cleanup(&id, err),
let cleanup = move |res: RuntimeResponse| {
info!(response = ?res, "stop client response: ");

match res {
RuntimeResponse::Start(r) => match r {
Ok(_) => {}
Err(err) => start_crashed_cleanup(&id, err),
},
RuntimeResponse::Stop(r) => match r {
Ok(_) => stopped_cleanup(&id),
Err(err) => crashed_cleanup(&id, err),
},
}
};
let runtime_manager = runtime_manager.clone();
@@ -177,7 +190,7 @@ impl Built {
deployment_updater: impl DeploymentUpdater,
kill_recv: KillReceiver,
kill_old_deployments: impl futures::Future<Output = Result<()>>,
cleanup: impl FnOnce(std::result::Result<Response<StopResponse>, Status>) + Send + 'static,
cleanup: impl FnOnce(RuntimeResponse) + Send + 'static,
) -> Result<()> {
let so_path = storage_manager.deployment_library_path(&self.id)?;

@@ -281,7 +294,7 @@ async fn run(
address: SocketAddr,
deployment_updater: impl DeploymentUpdater,
mut kill_recv: KillReceiver,
cleanup: impl FnOnce(std::result::Result<Response<StopResponse>, Status>) + Send + 'static,
cleanup: impl FnOnce(RuntimeResponse) + Send + 'static,
) {
deployment_updater.set_address(&id, &address).await.unwrap();

@@ -292,25 +305,34 @@
});

info!("starting service");
let response = runtime_client.start(start_request).await.unwrap();
let response = runtime_client.start(start_request).await;

info!(response = ?response.into_inner(), "start client response: ");
match response {
Ok(response) => {
info!(response = ?response.into_inner(), "start client response: ");

let mut response = Err(Status::unknown("not stopped yet"));
let mut response = RuntimeResponse::Stop(Err(Status::unknown("not stopped yet")));

while let Ok(kill_id) = kill_recv.recv().await {
if kill_id == id {
let stop_request = tonic::Request::new(StopRequest {
deployment_id: id.as_bytes().to_vec(),
service_name: service_name.clone(),
});
response = runtime_client.stop(stop_request).await;
while let Ok(kill_id) = kill_recv.recv().await {
if kill_id == id {
let stop_request = tonic::Request::new(StopRequest {
deployment_id: id.as_bytes().to_vec(),
service_name: service_name.clone(),
});
response = RuntimeResponse::Stop(runtime_client.stop(stop_request).await);

break;
}
}

break;
cleanup(response);
}
}
Err(ref status) => {
error!("failed to start service, status code: {}", status);

cleanup(response);
cleanup(RuntimeResponse::Start(response));
}
}
}

#[cfg(test)]
@@ -319,13 +341,11 @@

use async_trait::async_trait;
use shuttle_common::storage_manager::ArtifactsStorageManager;
use shuttle_proto::runtime::StopResponse;
use tempdir::TempDir;
use tokio::{
sync::{broadcast, oneshot, Mutex},
time::sleep,
};
use tonic::{Response, Status};
use uuid::Uuid;

use crate::{
@@ -334,7 +354,7 @@
RuntimeManager,
};

use super::Built;
use super::{Built, RuntimeResponse};

const RESOURCES_PATH: &str = "tests/resources";

@@ -397,11 +417,15 @@
let (kill_send, kill_recv) = broadcast::channel(1);
let (cleanup_send, cleanup_recv) = oneshot::channel();

let handle_cleanup = |result: std::result::Result<Response<StopResponse>, Status>| {
assert!(
result.unwrap().into_inner().success,
"handle should have been cancelled",
);
let handle_cleanup = |result: RuntimeResponse| {
if let RuntimeResponse::Stop(res) = result {
assert!(
res.unwrap().into_inner().success,
"handle should have been cancelled",
);
} else {
panic!("killing a service should return a stop response");
};
cleanup_send.send(()).unwrap();
};
let secret_getter = get_secret_getter();
@@ -432,13 +456,14 @@
}

// This test does not use a kill signal to stop the service. Rather the service decided to stop on its own without errors
#[ignore]
#[tokio::test]
async fn self_stop() {
let (built, storage_manager) = make_so_and_built("sleep-async");
let (_kill_send, kill_recv) = broadcast::channel(1);
let (cleanup_send, cleanup_recv) = oneshot::channel();

let handle_cleanup = |_result: std::result::Result<Response<StopResponse>, Status>| {
let handle_cleanup = |_result: RuntimeResponse| {
// let result = result.unwrap();
// assert!(
// result.is_ok(),
@@ -469,13 +494,14 @@
}

// Test for panics in Service::bind
#[ignore]
#[tokio::test]
async fn panic_in_bind() {
let (built, storage_manager) = make_so_and_built("bind-panic");
let (_kill_send, kill_recv) = broadcast::channel(1);
let (cleanup_send, cleanup_recv): (oneshot::Sender<()>, _) = oneshot::channel();
let (cleanup_send, cleanup_recv) = oneshot::channel();

let handle_cleanup = |_result: std::result::Result<Response<StopResponse>, Status>| {
let handle_cleanup = |_result: RuntimeResponse| {
// let result = result.unwrap();
// assert!(
// matches!(result, Err(shuttle_service::Error::BindPanic(ref msg)) if msg == "panic in bind"),
@@ -510,11 +536,15 @@
async fn panic_in_main() {
let (built, storage_manager) = make_so_and_built("main-panic");
let (_kill_send, kill_recv) = broadcast::channel(1);
let (cleanup_send, cleanup_recv) = oneshot::channel();

let handle_cleanup = |_result| {
cleanup_send.send(()).unwrap();
};

let handle_cleanup = |_result| panic!("the service shouldn't even start");
let secret_getter = get_secret_getter();

let result = built
built
.handle(
storage_manager,
secret_getter,
@@ -524,13 +554,13 @@
kill_old_deployments(),
handle_cleanup,
)
.await;
.await
.unwrap();

assert!(
matches!(result, Err(Error::Run(shuttle_service::Error::BuildPanic(ref msg))) if msg == "main panic"),
"expected inner error from main: {:?}",
result
);
tokio::select! {
_ = sleep(Duration::from_secs(5)) => panic!("cleanup should have been called"),
Ok(()) = cleanup_recv => {}
}
}

#[tokio::test]
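The reworked run.rs tests share one shape: the cleanup callback sends on a oneshot channel, and the test races that signal against a deadline so a missing cleanup fails fast instead of hanging. A condensed sketch of that pattern (the function name is illustrative; tokio assumed):

```rust
use std::time::Duration;
use tokio::{sync::oneshot, time::sleep};

// Fail the test if cleanup was never signalled within `secs` seconds.
async fn expect_cleanup_within(cleanup_recv: oneshot::Receiver<()>, secs: u64) {
    tokio::select! {
        _ = sleep(Duration::from_secs(secs)) => panic!("cleanup should have been called"),
        Ok(()) = cleanup_recv => {}
    }
}
```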
5 changes: 2 additions & 3 deletions deployer/src/runtime_manager.rs
@@ -45,7 +45,6 @@ impl RuntimeManager {
&mut self.next,
&mut self.next_process,
is_next,
6002,
self.artifacts_path.clone(),
&self.provisioner_address,
self.log_sender.clone(),
@@ -56,7 +55,6 @@
&mut self.legacy,
&mut self.legacy_process,
is_next,
6001,
self.artifacts_path.clone(),
&self.provisioner_address,
self.log_sender.clone(),
@@ -70,7 +68,6 @@
runtime_option: &'a mut Option<RuntimeClient<Channel>>,
process_option: &mut Option<Arc<std::sync::Mutex<process::Child>>>,
is_next: bool,
port: u16,
artifacts_path: PathBuf,
provisioner_address: &str,
log_sender: crossbeam_channel::Sender<deploy_layer::Log>,
@@ -80,6 +77,8 @@ impl RuntimeManager {
Ok(runtime_client)
} else {
trace!("making new client");
let port = portpicker::pick_unused_port().context("failed to find available port")?;

let (process, runtime_client) = runtime::start(
is_next,
runtime::StorageManagerType::Artifacts(artifacts_path),
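Dropping the hard-coded runtime ports (6001 for legacy, 6002 for next) in favour of `portpicker` means concurrently running deployers or test suites no longer race for the same address. Roughly, assuming the `portpicker` and `anyhow` crates as used in the diff (the wrapper function is hypothetical):

```rust
use anyhow::Context;

// pick_unused_port returns Option<u16>; treat exhaustion as a hard error.
fn allocate_runtime_port() -> anyhow::Result<u16> {
    portpicker::pick_unused_port().context("failed to find available port")
}
```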
20 changes: 18 additions & 2 deletions runtime/src/axum/mod.rs
@@ -174,7 +174,6 @@ impl Runtime for AxumWasm {
}
}
}

struct RouterBuilder {
engine: Engine,
linker: Linker<WasiCtx>,
@@ -409,15 +408,32 @@ async fn run_until_stopped(

#[cfg(test)]
pub mod tests {
use std::process::Command;

use super::*;
use hyper::{http::HeaderValue, Method, Request, StatusCode, Version};
use uuid::Uuid;

// Compile axum wasm module
fn compile_module() {
Command::new("cargo")
.arg("build")
.arg("--target")
.arg("wasm32-wasi")
.current_dir("../tmp/axum-wasm-expanded")
.spawn()
.unwrap()
.wait()
.unwrap();
}

#[tokio::test]
async fn axum() {
compile_module();

let router = RouterBuilder::new()
.unwrap()
.src("axum.wasm")
.src("../tmp/axum-wasm-expanded/target/wasm32-wasi/debug/shuttle_axum_expanded.wasm")
.build()
.unwrap();
let id = Uuid::default().as_bytes().to_vec();
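Compiling the wasm module during test setup keeps the artifact out of the repository, at the cost of the test depending on a successful `cargo build --target wasm32-wasi` in `../tmp/axum-wasm-expanded`. A hypothetical guard (not in the PR) that makes a missing artifact fail loudly before `RouterBuilder` tries to load it:

```rust
use std::path::Path;

// Assert the compiled module exists so a broken build step produces a
// clear failure instead of an opaque load error.
fn assert_module_built(path: &str) {
    assert!(
        Path::new(path).exists(),
        "{path} missing; did the wasm32-wasi build succeed?"
    );
}
```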