diff --git a/Cargo.lock b/Cargo.lock index 5779ef872..ceb299131 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -321,9 +321,9 @@ dependencies = [ [[package]] name = "clap" -version = "2.33.1" +version = "2.33.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bdfa80d47f954d53a35a64987ca1422f495b8d6483c0fe9f7117b36c2a792129" +checksum = "37e58ac78573c40708d45522f0d80fa2f01cc4f9b4e2bf749807255454312002" dependencies = [ "ansi_term 0.11.0", "atty", @@ -502,22 +502,6 @@ dependencies = [ "serde_json", ] -[[package]] -name = "dcon" -version = "0.1.0" -dependencies = [ - "ansi_term 0.12.1", - "anyhow", - "clap", - "directories", - "env_logger", - "log", - "prettytable-rs", - "rustyline", - "rustyline-derive", - "strsim 0.10.0", -] - [[package]] name = "digest" version = "0.8.1" @@ -538,11 +522,10 @@ dependencies = [ [[package]] name = "directories" -version = "2.0.2" +version = "3.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "551a778172a450d7fc12e629ca3b0428d00f6afa9a43da1b630d54604e97371c" +checksum = "f8fed639d60b58d0f53498ab13d26f621fd77569cc6edb031f4cc36a2ad9da0f" dependencies = [ - "cfg-if", "dirs-sys", ] @@ -1132,6 +1115,7 @@ dependencies = [ "async-trait", "base64 0.12.3", "bitflags", + "byteorder", "config", "ed25519-dalek", "floating-duration", @@ -1158,6 +1142,7 @@ dependencies = [ "structopt", "structure", "tempfile", + "thiserror", "url", "uuid", "vergen", @@ -1177,6 +1162,29 @@ dependencies = [ "tempfile", ] +[[package]] +name = "nstar" +version = "0.1.0" +dependencies = [ + "ansi_term 0.12.1", + "anyhow", + "byteorder", + "clap", + "directories", + "env_logger", + "itertools", + "log", + "north_common", + "prettytable-rs", + "rustyline", + "rustyline-derive", + "serde 1.0.114", + "serde_json", + "strsim 0.10.0", + "structopt", + "uuid", +] + [[package]] name = "num-integer" version = "0.1.43" @@ -1841,9 +1849,9 @@ checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" [[package]] name = "structopt" -version = "0.3.15" +version = "0.3.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "de2f5e239ee807089b62adce73e48c625e0ed80df02c7ab3f068f5db5281065c" +checksum = "a33f6461027d7f08a13715659b2948e1602c31a3756aeae9378bfe7518c72e82" dependencies = [ "clap", "lazy_static", @@ -1852,9 +1860,9 @@ dependencies = [ [[package]] name = "structopt-derive" -version = "0.4.8" +version = "0.4.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "510413f9de616762a4fbeab62509bf15c729603b72d7cd71280fbca431b1c118" +checksum = "c92e775028122a4b3dd55d58f14fc5120289c69bee99df1d117ae30f84b225c9" dependencies = [ "heck", "proc-macro-error", @@ -1967,6 +1975,26 @@ dependencies = [ "unicode-width", ] +[[package]] +name = "thiserror" +version = "1.0.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7dfdd070ccd8ccb78f4ad66bf1982dc37f620ef696c6b5028fe2ed83dd3d0d08" +dependencies = [ + "thiserror-impl", +] + +[[package]] +name = "thiserror-impl" +version = "1.0.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd80fc12f73063ac132ac92aceea36734f04a1d93c1240c6944e23a3b8841793" +dependencies = [ + "proc-macro2", + "quote 1.0.7", + "syn", +] + [[package]] name = "thread_local" version = "1.0.1" diff --git a/Cargo.toml b/Cargo.toml index 238918c00..dd7fa8bc9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,6 @@ [workspace] members = [ - "dcon", "examples/container/cpueater", "examples/container/crashing", "examples/container/datarw", @@ -12,9 +11,10 @@ members = [ "minijail/minijail-sys", "north", "north_common", + "nstar", "sextant", ] [profile.release] lto = true - +opt-level = 'z' # Optimize for size diff --git a/README.md b/README.md index 07c7b6535..265de9f21 100644 --- a/README.md +++ b/README.md @@ -30,7 +30,7 @@ So far we tested Northstar on - [x] chroot environment - every process only sees it's own environment - [ ] User-support of configuring and managing network-namespaces - [ ] Dedicated UID for each container -- [ ] Management API of the runtime [#64](https://github.com/esrlabs/northstar/issues/64) +- [x] Management API of the runtime - [x] Signature Check of NPK [#54](https://github.com/esrlabs/northstar/issues/54) - [ ] PID Namespaces [#51](https://github.com/esrlabs/northstar/issues/51) @@ -50,7 +50,7 @@ So far we tested Northstar on * secure boot * verification on each read access prevents manipulation -## How do Northstar images/containers work? +## How do Northstar images/containers work ### Northstar Packages (NPK) @@ -174,6 +174,16 @@ Both `memory` and `cpu` will tell northstar where to mount the cgroup hierarchie * **`device_mapper`** -- Device mapper control file. * **`device_mapper_dev`** -- Prefix of device mapper mappings. +## Controlling the runtime + +The northstar runtime can be controlled our `nstar` application or from a custom application. You can + +- start and stop containers +- uninstall or upgrade containers +- query information about running processes and setttings +- ... +With `nstar` we include a commandline interface that can be used to issue commands to the runtime. Underneth all communication happends via protocol-buffer objects over sockets (see [the runtime API README](north_common/README.md)) + ## Examples If you want to see how containers can look like, take a look at the examples in the examples directory. diff --git a/dcon/.gitignore b/dcon/.gitignore deleted file mode 100644 index f0e3bcacb..000000000 --- a/dcon/.gitignore +++ /dev/null @@ -1,2 +0,0 @@ -/target -**/*.rs.bk \ No newline at end of file diff --git a/dcon/Cargo.toml b/dcon/Cargo.toml deleted file mode 100644 index 45b249aeb..000000000 --- a/dcon/Cargo.toml +++ /dev/null @@ -1,17 +0,0 @@ -[package] -name = "dcon" -version = "0.1.0" -authors = ["ESRLabs"] -edition = "2018" - -[dependencies] -ansi_term = "0.12.1" -anyhow = "1.0.31" -clap = "2.33.0" -directories = "2.0.2" -env_logger = "0.7.1" -log = "0.4.8" -prettytable-rs = "0.8.0" -rustyline = "6.0.0" -rustyline-derive = "0.3.0" -strsim = "0.10.0" diff --git a/dcon/src/main.rs b/dcon/src/main.rs deleted file mode 100644 index 166e26154..000000000 --- a/dcon/src/main.rs +++ /dev/null @@ -1,386 +0,0 @@ -// Copyright (c) 2019 - 2020 ESRLabs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use ansi_term::Color; -use anyhow::{anyhow, Context, Result}; -use clap::{value_t, App, AppSettings, Arg}; -use log::{info, warn}; -use prettytable::{cell, format, row, Table}; -use rustyline::{ - completion::{Completer, FilenameCompleter, Pair}, - config::OutputStreamType, - error::ReadlineError, - highlight::{Highlighter, MatchingBracketHighlighter}, - hint::{Hinter, HistoryHinter}, - Cmd, CompletionType, Config, EditMode, Editor, Helper, KeyPress, -}; -use rustyline_derive::Validator; -use std::{ - borrow::Cow::{self, Owned}, - collections::HashSet, - env, fs, - io::{Read, Write}, - net::{self, ToSocketAddrs}, -}; - -static PROMPT: &str = ">> "; -const BUFFER_SIZE: usize = 64 * 1024; - -#[derive(Debug)] -struct Opt { - host: String, - verbose: bool, - history: bool, - cmd: Option, -} - -fn cli() -> Opt { - let app = App::new("dcon") - .about("Debug console client") - .setting(AppSettings::AllowExternalSubcommands) - .setting(AppSettings::ArgRequiredElseHelp) - .setting(AppSettings::ColoredHelp) - .setting(AppSettings::UnifiedHelpMessage) - .arg( - Arg::with_name("verbose") - .short("v") - .long("verbose") - .help("Verbose mode"), - ) - .arg( - Arg::with_name("disable-history") - .short("d") - .long("disable-history") - .help("Disable history functionality"), - ) - .arg( - Arg::with_name("host") - .short("h") - .long("host") - .takes_value(true) - .default_value("127.0.0.1:4242") - .help("Console host address"), - ); - let matches = app.get_matches(); - let host = value_t!(matches.value_of("host"), String).unwrap(); - let verbose = matches.is_present("verbose"); - let history = !matches.is_present("disable-history"); - let cmd = match matches.subcommand() { - (c, Some(matches)) => { - let arg = vec![ - c.to_string(), - matches - .values_of("") - .map(|v| v.collect::>().join(" ")) - .unwrap_or_else(|| "".into()), - ] - .join(" "); - Some(arg) - } - _ => None, - }; - Opt { - host, - verbose, - history, - cmd, - } -} - -#[derive(Validator)] -struct DConHelper<'a> { - file_name: FilenameCompleter, - brackets_highliter: MatchingBracketHighlighter, - history_hinter: HistoryHinter, - command_hinter: CommandHinter<'a>, -} - -impl<'a> Completer for DConHelper<'a> { - type Candidate = Pair; - - fn complete( - &self, - line: &str, - pos: usize, - ctx: &rustyline::Context<'_>, - ) -> Result<(usize, Vec), ReadlineError> { - self.file_name.complete(line, pos, ctx) - } -} - -impl Hinter for DConHelper<'_> { - fn hint(&self, line: &str, pos: usize, ctx: &rustyline::Context<'_>) -> Option { - let hint = self.command_hinter.hint(line, pos, ctx); - if hint.is_some() { - hint - } else { - self.history_hinter.hint(line, pos, ctx) - } - } -} - -impl Highlighter for DConHelper<'_> { - fn highlight_prompt<'b, 's: 'b, 'p: 'b>( - &'s self, - prompt: &'p str, - _default: bool, - ) -> Cow<'b, str> { - if prompt == PROMPT { - Owned(Color::Green.bold().paint(PROMPT).to_string()) - } else { - PROMPT.into() - } - } - - fn highlight_hint<'h>(&self, hint: &'h str) -> Cow<'h, str> { - Owned(Color::Fixed(240).paint(hint).to_string()) - } - - fn highlight<'l>(&self, line: &'l str, pos: usize) -> Cow<'l, str> { - self.brackets_highliter.highlight(line, pos) - } - - fn highlight_char(&self, line: &str, pos: usize) -> bool { - self.brackets_highliter.highlight_char(line, pos) - } -} - -impl Helper for DConHelper<'_> {} - -struct CommandHinter<'a> { - commands: Vec<&'a str>, -} - -impl<'a> CommandHinter<'a> { - fn new(commands: Vec<&str>) -> CommandHinter { - CommandHinter { commands } - } - - fn hint(&self, line: &str, pos: usize, _ctx: &rustyline::Context<'_>) -> Option { - did_you_mean(&line, &self.commands).and_then(|s| { - if s.len() > pos { - Some(s[pos..].into()) - } else { - None - } - }) - } -} - -fn did_you_mean<'a, T: ?Sized, I>(v: &str, possible_values: I) -> Option<&'a str> -where - T: AsRef + 'a, - I: IntoIterator, -{ - let mut candidate: Option<(f64, &str)> = None; - for pv in possible_values { - let confidence = strsim::jaro_winkler(v, pv.as_ref()); - if confidence > 0.8 && (candidate.is_none() || (candidate.as_ref().unwrap().0 < confidence)) - { - candidate = Some((confidence, pv.as_ref())); - } - } - match candidate { - None => None, - Some((_, candidate)) => Some(candidate), - } -} - -fn run(addr: &str, cmd: &str) -> Result { - if let Some(addr) = addr.to_socket_addrs()?.next() { - let mut socket = net::TcpStream::connect(addr) - .with_context(|| format!("Failed to connect to {}", addr))?; - - let mut cmd = cmd.to_string(); - cmd.push('\n'); - socket - .write_all(cmd.as_bytes()) - .context("Failed to send command")?; - - let mut reply = String::new(); - let mut buffer = [0u8; BUFFER_SIZE]; - loop { - let n = socket - .read(&mut buffer) - .context("Failed to read from connection")?; - let s = String::from_utf8(buffer[..n].to_vec()).context("Received invalid reply")?; - reply.push_str(&s); - if !n < BUFFER_SIZE { - continue; - } - break; - } - Ok(reply) - } else { - Err(anyhow!("Cannot resolve address {}", addr)) - } -} - -fn run_help(opt: &Opt) -> Result<()> { - let reply = run(&opt.host, "help")?; - let mut table = Table::new(); - let format = format::FormatBuilder::new() - .column_separator('|') - .separators(&[], format::LineSeparator::new('-', '+', '+', '+')) - .padding(1, 1) - .build(); - table.set_format(format); - table.set_titles(row!["Command", "Subcommands", "Help"]); - - let mut last_cmd = ""; - for l in reply.lines().filter(|n| !n.is_empty()) { - let mut split = l.split_terminator(':').map(str::trim); - if let Some(e) = split.next() { - let mut cmd = e.split(' ').map(str::trim); - let help = split.map(str::trim).collect::>().join(" "); - if let Some(e) = cmd.next() { - let args = cmd.collect::>().join(" "); - let c = if last_cmd == e { "" } else { e }; - last_cmd = e; - table.add_row(row![bFg->c, Fc->args, help]); - } - } - } - - table.printstd(); - Ok(()) -} - -fn run_cmd( - cmd: &str, - opt: &Opt, - commands: &[&str], - editor: &mut Editor>, - commands_entered: Option<&mut HashSet>, -) -> Result<()> { - let cmd = cmd.trim(); - let stripped_cmd = cmd.replace(" ", ""); - - if cmd == "help" || cmd == "?" { - run_help(opt) - } else if stripped_cmd.is_empty() { - Ok(()) - } else if !commands - .iter() - .map(|c| c.replace(" ", "")) - .any(|c| stripped_cmd.starts_with(&c)) - { - println!("Unknown command: \"{}\"", Color::Yellow.paint(cmd)); - if let Some(suggestion) = did_you_mean(cmd, commands) { - println!("Did you mean: \"{}\"?", Color::Green.paint(suggestion)); - } - Ok(()) - } else { - let reply = run(&opt.host, cmd).map(|r| if r.is_empty() { "none".into() } else { r })?; - if let Some(commands_entered) = commands_entered { - if !commands_entered.contains(cmd) { - editor.add_history_entry(cmd); - commands_entered.insert(cmd.into()); - } - } - println!("{}", reply); - Ok(()) - } -} - -fn main() -> Result<()> { - let opt = cli(); - - if opt.verbose { - env::set_var("RUST_LOG", "dcon=debug"); - env_logger::init(); - info!("Verbose mode is enabled"); - } else { - env::set_var("RUST_LOG", "dcon=warn"); - env_logger::init(); - } - - // Run help once to populate hinter and suggestions - let help = run(&opt.host, "help")?; - // Get a list of all commands for hinter and suggestions - let commands: Vec<&str> = help - .lines() - .filter(|n| !n.is_empty()) - .flat_map(|l| l.split_terminator(':').next()) - .collect(); - - let config = Config::builder() - .auto_add_history(false) - .completion_type(CompletionType::List) - .edit_mode(EditMode::Vi) - .history_ignore_space(true) - .history_ignore_dups(true) - .output_stream(OutputStreamType::Stdout) - .max_history_size(1000) - .build(); - let h = DConHelper { - file_name: FilenameCompleter::new(), - brackets_highliter: MatchingBracketHighlighter::new(), - history_hinter: HistoryHinter {}, - command_hinter: CommandHinter::new(commands.clone()), - }; - let mut rl = Editor::with_config(config); - rl.set_helper(Some(h)); - rl.bind_sequence(KeyPress::Tab, Cmd::CompleteHint); - rl.bind_sequence(KeyPress::Ctrl('L'), Cmd::ClearScreen); - - // Single command mode - if let Some(ref cmd) = opt.cmd { - run_cmd(cmd, &opt, &commands, &mut rl, None) - } else { - let history = if opt.history { - let history = directories::ProjectDirs::from("com", "esrlabs", "dcon") - .map(|d| d.config_dir().join("history")) - .ok_or_else(|| anyhow!("Failed to get config directory"))?; - if history.exists() { - info!("Loading history from {:?}", history); - rl.load_history(&history) - .context("Failed to load history")?; - } - Some(history) - } else { - info!("History is disabled"); - None - }; - - // Prompt loop - let mut commands_entered = HashSet::new(); - loop { - match rl.readline(PROMPT) { - Ok(line) => run_cmd(&line, &opt, &commands, &mut rl, Some(&mut commands_entered))?, - Err(ReadlineError::Interrupted) | Err(ReadlineError::Eof) => break, - Err(err) => { - warn!("Error: {}", err); - break; - } - } - } - - // Maybe store history... - if opt.history { - if let Some(ref history) = history { - if let Some(parent) = history.parent() { - if !parent.exists() { - info!("Creating dcon config dir {:?}", parent); - fs::create_dir_all(parent) - .with_context(|| format!("Failed to create {}", parent.display()))?; - } - info!("Saving history to {:?}", history); - rl.save_history(history)?; - } - } - } - Ok(()) - } -} diff --git a/examples/README.md b/examples/README.md index fe60a65a3..a8a4cf33f 100644 --- a/examples/README.md +++ b/examples/README.md @@ -1,4 +1,4 @@ -## Running the examples +# Running the examples To run the examples, you first need to create the `NPK`s for the architecture you plan to run them on. @@ -7,13 +7,14 @@ In the toplevel rakefile we provide a task to build the `NPK`s for different tar Once you have the `*.npk` packages in your registry, you can start the northstar process and configure this registry. When the northstar runtime is up and running, you can issue control commands to it via a socket. -We provide an utility that can easily communicate with the northstar daemon and send it commands (`dcon`) +We provide an utility that can easily communicate with the northstar daemon and send it commands (`nstar`) -Build `dcon` using the raketask `rake build:dcon` +Build `nstar` using the raketask `rake build:nstar` -After that just start `dcon` (found in `./target/release/dcon`). It is an interactive client that offers this help: -``` -➜ northstar git:(master) ✗ ./target/release/dcon +After that just start `nstar` (found in `./target/release/nstar`). It is an interactive client that offers this help: + +``` shell +➜ northstar git:(master) ✗ ./target/release/nstar >> help Command | Subcommands | Help help | | Display help text diff --git a/north/Cargo.toml b/north/Cargo.toml index 451773025..c8a3c2667 100644 --- a/north/Cargo.toml +++ b/north/Cargo.toml @@ -16,6 +16,7 @@ async-std = { version = "1.6.2", features = ["attributes", "unstable"] } async-trait = "0.1.36" base64 = "0.12.3" bitflags = "1.2.1" +byteorder = "1.3.4" config = "0.10.1" ed25519-dalek = "1.0.0-pre.4" floating-duration = "0.1.2" @@ -40,6 +41,7 @@ stop-token = "0.1.2" structopt = "0.3.15" structure = "0.1.2" tempfile = "3.1.0" +thiserror = "1.0.20" url = "2.1.1" uuid = { version = "0.8.1", features = [ "v4"] } zip = "0.5.6" diff --git a/north/src/console.rs b/north/src/console.rs index e3cb06a1f..ec2f83cbf 100644 --- a/north/src/console.rs +++ b/north/src/console.rs @@ -12,328 +12,133 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::{Event, EventTx, Name, State, TerminationReason, SETTINGS}; -use anyhow::{anyhow, Context, Result}; -use async_std::{io, net::TcpListener, path::PathBuf, prelude::*, sync, task}; -use itertools::Itertools; -use log::{debug, warn, *}; -use prettytable::{format, Table}; -use std::{iter, time}; - -/// Helptext displayed on the `help` command. The `dcon` tool parses this text -/// and creates suggestions and completions. Ensure to a correct helptext when -/// adding/removing/changing commands. -const HELP: &str = "\ - help: Display help text\n\ - list: List all loaded images\n\ - ps: List running instances\n\ - shutdown: Stop the north runtime\n\ - settings: Dump north configuration\n\ - start: PATTERN Start containers matching PATTERN e.g 'start hello*'. Omit PATTERN to start all containers\n\ - stop: PATTERN Stop all containers matching PATTERN. Omit PATTERN to stop all running containers\n\ - uninstall: PATTERN: Unmount and remove all containers matching PATTERN\n\ - update: Run update with provided ressources\n\ - versions: Version list of installed applications"; +use crate::{Event, EventTx, State, TerminationReason, SETTINGS}; +use anyhow::{Context, Result}; +use api::{ + Container, Message, Payload, Process, Request, Response, ShutdownResult, StartResult, + StopResult, +}; +use async_std::{ + io::{self, Write}, + net::TcpListener, + prelude::*, + sync::{self, Receiver, Sender}, + task, +}; +use byteorder::{BigEndian, ByteOrder}; +use io::{ErrorKind, Read}; +use log::{debug, info, warn}; +use north_common::api; pub async fn init(tx: &EventTx) -> Result<()> { - let rx = serve().await?; - let tx = tx.clone(); - // Spawn a task that handles lines received on the debug port. - task::spawn(async move { - while let Ok((line, tx_reply)) = rx.recv().await { - tx.send(Event::Console(line, tx_reply)).await; - } - }); - + serve(tx.clone()).await?; Ok(()) } -pub async fn process(state: &mut State, command: &str, reply: sync::Sender) -> Result<()> { - info!("Running \'{}\'", command); - let mut commands = command.split_whitespace(); +pub async fn process( + state: &mut State, + message: &Message, + response_tx: sync::Sender, +) -> Result<()> { + let payload = &message.payload; + if let Payload::Request(ref request) = payload { + let response = match request { + Request::Containers => { + let containers = state + .applications() + .map(|app| { + Container { + manifest: app.manifest().clone(), + process: app.process_context().map(|f| Process { + pid: f.process().pid(), + uptime: f.uptime().as_nanos() as u64, + memory: { + #[cfg(not(any(target_os = "linux", target_os = "android")))] + { + None + } + #[cfg(any(target_os = "linux", target_os = "android"))] + { + // TODO + const PAGE_SIZE: usize = 4096; + let pid = f.process().pid(); + let statm = procinfo::pid::statm(pid as i32) + .expect("Failed get statm"); + Some(api::Memory { + size: (statm.size * PAGE_SIZE) as u64, + resident: (statm.resident * PAGE_SIZE) as u64, + shared: (statm.share * PAGE_SIZE) as u64, + text: (statm.text * PAGE_SIZE) as u64, + data: (statm.data * PAGE_SIZE) as u64, + }) + } + }, + }), + } + }) + .collect(); - if let Some(cmd) = commands.next() { - let args = commands.collect::>(); - let start_timestamp = time::Instant::now(); - match match cmd { - "help" => help(), - "list" => list(state).await, - "ps" => ps(state).await, - "settings" => settings(), - "shutdown" => shutdown(state).await, - "start" => start(state, &args).await, - "stop" => stop(state, &args).await, - "uninstall" => uninstall(state, &args).await, - "update" => update(state, &args).await, - "versions" => versions(state), - _ => Err(anyhow!("Unknown command: {}", command)), - } { - Ok(mut r) => { - r.push_str(&format!("Duration: {:?}\n", start_timestamp.elapsed())); - reply.send(r).await + Response::Containers(containers) } - Err(e) => { - let msg = format!("Failed to run: {} {:?}: {}\n", cmd, args, e); - reply.send(msg).await + Request::Start(name) => match state.start(&name).await { + Ok(_) => Response::Start { + result: StartResult::Success, + }, + Err(e) => Response::Start { + result: StartResult::Error(e.to_string()), + }, + }, + Request::Stop(name) => { + match state + .stop( + &name, + std::time::Duration::from_secs(1), + TerminationReason::Stopped, + ) + .await + { + Ok(_) => Response::Stop { + result: StopResult::Success, + }, + Err(e) => Response::Stop { + result: StopResult::Error(e.to_string()), + }, + } } - } - } else { - reply.send("Invalid command".into()).await - } - Ok(()) -} - -/// Return the help text -fn help() -> Result { - Ok(HELP.into()) -} - -/// List all known containers instances and their state. -async fn list(state: &State) -> Result { - to_table( - vec![vec![ - "Name".to_string(), - "Version".to_string(), - "Running".to_string(), - "Type".to_string(), - ]] - .iter() - .cloned() - .chain( - state - .applications() - .sorted_by_key(|app| app.name()) - .map(|app| { - vec![ - app.name().to_string(), - app.version().to_string(), - app.process_context() - .map(|c| format!("Yes (pid: {})", c.process().pid())) - .unwrap_or_else(|| "No".to_string()), - if app.container().is_resource_container() { - "resource" - } else { - "app" - } - .to_owned(), - ] - }), - ), - ) -} - -/// List all running applications. -#[cfg(all(not(target_os = "android"), not(target_os = "linux")))] -async fn ps(state: &State) -> Result { - to_table( - vec![vec![ - "Name".to_string(), - "Version".to_string(), - "Uptime".to_string(), - ]] - .iter() - .cloned() - .chain( - state - .applications() - .filter_map(|app| app.process_context().map(|p| (app, p))) - .sorted_by_key(|(app, _)| app.name()) - .map(|(app, context)| { - vec![ - app.name().to_string(), - app.version().to_string(), - format!("{:?}", context.uptime()), - ] - }), - ), - ) -} - -/// List all running applications. -#[cfg(any(target_os = "android", target_os = "linux"))] -async fn ps(state: &State) -> Result { - use pretty_bytes::converter::convert; - const PAGE_SIZE: usize = 4096; - - let mut result = vec![[ - "Name", "Version", "PID", "Size", "Resident", "Shared", "Text", "Data", "Uptime", - ] - .iter() - .map(ToString::to_string) - .collect()]; - - for app in state.applications().sorted_by_key(|app| app.name()) { - if let Some(ref context) = app.process_context() { - let pid = context.process().pid(); - let statm = procinfo::pid::statm(pid as i32)?; - result.push(vec![ - app.name().to_string(), - app.version().to_string(), - pid.to_string(), - convert((statm.size * PAGE_SIZE) as f64), - convert((statm.resident * PAGE_SIZE) as f64), - convert((statm.share * PAGE_SIZE) as f64), - convert((statm.text * PAGE_SIZE) as f64), - convert((statm.data * PAGE_SIZE) as f64), - format!("{:?}", context.uptime()), - ]); - } - } - - to_table(result) -} - -/// Start applications. If `args` is empty *all* known applications that -/// are not in a running state are started. If a argument is supplied it -/// is used to construct a Regex and all container (names) matching that -/// Regex are attempted to be started. -async fn start(state: &mut State, args: &[&str]) -> Result { - let re = arg_regex(args)?; - - let mut result = vec![vec![ - "Name".to_string(), - "Result".to_string(), - "Duration".to_string(), - ]]; - let apps = state - .applications() - // Filter for not already running containers - .filter(|app| app.process_context().is_none()) - // Filter ressource container that are not startable - .filter(|app| !app.container().is_resource_container()) - // Filter matching container - .filter(|app| re.is_match(app.name())) - // Sort container by name - .sorted_by_key(|app| app.name().clone()) - .map(|app| app.name().clone()) - .collect::>(); - for app in &apps { - let start = time::Instant::now(); - match state.start(&app, 0).await { - Ok(_) => result.push(vec![ - app.to_string(), - "Ok".to_string(), - format!("{:?}", start.elapsed()), - ]), - Err(e) => result.push(vec![ - app.to_string(), - format!("Failed: {:?}", e), - format!("{:?}", start.elapsed()), - ]), - } - } - - to_table(result) -} - -/// Dump settings -fn settings() -> Result { - Ok(format!("{}", *SETTINGS)) -} - -/// Stop one, some or all containers. See start for the argument handling. -async fn stop(state: &mut State, args: &[&str]) -> Result { - let re = arg_regex(args)?; - - let mut result = vec![vec![ - "Name".to_string(), - "Result".to_string(), - "Duration".to_string(), - ]]; - let apps = state - .applications() - .filter(|app| app.process_context().is_some()) - .filter(|app| re.is_match(app.name())) - .map(|app| app.name().clone()) - .collect::>(); - for app in &apps { - let timeout = time::Duration::from_secs(10); - let reason = TerminationReason::Stopped; - let start = time::Instant::now(); - match state.stop(&app, timeout, reason).await { - Ok(()) => result.push(vec![ - app.to_string(), - "Ok".to_string(), - format!("{:?}", start.elapsed()), - ]), - - Err(e) => result.push(vec![ - app.to_string(), - e.to_string(), - format!("{:?}", start.elapsed()), - ]), - } - } - - to_table(result) -} - -/// Umount and remove a containers. See `start` for the argument handling. -/// The data directory is not removed. This needs discussion. -async fn uninstall(state: &mut State, args: &[&str]) -> Result { - let re = arg_regex(args)?; - - let mut result = vec![vec!["Name".to_string(), "Result".to_string()]]; - - let to_uninstall = state - .applications - .values() - .filter(|app| app.process_context().is_none()) - .filter(|app| re.is_match(app.name())) - .map(|app| app.name()) - .cloned() - .collect::>(); - - for app in &to_uninstall { - match state.uninstall(&app).await { - Ok(()) => result.push(vec![app.to_string(), "Ok".to_string()]), - Err(e) => result.push(vec![app.to_string(), e.to_string()]), - } - } - - to_table(result) -} - -/// Trigger the update module. -async fn update(state: &mut State, args: &[&str]) -> Result { - if args.len() != 1 { - return Err(anyhow!("Invalid arguments for update command")); - } - - let dir = PathBuf::from(args[0]); - - if !dir.exists().await { - let err = anyhow!("Update directory {} does not exists", dir.display()); - Err(err) + Request::Install(_) => Response::Install { + result: api::InstallationResult::Error("unimplemented".into()), + }, + Request::Uninstall { name, version } => { + let _ = name; + let _ = version; + Response::Uninstall { + result: api::UninstallResult::Error("unimplemented".into()), + } + } + Request::Shutdown => match state.shutdown().await { + Ok(_) => Response::Shutdown { + result: ShutdownResult::Success, + }, + Err(e) => Response::Shutdown { + result: ShutdownResult::Error(e.to_string()), + }, + }, + }; + + let response_message = Message { + id: message.id.clone(), + payload: Payload::Response(response), + }; + response_tx.send(response_message).await; + Ok(()) } else { - let updates = crate::update::update(state, &dir).await?; - - let mut result = vec![vec![ - "Name".to_string(), - "From".to_string(), - "To".to_string(), - ]]; - for update in &updates { - result.push(vec![ - update.0.to_string(), - (update.1).0.to_string(), - (update.1).1.to_string(), - ]) - } - to_table(result) + // TODO + panic!("Received message is not a request"); } } -/// Send a shutdown command to the main loop. -async fn shutdown(state: &mut State) -> Result { - let stop = stop(state, &[]).await?; - state.tx().send(Event::Shutdown).await; - - Ok(stop) -} - /// Open a TCP socket and read lines terminated with `\n`. -async fn serve() -> Result)>> { +async fn serve(tx: EventTx) -> Result<()> { let address = &SETTINGS.console_address; debug!("Starting console on {}", address); @@ -341,93 +146,85 @@ async fn serve() -> Result)>> { let listener = TcpListener::bind(address) .await .with_context(|| format!("Failed to open listener on {}", address))?; - let (tx, rx) = sync::channel(1000); task::spawn(async move { let mut incoming = listener.incoming(); // Spawn a task for each incoming connection. while let Some(stream) = incoming.next().await { - let (tx_reply, rx_reply) = sync::channel::(10); - + let mut tx_main = tx.clone(); if let Ok(stream) = stream { let peer = match stream.peer_addr() { Ok(peer) => peer, Err(e) => { - warn!("Failed to get peer from console connection: {}", e); - return; + warn!("Failed to get peer from command connection: {}", e); + continue; } }; debug!("Client {:?} connected", peer); - let tx = tx.clone(); + // Spawn a task that handles this client task::spawn(async move { let (reader, writer) = &mut (&stream, &stream); - let reader = io::BufReader::new(reader); - let mut lines = reader.lines(); - while let Some(Ok(line)) = lines.next().await { - let line = line.trim(); - tx.send((line.into(), tx_reply.clone())).await; - if let Ok(reply) = rx_reply.recv().await { - if let Err(e) = writer.write_all(reply.as_bytes()).await { - warn!("Error on console connection {:?}: {}", peer, e); - break; + let mut reader = io::BufReader::new(reader); + let (mut tx, mut rx) = sync::channel::(10); + + loop { + if let Err(e) = + connection(&mut reader, writer, &mut tx_main, &mut rx, &mut tx).await + { + match e.kind() { + ErrorKind::UnexpectedEof => info!("Client {:?} disconnected", peer), + _ => warn!("Error on connection to {:?}: {:?}", peer, e), } + break; } } }); } } }); - Ok(rx) + Ok(()) } -/// List versions of currently known containers and applications. -fn versions(state: &mut State) -> Result { - let versions = state - .applications() - .map(|app| app.manifest()) - .map(|manifest| { - ( - manifest.name.clone(), - manifest.version.clone(), - manifest.arch.clone(), - ) - }) - .collect::>(); - serde_json::to_string(&versions).context("Failed to encode manifest to json") -} +async fn connection( + reader: &mut R, + writer: &mut W, + tx: &mut EventTx, + rx_reply: &mut Receiver, + tx_reply: &mut Sender, +) -> io::Result<()> { + // Read frame length + let mut buf = [0u8; 4]; + reader.read_exact(&mut buf).await?; + let frame_len = BigEndian::read_u32(&buf) as usize; + + // Read payload + let mut buffer = vec![0; frame_len]; + reader.read_exact(&mut buffer).await?; + + // Deserialize message + let message: Message = serde_json::from_slice(&buffer)?; + + // Send message and response handle to main loop + tx.send(Event::Console(message, tx_reply.clone())).await; + + // Wait for reply of main loop + // TODO: timeout + let reply = rx_reply + .recv() + .await + .map_err(|e| io::Error::new(ErrorKind::Other, e))?; -/// Format something iterateable into a ascii table. The first row of the table input -/// contains the column titles. The table cannot be empty. -fn to_table, I: iter::IntoIterator, S: ToString>( - table: T, -) -> Result { - let mut t = Table::new(); - let format = prettytable::format::FormatBuilder::new() - .column_separator('|') - .separators(&[], format::LineSeparator::new('-', '+', '+', '+')) - .padding(1, 1) - .build(); - t.set_format(format); - let mut rows = table.into_iter(); - let titles = rows.next().ok_or_else(|| anyhow!("Missing titles"))?.into(); - t.set_titles(titles); - for r in rows { - t.add_row(r.into()); - } + // Serialize reply + let reply = + serde_json::to_string_pretty(&reply).map_err(|e| io::Error::new(ErrorKind::Other, e))?; - let mut result = vec![]; - t.print(&mut result).context("Failed to format table")?; - String::from_utf8(result).context("Invalid table content") -} + // Send reply + let mut buffer = [0u8; 4]; + BigEndian::write_u32(&mut buffer, reply.len() as u32); + writer.write_all(&buffer).await?; + writer.write_all(reply.as_bytes()).await?; -fn arg_regex(args: &[&str]) -> Result { - match args.len() { - 1 => regex::Regex::new(args[0]) - .with_context(|| format!("Invalid container name regex {}", args[0])), - 0 => regex::Regex::new(".*") - .with_context(|| format!("Invalid container name regex {}", args[0])), - _ => Err(anyhow!("Arguments invalid. Use `start PATTERN`",)), - } + Ok(()) } diff --git a/north/src/main.rs b/north/src/main.rs index a10524eda..d4c65db38 100644 --- a/north/src/main.rs +++ b/north/src/main.rs @@ -22,7 +22,7 @@ use anyhow::{Context, Error, Result}; use async_std::{fs, sync}; use log::*; use nix::unistd::{self, chown}; -use north_common::manifest::Name; +use north_common::{api, manifest::Name}; mod console; mod keys; @@ -32,7 +32,6 @@ mod npk; mod process; mod settings; mod state; -mod update; pub const SYSTEM_UID: u32 = 1000; pub const SYSTEM_GID: u32 = 1000; @@ -43,9 +42,10 @@ pub use state::State; pub type EventTx = sync::Sender; #[allow(clippy::large_enum_variant)] +#[derive(Debug)] pub enum Event { - /// Incomming console command - Console(String, sync::Sender), + /// Incomming command + Console(api::Message, sync::Sender), /// A instance exited with return code Exit(Name, i32), /// Out of memory event occured @@ -163,7 +163,7 @@ async fn main() -> Result<()> { .collect::>(); for app in &autostart_apps { info!("Autostarting {}", app); - if let Err(e) = state.start(&app, 0).await { + if let Err(e) = state.start(&app).await { warn!("Failed to start {}: {}", app, e); } } @@ -175,8 +175,8 @@ async fn main() -> Result<()> { // Debug console commands are handled via the main loop in order to get access // to the global state. Therefore the console server receives a tx handle to the // main loop and issues `Event::Console`. Processing of the command takes place - // in the console module again but with access to `state`. - Event::Console(cmd, txr) => console::process(&mut state, &cmd, txr).await?, + // in the console module but with access to `state`. + Event::Console(msg, txr) => console::process(&mut state, &msg, txr).await?, // The OOM event is signaled by the cgroup memory monitor if configured in a manifest. // If a out of memory condition occours this is signaled with `Event::Oom` which // carries the id of the container that is oom. diff --git a/north/src/settings.rs b/north/src/settings.rs index 06e05afa7..09526d7e6 100644 --- a/north/src/settings.rs +++ b/north/src/settings.rs @@ -16,7 +16,7 @@ use async_std::path::PathBuf; use std::{fmt, path::Path}; use structopt::StructOpt; -const DEFAULT_CONSOLE_ADDRESS: &str = "127.0.0.1:4242"; +const DEFAULT_CONSOLE_ADDRESS: &str = "127.0.0.1:4200"; lazy_static::lazy_static! { #[derive(Debug)] diff --git a/north/src/state.rs b/north/src/state.rs index 3fb097b30..6aea13468 100644 --- a/north/src/state.rs +++ b/north/src/state.rs @@ -12,12 +12,36 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::{keys, npk, npk::Container, process::Process, EventTx, Name, TerminationReason}; -use anyhow::{anyhow, Result}; +use crate::{keys, npk, npk::Container, process::Process, Event, EventTx, Name, TerminationReason}; +use anyhow::{Error as AnyhowError, Result}; +use async_std::path::Path; use ed25519_dalek::PublicKey; use log::{info, warn}; -use north_common::manifest::{Manifest, OnExit, Version}; -use std::{collections::HashMap, fmt, iter, time}; +use north_common::manifest::{Manifest, Version}; +use std::{ + collections::{HashMap, HashSet}, + fmt, iter, result, time, +}; +use thiserror::Error; +use time::Duration; + +#[derive(Error, Debug)] +pub enum Error { + #[error("No application found")] + UnknownApplication, + #[error("Missing resouce {0}")] + MissingResource(String), + #[error("Failed to spawn process")] + ProcessError(AnyhowError), + #[error("Application(s) \"{0:?}\" is/are running")] + ApplicationRunning(Vec), + #[error("Failed to uninstall")] + UninstallationError(AnyhowError), + #[error("Failed to install")] + InstallationError(AnyhowError), + #[error("Application is not started")] + ApplicationNotRunning, +} #[derive(Debug)] pub struct State { @@ -46,7 +70,7 @@ impl ProcessContext { &self.process } - pub fn uptime(&self) -> time::Duration { + pub fn uptime(&self) -> Duration { self.start_timestamp.elapsed() } } @@ -123,79 +147,84 @@ impl State { Ok(()) } - /// Remove and umount a specific container - pub async fn uninstall(&mut self, name: &str) -> Result<()> { - if let Some(app) = self.applications.get_mut(name) { - if app.process_context().is_none() { - info!("Removing {}", app); - npk::uninstall(app.container()).await?; - self.applications.remove(name); - Ok(()) - } else { - warn!("Cannot uninstall running container {}", app); - Err(anyhow!("Cannot uninstall running container {}", app)) - } - } else { - warn!("Cannot uninstall unknown container {}", name); - Err(anyhow!("Cannot uninstall unknown container {}", name)) - } - } + pub async fn start(&mut self, name: &str) -> result::Result<(), Error> { + let tx = self.tx.clone(); - /// Start a container with name `name` - pub async fn start(&mut self, name: &str, incarnation: u32) -> Result<()> { - let available_resource_ids: Vec = self + // Setup set of available resources + let resources = self .applications .values() - .map(|a| &a.container) - .filter(|c| c.is_resource_container()) - .map(|c| c.manifest.name.clone()) - .collect(); - if let Some(app) = self.applications.get_mut(name) { - if app.container.is_resource_container() { - warn!("Cannot start resource containers ({})", app); - return Err(anyhow!("Attempted to start resource container {}", name)); - } - if let Some(required_resources) = &app.container.manifest.resources { - for r in required_resources { - if !available_resource_ids.contains(&r.name) { - warn!( - "Container {} missing required resource \"{}\")", - name, &r.name - ); - return Err(anyhow!( - "Failed to start {} because of missing resource \"{}\"", - name, - r.name - )); - } + .filter_map(|app| { + if app.container.is_resource_container() { + Some(app.container.manifest.name.clone()) + } else { + None } - } - info!("Starting {}", app); - let process = Process::spawn(&app.container, self.tx.clone()).await?; - #[cfg(any(target_os = "android", target_os = "linux"))] - let cgroups = if let Some(ref c) = app.manifest().cgroups { - log::debug!("Creating cgroup configuration for {}", app); - let cgroups = - crate::linux::cgroups::CGroups::new(app.name(), c, self.tx.clone()).await?; - - log::debug!("Assigning {} to cgroup {}", process.pid(), app); - cgroups.assign(process.pid()).await?; - Some(cgroups) - } else { - None - }; - app.process = Some(ProcessContext { - process, - incarnation, - start_timestamp: time::Instant::now(), - #[cfg(any(target_os = "android", target_os = "linux"))] - cgroups, - }); - info!("Started {}", app); - Ok(()) + }) + .collect::>(); + + // Look for app + let app = if let Some(app) = self.applications.get_mut(name) { + app } else { - Err(anyhow!("Invalid application {}", name)) + return Err(Error::UnknownApplication); + }; + + // Check if application is already running + if app.process.is_some() { + warn!("Application {} is already running", app.manifest().name); + return Err(Error::ApplicationRunning(vec![app.manifest().name.clone()])); } + + // Check if app is a resource container that cannot be started + if app.container.is_resource_container() { + warn!("Cannot start resource containers ({})", app); + return Err(Error::UnknownApplication); + } + + // Check for all required resources + if let Some(required_resources) = &app.container.manifest.resources { + for r in required_resources { + if !resources.contains(&r.name) { + return Err(Error::MissingResource(r.name.clone())); + } + } + } + + // Spawn process + info!("Starting {}", app); + let process = Process::spawn(&app.container, tx) + .await + .map_err(Error::ProcessError)?; + + // CGroups + #[cfg(any(target_os = "android", target_os = "linux"))] + let cgroups = if let Some(ref c) = app.manifest().cgroups { + log::debug!("Creating cgroup configuration for {}", app); + let cgroups = crate::linux::cgroups::CGroups::new(app.name(), c, self.tx.clone()) + .await + .map_err(Error::ProcessError)?; + + log::debug!("Assigning {} to cgroup {}", process.pid(), app); + cgroups + .assign(process.pid()) + .await + .map_err(Error::ProcessError)?; + Some(cgroups) + } else { + None + }; + + app.process = Some(ProcessContext { + process, + incarnation: 0, + start_timestamp: time::Instant::now(), + #[cfg(any(target_os = "android", target_os = "linux"))] + cgroups, + }); + info!("Started {}", app); + + Ok(()) } /// Stop a application. Timeout specifies the time until the process is @@ -203,42 +232,84 @@ impl State { pub async fn stop( &mut self, name: &str, - timeout: time::Duration, + timeout: Duration, reason: TerminationReason, - ) -> Result<()> { + ) -> result::Result<(), Error> { if let Some(app) = self.applications.get_mut(name) { if let Some(mut context) = app.process.take() { info!("Stopping {}", app); let status = context .process .terminate(timeout, Some(reason)) - .await? + .await + .map_err(Error::ProcessError)? .await; #[cfg(any(target_os = "android", target_os = "linux"))] { if let Some(cgroups) = context.cgroups { log::debug!("Destroying cgroup configuration of {}", app); - cgroups.destroy().await?; + cgroups.destroy().await.map_err(Error::ProcessError)?; } } info!("Stopped {} {:?}", app, status); + Ok(()) } else { warn!("Application {} is not running", app); + Err(Error::ApplicationNotRunning) } + } else { + Err(Error::UnknownApplication) + } + } + + pub async fn shutdown(&self) -> result::Result<(), Error> { + if self + .applications + .values() + .all(|app| app.process_context().is_none()) + { + self.tx.send(Event::Shutdown).await; Ok(()) } else { - Err(anyhow!("Invalid application {}", name)) + let apps = self + .applications + .values() + .filter_map(|app| app.process_context().map(|_| app.name().to_string())) + .collect(); + Err(Error::ApplicationRunning(apps)) + } + } + + /// Install a npk from give path + pub async fn install(&mut self, npk: &Path) -> result::Result<(), Error> { + npk::install(self, npk) + .await + .map_err(Error::InstallationError)?; + Ok(()) + } + + /// Remove and umount a specific app + pub async fn uninstall(&mut self, app: &Application) -> result::Result<(), Error> { + if app.process_context().is_none() { + info!("Removing {}", app); + npk::uninstall(app.container()) + .await + .map_err(Error::UninstallationError)?; + self.applications.remove(&app.manifest().name); + Ok(()) + } else { + warn!("Cannot uninstall running container {}", app); + Err(Error::ApplicationRunning(vec![app.manifest().name.clone()])) } } /// Handle the exit of a container. The restarting of containers is a subject /// to be removed and handled externally - #[allow(unused_mut)] pub async fn on_exit(&mut self, name: &str, return_code: i32) -> Result<()> { if let Some(app) = self.applications.get_mut(name) { - if let Some(mut context) = app.process.take() { + if let Some(context) = app.process.take() { info!( "Process {} exited after {:?} with code {} and termination reason {:?}", app, @@ -249,37 +320,39 @@ impl State { #[cfg(any(target_os = "android", target_os = "linux"))] { + let mut context = context; if let Some(cgroups) = context.cgroups.take() { log::debug!("Destroying cgroup configuration of {}", app); cgroups.destroy().await?; } } - - if let Some(OnExit::Restart(n)) = app.manifest().on_exit { - if context.incarnation < n { - info!( - "Restarting {} in incarnation {}", - app, - context.incarnation + 1 - ); - self.start(name, context.incarnation + 1).await?; - } - } } } Ok(()) } /// Handle out of memory conditions for container `name` - pub async fn on_oom(&mut self, name: &str) -> Result<()> { + pub async fn on_oom(&mut self, name: &str) -> result::Result<(), Error> { if let Some(app) = self.applications.get_mut(name) { - warn!("Process {} is out of memory. Stopping {}", app, app); - self.stop( - name, - time::Duration::from_secs(1), - TerminationReason::OutOfMemory, - ) - .await?; + if let Some(mut context) = app.process.take() { + warn!("Process {} is out of memory. Stopping {}", app, app); + let status = context + .process + .terminate(Duration::from_secs(1), Some(TerminationReason::OutOfMemory)) + .await + .map_err(Error::ProcessError)? + .await; + + #[cfg(any(target_os = "android", target_os = "linux"))] + { + if let Some(cgroups) = context.cgroups { + log::debug!("Destroying cgroup configuration of {}", app); + cgroups.destroy().await.map_err(Error::ProcessError)?; + } + } + + info!("Stopped {} {:?}", app, status); + } } Ok(()) } diff --git a/north/src/update.rs b/north/src/update.rs deleted file mode 100644 index b06e1f845..000000000 --- a/north/src/update.rs +++ /dev/null @@ -1,108 +0,0 @@ -// Copyright (c) 2019 - 2020 ESRLabs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use crate::{npk, State, TerminationReason}; -use anyhow::{Context, Result}; -use async_std::{fs, path::Path}; -use futures::stream::StreamExt; -use log::{debug, info}; -use north_common::manifest::{Manifest, Version}; -use std::{io::Read, str::FromStr, time}; - -const MANIFEST: &str = "manifest.yaml"; - -lazy_static::lazy_static! { - static ref RE: regex::Regex = regex::Regex::new( - format!( - r"^.*-{}-\d+\.\d+\.\d+\.npk$", - env!("VERGEN_TARGET_TRIPLE") - ) - .as_str(), - ) - .expect("Invalid regex"); -} - -pub async fn update(state: &mut State, dir: &Path) -> Result> { - let mut updates = Box::pin( - fs::read_dir(&dir) - .await - .with_context(|| format!("Failed to read {}", dir.display()))? - .filter_map(move |d| async move { d.ok() }) - .map(|d| d.path()) - .filter_map(move |d| async move { - if RE.is_match(&d.display().to_string()) { - Some(d) - } else { - None - } - }), - ); - - let mut result = vec![]; - - while let Some(update) = updates.next().await { - let file = std::fs::File::open(&update) - .with_context(|| format!("Failed to open {}", update.display()))?; - let reader = std::io::BufReader::new(file); - let mut archive = zip::ZipArchive::new(reader).context("Failed to read zip")?; - - debug!("Loading manifest from {}", update.display()); - let manifest = { - let mut manifest_file = archive - .by_name(MANIFEST) - .with_context(|| format!("Failed to read manifest from {}", update.display()))?; - let mut manifest = String::new(); - manifest_file.read_to_string(&mut manifest)?; - Manifest::from_str(&manifest)? - }; - drop(archive); - - let old_version = manifest.version; - let name = manifest.name; - - let is_installed = state.application(&name).is_some(); - let is_started = state - .application(&name) - .map(|a| a.process_context().map(|_| true).unwrap_or_default()) - .unwrap_or_default(); - - if is_installed { - if is_started { - info!("Update: Stopping {}", name); - state - .stop( - &name, - time::Duration::from_secs(10), - TerminationReason::Stopped, - ) - .await?; - } - - info!("Update: Uninstalling {}", name); - state.uninstall(&name).await?; - } - - info!("Update: Installing {}", update.display()); - let installed = npk::install(state, &update).await?; - let new_version: Version = installed.1; - - if is_started { - state.start(&name, 0).await?; - } - - result.push((name, (old_version, new_version))); - } - - Ok(result) -} diff --git a/north_common/Cargo.toml b/north_common/Cargo.toml index f71169e7d..8aab16b3a 100644 --- a/north_common/Cargo.toml +++ b/north_common/Cargo.toml @@ -9,7 +9,7 @@ anyhow = "1.0.31" async-std = { version = "1.6.2", features = ["attributes", "unstable"] } semver = "0.10.0" serde = { version = "1.0", features = ["derive"] } -serde_json = "1.0.56" +serde_json = "1.0.57" serde_yaml = "0.8.13" [dev-dependencies] diff --git a/north_common/README.md b/north_common/README.md new file mode 100644 index 000000000..514859c21 --- /dev/null +++ b/north_common/README.md @@ -0,0 +1,232 @@ +# Northstar Common Functionality + +## Northstar Runtime API + +Northstar is a container runtime, not an application modle. It is highly application specific how components running in different northstar containers should behave. +Still, the runtime is in charge of starting/stoping/controlling the containers at runtime. + +To make this functionality available to outside components, we offer a socket based API. Applications that want to control the runtime can issues commands to the runtime and receive status information in response. + +Requests to the runtime and responses from the runtime are encoded as json objects. + +### List installed packages + +**REQUEST**: + +```.json +{ + "request":"ListContainers", + "request_id":3 +} +``` + +**RESPONSE**: + +```.json +{ + "response":{ + "ContainerList":[ + { + "name":"cpueater", + "version":"0.0.1", + "pid":null, + "container_type":"APP" + }, + { + "name":"crashing", + "version":"0.0.1", + "pid":null, + "container_type":"APP" + }, + ... + ] + }, + "response_id":3 +} +``` + +### Start a container + +**REQUEST**: + +```.json +{ + "request":{ + "StartContainer":"hello" + }, + "request_id":4 +} +``` + +**RESPONSE**: + +```.json +{ + "response":{ + "StartedList":[ + { + "id":"hello", + "success":"Ok", + "duration_in_ns":1845816 + } + ] + }, + "response_id":4 +} +``` + +### List running containers + +**REQUEST**: + +```.json +{ + "request":{ + "ListProcesses":"hello" + }, + "request_id":5 +} +``` + +**RESPONSE**: + +```.json +{ + "response":{ + "ProcessList":[ + { + "name":"hello", + "version":"0.0.2", + "pid":330106, + "mem_info":{ + "size":3043328, + "resident":368640, + "shared":299008, + "text":204800, + "data":364544 + }, + "uptime_in_us":2354498 + } + ] + }, + "response_id":5 +} +``` + +### Stop one or all running containers + +**REQUEST**: + +```.json +{ + "request":{ + "StopContainer":null + }, + "request_id":6 +} +``` + +**RESPONSE**: + +```.json +{ + "response":{ + "StoppedList":[ + { + "id":"hello", + "success":"Ok", + "duration_in_ns":732681 + } + ] + }, + "response_id":6 +} +``` + +### Uninstall a package + +**REQUEST**: + +```.json +{ + "request":{ + "UninstallContainer":"hello" + }, + "request_id":7 +} +``` + +**RESPONSE**: + +```.json +{ + "response":{ + "Uninstall":[ + { + "name":"hello", + "uninstalled_version":"0.0.2", + "result":"OK" + } + ] + }, + "response_id":7 +} +``` + +### List installed packages with versions + +**REQUEST**: + +```.json +{ + "request": "GetVersions", + "request_id":9 +} +``` + +**RESPONSE**: + +```.json +{ + "response":{ + "Versions":[ + { + "name":"hello_message", + "version":"0.1.2", + "architecture":"x86_64-unknown-linux-gnu" + }, + { + "name":"cpueater", + "version":"0.0.1", + "architecture":"x86_64-unknown-linux-gnu" + }, + ... + ] + }, + "response_id":9 +} +``` + +### Shutdown the north runtime + +**REQUEST**: + +```.json +{ + "request": "Shutdown", + "request_id":10 +} +``` + +**RESPONSE**: + +```.json +{ + "response":{ + "StoppedList":[] + }, + "response_id":10 +} +``` + +To see an example of how to use the json-api, take a look at the `nstar` implementation. diff --git a/north_common/src/api.rs b/north_common/src/api.rs new file mode 100644 index 000000000..c2aa37a96 --- /dev/null +++ b/north_common/src/api.rs @@ -0,0 +1,96 @@ +use serde::{Deserialize, Serialize}; + +use crate::manifest::{Manifest, Version}; + +type Name = String; +type MessageId = String; // UUID + +#[derive(Clone, Eq, PartialEq, Debug, Serialize, Deserialize)] +pub struct Message { + pub id: MessageId, // used to match response with a request + pub payload: Payload, +} + +#[derive(Clone, Eq, PartialEq, Debug, Serialize, Deserialize)] +pub enum Payload { + Request(Request), + Response(Response), + Notification(Notification), +} + +#[derive(Clone, Eq, PartialEq, Debug, Serialize, Deserialize)] +pub struct Notification { + // TODO +} + +#[derive(Clone, Eq, PartialEq, Debug, Serialize, Deserialize)] +pub enum Request { + Containers, + Start(Name), + Stop(Name), + Uninstall { name: Name, version: Version }, + Install(String), // path to npk with new version + Shutdown, +} + +#[derive(Clone, Eq, PartialEq, Debug, Serialize, Deserialize)] +pub struct Container { + pub manifest: Manifest, + pub process: Option, +} + +#[derive(Clone, Eq, PartialEq, Debug, Serialize, Deserialize)] +pub struct Process { + pub pid: u32, + pub uptime: u64, + pub memory: Option, +} + +#[derive(Clone, Eq, PartialEq, Debug, Serialize, Deserialize)] +pub struct Memory { + pub size: u64, + pub resident: u64, + pub shared: u64, + pub text: u64, + pub data: u64, +} + +#[derive(Clone, Eq, PartialEq, Debug, Serialize, Deserialize)] +pub enum Response { + Containers(Vec), + Start { result: StartResult }, + Stop { result: StopResult }, + Uninstall { result: UninstallResult }, + Install { result: InstallationResult }, + Shutdown { result: ShutdownResult }, +} + +#[derive(Clone, Eq, PartialEq, Debug, Serialize, Deserialize)] +pub enum StartResult { + Success, + Error(String), +} + +#[derive(Clone, Eq, PartialEq, Debug, Serialize, Deserialize)] +pub enum StopResult { + Success, + Error(String), // TODO +} + +#[derive(Clone, Eq, PartialEq, Debug, Serialize, Deserialize)] +pub enum UninstallResult { + Success, + Error(String), // TODO +} + +#[derive(Clone, Eq, PartialEq, Debug, Serialize, Deserialize)] +pub enum InstallationResult { + Success, + Error(String), // TODO +} + +#[derive(Clone, Eq, PartialEq, Debug, Serialize, Deserialize)] +pub enum ShutdownResult { + Success, + Error(String), // TODO +} diff --git a/north_common/src/lib.rs b/north_common/src/lib.rs index bfd298435..010150aae 100644 --- a/north_common/src/lib.rs +++ b/north_common/src/lib.rs @@ -12,4 +12,5 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub mod api; pub mod manifest; diff --git a/nstar/Cargo.toml b/nstar/Cargo.toml new file mode 100644 index 000000000..5ce38c69c --- /dev/null +++ b/nstar/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "nstar" +version = "0.1.0" +authors = ["ESRLabs"] +edition = "2018" + +[dependencies] +ansi_term = "0.12.1" +anyhow = "1.0.32" +byteorder = "1.3.4" +clap = "2.33.3" +directories = "3.0.1" +env_logger = "0.7.1" +itertools = "0.9.0" +log = "0.4.11" +north_common = { path = "../north_common" } +prettytable-rs = "0.8.0" +rustyline = "6.2.0" +rustyline-derive = "0.3.1" +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0.57" +strsim = "0.10.0" +structopt = "0.3.17" +uuid = { version = "0.8.1", features = ["v4"] } diff --git a/nstar/src/main.rs b/nstar/src/main.rs new file mode 100644 index 000000000..9d3944c62 --- /dev/null +++ b/nstar/src/main.rs @@ -0,0 +1,398 @@ +// Copyright (c) 2019 - 2020 ESRLabs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use ansi_term::Color; +use anyhow::{anyhow, Context, Result}; +use api::{ + InstallationResult, Message, Payload, Request, Response, ShutdownResult, StartResult, + StopResult, UninstallResult, +}; +use byteorder::{BigEndian, ByteOrder}; +use itertools::Itertools; +use log::{info, warn}; +use net::TcpStream; +use north_common::api; +use prettytable::{format, Attr, Cell, Row, Table}; +use rustyline::{ + completion::{Completer, FilenameCompleter, Pair}, + config::OutputStreamType, + error::ReadlineError, + highlight::{Highlighter, MatchingBracketHighlighter}, + hint::{Hinter, HistoryHinter}, + Cmd, CompletionType, Config, EditMode, Editor, Helper, KeyPress, +}; +use rustyline_derive::Validator; +use std::{ + borrow::Cow::{self, Owned}, + env, fs, + io::{Read, Write}, + net::{self}, + time::Duration, +}; +use structopt::StructOpt; + +static PROMPT: &str = ">> "; + +#[derive(Debug, StructOpt)] +#[structopt(name = "nstar", about = "Northstar CLI")] +struct Opt { + /// File that contains the north configuration + #[structopt(short, long, default_value = "localhost:4200")] + pub host: String, + + /// Run in verbose mode + #[structopt(short, long)] + verbose: bool, + + /// Disable history + #[structopt(short, long)] + disable_history: bool, + + /// Print raw json payload + #[structopt(short, long)] + json: bool, + + /// Run command and exit + cmd: Option, +} + +#[derive(Validator)] +struct NstarHelper { + file_name: FilenameCompleter, + brackets_highliter: MatchingBracketHighlighter, + history_hinter: HistoryHinter, +} + +impl Completer for NstarHelper { + type Candidate = Pair; + + fn complete( + &self, + line: &str, + pos: usize, + ctx: &rustyline::Context<'_>, + ) -> Result<(usize, Vec), ReadlineError> { + self.file_name.complete(line, pos, ctx) + } +} + +impl Hinter for NstarHelper { + fn hint(&self, line: &str, pos: usize, ctx: &rustyline::Context<'_>) -> Option { + let hint = command_hint(line, pos, ctx); + if hint.is_some() { + hint + } else { + self.history_hinter.hint(line, pos, ctx) + } + } +} + +impl Highlighter for NstarHelper { + fn highlight_prompt<'b, 's: 'b, 'p: 'b>( + &'s self, + prompt: &'p str, + _default: bool, + ) -> Cow<'b, str> { + if prompt == PROMPT { + Owned(Color::Green.bold().paint(PROMPT).to_string()) + } else { + PROMPT.into() + } + } + + fn highlight_hint<'h>(&self, hint: &'h str) -> Cow<'h, str> { + Owned(Color::Fixed(240).paint(hint).to_string()) + } + + fn highlight<'l>(&self, line: &'l str, pos: usize) -> Cow<'l, str> { + self.brackets_highliter.highlight(line, pos) + } + + fn highlight_char(&self, line: &str, pos: usize) -> bool { + self.brackets_highliter.highlight_char(line, pos) + } +} + +impl Helper for NstarHelper {} + +fn command_hint(line: &str, pos: usize, _ctx: &rustyline::Context<'_>) -> Option { + did_you_mean(&line, &["containers", "shutdown", "start", "stop"]).and_then(|s| { + if s.len() > pos { + Some(s[pos..].into()) + } else { + None + } + }) +} + +fn did_you_mean<'a, T: ?Sized, I>(v: &str, possible_values: I) -> Option<&'a str> +where + T: AsRef + 'a, + I: IntoIterator, +{ + let mut candidate: Option<(f64, &str)> = None; + for pv in possible_values { + let confidence = strsim::jaro_winkler(v, pv.as_ref()); + if confidence > 0.8 && (candidate.is_none() || (candidate.as_ref().unwrap().0 < confidence)) + { + candidate = Some((confidence, pv.as_ref())); + } + } + match candidate { + None => None, + Some((_, candidate)) => Some(candidate), + } +} + +fn run(mut stream: S, req: Request) -> Result { + // Send request + let request_msg = Message { + id: uuid::Uuid::new_v4().to_string(), + payload: Payload::Request(req), + }; + let request = serde_json::to_string(&request_msg).context("Failed to serialize")?; + let mut buf = [0u8; 4]; + BigEndian::write_u32(&mut buf, request.as_bytes().len() as u32); + stream + .write_all(&buf) + .context("Failed to write to stream")?; + stream + .write_all(request.as_bytes()) + .context("Failed to write to stream")?; + + // Receive reply + let mut buffer = [0u8; 4]; + stream + .read_exact(&mut buffer) + .context("Failed to read frame length")?; + let frame_len = BigEndian::read_u32(&buffer) as usize; + let mut buffer = vec![0; frame_len]; + stream + .read_exact(&mut buffer) + .context("Failed to read frame")?; + + // Deserialize message + let message: Message = serde_json::from_slice(&buffer).context("Failed to parse reply")?; + + match message.payload { + Payload::Request(_) => Err(anyhow!("Invalid response")), + Payload::Response(r) => Ok(r), + Payload::Notification(_) => Err(anyhow!("Invalid response")), + } +} + +fn help() -> String { + r" +containers: List installed containers +shutdown: Stop the northstar runtime +start : Start application +stop : Stop application" + .into() +} + +fn run_cmd(cmd: &str, stream: S) -> Result> { + let mut cmd = cmd.trim().split_whitespace(); + let c = cmd.next().ok_or_else(|| anyhow!("Invalid command"))?; + + let response = match c { + "containers" => Some(run(stream, Request::Containers)?), + "shutdown" => Some(run(stream, Request::Shutdown)?), + "start" => { + if let Some(name) = cmd.next() { + Some(run(stream, Request::Start(name.into()))?) + } else { + None + } + } + "stop" => { + if let Some(name) = cmd.next() { + Some(run(stream, Request::Stop(name.into()))?) + } else { + None + } + } + _ => None, + }; + Ok(response) +} + +fn main() -> Result<()> { + let opt = Opt::from_args(); + + if opt.verbose { + env::set_var("RUST_LOG", "nstar=debug"); + env_logger::init(); + info!("Verbose mode is enabled"); + } else { + env::set_var("RUST_LOG", "nstar=warn"); + env_logger::init(); + } + + let stream = TcpStream::connect(&opt.host) + .with_context(|| format!("Failed to connect to {}", opt.host))?; + + if let Some(cmd) = opt.cmd { + if let Some(response) = run_cmd(cmd.trim(), &stream)? { + if opt.json { + println!("{}", serde_json::to_string_pretty(&response)?); + } else { + print_response(&response); + } + } else { + eprintln!("Invalid cmd \"{}\"", cmd); + } + Ok(()) + } else { + let config = Config::builder() + .auto_add_history(false) + .completion_type(CompletionType::List) + .edit_mode(EditMode::Vi) + .history_ignore_space(true) + .history_ignore_dups(true) + .output_stream(OutputStreamType::Stdout) + .max_history_size(1000) + .build(); + let h = NstarHelper { + file_name: FilenameCompleter::new(), + brackets_highliter: MatchingBracketHighlighter::new(), + history_hinter: HistoryHinter {}, + }; + let mut rl = Editor::with_config(config); + rl.set_helper(Some(h)); + rl.bind_sequence(KeyPress::Tab, Cmd::CompleteHint); + rl.bind_sequence(KeyPress::Ctrl('L'), Cmd::ClearScreen); + + let history = if !opt.disable_history { + let history = directories::ProjectDirs::from("com", "esrlabs", "nstar") + .map(|d| d.config_dir().join("history")) + .ok_or_else(|| anyhow!("Failed to get config directory"))?; + if history.exists() { + info!("Loading history from {:?}", history); + rl.load_history(&history) + .context("Failed to load history")?; + } + Some(history) + } else { + info!("History is disabled"); + None + }; + + // Prompt loop + loop { + match rl.readline(PROMPT) { + Ok(line) => { + if line.trim() == "help" { + println!("{}", help()); + } else if let Some(response) = run_cmd(&line, &stream)? { + rl.add_history_entry(line); + if !opt.disable_history { + if let Some(ref history) = history { + if let Some(parent) = history.parent() { + if !parent.exists() { + info!("Creating nstar config dir {:?}", parent); + fs::create_dir_all(parent).with_context(|| { + format!("Failed to create {}", parent.display()) + })?; + } + info!("Saving history to {:?}", history); + rl.save_history(history)?; + } + } + } + print_response(&response); + } else { + println!("Invalid command: {}", line); + } + } + Err(ReadlineError::Interrupted) | Err(ReadlineError::Eof) => break, + Err(err) => { + warn!("Error: {}", err); + break; + } + } + } + Ok(()) + } +} + +// TODO: This can be done smarter +fn print_response(response: &Response) { + match response { + Response::Containers(containers) => { + let mut table = Table::new(); + table.set_format(*format::consts::FORMAT_NO_BORDER_LINE_SEPARATOR); + table.set_titles(Row::new(vec![ + Cell::new("Name").with_style(Attr::Bold), + Cell::new("Version").with_style(Attr::Bold), + Cell::new("Type").with_style(Attr::Bold), + Cell::new("PID").with_style(Attr::Bold), + Cell::new("Uptime").with_style(Attr::Bold), + ])); + for container in containers + .iter() + .sorted_by_key(|c| &c.manifest.name) // Sort by name + .sorted_by_key(|c| c.manifest.init.is_none()) + { + table.add_row(Row::new(vec![ + Cell::new(&container.manifest.name).with_style(Attr::Bold), + Cell::new(&container.manifest.version.to_string()), + Cell::new( + container + .manifest + .init + .as_ref() + .map(|_| "App") + .unwrap_or("Resource"), + ), + Cell::new( + &container + .process + .as_ref() + .map(|p| p.pid.to_string()) + .unwrap_or_default(), + ) + .with_style(Attr::ForegroundColor(prettytable::color::GREEN)), + Cell::new( + &container + .process + .as_ref() + .map(|p| format!("{:?}", Duration::from_nanos(p.uptime))) + .unwrap_or_default(), + ), + ])); + } + table.printstd(); + } + Response::Start { result } => match result { + StartResult::Success => println!("Success"), + StartResult::Error(e) => println!("Failed: {}", e), + }, + Response::Stop { result } => match result { + StopResult::Success => println!("Success"), + StopResult::Error(e) => println!("Failed: {}", e), + }, + Response::Uninstall { result } => match result { + UninstallResult::Success => println!("Success"), + UninstallResult::Error(e) => println!("Failed: {}", e), + }, + Response::Install { result } => match result { + InstallationResult::Success => println!("Success"), + InstallationResult::Error(e) => println!("Failed: {}", e), + }, + Response::Shutdown { result } => match result { + ShutdownResult::Success => println!("Success"), + ShutdownResult::Error(e) => println!("Failed: {}", e), + }, + } +} diff --git a/rakefile.rb b/rakefile.rb index c5af8b470..bd69d0e77 100644 --- a/rakefile.rb +++ b/rakefile.rb @@ -72,7 +72,7 @@ def supported_targets desc 'Setup build environment' task :setup_environment do sh 'bundle install' - sh 'cargo install --path dcon' + sh 'cargo install --path nstar' sh 'cargo install --version 0.2.0 cross' require 'os' if OS.linux? @@ -93,15 +93,15 @@ def supported_targets sh 'cargo build --release --bin north' end - desc 'Build dcon client' - task :dcon do - sh 'cargo build --release --bin dcon' - end - desc 'Build everything' task :all do sh 'cargo build --release' end + + desc 'Build everything debug' + task :all_debug do + sh 'cargo build' + end end def targets @@ -174,9 +174,14 @@ def examples end end - desc 'Execute runtime with examples (use with sudo)' + desc 'Execute runtime with examples (use with sudo on linux)' task run: 'build:north' do - sh 'sudo ./target/release/north --config north.toml' + require 'os' + if OS.mac? + sh './target/release/north --config north.toml' + else + sh 'sudo ./target/release/north --config north.toml' + end end desc 'Clean example registry' @@ -256,6 +261,7 @@ def table(col_labels, arr) task :check do check_program('docker info >/dev/null', 'Docker is needed to run the check task') sh 'cargo +nightly fmt -- --color=always --check' + sh 'cargo +nightly clippy' targets.each do |target| sh "cross check --target #{target}"