From f915486e6dde333f78dc360335ac58be9dc49616 Mon Sep 17 00:00:00 2001
From: Li0k <yuli@singularity-data.com>
Date: Tue, 31 Dec 2024 15:08:29 +0800
Subject: [PATCH 1/3] feat(catalog): sql catalog support update table

---
 crates/catalog/sql/src/catalog.rs         | 311 +++++++++++++++++++++-
 crates/iceberg/src/catalog/mod.rs         |   2 +-
 crates/iceberg/src/spec/table_metadata.rs |   6 +
 3 files changed, 305 insertions(+), 14 deletions(-)

diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs
index b6bff7896..c3ff95dd5 100644
--- a/crates/catalog/sql/src/catalog.rs
+++ b/crates/catalog/sql/src/catalog.rs
@@ -702,12 +702,7 @@ impl Catalog for SqlCatalog {
         let tbl_metadata = TableMetadataBuilder::from_table_creation(tbl_creation)?
             .build()?
             .metadata;
-        let tbl_metadata_location = format!(
-            "{}/metadata/0-{}.metadata.json",
-            location.clone(),
-            Uuid::new_v4()
-        );
-
+        let tbl_metadata_location = metadata_path(&location, Uuid::new_v4());
         let file = self.fileio.new_output(&tbl_metadata_location)?;
         file.write(serde_json::to_vec(&tbl_metadata)?.into())
             .await?;
@@ -769,23 +764,85 @@ impl Catalog for SqlCatalog {
         Ok(())
     }
 
-    async fn update_table(&self, _commit: TableCommit) -> Result<Table> {
-        Err(Error::new(
-            ErrorKind::FeatureUnsupported,
-            "Updating a table is not supported yet",
-        ))
+    async fn update_table(&self, mut commit: TableCommit) -> Result<Table> {
+        let identifier = commit.identifier().clone();
+        if !self.table_exists(&identifier).await? {
+            return no_such_table_err(&identifier);
+        }
+
+        let requirements = commit.take_requirements();
+        let table_updates = commit.take_updates();
+
+        let table = self.load_table(&identifier).await?;
+        let mut update_table_metadata_builder =
+            TableMetadataBuilder::new_from_metadata(table.metadata().clone(), None);
+
+        for table_update in table_updates {
+            update_table_metadata_builder = table_update.apply(update_table_metadata_builder)?;
+        }
+
+        for table_requirement in requirements {
+            table_requirement.check(Some(table.metadata()))?;
+        }
+
+        let new_table_meta_location = metadata_path(table.metadata().location(), Uuid::new_v4());
+        let file = self.fileio.new_output(&new_table_meta_location)?;
+        let update_table_metadata = update_table_metadata_builder.build()?;
+        file.write(serde_json::to_vec(&update_table_metadata.metadata)?.into())
+            .await?;
+
+        let update = format!(
+            "UPDATE {CATALOG_TABLE_NAME} 
+             SET {CATALOG_FIELD_METADATA_LOCATION_PROP} = ? 
+             WHERE {CATALOG_FIELD_CATALOG_NAME} = ? 
+             AND {CATALOG_FIELD_TABLE_NAMESPACE} = ? 
+             AND {CATALOG_FIELD_TABLE_NAME} = ? 
+             AND (
+                    {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}' 
+                    OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
+                  )"
+        );
+
+        let namespace_name = identifier.namespace().join(".");
+        let args: Vec<Option<&str>> = vec![
+            Some(&new_table_meta_location),
+            Some(&self.name),
+            Some(&namespace_name),
+            Some(identifier.name()),
+        ];
+
+        self.execute(&update, args, None).await?;
+
+        Ok(Table::builder()
+            .file_io(self.fileio.clone())
+            .identifier(identifier)
+            .metadata_location(new_table_meta_location)
+            .metadata(update_table_metadata.metadata)
+            .build()?)
     }
 }
 
+/// Generate the metadata path for a table
+#[inline]
+pub fn metadata_path(meta_data_location: &str, uuid: Uuid) -> String {
+    format!("{}/metadata/0-{}.metadata.json", meta_data_location, uuid)
+}
+
 #[cfg(test)]
 mod tests {
     use std::collections::{HashMap, HashSet};
     use std::hash::Hash;
 
     use iceberg::io::FileIOBuilder;
-    use iceberg::spec::{NestedField, PartitionSpec, PrimitiveType, Schema, SortOrder, Type};
+    use iceberg::spec::{
+        NestedField, Operation, PartitionSpec, PrimitiveType, Schema, Snapshot, SnapshotReference,
+        SnapshotRetention, SortOrder, Type, MAIN_BRANCH,
+    };
     use iceberg::table::Table;
-    use iceberg::{Catalog, Namespace, NamespaceIdent, TableCreation, TableIdent};
+    use iceberg::{
+        Catalog, Namespace, NamespaceIdent, TableCommit, TableCreation, TableIdent,
+        TableRequirement, TableUpdate,
+    };
     use itertools::Itertools;
     use regex::Regex;
     use sqlx::migrate::MigrateDatabase;
@@ -1770,4 +1827,232 @@ mod tests {
             "Unexpected => No such table: TableIdent { namespace: NamespaceIdent([\"a\"]), name: \"tbl1\" }"
         );
     }
+
+    #[tokio::test]
+    async fn test_update_table_throws_error_if_table_not_exist() {
+        let warehouse_loc = temp_path();
+        let catalog = new_sql_catalog(warehouse_loc.clone()).await;
+        let namespace_ident = NamespaceIdent::new("a".into());
+        let table_name = "tbl1";
+        let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
+        create_namespace(&catalog, &namespace_ident).await;
+        let table_commit = TableCommit::builder()
+            .ident(table_ident.clone())
+            .updates(vec![])
+            .requirements(vec![])
+            .build();
+        let err = catalog
+            .update_table(table_commit)
+            .await
+            .unwrap_err()
+            .to_string();
+        assert_eq!(
+            err,
+            "Unexpected => No such table: TableIdent { namespace: NamespaceIdent([\"a\"]), name: \"tbl1\" }"
+        );
+    }
+
+    #[tokio::test]
+    async fn test_update_table_add_snapshot() {
+        let warehouse_loc = temp_path();
+        let catalog = new_sql_catalog(warehouse_loc.clone()).await;
+        let namespace_ident = NamespaceIdent::new("a".into());
+        create_namespace(&catalog, &namespace_ident).await;
+
+        let table_name = "abc";
+        let location = warehouse_loc.clone();
+        let table_creation = TableCreation::builder()
+            .name(table_name.into())
+            .location(location.clone())
+            .schema(simple_table_schema())
+            .build();
+
+        let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
+
+        assert_table_eq(
+            &catalog
+                .create_table(&namespace_ident, table_creation)
+                .await
+                .unwrap(),
+            &expected_table_ident,
+            &simple_table_schema(),
+        );
+
+        let table = catalog.load_table(&expected_table_ident).await.unwrap();
+        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
+
+        let table_snapshots_iter = table.metadata().snapshots();
+        assert_eq!(0, table_snapshots_iter.count());
+
+        // Add snapshot
+        let record = r#"
+                {
+                    "snapshot-id": 3051729675574597004,
+                    "sequence-number": 10,
+                    "timestamp-ms": 9992191116217,
+                    "summary": {
+                        "operation": "append"
+                    },
+                    "manifest-list": "s3://b/wh/.../s1.avro",
+                    "schema-id": 0
+                }
+                "#;
+
+        let snapshot = serde_json::from_str::<Snapshot>(record).unwrap();
+        let table_update = TableUpdate::AddSnapshot {
+            snapshot: snapshot.clone(),
+        };
+        let requirements = vec![];
+        let table_commit = TableCommit::builder()
+            .ident(expected_table_ident.clone())
+            .updates(vec![table_update])
+            .requirements(requirements)
+            .build();
+        let table = catalog.update_table(table_commit).await.unwrap();
+        let snapshot_vec = table.metadata().snapshots().collect_vec();
+        assert_eq!(1, snapshot_vec.len());
+        let snapshot = &snapshot_vec[0];
+        assert_eq!(snapshot.snapshot_id(), 3051729675574597004);
+        assert_eq!(snapshot.timestamp_ms(), 9992191116217);
+        assert_eq!(snapshot.sequence_number(), 10);
+        assert_eq!(snapshot.schema_id().unwrap(), 0);
+        assert_eq!(snapshot.manifest_list(), "s3://b/wh/.../s1.avro");
+        assert_eq!(snapshot.summary().operation, Operation::Append);
+        assert_eq!(snapshot.summary().additional_properties, HashMap::new());
+
+        // Add another snapshot
+        // Add snapshot
+        let record = r#"
+              {
+                  "snapshot-id": 3051729675574597005,
+                  "sequence-number": 11,
+                  "timestamp-ms": 9992191117217,
+                  "summary": {
+                      "operation": "append"
+                  },
+                  "manifest-list": "s3://b/wh/.../s2.avro",
+                  "schema-id": 0
+              }
+              "#;
+        let snapshot = serde_json::from_str::<Snapshot>(record).unwrap();
+        let table_update = TableUpdate::AddSnapshot {
+            snapshot: snapshot.clone(),
+        };
+        let requirement = TableRequirement::RefSnapshotIdMatch {
+            r#ref: "main".to_string(),
+            snapshot_id: Some(3051729675574597004),
+        };
+        let requirements = vec![requirement];
+        let table_commit = TableCommit::builder()
+            .ident(expected_table_ident.clone())
+            .updates(vec![table_update])
+            .requirements(requirements)
+            .build();
+        assert!(catalog.update_table(table_commit).await.is_err());
+    }
+
+    #[tokio::test]
+    async fn test_update_table_set_snapshot_ref() {
+        let warehouse_loc = temp_path();
+        let catalog = new_sql_catalog(warehouse_loc.clone()).await;
+        let namespace_ident = NamespaceIdent::new("a".into());
+        create_namespace(&catalog, &namespace_ident).await;
+
+        let table_name = "abc";
+        let location = warehouse_loc.clone();
+        let table_creation = TableCreation::builder()
+            .name(table_name.into())
+            .location(location.clone())
+            .schema(simple_table_schema())
+            .build();
+
+        let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
+
+        assert_table_eq(
+            &catalog
+                .create_table(&namespace_ident, table_creation)
+                .await
+                .unwrap(),
+            &expected_table_ident,
+            &simple_table_schema(),
+        );
+
+        let table = catalog.load_table(&expected_table_ident).await.unwrap();
+        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
+
+        let table_snapshots_iter = table.metadata().snapshots();
+        assert_eq!(0, table_snapshots_iter.count());
+        let table = catalog.load_table(&expected_table_ident).await.unwrap();
+        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
+
+        let table_snapshots_iter = table.metadata().snapshots();
+        assert_eq!(0, table_snapshots_iter.count());
+
+        // Add snapshot
+        let record = r#"
+                {
+                    "snapshot-id": 3051729675574597004,
+                    "sequence-number": 10,
+                    "timestamp-ms": 9992191116217,
+                    "summary": {
+                        "operation": "append"
+                    },
+                    "manifest-list": "s3://b/wh/.../s1.avro",
+                    "schema-id": 0
+                }
+                "#;
+
+        let snapshot = serde_json::from_str::<Snapshot>(record).unwrap();
+        let table_update = TableUpdate::AddSnapshot {
+            snapshot: snapshot.clone(),
+        };
+        let requirements = vec![];
+        let table_commit = TableCommit::builder()
+            .ident(expected_table_ident.clone())
+            .updates(vec![table_update])
+            .requirements(requirements)
+            .build();
+        let table = catalog.update_table(table_commit).await.unwrap();
+        let snapshot_vec = table.metadata().snapshots().collect_vec();
+        assert_eq!(1, snapshot_vec.len());
+        let snapshot = &snapshot_vec[0];
+        assert_eq!(snapshot.snapshot_id(), 3051729675574597004);
+        assert_eq!(snapshot.timestamp_ms(), 9992191116217);
+        assert_eq!(snapshot.sequence_number(), 10);
+        assert_eq!(snapshot.schema_id().unwrap(), 0);
+        assert_eq!(snapshot.manifest_list(), "s3://b/wh/.../s1.avro");
+        assert_eq!(snapshot.summary().operation, Operation::Append);
+        assert_eq!(snapshot.summary().additional_properties, HashMap::new());
+
+        let table_update_set_snapshot_ref = TableUpdate::SetSnapshotRef {
+            ref_name: MAIN_BRANCH.to_string(),
+            reference: SnapshotReference {
+                snapshot_id: snapshot.snapshot_id(),
+                retention: SnapshotRetention::Branch {
+                    min_snapshots_to_keep: Some(10),
+                    max_snapshot_age_ms: None,
+                    max_ref_age_ms: None,
+                },
+            },
+        };
+
+        let table_commit = TableCommit::builder()
+            .ident(expected_table_ident.clone())
+            .updates(vec![table_update_set_snapshot_ref])
+            .requirements(vec![])
+            .build();
+        let table = catalog.update_table(table_commit).await.unwrap();
+        let snapshot_refs_map = table.metadata().snapshot_refs();
+        assert_eq!(1, snapshot_refs_map.len());
+        let snapshot_ref = snapshot_refs_map.get(MAIN_BRANCH).unwrap();
+        let expected_snapshot_ref = SnapshotReference {
+            snapshot_id: 3051729675574597004,
+            retention: SnapshotRetention::Branch {
+                min_snapshots_to_keep: Some(10),
+                max_snapshot_age_ms: None,
+                max_ref_age_ms: None,
+            },
+        };
+        assert_eq!(snapshot_ref, &expected_snapshot_ref);
+    }
 }
diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs
index cbda6c905..c499b5368 100644
--- a/crates/iceberg/src/catalog/mod.rs
+++ b/crates/iceberg/src/catalog/mod.rs
@@ -255,7 +255,7 @@ pub struct TableCreation {
 
 /// TableCommit represents the commit of a table in the catalog.
 #[derive(Debug, TypedBuilder)]
-#[builder(build_method(vis = "pub(crate)"))]
+#[builder(build_method(vis = "pub"))]
 pub struct TableCommit {
     /// The table ident.
     ident: TableIdent,
diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs
index 38204fc15..8a8f33788 100644
--- a/crates/iceberg/src/spec/table_metadata.rs
+++ b/crates/iceberg/src/spec/table_metadata.rs
@@ -626,6 +626,12 @@ impl TableMetadata {
 
         Ok(())
     }
+
+    /// Returns snapshot references.
+    #[inline]
+    pub fn snapshot_refs(&self) -> &HashMap<String, SnapshotReference> {
+        &self.refs
+    }
 }
 
 pub(super) mod _serde {

From 8266322d72e5cf2b0b90a59186961b0068c56d6c Mon Sep 17 00:00:00 2001
From: Li0k <yuli@singularity-data.com>
Date: Tue, 31 Dec 2024 16:02:18 +0800
Subject: [PATCH 2/3] typo

---
 crates/catalog/sql/src/catalog.rs | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs
index c3ff95dd5..911d48eea 100644
--- a/crates/catalog/sql/src/catalog.rs
+++ b/crates/catalog/sql/src/catalog.rs
@@ -23,8 +23,7 @@ use iceberg::io::FileIO;
 use iceberg::spec::{TableMetadata, TableMetadataBuilder};
 use iceberg::table::Table;
 use iceberg::{
-    Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, TableCreation,
-    TableIdent,
+    Catalog, Error, Namespace, NamespaceIdent, Result, TableCommit, TableCreation, TableIdent,
 };
 use sqlx::any::{install_default_drivers, AnyPoolOptions, AnyQueryResult, AnyRow};
 use sqlx::{Any, AnyPool, Row, Transaction};

From 053ccb07acdbc071628f40b859857cac467c2d25 Mon Sep 17 00:00:00 2001
From: Li0k <yuli@singularity-data.com>
Date: Mon, 13 Jan 2025 18:17:34 +0800
Subject: [PATCH 3/3] fix(catalog): Fix conflict detection for update table

---
 crates/catalog/sql/src/catalog.rs | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)

diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs
index 911d48eea..21ac81285 100644
--- a/crates/catalog/sql/src/catalog.rs
+++ b/crates/catalog/sql/src/catalog.rs
@@ -810,7 +810,16 @@ impl Catalog for SqlCatalog {
             Some(identifier.name()),
         ];
 
-        self.execute(&update, args, None).await?;
+        let update_result = self.execute(&update, args, None).await?;
+        if update_result.rows_affected() != 1 {
+            return Err(Error::new(
+                iceberg::ErrorKind::Unexpected,
+                format!(
+                    "Failed to update Table {:?} from  Catalog {:?}",
+                    identifier, &self.name,
+                ),
+            ));
+        }
 
         Ok(Table::builder()
             .file_io(self.fileio.clone())