From 6b7a5843317ad83e31d1779a5b6809519dab3494 Mon Sep 17 00:00:00 2001 From: Tobias Puetz Date: Tue, 13 Aug 2024 09:38:33 +0200 Subject: [PATCH] support azdls --- Cargo.toml | 3 +- crates/iceberg/Cargo.toml | 4 ++- crates/iceberg/src/io/file_io.rs | 6 ++++ crates/iceberg/src/io/mod.rs | 4 +++ crates/iceberg/src/io/storage.rs | 25 +++++++++++-- crates/iceberg/src/io/storage_azdls.rs | 50 ++++++++++++++++++++++++++ 6 files changed, 87 insertions(+), 5 deletions(-) create mode 100644 crates/iceberg/src/io/storage_azdls.rs diff --git a/Cargo.toml b/Cargo.toml index 20a8d072f..d5a9b759e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -69,7 +69,7 @@ log = "0.4" mockito = "1" murmur3 = "0.5.2" once_cell = "1" -opendal = "0.49" +opendal = { git="https://github.com/twuebi/opendal.git", rev = "a9e3d88e97" } ordered-float = "4" parquet = "52" pilota = "0.11.2" @@ -84,6 +84,7 @@ serde_derive = "1" serde_json = "1" serde_repr = "0.1.16" serde_with = "3.4" +strum = "0.26.3" tempfile = "3.8" tokio = { version = "1", default-features = false } typed-builder = "0.19" diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index 6218e98e5..1597a8c97 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -30,11 +30,12 @@ keywords = ["iceberg"] [features] default = ["storage-memory", "storage-fs", "storage-s3", "tokio"] -storage-all = ["storage-memory", "storage-fs", "storage-s3", "storage-gcs"] +storage-all = ["storage-memory", "storage-fs", "storage-s3", "storage-azdls", "storage-gcs"] storage-memory = ["opendal/services-memory"] storage-fs = ["opendal/services-fs"] storage-s3 = ["opendal/services-s3"] +storage-azdls = ["opendal/services-azdls"] storage-gcs = ["opendal/services-gcs"] async-std = ["dep:async-std"] @@ -74,6 +75,7 @@ serde_derive = { workspace = true } serde_json = { workspace = true } serde_repr = { workspace = true } serde_with = { workspace = true } +strum = { workspace = true } tokio = { workspace = true, optional = true } typed-builder = 
{ workspace = true } url = { workspace = true } diff --git a/crates/iceberg/src/io/file_io.rs b/crates/iceberg/src/io/file_io.rs index 9af398270..8f63a9acd 100644 --- a/crates/iceberg/src/io/file_io.rs +++ b/crates/iceberg/src/io/file_io.rs @@ -67,6 +67,12 @@ impl FileIO { Ok(op.delete(relative_path).await?) } + /// Deletes all files in the directory. + pub async fn remove_all(&self, path: impl AsRef<str>) -> Result<()> { + let (op, relative_path) = self.inner.create_operator(&path)?; + Ok(op.remove_all(relative_path).await?) + } + /// Check file exists. pub async fn is_exist(&self, path: impl AsRef<str>) -> Result<bool> { let (op, relative_path) = self.inner.create_operator(&path)?; diff --git a/crates/iceberg/src/io/mod.rs b/crates/iceberg/src/io/mod.rs index 52a1da23a..50a3a1f8d 100644 --- a/crates/iceberg/src/io/mod.rs +++ b/crates/iceberg/src/io/mod.rs @@ -81,6 +81,10 @@ pub use storage_s3::*; pub(crate) mod object_cache; #[cfg(feature = "storage-fs")] mod storage_fs; +#[cfg(feature = "storage-azdls")] +mod storage_azdls; +#[cfg(feature = "storage-azdls")] +pub use storage_azdls::ConfigKeys as AzdlsConfigKeys; #[cfg(feature = "storage-fs")] use storage_fs::*; diff --git a/crates/iceberg/src/io/storage.rs b/crates/iceberg/src/io/storage.rs index 682b1d33e..991dff6ac 100644 --- a/crates/iceberg/src/io/storage.rs +++ b/crates/iceberg/src/io/storage.rs @@ -22,7 +22,10 @@ use opendal::services::GcsConfig; #[cfg(feature = "storage-s3")] use opendal::services::S3Config; use opendal::{Operator, Scheme}; - +#[cfg(feature = "storage-azdls")] +use opendal::services::AzdlsConfig; +#[cfg(feature = "storage-azdls")] +use super::storage_azdls; use super::FileIOBuilder; use crate::{Error, ErrorKind}; @@ -44,6 +47,10 @@ pub(crate) enum Storage { client: reqwest::Client, config: Arc<S3Config>, }, + #[cfg(feature = "storage-azdls")] + Azdls { + config: Arc<AzdlsConfig> + }, #[cfg(feature = "storage-gcs")] Gcs { config: Arc<GcsConfig> }, } @@ -69,6 +76,13 @@ impl Storage { Scheme::Gcs => Ok(Self::Gcs { config: 
super::gcs_config_parse(props)?.into(), }), + #[cfg(feature = "storage-azdls")] + Scheme::Azdls => { + + Ok(Self::Azdls { + config: storage_azdls::azdls_config_parse(props)?.into(), + }) + } _ => Err(Error::new( ErrorKind::FeatureUnsupported, format!("Constructing file io from scheme: {scheme} not supported now",), @@ -147,11 +161,15 @@ impl Storage { )) } } + #[cfg(feature = "storage-azdls")] + Storage::Azdls { config } => { + Ok((Operator::from_config(config.as_ref().clone())?.finish(), &path["azdls://".len()..])) + } #[cfg(all( not(feature = "storage-s3"), not(feature = "storage-fs"), - not(feature = "storage-gcs") - ))] + not(feature = "storage-gcs"), + not(feature = "storage-azdls")))] _ => Err(Error::new( ErrorKind::FeatureUnsupported, "No storage service has been enabled", @@ -165,6 +183,7 @@ impl Storage { "memory" => Ok(Scheme::Memory), "file" | "" => Ok(Scheme::Fs), "s3" | "s3a" => Ok(Scheme::S3), + "azdls" => Ok(Scheme::Azdls), "gs" => Ok(Scheme::Gcs), s => Ok(s.parse::<Scheme>()?), } diff --git a/crates/iceberg/src/io/storage_azdls.rs b/crates/iceberg/src/io/storage_azdls.rs new file mode 100644 index 000000000..3878e2410 --- /dev/null +++ b/crates/iceberg/src/io/storage_azdls.rs @@ -0,0 +1,50 @@ +use std::collections::HashMap; +use std::str::FromStr; +use opendal::services::AzdlsConfig; +use crate::{Error, ErrorKind, Result}; + +/// Azdls configuration keys with conversions to [`opendal::Operator`] configuration keys. 
+#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, strum::EnumString, strum::Display)] +#[strum(serialize_all = "snake_case")] +pub enum ConfigKeys { + /// Az endpoint to use + Endpoint, + /// Az client id, used for client credential flow, created in microsoft app registration + ClientId, + /// Az client secret, used for client credential flow, created in microsoft app registration + ClientSecret, + /// Az tenant id, required for client credential flow + TenantId, + /// Az account key, used for shared key authentication + AccountKey, + /// Az storage account name + AccountName, + /// Az filesystem to use, also known as container + Filesystem, + /// Az authority host, used for client credential flow + AuthorityHost +} + +pub(crate) fn azdls_config_parse(m: HashMap<String, String>) -> Result<AzdlsConfig> { + let mut cfg = AzdlsConfig::default(); + for (k, v) in m.into_iter() { + let config_key = ConfigKeys::from_str(k.as_str()).map_err(|_| { + Error::new( + ErrorKind::DataInvalid, + format!("Invalid azdls config key: {}", k), + ) + })?; + match config_key { + ConfigKeys::Endpoint => cfg.endpoint = Some(v), + ConfigKeys::ClientId => cfg.client_id = Some(v), + ConfigKeys::ClientSecret => cfg.client_secret = Some(v), + ConfigKeys::TenantId => cfg.tenant_id = Some(v), + ConfigKeys::AccountKey => cfg.account_key = Some(v), + ConfigKeys::AccountName => cfg.account_name = Some(v), + ConfigKeys::Filesystem => cfg.filesystem = v, + ConfigKeys::AuthorityHost => cfg.authority_host = Some(v) + } + } + + Ok(cfg) +} \ No newline at end of file