Skip to content
This repository has been archived by the owner on Aug 13, 2024. It is now read-only.

feat: Introduce load table api #181

Merged
merged 3 commits into from
Sep 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ edition = "2021"
license = "Apache-2.0"

[workspace]
members = ["icelake", "rest_api"]
members = ["icelake"]


[workspace.dependencies]
Expand Down Expand Up @@ -44,5 +44,5 @@ futures = { version = "0.3", features = ["executor"] }
testcontainers = { git = "https://github.com/liurenjie1024/testcontainers-rs.git", rev = "24fd08c05aa72ca7542198056c8c592a1899fd39" }
iceberg-rest-api = { path = "./rest_api" }
murmur3 = "0.5.2"
reqwest = "0.11"
reqwest = { version = "0.11", features = ["json"] }
urlencoding = "2"
1 change: 0 additions & 1 deletion icelake/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ bitvec = "1.0.1"
serde_bytes = "0.11.12"
toml = "0.7.6"
csv = "1.2.2"
iceberg-rest-api = { workspace = true }
murmur3 = { workspace = true }
reqwest = { workspace = true }
urlencoding = { workspace = true }
Expand Down
247 changes: 187 additions & 60 deletions icelake/src/catalog/rest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,38 +4,38 @@
use std::collections::HashMap;

use async_trait::async_trait;
use iceberg_rest_api::{
apis::{
catalog_api_api::list_tables, configuration::Configuration,
configuration_api_api::get_config,
},
models::TableIdentifier as RestTableIdentifier,
};
use reqwest::ClientBuilder;
use reqwest::{Client, ClientBuilder, Request, StatusCode};
use serde::de::DeserializeOwned;
use urlencoding::encode;

use crate::{
catalog::rest::_models::CatalogConfig,
table::{Namespace, TableIdentifier},
types::{PartitionSpec, Schema},
types::{PartitionSpec, Schema, TableMetadata},
Error, ErrorKind, Table,
};

use self::_models::{ListTablesResponse, LoadTableResult};

use super::{Catalog, UpdateTable};
use crate::error::Result;

const PATH_V1: &str = "v1";

/// Configuration for rest catalog.
#[derive(Debug, Default)]
pub struct RestCatalogConfig {
uri: String,
prefix: Option<String>,
warehouse: Option<String>,
}

/// Rest catalog implementation
pub struct RestCatalog {
name: String,
config: RestCatalogConfig,
endpoints: Endpoint,
// rest client config
rest_client: Configuration,
rest_client: Client,
}

#[async_trait]
Expand All @@ -47,16 +47,18 @@ impl Catalog for RestCatalog {

/// List tables under namespace.
async fn list_tables(&self, ns: &Namespace) -> Result<Vec<TableIdentifier>> {
let resp = list_tables(
&self.rest_client,
self.config.prefix.as_deref(),
&ns.encode_in_url()?,
)
.await?;
Ok(resp
let request = self.rest_client.get(self.endpoints.tables(ns)?).build()?;
Ok(self
.execute_request::<ListTablesResponse>(request, |status| match status {
StatusCode::NOT_FOUND => Some(Error::new(
ErrorKind::IcebergDataInvalid,
format!("Namespace {ns} not found!"),
)),
_ => None,
})
.await?
.identifiers
.unwrap_or_default()
.iter()
.into_iter()
.map(TableIdentifier::from)
.collect())
}
Expand Down Expand Up @@ -101,11 +103,33 @@ impl Catalog for RestCatalog {
}

/// Load table.
async fn load_table(&self, _table_name: &TableIdentifier) -> Result<Table> {
Err(Error::new(
ErrorKind::IcebergFeatureUnsupported,
"Load table in rest client is not implemented yet!",
))
async fn load_table(&self, table_name: &TableIdentifier) -> Result<Table> {
let resp = self
.execute_request::<LoadTableResult>(
self.rest_client
.get(self.endpoints.table(table_name)?)
.build()?,
|status| match status {
StatusCode::NOT_FOUND => Some(Error::new(
ErrorKind::IcebergDataInvalid,
format!("Talbe {table_name} not found!"),
)),
_ => None,
},
)
.await?;

let metadata_location = resp.metadata_location.ok_or_else(|| {
Error::new(
ErrorKind::IcebergFeatureUnsupported,
"Loading uncommitted table is not supported!",
)
})?;

log::info!("Table metadata location of {table_name} is {metadata_location}");

let table_metadata = TableMetadata::try_from(resp.metadata)?;
Ok(Table::read_only_table(table_metadata, &metadata_location))
}

/// Invalidate table.
Expand Down Expand Up @@ -141,44 +165,78 @@ impl RestCatalog {
/// Creates rest catalog.
pub async fn new(name: impl AsRef<str>, config: HashMap<String, String>) -> Result<Self> {
let catalog_config = RestCatalog::init_config_from_server(config).await?;
let endpoints = Endpoint::new(catalog_config.uri.clone());
let rest_client = RestCatalog::create_rest_client(&catalog_config)?;

Ok(Self {
name: name.as_ref().to_string(),
config: catalog_config,
rest_client,
endpoints,
})
}

fn create_rest_client(config: &RestCatalogConfig) -> Result<Configuration> {
let mut client = Configuration {
base_path: config.uri.to_string(),
..Default::default()
};

let req = { ClientBuilder::new().build()? };

client.client = req;
async fn execute_request<T: DeserializeOwned>(
&self,
request: Request,
error_handler: impl FnOnce(StatusCode) -> Option<Error>,
) -> Result<T> {
log::debug!("Executing request: {request:?}");

let resp = self.rest_client.execute(request).await?;

match resp.status() {
StatusCode::OK => {
let text = resp.text().await?;
log::debug!("Response text is: {text}");
Ok(serde_json::from_slice::<T>(text.as_bytes())?)
}
other => {
if let Some(error) = error_handler(other) {
Err(error)
} else {
let text = resp.text().await?;
Err(Error::new(
ErrorKind::Unexpected,
format!(
"Faile to execute http request, status code: {other}, message: {text}"
),
))
}
}
}
}

Ok(client)
fn create_rest_client(_config: &RestCatalogConfig) -> Result<Client> {
Ok(ClientBuilder::new().build()?)
}

async fn init_config_from_server(config: HashMap<String, String>) -> Result<RestCatalogConfig> {
log::info!("Creating rest catalog with user config: {config:?}");
let rest_catalog_config = RestCatalogConfig::try_from(&config)?;
let endpoint = Endpoint::new(rest_catalog_config.uri.clone());
let rest_client = RestCatalog::create_rest_client(&rest_catalog_config)?;

let mut server_config =
get_config(&rest_client, rest_catalog_config.warehouse.as_deref()).await?;

log::info!("Catalog config from rest catalog server: {server_config:?}");
server_config.defaults.extend(config);
server_config.defaults.extend(server_config.overrides);

let ret = RestCatalogConfig::try_from(&server_config.defaults)?;

log::info!("Result rest catalog config after merging with catalog server config: {ret:?}");
Ok(ret)
let resp = rest_client
.execute(rest_client.get(endpoint.config()).build()?)
.await?;

match resp.status() {
StatusCode::OK => {
let mut server_config = resp.json::<CatalogConfig>().await?;
log::info!("Catalog config from rest catalog server: {server_config:?}");
server_config.defaults.extend(config);
server_config.defaults.extend(server_config.overrides);

let ret = RestCatalogConfig::try_from(&server_config.defaults)?;

log::info!(
"Result rest catalog config after merging with catalog server config: {ret:?}"
);
Ok(ret)
}
_ => Err(Error::new(ErrorKind::Unexpected, resp.text().await?)),
}
}
}

Expand All @@ -203,22 +261,46 @@ impl TryFrom<&HashMap<String, String>> for RestCatalogConfig {
config.warehouse = Some(warehouse.clone());
}

if let Some(prefix) = value.get("prefix") {
config.prefix = Some(prefix.clone());
}

Ok(config)
}
}

impl From<&RestTableIdentifier> for TableIdentifier {
fn from(value: &RestTableIdentifier) -> Self {
Self {
namespace: Namespace {
levels: value.namespace.clone(),
},
name: value.name.clone(),
}
// TODO: Support prefix
struct Endpoint {
base: String,
}

impl Endpoint {
fn new(base: String) -> Self {
Self { base }
}
fn config(&self) -> String {
[&self.base, PATH_V1, "config"].join("/")
}

fn tables(&self, ns: &Namespace) -> Result<String> {
Ok([
&self.base,
PATH_V1,
"namespaces",
&ns.encode_in_url()?,
"tables",
]
.join("/")
.to_string())
}

fn table(&self, table: &TableIdentifier) -> Result<String> {
Ok([
&self.base,
PATH_V1,
"namespaces",
&table.namespace.encode_in_url()?,
"tables",
encode(&table.name).as_ref(),
]
.join("/")
.to_string())
}
}

Expand All @@ -232,7 +314,52 @@ impl Namespace {
));
}

Ok(self.levels.join("\u{1F}").to_string())
Ok(encode(&self.levels.join("\u{1F}")).to_string())
}
}

mod _models {
use std::collections::HashMap;

use serde::{Deserialize, Serialize};

use crate::{table, types::TableMetadataSerDe};

#[derive(Clone, Debug, Serialize, Deserialize)]
pub(super) struct TableIdentifier {
pub(super) namespace: Vec<String>,
pub(super) name: String,
}

impl From<TableIdentifier> for table::TableIdentifier {
fn from(value: TableIdentifier) -> Self {
Self {
namespace: table::Namespace::new(value.namespace),
name: value.name,
}
}
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub(super) struct ListTablesResponse {
pub(super) identifiers: Vec<TableIdentifier>,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub(super) struct CatalogConfig {
pub(super) overrides: HashMap<String, String>,
pub(super) defaults: HashMap<String, String>,
}

#[derive(Serialize, Deserialize)]
pub(super) struct LoadTableResult {
/// May be null if the table is staged as part of a transaction
#[serde(rename = "metadata-location", skip_serializing_if = "Option::is_none")]
pub(super) metadata_location: Option<String>,
#[serde(rename = "metadata")]
pub(super) metadata: TableMetadataSerDe,
#[serde(rename = "config", skip_serializing_if = "Option::is_none")]
pub(super) config: Option<::std::collections::HashMap<String, String>>,
}
}

Expand All @@ -246,6 +373,6 @@ mod tests {
levels: vec!["a".to_string(), "b".to_string()],
};

assert_eq!("a\u{1F}b", ns.encode_in_url().unwrap());
assert_eq!("a%1Fb", ns.encode_in_url().unwrap());
}
}
10 changes: 0 additions & 10 deletions icelake/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,16 +237,6 @@ impl From<reqwest::Error> for Error {
}
}

impl<T: std::fmt::Debug + Send + Sync + 'static> From<iceberg_rest_api::apis::Error<T>> for Error {
fn from(value: iceberg_rest_api::apis::Error<T>) -> Self {
Self::new(
ErrorKind::Unexpected,
"Failed to request from catalog server",
)
.set_source(value)
}
}

#[cfg(test)]
mod tests {
use anyhow::anyhow;
Expand Down
Loading