diff --git a/.devcontainer/Dockerfile b/.devcontainer/Dockerfile index 82e9afde..ed1d1768 100644 --- a/.devcontainer/Dockerfile +++ b/.devcontainer/Dockerfile @@ -1,5 +1,5 @@ # DevContainer image -FROM rust:1.85-slim +FROM rust:1.86-slim RUN \ adduser --system --disabled-password --shell /bin/bash --home /home/vscode vscode && \ # install docker diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index 2421b296..854ba2b1 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -11,7 +11,7 @@ jobs: - name: Install Rust + components uses: actions-rust-lang/setup-rust-toolchain@v1 with: - toolchain: 1.85 + toolchain: 1.86 components: rustfmt,clippy - name: Install Rust code coverage uses: taiki-e/install-action@cargo-llvm-cov diff --git a/Cargo.toml b/Cargo.toml index 02b31214..278c3bd4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -81,6 +81,7 @@ missing_asserts_for_indexing = { level = "allow", priority = 127 } # missing missing_docs_in_private_items = { level = "allow", priority = 127 } # missing docs on private ok missing_inline_in_public_items = { level = "allow", priority = 127 } # let rust compiler determine best inline logic missing_trait_methods = { level = "allow", priority = 127 } # allow in favor of rustc `implement the missing item` +multiple_inherent_impl = { level = "allow", priority = 127 } # required in best practice to limit exposure over UniFFI must_use_candidate = { level = "allow", priority = 127 } # omitting #[must_use] ok mod_module_files = { level = "allow", priority = 127 } # mod directories ok non_ascii_literal = { level = "allow", priority = 127 } # non-ascii char in string literal ok diff --git a/cspell.json b/cspell.json index bee1a6fa..483caed0 100644 --- a/cspell.json +++ b/cspell.json @@ -14,6 +14,8 @@ "nanocore", "numpy", "graphviz", + "uniffi", + "cffi" ], "ignoreWords": [ "relpath", diff --git a/src/crypto.rs b/src/core/crypto.rs similarity index 92% rename from src/crypto.rs rename to src/core/crypto.rs index 3e8883c2..453c385f 100644 --- a/src/crypto.rs +++ b/src/core/crypto.rs @@ -1,7 +1,9 @@ use crate::{ - error::Result, - model::{Blob, BlobKind}, - util::get, + core::util::get, + uniffi::{ + error::Result, + model::{Blob, BlobKind}, + }, }; use serde_yaml; use sha2::{Digest as _, Sha256}; @@ -21,7 +23,7 @@ use std::{ clippy::indexing_slicing, reason = "Reading less than 0 is impossible." )] -pub fn hash_stream(stream: &mut impl Read) -> Result { +pub(crate) fn hash_stream(stream: &mut impl Read) -> Result { const BUFFER_SIZE: usize = 8 << 10; // 8KB chunks to match with page size typically found let mut hash = Sha256::new(); @@ -79,7 +81,7 @@ pub fn hash_dir(dirpath: impl AsRef) -> Result { /// # Errors /// /// Will return error if hashing fails on file or directory. -pub fn hash_blob( +pub(crate) fn hash_blob( namespace_lookup: &HashMap, blob: Blob, ) -> Result { diff --git a/src/core/error.rs b/src/core/error.rs new file mode 100644 index 00000000..a644bae5 --- /dev/null +++ b/src/core/error.rs @@ -0,0 +1,62 @@ +use crate::uniffi::error::{Kind, OrcaError}; +use bollard::errors::Error as BollardError; +use glob; +use serde_json; +use serde_yaml; +use std::{ + fmt::{self, Display, Formatter}, + io, path, +}; + +impl Display for OrcaError { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.kind) + } +} +impl From for OrcaError { + fn from(error: BollardError) -> Self { + Self { + kind: Kind::BollardError(error), + } + } +} +impl From for OrcaError { + fn from(error: glob::PatternError) -> Self { + Self { + kind: Kind::GlobPatternError(error), + } + } +} +impl From for OrcaError { + fn from(error: io::Error) -> Self { + Self { + kind: Kind::IoError(error), + } + } +} +impl From for OrcaError { + fn from(error: path::StripPrefixError) -> Self { + Self { + kind: Kind::PathPrefixError(error), + } + } +} +impl From for OrcaError { + fn from(error: serde_json::Error) -> Self { + Self { + kind: Kind::SerdeJsonError(error), + } + } +} +impl From for OrcaError { + fn from(error: serde_yaml::Error) -> Self { + Self { + kind: Kind::SerdeYamlError(error), + } + } +} +impl From for OrcaError { + fn from(kind: Kind) -> Self { + Self { kind } + } +} diff --git a/src/core/mod.rs b/src/core/mod.rs new file mode 100644 index 00000000..3d58aee8 --- /dev/null +++ b/src/core/mod.rs @@ -0,0 +1,8 @@ +/// State change verification via cryptographic utilities. +pub mod crypto; +pub(crate) mod error; +/// Components of the data model. +pub mod model; +pub(crate) mod orchestrator; +pub(crate) mod store; +pub(crate) mod util; diff --git a/src/core/model.rs b/src/core/model.rs new file mode 100644 index 00000000..5e67c1ee --- /dev/null +++ b/src/core/model.rs @@ -0,0 +1,90 @@ +use crate::{ + core::util::get_type_name, + uniffi::{ + error::Result, + model::{Pod, PodJob}, + }, +}; +use heck::ToSnakeCase as _; +use serde::{Deserialize as _, Deserializer, Serialize, Serializer}; +use serde_yaml; +use std::{ + collections::{BTreeMap, HashMap}, + result, +}; +/// Converts a model instance into a consistent yaml. +/// +/// # Errors +/// +/// Will return `Err` if there is an issue converting an `instance` into YAML (w/o annotation). +pub fn to_yaml(instance: &T) -> Result { + let mut yaml = serde_yaml::to_string(instance)?; + yaml.insert_str( + 0, + &format!("class: {}\n", get_type_name::().to_snake_case()), + ); // replace class at top + + Ok(yaml) +} + +pub(crate) fn serialize_hashmap( + map: &HashMap, + serializer: S, +) -> result::Result +where + S: Serializer, +{ + let sorted = map.iter().collect::>(); + sorted.serialize(serializer) +} + +#[expect(clippy::ref_option, reason = "Serde requires this signature.")] +pub(crate) fn serialize_hashmap_option( + map_option: &Option>, + serializer: S, +) -> result::Result +where + S: Serializer, +{ + let sorted = map_option + .as_ref() + .map(|map| map.iter().collect::>()); + sorted.serialize(serializer) +} + +pub(crate) fn serialize_pod(pod: &Pod, serializer: S) -> result::Result +where + S: Serializer, +{ + serializer.serialize_str(&pod.hash) +} + +pub(crate) fn deserialize_pod<'de, D>(deserializer: D) -> result::Result +where + D: Deserializer<'de>, +{ + Ok(Pod { + hash: String::deserialize(deserializer)?, + ..Pod::default() + }) +} + +pub(crate) fn serialize_pod_job( + pod_job: &PodJob, + serializer: S, +) -> result::Result +where + S: Serializer, +{ + serializer.serialize_str(&pod_job.hash) +} + +pub(crate) fn deserialize_pod_job<'de, D>(deserializer: D) -> result::Result +where + D: Deserializer<'de>, +{ + Ok(PodJob { + hash: String::deserialize(deserializer)?, + ..PodJob::default() + }) +} diff --git a/src/orchestrator/docker.rs b/src/core/orchestrator/docker.rs similarity index 56% rename from src/orchestrator/docker.rs rename to src/core/orchestrator/docker.rs index cfad79ca..46ec04ac 100644 --- a/src/orchestrator/docker.rs +++ b/src/core/orchestrator/docker.rs @@ -1,23 +1,17 @@ use crate::{ - error::{Kind, OrcaError, Result}, - model::{Input, Pod, PodJob, PodResult}, - orchestrator::{ImageKind, Orchestrator, PodRun, RunInfo, Status}, - util::get, + core::util::get, + uniffi::{ + error::{Kind, OrcaError, Result}, + model::{Input, PodJob}, + orchestrator::{RunInfo, Status, docker::LocalDockerOrchestrator}, + }, }; use bollard::{ - Docker, - container::{ - Config, CreateContainerOptions, ListContainersOptions, RemoveContainerOptions, - StartContainerOptions, WaitContainerOptions, - }, - image::{CreateImageOptions, ImportImageOptions}, + container::{Config, CreateContainerOptions, ListContainersOptions}, models::{ContainerStateStatusEnum, HostConfig}, }; use chrono::DateTime; -use futures_util::{ - future::join_all, - stream::{StreamExt as _, TryStreamExt as _}, -}; +use futures_util::future::join_all; use names::{Generator, Name}; use regex::Regex; use std::{ @@ -26,205 +20,9 @@ use std::{ path::{self, PathBuf}, sync::LazyLock, }; -use tokio::{fs::File, runtime::Runtime}; -use tokio_util::{ - bytes::{Bytes, BytesMut}, - codec::{BytesCodec, FramedRead}, -}; - -/// Support for an orchestration engine using a local docker installation. -#[derive(Debug)] -pub struct LocalDockerOrchestrator { - api: Docker, - async_driver: Runtime, -} - -impl Orchestrator for LocalDockerOrchestrator { - fn start_with_altimage_blocking( - &self, - namespace_lookup: &HashMap, - pod_job: &PodJob, - image: &ImageKind, - ) -> Result { - self.async_driver - .block_on(self.start_with_altimage(namespace_lookup, pod_job, image)) - } - fn start_blocking( - &self, - namespace_lookup: &HashMap, - pod_job: &PodJob, - ) -> Result { - self.async_driver - .block_on(self.start(namespace_lookup, pod_job)) - } - fn list_blocking(&self) -> Result> { - self.async_driver.block_on(self.list()) - } - fn delete_blocking(&self, pod_run: &PodRun) -> Result<()> { - self.async_driver.block_on(self.delete(pod_run)) - } - fn get_info_blocking(&self, pod_run: &PodRun) -> Result { - self.async_driver.block_on(self.get_info(pod_run)) - } - fn get_result_blocking(&self, pod_run: &PodRun) -> Result { - self.async_driver.block_on(self.get_result(pod_run)) - } - #[expect( - clippy::try_err, - reason = r#" - - `map_err` workaround needed since `import_image_stream` requires resolved bytes - - Raising an error manually on occurrence to halt so we don't just ignore - - Should not get as far as `Ok(_)` - "# - )] - async fn start_with_altimage( - &self, - namespace_lookup: &HashMap, - pod_job: &PodJob, - image: &ImageKind, - ) -> Result { - let (assigned_name, container_options, container_config) = match image { - ImageKind::Published(remote_image) => Self::prepare_container_start_inputs( - namespace_lookup, - pod_job, - remote_image.clone(), - )?, - ImageKind::Tarball(image_info) => { - let location = namespace_lookup[&image_info.namespace].join(&image_info.path); - let byte_stream = FramedRead::new(File::open(&location).await?, BytesCodec::new()) - .map_err(|err| -> Result { - let resolved_error = Err::(err.into())?; - Ok(resolved_error) - }) - .map(|result| result.ok().map_or(Bytes::new(), BytesMut::freeze)); - let mut stream = - self.api - .import_image_stream(ImportImageOptions::default(), byte_stream, None); - let mut local_image = String::new(); - while let Some(response) = stream.next().await { - local_image = RE_IMAGE_TAG - .captures_iter(&response?.stream.ok_or(OrcaError::from( - Kind::EmptyResponseWhenLoadingContainerAltImage { - path: location.clone(), - }, - ))?) - .find_map(|x| x.name("image").map(|name| name.as_str().to_owned())) - .ok_or(OrcaError::from(Kind::NoTagFoundInContainerAltImage { - path: location.clone(), - }))?; - } - Self::prepare_container_start_inputs( - namespace_lookup, - pod_job, - local_image.clone(), - )? - } - }; - self.api - .create_container(container_options, container_config) - .await?; - self.api - .start_container(&assigned_name, None::>) - .await?; - Ok(PodRun::new::(pod_job, assigned_name)) - } - async fn start( - &self, - namespace_lookup: &HashMap, - pod_job: &PodJob, - ) -> Result { - let image_options = Some(CreateImageOptions { - from_image: pod_job.pod.image.clone(), - ..Default::default() - }); - self.api - .create_image(image_options, None, None) - .try_collect::>() - .await?; - self.start_with_altimage( - namespace_lookup, - pod_job, - &ImageKind::Published(pod_job.pod.image.clone()), - ) - .await - } - async fn list(&self) -> Result> { - self.list_containers(HashMap::from([( - "label".to_owned(), - vec!["org.orcapod=true".to_owned()], - )])) - .await? - .map(|(assigned_name, run_info)| { - let mut pod: Pod = serde_json::from_str(get(&run_info.labels, "org.orcapod.pod")?)?; - pod.annotation = - serde_json::from_str(get(&run_info.labels, "org.orcapod.pod.annotation")?)?; - pod.hash - .clone_from(get(&run_info.labels, "org.orcapod.pod.hash")?); - let mut pod_job: PodJob = - serde_json::from_str(get(&run_info.labels, "org.orcapod.pod_job")?)?; - pod_job.annotation = - serde_json::from_str(get(&run_info.labels, "org.orcapod.pod_job.annotation")?)?; - pod_job - .hash - .clone_from(get(&run_info.labels, "org.orcapod.pod_job.hash")?); - pod_job.pod = pod; - Ok(PodRun::new::(&pod_job, assigned_name)) - }) - .collect() - } - async fn delete(&self, pod_run: &PodRun) -> Result<()> { - self.api - .remove_container( - &pod_run.assigned_name, - Some(RemoveContainerOptions { - force: true, - ..Default::default() - }), - ) - .await?; - Ok(()) - } - async fn get_info(&self, pod_run: &PodRun) -> Result { - let labels = vec![ - "org.orcapod=true".to_owned(), - format!( - "org.orcapod.pod_job.annotation={}", - serde_json::to_string(&pod_run.pod_job.annotation)? - ), - format!("org.orcapod.pod_job.hash={}", pod_run.pod_job.hash), - ]; - let (_, run_info) = self - .list_containers(HashMap::from([("label".to_owned(), labels)])) - .await? - .next() - .ok_or(OrcaError::from(Kind::NoMatchingPodRun { - pod_job_hash: pod_run.pod_job.hash.clone(), - }))?; - Ok(run_info) - } - async fn get_result(&self, pod_run: &PodRun) -> Result { - self.api - .wait_container(&pod_run.assigned_name, None::>) - .try_collect::>() - .await?; - let result_info = self.get_info(pod_run).await?; - PodResult::new( - None, - pod_run.pod_job.clone(), - pod_run.assigned_name.clone(), - result_info.status, - result_info.created, - result_info.terminated.ok_or(OrcaError::from( - Kind::InvalidPodResultTerminatedDatetime { - pod_job_hash: pod_run.pod_job.hash.clone(), - }, - ))?, - ) - } -} #[expect(clippy::expect_used, reason = "Valid static regex")] -static RE_IMAGE_TAG: LazyLock = LazyLock::new(|| { +pub static RE_IMAGE_TAG: LazyLock = LazyLock::new(|| { Regex::new( r"(?x) \s @@ -236,18 +34,6 @@ static RE_IMAGE_TAG: LazyLock = LazyLock::new(|| { }); impl LocalDockerOrchestrator { - /// How to create a local docker orchestrator with an absolute path on docker host where binds - /// will be mounted from. - /// - /// # Errors - /// - /// Will return `Err` if there is an issue creating a local docker orchestrator. - pub fn new() -> Result { - Ok(Self { - api: Docker::connect_with_local_defaults()?, - async_driver: Runtime::new()?, - }) - } fn prepare_mount_binds( namespace_lookup: &HashMap, pod_job: &PodJob, @@ -321,7 +107,7 @@ impl LocalDockerOrchestrator { - Pod commands will always have at least 1 element "# )] - fn prepare_container_start_inputs( + pub(crate) fn prepare_container_start_inputs( namespace_lookup: &HashMap, pod_job: &PodJob, image: String, @@ -404,7 +190,7 @@ impl LocalDockerOrchestrator { - Containers will always have at least 1 name with at least 2 characters "# )] - async fn list_containers( + pub(crate) async fn list_containers( &self, filters: HashMap>, // https://docs.rs/bollard/latest/bollard/container/struct.ListContainersOptions.html#structfield.filters ) -> Result> { diff --git a/src/core/orchestrator/mod.rs b/src/core/orchestrator/mod.rs new file mode 100644 index 00000000..1c62d47a --- /dev/null +++ b/src/core/orchestrator/mod.rs @@ -0,0 +1,19 @@ +use crate::{ + core::util::get_type_name, + uniffi::{ + model::PodJob, + orchestrator::{Orchestrator, PodRun}, + }, +}; + +impl PodRun { + pub(crate) fn new(pod_job: &PodJob, assigned_name: String) -> Self { + Self { + pod_job: pod_job.clone(), + orchestrator_source: get_type_name::(), + assigned_name, + } + } +} + +pub mod docker; diff --git a/src/store/filestore.rs b/src/core/store/filestore.rs similarity index 65% rename from src/store/filestore.rs rename to src/core/store/filestore.rs index 352065c3..d0bded42 100644 --- a/src/store/filestore.rs +++ b/src/core/store/filestore.rs @@ -1,8 +1,10 @@ use crate::{ - error::{Kind, OrcaError, Result}, - model::{Annotation, Pod, PodJob, PodResult, to_yaml}, - store::{ModelID, ModelInfo, Store}, - util::get_type_name, + core::{model::to_yaml, util::get_type_name}, + uniffi::{ + error::{Kind, OrcaError, Result}, + model::Annotation, + store::{ModelID, ModelInfo, Store as _, filestore::LocalFileStore}, + }, }; use colored::Colorize as _; use glob::glob; @@ -15,72 +17,6 @@ use std::{ path::{Path, PathBuf}, sync::LazyLock, }; -/// Support for a storage backend on a local filesystem directory. -#[derive(Debug)] -pub struct LocalFileStore { - /// A local path to a directory where store will be located. - directory: PathBuf, -} - -impl Store for LocalFileStore { - fn save_pod(&self, pod: &Pod) -> Result<()> { - self.save_model(pod, &pod.hash, pod.annotation.as_ref()) - } - fn load_pod(&self, model_id: &ModelID) -> Result { - let (mut pod, annotation, hash) = self.load_model::(model_id)?; - pod.annotation = annotation; - pod.hash = hash; - Ok(pod) - } - fn list_pod(&self) -> Result> { - self.list_model::() - } - fn delete_pod(&self, model_id: &ModelID) -> Result<()> { - self.delete_model::(model_id) - } - fn save_pod_job(&self, pod_job: &PodJob) -> Result<()> { - self.save_pod(&pod_job.pod)?; - self.save_model(pod_job, &pod_job.hash, pod_job.annotation.as_ref()) - } - fn load_pod_job(&self, model_id: &ModelID) -> Result { - let (mut pod_job, annotation, hash) = self.load_model::(model_id)?; - pod_job.annotation = annotation; - pod_job.hash = hash; - pod_job.pod = self.load_pod(&ModelID::Hash(pod_job.pod.hash))?; - Ok(pod_job) - } - fn list_pod_job(&self) -> Result> { - self.list_model::() - } - fn delete_pod_job(&self, model_id: &ModelID) -> Result<()> { - self.delete_model::(model_id) - } - fn save_pod_result(&self, pod_result: &PodResult) -> Result<()> { - self.save_pod_job(&pod_result.pod_job)?; - self.save_model(pod_result, &pod_result.hash, pod_result.annotation.as_ref()) - } - fn load_pod_result(&self, model_id: &ModelID) -> Result { - let (mut pod_result, annotation, hash) = self.load_model::(model_id)?; - pod_result.annotation = annotation; - pod_result.hash = hash; - pod_result.pod_job = self.load_pod_job(&ModelID::Hash(pod_result.pod_job.hash))?; - Ok(pod_result) - } - fn list_pod_result(&self) -> Result> { - self.list_model::() - } - fn delete_pod_result(&self, model_id: &ModelID) -> Result<()> { - self.delete_model::(model_id) - } - fn delete_annotation(&self, name: &str, version: &str) -> Result<()> { - let hash = self.lookup_hash::(name, version)?; - let annotation_file = - self.make_path::(&hash, Self::make_annotation_relpath(name, version)); - fs::remove_file(&annotation_file)?; - - Ok(()) - } -} #[expect(clippy::expect_used, reason = "Valid static regex")] static RE_MODEL_METADATA: LazyLock = LazyLock::new(|| { @@ -139,18 +75,12 @@ impl LocalFileStore { }); Ok(paths) } - /// Get the directory where store is located. - pub fn get_directory(&self) -> &Path { - &self.directory - } - /// Construct a local file store instance in a specific directory. - pub fn new(directory: impl AsRef) -> Self { - Self { - directory: directory.as_ref().into(), - } - } - - fn lookup_hash(&self, name: &str, version: &str) -> Result { + /// Find hash using name and version. + /// + /// # Errors + /// + /// Will return error if unable to find. + pub(crate) fn lookup_hash(&self, name: &str, version: &str) -> Result { let model_info = Self::find_model_metadata( &self.make_path::("*", Self::make_annotation_relpath(name, version)), )? @@ -172,8 +102,12 @@ impl LocalFileStore { fs::write(file, content)?; Ok(()) } - - fn save_model( + /// How any model is stored. + /// + /// # Errors + /// + /// Will return `Err` if there is an issue storing the model. + pub(crate) fn save_model( &self, model: &T, hash: &str, @@ -220,8 +154,13 @@ impl LocalFileStore { } Ok(()) } - - fn load_model( + /// How to load any stored model into an instance. + /// + /// # Errors + /// + /// Will return `Err` if there is an issue loading the model from the store using `name` and + /// `version`. + pub(crate) fn load_model( &self, model_id: &ModelID, ) -> Result<(T, Option, String)> { @@ -247,12 +186,21 @@ impl LocalFileStore { } } } - - fn list_model(&self) -> Result> { + /// How to query any stored models. + /// + /// # Errors + /// + /// Will return `Err` if there is an issue querying metadata from existing models in the store. + pub(crate) fn list_model(&self) -> Result> { Ok(Self::find_model_metadata(&self.make_path::("**", "*"))?.collect()) } - - fn delete_model(&self, model_id: &ModelID) -> Result<()> { + /// How to explicitly delete any stored model and all associated annotations (does not propagate). + /// + /// # Errors + /// + /// Will return `Err` if there is an issue deleting a model from the store using `name` and + /// `version`. + pub(crate) fn delete_model(&self, model_id: &ModelID) -> Result<()> { // assumes propagate = false let hash = match model_id { ModelID::Hash(hash) => hash, diff --git a/src/core/store/mod.rs b/src/core/store/mod.rs new file mode 100644 index 00000000..fe5e8e56 --- /dev/null +++ b/src/core/store/mod.rs @@ -0,0 +1 @@ +pub mod filestore; diff --git a/src/util.rs b/src/core/util.rs similarity index 92% rename from src/util.rs rename to src/core/util.rs index 83d9c146..ae110ee2 100644 --- a/src/util.rs +++ b/src/core/util.rs @@ -1,4 +1,4 @@ -use crate::error::{Kind, Result}; +use crate::uniffi::error::{Kind, Result}; use std::{any::type_name, collections::HashMap}; #[expect( diff --git a/src/lib.rs b/src/lib.rs index 72d49eaf..e2d52d43 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,14 +1,7 @@ //! Intuitive compute pipeline orchestration with reproducibility, performance, and scalability in //! mind. -/// State change verification via cryptographic utilities. -pub mod crypto; -/// Error handling based on enumeration. -pub mod error; -/// Components of the data model. -pub mod model; -/// Interface into container orchestration engine. -pub mod orchestrator; -/// Data persistence provided by a store backend. -pub mod store; -mod util; +/// Pure Rust source. +pub mod core; +/// Exposed `CFFI` client based on `UniFFI`. +pub mod uniffi; diff --git a/src/error.rs b/src/uniffi/error.rs similarity index 62% rename from src/error.rs rename to src/uniffi/error.rs index 7d9b54bf..7bb92d16 100644 --- a/src/error.rs +++ b/src/uniffi/error.rs @@ -1,11 +1,7 @@ use bollard::errors::Error as BollardError; -use glob; -use serde_json; -use serde_yaml; use std::{ - fmt::{self, Display, Formatter}, - io, path, - path::PathBuf, + io, + path::{self, PathBuf}, result, }; use thiserror::Error; @@ -52,9 +48,14 @@ pub(crate) enum Kind { SerdeYamlError(#[from] serde_yaml::Error), } /// A stable error API interface. +#[expect( + clippy::field_scoped_visibility_modifiers, + reason = "Allow access from `core::error`." +)] #[derive(Error, Debug)] pub struct OrcaError { - kind: Kind, + /// Type of error returned. + pub(crate) kind: Kind, } impl OrcaError { /// Returns `true` if the error was caused by an invalid model annotation. @@ -66,55 +67,3 @@ impl OrcaError { matches!(self.kind, Kind::NoMatchingPodRun { .. }) } } -impl Display for OrcaError { - fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { - write!(f, "{}", self.kind) - } -} -impl From for OrcaError { - fn from(error: BollardError) -> Self { - Self { - kind: Kind::BollardError(error), - } - } -} -impl From for OrcaError { - fn from(error: glob::PatternError) -> Self { - Self { - kind: Kind::GlobPatternError(error), - } - } -} -impl From for OrcaError { - fn from(error: io::Error) -> Self { - Self { - kind: Kind::IoError(error), - } - } -} -impl From for OrcaError { - fn from(error: path::StripPrefixError) -> Self { - Self { - kind: Kind::PathPrefixError(error), - } - } -} -impl From for OrcaError { - fn from(error: serde_json::Error) -> Self { - Self { - kind: Kind::SerdeJsonError(error), - } - } -} -impl From for OrcaError { - fn from(error: serde_yaml::Error) -> Self { - Self { - kind: Kind::SerdeYamlError(error), - } - } -} -impl From for OrcaError { - fn from(kind: Kind) -> Self { - Self { kind } - } -} diff --git a/src/uniffi/mod.rs b/src/uniffi/mod.rs new file mode 100644 index 00000000..e02fd6c9 --- /dev/null +++ b/src/uniffi/mod.rs @@ -0,0 +1,8 @@ +/// Error handling. +pub mod error; +/// Components of the data model. +pub mod model; +/// Interface into container orchestration engine. +pub mod orchestrator; +/// Data persistence provided by a store backend. +pub mod store; diff --git a/src/model.rs b/src/uniffi/model.rs similarity index 78% rename from src/model.rs rename to src/uniffi/model.rs index 6a4e09d3..9337cf66 100644 --- a/src/model.rs +++ b/src/uniffi/model.rs @@ -1,56 +1,15 @@ use crate::{ - crypto::{hash_blob, hash_buffer}, - error::Result, - orchestrator::Status, - util::get_type_name, + core::{ + crypto::{hash_blob, hash_buffer}, + model::{ + deserialize_pod, deserialize_pod_job, serialize_hashmap, serialize_hashmap_option, + serialize_pod, serialize_pod_job, to_yaml, + }, + }, + uniffi::{error::Result, orchestrator::Status}, }; -use heck::ToSnakeCase as _; -use serde::{Deserialize, Deserializer, Serialize, Serializer}; -use serde_yaml; -use std::{ - collections::{BTreeMap, HashMap}, - path::PathBuf, - result, -}; -/// Converts a model instance into a consistent yaml. -/// -/// # Errors -/// -/// Will return `Err` if there is an issue converting an `instance` into YAML (w/o annotation). -pub fn to_yaml(instance: &T) -> Result { - let mut yaml = serde_yaml::to_string(instance)?; - yaml.insert_str( - 0, - &format!("class: {}\n", get_type_name::().to_snake_case()), - ); // replace class at top - - Ok(yaml) -} - -fn serialize_hashmap( - map: &HashMap, - serializer: S, -) -> result::Result -where - S: Serializer, -{ - let sorted = map.iter().collect::>(); - sorted.serialize(serializer) -} - -#[expect(clippy::ref_option, reason = "Serde requires this signature.")] -fn serialize_hashmap_option( - map_option: &Option>, - serializer: S, -) -> result::Result -where - S: Serializer, -{ - let sorted = map_option - .as_ref() - .map(|map| map.iter().collect::>()); - sorted.serialize(serializer) -} +use serde::{Deserialize, Serialize}; +use std::{collections::HashMap, path::PathBuf}; // --- core model structs --- @@ -72,12 +31,17 @@ pub struct Pod { pub input_stream: HashMap, /// Exposed, internal output directory. pub output_dir: PathBuf, + /// Exposed, internal output streams. #[serde(serialize_with = "serialize_hashmap")] - output_stream: HashMap, - source_commit_url: String, - recommended_cpus: f32, - recommended_memory: u64, - required_gpu: Option, + pub output_stream: HashMap, + /// Link to source associated with image binary. + pub source_commit_url: String, + /// Recommendation for CPU in fractional cores. + pub recommended_cpus: f32, + /// Recommendation for memory in bytes. + pub recommended_memory: u64, + /// If applicable, recommendation for GPU configuration. + pub required_gpu: Option, } impl Pod { @@ -118,23 +82,6 @@ impl Pod { } } -fn serialize_pod(pod: &Pod, serializer: S) -> result::Result -where - S: Serializer, -{ - serializer.serialize_str(&pod.hash) -} - -fn deserialize_pod<'de, D>(deserializer: D) -> result::Result -where - D: Deserializer<'de>, -{ - Ok(Pod { - hash: String::deserialize(deserializer)?, - ..Pod::default() - }) -} - /// A compute job that specifies resource requests and input/output targets. #[derive(Serialize, Deserialize, Debug, PartialEq, Clone, Default)] pub struct PodJob { @@ -212,23 +159,6 @@ impl PodJob { } } -fn serialize_pod_job(pod_job: &PodJob, serializer: S) -> result::Result -where - S: Serializer, -{ - serializer.serialize_str(&pod_job.hash) -} - -fn deserialize_pod_job<'de, D>(deserializer: D) -> result::Result -where - D: Deserializer<'de>, -{ - Ok(PodJob { - hash: String::deserialize(deserializer)?, - ..PodJob::default() - }) -} - /// Result from a compute job run. #[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] pub struct PodResult { diff --git a/src/uniffi/orchestrator/docker.rs b/src/uniffi/orchestrator/docker.rs new file mode 100644 index 00000000..6124d4cb --- /dev/null +++ b/src/uniffi/orchestrator/docker.rs @@ -0,0 +1,227 @@ +use crate::{ + core::{orchestrator::docker::RE_IMAGE_TAG, util::get}, + uniffi::{ + error::{Kind, OrcaError, Result}, + model::{Pod, PodJob, PodResult}, + orchestrator::{ImageKind, Orchestrator, PodRun, RunInfo}, + }, +}; +use bollard::{ + Docker, + container::{RemoveContainerOptions, StartContainerOptions, WaitContainerOptions}, + image::{CreateImageOptions, ImportImageOptions}, +}; +use futures_util::stream::{StreamExt as _, TryStreamExt as _}; +use std::{collections::HashMap, path::PathBuf}; +use tokio::{fs::File, runtime::Runtime}; +use tokio_util::{ + bytes::{Bytes, BytesMut}, + codec::{BytesCodec, FramedRead}, +}; + +/// Support for an orchestration engine using a local docker installation. +#[derive(Debug)] +pub struct LocalDockerOrchestrator { + /// API to interact with Docker daemon. + pub api: Docker, + async_driver: Runtime, +} + +impl Orchestrator for LocalDockerOrchestrator { + fn start_with_altimage_blocking( + &self, + namespace_lookup: &HashMap, + pod_job: &PodJob, + image: &ImageKind, + ) -> Result { + self.async_driver + .block_on(self.start_with_altimage(namespace_lookup, pod_job, image)) + } + fn start_blocking( + &self, + namespace_lookup: &HashMap, + pod_job: &PodJob, + ) -> Result { + self.async_driver + .block_on(self.start(namespace_lookup, pod_job)) + } + fn list_blocking(&self) -> Result> { + self.async_driver.block_on(self.list()) + } + fn delete_blocking(&self, pod_run: &PodRun) -> Result<()> { + self.async_driver.block_on(self.delete(pod_run)) + } + fn get_info_blocking(&self, pod_run: &PodRun) -> Result { + self.async_driver.block_on(self.get_info(pod_run)) + } + fn get_result_blocking(&self, pod_run: &PodRun) -> Result { + self.async_driver.block_on(self.get_result(pod_run)) + } + #[expect( + clippy::try_err, + reason = r#" + - `map_err` workaround needed since `import_image_stream` requires resolved bytes + - Raising an error manually on occurrence to halt so we don't just ignore + - Should not get as far as `Ok(_)` + "# + )] + async fn start_with_altimage( + &self, + namespace_lookup: &HashMap, + pod_job: &PodJob, + image: &ImageKind, + ) -> Result { + let (assigned_name, container_options, container_config) = match image { + ImageKind::Published(remote_image) => Self::prepare_container_start_inputs( + namespace_lookup, + pod_job, + remote_image.clone(), + )?, + ImageKind::Tarball(image_info) => { + let location = namespace_lookup[&image_info.namespace].join(&image_info.path); + let byte_stream = FramedRead::new(File::open(&location).await?, BytesCodec::new()) + .map_err(|err| -> Result { + let resolved_error = Err::(err.into())?; + Ok(resolved_error) + }) + .map(|result| result.ok().map_or(Bytes::new(), BytesMut::freeze)); + let mut stream = + self.api + .import_image_stream(ImportImageOptions::default(), byte_stream, None); + let mut local_image = String::new(); + while let Some(response) = stream.next().await { + local_image = RE_IMAGE_TAG + .captures_iter(&response?.stream.ok_or(OrcaError::from( + Kind::EmptyResponseWhenLoadingContainerAltImage { + path: location.clone(), + }, + ))?) + .find_map(|x| x.name("image").map(|name| name.as_str().to_owned())) + .ok_or(OrcaError::from(Kind::NoTagFoundInContainerAltImage { + path: location.clone(), + }))?; + } + Self::prepare_container_start_inputs( + namespace_lookup, + pod_job, + local_image.clone(), + )? + } + }; + self.api + .create_container(container_options, container_config) + .await?; + self.api + .start_container(&assigned_name, None::>) + .await?; + Ok(PodRun::new::(pod_job, assigned_name)) + } + async fn start( + &self, + namespace_lookup: &HashMap, + pod_job: &PodJob, + ) -> Result { + let image_options = Some(CreateImageOptions { + from_image: pod_job.pod.image.clone(), + ..Default::default() + }); + self.api + .create_image(image_options, None, None) + .try_collect::>() + .await?; + self.start_with_altimage( + namespace_lookup, + pod_job, + &ImageKind::Published(pod_job.pod.image.clone()), + ) + .await + } + async fn list(&self) -> Result> { + self.list_containers(HashMap::from([( + "label".to_owned(), + vec!["org.orcapod=true".to_owned()], + )])) + .await? + .map(|(assigned_name, run_info)| { + let mut pod: Pod = serde_json::from_str(get(&run_info.labels, "org.orcapod.pod")?)?; + pod.annotation = + serde_json::from_str(get(&run_info.labels, "org.orcapod.pod.annotation")?)?; + pod.hash + .clone_from(get(&run_info.labels, "org.orcapod.pod.hash")?); + let mut pod_job: PodJob = + serde_json::from_str(get(&run_info.labels, "org.orcapod.pod_job")?)?; + pod_job.annotation = + serde_json::from_str(get(&run_info.labels, "org.orcapod.pod_job.annotation")?)?; + pod_job + .hash + .clone_from(get(&run_info.labels, "org.orcapod.pod_job.hash")?); + pod_job.pod = pod; + Ok(PodRun::new::(&pod_job, assigned_name)) + }) + .collect() + } + async fn delete(&self, pod_run: &PodRun) -> Result<()> { + self.api + .remove_container( + &pod_run.assigned_name, + Some(RemoveContainerOptions { + force: true, + ..Default::default() + }), + ) + .await?; + Ok(()) + } + async fn get_info(&self, pod_run: &PodRun) -> Result { + let labels = vec![ + "org.orcapod=true".to_owned(), + format!( + "org.orcapod.pod_job.annotation={}", + serde_json::to_string(&pod_run.pod_job.annotation)? + ), + format!("org.orcapod.pod_job.hash={}", pod_run.pod_job.hash), + ]; + let (_, run_info) = self + .list_containers(HashMap::from([("label".to_owned(), labels)])) + .await? + .next() + .ok_or(OrcaError::from(Kind::NoMatchingPodRun { + pod_job_hash: pod_run.pod_job.hash.clone(), + }))?; + Ok(run_info) + } + async fn get_result(&self, pod_run: &PodRun) -> Result { + self.api + .wait_container(&pod_run.assigned_name, None::>) + .try_collect::>() + .await?; + let result_info = self.get_info(pod_run).await?; + PodResult::new( + None, + pod_run.pod_job.clone(), + pod_run.assigned_name.clone(), + result_info.status, + result_info.created, + result_info.terminated.ok_or(OrcaError::from( + Kind::InvalidPodResultTerminatedDatetime { + pod_job_hash: pod_run.pod_job.hash.clone(), + }, + ))?, + ) + } +} + +impl LocalDockerOrchestrator { + /// How to create a local docker orchestrator with an absolute path on docker host where binds + /// will be mounted from. + /// + /// # Errors + /// + /// Will return `Err` if there is an issue creating a local docker orchestrator. + pub fn new() -> Result { + Ok(Self { + api: Docker::connect_with_local_defaults()?, + async_driver: Runtime::new()?, + }) + } +} diff --git a/src/orchestrator/mod.rs b/src/uniffi/orchestrator/mod.rs similarity index 95% rename from src/orchestrator/mod.rs rename to src/uniffi/orchestrator/mod.rs index e4e800ab..e247c0fa 100644 --- a/src/orchestrator/mod.rs +++ b/src/uniffi/orchestrator/mod.rs @@ -1,7 +1,6 @@ -use crate::{ +use crate::uniffi::{ error::Result, model::{OrcaPath, PodJob, PodResult}, - util::get_type_name, }; use serde::{Deserialize, Serialize}; use std::{collections::HashMap, future::Future, path::PathBuf}; @@ -58,16 +57,6 @@ pub struct PodRun { pub assigned_name: String, } -impl PodRun { - fn new(pod_job: &PodJob, assigned_name: String) -> Self { - Self { - pod_job: pod_job.clone(), - orchestrator_source: get_type_name::(), - assigned_name, - } - } -} - /// API for standard behavior of any container orchestration engine supported. pub trait Orchestrator { /// How to synchronously start containers with an alternate image. diff --git a/src/uniffi/store/filestore.rs b/src/uniffi/store/filestore.rs new file mode 100644 index 00000000..a6023adf --- /dev/null +++ b/src/uniffi/store/filestore.rs @@ -0,0 +1,84 @@ +use crate::uniffi::{ + error::Result, + model::{Pod, PodJob, PodResult}, + store::{ModelID, ModelInfo, Store}, +}; +use std::{ + fs, + path::{Path, PathBuf}, +}; +/// Support for a storage backend on a local filesystem directory. +#[derive(Debug)] +pub struct LocalFileStore { + /// A local path to a directory where store will be located. + pub directory: PathBuf, +} + +impl Store for LocalFileStore { + fn save_pod(&self, pod: &Pod) -> Result<()> { + self.save_model(pod, &pod.hash, pod.annotation.as_ref()) + } + fn load_pod(&self, model_id: &ModelID) -> Result { + let (mut pod, annotation, hash) = self.load_model::(model_id)?; + pod.annotation = annotation; + pod.hash = hash; + Ok(pod) + } + fn list_pod(&self) -> Result> { + self.list_model::() + } + fn delete_pod(&self, model_id: &ModelID) -> Result<()> { + self.delete_model::(model_id) + } + fn save_pod_job(&self, pod_job: &PodJob) -> Result<()> { + self.save_pod(&pod_job.pod)?; + self.save_model(pod_job, &pod_job.hash, pod_job.annotation.as_ref()) + } + fn load_pod_job(&self, model_id: &ModelID) -> Result { + let (mut pod_job, annotation, hash) = self.load_model::(model_id)?; + pod_job.annotation = annotation; + pod_job.hash = hash; + pod_job.pod = self.load_pod(&ModelID::Hash(pod_job.pod.hash))?; + Ok(pod_job) + } + fn list_pod_job(&self) -> Result> { + self.list_model::() + } + fn delete_pod_job(&self, model_id: &ModelID) -> Result<()> { + self.delete_model::(model_id) + } + fn save_pod_result(&self, pod_result: &PodResult) -> Result<()> { + self.save_pod_job(&pod_result.pod_job)?; + self.save_model(pod_result, &pod_result.hash, pod_result.annotation.as_ref()) + } + fn load_pod_result(&self, model_id: &ModelID) -> Result { + let (mut pod_result, annotation, hash) = self.load_model::(model_id)?; + pod_result.annotation = annotation; + pod_result.hash = hash; + pod_result.pod_job = self.load_pod_job(&ModelID::Hash(pod_result.pod_job.hash))?; + Ok(pod_result) + } + fn list_pod_result(&self) -> Result> { + self.list_model::() + } + fn delete_pod_result(&self, model_id: &ModelID) -> Result<()> { + self.delete_model::(model_id) + } + fn delete_annotation(&self, name: &str, version: &str) -> Result<()> { + let hash = self.lookup_hash::(name, version)?; + let annotation_file = + self.make_path::(&hash, Self::make_annotation_relpath(name, version)); + fs::remove_file(&annotation_file)?; + + Ok(()) + } +} + +impl LocalFileStore { + /// Construct a local file store instance in a specific directory. + pub fn new(directory: impl AsRef) -> Self { + Self { + directory: directory.as_ref().into(), + } + } +} diff --git a/src/store/mod.rs b/src/uniffi/store/mod.rs similarity index 99% rename from src/store/mod.rs rename to src/uniffi/store/mod.rs index 8856b506..5464393e 100644 --- a/src/store/mod.rs +++ b/src/uniffi/store/mod.rs @@ -1,8 +1,7 @@ -use crate::{ +use crate::uniffi::{ error::Result, model::{Pod, PodJob, PodResult}, }; - /// Options for identifying a model. #[derive(Debug, Clone)] pub enum ModelID { diff --git a/tests/crypto.rs b/tests/crypto.rs index af3fcb62..c348bdd2 100644 --- a/tests/crypto.rs +++ b/tests/crypto.rs @@ -1,8 +1,8 @@ #![expect(missing_docs, clippy::panic_in_result_fn, reason = "OK in tests.")] use orcapod::{ - crypto::{hash_buffer, hash_dir, hash_file}, - error::Result, + core::crypto::{hash_buffer, hash_dir, hash_file}, + uniffi::error::Result, }; use std::fs::read; diff --git a/tests/fixture/mod.rs b/tests/fixture/mod.rs index c497de58..405fbb5c 100644 --- a/tests/fixture/mod.rs +++ b/tests/fixture/mod.rs @@ -8,7 +8,7 @@ )] use names::{Generator, Name}; -use orcapod::{ +use orcapod::uniffi::{ error::Result, model::{Annotation, Blob, BlobKind, Input, OrcaPath, Pod, PodJob, PodResult, StreamInfo}, orchestrator::Status, diff --git a/tests/model.rs b/tests/model.rs index 1a73a452..2cc90f09 100644 --- a/tests/model.rs +++ b/tests/model.rs @@ -3,7 +3,7 @@ pub mod fixture; use fixture::{NAMESPACE_LOOKUP_READ_ONLY, pod_job_style, pod_result_style, pod_style}; use indoc::indoc; -use orcapod::{error::Result, model::to_yaml}; +use orcapod::{core::model::to_yaml, uniffi::error::Result}; #[test] fn hash_pod() -> Result<()> { diff --git a/tests/orchestrator.rs b/tests/orchestrator.rs index ffa35226..ad455f46 100644 --- a/tests/orchestrator.rs +++ b/tests/orchestrator.rs @@ -7,7 +7,7 @@ pub mod fixture; use fixture::{TestContainerImage, TestDirs, container_image_style, pod_job_style}; -use orcapod::{ +use orcapod::uniffi::{ error::Result, model::OrcaPath, orchestrator::{ImageKind, Orchestrator as _, PodRun, Status, docker::LocalDockerOrchestrator}, diff --git a/tests/store.rs b/tests/store.rs index 98265317..3b52d989 100644 --- a/tests/store.rs +++ b/tests/store.rs @@ -11,10 +11,12 @@ use fixture::{ NAMESPACE_LOOKUP_READ_ONLY, TestDirs, TestSetup, pod_job_style, pod_result_style, pod_style, }; use orcapod::{ - crypto::hash_buffer, - error::Result, - model::{Annotation, Pod, to_yaml}, - store::{ModelID, ModelInfo, Store as _, filestore::LocalFileStore}, + core::{crypto::hash_buffer, model::to_yaml}, + uniffi::{ + error::Result, + model::{Annotation, Pod}, + store::{ModelID, ModelInfo, Store as _, filestore::LocalFileStore}, + }, }; use std::{collections::HashMap, fmt::Debug, path::Path};