Skip to content

Commit

Permalink
feat: gateway admin revive (#412)
Browse files Browse the repository at this point in the history
* feat: gateway admin command (revive)

* fmt

* clippy

* refactor: revive deployers using GatewayService

* tests: add ContextArgs

* refactor: simplify passing around of fqdn

* tests: update test archive

* refactor: remove stray exec.rs file

* refactor: unused is_error()

Co-authored-by: chesedo <[email protected]>
  • Loading branch information
brokad and chesedo authored Oct 20, 2022
1 parent e676715 commit 6e771c7
Show file tree
Hide file tree
Showing 8 changed files with 203 additions and 77 deletions.
9 changes: 3 additions & 6 deletions gateway/src/api/latest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,7 @@ pub mod tests {
#[tokio::test]
async fn api_create_get_delete_projects() -> anyhow::Result<()> {
let world = World::new().await;
let service =
Arc::new(GatewayService::init(world.args(), world.fqdn(), world.pool()).await);
let service = Arc::new(GatewayService::init(world.args(), world.pool()).await);

let (sender, mut receiver) = channel::<Work>(256);
tokio::spawn(async move {
Expand Down Expand Up @@ -326,8 +325,7 @@ pub mod tests {
#[tokio::test]
async fn api_create_get_users() -> anyhow::Result<()> {
let world = World::new().await;
let service =
Arc::new(GatewayService::init(world.args(), world.fqdn(), world.pool()).await);
let service = Arc::new(GatewayService::init(world.args(), world.pool()).await);

let (sender, mut receiver) = channel::<Work>(256);
tokio::spawn(async move {
Expand Down Expand Up @@ -416,8 +414,7 @@ pub mod tests {
#[tokio::test(flavor = "multi_thread")]
async fn status() {
let world = World::new().await;
let service =
Arc::new(GatewayService::init(world.args(), world.fqdn(), world.pool()).await);
let service = Arc::new(GatewayService::init(world.args(), world.pool()).await);

let (sender, mut receiver) = channel::<Work>(1);
let (ctl_send, ctl_recv) = oneshot::channel();
Expand Down
46 changes: 33 additions & 13 deletions gateway/src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub struct Args {
pub enum Commands {
Start(StartArgs),
Init(InitArgs),
Exec(ExecCmds),
}

#[derive(clap::Args, Debug, Clone)]
Expand All @@ -29,6 +30,35 @@ pub struct StartArgs {
/// Address to bind the user plane to
#[arg(long, default_value = "127.0.0.1:8000")]
pub user: SocketAddr,
#[command(flatten)]
pub context: ContextArgs,
}

#[derive(clap::Args, Debug, Clone)]
pub struct InitArgs {
/// Name of initial account to create
#[arg(long)]
pub name: String,
/// Key to assign to initial account
#[arg(long)]
pub key: Option<Key>,
}

#[derive(clap::Args, Debug, Clone)]
pub struct ExecCmds {
#[command(flatten)]
pub context: ContextArgs,
#[command(subcommand)]
pub command: ExecCmd,
}

#[derive(Subcommand, Debug, Clone)]
pub enum ExecCmd {
Revive,
}

#[derive(clap::Args, Debug, Clone)]
pub struct ContextArgs {
/// Default image to deploy user runtimes into
#[arg(long, default_value = "public.ecr.aws/shuttle/deployer:latest")]
pub image: String,
Expand All @@ -40,23 +70,13 @@ pub struct StartArgs {
/// the provisioner service
#[arg(long, default_value = "provisioner")]
pub provisioner_host: String,
/// The path to the docker daemon socket
#[arg(long, default_value = "/var/run/docker.sock")]
pub docker_host: String,
/// The Docker Network name in which to deploy user runtimes
#[arg(long, default_value = "shuttle_default")]
pub network_name: String,
/// FQDN where the proxy can be reached at
#[arg(long)]
pub proxy_fqdn: FQDN,
}

#[derive(clap::Args, Debug, Clone)]
pub struct InitArgs {
/// Name of initial account to create
#[arg(long)]
pub name: String,
/// Key to assign to initial account
#[arg(long)]
pub key: Option<Key>,
/// The path to the docker daemon socket
#[arg(long, default_value = "/var/run/docker.sock")]
pub docker_host: String,
}
38 changes: 19 additions & 19 deletions gateway/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ pub mod tests {
use tracing::info;

use crate::api::make_api;
use crate::args::StartArgs;
use crate::args::{ContextArgs, StartArgs};
use crate::auth::User;
use crate::proxy::make_proxy;
use crate::service::{ContainerSettings, GatewayService, MIGRATIONS};
Expand Down Expand Up @@ -485,21 +485,18 @@ pub mod tests {
args: StartArgs,
hyper: HyperClient<HttpConnector, Body>,
pool: SqlitePool,
fqdn: String,
}

#[derive(Clone, Copy)]
pub struct WorldContext<'c> {
pub docker: &'c Docker,
pub container_settings: &'c ContainerSettings,
pub hyper: &'c HyperClient<HttpConnector, Body>,
pub fqdn: &'c str,
}

impl World {
pub async fn new() -> Self {
let docker = Docker::connect_with_local_defaults().unwrap();
let fqdn = "test.shuttleapp.rs".to_string();

docker
.list_images::<&str>(None)
Expand Down Expand Up @@ -529,17 +526,19 @@ pub mod tests {

let args = StartArgs {
control,
docker_host,
user,
image,
prefix,
provisioner_host,
network_name,
proxy_fqdn: FQDN::from_str(&fqdn).unwrap(),
context: ContextArgs {
docker_host,
image,
prefix,
provisioner_host,
network_name,
proxy_fqdn: FQDN::from_str("test.shuttleapp.rs").unwrap(),
},
};

let settings = ContainerSettings::builder(&docker, fqdn.clone())
.from_args(&args)
let settings = ContainerSettings::builder(&docker)
.from_args(&args.context)
.await;

let hyper = HyperClient::builder().build(HttpConnector::new());
Expand All @@ -553,12 +552,11 @@ pub mod tests {
args,
hyper,
pool,
fqdn,
}
}

pub fn args(&self) -> StartArgs {
self.args.clone()
pub fn args(&self) -> ContextArgs {
self.args.context.clone()
}

pub fn pool(&self) -> SqlitePool {
Expand All @@ -570,7 +568,11 @@ pub mod tests {
}

pub fn fqdn(&self) -> String {
self.fqdn.clone()
self.args()
.proxy_fqdn
.to_string()
.trim_end_matches('.')
.to_string()
}
}

Expand All @@ -580,7 +582,6 @@ pub mod tests {
docker: &self.docker,
container_settings: &self.settings,
hyper: &self.hyper,
fqdn: &self.fqdn,
}
}
}
Expand All @@ -598,8 +599,7 @@ pub mod tests {
#[tokio::test]
async fn end_to_end() {
let world = World::new().await;
let service =
Arc::new(GatewayService::init(world.args(), world.fqdn(), world.pool()).await);
let service = Arc::new(GatewayService::init(world.args(), world.pool()).await);
let worker = Worker::new(Arc::clone(&service));

let (log_out, mut log_in) = channel(256);
Expand Down
20 changes: 17 additions & 3 deletions gateway/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use clap::Parser;
use futures::prelude::*;
use shuttle_gateway::args::{Args, Commands, InitArgs};
use shuttle_gateway::args::{Args, Commands, ExecCmd, ExecCmds, InitArgs};
use shuttle_gateway::auth::Key;
use shuttle_gateway::proxy::make_proxy;
use shuttle_gateway::service::{GatewayService, MIGRATIONS};
use shuttle_gateway::worker::{Work, Worker};
use shuttle_gateway::{api::make_api, args::StartArgs};
use shuttle_gateway::{Refresh, Service};
use shuttle_gateway::{project, Refresh, Service};
use sqlx::migrate::MigrateDatabase;
use sqlx::{query, Sqlite, SqlitePool};
use std::io;
Expand Down Expand Up @@ -55,16 +55,18 @@ async fn main() -> io::Result<()> {
match args.command {
Commands::Start(start_args) => start(db, start_args).await,
Commands::Init(init_args) => init(db, init_args).await,
Commands::Exec(exec_cmd) => exec(db, exec_cmd).await,
}
}

async fn start(db: SqlitePool, args: StartArgs) -> io::Result<()> {
let fqdn = args
.context
.proxy_fqdn
.to_string()
.trim_end_matches('.')
.to_string();
let gateway = Arc::new(GatewayService::init(args.clone(), fqdn.clone(), db).await);
let gateway = Arc::new(GatewayService::init(args.context.clone(), db).await);

let worker = Worker::new(Arc::clone(&gateway));

Expand Down Expand Up @@ -146,3 +148,15 @@ async fn init(db: SqlitePool, args: InitArgs) -> io::Result<()> {
println!("`{}` created as super user with key: {key}", args.name);
Ok(())
}

async fn exec(db: SqlitePool, exec_cmd: ExecCmds) -> io::Result<()> {
let gateway = GatewayService::init(exec_cmd.context.clone(), db).await;

match exec_cmd.command {
ExecCmd::Revive => project::exec::revive(gateway)
.await
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?,
};

Ok(())
}
56 changes: 56 additions & 0 deletions gateway/src/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -703,6 +703,62 @@ impl<'c> State<'c> for ProjectError {
}
}

pub mod exec {
use bollard::service::ContainerState;

use crate::{
service::GatewayService,
worker::{do_work, Work},
};

use super::*;

pub async fn revive(gateway: GatewayService) -> Result<(), ProjectError> {
let mut mutations = Vec::new();

for Work {
project_name,
account_name,
work,
} in gateway
.iter_projects()
.await
.expect("could not list projects")
{
if let Project::Errored(ProjectError { ctx: Some(ctx), .. }) = work {
if let Some(container) = ctx.container() {
if let Ok(container) = gateway
.context()
.docker()
.inspect_container(safe_unwrap!(container.id), None)
.await
{
if let Some(ContainerState {
status: Some(ContainerStateStatusEnum::EXITED),
..
}) = container.state
{
mutations.push(Work {
project_name,
account_name,
work: Project::Stopped(ProjectStopped { container }),
});
}
}
}
}
}

for work in mutations {
debug!(?work, "project will be revived");

do_work(work, &gateway).await;
}

Ok(())
}
}

#[cfg(test)]
pub mod tests {

Expand Down
Loading

0 comments on commit 6e771c7

Please sign in to comment.