diff --git a/python/python/tests/test_schema_evolution.py b/python/python/tests/test_schema_evolution.py
index 6560d8c7e7d..5cff0bc70f3 100644
--- a/python/python/tests/test_schema_evolution.py
+++ b/python/python/tests/test_schema_evolution.py
@@ -540,3 +540,119 @@ def test_add_cols_all_null_with_sql(tmp_path: Path):
             "b": pa.int32(),
         }
     )
+
+
+def test_struct_fields_evolution_meta_only(tmp_path: Path):
+    struct_ab = pa.struct([("a", pa.int64()), ("b", pa.string())])
+    base = pa.table(
+        {
+            "id": pa.array([1, 2, 3], pa.int64()),
+            "struct_fields": pa.array(
+                [
+                    {"a": 1, "b": "tom"},
+                    {"a": 2, "b": "jerry"},
+                    {"a": 3, "b": "jack"},
+                ],
+                type=struct_ab,
+            ),
+        }
+    )
+    ds = lance.write_dataset(base, tmp_path / "add_struct_column_only.lance")
+    assert ds.schema == pa.schema(
+        [pa.field("id", pa.int64()), pa.field("struct_fields", struct_ab)]
+    )
+
+    ds.add_columns(pa.field("embedding", pa.list_(pa.float32(), 128)))
+    expected = base.append_column(
+        "embedding", pa.array([None, None, None], pa.list_(pa.float32(), 128))
+    )
+    assert ds.schema == pa.schema(
+        [
+            pa.field("id", pa.int64()),
+            pa.field("struct_fields", struct_ab),
+            pa.field("embedding", pa.list_(pa.float32(), 128)),
+        ]
+    )
+    assert ds.to_table() == expected
+
+    new_schema = pa.schema([("label", pa.string()), ("score", pa.float32())])
+    ds.add_columns(new_schema)
+    expected = expected.append_column(
+        "label", pa.array([None, None, None], pa.string())
+    ).append_column("score", pa.array([None, None, None], pa.float32()))
+    assert ds.schema == pa.schema(
+        [
+            pa.field("id", pa.int64()),
+            pa.field("struct_fields", struct_ab),
+            pa.field("embedding", pa.list_(pa.float32(), 128)),
+            pa.field("label", pa.string()),
+            pa.field("score", pa.float32()),
+        ]
+    )
+    assert ds.to_table() == expected
+
+    new_schema_2 = pa.schema(
+        [("struct_fields", pa.struct([("c", pa.string()), ("d", pa.string())]))]
+    )
+    ds.add_columns(new_schema_2)
+    struct_abcd = pa.struct(
+        [("a", pa.int64()), ("b", pa.string()), ("c", pa.string()), ("d", pa.string())]
+    )
+    struct_vals_abcd = pa.array(
+        [{"a": 1, "b": "tom"}, {"a": 2, "b": "jerry"}, {"a": 3, "b": "jack"}],
+        type=struct_abcd,
+    )
+    expected = pa.table(
+        {
+            "id": pa.array([1, 2, 3], pa.int64()),
+            "struct_fields": struct_vals_abcd,
+            "embedding": pa.array([None, None, None], pa.list_(pa.float32(), 128)),
+            "label": pa.array([None, None, None], pa.string()),
+            "score": pa.array([None, None, None], pa.float32()),
+        }
+    )
+    assert ds.schema == pa.schema(
+        [
+            pa.field("id", pa.int64()),
+            pa.field("struct_fields", struct_abcd),
+            pa.field("embedding", pa.list_(pa.float32(), 128)),
+            pa.field("label", pa.string()),
+            pa.field("score", pa.float32()),
+        ]
+    )
+    assert ds.to_table() == expected
+
+    insert_vals = pa.array(
+        [
+            {"a": 11, "b": "tom11", "c": "jerry11", "d": "jack11"},
+            {"a": 22, "b": "tom22", "c": "jerry22", "d": "jack22"},
+            {"a": 33, "b": "tom33", "c": "jerry33", "d": "jack33"},
+        ],
+        type=struct_abcd,
+    )
+    insert_tab = pa.table(
+        {"id": pa.array([11, 22, 33], pa.int64()), "struct_fields": insert_vals}
+    )
+    ds.insert(insert_tab)
+
+    final_expected = pa.table(
+        {
+            "id": pa.array([1, 2, 3, 11, 22, 33], pa.int64()),
+            "struct_fields": pa.concat_arrays([struct_vals_abcd, insert_vals]),
+            "embedding": pa.array(
+                [None, None, None, None, None, None], pa.list_(pa.float32(), 128)
+            ),
+            "label": pa.array([None, None, None, None, None, None], pa.string()),
+            "score": pa.array([None, None, None, None, None, None], pa.float32()),
+        }
+    )
+    assert ds.schema == pa.schema(
+        [
+            pa.field("id", pa.int64()),
+            pa.field("struct_fields", struct_abcd),
+            pa.field("embedding", pa.list_(pa.float32(), 128)),
+            pa.field("label", pa.string()),
+            pa.field("score", pa.float32()),
+        ]
+    )
+    assert ds.to_table() == final_expected
diff --git a/rust/lance-core/src/datatypes/schema.rs b/rust/lance-core/src/datatypes/schema.rs
index 19a03aa5043..c21494b7c82 100644
--- a/rust/lance-core/src/datatypes/schema.rs
+++ b/rust/lance-core/src/datatypes/schema.rs
@@ -110,6 +110,40 @@ impl<'a> Iterator for SchemaFieldIterPreOrder<'a> {
     }
 }
 
+struct SchemaLeafFieldIterPreOrder<'a> {
+    leaf_field_stack: Vec<&'a Field>,
+}
+
+impl<'a> SchemaLeafFieldIterPreOrder<'a> {
+    #[allow(dead_code)]
+    fn new(schema: &'a Schema) -> Self {
+        let mut field_stack = Vec::with_capacity(schema.fields.len() * 2);
+        for field in schema.fields.iter().rev() {
+            field_stack.push(field);
+        }
+        Self {
+            leaf_field_stack: field_stack,
+        }
+    }
+}
+
+/// Iterator implementation for a pre-order traversal of leaf fields
+impl<'a> Iterator for SchemaLeafFieldIterPreOrder<'a> {
+    type Item = &'a Field;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        while let Some(next_field) = self.leaf_field_stack.pop() {
+            for child in next_field.children.iter().rev() {
+                self.leaf_field_stack.push(child);
+            }
+            if next_field.children.is_empty() {
+                return Some(next_field);
+            }
+        }
+        None
+    }
+}
+
 impl Schema {
     /// The unenforced primary key fields in the schema
     pub fn unenforced_primary_key(&self) -> Vec<&Field> {
@@ -331,6 +365,12 @@ impl Schema {
         SchemaFieldIterPreOrder::new(self)
     }
 
+    /// Iterates over the fields using a pre-order traversal
+    /// Only leaf fields (fields that don't have any children) are visited.
+    pub fn leaf_fields_pre_order(&self) -> impl Iterator<Item = &Field> {
+        SchemaLeafFieldIterPreOrder::new(self)
+    }
+
     /// Returns a new schema that only contains the fields in `column_ids`.
     ///
     /// This projection can filter out both top-level and nested fields
diff --git a/rust/lance/src/dataset/schema_evolution.rs b/rust/lance/src/dataset/schema_evolution.rs
index fda48b102a3..b75912fc565 100644
--- a/rust/lance/src/dataset/schema_evolution.rs
+++ b/rust/lance/src/dataset/schema_evolution.rs
@@ -132,6 +132,50 @@ fn is_upcast_downcast(from_type: &DataType, to_type: &DataType) -> bool {
     }
 }
 
+/// Checks for duplicate column names (including nested paths) between the existing dataset schema and a new output schema.
+///
+/// This function is called early in the schema evolution process to avoid unnecessary work if column name conflicts exist.
+/// It converts the output schema to a Lance schema, collects the paths of all leaf fields (including nested ones) on both sides, and checks for overlaps.
+/// Only leaf paths are compared, so adding a new child to an existing struct is accepted; if any leaf path exists in both schemas, an error is returned.
+///
+/// # Arguments
+/// * `dataset` - The existing dataset containing the current schema
+/// * `output_schema` - The new output schema being proposed
+///
+/// # Returns
+/// * `Ok(())` if no duplicate column paths are found
+/// * `Err(Error)` with a descriptive message if a duplicate column path is detected
+///
+/// # Errors
+/// Returns an error if any column path in the new schema already exists in the dataset
+fn check_names(dataset: &Dataset, output_schema: &ArrowSchema) -> Result<()> {
+    // Convert the output schema to Lance schema so we can inspect nested paths
+    let new_schema = Schema::try_from(output_schema)?;
+
+    // Collect all existing field paths from the dataset (including nested)
+    let existing_schema = dataset.schema();
+    let existing_paths: HashSet<String> = existing_schema
+        .leaf_fields_pre_order()
+        .map(|f| existing_schema.field_path(f.id).unwrap())
+        .collect();
+
+    // Collect all new field paths (including nested), kept in schema order so the reported duplicate is deterministic
+    let new_paths: Vec<String> = new_schema
+        .leaf_fields_pre_order()
+        .map(|f| new_schema.field_path(f.id).unwrap())
+        .collect();
+
+    // If any path overlaps, report the first overlapping node
+    if let Some(dup_path) = new_paths.iter().find(|p| existing_paths.contains(*p)) {
+        return Err(Error::invalid_input(
+            format!("Column {} already exists in the dataset", dup_path),
+            location!(),
+        ));
+    }
+
+    Ok(())
+}
+
 pub(super) async fn add_columns_to_fragments(
     dataset: &Dataset,
     transforms: NewColumnTransform,
@@ -139,21 +183,6 @@ pub(super) async fn add_columns_to_fragments(
     fragments: &[FileFragment],
     batch_size: Option<u32>,
 ) -> Result<(Vec<Fragment>, Schema)> {
-    // Check names early (before calling add_columns_impl) to avoid extra work if
-    // the names are wrong.
-    let check_names = |output_schema: &ArrowSchema| {
-        let new_names = output_schema.field_names();
-        for field in &dataset.schema().fields {
-            if new_names.contains(&&field.name) {
-                return Err(Error::invalid_input(
-                    format!("Column {} already exists in the dataset", field.name),
-                    location!(),
-                ));
-            }
-        }
-        Ok(())
-    };
-
     // Optimize the transforms
     let mut optimizer = ChainedNewColumnTransformOptimizer::new(vec![]);
     // ALlNull transform can not performed on legacy files
@@ -164,7 +193,7 @@ pub(super) async fn add_columns_to_fragments(
 
     let (output_schema, fragments) = match transforms {
         NewColumnTransform::BatchUDF(udf) => {
-            check_names(udf.output_schema.as_ref())?;
+            check_names(dataset, udf.output_schema.as_ref())?;
             let fragments = add_columns_impl(
                 fragments,
                 read_columns,
@@ -221,7 +250,7 @@ pub(super) async fn add_columns_to_fragments(
                     })
                     .collect::<Result<Vec<_>>>()?,
             ));
-            check_names(output_schema.as_ref())?;
+            check_names(dataset, output_schema.as_ref())?;
 
             let schema_ref = output_schema.clone();
             let mapper = move |batch: &RecordBatch| {
@@ -243,19 +272,19 @@ pub(super) async fn add_columns_to_fragments(
         }
         NewColumnTransform::Stream(stream) => {
             let output_schema = stream.schema();
-            check_names(output_schema.as_ref())?;
+            check_names(dataset, output_schema.as_ref())?;
             let fragments = add_columns_from_stream(fragments, stream, None, batch_size).await?;
             Ok((output_schema, fragments))
         }
         NewColumnTransform::Reader(reader) => {
             let output_schema = reader.schema();
-            check_names(output_schema.as_ref())?;
+            check_names(dataset, output_schema.as_ref())?;
             let stream = reader.into_stream();
             let fragments = add_columns_from_stream(fragments, stream, None, batch_size).await?;
             Ok((output_schema, fragments))
         }
         NewColumnTransform::AllNulls(output_schema) => {
-            check_names(output_schema.as_ref())?;
+            check_names(dataset, output_schema.as_ref())?;
 
             // Check that the schema is compatible considering all the new columns must be nullable
             let schema = Schema::try_from(output_schema.as_ref())?;
@@ -662,7 +691,7 @@ mod test {
     use crate::dataset::WriteParams;
 
     use super::*;
-    use arrow_array::{Int32Array, RecordBatchIterator};
+    use arrow_array::{ArrayRef, Int32Array, RecordBatchIterator, StructArray};
     use arrow_schema::Fields as ArrowFields;
     use lance_core::utils::tempfile::TempStrDir;
     use lance_file::version::LanceFileVersion;
@@ -1800,4 +1829,182 @@ mod test {
         ]);
         assert_eq!(ArrowSchema::from(dataset.schema()), expected_schema);
     }
+
+    #[tokio::test]
+    async fn test_check_names_same_name_at_different_levels() -> Result<()> {
+        // {"\"id\"": int32, "s": {"a": int32}, "a": {"b": int32}}
+        let schema = Arc::new(ArrowSchema::new(vec![
+            ArrowField::new("\"id\"", DataType::Int32, false),
+            ArrowField::new(
+                "s",
+                DataType::Struct(ArrowFields::from(vec![ArrowField::new(
+                    "a",
+                    DataType::Int32,
+                    true,
+                )])),
+                true,
+            ),
+            ArrowField::new(
+                "a",
+                DataType::Struct(ArrowFields::from(vec![ArrowField::new(
+                    "b",
+                    DataType::Int32,
+                    true,
+                )])),
+                true,
+            ),
+        ]));
+
+        // {"\"id\"": 1, "s": {"a": 1}, "a": {"b": 2}}
+        let batch = RecordBatch::try_new(
+            schema.clone(),
+            vec![
+                Arc::new(Int32Array::from(vec![1])),
+                Arc::new(StructArray::from(vec![(
+                    Arc::new(ArrowField::new("a", DataType::Int32, true)),
+                    Arc::new(Int32Array::from(vec![1])) as ArrayRef,
+                )])),
+                Arc::new(StructArray::from(vec![(
+                    Arc::new(ArrowField::new("b", DataType::Int32, true)),
+                    Arc::new(Int32Array::from(vec![2])) as ArrayRef,
+                )])),
+            ],
+        )?;
+        let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());
+        let tmp = TempStrDir::default();
+        let dataset = Dataset::write(reader, &tmp, None).await?;
+
+        // {"id": int32, "t": {"a": int32}, "a": {"h": int32}}
+        let sibling_ok = ArrowSchema::new(vec![
+            ArrowField::new("id", DataType::Int32, false),
+            ArrowField::new(
+                "t",
+                DataType::Struct(ArrowFields::from(vec![ArrowField::new(
+                    "a",
+                    DataType::Int32,
+                    true,
+                )])),
+                true,
+            ),
+            ArrowField::new(
+                "a",
+                DataType::Struct(ArrowFields::from(vec![ArrowField::new(
+                    "h",
+                    DataType::Int32,
+                    true,
+                )])),
+                true,
+            ),
+        ]);
+
+        // dataset = {"\"id\"": int32, "s": {"a": int32}, "a": {"b": int32}}
+        // sibling_ok = {"id": int32, "t": {"a": int32}, "a": {"h": int32}}
+        assert!(check_names(&dataset, &sibling_ok).is_ok());
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_check_names_nested() -> Result<()> {
+        // {"id": int32, "s": {"a": int32, "b": int32}}
+        let schema = Arc::new(ArrowSchema::new(vec![
+            ArrowField::new("id", DataType::Int32, false),
+            ArrowField::new(
+                "s",
+                DataType::Struct(ArrowFields::from(vec![
+                    ArrowField::new("a", DataType::Int32, true),
+                    ArrowField::new("b", DataType::Int32, true),
+                ])),
+                true,
+            ),
+        ]));
+
+        let batch = RecordBatch::try_new(
+            schema.clone(),
+            vec![
+                Arc::new(Int32Array::from(vec![1])),
+                Arc::new(StructArray::from(vec![
+                    (
+                        Arc::new(ArrowField::new("a", DataType::Int32, true)),
+                        Arc::new(Int32Array::from(vec![1])) as ArrayRef,
+                    ),
+                    (
+                        Arc::new(ArrowField::new("b", DataType::Int32, true)),
+                        Arc::new(Int32Array::from(vec![2])) as ArrayRef,
+                    ),
+                ])),
+            ],
+        )?;
+        let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());
+        let tmp = TempStrDir::default();
+        let dataset = Dataset::write(reader, &tmp, None).await?;
+
+        // Conflict: s.a already exists
+        // {"s": {"a": int32}}
+        let nested_conflict = ArrowSchema::new(vec![ArrowField::new(
+            "s",
+            DataType::Struct(ArrowFields::from(vec![ArrowField::new(
+                "a",
+                DataType::Int32,
+                true,
+            )])),
+            true,
+        )]);
+
+        // dataset = {"id": int32, "s": {"a": int32, "b": int32}}
+        // nested_conflict = {"s": {"a": int32}}
+        let err = check_names(&dataset, &nested_conflict).unwrap_err();
+        assert!(matches!(err, Error::InvalidInput { .. }));
+
+        // No conflict: s.c is new nested path
+        // {"s": {"c": float32}}
+        let nested_ok = ArrowSchema::new(vec![ArrowField::new(
+            "s",
+            DataType::Struct(ArrowFields::from(vec![ArrowField::new(
+                "c",
+                DataType::Float32,
+                true,
+            )])),
+            true,
+        )]);
+        // dataset = {"id": int32, "s": {"a": int32, "b": int32}}
+        // nested_ok = {"s": {"c": float32}}
+        assert!(check_names(&dataset, &nested_ok).is_ok());
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_check_names_non_nested() -> Result<()> {
+        // {"id": int32}
+        let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
+            "id",
+            DataType::Int32,
+            false,
+        )]));
+
+        let batch =
+            RecordBatch::try_new(schema.clone(), vec![Arc::new(Int32Array::from(vec![1]))])?;
+        let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());
+        let tmp = TempStrDir::default();
+        let dataset = Dataset::write(reader, &tmp, None).await?;
+
+        // {"id": int32}
+        // Conflict: id already exists
+        let conflict_output = ArrowSchema::new(vec![ArrowField::new("id", DataType::Int32, true)]);
+
+        // dataset = {"id": int32}
+        // conflict_output = {"id": int32}
+        let err = check_names(&dataset, &conflict_output).unwrap_err();
+        assert!(matches!(err, Error::InvalidInput { .. }));
+
+        // No conflict: id2 is new
+        let ok_output = ArrowSchema::new(vec![ArrowField::new("id2", DataType::Int32, true)]);
+
+        // dataset = {"id": int32}
+        // ok_output = {"id2": int32}
+        assert!(check_names(&dataset, &ok_output).is_ok());
+
+        Ok(())
+    }
 }