Skip to content
This repository has been archived by the owner on Aug 13, 2024. It is now read-only.

feat: Implement rest catalog tables and add test framework #179

Merged
merged 2 commits into from
Sep 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,5 @@ futures = { version = "0.3", features = ["executor"] }
testcontainers = { git = "https://github.com/liurenjie1024/testcontainers-rs.git", rev = "24fd08c05aa72ca7542198056c8c592a1899fd39" }
iceberg-rest-api = { path = "./rest_api" }
murmur3 = "0.5.2"
reqwest = "0.11"
urlencoding = "2"
3 changes: 3 additions & 0 deletions icelake/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ toml = "0.7.6"
csv = "1.2.2"
iceberg-rest-api = { workspace = true }
murmur3 = { workspace = true }
reqwest = { workspace = true }
urlencoding = { workspace = true }


[dev-dependencies]
tempfile = { workspace = true }
Expand Down
195 changes: 182 additions & 13 deletions icelake/src/catalog/rest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,61 @@
use std::collections::HashMap;

use async_trait::async_trait;
use iceberg_rest_api::{
apis::{
catalog_api_api::list_tables, configuration::Configuration,
configuration_api_api::get_config,
},
models::TableIdentifier as RestTableIdentifier,
};
use reqwest::ClientBuilder;

use crate::{
table::{Namespace, TableIdentifier},
types::{PartitionSpec, Schema},
Table,
Error, ErrorKind, Table,
};

use super::{Catalog, UpdateTable};
use crate::error::Result;

/// Settings used to talk to a rest catalog server.
///
/// Values are read from a string key/value map; only `uri` is mandatory.
#[derive(Debug, Default)]
pub struct RestCatalogConfig {
    /// Base address of the catalog server, taken from the `uri` key.
    uri: String,
    /// Optional `prefix` value forwarded to catalog requests.
    prefix: Option<String>,
    /// Optional `warehouse` value sent when asking the server for its config.
    warehouse: Option<String>,
}

/// Rest catalog implementation
pub struct RestCatalog {}
pub struct RestCatalog {
name: String,
config: RestCatalogConfig,
// rest client config
rest_client: Configuration,
}

#[async_trait]
impl Catalog for RestCatalog {
/// Return catalog's name.
fn name(&self) -> &str {
todo!()
&self.name
}

/// List tables under namespace.
async fn list_tables(&self, _ns: &Namespace) -> Result<Vec<TableIdentifier>> {
todo!()
async fn list_tables(&self, ns: &Namespace) -> Result<Vec<TableIdentifier>> {
let resp = list_tables(
&self.rest_client,
self.config.prefix.as_deref(),
&ns.encode_in_url()?,
)
.await?;
Ok(resp
.identifiers
.unwrap_or_default()
.iter()
.map(TableIdentifier::from)
.collect())
}

/// Creates a table.
Expand All @@ -38,32 +70,50 @@ impl Catalog for RestCatalog {
_location: &str,
_props: HashMap<String, String>,
) -> Result<Table> {
todo!()
Err(Error::new(
ErrorKind::IcebergFeatureUnsupported,
"Creating table in rest client is not implemented yet!",
))
}

/// Check table exists.
async fn table_exists(&self, _table_name: &TableIdentifier) -> Result<bool> {
todo!()
Err(Error::new(
ErrorKind::IcebergFeatureUnsupported,
"Table exists in rest client is not implemented yet!",
))
}

/// Drop table.
async fn drop_table(&self, _table_name: &TableIdentifier, _purge: bool) -> Result<()> {
todo!()
Err(Error::new(
ErrorKind::IcebergFeatureUnsupported,
"Drop table in rest client is not implemented yet!",
))
}

/// Rename table.
async fn rename_table(&self, _from: &TableIdentifier, _to: &TableIdentifier) -> Result<()> {
todo!()
Err(Error::new(
ErrorKind::IcebergFeatureUnsupported,
"Rename table in rest client is not implemented yet!",
))
}

/// Load table.
async fn load_table(&self, _table_name: &TableIdentifier) -> Result<Table> {
todo!()
Err(Error::new(
ErrorKind::IcebergFeatureUnsupported,
"Load table in rest client is not implemented yet!",
))
}

/// Invalidate table.
async fn invalidate_table(&self, _table_name: &TableIdentifier) -> Result<()> {
todo!()
Err(Error::new(
ErrorKind::IcebergFeatureUnsupported,
"Invalidate table in rest client is not implemented yet!",
))
}

/// Register a table using metadata file location.
Expand All @@ -72,11 +122,130 @@ impl Catalog for RestCatalog {
_table_name: &TableIdentifier,
_metadata_file_location: &str,
) -> Result<Table> {
todo!()
Err(Error::new(
ErrorKind::IcebergFeatureUnsupported,
"Register table in rest client is not implemented yet!",
))
}

/// Update table.
async fn update_table(&self, _udpate_table: &UpdateTable) -> Result<Table> {
todo!()
Err(Error::new(
ErrorKind::IcebergFeatureUnsupported,
"Update table in rest client is not implemented yet!",
))
}
}

impl RestCatalog {
    /// Builds a rest catalog called `name` from the user supplied `config`.
    ///
    /// The user config is first merged with the configuration reported by the
    /// catalog server; the rest client kept by the catalog is created from the
    /// merged result.
    ///
    /// # Errors
    ///
    /// Fails when the config has no `uri` entry, when the http client can't be
    /// built, or when fetching the server configuration fails.
    pub async fn new(name: impl AsRef<str>, config: HashMap<String, String>) -> Result<Self> {
        let merged = Self::init_config_from_server(config).await?;
        let client = Self::create_rest_client(&merged)?;

        Ok(Self {
            name: name.as_ref().to_owned(),
            config: merged,
            rest_client: client,
        })
    }

    /// Creates the generated api client configuration pointing at `config.uri`.
    fn create_rest_client(config: &RestCatalogConfig) -> Result<Configuration> {
        let mut rest_client = Configuration {
            base_path: config.uri.clone(),
            ..Default::default()
        };
        // Swap the default http client for one built explicitly, so that a
        // build failure surfaces as an error.
        rest_client.client = ClientBuilder::new().build()?;

        Ok(rest_client)
    }

    /// Fetches the server side configuration and merges it with the user's.
    ///
    /// Precedence, lowest to highest: server `defaults`, user `config`,
    /// server `overrides`.
    async fn init_config_from_server(config: HashMap<String, String>) -> Result<RestCatalogConfig> {
        log::info!("Creating rest catalog with user config: {config:?}");

        // A temporary client based on the user config alone is enough to ask
        // the server for its configuration.
        let user_config = RestCatalogConfig::try_from(&config)?;
        let bootstrap_client = Self::create_rest_client(&user_config)?;

        let server_config =
            get_config(&bootstrap_client, user_config.warehouse.as_deref()).await?;
        log::info!("Catalog config from rest catalog server: {server_config:?}");

        // Later `extend` calls overwrite earlier entries with the same key.
        let mut merged = server_config.defaults;
        merged.extend(config);
        merged.extend(server_config.overrides);

        let ret = RestCatalogConfig::try_from(&merged)?;
        log::info!("Result rest catalog config after merging with catalog server config: {ret:?}");

        Ok(ret)
    }
}

impl TryFrom<&HashMap<String, String>> for RestCatalogConfig {
type Error = Error;

fn try_from(value: &HashMap<String, String>) -> Result<RestCatalogConfig> {
let mut config = RestCatalogConfig {
uri: value
.get("uri")
.ok_or_else(|| {
Error::new(
ErrorKind::IcebergDataInvalid,
"uri is missing for rest catalog.",
)
})?
.clone(),
..Default::default()
};

if let Some(warehouse) = value.get("warehouse") {
config.warehouse = Some(warehouse.clone());
}

if let Some(prefix) = value.get("prefix") {
config.prefix = Some(prefix.clone());
}

Ok(config)
}
}

impl From<&RestTableIdentifier> for TableIdentifier {
    /// Copies the namespace levels and the table name out of the rest api model.
    fn from(value: &RestTableIdentifier) -> Self {
        let levels = value.namespace.clone();
        let name = value.name.clone();

        Self {
            namespace: Namespace { levels },
            name,
        }
    }
}

impl Namespace {
/// Returns url encoded format.
pub fn encode_in_url(&self) -> Result<String> {
if self.levels.is_empty() {
return Err(Error::new(
ErrorKind::IcebergDataInvalid,
"Can't encode empty namespace in url!",
));
}

Ok(self.levels.join("\u{1F}").to_string())
}
}

#[cfg(test)]
mod tests {
    use crate::table::Namespace;

    /// Multiple levels are joined with the unit separator.
    #[test]
    fn test_namespace_encode() {
        let ns = Namespace {
            levels: vec!["a".to_string(), "b".to_string()],
        };

        assert_eq!("a\u{1F}b", ns.encode_in_url().unwrap());
    }

    /// A single level is returned as is, without any separator.
    #[test]
    fn test_namespace_encode_single_level() {
        let ns = Namespace {
            levels: vec!["a".to_string()],
        };

        assert_eq!("a", ns.encode_in_url().unwrap());
    }

    /// An empty namespace can't be encoded and must be reported as an error.
    #[test]
    fn test_namespace_encode_empty_fails() {
        let ns = Namespace { levels: vec![] };

        assert!(ns.encode_in_url().is_err());
    }
}
16 changes: 16 additions & 0 deletions icelake/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,22 @@ impl From<std::num::ParseIntError> for Error {
}
}

impl From<reqwest::Error> for Error {
    /// Wraps a `reqwest` error as an unexpected error, keeping it as the source.
    fn from(value: reqwest::Error) -> Self {
        let err = Self::new(ErrorKind::Unexpected, "Failed to create client");
        err.set_source(value)
    }
}

impl<T: std::fmt::Debug + Send + Sync + 'static> From<iceberg_rest_api::apis::Error<T>> for Error {
fn from(value: iceberg_rest_api::apis::Error<T>) -> Self {
Self::new(
ErrorKind::Unexpected,
"Failed to request from catalog server",
)
.set_source(value)
}
}

#[cfg(test)]
mod tests {
use anyhow::anyhow;
Expand Down
2 changes: 1 addition & 1 deletion icelake/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
#![allow(dead_code)]

mod table;
pub use table::Table;
pub use table::*;
mod error;
pub use error::Error;
pub use error::ErrorKind;
Expand Down
42 changes: 39 additions & 3 deletions icelake/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use opendal::services::Fs;
use opendal::Operator;
use regex::Regex;
use url::Url;

use uuid::Uuid;

use crate::config::{TableConfig, TableConfigRef};
Expand All @@ -22,14 +23,49 @@ const VERSION_HINT_FILENAME: &str = "version-hint.text";
const VERSIONED_TABLE_METADATA_FILE_PATTERN: &str = r"v([0-9]+).metadata.json";

/// Namespace of tables
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct Namespace {
levels: Vec<String>,
/// Levels in namespace.
pub levels: Vec<String>,
}

impl Namespace {
    /// Builds a namespace from its levels, converting each one with `to_string`.
    pub fn new(levels: impl IntoIterator<Item = impl ToString>) -> Self {
        let levels: Vec<String> = levels
            .into_iter()
            .map(|level| level.to_string())
            .collect();

        Self { levels }
    }
}

/// Full qualified name of table.
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct TableIdentifier {
namespace: Namespace,
name: String,
/// Namespace
pub namespace: Namespace,
/// Table name
pub name: String,
}

impl TableIdentifier {
/// Creates a full qualified table identifier from a list of names.
pub fn new(names: impl IntoIterator<Item = impl ToString>) -> Result<Self> {
let mut names: Vec<String> = names.into_iter().map(|s| s.to_string()).collect();
if names.is_empty() {
return Err(Error::new(
ErrorKind::IcebergDataInvalid,
"Table identifier can't be empty!",
));
}

let table_name = names.pop().unwrap();
let ns = Namespace { levels: names };

Ok(Self {
namespace: ns,
name: table_name,
})
}
}

/// Table is the main entry point for the IceLake.
Expand Down
2 changes: 1 addition & 1 deletion icelake/tests/insert_tests.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::utils::{mc_image, minio_image, set_up, spark_image, Poetry, TestFixture};
use testcontainers::clients::Cli;

mod utils;
pub use utils::*;

fn create_test_fixture<'a>(cli: &'a Cli, toml_file: &str) -> TestFixture<'a> {
set_up();
Expand Down
Loading