Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: sql catalog support update table #862

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
323 changes: 308 additions & 15 deletions crates/catalog/sql/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ use iceberg::io::FileIO;
use iceberg::spec::{TableMetadata, TableMetadataBuilder};
use iceberg::table::Table;
use iceberg::{
Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, TableCreation,
TableIdent,
Catalog, Error, Namespace, NamespaceIdent, Result, TableCommit, TableCreation, TableIdent,
};
use sqlx::any::{install_default_drivers, AnyPoolOptions, AnyQueryResult, AnyRow};
use sqlx::{Any, AnyPool, Row, Transaction};
Expand Down Expand Up @@ -702,12 +701,7 @@ impl Catalog for SqlCatalog {
let tbl_metadata = TableMetadataBuilder::from_table_creation(tbl_creation)?
.build()?
.metadata;
let tbl_metadata_location = format!(
"{}/metadata/0-{}.metadata.json",
location.clone(),
Uuid::new_v4()
);

let tbl_metadata_location = metadata_path(&location, Uuid::new_v4());
let file = self.fileio.new_output(&tbl_metadata_location)?;
file.write(serde_json::to_vec(&tbl_metadata)?.into())
.await?;
Expand Down Expand Up @@ -769,23 +763,94 @@ impl Catalog for SqlCatalog {
Ok(())
}

async fn update_table(&self, _commit: TableCommit) -> Result<Table> {
Err(Error::new(
ErrorKind::FeatureUnsupported,
"Updating a table is not supported yet",
))
async fn update_table(&self, mut commit: TableCommit) -> Result<Table> {
let identifier = commit.identifier().clone();
if !self.table_exists(&identifier).await? {
return no_such_table_err(&identifier);
}

let requirements = commit.take_requirements();
let table_updates = commit.take_updates();

let table = self.load_table(&identifier).await?;
let mut update_table_metadata_builder =
TableMetadataBuilder::new_from_metadata(table.metadata().clone(), None);

for table_update in table_updates {
update_table_metadata_builder = table_update.apply(update_table_metadata_builder)?;
}

for table_requirement in requirements {
table_requirement.check(Some(table.metadata()))?;
}

let new_table_meta_location = metadata_path(table.metadata().location(), Uuid::new_v4());
let file = self.fileio.new_output(&new_table_meta_location)?;
let update_table_metadata = update_table_metadata_builder.build()?;
file.write(serde_json::to_vec(&update_table_metadata.metadata)?.into())
.await?;

let update = format!(
"UPDATE {CATALOG_TABLE_NAME}
SET {CATALOG_FIELD_METADATA_LOCATION_PROP} = ?
WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
AND {CATALOG_FIELD_TABLE_NAME} = ?
AND (
{CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
)"
);

let namespace_name = identifier.namespace().join(".");
let args: Vec<Option<&str>> = vec![
Some(&new_table_meta_location),
Some(&self.name),
Some(&namespace_name),
Some(identifier.name()),
];

let update_result = self.execute(&update, args, None).await?;
if update_result.rows_affected() != 1 {
return Err(Error::new(
iceberg::ErrorKind::Unexpected,
format!(
"Failed to update Table {:?} from Catalog {:?}",
identifier, &self.name,
),
));
}

Ok(Table::builder()
.file_io(self.fileio.clone())
.identifier(identifier)
.metadata_location(new_table_meta_location)
.metadata(update_table_metadata.metadata)
.build()?)
}
}

/// Generates the location of a new metadata file for a table.
///
/// The result has the form `<location>/metadata/0-<uuid>.metadata.json`.
/// Trailing `/` characters on `meta_data_location` are ignored so that a
/// location such as `s3://bucket/wh/tbl/` does not produce a path containing
/// an empty `//` segment. A bare scheme prefix (ending in `://`) is left intact.
///
/// * `meta_data_location` - base location under which the `metadata`
///   directory lives.
/// * `uuid` - unique suffix that keeps metadata file names from colliding.
#[inline]
pub fn metadata_path(meta_data_location: &str, uuid: Uuid) -> String {
    // Strip trailing separators one at a time, stopping before a `://` scheme
    // delimiter would be damaged.
    let mut base = meta_data_location;
    while base.ends_with('/') && !base.ends_with("://") {
        base = &base[..base.len() - 1];
    }
    format!("{base}/metadata/0-{uuid}.metadata.json")
}

#[cfg(test)]
mod tests {
use std::collections::{HashMap, HashSet};
use std::hash::Hash;

use iceberg::io::FileIOBuilder;
use iceberg::spec::{NestedField, PartitionSpec, PrimitiveType, Schema, SortOrder, Type};
use iceberg::spec::{
NestedField, Operation, PartitionSpec, PrimitiveType, Schema, Snapshot, SnapshotReference,
SnapshotRetention, SortOrder, Type, MAIN_BRANCH,
};
use iceberg::table::Table;
use iceberg::{Catalog, Namespace, NamespaceIdent, TableCreation, TableIdent};
use iceberg::{
Catalog, Namespace, NamespaceIdent, TableCommit, TableCreation, TableIdent,
TableRequirement, TableUpdate,
};
use itertools::Itertools;
use regex::Regex;
use sqlx::migrate::MigrateDatabase;
Expand Down Expand Up @@ -1770,4 +1835,232 @@ mod tests {
"Unexpected => No such table: TableIdent { namespace: NamespaceIdent([\"a\"]), name: \"tbl1\" }"
);
}

/// Committing to a table that was never created must fail with a
/// "No such table" error, even though its namespace exists.
#[tokio::test]
async fn test_update_table_throws_error_if_table_not_exist() {
    let catalog = new_sql_catalog(temp_path()).await;

    let namespace = NamespaceIdent::new("a".into());
    create_namespace(&catalog, &namespace).await;

    // Only the namespace is created; `tbl1` is deliberately left missing.
    let missing_table = TableIdent::new(namespace, "tbl1".into());
    let commit = TableCommit::builder()
        .ident(missing_table)
        .updates(vec![])
        .requirements(vec![])
        .build();

    let message = catalog.update_table(commit).await.unwrap_err().to_string();
    assert_eq!(
        message,
        "Unexpected => No such table: TableIdent { namespace: NamespaceIdent([\"a\"]), name: \"tbl1\" }"
    );
}

/// Adds a snapshot through `update_table`, then verifies that a second commit
/// whose requirement is not met is rejected and leaves the table unchanged.
#[tokio::test]
async fn test_update_table_add_snapshot() {
    let warehouse_loc = temp_path();
    let catalog = new_sql_catalog(warehouse_loc.clone()).await;
    let namespace_ident = NamespaceIdent::new("a".into());
    create_namespace(&catalog, &namespace_ident).await;

    let table_name = "abc";
    let location = warehouse_loc.clone();
    let table_creation = TableCreation::builder()
        .name(table_name.into())
        .location(location.clone())
        .schema(simple_table_schema())
        .build();

    let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());

    assert_table_eq(
        &catalog
            .create_table(&namespace_ident, table_creation)
            .await
            .unwrap(),
        &expected_table_ident,
        &simple_table_schema(),
    );

    // A freshly created table has no snapshots.
    let table = catalog.load_table(&expected_table_ident).await.unwrap();
    assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
    assert_eq!(0, table.metadata().snapshots().count());

    // First commit: add a snapshot without any requirements.
    let record = r#"
    {
        "snapshot-id": 3051729675574597004,
        "sequence-number": 10,
        "timestamp-ms": 9992191116217,
        "summary": {
            "operation": "append"
        },
        "manifest-list": "s3://b/wh/.../s1.avro",
        "schema-id": 0
    }
    "#;

    let snapshot = serde_json::from_str::<Snapshot>(record).unwrap();
    let table_update = TableUpdate::AddSnapshot {
        snapshot: snapshot.clone(),
    };
    let table_commit = TableCommit::builder()
        .ident(expected_table_ident.clone())
        .updates(vec![table_update])
        .requirements(vec![])
        .build();
    let table = catalog.update_table(table_commit).await.unwrap();

    let snapshot_vec = table.metadata().snapshots().collect_vec();
    assert_eq!(1, snapshot_vec.len());
    let snapshot = &snapshot_vec[0];
    assert_eq!(snapshot.snapshot_id(), 3051729675574597004);
    assert_eq!(snapshot.timestamp_ms(), 9992191116217);
    assert_eq!(snapshot.sequence_number(), 10);
    assert_eq!(snapshot.schema_id().unwrap(), 0);
    assert_eq!(snapshot.manifest_list(), "s3://b/wh/.../s1.avro");
    assert_eq!(snapshot.summary().operation, Operation::Append);
    assert_eq!(snapshot.summary().additional_properties, HashMap::new());

    // Second commit: try to add another snapshot, guarded by a requirement
    // that the `main` ref points at the first snapshot. No ref was ever set,
    // so the requirement cannot hold and the commit must be rejected.
    let record = r#"
    {
        "snapshot-id": 3051729675574597005,
        "sequence-number": 11,
        "timestamp-ms": 9992191117217,
        "summary": {
            "operation": "append"
        },
        "manifest-list": "s3://b/wh/.../s2.avro",
        "schema-id": 0
    }
    "#;
    let snapshot = serde_json::from_str::<Snapshot>(record).unwrap();
    let table_update = TableUpdate::AddSnapshot {
        snapshot: snapshot.clone(),
    };
    let requirement = TableRequirement::RefSnapshotIdMatch {
        r#ref: "main".to_string(),
        snapshot_id: Some(3051729675574597004),
    };
    let table_commit = TableCommit::builder()
        .ident(expected_table_ident.clone())
        .updates(vec![table_update])
        .requirements(vec![requirement])
        .build();
    assert!(catalog.update_table(table_commit).await.is_err());

    // The rejected commit must not have changed what the catalog serves.
    let table = catalog.load_table(&expected_table_ident).await.unwrap();
    let snapshot_ids: Vec<i64> = table
        .metadata()
        .snapshots()
        .map(|snapshot| snapshot.snapshot_id())
        .collect();
    assert_eq!(snapshot_ids, vec![3051729675574597004]);
}

/// Adds a snapshot and then points the main branch at it, checking that the
/// table returned by `update_table` exposes exactly that snapshot reference.
#[tokio::test]
async fn test_update_table_set_snapshot_ref() {
    let warehouse_loc = temp_path();
    let catalog = new_sql_catalog(warehouse_loc.clone()).await;
    let namespace_ident = NamespaceIdent::new("a".into());
    create_namespace(&catalog, &namespace_ident).await;

    let table_name = "abc";
    let location = warehouse_loc.clone();
    let table_creation = TableCreation::builder()
        .name(table_name.into())
        .location(location.clone())
        .schema(simple_table_schema())
        .build();

    let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());

    assert_table_eq(
        &catalog
            .create_table(&namespace_ident, table_creation)
            .await
            .unwrap(),
        &expected_table_ident,
        &simple_table_schema(),
    );

    // A freshly created table has no snapshots.
    let table = catalog.load_table(&expected_table_ident).await.unwrap();
    assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
    assert_eq!(0, table.metadata().snapshots().count());

    // First commit: add a snapshot.
    let record = r#"
    {
        "snapshot-id": 3051729675574597004,
        "sequence-number": 10,
        "timestamp-ms": 9992191116217,
        "summary": {
            "operation": "append"
        },
        "manifest-list": "s3://b/wh/.../s1.avro",
        "schema-id": 0
    }
    "#;

    let snapshot = serde_json::from_str::<Snapshot>(record).unwrap();
    let table_update = TableUpdate::AddSnapshot {
        snapshot: snapshot.clone(),
    };
    let table_commit = TableCommit::builder()
        .ident(expected_table_ident.clone())
        .updates(vec![table_update])
        .requirements(vec![])
        .build();
    let table = catalog.update_table(table_commit).await.unwrap();

    let snapshot_vec = table.metadata().snapshots().collect_vec();
    assert_eq!(1, snapshot_vec.len());
    let snapshot = &snapshot_vec[0];
    assert_eq!(snapshot.snapshot_id(), 3051729675574597004);
    assert_eq!(snapshot.timestamp_ms(), 9992191116217);
    assert_eq!(snapshot.sequence_number(), 10);
    assert_eq!(snapshot.schema_id().unwrap(), 0);
    assert_eq!(snapshot.manifest_list(), "s3://b/wh/.../s1.avro");
    assert_eq!(snapshot.summary().operation, Operation::Append);
    assert_eq!(snapshot.summary().additional_properties, HashMap::new());

    // Second commit: make the main branch reference the snapshot just added.
    let table_update_set_snapshot_ref = TableUpdate::SetSnapshotRef {
        ref_name: MAIN_BRANCH.to_string(),
        reference: SnapshotReference {
            snapshot_id: snapshot.snapshot_id(),
            retention: SnapshotRetention::Branch {
                min_snapshots_to_keep: Some(10),
                max_snapshot_age_ms: None,
                max_ref_age_ms: None,
            },
        },
    };

    let table_commit = TableCommit::builder()
        .ident(expected_table_ident.clone())
        .updates(vec![table_update_set_snapshot_ref])
        .requirements(vec![])
        .build();
    let table = catalog.update_table(table_commit).await.unwrap();

    // Exactly one reference exists and it matches what was committed.
    let snapshot_refs_map = table.metadata().snapshot_refs();
    assert_eq!(1, snapshot_refs_map.len());
    let snapshot_ref = snapshot_refs_map.get(MAIN_BRANCH).unwrap();
    let expected_snapshot_ref = SnapshotReference {
        snapshot_id: 3051729675574597004,
        retention: SnapshotRetention::Branch {
            min_snapshots_to_keep: Some(10),
            max_snapshot_age_ms: None,
            max_ref_age_ms: None,
        },
    };
    assert_eq!(snapshot_ref, &expected_snapshot_ref);
}
}
2 changes: 1 addition & 1 deletion crates/iceberg/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ pub struct TableCreation {

/// TableCommit represents the commit of a table in the catalog.
#[derive(Debug, TypedBuilder)]
#[builder(build_method(vis = "pub(crate)"))]
#[builder(build_method(vis = "pub"))]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason we make the TableCommit builder crate-only is that we don't want to allow users to build it manually; all table commit construction should go through the transaction API.

pub struct TableCommit {
/// The table ident.
ident: TableIdent,
Expand Down
6 changes: 6 additions & 0 deletions crates/iceberg/src/spec/table_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,12 @@ impl TableMetadata {

Ok(())
}

/// Returns snapshot references.
#[inline]
pub fn snapshot_refs(&self) -> &HashMap<String, SnapshotReference> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we adding this method? We already have a lookup method for snapshots.

&self.refs
}
}

pub(super) mod _serde {
Expand Down
Loading