diff --git a/Cargo.lock b/Cargo.lock index e556fcf2b..b065cb0f8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5754,7 +5754,6 @@ dependencies = [ "opentelemetry", "opentelemetry-datadog", "pipe", - "portpicker", "rand 0.8.5", "serde", "serde_json", diff --git a/Makefile b/Makefile index ac0d31cdc..aa4cb75cc 100644 --- a/Makefile +++ b/Makefile @@ -52,7 +52,7 @@ POSTGRES_TAG?=latest RUST_LOG?=debug -DOCKER_COMPOSE_ENV=STACK=$(STACK) BACKEND_TAG=$(TAG) PROVISIONER_TAG=$(TAG) POSTGRES_TAG=latest APPS_FQDN=$(APPS_FQDN) DB_FQDN=$(DB_FQDN) POSTGRES_PASSWORD=$(POSTGRES_PASSWORD) RUST_LOG=$(RUST_LOG) CONTAINER_REGISTRY=$(CONTAINER_REGISTRY) MONGO_INITDB_ROOT_USERNAME=$(MONGO_INITDB_ROOT_USERNAME) MONGO_INITDB_ROOT_PASSWORD=$(MONGO_INITDB_ROOT_PASSWORD) +DOCKER_COMPOSE_ENV=STACK=$(STACK) BACKEND_TAG=$(TAG) PROVISIONER_TAG=$(TAG) POSTGRES_TAG=14 APPS_FQDN=$(APPS_FQDN) DB_FQDN=$(DB_FQDN) POSTGRES_PASSWORD=$(POSTGRES_PASSWORD) RUST_LOG=$(RUST_LOG) CONTAINER_REGISTRY=$(CONTAINER_REGISTRY) MONGO_INITDB_ROOT_USERNAME=$(MONGO_INITDB_ROOT_USERNAME) MONGO_INITDB_ROOT_PASSWORD=$(MONGO_INITDB_ROOT_PASSWORD) .PHONY: images clean src up down deploy shuttle-% postgres docker-compose.rendered.yml test diff --git a/deployer/Cargo.toml b/deployer/Cargo.toml index fa6f90edb..d7c575d17 100644 --- a/deployer/Cargo.toml +++ b/deployer/Cargo.toml @@ -24,14 +24,13 @@ once_cell = "1.14.0" opentelemetry = { version = "0.17.0", features = ["rt-tokio"] } opentelemetry-datadog = { version = "0.5.0", features = ["reqwest-client"] } pipe = "0.4.0" -portpicker = "0.1.1" serde = "1.0.137" serde_json = "1.0.81" sqlx = { version = "0.6.0", features = ["runtime-tokio-native-tls", "sqlite", "chrono", "json", "migrate", "uuid"] } strum = { version = "0.24.1", features = ["derive"] } tar = "0.4.38" thiserror = "1.0.24" -tokio = { version = "1.19.2", features = ["fs"] } +tokio = { version = "1.19.2", features = ["fs", "process"] } toml = "0.5.9" tonic = "0.8.2" tower = { version = "0.4.12", features = ["make"] } diff --git a/deployer/src/deployment/deploy_layer.rs b/deployer/src/deployment/deploy_layer.rs index d6c01b0b2..429ab99d6 100644 --- a/deployer/src/deployment/deploy_layer.rs +++ b/deployer/src/deployment/deploy_layer.rs @@ -455,11 +455,7 @@ mod tests { impl provisioner_factory::AbstractFactory for StubAbstractProvisionerFactory { type Output = StubProvisionerFactory; - fn get_factory( - &self, - _project_name: shuttle_common::project::ProjectName, - _service_id: Uuid, - ) -> Self::Output { + fn get_factory(&self) -> Self::Output { StubProvisionerFactory } } diff --git a/deployer/src/deployment/run.rs b/deployer/src/deployment/run.rs index d5c804b3f..7ab74a557 100644 --- a/deployer/src/deployment/run.rs +++ b/deployer/src/deployment/run.rs @@ -5,14 +5,12 @@ use std::{ }; use async_trait::async_trait; -use portpicker::pick_unused_port; use shuttle_common::project::ProjectName as ServiceName; -use shuttle_service::{ - loader::{LoadedService, Loader}, - Factory, Logger, -}; +use shuttle_proto::runtime::{runtime_client::RuntimeClient, LoadRequest, StartRequest}; + +use shuttle_service::{Factory, Logger}; use tokio::task::JoinError; -use tracing::{debug, error, info, instrument, trace}; +use tracing::{error, info, instrument, trace}; use uuid::Uuid; use super::{provisioner_factory, runtime_logger, KillReceiver, KillSender, RunReceiver, State}; @@ -42,18 +40,9 @@ pub async fn task( let kill_send = kill_send.clone(); let kill_recv = kill_send.subscribe(); - let port = match pick_unused_port() { - Some(port) => port, - None => { - start_crashed_cleanup( - &id, - Error::PrepareLoad( - "could not find a free port to deploy service on".to_string(), - ), - ); - continue; - } - }; + // todo: this is the port the legacy runtime is hardcoded to start services on + let port = 7001; + let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), port); let _service_name = match ServiceName::from_str(&built.service_name) { Ok(name) => name, @@ -174,81 +163,77 @@ pub struct Built { } impl Built { - #[instrument(name = "built_handle", skip(self, libs_path, factory, logger, kill_recv, kill_old_deployments, cleanup), fields(id = %self.id, state = %State::Loading))] + #[instrument(name = "built_handle", skip(self, libs_path, _factory, _logger, kill_recv, kill_old_deployments, cleanup), fields(id = %self.id, state = %State::Loading))] #[allow(clippy::too_many_arguments)] async fn handle( self, address: SocketAddr, libs_path: PathBuf, - factory: &mut dyn Factory, - logger: Logger, + _factory: &mut dyn Factory, + _logger: Logger, kill_recv: KillReceiver, kill_old_deployments: impl futures::Future>, cleanup: impl FnOnce(std::result::Result, JoinError>) + Send + 'static, ) -> Result<()> { - let service = load_deployment(&self.id, address, libs_path, factory, logger).await?; - + // todo: refactor this? kill_old_deployments.await?; info!("got handle for deployment"); // Execute loaded service - tokio::spawn(run(self.id, service, address, kill_recv, cleanup)); + tokio::spawn(run( + self.id, + self.service_name, + libs_path, + address, + kill_recv, + cleanup, + )); Ok(()) } } -#[instrument(skip(service, kill_recv, cleanup), fields(address = %_address, state = %State::Running))] +#[instrument(skip(_kill_recv, _cleanup), fields(address = %_address, state = %State::Running))] async fn run( id: Uuid, - service: LoadedService, + service_name: String, + libs_path: PathBuf, _address: SocketAddr, - mut kill_recv: KillReceiver, - cleanup: impl FnOnce(std::result::Result, JoinError>) + _kill_recv: KillReceiver, + _cleanup: impl FnOnce(std::result::Result, JoinError>) + Send + 'static, ) { - info!("starting up service"); - let (mut handle, library) = service; - let result; - loop { - tokio::select! { - Ok(kill_id) = kill_recv.recv() => { - if kill_id == id { - debug!("deployment '{id}' killed"); - handle.abort(); - result = handle.await; - break; - } - } - rsl = &mut handle => { - result = rsl; - break; - } - } - } + info!("starting up deployer grpc client"); + let mut client = RuntimeClient::connect("http://127.0.0.1:6001") + .await + .unwrap(); - if let Err(err) = library.close() { - crashed_cleanup(&id, err); - } else { - cleanup(result); - } -} + info!( + "loading project from: {}", + libs_path.clone().into_os_string().into_string().unwrap() + ); -#[instrument(skip(id, addr, libs_path, factory, logger))] -async fn load_deployment( - id: &Uuid, - addr: SocketAddr, - libs_path: PathBuf, - factory: &mut dyn Factory, - logger: Logger, -) -> Result { let so_path = libs_path.join(id.to_string()); - let loader = Loader::from_so_file(so_path)?; + let load_request = tonic::Request::new(LoadRequest { + path: so_path.into_os_string().into_string().unwrap(), + service_name: service_name.clone(), + }); + info!("loading service"); + let response = client.load(load_request).await; + + if let Err(e) = response { + info!("failed to load service: {}", e); + } + + let start_request = tonic::Request::new(StartRequest { service_name }); + + info!("starting service"); + let response = client.start(start_request).await.unwrap(); - Ok(loader.load(factory, addr, logger).await?) + info!(response = ?response, "client response: "); } #[cfg(test)] diff --git a/deployer/src/error.rs b/deployer/src/error.rs index 9a0fd8c29..f7d3ecd45 100644 --- a/deployer/src/error.rs +++ b/deployer/src/error.rs @@ -14,8 +14,6 @@ pub enum Error { InputOutput(#[from] io::Error), #[error("Build error: {0}")] Build(#[source] Box), - #[error("Prepare to load error: {0}")] - PrepareLoad(String), #[error("Load error: {0}")] Load(#[from] LoaderError), #[error("Run error: {0}")] diff --git a/deployer/src/lib.rs b/deployer/src/lib.rs index 30ee4528d..f1b325301 100644 --- a/deployer/src/lib.rs +++ b/deployer/src/lib.rs @@ -1,4 +1,4 @@ -use std::{convert::Infallible, net::SocketAddr}; +use std::{convert::Infallible, env, net::SocketAddr, path::PathBuf}; pub use args::Args; pub use deployment::{ @@ -13,6 +13,7 @@ use hyper::{ }; pub use persistence::Persistence; use proxy::AddressGetter; +use tokio::select; use tracing::{error, info}; mod args; @@ -54,12 +55,27 @@ pub async fn start( ); let make_service = router.into_make_service(); - info!("Binding to and listening at address: {}", args.api_address); + let workspace_root = PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .parent() + .unwrap() + .to_path_buf(); - axum::Server::bind(&args.api_address) - .serve(make_service) - .await - .unwrap_or_else(|_| panic!("Failed to bind to address: {}", args.api_address)); + let runtime_dir = workspace_root.join("target/debug"); + + let mut runtime = tokio::process::Command::new(runtime_dir.join("shuttle-runtime")) + .args(&["--legacy", "--provisioner-address", "http://localhost:8000"]) + .current_dir(&runtime_dir) + .spawn() + .unwrap(); + + select! { + _ = runtime.wait() => { + info!("Legacy runtime stopped.") + }, + _ = axum::Server::bind(&args.api_address).serve(make_service) => { + info!("Handlers server stopped serving addr: {}", &args.api_address); + }, + } } pub async fn start_proxy( diff --git a/proto/runtime.proto b/proto/runtime.proto index d26e0c6c8..b9c630556 100644 --- a/proto/runtime.proto +++ b/proto/runtime.proto @@ -31,7 +31,8 @@ message StartResponse { // Was the start successful bool success = 1; + // todo: find a way to add optional flag here // Optional port the service was started on // This is likely to be None for bots - optional uint32 port = 2; + uint32 port = 2; } diff --git a/proto/src/lib.rs b/proto/src/lib.rs index 6dff3b13e..49f39a14a 100644 --- a/proto/src/lib.rs +++ b/proto/src/lib.rs @@ -1,8 +1,8 @@ -pub mod provisioner { - // This clippy is disabled as per this prost comment - // https://github.com/tokio-rs/prost/issues/661#issuecomment-1156606409 - #![allow(clippy::derive_partial_eq_without_eq)] +// This clippy is disabled as per this prost comment +// https://github.com/tokio-rs/prost/issues/661#issuecomment-1156606409 +#![allow(clippy::derive_partial_eq_without_eq)] +pub mod provisioner { use std::fmt::Display; use shuttle_common::{ diff --git a/runtime/README.md b/runtime/README.md index 843951e75..af60bc7cf 100644 --- a/runtime/README.md +++ b/runtime/README.md @@ -21,12 +21,21 @@ To test, first start a provisioner from the root directory using: docker-compose -f docker-compose.rendered.yml up provisioner ``` -Then in another shell, start the runtime using: +Then in another shell, start the runtime using the clap CLI: ```bash cargo run -- --legacy --provisioner-address http://localhost:8000 ``` +Or directly (this is the path hardcoded in `deployer::start`): +```bash +# first, make sure the shuttle-runtime binary is built +cargo build +# then +/home//target/debug/shuttle-runtime --legacy --provisioner-address http://localhost:8000 +``` + +Pass the path to `deployer::start` Then in another shell, load a `.so` file and start it up: ``` bash diff --git a/runtime/src/legacy/mod.rs b/runtime/src/legacy/mod.rs index 89a688f87..1316f2da4 100644 --- a/runtime/src/legacy/mod.rs +++ b/runtime/src/legacy/mod.rs @@ -58,8 +58,8 @@ impl Runtime for Legacy { &self, request: Request, ) -> Result, Status> { - let port = 8001; - let address = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), port); + let service_port = 7001; + let service_address = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), service_port); let provisioner_client = ProvisionerClient::connect(self.provisioner_address.clone()) .await @@ -84,18 +84,18 @@ impl Runtime for Legacy { .map_err(|err| Status::from_error(Box::new(err)))? .clone(); - trace!(%address, "starting"); - let service = load_service(address, so_path, &mut factory, logger) + trace!(%service_address, "starting"); + let service = load_service(service_address, so_path, &mut factory, logger) .await .unwrap(); - _ = tokio::spawn(run(service, address)); + _ = tokio::spawn(run(service, service_address)); - *self.port.lock().unwrap() = Some(port); + *self.port.lock().unwrap() = Some(service_port); let message = StartResponse { success: true, - port: Some(port as u32), + port: service_port as u32, }; Ok(Response::new(message)) diff --git a/runtime/src/main.rs b/runtime/src/main.rs index b287a7239..c11ebcdda 100644 --- a/runtime/src/main.rs +++ b/runtime/src/main.rs @@ -23,7 +23,7 @@ async fn main() { trace!(args = ?args, "parsed args"); - let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 8000); + let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 6001); let provisioner_address = args.provisioner_address; diff --git a/runtime/src/next/mod.rs b/runtime/src/next/mod.rs index b6f15c671..86d185c43 100644 --- a/runtime/src/next/mod.rs +++ b/runtime/src/next/mod.rs @@ -63,7 +63,8 @@ impl Runtime for Next { let message = StartResponse { success: true, - port: None, + // todo: port set here until I can set the port field to optional in the protobuf + port: 8001, }; Ok(Response::new(message))