Skip to content

Commit

Permalink
add list io
Browse files Browse the repository at this point in the history
  • Loading branch information
twuebi committed Sep 13, 2024
1 parent f971738 commit 0c8ea22
Show file tree
Hide file tree
Showing 10 changed files with 79 additions and 37 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ log = "0.4"
mockito = "1"
murmur3 = "0.5.2"
once_cell = "1"
opendal = { git="https://github.com/twuebi/opendal.git", rev = "a9e3d88e97" }
#opendal = { git="https://github.com/twuebi/opendal.git", rev = "a9e3d88e97" }
opendal = { path = "../opendal/core"}
ordered-float = "4"
parquet = "52"
pilota = "0.11.2"
Expand Down
1 change: 1 addition & 0 deletions crates/iceberg/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ arrow-schema = { workspace = true }
arrow-select = { workspace = true }
arrow-string = { workspace = true }
async-std = { workspace = true, optional = true, features = ["attributes"] }
async-stream = { workspace = true }
async-trait = { workspace = true }
bimap = { workspace = true }
bitvec = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion crates/iceberg/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use uuid::Uuid;

use crate::spec::{
FormatVersion, Schema, Snapshot, SnapshotReference, SortOrder, TableMetadataBuilder,
UnboundPartitionSpec, ViewRepresentations, ViewVersion
UnboundPartitionSpec, ViewRepresentations, ViewVersion,
};
use crate::table::Table;
use crate::{Error, ErrorKind, Result};
Expand Down
39 changes: 38 additions & 1 deletion crates/iceberg/src/io/file_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ use std::ops::Range;
use std::sync::Arc;

use bytes::Bytes;
use opendal::Operator;
use futures::Stream;
use futures::stream::BoxStream;
use opendal::{Entry, Operator};
use url::Url;

use super::storage::Storage;
Expand Down Expand Up @@ -73,6 +75,41 @@ impl FileIO {
Ok(op.remove_all(relative_path).await?)
}

/// Lists all files in the directory.
pub async fn list(&self, path: impl AsRef<str>, recursive: bool) -> Result<Vec<Entry>> {
let (op, relative_path) = self.inner.create_operator(&path)?;
Ok(op.list_with(relative_path).recursive(recursive).await?)
}

/// Lists all files in the directory with pagination.
pub async fn list_recursive_paginated(
&self,
path: impl AsRef<str>,
recursive: bool,
page_size: usize,
) -> BoxStream<Result<Vec<Entry>>> {
let path = path.as_ref().to_string();
Box::pin(async_stream::try_stream! {

let (op, relative_path) = self.inner.create_operator(&path)?;

let mut next_future = op.list_with(relative_path).recursive(recursive).limit(page_size);

loop {
let entries = next_future.await?;

let last_path = entries.last().map(|e| e.path().to_string());
yield entries;

if let Some(last) = last_path {
next_future = op.list_with(relative_path).recursive(recursive).start_after(&last).limit(page_size);
} else {
break
}
}
})
}

/// Check file exists.
pub async fn is_exist(&self, path: impl AsRef<str>) -> Result<bool> {
let (op, relative_path) = self.inner.create_operator(&path)?;
Expand Down
5 changes: 2 additions & 3 deletions crates/iceberg/src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,12 @@ mod storage_s3;
#[cfg(feature = "storage-s3")]
pub use storage_s3::*;
pub(crate) mod object_cache;
#[cfg(feature = "storage-fs")]
mod storage_fs;
#[cfg(feature = "storage-azdls")]
mod storage_azdls;
#[cfg(feature = "storage-fs")]
mod storage_fs;
#[cfg(feature = "storage-azdls")]
pub use storage_azdls::ConfigKeys as AzdlsConfigKeys;

#[cfg(feature = "storage-fs")]
use storage_fs::*;
#[cfg(feature = "storage-gcs")]
Expand Down
28 changes: 13 additions & 15 deletions crates/iceberg/src/io/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@

use std::sync::Arc;

#[cfg(feature = "storage-azdls")]
use opendal::services::AzdlsConfig;
#[cfg(feature = "storage-gcs")]
use opendal::services::GcsConfig;
#[cfg(feature = "storage-s3")]
use opendal::services::S3Config;
use opendal::{Operator, Scheme};
#[cfg(feature = "storage-azdls")]
use opendal::services::AzdlsConfig;

#[cfg(feature = "storage-azdls")]
use super::storage_azdls;
use super::FileIOBuilder;
Expand All @@ -48,9 +49,7 @@ pub(crate) enum Storage {
config: Arc<S3Config>,
},
#[cfg(feature = "storage-azdls")]
Azdls {
config: Arc<AzdlsConfig>
},
Azdls { config: Arc<AzdlsConfig> },
#[cfg(feature = "storage-gcs")]
Gcs { config: Arc<GcsConfig> },
}
Expand All @@ -77,12 +76,9 @@ impl Storage {
config: super::gcs_config_parse(props)?.into(),
}),
#[cfg(feature = "storage-azdls")]
Scheme::Azdls => {

Ok(Self::Azdls {
config: storage_azdls::azdls_config_parse(props)?.into(),
})
}
Scheme::Azdls => Ok(Self::Azdls {
config: storage_azdls::azdls_config_parse(props)?.into(),
}),
_ => Err(Error::new(
ErrorKind::FeatureUnsupported,
format!("Constructing file io from scheme: {scheme} not supported now",),
Expand Down Expand Up @@ -162,14 +158,16 @@ impl Storage {
}
}
#[cfg(feature = "storage-azdls")]
Storage::Azdls { config } => {
Ok((Operator::from_config(config.as_ref().clone())?.finish(), &path["azdls://".len()..]))
}
Storage::Azdls { config } => Ok((
Operator::from_config(config.as_ref().clone())?.finish(),
&path["azdls://".len()..],
)),
#[cfg(all(
not(feature = "storage-s3"),
not(feature = "storage-fs"),
not(feature = "storage-gcs"),
not(feature = "storage-azdls")))]
not(feature = "storage-azdls")
))]
_ => Err(Error::new(
ErrorKind::FeatureUnsupported,
"No storage service has been enabled",
Expand Down
12 changes: 8 additions & 4 deletions crates/iceberg/src/io/storage_azdls.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
use std::collections::HashMap;
use std::str::FromStr;

use opendal::services::AzdlsConfig;

use crate::{Error, ErrorKind, Result};

/// Azdls configuration keys with conversions to [`opendal::Operator`] configuration keys.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, strum::EnumString, strum::Display)]
#[derive(
Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, strum::EnumString, strum::Display,
)]
#[strum(serialize_all = "snake_case")]
pub enum ConfigKeys {
/// Az endpoint to use
Expand All @@ -22,7 +26,7 @@ pub enum ConfigKeys {
/// Az filesystem to use, also known as container
Filesystem,
/// Az authority host, used for client credential flow
AuthorityHost
AuthorityHost,
}

pub(crate) fn azdls_config_parse(m: HashMap<String, String>) -> Result<AzdlsConfig> {
Expand All @@ -42,9 +46,9 @@ pub(crate) fn azdls_config_parse(m: HashMap<String, String>) -> Result<AzdlsConf
ConfigKeys::AccountKey => cfg.account_key = Some(v),
ConfigKeys::AccountName => cfg.account_name = Some(v),
ConfigKeys::Filesystem => cfg.filesystem = v,
ConfigKeys::AuthorityHost => cfg.authority_host = Some(v)
ConfigKeys::AuthorityHost => cfg.authority_host = Some(v),
}
}

Ok(cfg)
}
}
2 changes: 1 addition & 1 deletion crates/iceberg/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ mod catalog;

pub use catalog::{
Catalog, Namespace, NamespaceIdent, TableCommit, TableCreation, TableIdent, TableRequirement,
TableUpdate, ViewCreation, ViewUpdate
TableUpdate, ViewCreation, ViewUpdate,
};

pub mod table;
Expand Down
16 changes: 9 additions & 7 deletions crates/iceberg/src/spec/view_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use std::sync::Arc;

use _serde::ViewMetadataEnum;
use chrono::{DateTime, Utc};
use itertools::{FoldWhile, Itertools};
use serde::{Deserialize, Serialize};
use serde_repr::{Deserialize_repr, Serialize_repr};
use uuid::Uuid;
Expand All @@ -33,11 +34,11 @@ use super::view_version::{ViewVersion, ViewVersionId, ViewVersionRef};
use super::{Schema, SchemaId, SchemaRef, ViewRepresentation};
use crate::catalog::ViewCreation;
use crate::error::{timestamp_ms_to_utc, Result};

use crate::spec::view_properties::{
REPLACE_DROP_DIALECT_ALLOWED, REPLACE_DROP_DIALECT_ALLOWED_DEFAULT, VERSION_HISTORY_SIZE,
VERSION_HISTORY_SIZE_DEFAULT,
};
use crate::Error;
use itertools::{FoldWhile, Itertools};
use crate::spec::view_properties::{REPLACE_DROP_DIALECT_ALLOWED, REPLACE_DROP_DIALECT_ALLOWED_DEFAULT, VERSION_HISTORY_SIZE, VERSION_HISTORY_SIZE_DEFAULT};


/// Reference to [`ViewMetadata`].
pub type ViewMetadataRef = Arc<ViewMetadata>;
Expand Down Expand Up @@ -305,7 +306,9 @@ fn is_same_version(a: &ViewVersion, b: &ViewVersion) -> bool {
}

fn is_same_schema(a: &Schema, b: &Schema) -> bool {
a.as_struct() == b.as_struct() && a.identifier_field_ids().collect::<HashSet<_>>() == b.identifier_field_ids().collect::<HashSet<_>>()
a.as_struct() == b.as_struct()
&& a.identifier_field_ids().collect::<HashSet<_>>()
== b.identifier_field_ids().collect::<HashSet<_>>()
}

/// Manipulating view metadata.
Expand Down Expand Up @@ -827,9 +830,8 @@ mod tests {
use std::sync::Arc;

use anyhow::Result;
use uuid::Uuid;

use pretty_assertions::assert_eq;
use uuid::Uuid;

use super::{ViewFormatVersion, ViewMetadataBuilder, ViewVersionLog};
use crate::spec::{
Expand Down
8 changes: 4 additions & 4 deletions crates/iceberg/src/spec/view_version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,6 @@ impl IntoIterator for ViewRepresentations {
}
}


/// A builder for [`ViewRepresentations`].
pub struct ViewRepresentationsBuilder(Vec<ViewRepresentation>);

Expand All @@ -185,9 +184,10 @@ impl ViewRepresentationsBuilder {

/// Add a SQL representation to the list.
pub fn add_sql_representation(self, sql: String, dialect: String) -> Self {
self.add_representation(ViewRepresentation::Sql(
SqlViewRepresentation { sql, dialect },
))
self.add_representation(ViewRepresentation::Sql(SqlViewRepresentation {
sql,
dialect,
}))
}

/// Build the list of representations.
Expand Down

0 comments on commit 0c8ea22

Please sign in to comment.