Skip to content

Commit

Permalink
fix: gateway state drifts, health checks and project recreation (#447)
Browse files Browse the repository at this point in the history
  • Loading branch information
brokad authored Nov 3, 2022
1 parent bd0c381 commit 9d5e345
Show file tree
Hide file tree
Showing 9 changed files with 930 additions and 548 deletions.
17 changes: 9 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions gateway/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ tower-http = { version = "0.3.4", features = ["trace"] }
tracing = "0.1.35"
tracing-opentelemetry = "0.18.0"
tracing-subscriber = { version = "0.3.11", features = ["env-filter"] }
uuid = { version = "1.2.1", features = [ "v4" ] }

[dependencies.shuttle-common]
version = "0.7.2"
Expand Down
56 changes: 35 additions & 21 deletions gateway/src/api/latest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use tower_http::trace::TraceLayer;
use tracing::{debug, debug_span, field, Span};

use crate::auth::{Admin, ScopedUser, User};
use crate::worker::Work;
use crate::task::{self, BoxedTask};
use crate::{AccountName, Error, GatewayService, ProjectName};

#[derive(Serialize, Deserialize)]
Expand Down Expand Up @@ -79,39 +79,52 @@ async fn get_project(

async fn post_project(
Extension(service): Extension<Arc<GatewayService>>,
Extension(sender): Extension<Sender<Work>>,
Extension(sender): Extension<Sender<BoxedTask>>,
User { name, .. }: User,
Path(project): Path<ProjectName>,
) -> Result<AxumJson<project::Response>, Error> {
let work = service.create_project(project.clone(), name).await?;
let state = service
.create_project(project.clone(), name.clone())
.await?;

let name = work.project_name.to_string();
let state = work.work.clone().into();
service
.new_task()
.project(project.clone())
.account(name.clone())
.send(&sender)
.await?;

sender.send(work).await?;

let response = project::Response { name, state };
let response = project::Response {
name: project.to_string(),
state: state.into(),
};

Ok(AxumJson(response))
}

async fn delete_project(
Extension(service): Extension<Arc<GatewayService>>,
Extension(sender): Extension<Sender<Work>>,
Extension(sender): Extension<Sender<BoxedTask>>,
ScopedUser {
scope: _,
user: User { name, .. },
}: ScopedUser,
Path(project): Path<ProjectName>,
) -> Result<AxumJson<project::Response>, Error> {
let work = service.destroy_project(project, name).await?;

let name = work.project_name.to_string();
let state = work.work.clone().into();
let project_name = project.clone();

sender.send(work).await?;
service
.new_task()
.project(project)
.account(name)
.and_then(task::destroy())
.send(&sender)
.await?;

let response = project::Response { name, state };
let response = project::Response {
name: project_name.to_string(),
state: shuttle_common::models::project::State::Destroying,
};
Ok(AxumJson(response))
}

Expand All @@ -123,7 +136,7 @@ async fn route_project(
service.route(&scope, req).await
}

async fn get_status(Extension(sender): Extension<Sender<Work>>) -> Response<Body> {
async fn get_status(Extension(sender): Extension<Sender<BoxedTask>>) -> Response<Body> {
let (status, body) = if !sender.is_closed() && sender.capacity() > 0 {
(StatusCode::OK, StatusResponse::healthy())
} else {
Expand All @@ -140,8 +153,9 @@ async fn get_status(Extension(sender): Extension<Sender<Work>>) -> Response<Body
.unwrap()
}

pub fn make_api(service: Arc<GatewayService>, sender: Sender<Work>) -> Router<Body> {
pub fn make_api(service: Arc<GatewayService>, sender: Sender<BoxedTask>) -> Router<Body> {
debug!("making api route");

Router::<Body>::new()
.route(
"/",
Expand Down Expand Up @@ -185,14 +199,13 @@ pub mod tests {
use super::*;
use crate::service::GatewayService;
use crate::tests::{RequestBuilderExt, World};
use crate::worker::Work;

#[tokio::test]
async fn api_create_get_delete_projects() -> anyhow::Result<()> {
let world = World::new().await;
let service = Arc::new(GatewayService::init(world.args(), world.pool()).await);

let (sender, mut receiver) = channel::<Work>(256);
let (sender, mut receiver) = channel::<BoxedTask>(256);
tokio::spawn(async move {
while receiver.recv().await.is_some() {
// do not do any work with inbound requests
Expand Down Expand Up @@ -327,7 +340,7 @@ pub mod tests {
let world = World::new().await;
let service = Arc::new(GatewayService::init(world.args(), world.pool()).await);

let (sender, mut receiver) = channel::<Work>(256);
let (sender, mut receiver) = channel::<BoxedTask>(256);
tokio::spawn(async move {
while receiver.recv().await.is_some() {
// do not do any work with inbound requests
Expand Down Expand Up @@ -416,7 +429,7 @@ pub mod tests {
let world = World::new().await;
let service = Arc::new(GatewayService::init(world.args(), world.pool()).await);

let (sender, mut receiver) = channel::<Work>(1);
let (sender, mut receiver) = channel::<BoxedTask>(1);
let (ctl_send, ctl_recv) = oneshot::channel();
let (done_send, done_recv) = oneshot::channel();
let worker = tokio::spawn(async move {
Expand Down Expand Up @@ -468,6 +481,7 @@ pub mod tests {
assert_eq!(resp.status(), StatusCode::OK);

worker.abort();
let _ = worker.await;

let resp = router.call(get_status()).await.unwrap();
assert_eq!(resp.status(), StatusCode::INTERNAL_SERVER_ERROR);
Expand Down
Loading

0 comments on commit 9d5e345

Please sign in to comment.