diff --git a/Cargo.lock b/Cargo.lock index cb8b4de40..25def720b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6415,8 +6415,10 @@ dependencies = [ "futures", "hyper", "rmp-serde", + "rocket", "shuttle-common", "shuttle-proto", + "shuttle-secrets", "shuttle-service", "thiserror", "tokio", @@ -6436,7 +6438,6 @@ version = "0.8.0" dependencies = [ "async-trait", "shuttle-service", - "tokio", ] [[package]] diff --git a/proto/runtime.proto b/proto/runtime.proto index 8faab7589..567a3fa58 100644 --- a/proto/runtime.proto +++ b/proto/runtime.proto @@ -36,8 +36,6 @@ message LoadResponse { message StartRequest { // Id to associate with the deployment being started bytes deployment_id = 1; - // Name of service to start - string service_name = 2; // Address and port to start the service on string ip = 3; } @@ -47,12 +45,7 @@ message StartResponse { bool success = 1; } -message StopRequest { - // Id to associate with the deployment being stopped - bytes deployment_id = 1; - // Name of service to stop - string service_name = 2; -} +message StopRequest {} message StopResponse { // Was the stop successful diff --git a/resources/secrets/Cargo.toml b/resources/secrets/Cargo.toml index 13455707e..83c107c1c 100644 --- a/resources/secrets/Cargo.toml +++ b/resources/secrets/Cargo.toml @@ -10,4 +10,3 @@ keywords = ["shuttle-service", "secrets"] [dependencies] async-trait = "0.1.56" shuttle-service = { path = "../../service", version = "0.8.0", default-features = false } -tokio = { version = "1.19.2", features = ["rt"] } diff --git a/resources/secrets/src/lib.rs b/resources/secrets/src/lib.rs index 27ab12ff8..856ea0593 100644 --- a/resources/secrets/src/lib.rs +++ b/resources/secrets/src/lib.rs @@ -3,7 +3,6 @@ use std::collections::BTreeMap; use async_trait::async_trait; use shuttle_service::{Error, Factory, ResourceBuilder}; -use tokio::runtime::Runtime; pub struct Secrets; @@ -14,11 +13,7 @@ impl ResourceBuilder for Secrets { Self {} } - async fn build( - self, - factory: &mut dyn Factory, - _runtime: &Runtime, - ) -> Result { + async fn build(self, factory: &mut dyn Factory) -> Result { let secrets = factory.get_secrets().await?; Ok(SecretStore { secrets }) diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml index d57b96596..cfa96cd80 100644 --- a/runtime/Cargo.toml +++ b/runtime/Cargo.toml @@ -6,6 +6,9 @@ license.workspace = true publish = false # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[[bin]] +name = "rocket" + [dependencies] anyhow = { workspace = true } async-trait = { workspace = true } @@ -25,6 +28,11 @@ wasmtime = "4.0.0" wasmtime-wasi = "4.0.0" futures = "0.3.25" +# For rocket.rs +# TODO: remove +shuttle-secrets = { path = "../resources/secrets" } +rocket = "0.5.0-rc.2" + [dependencies.shuttle-common] workspace = true features = ["wasm"] @@ -34,4 +42,4 @@ workspace = true [dependencies.shuttle-service] workspace = true -features = ["loader"] +features = ["loader", "web-rocket"] # TODO: remove web-rocket diff --git a/runtime/README.md b/runtime/README.md index a3b0c40fb..5e1f1d910 100644 --- a/runtime/README.md +++ b/runtime/README.md @@ -57,8 +57,8 @@ curl localhost:7002/goodbye ``` ## shuttle-legacy - -Load and run an .so library that implements `shuttle_service::Service`. +This will no loger load a `.so` will the code to start the runtime will be codegened for all services. +An example can be found in `src/bin/rocket.rs` which contains the secrets rocket example at the bottom and the codegen at the top. To test, first start a provisioner from the root directory using: @@ -66,10 +66,10 @@ To test, first start a provisioner from the root directory using: docker-compose -f docker-compose.rendered.yml up provisioner ``` -Then in another shell, start the runtime using the clap CLI: +Then in another shell, start the wrapped runtime using the clap CLI: ```bash -cargo run -- --legacy --provisioner-address http://localhost:5000 +cargo run -- --port 6001 --storage-manager-type working-dir --storage-manager-path ./ ``` Or directly (this is the path hardcoded in `deployer::start`): @@ -77,24 +77,23 @@ Or directly (this is the path hardcoded in `deployer::start`): # first, make sure the shuttle-runtime binary is built cargo build # then -/home//target/debug/shuttle-runtime --legacy --provisioner-address http://localhost:5000 +/home//target/debug/shuttle-runtime --port 6001 --storage-manager-type working-dir --storage-manager-path ./ ``` -Pass the path to `deployer::start` -Then in another shell, load a `.so` file and start it up: +Then in another shell, load the service and start it up: ``` bash # load -grpcurl -plaintext -import-path ../proto -proto runtime.proto -d '{"service_name": "Tonic", "path": "/home//examples/rocket/hello-world/target/debug/libhello_world.so"}' localhost:6001 runtime.Runtime/Load +grpcurl -plaintext -import-path ../proto -proto runtime.proto -d '{"service_name": "Tonic", "path": "/home//examples/rocket/hello-world/target/debug/libhello_world.so", "secrets": {"MY_API_KEY": "test"}}' localhost:6001 runtime.Runtime/Load # run (this deployment id is default uuid encoded as base64) -grpcurl -plaintext -import-path ../proto -proto runtime.proto -d '{"service_name": "Tonic", "deployment_id": "MDAwMDAwMDAtMDAwMC0wMDAwLTAwMDAtMDAwMDAwMDAwMDAw"}' localhost:6001 runtime.Runtime/Start +grpcurl -plaintext -import-path ../proto -proto runtime.proto -d '{"service_name": "Tonic", "deployment_id": "MDAwMDAwMDAtMDAwMC0wMDAwLTAwMDAtMDAwMDAwMDAwMDAw", "ip": "127.0.0.1:8000"}' localhost:6001 runtime.Runtime/Start # subscribe to logs grpcurl -plaintext -import-path ../proto -proto runtime.proto localhost:6001 runtime.Runtime/SubscribeLogs # stop (the service started in the legacy runtime can't currently be stopped) -grpcurl -plaintext -import-path ../proto -proto runtime.proto -d '{"service_name": "Tonic"}' localhost:6001 runtime.Runtime/Stop +grpcurl -plaintext -import-path ../proto -proto runtime.proto -d '{}' localhost:6001 runtime.Runtime/Stop ``` ## Running the tests diff --git a/runtime/src/axum/mod.rs b/runtime/src/axum/mod.rs index 15b3cf0c7..5bc798964 100644 --- a/runtime/src/axum/mod.rs +++ b/runtime/src/axum/mod.rs @@ -20,7 +20,6 @@ use shuttle_proto::runtime::{ self, LoadRequest, LoadResponse, StartRequest, StartResponse, StopRequest, StopResponse, SubscribeLogsRequest, }; -use shuttle_service::ServiceName; use tokio::sync::mpsc::{Receiver, Sender}; use tokio::sync::{mpsc, oneshot}; use tokio_stream::wrappers::ReceiverStream; @@ -149,18 +148,12 @@ impl Runtime for AxumWasm { &self, request: tonic::Request, ) -> Result, Status> { - let request = request.into_inner(); - - let service_name = ServiceName::from_str(request.service_name.as_str()) - .map_err(|err| Status::from_error(Box::new(err)))?; + let _request = request.into_inner(); let kill_tx = self.kill_tx.lock().unwrap().deref_mut().take(); if let Some(kill_tx) = kill_tx { - if kill_tx - .send(format!("stopping deployment: {}", &service_name)) - .is_err() - { + if kill_tx.send(format!("stopping deployment")).is_err() { error!("the receiver dropped"); return Err(Status::internal("failed to stop deployment")); } diff --git a/runtime/src/bin/rocket.rs b/runtime/src/bin/rocket.rs new file mode 100644 index 000000000..8da00a54f --- /dev/null +++ b/runtime/src/bin/rocket.rs @@ -0,0 +1,50 @@ +// The few line below is what we should now codegen for legacy +#[tokio::main] +async fn main() { + shuttle_runtime::start(loader).await; +} + +async fn loader( + mut factory: shuttle_runtime::ProvisionerFactory, +) -> shuttle_service::ShuttleRocket { + use shuttle_service::ResourceBuilder; + + let secrets = shuttle_secrets::Secrets::new().build(&mut factory).await?; + + rocket(secrets).await +} + +// Everything below this is the usual code a user will write +use anyhow::anyhow; +use rocket::response::status::BadRequest; +use rocket::State; +use shuttle_secrets::SecretStore; + +#[rocket::get("/secret")] +async fn secret(state: &State) -> Result> { + Ok(state.secret.clone()) +} + +struct MyState { + secret: String, +} + +// #[shuttle_service::main] +pub async fn rocket( + // #[shuttle_secrets::Secrets] secret_store: SecretStore, + secret_store: SecretStore, +) -> shuttle_service::ShuttleRocket { + // get secret defined in `Secrets.toml` file. + let secret = if let Some(secret) = secret_store.get("MY_API_KEY") { + secret + } else { + return Err(anyhow!("secret was not found").into()); + }; + + let state = MyState { secret }; + let rocket = rocket::build() + .mount("/", rocket::routes![secret]) + .manage(state); + + Ok(rocket) +} diff --git a/runtime/src/legacy/error.rs b/runtime/src/legacy/error.rs deleted file mode 100644 index 9c57cd4e2..000000000 --- a/runtime/src/legacy/error.rs +++ /dev/null @@ -1,14 +0,0 @@ -use shuttle_service::loader::LoaderError; -use thiserror::Error; - -#[derive(Error, Debug)] -pub enum Error { - #[error("Load error: {0}")] - Load(#[from] LoaderError), - #[error("Run error: {0}")] - Run(#[from] shuttle_service::Error), - #[error("Start error: {0}")] - Start(#[from] shuttle_service::error::CustomError), -} - -pub type Result = std::result::Result; diff --git a/runtime/src/legacy/mod.rs b/runtime/src/legacy/mod.rs index 137e7a369..d5fe5ab8b 100644 --- a/runtime/src/legacy/mod.rs +++ b/runtime/src/legacy/mod.rs @@ -1,141 +1,147 @@ use std::{ - collections::BTreeMap, iter::FromIterator, net::SocketAddr, ops::DerefMut, path::PathBuf, - str::FromStr, sync::Mutex, + collections::BTreeMap, + iter::FromIterator, + net::{Ipv4Addr, SocketAddr}, + ops::DerefMut, + str::FromStr, + sync::Mutex, + time::Duration, }; -use anyhow::{anyhow, Context}; +use anyhow::Context; use async_trait::async_trait; -use shuttle_common::{storage_manager::StorageManager, LogItem}; +use clap::Parser; +use futures::Future; +use shuttle_common::{ + storage_manager::{StorageManager, WorkingDirStorageManager}, + LogItem, +}; use shuttle_proto::{ provisioner::provisioner_client::ProvisionerClient, runtime::{ - self, runtime_server::Runtime, LoadRequest, LoadResponse, StartRequest, StartResponse, - StopRequest, StopResponse, SubscribeLogsRequest, + self, + runtime_server::{Runtime, RuntimeServer}, + LoadRequest, LoadResponse, StartRequest, StartResponse, StopRequest, StopResponse, + SubscribeLogsRequest, }, }; -use shuttle_service::{ - loader::{LoadedService, Loader}, - Factory, Logger, ServiceName, -}; +use shuttle_service::{Factory, Service, ServiceName}; use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender}; use tokio::sync::oneshot; use tokio_stream::wrappers::ReceiverStream; -use tonic::{transport::Endpoint, Request, Response, Status}; +use tonic::{ + transport::{Endpoint, Server}, + Request, Response, Status, +}; use tracing::{error, instrument, trace}; use uuid::Uuid; -use crate::provisioner_factory::ProvisionerFactory; +use crate::{provisioner_factory::ProvisionerFactory, Args}; -mod error; +pub async fn start( + loader: impl Loader> + Send + 'static, +) { + let args = Args::parse(); + let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), args.port); -pub struct Legacy -where - S: StorageManager, -{ + let provisioner_address = args.provisioner_address; + let mut server_builder = + Server::builder().http2_keepalive_interval(Some(Duration::from_secs(60))); + + let router = { + let legacy = Legacy::new( + provisioner_address, + loader, + WorkingDirStorageManager::new(args.storage_manager_path), + ); + + let svc = RuntimeServer::new(legacy); + server_builder.add_service(svc) + }; + + router.serve(addr).await.unwrap(); +} + +pub struct Legacy { // Mutexes are for interior mutability - so_path: Mutex>, logs_rx: Mutex>>, logs_tx: UnboundedSender, provisioner_address: Endpoint, kill_tx: Mutex>>, - secrets: Mutex>>, - storage_manager: S, + storage_manager: M, + loader: Mutex>, + service: Mutex>, } -impl Legacy -where - S: StorageManager, -{ - pub fn new(provisioner_address: Endpoint, storage_manager: S) -> Self { +impl Legacy { + pub fn new(provisioner_address: Endpoint, loader: L, storage_manager: M) -> Self { let (tx, rx) = mpsc::unbounded_channel(); Self { - so_path: Mutex::new(None), logs_rx: Mutex::new(Some(rx)), logs_tx: tx, kill_tx: Mutex::new(None), provisioner_address, - secrets: Mutex::new(None), storage_manager, + loader: Mutex::new(Some(loader)), + service: Mutex::new(None), } } } #[async_trait] -impl Runtime for Legacy +pub trait Loader where - S: StorageManager + 'static, + Fac: Factory, { - async fn load(&self, request: Request) -> Result, Status> { - let LoadRequest { path, secrets, .. } = request.into_inner(); - trace!(path, "loading"); - - let so_path = PathBuf::from(path); - - if !so_path.exists() { - return Err(Status::not_found("'.so' to load does not exist")); - } + type Service: Service; - *self.so_path.lock().unwrap() = Some(so_path); + async fn load(self, factory: Fac) -> Result; +} - *self.secrets.lock().unwrap() = Some(BTreeMap::from_iter(secrets.into_iter())); +#[async_trait] +impl Loader for F +where + F: FnOnce(Fac) -> O + Send, + O: Future> + Send, + Fac: Factory + 'static, + S: Service, +{ + type Service = S; - let message = LoadResponse { success: true }; - Ok(Response::new(message)) + async fn load(self, factory: Fac) -> Result { + (self)(factory).await } +} - async fn start( - &self, - request: Request, - ) -> Result, Status> { - trace!("legacy starting"); +#[async_trait] +impl Runtime for Legacy +where + M: StorageManager + Send + Sync + 'static, + L: Loader, Service = S> + Send + 'static, + S: Service + Send + 'static, +{ + async fn load(&self, request: Request) -> Result, Status> { + let LoadRequest { + path, + secrets, + service_name, + } = request.into_inner(); + trace!(path, "loading"); + + let secrets = BTreeMap::from_iter(secrets.into_iter()); let provisioner_client = ProvisionerClient::connect(self.provisioner_address.clone()) .await .context("failed to connect to provisioner") .map_err(|err| Status::internal(err.to_string()))?; - let so_path = self - .so_path - .lock() - .unwrap() - .as_ref() - .ok_or_else(|| -> error::Error { - error::Error::Start(anyhow!("trying to start a service that was not loaded")) - }) - .map_err(|err| Status::from_error(Box::new(err)))? - .clone(); - let secrets = self - .secrets - .lock() - .unwrap() - .as_ref() - .ok_or_else(|| -> error::Error { - error::Error::Start(anyhow!( - "trying to get secrets from a service that was not loaded" - )) - }) - .map_err(|err| Status::from_error(Box::new(err)))? - .clone(); - - trace!("prepare done"); - - let StartRequest { - deployment_id, - service_name, - ip, - } = request.into_inner(); - let service_address = SocketAddr::from_str(&ip) - .context("invalid socket address") - .map_err(|err| Status::invalid_argument(err.to_string()))?; - let service_name = ServiceName::from_str(service_name.as_str()) .map_err(|err| Status::from_error(Box::new(err)))?; - let deployment_id = Uuid::from_slice(&deployment_id) - .map_err(|error| Status::invalid_argument(error.to_string()))?; + let deployment_id = Uuid::new_v4(); - let mut factory = ProvisionerFactory::new( + let factory = ProvisionerFactory::new( provisioner_client, service_name, deployment_id, @@ -144,17 +150,34 @@ where ); trace!("got factory"); - let logs_tx = self.logs_tx.clone(); + let loader = self.loader.lock().unwrap().deref_mut().take().unwrap(); - let logger = Logger::new(logs_tx, deployment_id); + let service = loader.load(factory).await.unwrap(); + + *self.service.lock().unwrap() = Some(service); + + let message = LoadResponse { success: true }; + Ok(Response::new(message)) + } + + async fn start( + &self, + request: Request, + ) -> Result, Status> { + trace!("legacy starting"); + let service = self.service.lock().unwrap().deref_mut().take(); + let service = service.unwrap(); + + let StartRequest { ip, .. } = request.into_inner(); + let service_address = SocketAddr::from_str(&ip) + .context("invalid socket address") + .map_err(|err| Status::invalid_argument(err.to_string()))?; + + let _logs_tx = self.logs_tx.clone(); trace!(%service_address, "starting"); - let service = load_service(service_address, so_path, &mut factory, logger) - .await - .map_err(|error| Status::internal(error.to_string()))?; let (kill_tx, kill_rx) = tokio::sync::oneshot::channel(); - *self.kill_tx.lock().unwrap() = Some(kill_tx); // start service as a background task with a kill receiver @@ -189,21 +212,11 @@ where } } - // todo: this doesn't currently stop the service, since we can't stop the tokio runtime it - // is started on. - async fn stop(&self, request: Request) -> Result, Status> { - let request = request.into_inner(); - - let service_name = ServiceName::from_str(request.service_name.as_str()) - .map_err(|err| Status::from_error(Box::new(err)))?; - + async fn stop(&self, _request: Request) -> Result, Status> { let kill_tx = self.kill_tx.lock().unwrap().deref_mut().take(); if let Some(kill_tx) = kill_tx { - if kill_tx - .send(format!("stopping deployment: {}", &service_name)) - .is_err() - { + if kill_tx.send(format!("stopping deployment")).is_err() { error!("the receiver dropped"); return Err(Status::internal("failed to stop deployment")); } @@ -218,15 +231,14 @@ where /// Run the service until a stop signal is received #[instrument(skip(service, kill_rx))] async fn run_until_stopped( - service: LoadedService, + // service: LoadedService, + service: impl Service, addr: SocketAddr, kill_rx: tokio::sync::oneshot::Receiver, ) { - let (handle, library) = service; - trace!("starting deployment on {}", &addr); tokio::select! { - _ = handle => { + _ = service.bind(addr) => { trace!("deployment stopped on {}", &addr); }, message = kill_rx => { @@ -236,21 +248,4 @@ async fn run_until_stopped( } } } - - tokio::spawn(async move { - trace!("closing .so file"); - library.close().unwrap(); - }); -} - -#[instrument(skip(addr, so_path, factory, logger))] -async fn load_service( - addr: SocketAddr, - so_path: PathBuf, - factory: &mut dyn Factory, - logger: Logger, -) -> error::Result { - let loader = Loader::from_so_file(so_path)?; - - Ok(loader.load(factory, addr, logger).await?) } diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index fc17f47c4..62fedf7f3 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -1,8 +1,9 @@ mod args; mod axum; mod legacy; -pub mod provisioner_factory; +mod provisioner_factory; pub use args::{Args, StorageManagerType}; pub use axum::AxumWasm; -pub use legacy::Legacy; +pub use legacy::{start, Legacy}; +pub use provisioner_factory::ProvisionerFactory; diff --git a/service/src/lib.rs b/service/src/lib.rs index 2ece4b571..b40fe8b61 100644 --- a/service/src/lib.rs +++ b/service/src/lib.rs @@ -389,7 +389,7 @@ pub trait Factory: Send + Sync { #[async_trait] pub trait ResourceBuilder { fn new() -> Self; - async fn build(self, factory: &mut dyn Factory, runtime: &Runtime) -> Result; + async fn build(self, factory: &mut dyn Factory) -> Result; } /// A tokio handle the service was started on @@ -406,7 +406,7 @@ pub trait Service: Send + Sync { /// This function is run exactly once on each instance of a deployment. /// /// The deployer expects this instance of [Service][Service] to bind to the passed [SocketAddr][SocketAddr]. - async fn bind(mut self: Box, addr: SocketAddr) -> Result<(), error::Error>; + async fn bind(mut self, addr: SocketAddr) -> Result<(), error::Error>; } /// This function is generated by our codegen. It uses the factory to get other services and instantiate them on @@ -476,7 +476,7 @@ impl Drop for Bootstrapper { #[cfg(feature = "web-rocket")] #[async_trait] impl Service for rocket::Rocket { - async fn bind(mut self: Box, addr: SocketAddr) -> Result<(), error::Error> { + async fn bind(mut self, addr: SocketAddr) -> Result<(), error::Error> { let shutdown = rocket::config::Shutdown { ctrlc: false, ..rocket::config::Shutdown::default()