diff --git a/crates/iceberg/src/transaction/append.rs b/crates/iceberg/src/transaction/append.rs index 008ee6a595..0278f2abe5 100644 --- a/crates/iceberg/src/transaction/append.rs +++ b/crates/iceberg/src/transaction/append.rs @@ -25,9 +25,10 @@ use crate::error::Result; use crate::spec::{DataFile, ManifestEntry, ManifestFile, Operation}; use crate::table::Table; use crate::transaction::snapshot::{ - DefaultManifestProcess, SnapshotProduceOperation, SnapshotProducer, + generate_unique_snapshot_id, DefaultManifestProcess, SnapshotProduceOperation, SnapshotProducer, }; use crate::transaction::{ActionCommit, TransactionAction}; +use crate::{Error, ErrorKind}; /// FastAppendAction is a transaction action for fast append data files to the table. pub struct FastAppendAction { @@ -38,6 +39,7 @@ pub struct FastAppendAction { snapshot_properties: HashMap, added_data_files: Vec, added_delete_files: Vec, + snapshot_id: Option, } impl FastAppendAction { @@ -49,6 +51,7 @@ impl FastAppendAction { snapshot_properties: HashMap::default(), added_data_files: vec![], added_delete_files: vec![], + snapshot_id: None, } } @@ -90,11 +93,33 @@ impl FastAppendAction { self.snapshot_properties = snapshot_properties; self } + + /// Set snapshot id + pub fn set_snapshot_id(mut self, snapshot_id: i64) -> Self { + self.snapshot_id = Some(snapshot_id); + self + } } #[async_trait] impl TransactionAction for FastAppendAction { async fn commit(self: Arc, table: &Table) -> Result { + let snapshot_id = if let Some(snapshot_id) = self.snapshot_id { + if table + .metadata() + .snapshots() + .any(|s| s.snapshot_id() == snapshot_id) + { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Snapshot id {} already exists", snapshot_id), + )); + } + snapshot_id + } else { + generate_unique_snapshot_id(table) + }; + let snapshot_producer = SnapshotProducer::new( table, self.commit_uuid.unwrap_or_else(Uuid::now_v7), @@ -102,6 +127,7 @@ impl TransactionAction for FastAppendAction { self.snapshot_properties.clone(), self.added_data_files.clone(), self.added_delete_files.clone(), + snapshot_id, ); // validate added files diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index 53a92e5a97..15a21682bd 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -90,10 +90,11 @@ impl<'a> SnapshotProducer<'a> { snapshot_properties: HashMap, added_data_files: Vec, added_delete_files: Vec, + snapshot_id: i64, ) -> Self { Self { table, - snapshot_id: Self::generate_unique_snapshot_id(table), + snapshot_id, commit_uuid, key_metadata, snapshot_properties, @@ -161,28 +162,6 @@ impl<'a> SnapshotProducer<'a> { Ok(()) } - fn generate_unique_snapshot_id(table: &Table) -> i64 { - let generate_random_id = || -> i64 { - let (lhs, rhs) = Uuid::new_v4().as_u64_pair(); - let snapshot_id = (lhs ^ rhs) as i64; - if snapshot_id < 0 { - -snapshot_id - } else { - snapshot_id - } - }; - let mut snapshot_id = generate_random_id(); - - while table - .metadata() - .snapshots() - .any(|s| s.snapshot_id() == snapshot_id) - { - snapshot_id = generate_random_id(); - } - snapshot_id - } - fn new_manifest_writer(&mut self, content: ManifestContentType) -> Result { let new_manifest_path = format!( "{}/{}/{}-m{}.{}", @@ -474,3 +453,25 @@ impl<'a> SnapshotProducer<'a> { Ok(ActionCommit::new(updates, requirements)) } } + +pub fn generate_unique_snapshot_id(table: &Table) -> i64 { + let generate_random_id = || -> i64 { + let (lhs, rhs) = Uuid::new_v4().as_u64_pair(); + let snapshot_id = (lhs ^ rhs) as i64; + if snapshot_id < 0 { + -snapshot_id + } else { + snapshot_id + } + }; + let mut snapshot_id = generate_random_id(); + + while table + .metadata() + .snapshots() + .any(|s| s.snapshot_id() == snapshot_id) + { + snapshot_id = generate_random_id(); + } + snapshot_id +} diff --git a/crates/integration_tests/tests/shared_tests/append_data_file_test.rs b/crates/integration_tests/tests/shared_tests/append_data_file_test.rs index dd54ac98f4..4121c5210b 100644 --- a/crates/integration_tests/tests/shared_tests/append_data_file_test.rs +++ b/crates/integration_tests/tests/shared_tests/append_data_file_test.rs @@ -23,10 +23,10 @@ use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray}; use futures::TryStreamExt; use iceberg::transaction::{ApplyTransactionAction, Transaction}; use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder; -use iceberg::writer::file_writer::ParquetWriterBuilder; use iceberg::writer::file_writer::location_generator::{ DefaultFileNameGenerator, DefaultLocationGenerator, }; +use iceberg::writer::file_writer::ParquetWriterBuilder; use iceberg::writer::{IcebergWriter, IcebergWriterBuilder}; use iceberg::{Catalog, TableCreation}; use iceberg_catalog_rest::RestCatalog; @@ -128,4 +128,24 @@ async fn test_append_data_file() { let batches: Vec<_> = batch_stream.try_collect().await.unwrap(); assert_eq!(batches.len(), 1); assert_eq!(batches[0], batch); + + // commit result again + let tx = Transaction::new(&table); + let append_action = tx.fast_append().add_data_files(data_file.clone()); + let tx = append_action.apply(tx).unwrap(); + let table = tx.commit(&rest_catalog).await.unwrap(); + + // check result again + let batch_stream = table + .scan() + .select_all() + .build() + .unwrap() + .to_arrow() + .await + .unwrap(); + let batches: Vec<_> = batch_stream.try_collect().await.unwrap(); + assert_eq!(batches.len(), 2); + assert_eq!(batches[0], batch); + assert_eq!(batches[1], batch); } diff --git a/crates/integration_tests/tests/shared_tests/append_partition_data_file_test.rs b/crates/integration_tests/tests/shared_tests/append_partition_data_file_test.rs index 36a1c3643f..14e90c9aaa 100644 --- a/crates/integration_tests/tests/shared_tests/append_partition_data_file_test.rs +++ b/crates/integration_tests/tests/shared_tests/append_partition_data_file_test.rs @@ -25,10 +25,10 @@ use iceberg::spec::{Literal, PrimitiveLiteral, Struct, Transform, UnboundPartiti use iceberg::table::Table; use iceberg::transaction::{ApplyTransactionAction, Transaction}; use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder; -use iceberg::writer::file_writer::ParquetWriterBuilder; use iceberg::writer::file_writer::location_generator::{ DefaultFileNameGenerator, DefaultLocationGenerator, }; +use iceberg::writer::file_writer::ParquetWriterBuilder; use iceberg::writer::{IcebergWriter, IcebergWriterBuilder}; use iceberg::{Catalog, TableCreation}; use iceberg_catalog_rest::RestCatalog; diff --git a/crates/integration_tests/tests/shared_tests/conflict_commit_test.rs b/crates/integration_tests/tests/shared_tests/conflict_commit_test.rs index 551116bf91..fcc5a3f21f 100644 --- a/crates/integration_tests/tests/shared_tests/conflict_commit_test.rs +++ b/crates/integration_tests/tests/shared_tests/conflict_commit_test.rs @@ -23,10 +23,10 @@ use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray}; use futures::TryStreamExt; use iceberg::transaction::{ApplyTransactionAction, Transaction}; use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder; -use iceberg::writer::file_writer::ParquetWriterBuilder; use iceberg::writer::file_writer::location_generator::{ DefaultFileNameGenerator, DefaultLocationGenerator, }; +use iceberg::writer::file_writer::ParquetWriterBuilder; use iceberg::writer::{IcebergWriter, IcebergWriterBuilder}; use iceberg::{Catalog, TableCreation}; use iceberg_catalog_rest::RestCatalog; diff --git a/crates/integration_tests/tests/shared_tests/scan_all_type.rs b/crates/integration_tests/tests/shared_tests/scan_all_type.rs index 51f4093927..0d9a0f65ab 100644 --- a/crates/integration_tests/tests/shared_tests/scan_all_type.rs +++ b/crates/integration_tests/tests/shared_tests/scan_all_type.rs @@ -30,15 +30,15 @@ use arrow_schema::{DataType, Field, Fields}; use futures::TryStreamExt; use iceberg::arrow::{DEFAULT_MAP_FIELD_NAME, UTC_TIME_ZONE}; use iceberg::spec::{ - LIST_FIELD_NAME, ListType, MAP_KEY_FIELD_NAME, MAP_VALUE_FIELD_NAME, MapType, NestedField, - PrimitiveType, Schema, StructType, Type, + ListType, MapType, NestedField, PrimitiveType, Schema, StructType, Type, LIST_FIELD_NAME, + MAP_KEY_FIELD_NAME, MAP_VALUE_FIELD_NAME, }; use iceberg::transaction::{ApplyTransactionAction, Transaction}; use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder; -use iceberg::writer::file_writer::ParquetWriterBuilder; use iceberg::writer::file_writer::location_generator::{ DefaultFileNameGenerator, DefaultLocationGenerator, }; +use iceberg::writer::file_writer::ParquetWriterBuilder; use iceberg::writer::{IcebergWriter, IcebergWriterBuilder}; use iceberg::{Catalog, TableCreation}; use iceberg_catalog_rest::RestCatalog;