Skip to content

Commit

Permalink
bug: next runtime not sending stop signal (#728)
Browse files Browse the repository at this point in the history
* bug: next runtime not sending stop signal

* refactor: better unwrap
  • Loading branch information
chesedo authored Mar 20, 2023
1 parent ee04376 commit ba66b33
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 44 deletions.
79 changes: 49 additions & 30 deletions deployer/src/deployment/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,25 @@ pub async fn task(
active_deployment_getter.clone(),
runtime_manager.clone(),
);
let cleanup = move |response: SubscribeStopResponse| {
let cleanup = move |response: Option<SubscribeStopResponse>| {
debug!(response = ?response, "stop client response: ");

match StopReason::from_i32(response.reason).unwrap_or_default() {
StopReason::Request => stopped_cleanup(&id),
StopReason::End => completed_cleanup(&id),
StopReason::Crash => {
crashed_cleanup(&id, Error::Run(anyhow::Error::msg(response.message).into()))
if let Some(response) = response {
match StopReason::from_i32(response.reason).unwrap_or_default() {
StopReason::Request => stopped_cleanup(&id),
StopReason::End => completed_cleanup(&id),
StopReason::Crash => crashed_cleanup(
&id,
Error::Run(anyhow::Error::msg(response.message).into()),
),
}
} else {
crashed_cleanup(
&id,
Error::Runtime(anyhow::anyhow!(
"stop subscribe channel stopped unexpectedly"
)),
)
}
};
let runtime_manager = runtime_manager.clone();
Expand Down Expand Up @@ -188,7 +198,7 @@ impl Built {
runtime_manager: Arc<Mutex<RuntimeManager>>,
deployment_updater: impl DeploymentUpdater,
kill_old_deployments: impl futures::Future<Output = Result<()>>,
cleanup: impl FnOnce(SubscribeStopResponse) + Send + 'static,
cleanup: impl FnOnce(Option<SubscribeStopResponse>) + Send + 'static,
) -> Result<()> {
// For alpha this is the path to the users project with an embedded runtime.
// For shuttle-next this is the path to the compiled .wasm file, which will be
Expand Down Expand Up @@ -343,7 +353,7 @@ async fn run(
mut runtime_client: RuntimeClient<ClaimService<InjectPropagation<Channel>>>,
address: SocketAddr,
deployment_updater: impl DeploymentUpdater,
cleanup: impl FnOnce(SubscribeStopResponse) + Send + 'static,
cleanup: impl FnOnce(Option<SubscribeStopResponse>) + Send + 'static,
) {
deployment_updater
.set_address(&id, &address)
Expand All @@ -369,15 +379,15 @@ async fn run(
info!(response = ?response.into_inner(), "start client response: ");

// Wait for stop reason
let reason = stream.message().await.unwrap().unwrap();
let reason = stream.message().await.expect("message from tonic stream");

cleanup(reason);
}
Err(ref status) if status.code() == Code::InvalidArgument => {
cleanup(SubscribeStopResponse {
cleanup(Some(SubscribeStopResponse {
reason: StopReason::Crash as i32,
message: status.to_string(),
});
}));
}
Err(ref status) => {
start_crashed_cleanup(
Expand Down Expand Up @@ -534,12 +544,15 @@ mod tests {
let runtime_manager = get_runtime_manager();
let (cleanup_send, cleanup_recv) = oneshot::channel();

let handle_cleanup = |response: SubscribeStopResponse| match (
StopReason::from_i32(response.reason).unwrap(),
response.message,
) {
(StopReason::Request, mes) if mes.is_empty() => cleanup_send.send(()).unwrap(),
_ => panic!("expected stop due to request"),
let handle_cleanup = |response: Option<SubscribeStopResponse>| {
let response = response.unwrap();
match (
StopReason::from_i32(response.reason).unwrap(),
response.message,
) {
(StopReason::Request, mes) if mes.is_empty() => cleanup_send.send(()).unwrap(),
_ => panic!("expected stop due to request"),
}
};

built
Expand Down Expand Up @@ -574,12 +587,15 @@ mod tests {
let runtime_manager = get_runtime_manager();
let (cleanup_send, cleanup_recv) = oneshot::channel();

let handle_cleanup = |response: SubscribeStopResponse| match (
StopReason::from_i32(response.reason).unwrap(),
response.message,
) {
(StopReason::End, mes) if mes.is_empty() => cleanup_send.send(()).unwrap(),
_ => panic!("expected stop due to self end"),
let handle_cleanup = |response: Option<SubscribeStopResponse>| {
let response = response.unwrap();
match (
StopReason::from_i32(response.reason).unwrap(),
response.message,
) {
(StopReason::End, mes) if mes.is_empty() => cleanup_send.send(()).unwrap(),
_ => panic!("expected stop due to self end"),
}
};

built
Expand Down Expand Up @@ -611,14 +627,17 @@ mod tests {
let runtime_manager = get_runtime_manager();
let (cleanup_send, cleanup_recv) = oneshot::channel();

let handle_cleanup = |response: SubscribeStopResponse| match (
StopReason::from_i32(response.reason).unwrap(),
response.message,
) {
(StopReason::Crash, mes) if mes.contains("panic in bind") => {
cleanup_send.send(()).unwrap()
let handle_cleanup = |response: Option<SubscribeStopResponse>| {
let response = response.unwrap();
match (
StopReason::from_i32(response.reason).unwrap(),
response.message,
) {
(StopReason::Crash, mes) if mes.contains("panic in bind") => {
cleanup_send.send(()).unwrap()
}
(_, mes) => panic!("expected stop due to crash: {mes}"),
}
(_, mes) => panic!("expected stop due to crash: {mes}"),
};

built
Expand Down
2 changes: 2 additions & 0 deletions deployer/src/runtime_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,8 @@ impl RuntimeManager {
let stop_request = tonic::Request::new(StopRequest {});
let response = runtime_client.stop(stop_request).await.unwrap();

trace!(?response, "stop deployment response");

let result = response.into_inner().success;
let _ = process.start_kill();

Expand Down
6 changes: 4 additions & 2 deletions runtime/src/alpha/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use tonic::{
Request, Response, Status,
};
use tower::ServiceBuilder;
use tracing::{error, info, trace};
use tracing::{error, info, trace, warn};

use crate::{provisioner_factory::ProvisionerFactory, Logger};

Expand Down Expand Up @@ -378,7 +378,9 @@ where

Ok(Response::new(StopResponse { success: true }))
} else {
Err(Status::internal("failed to stop deployment"))
warn!("failed to stop deployment");

Ok(tonic::Response::new(StopResponse { success: false }))
}
}

Expand Down
55 changes: 43 additions & 12 deletions runtime/src/next/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@ use hyper::{Body, Request, Response};
use shuttle_common::wasm::{Bytesable, Log, RequestWrapper, ResponseWrapper};
use shuttle_proto::runtime::runtime_server::Runtime;
use shuttle_proto::runtime::{
self, LoadRequest, LoadResponse, StartRequest, StartResponse, StopRequest, StopResponse,
SubscribeLogsRequest, SubscribeStopRequest, SubscribeStopResponse,
self, LoadRequest, LoadResponse, StartRequest, StartResponse, StopReason, StopRequest,
StopResponse, SubscribeLogsRequest, SubscribeStopRequest, SubscribeStopResponse,
};
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::{mpsc, oneshot};
use tokio::sync::{broadcast, mpsc, oneshot};
use tokio_stream::wrappers::ReceiverStream;
use tonic::Status;
use tracing::{error, trace};
use tracing::{error, trace, warn};
use wasi_common::file::FileCaps;
use wasmtime::{Engine, Linker, Module, Store};
use wasmtime_wasi::sync::net::UnixStream as WasiUnixStream;
Expand All @@ -45,6 +45,7 @@ pub struct AxumWasm {
logs_rx: Mutex<Option<Receiver<Result<runtime::LogItem, Status>>>>,
logs_tx: Sender<Result<runtime::LogItem, Status>>,
kill_tx: Mutex<Option<oneshot::Sender<String>>>,
stopped_tx: broadcast::Sender<(StopReason, String)>,
}

impl AxumWasm {
Expand All @@ -57,11 +58,14 @@ impl AxumWasm {
// seems acceptable so going with double the number for some headroom
let (tx, rx) = mpsc::channel(1 << 15);

let (stopped_tx, _stopped_rx) = broadcast::channel(10);

Self {
router: Mutex::new(None),
logs_rx: Mutex::new(Some(rx)),
logs_tx: tx,
kill_tx: Mutex::new(None),
stopped_tx,
}
}
}
Expand Down Expand Up @@ -122,7 +126,11 @@ impl Runtime for AxumWasm {
.context("tried to start a service that was not loaded")
.map_err(|err| Status::internal(err.to_string()))?;

tokio::spawn(run_until_stopped(router, address, logs_tx, kill_rx));
let stopped_tx = self.stopped_tx.clone();

tokio::spawn(run_until_stopped(
router, address, logs_tx, kill_rx, stopped_tx,
));

let message = StartResponse { success: true };

Expand Down Expand Up @@ -160,9 +168,9 @@ impl Runtime for AxumWasm {

Ok(tonic::Response::new(StopResponse { success: true }))
} else {
Err(Status::internal(
"trying to stop a service that was not started",
))
warn!("trying to stop a service that was not started");

Ok(tonic::Response::new(StopResponse { success: false }))
}
}

Expand All @@ -172,8 +180,21 @@ impl Runtime for AxumWasm {
&self,
_request: tonic::Request<SubscribeStopRequest>,
) -> Result<tonic::Response<Self::SubscribeStopStream>, Status> {
// Next does not really have a stopped state. Endpoints are loaded if and when needed until a request is done
let (_tx, rx) = mpsc::channel(1);
let mut stopped_rx = self.stopped_tx.subscribe();
let (tx, rx) = mpsc::channel(1);

// Move the stop channel into a stream to be returned
tokio::spawn(async move {
trace!("moved stop channel into thread");
while let Ok((reason, message)) = stopped_rx.recv().await {
tx.send(Ok(SubscribeStopResponse {
reason: reason as i32,
message,
}))
.await
.unwrap();
}
});

Ok(tonic::Response::new(ReceiverStream::new(rx)))
}
Expand Down Expand Up @@ -354,6 +375,7 @@ async fn run_until_stopped(
address: SocketAddr,
logs_tx: Sender<Result<runtime::LogItem, Status>>,
kill_rx: tokio::sync::oneshot::Receiver<String>,
stopped_tx: broadcast::Sender<(StopReason, String)>,
) {
let make_service = make_service_fn(move |_conn| {
let router = router.clone();
Expand Down Expand Up @@ -383,12 +405,21 @@ async fn run_until_stopped(
trace!("starting hyper server on: {}", &address);
tokio::select! {
_ = server => {
stopped_tx.send((StopReason::End, String::new())).unwrap();
trace!("axum wasm server stopped");
},
message = kill_rx => {
match message {
Ok(msg) => trace!("{msg}"),
Err(_) => trace!("the sender dropped")
Ok(msg) =>{
stopped_tx.send((StopReason::Request, String::new())).unwrap();
trace!("{msg}")
} ,
Err(_) => {
stopped_tx
.send((StopReason::Crash, "the kill sender dropped".to_string()))
.unwrap();
trace!("the sender dropped")
}
}
}
};
Expand Down

0 comments on commit ba66b33

Please sign in to comment.