diff --git a/deployer/src/lib.rs b/deployer/src/lib.rs index df76bef9a..9b1fca9a8 100644 --- a/deployer/src/lib.rs +++ b/deployer/src/lib.rs @@ -40,6 +40,8 @@ pub async fn start( .queue_client(GatewayClient::new(args.gateway_uri)) .build(); + persistence.cleanup_invalid_states().await.unwrap(); + let runnable_deployments = persistence.get_all_runnable_deployments().await.unwrap(); info!(count = %runnable_deployments.len(), "enqueuing runnable deployments"); for existing_deployment in runnable_deployments { diff --git a/deployer/src/persistence/mod.rs b/deployer/src/persistence/mod.rs index a19e7b514..31da58106 100644 --- a/deployer/src/persistence/mod.rs +++ b/deployer/src/persistence/mod.rs @@ -193,6 +193,20 @@ impl Persistence { .map_err(Error::from) } + // Clean up all invalid states inside persistence + pub async fn cleanup_invalid_states(&self) -> Result<()> { + sqlx::query("UPDATE deployments SET state = ? WHERE state IN(?, ?, ?, ?)") + .bind(State::Stopped) + .bind(State::Queued) + .bind(State::Built) + .bind(State::Building) + .bind(State::Loading) + .execute(&self.pool) + .await?; + + Ok(()) + } + pub async fn get_or_create_service(&self, name: &str) -> Result { if let Some(service) = self.get_service_by_name(name).await? { Ok(service) @@ -553,6 +567,108 @@ mod tests { ); } + // Test that we are correctly cleaning up any stale / unexpected states for a deployment + // The reason this does not clean up two (or more) running states for a single deployment is because + // it should theoretically be impossible for a service to have two deployments in the running state. + // And even if a service where to have this, then the start ups of these deployments (more specifically + // the last deployment that is starting up) will stop all the deployments correctly. + #[tokio::test(flavor = "multi_thread")] + async fn cleanup_invalid_states() { + let (p, _) = Persistence::new_in_memory().await; + + let service_id = add_service(&p.pool).await.unwrap(); + + let queued_id = Uuid::new_v4(); + let building_id = Uuid::new_v4(); + let built_id = Uuid::new_v4(); + let loading_id = Uuid::new_v4(); + + let deployment_crashed = Deployment { + id: Uuid::new_v4(), + service_id, + state: State::Crashed, + last_update: Utc::now(), + address: None, + }; + let deployment_stopped = Deployment { + id: Uuid::new_v4(), + service_id, + state: State::Stopped, + last_update: Utc::now(), + address: None, + }; + let deployment_running = Deployment { + id: Uuid::new_v4(), + service_id, + state: State::Running, + last_update: Utc::now(), + address: Some(SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 9876)), + }; + let deployment_queued = Deployment { + id: queued_id, + service_id, + state: State::Queued, + last_update: Utc::now(), + address: None, + }; + let deployment_building = Deployment { + id: building_id, + service_id, + state: State::Building, + last_update: Utc::now(), + address: None, + }; + let deployment_built = Deployment { + id: built_id, + service_id, + state: State::Built, + last_update: Utc::now(), + address: None, + }; + let deployment_loading = Deployment { + id: loading_id, + service_id, + state: State::Loading, + last_update: Utc::now(), + address: None, + }; + + for deployment in [ + &deployment_crashed, + &deployment_stopped, + &deployment_running, + &deployment_queued, + &deployment_built, + &deployment_building, + &deployment_loading, + ] { + p.insert_deployment(deployment.clone()).await.unwrap(); + } + + p.cleanup_invalid_states().await.unwrap(); + + let actual: Vec<_> = p + .get_deployments(&service_id) + .await + .unwrap() + .into_iter() + .map(|deployment| (deployment.id, deployment.state)) + .collect(); + let expected = vec![ + (deployment_crashed.id, State::Crashed), + (deployment_stopped.id, State::Stopped), + (deployment_running.id, State::Running), + (queued_id, State::Stopped), + (built_id, State::Stopped), + (building_id, State::Stopped), + (loading_id, State::Stopped), + ]; + + assert_eq!( + actual, expected, + "invalid states should be moved to the stopped state" + ); + } #[tokio::test(flavor = "multi_thread")] async fn fetching_runnable_deployments() { let (p, _) = Persistence::new_in_memory().await;