From db93b871b4aaba4301dfe52b29e35a713e406583 Mon Sep 17 00:00:00 2001 From: lijinglun Date: Mon, 3 Nov 2025 16:37:33 +0800 Subject: [PATCH 1/2] feat: support add sub-column to struct col --- python/python/tests/test_dataset.py | 8 +- rust/lance-encoding/src/version.rs | 4 + rust/lance/src/dataset/schema_evolution.rs | 386 +++++++++++- .../dataset/tests/dataset_schema_evolution.rs | 548 ++++++++++++++++++ rust/lance/src/dataset/tests/mod.rs | 1 + 5 files changed, 932 insertions(+), 15 deletions(-) create mode 100644 rust/lance/src/dataset/tests/dataset_schema_evolution.rs diff --git a/python/python/tests/test_dataset.py b/python/python/tests/test_dataset.py index d167c79dcfe..1c8578c1cd6 100644 --- a/python/python/tests/test_dataset.py +++ b/python/python/tests/test_dataset.py @@ -2501,10 +2501,14 @@ def test_add_null_columns_with_conflict_names(tmp_path: Path): assert len(fragments) == 1 assert len(fragments[0].data_files()) == 1 - with pytest.raises(Exception, match=".*Column id already exists in the dataset.*"): + with pytest.raises( + Exception, match=".*Type conflicts between id\\(Int64\\) and id\\(Float32\\).*" + ): ds.add_columns(pa.field("id", pa.float32())) - with pytest.raises(Exception, match=".*Column id already exists in the dataset.*"): + with pytest.raises( + Exception, match=".*Type conflicts between id\\(Int64\\) and id\\(Float32\\).*" + ): ds.add_columns([pa.field("id", pa.float32()), pa.field("good", pa.int32())]) diff --git a/rust/lance-encoding/src/version.rs b/rust/lance-encoding/src/version.rs index b7ae8129049..96fd7b6d7fe 100644 --- a/rust/lance-encoding/src/version.rs +++ b/rust/lance-encoding/src/version.rs @@ -81,6 +81,10 @@ impl LanceFileVersion { Self::iter().filter(|&v| v != Self::Stable && v != Self::Next && v != Self::Legacy) } + + pub fn support_add_sub_column(&self) -> bool { + self > &Self::V2_1 + } } impl std::fmt::Display for LanceFileVersion { diff --git a/rust/lance/src/dataset/schema_evolution.rs b/rust/lance/src/dataset/schema_evolution.rs index fda48b102a3..70cf26613a8 100644 --- a/rust/lance/src/dataset/schema_evolution.rs +++ b/rust/lance/src/dataset/schema_evolution.rs @@ -3,6 +3,11 @@ use std::{collections::HashSet, sync::Arc}; +use super::fragment::FileFragment; +use super::{ + transaction::{Operation, Transaction}, + Dataset, +}; use crate::{io::exec::Planner, Error, Result}; use arrow::compute::can_cast_types; use arrow::compute::CastOptions; @@ -13,15 +18,11 @@ use futures::stream::{StreamExt, TryStreamExt}; use lance_arrow::SchemaExt; use lance_core::datatypes::{Field, Schema}; use lance_datafusion::utils::StreamingWriteSource; +use lance_encoding::constants::{PACKED_STRUCT_LEGACY_META_KEY, PACKED_STRUCT_META_KEY}; +use lance_encoding::version::LanceFileVersion; use lance_table::format::Fragment; use snafu::location; -use super::fragment::FileFragment; -use super::{ - transaction::{Operation, Transaction}, - Dataset, -}; - mod optimize; use optimize::{ @@ -132,6 +133,73 @@ fn is_upcast_downcast(from_type: &DataType, to_type: &DataType) -> bool { } } +trait ArrowFieldExt { + fn is_packed(&self) -> bool; +} + +impl ArrowFieldExt for ArrowField { + fn is_packed(&self) -> bool { + let metadata = self.metadata(); + metadata + .get(PACKED_STRUCT_LEGACY_META_KEY) + .map(|v| v == "true") + .unwrap_or(metadata.contains_key(PACKED_STRUCT_META_KEY)) + } +} + +fn check_field_conflict( + left: &ArrowField, + right: &ArrowField, + version: &LanceFileVersion, +) -> Result<()> { + if left.name() != right.name() { + return Ok(()); + } + + match (left.data_type(), right.data_type()) { + (DataType::Struct(fl), DataType::Struct(fr)) => { + if !version.support_add_sub_column() { + return Err(Error::invalid_input( + format!("Column {} is a struct col, add sub column is not supported in Lance file version {}", left.name(), version), + location!(), + )); + } + + if left.is_packed() || right.is_packed() { + return Err(Error::invalid_input( + format!( + "Column {} is packed struct and already exists in the dataset", + left.name() + ), + location!(), + )); + } + + for l_field in fl.iter() { + if let Some((_, r_field)) = fr.find(l_field.name()) { + check_field_conflict(l_field, r_field, version)?; + } + } + Ok(()) + } + (DataType::List(fl), DataType::List(fr)) => check_field_conflict(fl, fr, version), + (DataType::LargeList(fl), DataType::LargeList(fr)) => check_field_conflict(fl, fr, version), + (DataType::FixedSizeList(fl, _), DataType::FixedSizeList(fr, _)) => { + check_field_conflict(fl, fr, version) + } + (_, _) => Err(Error::invalid_input( + format!( + "Type conflicts between {}({}) and {}({})", + left.name(), + left.data_type(), + right.name(), + right.data_type() + ), + location!(), + )), + } +} + pub(super) async fn add_columns_to_fragments( dataset: &Dataset, transforms: NewColumnTransform, @@ -141,17 +209,15 @@ pub(super) async fn add_columns_to_fragments( ) -> Result<(Vec, Schema)> { // Check names early (before calling add_columns_impl) to avoid extra work if // the names are wrong. + let version = dataset.manifest.data_storage_format.lance_file_version()?; let check_names = |output_schema: &ArrowSchema| { - let new_names = output_schema.field_names(); for field in &dataset.schema().fields { - if new_names.contains(&&field.name) { - return Err(Error::invalid_input( - format!("Column {} already exists in the dataset", field.name), - location!(), - )); + if let Ok(out_field) = output_schema.field_with_name(&field.name) { + let ds_field = ArrowField::from(field); + check_field_conflict(&ds_field, out_field, &version)?; } } - Ok(()) + Ok::<(), Error>(()) }; // Optimize the transforms @@ -657,6 +723,7 @@ pub(super) async fn drop_columns(dataset: &mut Dataset, columns: &[&str]) -> Res #[cfg(test)] mod test { + use std::collections::HashMap; use std::sync::Mutex; use crate::dataset::WriteParams; @@ -1800,4 +1867,297 @@ mod test { ]); assert_eq!(ArrowSchema::from(dataset.schema()), expected_schema); } + + #[test] + fn test_check_field_conflict() { + // same struct + let field1 = ArrowField::new( + "test", + DataType::Struct(vec![ArrowField::new("a", DataType::Int32, false)].into()), + false, + ); + let field2 = ArrowField::new( + "test", + DataType::Struct(vec![ArrowField::new("a", DataType::Int32, false)].into()), + false, + ); + assert!(check_field_conflict(&field1, &field2, &LanceFileVersion::V2_2).is_err()); + + // different struct + let field1 = ArrowField::new( + "test", + DataType::Struct(vec![ArrowField::new("a", DataType::Int32, false)].into()), + false, + ); + let field2 = ArrowField::new( + "test", + DataType::Struct(vec![ArrowField::new("b", DataType::Int32, false)].into()), + false, + ); + assert!(check_field_conflict(&field1, &field2, &LanceFileVersion::V2_2).is_ok()); + + // same nested struct + let inner_struct1 = ArrowField::new( + "inner", + DataType::Struct(vec![ArrowField::new("x", DataType::Int32, false)].into()), + false, + ); + let inner_struct2 = ArrowField::new( + "inner", + DataType::Struct(vec![ArrowField::new("x", DataType::Int32, false)].into()), + false, + ); + let field1 = ArrowField::new("test", DataType::Struct(vec![inner_struct1].into()), false); + let field2 = ArrowField::new("test", DataType::Struct(vec![inner_struct2].into()), false); + assert!(check_field_conflict(&field1, &field2, &LanceFileVersion::V2_2).is_err()); + + // basic type with different name + let field1 = ArrowField::new("test1", DataType::Int32, false); + let field2 = ArrowField::new("test2", DataType::Int32, false); + assert!(check_field_conflict(&field1, &field2, &LanceFileVersion::V2_2).is_ok()); + + // basic type with same name + let field1 = ArrowField::new("test", DataType::Int32, false); + let field2 = ArrowField::new("test", DataType::Int32, false); + assert!(check_field_conflict(&field1, &field2, &LanceFileVersion::V2_2).is_err()); + + // different basic type + let field1 = ArrowField::new("test", DataType::Int32, false); + let field2 = ArrowField::new("test", DataType::Float64, false); + assert!(check_field_conflict(&field1, &field2, &LanceFileVersion::V2_2).is_err()); + + // partial conflict + let field1 = ArrowField::new( + "test", + DataType::Struct( + vec![ + ArrowField::new("a", DataType::Int32, false), + ArrowField::new("b", DataType::Utf8, false), + ] + .into(), + ), + false, + ); + let field2 = ArrowField::new( + "test", + DataType::Struct( + vec![ + ArrowField::new("a", DataType::Int32, false), + ArrowField::new("c", DataType::Utf8, false), + ] + .into(), + ), + false, + ); + assert!(check_field_conflict(&field1, &field2, &LanceFileVersion::V2_2).is_err()); + + // same list + let field1 = ArrowField::new( + "test", + DataType::List(Arc::new(ArrowField::new("item", DataType::Int32, true))), + false, + ); + let field2 = ArrowField::new( + "test", + DataType::List(Arc::new(ArrowField::new("item", DataType::Int32, true))), + false, + ); + assert!(check_field_conflict(&field1, &field2, &LanceFileVersion::V2_2).is_err()); + + // list with struct + let field1 = ArrowField::new( + "test", + DataType::List(Arc::new(ArrowField::new( + "item", + DataType::Struct(vec![ArrowField::new("a", DataType::Int32, false)].into()), + false, + ))), + false, + ); + let field2 = ArrowField::new( + "test", + DataType::List(Arc::new(ArrowField::new( + "item", + DataType::Struct(vec![ArrowField::new("a", DataType::Int32, false)].into()), + false, + ))), + false, + ); + assert!(check_field_conflict(&field1, &field2, &LanceFileVersion::V2_2).is_err()); + + // list with different struct + let field1 = ArrowField::new( + "test", + DataType::List(Arc::new(ArrowField::new( + "item", + DataType::Struct(vec![ArrowField::new("a", DataType::Int32, false)].into()), + false, + ))), + false, + ); + let field2 = ArrowField::new( + "test", + DataType::List(Arc::new(ArrowField::new( + "item", + DataType::Struct(vec![ArrowField::new("b", DataType::Int32, false)].into()), + false, + ))), + false, + ); + assert!(check_field_conflict(&field1, &field2, &LanceFileVersion::V2_2).is_ok()); + + // list of struct and basic + let field1 = ArrowField::new( + "test", + DataType::List(Arc::new(ArrowField::new( + "item", + DataType::Struct(vec![ArrowField::new("a", DataType::Int32, false)].into()), + false, + ))), + false, + ); + let field2 = ArrowField::new( + "test", + DataType::List(Arc::new(ArrowField::new("item", DataType::Int32, true))), + false, + ); + assert!(check_field_conflict(&field1, &field2, &LanceFileVersion::V2_2).is_err()); + + // FixedSizeList with struct + let field1 = ArrowField::new( + "test", + DataType::FixedSizeList( + Arc::new(ArrowField::new( + "item", + DataType::Struct(vec![ArrowField::new("a", DataType::Int32, false)].into()), + false, + )), + 2, + ), + false, + ); + let field2 = ArrowField::new( + "test", + DataType::FixedSizeList( + Arc::new(ArrowField::new( + "item", + DataType::Struct(vec![ArrowField::new("a", DataType::Int32, false)].into()), + false, + )), + 2, + ), + false, + ); + assert!(check_field_conflict(&field1, &field2, &LanceFileVersion::V2_2).is_err()); + + // FixedSizeList with different struct + let field1 = ArrowField::new( + "test", + DataType::FixedSizeList( + Arc::new(ArrowField::new( + "item", + DataType::Struct(vec![ArrowField::new("a", DataType::Int32, false)].into()), + false, + )), + 2, + ), + false, + ); + let field2 = ArrowField::new( + "test", + DataType::FixedSizeList( + Arc::new(ArrowField::new( + "item", + DataType::Struct(vec![ArrowField::new("b", DataType::Int32, false)].into()), + false, + )), + 2, + ), + false, + ); + assert!(check_field_conflict(&field1, &field2, &LanceFileVersion::V2_2).is_ok()); + + // LargeList with struct + let field1 = ArrowField::new( + "test", + DataType::LargeList(Arc::new(ArrowField::new( + "item", + DataType::Struct(vec![ArrowField::new("a", DataType::Int32, false)].into()), + false, + ))), + false, + ); + let field2 = ArrowField::new( + "test", + DataType::LargeList(Arc::new(ArrowField::new( + "item", + DataType::Struct(vec![ArrowField::new("a", DataType::Int32, false)].into()), + false, + ))), + false, + ); + assert!(check_field_conflict(&field1, &field2, &LanceFileVersion::V2_2).is_err()); + + // LargeList with different struct + let field1 = ArrowField::new( + "test", + DataType::LargeList(Arc::new(ArrowField::new( + "item", + DataType::Struct(vec![ArrowField::new("a", DataType::Int32, false)].into()), + false, + ))), + false, + ); + let field2 = ArrowField::new( + "test", + DataType::LargeList(Arc::new(ArrowField::new( + "item", + DataType::Struct(vec![ArrowField::new("b", DataType::Int32, false)].into()), + false, + ))), + false, + ); + assert!(check_field_conflict(&field1, &field2, &LanceFileVersion::V2_2).is_ok()); + + // packed struct + let mut packed_meta = HashMap::new(); + packed_meta.insert(PACKED_STRUCT_META_KEY.to_string(), "true".to_string()); + + let packed_field = ArrowField::new( + "packed", + DataType::Struct(vec![ArrowField::new("foo", DataType::Int32, false)].into()), + false, + ) + .with_metadata(packed_meta.clone()); + + let field1 = ArrowField::new("test", DataType::Struct(vec![packed_field].into()), false); + let field2 = ArrowField::new( + "test", + DataType::Struct(vec![ArrowField::new("b", DataType::Int32, false)].into()), + false, + ); + assert!(check_field_conflict(&field1, &field2, &LanceFileVersion::V2_2).is_ok()); + + let new_packed_field = ArrowField::new( + "new_packed", + DataType::Struct(vec![ArrowField::new("foo", DataType::Int32, false)].into()), + false, + ) + .with_metadata(packed_meta.clone()); + let field3 = ArrowField::new( + "test", + DataType::Struct(vec![new_packed_field].into()), + false, + ); + assert!(check_field_conflict(&field1, &field3, &LanceFileVersion::V2_2).is_ok()); + + let conflict_field = ArrowField::new( + "packed", + DataType::Struct(vec![ArrowField::new("new_col", DataType::Int32, false)].into()), + false, + ) + .with_metadata(packed_meta); + let field4 = ArrowField::new("test", DataType::Struct(vec![conflict_field].into()), false); + assert!(check_field_conflict(&field1, &field4, &LanceFileVersion::V2_2).is_err()); + } } diff --git a/rust/lance/src/dataset/tests/dataset_schema_evolution.rs b/rust/lance/src/dataset/tests/dataset_schema_evolution.rs new file mode 100644 index 00000000000..fd988978991 --- /dev/null +++ b/rust/lance/src/dataset/tests/dataset_schema_evolution.rs @@ -0,0 +1,548 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +use crate::dataset::{NewColumnTransform, WriteMode, WriteParams}; +use crate::Dataset; +use arrow_array::{ + Array, ArrayRef, FixedSizeListArray, Int32Array, ListArray, RecordBatch, RecordBatchIterator, + StringArray, StructArray, +}; +use arrow_schema::{ + DataType, Field as ArrowField, Field, Fields as ArrowFields, Fields, Schema as ArrowSchema, +}; +use lance_encoding::version::LanceFileVersion; +use rstest::rstest; +use std::collections::HashMap; +use std::sync::Arc; + +#[rstest] +#[tokio::test] +async fn test_add_sub_column_to_packed_struct_col( + #[values(LanceFileVersion::V2_2)] version: LanceFileVersion, +) { + let mut dataset = prepare_packed_struct_col(version).await; + + // Construct sub-column record batch. + let food_array = StringArray::from(vec!["omnivore"]); + let struct_array = StructArray::new( + ArrowFields::from(vec![ArrowField::new("food", DataType::Utf8, false)]), + vec![Arc::new(food_array) as ArrayRef], + None, + ); + + let new_added_struct_field = ArrowField::new( + "animal", + DataType::Struct(ArrowFields::from(vec![ArrowField::new( + "food", + DataType::Utf8, + false, + )])), + false, + ); + let new_schema = Arc::new(ArrowSchema::new(vec![new_added_struct_field])); + let batch = RecordBatch::try_new(new_schema.clone(), vec![Arc::new(struct_array)]).unwrap(); + + // Verify add sub-column. + let error = dataset + .add_columns( + NewColumnTransform::Reader(Box::new(RecordBatchIterator::new( + vec![Ok(batch)], + new_schema, + ))), + None, + None, + ) + .await + .unwrap_err(); + assert!(error + .to_string() + .contains("Column animal is packed struct and already exists in the dataset")); +} + +#[rstest] +#[tokio::test] +async fn test_add_sub_column_to_struct_col_unsupported( + #[values( + LanceFileVersion::Legacy, + LanceFileVersion::V2_0, + LanceFileVersion::V2_1 + )] + version: LanceFileVersion, +) { + let mut dataset = prepare_initial_dataset_with_struct_col(version, 3).await; + + // add 2 sub-column of animal + let batch = prepare_sub_column_batch(3).await; + let new_schema = batch.schema(); + + let err = dataset + .add_columns( + NewColumnTransform::Reader(Box::new(RecordBatchIterator::new( + vec![Ok(batch)], + new_schema, + ))), + None, + None, + ) + .await + .unwrap_err(); + assert!(err + .to_string() + .contains("is a struct col, add sub column is not supported in Lance file version")); +} + +#[rstest] +#[tokio::test] +async fn test_add_sub_column_to_struct_col( + #[values(LanceFileVersion::V2_2)] version: LanceFileVersion, +) { + let mut dataset = prepare_initial_dataset_with_struct_col(version, 3).await; + + // add 2 sub-columns of animal + let batch = prepare_sub_column_batch(3).await; + let new_schema = batch.schema(); + + dataset + .add_columns( + NewColumnTransform::Reader(Box::new(RecordBatchIterator::new( + vec![Ok(batch)], + new_schema, + ))), + None, + None, + ) + .await + .unwrap(); + + // Verify schema + // root + // - fixed_list + // - list + // - struct + // - level_1 + // - level_0 + // - leaf + // - new_col + // - new_col + // - new_col + assert_eq!(dataset.schema().fields.len(), 1); + assert_eq!(dataset.schema().fields[0].name, "root"); + + let field = &dataset.schema().fields[0]; + assert_eq!(field.children[0].name, "fixed_list"); + assert_eq!(field.children[1].name, "list"); + assert_eq!(field.children[2].name, "struct"); + + let field = &field.children[2]; + assert_eq!(field.children[0].name, "level_1"); + assert_eq!(field.children[1].name, "new_col"); + + let field = &field.children[0]; + assert_eq!(field.children[0].name, "level_0"); + assert_eq!(field.children[1].name, "new_col"); + + let field = &field.children[0]; + assert_eq!(field.children[0].name, "leaf"); + assert_eq!(field.children[1].name, "new_col"); + + // verify data is updated + let batch = dataset + .scan() + .project(&[ + "root.struct.level_1.level_0.leaf", + "root.struct.new_col", + "root.struct.level_1.new_col", + "root.struct.level_1.level_0.new_col", + ]) + .unwrap() + .try_into_batch() + .await + .unwrap(); + assert_eq!(batch.num_rows(), 1); + assert_eq!(batch.num_columns(), 4); + + let col = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(col.value(0), 42); + + for i in 1..4 { + let col = batch + .column(i) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(col.value(0), 100); + } +} + +async fn prepare_sub_column_batch(nested_level: usize) -> RecordBatch { + // add a sub-column of new_col + let leaf_col = ArrowField::new(String::from("new_col"), DataType::Int32, false); + let leaf_array = Arc::new(Int32Array::from(vec![100])) as ArrayRef; + + let mut current_field = leaf_col.clone(); + let mut current_struct_array = leaf_array.clone(); + + for i in 0..nested_level { + if i == 0 { + let struct_array = StructArray::try_new( + Fields::from(vec![current_field.clone()]), + vec![current_struct_array], + None, + ) + .unwrap(); + + current_struct_array = Arc::new(struct_array) as ArrayRef; + current_field = ArrowField::new( + format!("level_{}", i), + DataType::Struct(ArrowFields::from(vec![current_field])), + false, + ); + } else { + let struct_array = StructArray::try_new( + Fields::from(vec![current_field.clone(), leaf_col.clone()]), + vec![current_struct_array, leaf_array.clone()], + None, + ) + .unwrap(); + + current_struct_array = Arc::new(struct_array) as ArrayRef; + current_field = ArrowField::new( + format!("level_{}", i), + DataType::Struct(ArrowFields::from(vec![current_field, leaf_col.clone()])), + false, + ); + }; + } + + let current_field = ArrowField::new("struct", current_struct_array.data_type().clone(), false); + let root_struct_array = Arc::new( + StructArray::try_new( + Fields::from(vec![current_field]), + vec![current_struct_array], + None, + ) + .unwrap(), + ) as ArrayRef; + + let root_field = Field::new("root", root_struct_array.data_type().clone(), true); + + let schema = Arc::new(ArrowSchema::new(vec![root_field])); + RecordBatch::try_new(schema, vec![Arc::new(root_struct_array)]).unwrap() +} + +async fn prepare_initial_dataset_with_struct_col( + version: LanceFileVersion, + nested_level: usize, +) -> Dataset { + // nested column + let mut current_field = ArrowField::new(String::from("leaf"), DataType::Int32, false); + let mut current_array = Arc::new(Int32Array::from(vec![42])) as ArrayRef; + + for i in 0..nested_level { + let struct_array = StructArray::try_new( + Fields::from(vec![current_field.clone()]), + vec![current_array], + None, + ) + .unwrap(); + + current_array = Arc::new(struct_array) as ArrayRef; + current_field = ArrowField::new( + format!("level_{}", i), + DataType::Struct(ArrowFields::from(vec![current_field])), + false, + ); + } + + // list column + let values = Int32Array::from(vec![1]); + let offsets = + arrow_buffer::OffsetBuffer::new(arrow_buffer::ScalarBuffer::from(vec![0i32, 1i32])); + let list_data_type = DataType::Int32; + let list_array = ListArray::new( + Arc::new(ArrowField::new("list", list_data_type, false)), + offsets, + Arc::new(values), + None, + ); + + // fixed list column + let values = Int32Array::from(vec![1, 2, 3, 4, 5, 6]); + let field = Arc::new(Field::new_list_field(DataType::Int32, true)); + let fixed_size_list_array = FixedSizeListArray::new(field, 6, Arc::new(values), None); + + // Root field + let root_fields = Fields::from(vec![ + Field::new( + "fixed_list", + fixed_size_list_array.data_type().clone(), + true, + ), + Field::new("list", list_array.data_type().clone(), true), + Field::new("struct", current_array.data_type().clone(), true), + ]); + let root_struct_array = StructArray::new( + root_fields.clone(), + vec![ + Arc::new(fixed_size_list_array) as ArrayRef, + Arc::new(list_array) as ArrayRef, + Arc::new(current_array) as ArrayRef, + ], + None, + ); + let root_field = ArrowField::new("root", root_struct_array.data_type().clone(), false); + + // create schema with struct column + let schema = Arc::new(ArrowSchema::new(vec![root_field])); + let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(root_struct_array)]).unwrap(); + + let reader = RecordBatchIterator::new(vec![Ok(batch.clone())], schema.clone()); + let write_params = WriteParams { + mode: WriteMode::Create, + data_storage_version: Some(version), + ..Default::default() + }; + let mut dataset = Dataset::write(reader, "memory://test", Some(write_params)) + .await + .unwrap(); + + // verify initial schema + assert_eq!(dataset.schema().fields.len(), 1); + + // add conflict sub-column + let res = dataset + .add_columns( + NewColumnTransform::Reader(Box::new(RecordBatchIterator::new(vec![Ok(batch)], schema))), + None, + None, + ) + .await; + assert!(res.is_err()); + + dataset +} + +async fn prepare_packed_struct_col(version: LanceFileVersion) -> Dataset { + let mut metadata = HashMap::new(); + metadata.insert("lance-encoding:packed".to_string(), "true".to_string()); + + // create schema with struct column + let mut animal_struct_field = ArrowField::new( + "animal", + DataType::Struct(ArrowFields::from(vec![ArrowField::new( + "name", + DataType::Utf8, + false, + )])), + false, + ); + animal_struct_field.set_metadata(metadata); + let schema = Arc::new(ArrowSchema::new(vec![animal_struct_field])); + + // create data with one record + let name_array = StringArray::from(vec!["bear"]); + let struct_array = StructArray::new( + ArrowFields::from(vec![ArrowField::new("name", DataType::Utf8, false)]), + vec![Arc::new(name_array) as ArrayRef], + None, + ); + let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(struct_array)]).unwrap(); + + let reader = RecordBatchIterator::new(vec![Ok(batch.clone())], schema.clone()); + let write_params = WriteParams { + mode: WriteMode::Create, + data_storage_version: Some(version), + ..Default::default() + }; + let dataset = Dataset::write(reader, "memory://test", Some(write_params)) + .await + .unwrap(); + + // verify initial schema + assert_eq!(dataset.schema().fields.len(), 1); + assert_eq!(dataset.schema().fields[0].name, "animal"); + + dataset +} + +#[rstest] +#[tokio::test] +async fn test_add_sub_column_to_list_struct_col( + #[values(LanceFileVersion::V2_2)] version: LanceFileVersion, +) { + let mut dataset = prepare_initial_dataset_with_list_struct_col(version).await; + + // Prepare sub-column data to add to the struct inside list. + let all_cars = StringArray::from(vec!["Toyota", "Honda", "Mercedes", "Audi", "BMW", "Tesla"]); + + let car_struct = StructArray::new( + ArrowFields::from(vec![ArrowField::new("car", DataType::Utf8, false)]), + vec![Arc::new(all_cars) as ArrayRef], + None, + ); + + let car_list = ListArray::new( + Arc::new(ArrowField::new( + "item", + DataType::Struct(ArrowFields::from(vec![ArrowField::new( + "car", + DataType::Utf8, + false, + )])), + false, + )), + arrow_buffer::OffsetBuffer::new(arrow_buffer::ScalarBuffer::from(vec![ + 0i32, 2i32, 5i32, 6i32, + ])), + Arc::new(car_struct), + None, + ); + + let new_added_field = ArrowField::new("people", car_list.data_type().clone(), false); + let new_schema = Arc::new(ArrowSchema::new(vec![new_added_field])); + let batch = RecordBatch::try_new(new_schema.clone(), vec![Arc::new(car_list)]).unwrap(); + + // Add sub-column to the struct inside list. + dataset + .add_columns( + NewColumnTransform::Reader(Box::new(RecordBatchIterator::new( + vec![Ok(batch)], + new_schema, + ))), + None, + None, + ) + .await + .unwrap(); + + // Verify schema + // root + // - id + // - people + // - name + // - age + // - city + // - car + assert_eq!(dataset.schema().fields.len(), 2); + assert_eq!(dataset.schema().fields[0].name, "id"); + assert_eq!(dataset.schema().fields[1].name, "people"); + + let field = &dataset.schema().fields[1]; + assert_eq!(field.children[0].name, "item"); + + let field = &field.children[0]; + assert_eq!(field.children[0].name, "name"); + assert_eq!(field.children[1].name, "age"); + assert_eq!(field.children[2].name, "city"); + assert_eq!(field.children[3].name, "car"); + + // Verify the data + let batch = dataset.scan().try_into_batch().await.unwrap(); + assert_eq!(batch.num_rows(), 3); + assert_eq!(batch.num_columns(), 2); + + let list_array = batch + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + let list_value = list_array.value(0); + let struct_array = list_value.as_any().downcast_ref::().unwrap(); + let name = struct_array + .column_by_name("name") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + let car = struct_array + .column_by_name("car") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(name.value(0), "Alice"); + assert_eq!(car.value(0), "Toyota"); +} + +async fn prepare_initial_dataset_with_list_struct_col(version: LanceFileVersion) -> Dataset { + // Create struct type for person + let person_struct_type = DataType::Struct(ArrowFields::from(vec![ + ArrowField::new("name", DataType::Utf8, false), + ArrowField::new("age", DataType::Int32, false), + ArrowField::new("city", DataType::Utf8, false), + ])); + + // Create list of struct type + let list_of_struct_type = DataType::List(Arc::new(ArrowField::new( + "item", + person_struct_type.clone(), + false, + ))); + + // Create schema + let schema = Arc::new(ArrowSchema::new(vec![ + ArrowField::new("id", DataType::Int32, false), + ArrowField::new("people", list_of_struct_type.clone(), false), + ])); + + // Create data - 3 rows as in the Python test + let all_names = StringArray::from(vec!["Alice", "Bob", "Charlie", "David", "Eve", "Frank"]); + let all_ages = Int32Array::from(vec![25, 30, 35, 28, 32, 40]); + let all_cities = StringArray::from(vec![ + "Beijing", + "Shanghai", + "Guangzhou", + "Shenzhen", + "Hangzhou", + "Chengdu", + ]); + let all_struct = StructArray::new( + ArrowFields::from(vec![ + ArrowField::new("name", DataType::Utf8, false), + ArrowField::new("age", DataType::Int32, false), + ArrowField::new("city", DataType::Utf8, false), + ]), + vec![ + Arc::new(all_names) as ArrayRef, + Arc::new(all_ages) as ArrayRef, + Arc::new(all_cities) as ArrayRef, + ], + None, + ); + let all_people = ListArray::new( + Arc::new(ArrowField::new("item", person_struct_type, false)), + arrow_buffer::OffsetBuffer::new(arrow_buffer::ScalarBuffer::from(vec![ + 0i32, 2i32, 5i32, 6i32, + ])), + Arc::new(all_struct), + None, + ); + + let ids = Int32Array::from(vec![1, 2, 3]); + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(ids) as ArrayRef, Arc::new(all_people) as ArrayRef], + ) + .unwrap(); + + let reader = RecordBatchIterator::new(vec![Ok(batch)], schema); + let write_params = WriteParams { + mode: WriteMode::Create, + data_storage_version: Some(version), + ..Default::default() + }; + let dataset = Dataset::write(reader, "memory://test", Some(write_params)) + .await + .unwrap(); + + // verify initial schema + assert_eq!(dataset.schema().fields.len(), 2); + assert_eq!(dataset.schema().fields[0].name, "id"); + assert_eq!(dataset.schema().fields[1].name, "people"); + + dataset +} diff --git a/rust/lance/src/dataset/tests/mod.rs b/rust/lance/src/dataset/tests/mod.rs index 95c82c8c732..a13c159e0ba 100644 --- a/rust/lance/src/dataset/tests/mod.rs +++ b/rust/lance/src/dataset/tests/mod.rs @@ -8,5 +8,6 @@ mod dataset_index; mod dataset_io; mod dataset_merge_update; mod dataset_migrations; +mod dataset_schema_evolution; mod dataset_transactions; mod dataset_versioning; From d7681a3513a6fcc672838d8a242c2d5e5f049453 Mon Sep 17 00:00:00 2001 From: jinglun Date: Mon, 8 Dec 2025 21:53:54 +0800 Subject: [PATCH 2/2] fix --- rust/lance/src/dataset/schema_evolution.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/rust/lance/src/dataset/schema_evolution.rs b/rust/lance/src/dataset/schema_evolution.rs index 70cf26613a8..c4df5bd8f4b 100644 --- a/rust/lance/src/dataset/schema_evolution.rs +++ b/rust/lance/src/dataset/schema_evolution.rs @@ -187,6 +187,10 @@ fn check_field_conflict( (DataType::FixedSizeList(fl, _), DataType::FixedSizeList(fr, _)) => { check_field_conflict(fl, fr, version) } + (l_type, r_type) if l_type == r_type => Err(Error::invalid_input( + format!("Column {} already exists in the dataset", left.name()), + location!(), + )), (_, _) => Err(Error::invalid_input( format!( "Type conflicts between {}({}) and {}({})",