Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bug: next runtime not sending stop signal #728

Merged
merged 2 commits into from
Mar 20, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 49 additions & 30 deletions deployer/src/deployment/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,25 @@ pub async fn task(
active_deployment_getter.clone(),
runtime_manager.clone(),
);
let cleanup = move |response: SubscribeStopResponse| {
let cleanup = move |response: Option<SubscribeStopResponse>| {
debug!(response = ?response, "stop client response: ");

match StopReason::from_i32(response.reason).unwrap_or_default() {
StopReason::Request => stopped_cleanup(&id),
StopReason::End => completed_cleanup(&id),
StopReason::Crash => {
crashed_cleanup(&id, Error::Run(anyhow::Error::msg(response.message).into()))
if let Some(response) = response {
match StopReason::from_i32(response.reason).unwrap_or_default() {
StopReason::Request => stopped_cleanup(&id),
StopReason::End => completed_cleanup(&id),
StopReason::Crash => crashed_cleanup(
&id,
Error::Run(anyhow::Error::msg(response.message).into()),
),
}
} else {
crashed_cleanup(
&id,
Error::Runtime(anyhow::anyhow!(
"stop subscribe channel stopped unexpectedly"
)),
)
}
};
let runtime_manager = runtime_manager.clone();
Expand Down Expand Up @@ -188,7 +198,7 @@ impl Built {
runtime_manager: Arc<Mutex<RuntimeManager>>,
deployment_updater: impl DeploymentUpdater,
kill_old_deployments: impl futures::Future<Output = Result<()>>,
cleanup: impl FnOnce(SubscribeStopResponse) + Send + 'static,
cleanup: impl FnOnce(Option<SubscribeStopResponse>) + Send + 'static,
) -> Result<()> {
// For alpha this is the path to the users project with an embedded runtime.
// For shuttle-next this is the path to the compiled .wasm file, which will be
Expand Down Expand Up @@ -343,7 +353,7 @@ async fn run(
mut runtime_client: RuntimeClient<ClaimService<InjectPropagation<Channel>>>,
address: SocketAddr,
deployment_updater: impl DeploymentUpdater,
cleanup: impl FnOnce(SubscribeStopResponse) + Send + 'static,
cleanup: impl FnOnce(Option<SubscribeStopResponse>) + Send + 'static,
) {
deployment_updater
.set_address(&id, &address)
Expand All @@ -369,15 +379,15 @@ async fn run(
info!(response = ?response.into_inner(), "start client response: ");

// Wait for stop reason
let reason = stream.message().await.unwrap().unwrap();
let reason = stream.message().await.unwrap();
chesedo marked this conversation as resolved.
Show resolved Hide resolved

cleanup(reason);
}
Err(ref status) if status.code() == Code::InvalidArgument => {
cleanup(SubscribeStopResponse {
cleanup(Some(SubscribeStopResponse {
reason: StopReason::Crash as i32,
message: status.to_string(),
});
}));
}
Err(ref status) => {
start_crashed_cleanup(
Expand Down Expand Up @@ -534,12 +544,15 @@ mod tests {
let runtime_manager = get_runtime_manager();
let (cleanup_send, cleanup_recv) = oneshot::channel();

let handle_cleanup = |response: SubscribeStopResponse| match (
StopReason::from_i32(response.reason).unwrap(),
response.message,
) {
(StopReason::Request, mes) if mes.is_empty() => cleanup_send.send(()).unwrap(),
_ => panic!("expected stop due to request"),
let handle_cleanup = |response: Option<SubscribeStopResponse>| {
let response = response.unwrap();
match (
StopReason::from_i32(response.reason).unwrap(),
response.message,
) {
(StopReason::Request, mes) if mes.is_empty() => cleanup_send.send(()).unwrap(),
_ => panic!("expected stop due to request"),
}
};

built
Expand Down Expand Up @@ -574,12 +587,15 @@ mod tests {
let runtime_manager = get_runtime_manager();
let (cleanup_send, cleanup_recv) = oneshot::channel();

let handle_cleanup = |response: SubscribeStopResponse| match (
StopReason::from_i32(response.reason).unwrap(),
response.message,
) {
(StopReason::End, mes) if mes.is_empty() => cleanup_send.send(()).unwrap(),
_ => panic!("expected stop due to self end"),
let handle_cleanup = |response: Option<SubscribeStopResponse>| {
let response = response.unwrap();
match (
StopReason::from_i32(response.reason).unwrap(),
response.message,
) {
(StopReason::End, mes) if mes.is_empty() => cleanup_send.send(()).unwrap(),
_ => panic!("expected stop due to self end"),
}
};

built
Expand Down Expand Up @@ -611,14 +627,17 @@ mod tests {
let runtime_manager = get_runtime_manager();
let (cleanup_send, cleanup_recv) = oneshot::channel();

let handle_cleanup = |response: SubscribeStopResponse| match (
StopReason::from_i32(response.reason).unwrap(),
response.message,
) {
(StopReason::Crash, mes) if mes.contains("panic in bind") => {
cleanup_send.send(()).unwrap()
let handle_cleanup = |response: Option<SubscribeStopResponse>| {
let response = response.unwrap();
match (
StopReason::from_i32(response.reason).unwrap(),
response.message,
) {
(StopReason::Crash, mes) if mes.contains("panic in bind") => {
cleanup_send.send(()).unwrap()
}
(_, mes) => panic!("expected stop due to crash: {mes}"),
}
(_, mes) => panic!("expected stop due to crash: {mes}"),
};

built
Expand Down
2 changes: 2 additions & 0 deletions deployer/src/runtime_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,8 @@ impl RuntimeManager {
let stop_request = tonic::Request::new(StopRequest {});
let response = runtime_client.stop(stop_request).await.unwrap();

trace!(?response, "stop deployment response");

let result = response.into_inner().success;
let _ = process.start_kill();

Expand Down
6 changes: 4 additions & 2 deletions runtime/src/alpha/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use tonic::{
Request, Response, Status,
};
use tower::ServiceBuilder;
use tracing::{error, info, trace};
use tracing::{error, info, trace, warn};

use crate::{provisioner_factory::ProvisionerFactory, Logger};

Expand Down Expand Up @@ -378,7 +378,9 @@ where

Ok(Response::new(StopResponse { success: true }))
} else {
Err(Status::internal("failed to stop deployment"))
warn!("failed to stop deployment");

Ok(tonic::Response::new(StopResponse { success: false }))
}
}

Expand Down
55 changes: 43 additions & 12 deletions runtime/src/next/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@ use hyper::{Body, Request, Response};
use shuttle_common::wasm::{Bytesable, Log, RequestWrapper, ResponseWrapper};
use shuttle_proto::runtime::runtime_server::Runtime;
use shuttle_proto::runtime::{
self, LoadRequest, LoadResponse, StartRequest, StartResponse, StopRequest, StopResponse,
SubscribeLogsRequest, SubscribeStopRequest, SubscribeStopResponse,
self, LoadRequest, LoadResponse, StartRequest, StartResponse, StopReason, StopRequest,
StopResponse, SubscribeLogsRequest, SubscribeStopRequest, SubscribeStopResponse,
};
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::{mpsc, oneshot};
use tokio::sync::{broadcast, mpsc, oneshot};
use tokio_stream::wrappers::ReceiverStream;
use tonic::Status;
use tracing::{error, trace};
use tracing::{error, trace, warn};
use wasi_common::file::FileCaps;
use wasmtime::{Engine, Linker, Module, Store};
use wasmtime_wasi::sync::net::UnixStream as WasiUnixStream;
Expand All @@ -45,6 +45,7 @@ pub struct AxumWasm {
logs_rx: Mutex<Option<Receiver<Result<runtime::LogItem, Status>>>>,
logs_tx: Sender<Result<runtime::LogItem, Status>>,
kill_tx: Mutex<Option<oneshot::Sender<String>>>,
stopped_tx: broadcast::Sender<(StopReason, String)>,
}

impl AxumWasm {
Expand All @@ -57,11 +58,14 @@ impl AxumWasm {
// seems acceptable so going with double the number for some headroom
let (tx, rx) = mpsc::channel(1 << 15);

let (stopped_tx, _stopped_rx) = broadcast::channel(10);

Self {
router: Mutex::new(None),
logs_rx: Mutex::new(Some(rx)),
logs_tx: tx,
kill_tx: Mutex::new(None),
stopped_tx,
}
}
}
Expand Down Expand Up @@ -122,7 +126,11 @@ impl Runtime for AxumWasm {
.context("tried to start a service that was not loaded")
.map_err(|err| Status::internal(err.to_string()))?;

tokio::spawn(run_until_stopped(router, address, logs_tx, kill_rx));
let stopped_tx = self.stopped_tx.clone();

tokio::spawn(run_until_stopped(
router, address, logs_tx, kill_rx, stopped_tx,
));

let message = StartResponse { success: true };

Expand Down Expand Up @@ -160,9 +168,9 @@ impl Runtime for AxumWasm {

Ok(tonic::Response::new(StopResponse { success: true }))
} else {
Err(Status::internal(
"trying to stop a service that was not started",
))
warn!("trying to stop a service that was not started");

Ok(tonic::Response::new(StopResponse { success: false }))
}
}

Expand All @@ -172,8 +180,21 @@ impl Runtime for AxumWasm {
&self,
_request: tonic::Request<SubscribeStopRequest>,
) -> Result<tonic::Response<Self::SubscribeStopStream>, Status> {
// Next does not really have a stopped state. Endpoints are loaded if and when needed until a request is done
let (_tx, rx) = mpsc::channel(1);
let mut stopped_rx = self.stopped_tx.subscribe();
let (tx, rx) = mpsc::channel(1);

// Move the stop channel into a stream to be returned
tokio::spawn(async move {
trace!("moved stop channel into thread");
while let Ok((reason, message)) = stopped_rx.recv().await {
tx.send(Ok(SubscribeStopResponse {
reason: reason as i32,
message,
}))
.await
.unwrap();
}
});

Ok(tonic::Response::new(ReceiverStream::new(rx)))
}
Expand Down Expand Up @@ -354,6 +375,7 @@ async fn run_until_stopped(
address: SocketAddr,
logs_tx: Sender<Result<runtime::LogItem, Status>>,
kill_rx: tokio::sync::oneshot::Receiver<String>,
stopped_tx: broadcast::Sender<(StopReason, String)>,
) {
let make_service = make_service_fn(move |_conn| {
let router = router.clone();
Expand Down Expand Up @@ -383,12 +405,21 @@ async fn run_until_stopped(
trace!("starting hyper server on: {}", &address);
tokio::select! {
_ = server => {
stopped_tx.send((StopReason::End, String::new())).unwrap();
trace!("axum wasm server stopped");
},
message = kill_rx => {
match message {
Ok(msg) => trace!("{msg}"),
Err(_) => trace!("the sender dropped")
Ok(msg) =>{
stopped_tx.send((StopReason::Request, String::new())).unwrap();
trace!("{msg}")
} ,
Err(_) => {
stopped_tx
.send((StopReason::Crash, "the kill sender dropped".to_string()))
.unwrap();
trace!("the sender dropped")
}
}
}
};
Expand Down