diff --git a/core/Cargo.lock b/core/Cargo.lock index 27894c616d86..72f3ca18b3ca 100644 --- a/core/Cargo.lock +++ b/core/Cargo.lock @@ -1384,6 +1384,16 @@ dependencies = [ "uuid", ] +[[package]] +name = "bstr" +version = "1.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "63044e1ae8e69f3b5a92c736ca6269b8d12fa7efe39bf34ddb06d102cf0e2cab" +dependencies = [ + "memchr", + "serde", +] + [[package]] name = "bumpalo" version = "3.19.0" @@ -3508,6 +3518,19 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0cc23270f6e1808e30a928bdc84dea0b9b4136a8bc82338574f23baf47bbd280" +[[package]] +name = "globset" +version = "0.4.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52dfc19153a48bde0cbd630453615c8151bce3a5adfac7a0aebfbf0a1e1f57e3" +dependencies = [ + "aho-corasick", + "bstr", + "log", + "regex-automata", + "regex-syntax", +] + [[package]] name = "gloo-timers" version = "0.3.0" @@ -5589,6 +5612,7 @@ dependencies = [ "opendal-layer-prometheus", "opendal-layer-prometheus-client", "opendal-layer-retry", + "opendal-layer-route", "opendal-layer-tail-cut", "opendal-layer-throttle", "opendal-layer-timeout", @@ -5944,6 +5968,17 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "opendal-layer-route" +version = "0.55.0" +dependencies = [ + "futures", + "globset", + "opendal-core", + "opendal-service-fs", + "tokio", +] + [[package]] name = "opendal-layer-tail-cut" version = "0.55.0" diff --git a/core/Cargo.toml b/core/Cargo.toml index 55b3962b9363..d25ea8ecce13 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -109,6 +109,7 @@ layers-otel-trace = ["dep:opendal-layer-oteltrace"] layers-prometheus = ["dep:opendal-layer-prometheus"] layers-prometheus-client = ["dep:opendal-layer-prometheus-client"] layers-retry = ["dep:opendal-layer-retry"] +layers-route = ["dep:opendal-layer-route"] layers-tail-cut = ["dep:opendal-layer-tail-cut"] layers-throttle = 
["dep:opendal-layer-throttle"] layers-timeout = ["dep:opendal-layer-timeout"] @@ -222,6 +223,7 @@ opendal-layer-oteltrace = { path = "layers/oteltrace", version = "0.55.0", optio opendal-layer-prometheus = { path = "layers/prometheus", version = "0.55.0", optional = true, default-features = false } opendal-layer-prometheus-client = { path = "layers/prometheus-client", version = "0.55.0", optional = true, default-features = false } opendal-layer-retry = { path = "layers/retry", version = "0.55.0", optional = true, default-features = false } +opendal-layer-route = { path = "layers/route", version = "0.55.0", optional = true, default-features = false } opendal-layer-tail-cut = { path = "layers/tail-cut", version = "0.55.0", optional = true, default-features = false } opendal-layer-throttle = { path = "layers/throttle", version = "0.55.0", optional = true, default-features = false } opendal-layer-timeout = { path = "layers/timeout", version = "0.55.0", optional = true, default-features = false } diff --git a/core/layers/route/Cargo.toml b/core/layers/route/Cargo.toml new file mode 100644 index 000000000000..6be11127f13a --- /dev/null +++ b/core/layers/route/Cargo.toml @@ -0,0 +1,41 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +[package] +description = "Apache OpenDAL route layer" +name = "opendal-layer-route" + +authors = { workspace = true } +edition = { workspace = true } +homepage = { workspace = true } +license = { workspace = true } +repository = { workspace = true } +rust-version = { workspace = true } +version = { workspace = true } + +[package.metadata.docs.rs] +all-features = true + +[dependencies] +globset = "0.4.15" +opendal-core = { path = "../../core", version = "0.55.0", default-features = false } + +[dev-dependencies] +futures = { workspace = true } +opendal-core = { path = "../../core", version = "0.55.0" } +opendal-service-fs = { path = "../../services/fs", version = "0.55.0" } +tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } diff --git a/core/layers/route/src/lib.rs b/core/layers/route/src/lib.rs new file mode 100644 index 000000000000..6e555cb1fa2e --- /dev/null +++ b/core/layers/route/src/lib.rs @@ -0,0 +1,435 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Route layer implementation for Apache OpenDAL. 
+
+#![cfg_attr(docsrs, feature(doc_cfg))]
+#![deny(missing_docs)]
+
+use std::fmt::Debug;
+use std::fmt::Formatter;
+use std::sync::Arc;
+
+use globset::Glob;
+use globset::GlobSet;
+use globset::GlobSetBuilder;
+use opendal_core::raw::*;
+use opendal_core::*;
+
+/// Route operations to different operators by matching paths with glob patterns.
+#[derive(Clone, Debug)]
+pub struct RouteLayer {
+    router: Arc<RouteRouter>,
+}
+
+impl RouteLayer {
+    /// Create an empty `RouteLayerBuilder`; with no routes every path uses the layered accessor.
+    pub fn builder() -> RouteLayerBuilder {
+        RouteLayerBuilder::default()
+    }
+}
+
+/// Builder for `RouteLayer` that collects `(glob pattern, operator)` routes in insertion order.
+#[derive(Default)]
+pub struct RouteLayerBuilder {
+    routes: Vec<RouteEntry>,
+}
+
+impl RouteLayerBuilder {
+    /// Add a route: paths matching glob `pattern` go to `op`; on overlap the earliest route wins.
+    pub fn route(mut self, pattern: impl AsRef<str>, op: Operator) -> Self {
+        self.routes.push(RouteEntry {
+            pattern: pattern.as_ref().to_string(),
+            accessor: op.into_inner(),
+        });
+        self
+    }
+
+    /// Compile every registered pattern and build the `RouteLayer`.
+    ///
+    /// Returns `ErrorKind::ConfigInvalid` if any pattern, or the combined set, fails to compile.
+    pub fn build(self) -> Result<RouteLayer> {
+        let mut builder = GlobSetBuilder::new();
+        let mut targets = Vec::with_capacity(self.routes.len());
+
+        for entry in self.routes {
+            let glob = Glob::new(&entry.pattern).map_err(|err| {
+                Error::new(ErrorKind::ConfigInvalid, "invalid route glob pattern")
+                    .with_context("pattern", entry.pattern.clone())
+                    .with_context("source", err.to_string())
+            })?;
+            builder.add(glob); // glob index i in the set corresponds to targets[i]
+            targets.push(entry.accessor);
+        }
+
+        let glob = builder.build().map_err(|err| {
+            Error::new(ErrorKind::ConfigInvalid, "failed to build route glob set")
+                .with_context("source", err.to_string())
+        })?;
+
+        Ok(RouteLayer {
+            router: Arc::new(RouteRouter { glob, targets }),
+        })
+    }
+}
+
+struct RouteEntry {
+    pattern: String,
+    accessor: Accessor,
+}
+
+#[derive(Debug)]
+struct RouteRouter {
+    glob: GlobSet,
+    targets: Vec<Accessor>,
+}
+
+impl RouteRouter {
+    fn match_index(&self, path: &str) -> Option<usize> {
+        self.glob.matches(path).into_iter().min() // smallest index == earliest-added route
+    }
+
+    fn select(&self, path: &str, default: &Accessor) -> Accessor {
+        self.match_index(path)
+            .and_then(|idx| self.targets.get(idx).cloned())
+            .unwrap_or_else(|| default.clone())
+    }
+
+    fn target(&self, idx: usize, default: &Accessor) -> Accessor {
+        self.targets
+            .get(idx)
+            .cloned()
+            .unwrap_or_else(|| default.clone())
+    }
+}
+
+impl Layer for RouteLayer {
+    type LayeredAccess = RouteAccessor;
+
+    fn layer(&self, inner: Accessor) -> Self::LayeredAccess {
+        RouteAccessor {
+            inner,
+            router: self.router.clone(),
+        }
+    }
+}
+
+/// Accessor that routes each operation by path to the first matching target, else to `inner`.
+pub struct RouteAccessor {
+    inner: Accessor,
+    router: Arc<RouteRouter>,
+}
+
+impl Debug for RouteAccessor {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        self.inner.fmt(f) // delegates to the wrapped accessor; routes are not printed
+    }
+}
+
+impl RouteAccessor {
+    fn select(&self, path: &str) -> Accessor {
+        self.router.select(path, &self.inner)
+    }
+}
+
+impl LayeredAccess for RouteAccessor {
+    type Inner = Accessor;
+    type Reader = oio::Reader;
+    type Writer = oio::Writer;
+    type Lister = oio::Lister;
+    type Deleter = RouteDeleter;
+
+    fn inner(&self) -> &Self::Inner {
+        &self.inner
+    }
+
+    async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
+        self.select(path).create_dir(path, args).await
+    }
+
+    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
+        self.select(path).read(path, args).await
+    }
+
+    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
+        self.select(path).write(path, args).await
+    }
+
+    async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
+        self.select(from).copy(from, to, args).await // routed by `from` only; `to` is not matched
+    }
+
+    async fn rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
+        self.select(from).rename(from, to, args).await // routed by `from` only, same as `copy`
+    }
+
+    async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
+        self.select(path).stat(path, args).await
+    }
+
+    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
+        Ok((
+            RpDelete::default(),
+            RouteDeleter::new(self.inner.clone(), self.router.clone()),
+        ))
+    }
+
+    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
+        self.select(path).list(path, args).await
+    }
+
+    async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
+        self.select(path).presign(path, args).await
+    }
+}
+
+/// Deleter that batches deletions per routed accessor.
+pub struct RouteDeleter {
+    default: Accessor,
+    router: Arc<RouteRouter>,
+    default_deleter: Option<oio::Deleter>,
+    target_deleters: Vec<Option<oio::Deleter>>,
+}
+
+impl RouteDeleter {
+    fn new(default: Accessor, router: Arc<RouteRouter>) -> Self {
+        let mut target_deleters = Vec::with_capacity(router.targets.len());
+        target_deleters.resize_with(router.targets.len(), || None); // one empty slot per route
+        Self {
+            default,
+            router,
+            default_deleter: None,
+            target_deleters,
+        }
+    }
+
+    async fn get_deleter(&mut self, key: RouteKey) -> Result<&mut oio::Deleter> {
+        match key {
+            RouteKey::Default => {
+                if self.default_deleter.is_none() {
+                    let (_, deleter) = self.default.delete().await?;
+                    self.default_deleter = Some(deleter);
+                }
+                Ok(self
+                    .default_deleter
+                    .as_mut()
+                    .expect("default deleter must exist"))
+            }
+            RouteKey::Target(idx) => {
+                if idx >= self.target_deleters.len() { // out-of-range index: use the default
+                    if self.default_deleter.is_none() {
+                        let (_, deleter) = self.default.delete().await?;
+                        self.default_deleter = Some(deleter);
+                    }
+                    return Ok(self
+                        .default_deleter
+                        .as_mut()
+                        .expect("default deleter must exist"));
+                }
+                if self.target_deleters[idx].is_none() { // create the target's deleter on first use
+                    let accessor = self.router.target(idx, &self.default);
+                    let (_, deleter) = accessor.delete().await?;
+                    self.target_deleters[idx] = Some(deleter);
+                }
+                Ok(self.target_deleters[idx]
+                    .as_mut()
+                    .expect("target deleter must exist"))
+            }
+        }
+    }
+}
+
+impl oio::Delete for RouteDeleter {
+    async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
+        let key = match self.router.match_index(path) {
+            Some(idx) => RouteKey::Target(idx),
+            None => RouteKey::Default,
+        };
+        let deleter = self.get_deleter(key).await?;
+        deleter.delete(path, args).await
+    }
+
+    async fn close(&mut self) -> Result<()> {
+        let mut first_err = None; // every opened deleter is closed; only the first error is returned
+        if let Some(deleter) = self.default_deleter.as_mut() {
+            if let Err(err) = deleter.close().await {
+                first_err = Some(err);
+            }
+        }
+        for deleter in self.target_deleters.iter_mut().flatten() {
+            if let Err(err) = deleter.close().await {
+                if first_err.is_none() {
+                    first_err = Some(err);
+                }
+            }
+        }
+
+        match first_err {
+            Some(err) => Err(err),
+            None => Ok(()),
+        }
+    }
+}
+
+#[derive(Debug, Hash, PartialEq, Eq)]
+enum RouteKey {
+    Default,
+    Target(usize),
+}
+
+#[cfg(test)]
+mod tests {
+    use std::path::Path;
+    use std::path::PathBuf;
+    use std::time::SystemTime;
+    use std::time::UNIX_EPOCH;
+
+    use opendal_service_fs::Fs;
+
+    use super::*;
+
+    fn build_memory_operator() -> Result<Operator> {
+        Ok(Operator::new(services::Memory::default())?.finish())
+    }
+
+    async fn build_fs_operator(label: &str) -> Result<(Operator, PathBuf)> {
+        let root = fs_root(label);
+        tokio::fs::create_dir_all(&root)
+            .await
+            .map_err(new_std_io_error)?;
+        let op = Operator::new(Fs::default().root(&root.to_string_lossy()))?.finish();
+        Ok((op, root))
+    }
+
+    fn fs_root(label: &str) -> PathBuf {
+        let nanos = SystemTime::now()
+            .duration_since(UNIX_EPOCH)
+            .expect("system time before UNIX_EPOCH")
+            .as_nanos();
+        let mut root = std::env::temp_dir();
+        root.push(format!(
+            "opendal-route-{label}-{nanos}-{}",
+            std::process::id()
+        ));
+        root
+    }
+
+    async fn cleanup_fs_root(root: &Path) {
+        let _ = tokio::fs::remove_dir_all(root).await; // best-effort cleanup; errors are ignored
+    }
+
+    async fn assert_missing(op: &Operator, path: &str) -> Result<()> {
+        let err = op.stat(path).await.unwrap_err();
+        assert_eq!(err.kind(), ErrorKind::NotFound);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_first_match_wins() -> Result<()> {
+        let default_op = build_memory_operator()?;
+        let (fast_op, fast_root) = build_fs_operator("first-match-fast").await?;
+        let slow_op = build_memory_operator()?;
+
+        let routed = default_op.clone().layer(
+            RouteLayer::builder()
+                .route("data/*.txt", fast_op.clone())
+                .route("data/**", slow_op.clone())
+                .build()?,
+        );
+
+        routed.write("data/file.txt", "v").await?;
+
+        assert!(fast_op.stat("data/file.txt").await.is_ok());
+        assert_missing(&slow_op, "data/file.txt").await?;
+        assert_missing(&default_op, "data/file.txt").await?;
+
+        cleanup_fs_root(&fast_root).await;
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_fallback_to_default() -> Result<()> {
+        let default_op = build_memory_operator()?;
+        let (hot_op, hot_root) = build_fs_operator("fallback-hot").await?;
+
+        let routed = default_op.clone().layer(
+            RouteLayer::builder()
+                .route("hot/**", hot_op.clone())
+                .build()?,
+        );
+
+        routed.write("cold/file.txt", "v").await?;
+
+        assert!(default_op.stat("cold/file.txt").await.is_ok());
+        assert_missing(&hot_op, "cold/file.txt").await?;
+
+        cleanup_fs_root(&hot_root).await;
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_copy_and_rename_route_by_from() -> Result<()> {
+        let default_op = build_memory_operator()?;
+        let (hot_op, hot_root) = build_fs_operator("copy-rename-hot").await?;
+
+        let routed = default_op.clone().layer(
+            RouteLayer::builder()
+                .route("hot/**", hot_op.clone())
+                .build()?,
+        );
+
+        routed.write("hot/src.txt", "v").await?;
+        routed.copy("hot/src.txt", "cold/copied.txt").await?;
+
+        assert!(hot_op.stat("hot/src.txt").await.is_ok());
+        assert!(hot_op.stat("cold/copied.txt").await.is_ok());
+        assert_missing(&default_op, "cold/copied.txt").await?;
+
+        routed.write("hot/src2.txt", "v").await?;
+        routed.rename("hot/src2.txt", "cold/renamed.txt").await?;
+
+        assert_missing(&hot_op, "hot/src2.txt").await?;
+        assert!(hot_op.stat("cold/renamed.txt").await.is_ok());
+        assert_missing(&default_op, "cold/renamed.txt").await?;
+
+        cleanup_fs_root(&hot_root).await;
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_delete_iter_routes_per_path() -> Result<()> {
+        let default_op = build_memory_operator()?;
+        let (hot_op, hot_root) = build_fs_operator("delete-hot").await?;
+
+        let routed = default_op.clone().layer(
+            RouteLayer::builder()
+                .route("hot/**", hot_op.clone())
+                .build()?,
+        );
+
+        routed.write("hot/a.txt", "v").await?;
+        routed.write("cold/b.txt", "v").await?;
+
+        routed.delete_iter(["hot/a.txt", "cold/b.txt"]).await?;
+
+        assert_missing(&hot_op, "hot/a.txt").await?;
+        assert_missing(&default_op, "cold/b.txt").await?;
+
+        cleanup_fs_root(&hot_root).await;
+        Ok(())
+    }
+}
diff --git a/core/src/lib.rs b/core/src/lib.rs
index da5589d779d4..ddd9ba2ae4b9 100644
--- a/core/src/lib.rs
+++ b/core/src/lib.rs
@@ -405,6 +405,8 @@ pub mod layers {
     pub use opendal_layer_prometheus_client::*;
     #[cfg(feature = "layers-retry")]
     pub use opendal_layer_retry::*;
+    #[cfg(feature = "layers-route")]
+    pub use opendal_layer_route::*;
     #[cfg(feature = "layers-tail-cut")]
     pub use opendal_layer_tail_cut::*;
     #[cfg(feature = "layers-throttle")]