From ee342e49b9530f7d9d372f77ff28b87d6857973d Mon Sep 17 00:00:00 2001 From: Pieter Date: Mon, 14 Nov 2022 08:03:48 +0200 Subject: [PATCH] feat: get logs from runtime (#459) * feat: get logs from runtime * refactor: trim down deployer * refactor: start runtime earlier * refactor: connect to client earlier * refactor: hook runtime logs to persistence * bug: associate deployment id with logs * refactor: cleanup * feat: make sure grpc connection stays open --- Cargo.lock | 7 +- deployer/src/deployment/deploy_layer.rs | 129 +++++++++--------- deployer/src/deployment/mod.rs | 16 +-- .../src/deployment/provisioner_factory.rs | 62 --------- deployer/src/deployment/run.rs | 110 +++++---------- deployer/src/deployment/runtime_logger.rs | 54 -------- deployer/src/lib.rs | 47 ++----- deployer/src/main.rs | 71 ++++++++-- deployer/src/persistence/mod.rs | 2 + docker-compose.dev.yml | 3 + proto/Cargo.toml | 1 + proto/runtime.proto | 45 +++++- proto/src/lib.rs | 47 +++++++ runtime/Cargo.toml | 2 + runtime/README.md | 10 +- runtime/src/legacy/mod.rs | 55 ++++++-- runtime/src/main.rs | 11 +- runtime/src/next/mod.rs | 14 +- 18 files changed, 347 insertions(+), 339 deletions(-) delete mode 100644 deployer/src/deployment/provisioner_factory.rs delete mode 100644 deployer/src/deployment/runtime_logger.rs diff --git a/Cargo.lock b/Cargo.lock index b065cb0f8..d4f089eca 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5818,6 +5818,7 @@ name = "shuttle-proto" version = "0.7.0" dependencies = [ "prost", + "prost-types", "shuttle-common", "tonic", "tonic-build", @@ -5863,9 +5864,11 @@ dependencies = [ "shuttle-service", "thiserror", "tokio", + "tokio-stream", "tonic", "tracing", "tracing-subscriber", + "uuid 1.1.2", "wasi-common", "wasmtime", "wasmtime-wasi", @@ -6823,9 +6826,9 @@ dependencies = [ [[package]] name = "tokio-stream" -version = "0.1.9" +version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "df54d54117d6fdc4e4fea40fe1e4e566b3505700e148a6827e59b34b0d2600d9" +checksum = "d660770404473ccd7bc9f8b28494a811bc18542b915c0855c51e8f419d5223ce" dependencies = [ "futures-core", "pin-project-lite 0.2.9", diff --git a/deployer/src/deployment/deploy_layer.rs b/deployer/src/deployment/deploy_layer.rs index 429ab99d6..f74d5039d 100644 --- a/deployer/src/deployment/deploy_layer.rs +++ b/deployer/src/deployment/deploy_layer.rs @@ -22,7 +22,8 @@ use chrono::{DateTime, Utc}; use serde_json::json; use shuttle_common::STATE_MESSAGE; -use std::{net::SocketAddr, str::FromStr}; +use shuttle_proto::runtime; +use std::{net::SocketAddr, str::FromStr, time::SystemTime}; use tracing::{error, field::Visit, span, warn, Metadata, Subscriber}; use tracing_subscriber::Layer; use uuid::Uuid; @@ -126,6 +127,51 @@ impl From for DeploymentState { } } +impl From for Log { + fn from(log: runtime::LogItem) -> Self { + Self { + id: Uuid::from_slice(&log.id).unwrap(), + state: runtime::LogState::from_i32(log.state).unwrap().into(), + level: runtime::LogLevel::from_i32(log.level).unwrap().into(), + timestamp: DateTime::from(SystemTime::try_from(log.timestamp.unwrap()).unwrap()), + file: log.file, + line: log.line, + target: log.target, + fields: serde_json::from_slice(&log.fields).unwrap(), + r#type: LogType::Event, + address: None, + } + } +} + +impl From for State { + fn from(state: runtime::LogState) -> Self { + match state { + runtime::LogState::Queued => Self::Queued, + runtime::LogState::Building => Self::Building, + runtime::LogState::Built => Self::Built, + runtime::LogState::Loading => Self::Loading, + runtime::LogState::Running => Self::Running, + runtime::LogState::Completed => Self::Completed, + runtime::LogState::Stopped => Self::Stopped, + runtime::LogState::Crashed => Self::Crashed, + runtime::LogState::Unknown => Self::Unknown, + } + } +} + +impl From for LogLevel { + fn from(level: runtime::LogLevel) -> Self { + match level { + runtime::LogLevel::Trace => Self::Trace, + runtime::LogLevel::Debug => Self::Debug, + runtime::LogLevel::Info => Self::Info, + runtime::LogLevel::Warn => Self::Warn, + runtime::LogLevel::Error => Self::Error, + } + } +} + #[derive(Clone, Debug, Eq, PartialEq)] pub enum LogType { Event, @@ -345,7 +391,6 @@ impl Visit for JsonVisitor { #[cfg(test)] mod tests { use std::{ - collections::BTreeMap, fs::read_dir, path::PathBuf, sync::{Arc, Mutex}, @@ -356,15 +401,15 @@ mod tests { use ctor::ctor; use flate2::{write::GzEncoder, Compression}; use futures::FutureExt; - use shuttle_service::Logger; - use tokio::{select, sync::mpsc, time::sleep}; + use shuttle_proto::runtime::runtime_client::RuntimeClient; + use tokio::{select, time::sleep}; + use tonic::transport::Channel; use tracing_subscriber::prelude::*; use uuid::Uuid; use crate::{ deployment::{ - deploy_layer::LogType, provisioner_factory, runtime_logger, ActiveDeploymentsGetter, - Built, DeploymentManager, Queued, + deploy_layer::LogType, ActiveDeploymentsGetter, Built, DeploymentManager, Queued, }, persistence::{SecretRecorder, State}, }; @@ -430,6 +475,12 @@ mod tests { } } + async fn get_runtime_client() -> RuntimeClient { + RuntimeClient::connect("http://127.0.0.1:6001") + .await + .unwrap() + } + #[async_trait::async_trait] impl SecretRecorder for Arc> { type Err = std::io::Error; @@ -450,54 +501,6 @@ mod tests { } } - struct StubAbstractProvisionerFactory; - - impl provisioner_factory::AbstractFactory for StubAbstractProvisionerFactory { - type Output = StubProvisionerFactory; - - fn get_factory(&self) -> Self::Output { - StubProvisionerFactory - } - } - - struct StubProvisionerFactory; - - #[async_trait::async_trait] - impl shuttle_service::Factory for StubProvisionerFactory { - async fn get_db_connection_string( - &mut self, - _db_type: shuttle_common::database::Type, - ) -> Result { - panic!("did not expect any deploy_layer test to connect to the database") - } - - async fn get_secrets( - &mut self, - ) -> Result, shuttle_service::Error> { - panic!("did not expect any deploy_layer test to get secrets") - } - - fn get_service_name(&self) -> shuttle_service::ServiceName { - panic!("did not expect any deploy_layer test to get the service name") - } - } - - struct StubRuntimeLoggerFactory; - - impl runtime_logger::Factory for StubRuntimeLoggerFactory { - fn get_logger(&self, id: Uuid) -> Logger { - let (tx, mut rx) = mpsc::unbounded_channel(); - - tokio::spawn(async move { - while let Some(log) = rx.recv().await { - println!("{log}") - } - }); - - Logger::new(tx, id) - } - } - #[derive(Clone)] struct StubActiveDeploymentGetter; @@ -516,8 +519,7 @@ mod tests { #[tokio::test(flavor = "multi_thread")] async fn deployment_to_be_queued() { let deployment_manager = DeploymentManager::new( - StubAbstractProvisionerFactory, - StubRuntimeLoggerFactory, + get_runtime_client().await, RECORDER.clone(), RECORDER.clone(), StubActiveDeploymentGetter, @@ -635,8 +637,7 @@ mod tests { #[tokio::test(flavor = "multi_thread")] async fn deployment_self_stop() { let deployment_manager = DeploymentManager::new( - StubAbstractProvisionerFactory, - StubRuntimeLoggerFactory, + get_runtime_client().await, RECORDER.clone(), RECORDER.clone(), StubActiveDeploymentGetter, @@ -715,8 +716,7 @@ mod tests { #[tokio::test(flavor = "multi_thread")] async fn deployment_bind_panic() { let deployment_manager = DeploymentManager::new( - StubAbstractProvisionerFactory, - StubRuntimeLoggerFactory, + get_runtime_client().await, RECORDER.clone(), RECORDER.clone(), StubActiveDeploymentGetter, @@ -795,8 +795,7 @@ mod tests { #[tokio::test(flavor = "multi_thread")] async fn deployment_main_panic() { let deployment_manager = DeploymentManager::new( - StubAbstractProvisionerFactory, - StubRuntimeLoggerFactory, + get_runtime_client().await, RECORDER.clone(), RECORDER.clone(), StubActiveDeploymentGetter, @@ -870,8 +869,7 @@ mod tests { #[tokio::test] async fn deployment_from_run() { let deployment_manager = DeploymentManager::new( - StubAbstractProvisionerFactory, - StubRuntimeLoggerFactory, + get_runtime_client().await, RECORDER.clone(), RECORDER.clone(), StubActiveDeploymentGetter, @@ -924,8 +922,7 @@ mod tests { #[tokio::test] async fn scope_with_nil_id() { let deployment_manager = DeploymentManager::new( - StubAbstractProvisionerFactory, - StubRuntimeLoggerFactory, + get_runtime_client().await, RECORDER.clone(), RECORDER.clone(), StubActiveDeploymentGetter, diff --git a/deployer/src/deployment/mod.rs b/deployer/src/deployment/mod.rs index d7f10eeb5..549f05ad6 100644 --- a/deployer/src/deployment/mod.rs +++ b/deployer/src/deployment/mod.rs @@ -1,13 +1,13 @@ pub mod deploy_layer; -pub mod provisioner_factory; mod queue; mod run; -pub mod runtime_logger; use std::path::PathBuf; pub use queue::Queued; pub use run::{ActiveDeploymentsGetter, Built}; +use shuttle_proto::runtime::runtime_client::RuntimeClient; +use tonic::transport::Channel; use tracing::instrument; use crate::persistence::{SecretRecorder, State}; @@ -30,8 +30,7 @@ impl DeploymentManager { /// Create a new deployment manager. Manages one or more 'pipelines' for /// processing service building, loading, and deployment. pub fn new( - abstract_dummy_factory: impl provisioner_factory::AbstractFactory, - runtime_logger_factory: impl runtime_logger::Factory, + runtime_client: RuntimeClient, build_log_recorder: impl LogRecorder, secret_recorder: impl SecretRecorder, active_deployment_getter: impl ActiveDeploymentsGetter, @@ -42,8 +41,7 @@ impl DeploymentManager { DeploymentManager { pipeline: Pipeline::new( kill_send.clone(), - abstract_dummy_factory, - runtime_logger_factory, + runtime_client, build_log_recorder, secret_recorder, active_deployment_getter, @@ -97,8 +95,7 @@ impl Pipeline { /// deployments between the aforementioned tasks. fn new( kill_send: KillSender, - abstract_factory: impl provisioner_factory::AbstractFactory, - runtime_logger_factory: impl runtime_logger::Factory, + runtime_client: RuntimeClient, build_log_recorder: impl LogRecorder, secret_recorder: impl SecretRecorder, active_deployment_getter: impl ActiveDeploymentsGetter, @@ -118,9 +115,8 @@ impl Pipeline { )); tokio::spawn(run::task( run_recv, + runtime_client, kill_send, - abstract_factory, - runtime_logger_factory, active_deployment_getter, artifacts_path, )); diff --git a/deployer/src/deployment/provisioner_factory.rs b/deployer/src/deployment/provisioner_factory.rs deleted file mode 100644 index 3b923b60a..000000000 --- a/deployer/src/deployment/provisioner_factory.rs +++ /dev/null @@ -1,62 +0,0 @@ -use std::collections::BTreeMap; -use std::str::FromStr; - -use async_trait::async_trait; -use shuttle_common::database; -use shuttle_service::{Factory, ServiceName}; - -/// Trait to make it easy to get a factory (service locator) for each service being started -pub trait AbstractFactory: Send + 'static { - type Output: Factory; - - /// Get a factory for a specific service - fn get_factory(&self) -> Self::Output; -} - -/// An abstract factory that makes factories which uses provisioner -#[derive(Clone)] -pub struct AbstractDummyFactory; - -impl AbstractFactory for AbstractDummyFactory { - type Output = DummyFactory; - - fn get_factory(&self) -> Self::Output { - DummyFactory::new() - } -} - -impl AbstractDummyFactory { - pub fn new() -> Self { - Self - } -} - -pub struct DummyFactory { - service_name: ServiceName, -} - -impl DummyFactory { - fn new() -> Self { - Self { - service_name: ServiceName::from_str("legacy").unwrap(), - } - } -} - -#[async_trait] -impl Factory for DummyFactory { - fn get_service_name(&self) -> ServiceName { - self.service_name.clone() - } - - async fn get_db_connection_string( - &mut self, - _: database::Type, - ) -> Result { - todo!() - } - - async fn get_secrets(&mut self) -> Result, shuttle_service::Error> { - todo!() - } -} diff --git a/deployer/src/deployment/run.rs b/deployer/src/deployment/run.rs index 7ab74a557..cf35d4e33 100644 --- a/deployer/src/deployment/run.rs +++ b/deployer/src/deployment/run.rs @@ -8,12 +8,12 @@ use async_trait::async_trait; use shuttle_common::project::ProjectName as ServiceName; use shuttle_proto::runtime::{runtime_client::RuntimeClient, LoadRequest, StartRequest}; -use shuttle_service::{Factory, Logger}; use tokio::task::JoinError; +use tonic::transport::Channel; use tracing::{error, info, instrument, trace}; use uuid::Uuid; -use super::{provisioner_factory, runtime_logger, KillReceiver, KillSender, RunReceiver, State}; +use super::{KillReceiver, KillSender, RunReceiver, State}; use crate::error::{Error, Result}; /// Run a task which takes runnable deploys from a channel and starts them up with a factory provided by the @@ -21,9 +21,8 @@ use crate::error::{Error, Result}; /// A deploy is killed when it receives a signal from the kill channel pub async fn task( mut recv: RunReceiver, + runtime_client: RuntimeClient, kill_send: KillSender, - abstract_dummy_factory: impl provisioner_factory::AbstractFactory, - logger_factory: impl runtime_logger::Factory, active_deployment_getter: impl ActiveDeploymentsGetter, artifacts_path: PathBuf, ) { @@ -51,8 +50,6 @@ pub async fn task( continue; } }; - let mut factory = abstract_dummy_factory.get_factory(); - let logger = logger_factory.get_logger(id); let old_deployments_killer = kill_old_deployments( built.service_id, @@ -73,14 +70,14 @@ pub async fn task( }; let libs_path = libs_path.clone(); + let runtime_client = runtime_client.clone(); tokio::spawn(async move { if let Err(err) = built .handle( addr, libs_path, - &mut factory, - logger, + runtime_client, kill_recv, old_deployments_killer, cleanup, @@ -163,14 +160,13 @@ pub struct Built { } impl Built { - #[instrument(name = "built_handle", skip(self, libs_path, _factory, _logger, kill_recv, kill_old_deployments, cleanup), fields(id = %self.id, state = %State::Loading))] + #[instrument(name = "built_handle", skip(self, libs_path, runtime_client, kill_recv, kill_old_deployments, cleanup), fields(id = %self.id, state = %State::Loading))] #[allow(clippy::too_many_arguments)] async fn handle( self, address: SocketAddr, libs_path: PathBuf, - _factory: &mut dyn Factory, - _logger: Logger, + runtime_client: RuntimeClient, kill_recv: KillReceiver, kill_old_deployments: impl futures::Future>, cleanup: impl FnOnce(std::result::Result, JoinError>) @@ -186,6 +182,7 @@ impl Built { self.id, self.service_name, libs_path, + runtime_client, address, kill_recv, cleanup, @@ -195,22 +192,18 @@ impl Built { } } -#[instrument(skip(_kill_recv, _cleanup), fields(address = %_address, state = %State::Running))] +#[instrument(skip(runtime_client, _kill_recv, _cleanup), fields(address = %_address, state = %State::Running))] async fn run( id: Uuid, service_name: String, libs_path: PathBuf, + mut runtime_client: RuntimeClient, _address: SocketAddr, _kill_recv: KillReceiver, _cleanup: impl FnOnce(std::result::Result, JoinError>) + Send + 'static, ) { - info!("starting up deployer grpc client"); - let mut client = RuntimeClient::connect("http://127.0.0.1:6001") - .await - .unwrap(); - info!( "loading project from: {}", libs_path.clone().into_os_string().into_string().unwrap() @@ -222,24 +215,26 @@ async fn run( service_name: service_name.clone(), }); info!("loading service"); - let response = client.load(load_request).await; + let response = runtime_client.load(load_request).await; if let Err(e) = response { info!("failed to load service: {}", e); } - let start_request = tonic::Request::new(StartRequest { service_name }); + let start_request = tonic::Request::new(StartRequest { + deployment_id: id.as_bytes().to_vec(), + service_name, + }); info!("starting service"); - let response = client.start(start_request).await.unwrap(); + let response = runtime_client.start(start_request).await.unwrap(); - info!(response = ?response, "client response: "); + info!(response = ?response.into_inner(), "client response: "); } #[cfg(test)] mod tests { use std::{ - collections::BTreeMap, fs, net::{Ipv4Addr, SocketAddr}, path::PathBuf, @@ -247,13 +242,13 @@ mod tests { time::Duration, }; - use shuttle_common::database; - use shuttle_service::{Factory, Logger}; + use shuttle_proto::runtime::runtime_client::RuntimeClient; use tokio::{ - sync::{broadcast, mpsc, oneshot}, + sync::{broadcast, oneshot}, task::JoinError, time::sleep, }; + use tonic::transport::Channel; use uuid::Uuid; use crate::error::Error; @@ -263,44 +258,16 @@ mod tests { const RESOURCES_PATH: &str = "tests/resources"; const LIBS_PATH: &str = "/tmp/shuttle-libs-tests"; - struct StubFactory; - - #[async_trait::async_trait] - impl Factory for StubFactory { - async fn get_db_connection_string( - &mut self, - _db_type: database::Type, - ) -> Result { - panic!("no run test should get an sql connection"); - } - - async fn get_secrets( - &mut self, - ) -> Result, shuttle_service::Error> { - panic!("no test should get any secrets"); - } - - fn get_service_name(&self) -> shuttle_service::ServiceName { - panic!("no test should get the service name"); - } - } - - fn get_logger(id: Uuid) -> Logger { - let (tx, mut rx) = mpsc::unbounded_channel(); - - tokio::spawn(async move { - while let Some(log) = rx.recv().await { - println!("{log}"); - } - }); - - Logger::new(tx, id) - } - async fn kill_old_deployments() -> crate::error::Result<()> { Ok(()) } + async fn get_runtime_client() -> RuntimeClient { + RuntimeClient::connect("http://127.0.0.1:6001") + .await + .unwrap() + } + // This test uses the kill signal to make sure a service does stop when asked to #[tokio::test] async fn can_be_killed() { @@ -321,15 +288,12 @@ mod tests { cleanup_send.send(()).unwrap(); }; let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 8001); - let mut factory = StubFactory; - let logger = get_logger(built.id); built .handle( addr, PathBuf::from(LIBS_PATH), - &mut factory, - logger, + get_runtime_client().await, kill_recv, kill_old_deployments(), handle_cleanup, @@ -369,15 +333,12 @@ mod tests { cleanup_send.send(()).unwrap(); }; let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 8001); - let mut factory = StubFactory; - let logger = get_logger(built.id); built .handle( addr, PathBuf::from(LIBS_PATH), - &mut factory, - logger, + get_runtime_client().await, kill_recv, kill_old_deployments(), handle_cleanup, @@ -411,15 +372,12 @@ mod tests { cleanup_send.send(()).unwrap(); }; let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 8001); - let mut factory = StubFactory; - let logger = get_logger(built.id); built .handle( addr, PathBuf::from(LIBS_PATH), - &mut factory, - logger, + get_runtime_client().await, kill_recv, kill_old_deployments(), handle_cleanup, @@ -441,15 +399,12 @@ mod tests { let handle_cleanup = |_result| panic!("the service shouldn't even start"); let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 8001); - let mut factory = StubFactory; - let logger = get_logger(built.id); let result = built .handle( addr, PathBuf::from(LIBS_PATH), - &mut factory, - logger, + get_runtime_client().await, kill_recv, kill_old_deployments(), handle_cleanup, @@ -474,15 +429,12 @@ mod tests { let handle_cleanup = |_result| panic!("no service means no cleanup"); let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 8001); - let mut factory = StubFactory; - let logger = get_logger(built.id); let result = built .handle( addr, PathBuf::from(LIBS_PATH), - &mut factory, - logger, + get_runtime_client().await, kill_recv, kill_old_deployments(), handle_cleanup, diff --git a/deployer/src/deployment/runtime_logger.rs b/deployer/src/deployment/runtime_logger.rs deleted file mode 100644 index 266805fed..000000000 --- a/deployer/src/deployment/runtime_logger.rs +++ /dev/null @@ -1,54 +0,0 @@ -use shuttle_common::LogItem; -use shuttle_service::Logger; -use tokio::sync::mpsc::{self, UnboundedSender}; -use uuid::Uuid; - -use super::deploy_layer::{self, LogType}; - -pub trait Factory: Send + 'static { - fn get_logger(&self, id: Uuid) -> Logger; -} - -/// Factory to create runtime loggers for deployments -pub struct RuntimeLoggerFactory { - log_send: crossbeam_channel::Sender, -} - -impl RuntimeLoggerFactory { - pub fn new(log_send: crossbeam_channel::Sender) -> Self { - Self { log_send } - } -} - -impl Factory for RuntimeLoggerFactory { - fn get_logger(&self, id: Uuid) -> Logger { - let (tx, mut rx): (UnboundedSender, _) = mpsc::unbounded_channel(); - - let sender = self.log_send.clone(); - - tokio::spawn(async move { - while let Some(log) = rx.recv().await { - sender.send(log.into()).expect("to send log to persistence"); - } - }); - - Logger::new(tx, id) - } -} - -impl From for deploy_layer::Log { - fn from(log: LogItem) -> Self { - Self { - id: log.id, - state: log.state.into(), - level: log.level.into(), - timestamp: log.timestamp, - file: log.file, - line: log.line, - target: log.target, - fields: serde_json::from_slice(&log.fields).unwrap(), - r#type: LogType::Event, - address: None, - } - } -} diff --git a/deployer/src/lib.rs b/deployer/src/lib.rs index f1b325301..16dcd9992 100644 --- a/deployer/src/lib.rs +++ b/deployer/src/lib.rs @@ -1,11 +1,8 @@ -use std::{convert::Infallible, env, net::SocketAddr, path::PathBuf}; +use std::{convert::Infallible, net::SocketAddr}; pub use args::Args; -pub use deployment::{ - deploy_layer::DeployLayer, provisioner_factory::AbstractDummyFactory, - runtime_logger::RuntimeLoggerFactory, -}; -use deployment::{provisioner_factory, runtime_logger, Built, DeploymentManager}; +pub use deployment::deploy_layer::DeployLayer; +use deployment::{Built, DeploymentManager}; use fqdn::FQDN; use hyper::{ server::conn::AddrStream, @@ -13,7 +10,8 @@ use hyper::{ }; pub use persistence::Persistence; use proxy::AddressGetter; -use tokio::select; +use shuttle_proto::runtime::runtime_client::RuntimeClient; +use tonic::transport::Channel; use tracing::{error, info}; mod args; @@ -23,15 +21,9 @@ mod handlers; mod persistence; mod proxy; -pub async fn start( - abstract_dummy_factory: impl provisioner_factory::AbstractFactory, - runtime_logger_factory: impl runtime_logger::Factory, - persistence: Persistence, - args: Args, -) { +pub async fn start(persistence: Persistence, runtime_client: RuntimeClient, args: Args) { let deployment_manager = DeploymentManager::new( - abstract_dummy_factory, - runtime_logger_factory, + runtime_client, persistence.clone(), persistence.clone(), persistence.clone(), @@ -55,27 +47,10 @@ pub async fn start( ); let make_service = router.into_make_service(); - let workspace_root = PathBuf::from(env!("CARGO_MANIFEST_DIR")) - .parent() - .unwrap() - .to_path_buf(); - - let runtime_dir = workspace_root.join("target/debug"); - - let mut runtime = tokio::process::Command::new(runtime_dir.join("shuttle-runtime")) - .args(&["--legacy", "--provisioner-address", "http://localhost:8000"]) - .current_dir(&runtime_dir) - .spawn() - .unwrap(); - - select! { - _ = runtime.wait() => { - info!("Legacy runtime stopped.") - }, - _ = axum::Server::bind(&args.api_address).serve(make_service) => { - info!("Handlers server stopped serving addr: {}", &args.api_address); - }, - } + axum::Server::bind(&args.api_address) + .serve(make_service) + .await + .unwrap_or_else(|_| panic!("Failed to bind to address: {}", args.api_address)); } pub async fn start_proxy( diff --git a/deployer/src/main.rs b/deployer/src/main.rs index d473361ad..eac473365 100644 --- a/deployer/src/main.rs +++ b/deployer/src/main.rs @@ -1,9 +1,14 @@ +use std::path::PathBuf; +use std::process::exit; +use std::time::Duration; + use clap::Parser; -use shuttle_deployer::{ - start, start_proxy, AbstractDummyFactory, Args, DeployLayer, Persistence, RuntimeLoggerFactory, -}; +use shuttle_deployer::{start, start_proxy, Args, DeployLayer, Persistence}; +use shuttle_proto::runtime::runtime_client::RuntimeClient; +use shuttle_proto::runtime::SubscribeLogsRequest; use tokio::select; -use tracing::trace; +use tonic::transport::Endpoint; +use tracing::{error, info, trace}; use tracing_subscriber::prelude::*; use tracing_subscriber::{fmt, EnvFilter}; @@ -34,12 +39,62 @@ async fn main() { .with(opentelemetry) .init(); - let abstract_dummy_factory = AbstractDummyFactory::new(); + let workspace_root = PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .parent() + .unwrap() + .to_path_buf(); + let runtime_dir = workspace_root.join("target/debug"); + + let mut runtime = tokio::process::Command::new(runtime_dir.join("shuttle-runtime")) + .args(&[ + "--legacy", + "--provisioner-address", + "https://localhost:5000", + ]) + .current_dir(&runtime_dir) + .spawn() + .unwrap(); + + // Sleep because the timeout below does not seem to work + // TODO: investigate why + tokio::time::sleep(Duration::from_secs(2)).await; - let runtime_logger_factory = RuntimeLoggerFactory::new(persistence.get_log_sender()); + info!("connecting runtime client"); + let conn = Endpoint::new("http://127.0.0.1:6001") + .unwrap() + .connect_timeout(Duration::from_secs(5)) + .connect() + .await + .unwrap(); + let mut runtime_client = RuntimeClient::new(conn); + + let sender = persistence.get_log_sender(); + let mut stream = runtime_client + .subscribe_logs(tonic::Request::new(SubscribeLogsRequest {})) + .await + .unwrap() + .into_inner(); + + let logs_task = tokio::spawn(async move { + while let Some(log) = stream.message().await.unwrap() { + sender.send(log.into()).expect("to send log to persistence"); + } + }); select! { - _ = start_proxy(args.proxy_address, args.proxy_fqdn.clone(), persistence.clone()) => {}, - _ = start(abstract_dummy_factory, runtime_logger_factory, persistence, args) => {}, + _ = start_proxy(args.proxy_address, args.proxy_fqdn.clone(), persistence.clone()) => { + error!("Proxy stopped.") + }, + _ = start(persistence, runtime_client, args) => { + error!("Deployment service stopped.") + }, + _ = runtime.wait() => { + error!("Legacy runtime stopped.") + }, + _ = logs_task => { + error!("Logs task stopped") + }, } + + exit(1); } diff --git a/deployer/src/persistence/mod.rs b/deployer/src/persistence/mod.rs index 28be90397..8786142ea 100644 --- a/deployer/src/persistence/mod.rs +++ b/deployer/src/persistence/mod.rs @@ -277,10 +277,12 @@ impl Persistence { get_deployment_logs(&self.pool, id).await } + /// Get a broadcast channel for listening to logs that are being stored into persistence pub fn get_log_subscriber(&self) -> Receiver { self.stream_log_send.subscribe() } + /// Returns a sender for sending logs to persistence storage pub fn get_log_sender(&self) -> crossbeam_channel::Sender { self.log_send.clone() } diff --git a/docker-compose.dev.yml b/docker-compose.dev.yml index dcfe18a19..15928d045 100644 --- a/docker-compose.dev.yml +++ b/docker-compose.dev.yml @@ -21,3 +21,6 @@ services: >&2 echo "DBs are available - starting provisioner" exec /usr/local/bin/service "$${@:0}" + ports: + - published: 5000 + target: 8000 diff --git a/proto/Cargo.toml b/proto/Cargo.toml index 3b64c23d9..bfe263cc4 100644 --- a/proto/Cargo.toml +++ b/proto/Cargo.toml @@ -7,6 +7,7 @@ publish = false [dependencies] prost = "0.11.0" +prost-types = "0.11.0" tonic = "0.8.2" [dependencies.shuttle-common] diff --git a/proto/runtime.proto b/proto/runtime.proto index b9c630556..b6b0b19e1 100644 --- a/proto/runtime.proto +++ b/proto/runtime.proto @@ -1,12 +1,16 @@ syntax = "proto3"; package runtime; +import "google/protobuf/timestamp.proto"; + service Runtime { // Load a service file to be ready to start it - rpc load(LoadRequest) returns (LoadResponse); + rpc Load(LoadRequest) returns (LoadResponse); // Start a loaded service file - rpc start(StartRequest) returns (StartResponse); + rpc Start(StartRequest) returns (StartResponse); + + rpc SubscribeLogs(SubscribeLogsRequest) returns (stream LogItem); } message LoadRequest { @@ -23,8 +27,10 @@ message LoadResponse { } message StartRequest { + // Id to associate with the deployment being started + bytes deployment_id = 1; // Name of service to start - string service_name = 1; + string service_name = 2; } message StartResponse { @@ -36,3 +42,36 @@ message StartResponse { // This is likely to be None for bots uint32 port = 2; } + +message SubscribeLogsRequest {} + +message LogItem { + bytes id = 1; + google.protobuf.Timestamp timestamp = 2; + LogState state = 3; + LogLevel level = 4; + optional string file = 5; + optional uint32 line = 6; + string target = 7; + bytes fields = 8; +} + +enum LogState { + Queued = 0; + Building = 1; + Built = 2; + Loading = 3; + Running = 4; + Completed = 5; + Stopped = 6; + Crashed = 7; + Unknown = 50; +} + +enum LogLevel { + Trace = 0; + Debug = 1; + Info = 2; + Warn = 3; + Error = 4; +} diff --git a/proto/src/lib.rs b/proto/src/lib.rs index 49f39a14a..6d820a561 100644 --- a/proto/src/lib.rs +++ b/proto/src/lib.rs @@ -65,5 +65,52 @@ pub mod provisioner { } pub mod runtime { + use std::time::SystemTime; + + use prost_types::Timestamp; + tonic::include_proto!("runtime"); + + impl From for LogItem { + fn from(log: shuttle_common::LogItem) -> Self { + Self { + id: log.id.into_bytes().to_vec(), + timestamp: Some(Timestamp::from(SystemTime::from(log.timestamp))), + state: LogState::from(log.state) as i32, + level: LogLevel::from(log.level) as i32, + file: log.file, + line: log.line, + target: log.target, + fields: log.fields, + } + } + } + + impl From for LogState { + fn from(state: shuttle_common::deployment::State) -> Self { + match state { + shuttle_common::deployment::State::Queued => Self::Queued, + shuttle_common::deployment::State::Building => Self::Building, + shuttle_common::deployment::State::Built => Self::Built, + shuttle_common::deployment::State::Loading => Self::Loading, + shuttle_common::deployment::State::Running => Self::Running, + shuttle_common::deployment::State::Completed => Self::Completed, + shuttle_common::deployment::State::Stopped => Self::Stopped, + shuttle_common::deployment::State::Crashed => Self::Crashed, + shuttle_common::deployment::State::Unknown => Self::Unknown, + } + } + } + + impl From for LogLevel { + fn from(level: shuttle_common::log::Level) -> Self { + match level { + shuttle_common::log::Level::Trace => Self::Trace, + shuttle_common::log::Level::Debug => Self::Debug, + shuttle_common::log::Level::Info => Self::Info, + shuttle_common::log::Level::Warn => Self::Warn, + shuttle_common::log::Level::Error => Self::Error, + } + } + } } diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml index 8dab41bc0..04e03b72d 100644 --- a/runtime/Cargo.toml +++ b/runtime/Cargo.toml @@ -13,9 +13,11 @@ clap ={ version = "4.0.18", features = ["derive"] } serenity = { version = "0.11.5", default-features = false, features = ["client", "gateway", "rustls_backend", "model"] } thiserror = "1.0.37" tokio = { version = "=1.20.1", features = ["full"] } +tokio-stream = "0.1.11" tonic = "0.8.2" tracing = "0.1.37" tracing-subscriber = { version = "0.3.16", features = ["env-filter"] } +uuid = { version = "1.1.2", features = ["v4"] } wasi-common = "2.0.0" wasmtime = "2.0.0" wasmtime-wasi = "2.0.0" diff --git a/runtime/README.md b/runtime/README.md index af60bc7cf..193ee80f7 100644 --- a/runtime/README.md +++ b/runtime/README.md @@ -8,8 +8,9 @@ $ DISCORD_TOKEN=xxx cargo run In another terminal: ``` bash -grpcurl -plaintext -import-path ../proto -proto runtime.proto -d '{"service_name": "Tonic", "path": "runtime/bot.wasm"}' localhost:8000 runtime.Runtime/load -grpcurl -plaintext -import-path ../proto -proto runtime.proto -d '{"service_name": "Tonic"}' localhost:8000 runtime.Runtime/start +grpcurl -plaintext -import-path ../proto -proto runtime.proto -d '{"service_name": "Tonic", "path": "runtime/bot.wasm"}' localhost:8000 runtime.Runtime/Load +grpcurl -plaintext -import-path ../proto -proto runtime.proto -d '{"service_name": "Tonic"}' localhost:8000 runtime.Runtime/Start +grpcurl -plaintext -import-path ../proto -proto runtime.proto localhost:8000 runtime.Runtime/SubscribeLogs ``` ## shuttle-legacy @@ -39,8 +40,9 @@ Pass the path to `deployer::start` Then in another shell, load a `.so` file and start it up: ``` bash -grpcurl -plaintext -import-path ../proto -proto runtime.proto -d '{"service_name": "Tonic", "path": "examples/rocket/hello-world/target/debug/libhello_world.so"}' localhost:8000 runtime.Runtime/load -grpcurl -plaintext -import-path ../proto -proto runtime.proto -d '{"service_name": "Tonic"}' localhost:8000 runtime.Runtime/start +grpcurl -plaintext -import-path ../proto -proto runtime.proto -d '{"service_name": "Tonic", "path": "examples/rocket/hello-world/target/debug/libhello_world.so"}' localhost:8000 runtime.Runtime/Load +grpcurl -plaintext -import-path ../proto -proto runtime.proto -d '{"service_name": "Tonic"}' localhost:8000 runtime.Runtime/Start +grpcurl -plaintext -import-path ../proto -proto runtime.proto localhost:8000 runtime.Runtime/SubscribeLogs ``` ## Running the tests diff --git a/runtime/src/legacy/mod.rs b/runtime/src/legacy/mod.rs index 1316f2da4..2d3a8faad 100644 --- a/runtime/src/legacy/mod.rs +++ b/runtime/src/legacy/mod.rs @@ -1,5 +1,6 @@ use std::{ net::{Ipv4Addr, SocketAddr}, + ops::DerefMut, path::PathBuf, str::FromStr, sync::Mutex, @@ -10,15 +11,20 @@ use async_trait::async_trait; use shuttle_common::LogItem; use shuttle_proto::{ provisioner::provisioner_client::ProvisionerClient, - runtime::{runtime_server::Runtime, LoadRequest, LoadResponse, StartRequest, StartResponse}, + runtime::{ + self, runtime_server::Runtime, LoadRequest, LoadResponse, StartRequest, StartResponse, + SubscribeLogsRequest, + }, }; use shuttle_service::{ loader::{LoadedService, Loader}, Factory, Logger, ServiceName, }; -use tokio::sync::mpsc::{self, UnboundedReceiver}; +use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender}; +use tokio_stream::wrappers::ReceiverStream; use tonic::{transport::Endpoint, Request, Response, Status}; use tracing::{info, instrument, trace}; +use uuid::Uuid; use crate::provisioner_factory::{AbstractFactory, AbstractProvisionerFactory}; @@ -28,14 +34,20 @@ pub struct Legacy { // Mutexes are for interior mutability so_path: Mutex>, port: Mutex>, + logs_rx: Mutex>>, + logs_tx: Mutex>, provisioner_address: Endpoint, } impl Legacy { pub fn new(provisioner_address: Endpoint) -> Self { + let (tx, rx) = mpsc::unbounded_channel(); + Self { so_path: Mutex::new(None), port: Mutex::new(None), + logs_rx: Mutex::new(Some(rx)), + logs_tx: Mutex::new(tx), provisioner_address, } } @@ -61,17 +73,21 @@ impl Runtime for Legacy { let service_port = 7001; let service_address = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), service_port); + let request = request.into_inner(); + let provisioner_client = ProvisionerClient::connect(self.provisioner_address.clone()) .await .expect("failed to connect to provisioner"); let abstract_factory = AbstractProvisionerFactory::new(provisioner_client); - let service_name = ServiceName::from_str(request.into_inner().service_name.as_str()) + let service_name = ServiceName::from_str(request.service_name.as_str()) .map_err(|err| Status::from_error(Box::new(err)))?; let mut factory = abstract_factory.get_factory(service_name); - let (logger, _rx) = get_logger(); + let logs_tx = self.logs_tx.lock().unwrap().clone(); + let deployment_id = Uuid::from_slice(&request.deployment_id).unwrap(); + let logger = Logger::new(logs_tx, deployment_id); let so_path = self .so_path @@ -100,6 +116,30 @@ impl Runtime for Legacy { Ok(Response::new(message)) } + + type SubscribeLogsStream = ReceiverStream>; + + async fn subscribe_logs( + &self, + _request: Request, + ) -> Result, Status> { + let logs_rx = self.logs_rx.lock().unwrap().deref_mut().take(); + + if let Some(mut logs_rx) = logs_rx { + let (tx, rx) = mpsc::channel(1); + + // Move logger items into stream to be returned + tokio::spawn(async move { + while let Some(log) = logs_rx.recv().await { + tx.send(Ok(log.into())).await.unwrap(); + } + }); + + Ok(Response::new(ReceiverStream::new(rx))) + } else { + Err(Status::internal("logs have already been subscribed to")) + } + } } #[instrument(skip(service))] @@ -126,10 +166,3 @@ async fn load_service( Ok(loader.load(factory, addr, logger).await?) } - -fn get_logger() -> (Logger, UnboundedReceiver) { - let (tx, rx) = mpsc::unbounded_channel(); - let logger = Logger::new(tx, Default::default()); - - (logger, rx) -} diff --git a/runtime/src/main.rs b/runtime/src/main.rs index c11ebcdda..d2eae6557 100644 --- a/runtime/src/main.rs +++ b/runtime/src/main.rs @@ -1,4 +1,7 @@ -use std::net::{Ipv4Addr, SocketAddr}; +use std::{ + net::{Ipv4Addr, SocketAddr}, + time::Duration, +}; use clap::Parser; use shuttle_proto::runtime::runtime_server::RuntimeServer; @@ -26,15 +29,17 @@ async fn main() { let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 6001); let provisioner_address = args.provisioner_address; + let mut server_builder = + Server::builder().http2_keepalive_interval(Some(Duration::from_secs(60))); let router = if args.legacy { let legacy = Legacy::new(provisioner_address); let svc = RuntimeServer::new(legacy); - Server::builder().add_service(svc) + server_builder.add_service(svc) } else { let next = Next::new(); let svc = RuntimeServer::new(next); - Server::builder().add_service(svc) + server_builder.add_service(svc) }; router.serve(addr).await.unwrap(); diff --git a/runtime/src/next/mod.rs b/runtime/src/next/mod.rs index 86d185c43..e0d03db45 100644 --- a/runtime/src/next/mod.rs +++ b/runtime/src/next/mod.rs @@ -9,7 +9,10 @@ use async_trait::async_trait; use cap_std::os::unix::net::UnixStream; use serenity::{model::prelude::*, prelude::*}; use shuttle_proto::runtime::runtime_server::Runtime; -use shuttle_proto::runtime::{LoadRequest, LoadResponse, StartRequest, StartResponse}; +use shuttle_proto::runtime::{ + self, LoadRequest, LoadResponse, StartRequest, StartResponse, SubscribeLogsRequest, +}; +use tokio_stream::wrappers::ReceiverStream; use tonic::{Request, Response, Status}; use tracing::trace; use wasi_common::file::FileCaps; @@ -69,6 +72,15 @@ impl Runtime for Next { Ok(Response::new(message)) } + + type SubscribeLogsStream = ReceiverStream>; + + async fn subscribe_logs( + &self, + _request: Request, + ) -> Result, Status> { + todo!() + } } struct BotBuilder {