diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs index 39553f7554..ddbf6a4e01 100644 --- a/crates/catalog/rest/src/catalog.rs +++ b/crates/catalog/rest/src/catalog.rs @@ -41,9 +41,9 @@ use crate::client::{ HttpClient, deserialize_catalog_response, deserialize_unexpected_catalog_error, }; use crate::types::{ - CatalogConfig, CommitTableRequest, CommitTableResponse, CreateTableRequest, - ListNamespaceResponse, ListTableResponse, LoadTableResponse, NamespaceSerde, - RegisterTableRequest, RenameTableRequest, + CatalogConfig, CommitTableRequest, CommitTableResponse, CreateNamespaceRequest, + CreateTableRequest, ListNamespaceResponse, ListTablesResponse, LoadTableResult, + NamespaceResponse, RegisterTableRequest, RenameTableRequest, }; /// REST catalog URI @@ -466,13 +466,7 @@ impl Catalog for RestCatalog { deserialize_catalog_response::(http_response) .await?; - let ns_identifiers = response - .namespaces - .into_iter() - .map(NamespaceIdent::from_vec) - .collect::>>()?; - - namespaces.extend(ns_identifiers); + namespaces.extend(response.namespaces); match response.next_page_token { Some(token) => next_token = Some(token), @@ -502,9 +496,9 @@ impl Catalog for RestCatalog { let request = context .client .request(Method::POST, context.config.namespaces_endpoint()) - .json(&NamespaceSerde { - namespace: namespace.as_ref().clone(), - properties: Some(properties), + .json(&CreateNamespaceRequest { + namespace: namespace.clone(), + properties, }) .build()?; @@ -513,8 +507,8 @@ impl Catalog for RestCatalog { match http_response.status() { StatusCode::OK => { let response = - deserialize_catalog_response::(http_response).await?; - Namespace::try_from(response) + deserialize_catalog_response::(http_response).await?; + Ok(Namespace::from(response)) } StatusCode::CONFLICT => Err(Error::new( ErrorKind::Unexpected, @@ -537,8 +531,8 @@ impl Catalog for RestCatalog { match http_response.status() { StatusCode::OK => { let response = - deserialize_catalog_response::(http_response).await?; - Namespace::try_from(response) + deserialize_catalog_response::(http_response).await?; + Ok(Namespace::from(response)) } StatusCode::NOT_FOUND => Err(Error::new( ErrorKind::Unexpected, @@ -614,7 +608,7 @@ impl Catalog for RestCatalog { match http_response.status() { StatusCode::OK => { let response = - deserialize_catalog_response::(http_response).await?; + deserialize_catalog_response::(http_response).await?; identifiers.extend(response.identifiers); @@ -661,11 +655,7 @@ impl Catalog for RestCatalog { partition_spec: creation.partition_spec, write_order: creation.sort_order, stage_create: Some(false), - properties: if creation.properties.is_empty() { - None - } else { - Some(creation.properties) - }, + properties: creation.properties, }) .build()?; @@ -673,7 +663,7 @@ impl Catalog for RestCatalog { let response = match http_response.status() { StatusCode::OK => { - deserialize_catalog_response::(http_response).await? + deserialize_catalog_response::(http_response).await? } StatusCode::NOT_FOUND => { return Err(Error::new( @@ -697,7 +687,6 @@ impl Catalog for RestCatalog { let config = response .config - .unwrap_or_default() .into_iter() .chain(self.user_config.props.clone()) .collect(); @@ -735,7 +724,7 @@ impl Catalog for RestCatalog { let response = match http_response.status() { StatusCode::OK | StatusCode::NOT_MODIFIED => { - deserialize_catalog_response::(http_response).await? + deserialize_catalog_response::(http_response).await? } StatusCode::NOT_FOUND => { return Err(Error::new( @@ -748,7 +737,6 @@ impl Catalog for RestCatalog { let config = response .config - .unwrap_or_default() .into_iter() .chain(self.user_config.props.clone()) .collect(); @@ -861,9 +849,9 @@ impl Catalog for RestCatalog { let http_response = context.client.query_catalog(request).await?; - let response: LoadTableResponse = match http_response.status() { + let response: LoadTableResult = match http_response.status() { StatusCode::OK => { - deserialize_catalog_response::(http_response).await? + deserialize_catalog_response::(http_response).await? } StatusCode::NOT_FOUND => { return Err(Error::new( @@ -905,7 +893,7 @@ impl Catalog for RestCatalog { context.config.table_endpoint(commit.identifier()), ) .json(&CommitTableRequest { - identifier: commit.identifier().clone(), + identifier: Some(commit.identifier().clone()), requirements: commit.take_requirements(), updates: commit.take_updates(), }) @@ -2428,7 +2416,7 @@ mod tests { )) .unwrap(); let reader = BufReader::new(file); - let resp = serde_json::from_reader::<_, LoadTableResponse>(reader).unwrap(); + let resp = serde_json::from_reader::<_, LoadTableResult>(reader).unwrap(); Table::builder() .metadata(resp.metadata) @@ -2568,7 +2556,7 @@ mod tests { )) .unwrap(); let reader = BufReader::new(file); - let resp = serde_json::from_reader::<_, LoadTableResponse>(reader).unwrap(); + let resp = serde_json::from_reader::<_, LoadTableResult>(reader).unwrap(); Table::builder() .metadata(resp.metadata) diff --git a/crates/catalog/rest/src/lib.rs b/crates/catalog/rest/src/lib.rs index 70cdeaabd0..6bee950970 100644 --- a/crates/catalog/rest/src/lib.rs +++ b/crates/catalog/rest/src/lib.rs @@ -56,3 +56,4 @@ mod client; mod types; pub use catalog::*; +pub use types::*; diff --git a/crates/catalog/rest/src/types.rs b/crates/catalog/rest/src/types.rs index 70ed72051a..ab44c40ee3 100644 --- a/crates/catalog/rest/src/types.rs +++ b/crates/catalog/rest/src/types.rs @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +//! Request and response types for the Iceberg REST API. + use std::collections::HashMap; use iceberg::spec::{Schema, SortOrder, TableMetadata, UnboundPartitionSpec}; @@ -30,7 +32,8 @@ pub(super) struct CatalogConfig { } #[derive(Debug, Serialize, Deserialize)] -pub(super) struct ErrorResponse { +/// Wrapper for all non-2xx error responses from the REST API +pub struct ErrorResponse { error: ErrorModel, } @@ -41,11 +44,16 @@ impl From for Error { } #[derive(Debug, Serialize, Deserialize)] -pub(super) struct ErrorModel { - pub(super) message: String, - pub(super) r#type: String, - pub(super) code: u16, - pub(super) stack: Option>, +/// Error payload returned in a response with further details on the error +pub struct ErrorModel { + /// Human-readable error message + pub message: String, + /// Internal type definition of the error + pub r#type: String, + /// HTTP response code + pub code: u16, + /// Optional error stack / context + pub stack: Option>, } impl From for Error { @@ -96,106 +104,255 @@ pub(super) struct TokenResponse { pub(super) issued_token_type: Option, } -#[derive(Debug, Serialize, Deserialize)] -pub(super) struct NamespaceSerde { - pub(super) namespace: Vec, - pub(super) properties: Option>, -} - -impl TryFrom for Namespace { - type Error = Error; - fn try_from(value: NamespaceSerde) -> std::result::Result { - Ok(Namespace::with_properties( - NamespaceIdent::from_vec(value.namespace)?, - value.properties.unwrap_or_default(), - )) - } +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +/// Namespace response +pub struct NamespaceResponse { + /// Namespace identifier + pub namespace: NamespaceIdent, + #[serde(default, skip_serializing_if = "HashMap::is_empty")] + /// Properties stored on the namespace, if supported by the server. + pub properties: HashMap, } -impl From<&Namespace> for NamespaceSerde { +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +/// Create namespace request +pub struct CreateNamespaceRequest { + /// Name of the namespace to create + pub namespace: NamespaceIdent, + #[serde(default, skip_serializing_if = "HashMap::is_empty")] + /// Properties to set on the namespace + pub properties: HashMap, +} + +impl From<&Namespace> for NamespaceResponse { fn from(value: &Namespace) -> Self { Self { - namespace: value.name().as_ref().clone(), - properties: Some(value.properties().clone()), + namespace: value.name().clone(), + properties: value.properties().clone(), } } } -#[derive(Debug, Serialize, Deserialize)] +impl From for Namespace { + fn from(value: NamespaceResponse) -> Self { + Namespace::with_properties(value.namespace, value.properties) + } +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "kebab-case")] -pub(super) struct ListNamespaceResponse { - pub(super) namespaces: Vec>, - #[serde(default)] - pub(super) next_page_token: Option, +/// Response containing a list of namespace identifiers, with optional pagination support. +pub struct ListNamespaceResponse { + /// List of namespace identifiers returned by the server + pub namespaces: Vec, + /// Opaque token for pagination. If present, indicates there are more results available. + /// Use this value in subsequent requests to retrieve the next page. + pub next_page_token: Option, } -#[allow(dead_code)] -#[derive(Debug, Serialize, Deserialize)] -pub(super) struct UpdateNamespacePropsRequest { - removals: Option>, - updates: Option>, +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +/// Request to update properties on a namespace. +/// +/// Properties that are not in the request are not modified or removed by this call. +/// Server implementations are not required to support namespace properties. +pub struct UpdateNamespacePropertiesRequest { + /// List of property keys to remove from the namespace + pub removals: Option>, + /// Map of property keys to values to set or update on the namespace + #[serde(default, skip_serializing_if = "HashMap::is_empty")] + pub updates: HashMap, } -#[allow(dead_code)] -#[derive(Debug, Serialize, Deserialize)] -pub(super) struct UpdateNamespacePropsResponse { - updated: Vec, - removed: Vec, - missing: Option>, +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +/// Response from updating namespace properties, indicating which properties were changed. +pub struct UpdateNamespacePropertiesResponse { + /// List of property keys that were added or updated + pub updated: Vec, + /// List of properties that were removed + pub removed: Vec, + /// List of properties requested for removal that were not found in the namespace's properties. + /// Represents a partial success response. Servers do not need to implement this. + #[serde(skip_serializing_if = "Option::is_none")] + pub missing: Option>, } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "kebab-case")] -pub(super) struct ListTableResponse { - pub(super) identifiers: Vec, +/// Response containing a list of table identifiers, with optional pagination support. +pub struct ListTablesResponse { + /// List of table identifiers under the requested namespace + pub identifiers: Vec, + /// Opaque token for pagination. If present, indicates there are more results available. + /// Use this value in subsequent requests to retrieve the next page. #[serde(default)] - pub(super) next_page_token: Option, + pub next_page_token: Option, } -#[derive(Debug, Serialize, Deserialize)] -pub(super) struct RenameTableRequest { - pub(super) source: TableIdent, - pub(super) destination: TableIdent, +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +/// Request to rename a table from one identifier to another. +/// +/// It's valid to move a table across namespaces, but the server implementation +/// is not required to support it. +pub struct RenameTableRequest { + /// Current table identifier to rename + pub source: TableIdent, + /// New table identifier to rename to + pub destination: TableIdent, } -#[derive(Debug, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "kebab-case")] -pub(super) struct LoadTableResponse { - pub(super) metadata_location: Option, - pub(super) metadata: TableMetadata, - pub(super) config: Option>, +/// Result returned when a table is successfully loaded or created. +/// +/// The table metadata JSON is returned in the `metadata` field. The corresponding file location +/// of table metadata should be returned in the `metadata_location` field, unless the metadata +/// is not yet committed. For example, a create transaction may return metadata that is staged +/// but not committed. +/// +/// The `config` map returns table-specific configuration for the table's resources, including +/// its HTTP client and FileIO. For example, config may contain a specific FileIO implementation +/// class for the table depending on its underlying storage. +pub struct LoadTableResult { + /// May be null if the table is staged as part of a transaction + pub metadata_location: Option, + /// The table's full metadata + pub metadata: TableMetadata, + /// Table-specific configuration overriding catalog configuration + #[serde(default, skip_serializing_if = "HashMap::is_empty")] + pub config: HashMap, + /// Storage credentials for accessing table data. Clients should check this field + /// before falling back to credentials in the `config` field. + #[serde(skip_serializing_if = "Option::is_none")] + pub storage_credentials: Option>, } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +/// Storage credential for a specific location prefix. +/// +/// Indicates a storage location prefix where the credential is relevant. Clients should +/// choose the most specific prefix (by selecting the longest prefix) if several credentials +/// of the same type are available. +pub struct StorageCredential { + /// Storage location prefix where this credential is relevant + pub prefix: String, + /// Configuration map containing credential information + pub config: HashMap, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "kebab-case")] -pub(super) struct CreateTableRequest { - pub(super) name: String, - pub(super) location: Option, - pub(super) schema: Schema, - pub(super) partition_spec: Option, - pub(super) write_order: Option, - pub(super) stage_create: Option, - pub(super) properties: Option>, +/// Request to create a new table in a namespace. +/// +/// If `stage_create` is false, the table is created immediately. +/// If `stage_create` is true, the table is not created, but table metadata is initialized +/// and returned. The service should prepare as needed for a commit to the table commit +/// endpoint to complete the create transaction. +pub struct CreateTableRequest { + /// Name of the table to create + pub name: String, + /// Optional table location. If not provided, the server will choose a location. + pub location: Option, + /// Table schema + pub schema: Schema, + /// Optional partition specification. If not provided, the table will be unpartitioned. + pub partition_spec: Option, + /// Optional sort order for the table + pub write_order: Option, + /// Whether to stage the create for a transaction (true) or create immediately (false) + pub stage_create: Option, + /// Optional properties to set on the table + #[serde(default, skip_serializing_if = "HashMap::is_empty")] + pub properties: HashMap, } -#[derive(Debug, Serialize, Deserialize)] -pub(super) struct CommitTableRequest { - pub(super) identifier: TableIdent, - pub(super) requirements: Vec, - pub(super) updates: Vec, +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +/// Request to commit updates to a table. +/// +/// Commits have two parts: requirements and updates. Requirements are assertions that will +/// be validated before attempting to make and commit changes. Updates are changes to make +/// to table metadata. +/// +/// Create table transactions that are started by createTable with `stage-create` set to true +/// are committed using this request. Transactions should include all changes to the table, +/// including table initialization, like AddSchemaUpdate and SetCurrentSchemaUpdate. +pub struct CommitTableRequest { + /// Table identifier to update; must be present for CommitTransactionRequest + #[serde(skip_serializing_if = "Option::is_none")] + pub identifier: Option, + /// List of requirements that must be satisfied before committing changes + pub requirements: Vec, + /// List of updates to apply to the table metadata + pub updates: Vec, } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "kebab-case")] -pub(super) struct CommitTableResponse { - pub(super) metadata_location: String, - pub(super) metadata: TableMetadata, +/// Response returned when a table is successfully updated. +/// +/// The table metadata JSON is returned in the metadata field. The corresponding file location +/// of table metadata must be returned in the metadata-location field. Clients can check whether +/// metadata has changed by comparing metadata locations. +pub struct CommitTableResponse { + /// Location of the updated table metadata file + pub metadata_location: String, + /// The table's updated metadata + pub metadata: TableMetadata, } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "kebab-case")] -pub(super) struct RegisterTableRequest { - pub(super) name: String, - pub(super) metadata_location: String, - pub(super) overwrite: Option, +/// Request to register a table using an existing metadata file location. +pub struct RegisterTableRequest { + /// Name of the table to register + pub name: String, + /// Location of the metadata file for the table + pub metadata_location: String, + /// Whether to overwrite table metadata if the table already exists + pub overwrite: Option, +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_namespace_response_serde() { + let json = serde_json::json!({ + "namespace": ["nested", "ns"], + "properties": { + "key1": "value1", + "key2": "value2" + } + }); + let ns_response: NamespaceResponse = + serde_json::from_value(json.clone()).expect("Deserialization failed"); + assert_eq!(ns_response, NamespaceResponse { + namespace: NamespaceIdent::from_vec(vec!["nested".to_string(), "ns".to_string()]) + .unwrap(), + properties: HashMap::from([ + ("key1".to_string(), "value1".to_string()), + ("key2".to_string(), "value2".to_string()), + ]), + }); + assert_eq!( + serde_json::to_value(&ns_response).expect("Serialization failed"), + json + ); + + // Without properties + let json_no_props = serde_json::json!({ + "namespace": ["db", "schema"] + }); + let ns_response_no_props: NamespaceResponse = + serde_json::from_value(json_no_props.clone()).expect("Deserialization failed"); + assert_eq!(ns_response_no_props, NamespaceResponse { + namespace: NamespaceIdent::from_vec(vec!["db".to_string(), "schema".to_string()]) + .unwrap(), + properties: HashMap::new(), + }); + assert_eq!( + serde_json::to_value(&ns_response_no_props).expect("Serialization failed"), + json_no_props + ); + } }