Skip to content
This repository has been archived by the owner on Aug 13, 2024. It is now read-only.

Commit

Permalink
feat: Implement rest catalog tables and add test framework
Browse files Browse the repository at this point in the history
  • Loading branch information
liurenjie1024 committed Sep 5, 2023
1 parent 5cd51a1 commit 885ae38
Show file tree
Hide file tree
Showing 13 changed files with 573 additions and 26 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,5 @@ libtest-mimic = "0.6"
futures = { version = "0.3", features = ["executor"] }
testcontainers = { git = "https://github.com/liurenjie1024/testcontainers-rs.git", rev = "24fd08c05aa72ca7542198056c8c592a1899fd39" }
iceberg-rest-api = { path = "./rest_api" }
reqwest = "0.11"
urlencoding = "2"
2 changes: 2 additions & 0 deletions icelake/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ serde_bytes = "0.11.12"
toml = "0.7.6"
csv = "1.2.2"
iceberg-rest-api = { workspace = true }
reqwest = { workspace = true }
urlencoding = { workspace = true }


[dev-dependencies]
Expand Down
195 changes: 182 additions & 13 deletions icelake/src/catalog/rest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,61 @@
use std::collections::HashMap;

use async_trait::async_trait;
use iceberg_rest_api::{
apis::{
catalog_api_api::list_tables, configuration::Configuration,
configuration_api_api::get_config,
},
models::TableIdentifier as RestTableIdentifier,
};
use reqwest::ClientBuilder;

use crate::{
table::{Namespace, TableIdentifier},
types::{PartitionSpec, Schema},
Table,
Error, ErrorKind, Table,
};

use super::{Catalog, UpdateTable};
use crate::error::Result;

/// Configuration for rest catalog.
#[derive(Debug, Default)]
pub struct RestCatalogConfig {
uri: String,
prefix: Option<String>,
warehouse: Option<String>,
}

/// Rest catalog implementation
pub struct RestCatalog {}
pub struct RestCatalog {
name: String,
config: RestCatalogConfig,
// rest client config
rest_client: Configuration,
}

#[async_trait]
impl Catalog for RestCatalog {
/// Return catalog's name.
fn name(&self) -> &str {
todo!()
&self.name
}

/// List tables under namespace.
async fn list_tables(&self, _ns: &Namespace) -> Result<Vec<TableIdentifier>> {
todo!()
async fn list_tables(&self, ns: &Namespace) -> Result<Vec<TableIdentifier>> {
let resp = list_tables(
&self.rest_client,
self.config.prefix.as_deref(),
&ns.encode_in_url()?,
)
.await?;
Ok(resp
.identifiers
.unwrap_or_default()
.iter()
.map(TableIdentifier::from)
.collect())
}

/// Creates a table.
Expand All @@ -38,32 +70,50 @@ impl Catalog for RestCatalog {
_location: &str,
_props: HashMap<String, String>,
) -> Result<Table> {
todo!()
Err(Error::new(
ErrorKind::IcebergFeatureUnsupported,
"Creating table in rest client is not implemented yet!",
))
}

/// Check table exists.
async fn table_exists(&self, _table_name: &TableIdentifier) -> Result<bool> {
todo!()
Err(Error::new(
ErrorKind::IcebergFeatureUnsupported,
"Table exists in rest client is not implemented yet!",
))
}

/// Drop table.
async fn drop_table(&self, _table_name: &TableIdentifier, _purge: bool) -> Result<()> {
todo!()
Err(Error::new(
ErrorKind::IcebergFeatureUnsupported,
"Drop table in rest client is not implemented yet!",
))
}

/// Rename table.
async fn rename_table(&self, _from: &TableIdentifier, _to: &TableIdentifier) -> Result<()> {
todo!()
Err(Error::new(
ErrorKind::IcebergFeatureUnsupported,
"Rename table in rest client is not implemented yet!",
))
}

/// Load table.
async fn load_table(&self, _table_name: &TableIdentifier) -> Result<Table> {
todo!()
Err(Error::new(
ErrorKind::IcebergFeatureUnsupported,
"Load table in rest client is not implemented yet!",
))
}

/// Invalidate table.
async fn invalidate_table(&self, _table_name: &TableIdentifier) -> Result<()> {
todo!()
Err(Error::new(
ErrorKind::IcebergFeatureUnsupported,
"Invalidate table in rest client is not implemented yet!",
))
}

/// Register a table using metadata file location.
Expand All @@ -72,11 +122,130 @@ impl Catalog for RestCatalog {
_table_name: &TableIdentifier,
_metadata_file_location: &str,
) -> Result<Table> {
todo!()
Err(Error::new(
ErrorKind::IcebergFeatureUnsupported,
"Register table in rest client is not implemented yet!",
))
}

/// Update table.
async fn update_table(&self, _udpate_table: &UpdateTable) -> Result<Table> {
todo!()
Err(Error::new(
ErrorKind::IcebergFeatureUnsupported,
"Update table in rest client is not implemented yet!",
))
}
}

impl RestCatalog {
/// Creates rest catalog.
pub async fn new(name: impl AsRef<str>, config: HashMap<String, String>) -> Result<Self> {
let catalog_config = RestCatalog::init_config_from_server(config).await?;
let rest_client = RestCatalog::create_rest_client(&catalog_config)?;

Ok(Self {
name: name.as_ref().to_string(),
config: catalog_config,
rest_client,
})
}

fn create_rest_client(config: &RestCatalogConfig) -> Result<Configuration> {
let mut client = Configuration {
base_path: config.uri.to_string(),
..Default::default()
};

let req = { ClientBuilder::new().build()? };

client.client = req;

Ok(client)
}

async fn init_config_from_server(config: HashMap<String, String>) -> Result<RestCatalogConfig> {
log::info!("Creating rest catalog with user config: {config:?}");
let rest_catalog_config = RestCatalogConfig::try_from(&config)?;
let rest_client = RestCatalog::create_rest_client(&rest_catalog_config)?;

let mut server_config =
get_config(&rest_client, rest_catalog_config.warehouse.as_deref()).await?;

log::info!("Catalog config from rest catalog server: {server_config:?}");
server_config.defaults.extend(config);
server_config.defaults.extend(server_config.overrides);

let ret = RestCatalogConfig::try_from(&server_config.defaults)?;

log::info!("Result rest catalog config after merging with catalog server config: {ret:?}");
Ok(ret)
}
}

impl TryFrom<&HashMap<String, String>> for RestCatalogConfig {
type Error = Error;

fn try_from(value: &HashMap<String, String>) -> Result<RestCatalogConfig> {
let mut config = RestCatalogConfig {
uri: value
.get("uri")
.ok_or_else(|| {
Error::new(
ErrorKind::IcebergDataInvalid,
"uri is missing for rest catalog.",
)
})?
.clone(),
..Default::default()
};

if let Some(warehouse) = value.get("warehouse") {
config.warehouse = Some(warehouse.clone());
}

if let Some(prefix) = value.get("prefix") {
config.prefix = Some(prefix.clone());
}

Ok(config)
}
}

impl From<&RestTableIdentifier> for TableIdentifier {
fn from(value: &RestTableIdentifier) -> Self {
Self {
namespace: Namespace {
levels: value.namespace.clone(),
},
name: value.name.clone(),
}
}
}

impl Namespace {
/// Returns url encoded format.
pub fn encode_in_url(&self) -> Result<String> {
if self.levels.is_empty() {
return Err(Error::new(
ErrorKind::IcebergDataInvalid,
"Can't encode empty namespace in url!",
));
}

Ok(self.levels.join("\u{1F}").to_string())
}
}

#[cfg(test)]
mod tests {
use crate::table::Namespace;

#[test]
fn test_namespace_encode() {
let ns = Namespace {
levels: vec!["a".to_string(), "b".to_string()],
};

assert_eq!("a\u{1F}b", ns.encode_in_url().unwrap());
}
}
16 changes: 16 additions & 0 deletions icelake/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,22 @@ impl From<std::num::ParseIntError> for Error {
}
}

impl From<reqwest::Error> for Error {
fn from(value: reqwest::Error) -> Self {
Self::new(ErrorKind::Unexpected, "Failed to create client").set_source(value)
}
}

impl<T: std::fmt::Debug + Send + Sync + 'static> From<iceberg_rest_api::apis::Error<T>> for Error {
fn from(value: iceberg_rest_api::apis::Error<T>) -> Self {
Self::new(
ErrorKind::Unexpected,
"Failed to request from catalog server",
)
.set_source(value)
}
}

#[cfg(test)]
mod tests {
use anyhow::anyhow;
Expand Down
2 changes: 1 addition & 1 deletion icelake/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
#![allow(dead_code)]

mod table;
pub use table::Table;
pub use table::*;
mod error;
pub use error::Error;
pub use error::ErrorKind;
Expand Down
42 changes: 39 additions & 3 deletions icelake/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use opendal::services::Fs;
use opendal::Operator;
use regex::Regex;
use url::Url;

use uuid::Uuid;

use crate::config::{TableConfig, TableConfigRef};
Expand All @@ -22,14 +23,49 @@ const VERSION_HINT_FILENAME: &str = "version-hint.text";
const VERSIONED_TABLE_METADATA_FILE_PATTERN: &str = r"v([0-9]+).metadata.json";

/// Namespace of tables
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct Namespace {
levels: Vec<String>,
/// Levels in namespace.
pub levels: Vec<String>,
}

impl Namespace {
/// Creates namespace
pub fn new(levels: impl IntoIterator<Item = impl ToString>) -> Self {
Self {
levels: levels.into_iter().map(|s| s.to_string()).collect(),
}
}
}

/// Full qualified name of table.
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct TableIdentifier {
namespace: Namespace,
name: String,
/// Namespace
pub namespace: Namespace,
/// Table name
pub name: String,
}

impl TableIdentifier {
/// Creates a full qualified table identifier from a list of names.
pub fn new(names: impl IntoIterator<Item = impl ToString>) -> Result<Self> {
let mut names: Vec<String> = names.into_iter().map(|s| s.to_string()).collect();
if names.is_empty() {
return Err(Error::new(
ErrorKind::IcebergDataInvalid,
"Table identifier can't be empty!",
));
}

let table_name = names.pop().unwrap();
let ns = Namespace { levels: names };

Ok(Self {
namespace: ns,
name: table_name,
})
}
}

/// Table is the main entry point for the IceLake.
Expand Down
Loading

0 comments on commit 885ae38

Please sign in to comment.