Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 116 additions & 0 deletions python/python/tests/test_schema_evolution.py
Original file line number Diff line number Diff line change
Expand Up @@ -540,3 +540,119 @@ def test_add_cols_all_null_with_sql(tmp_path: Path):
"b": pa.int32(),
}
)


def test_struct_fields_evolution_meta_only(tmp_path: Path):
struct_ab = pa.struct([("a", pa.int64()), ("b", pa.string())])
base = pa.table(
{
"id": pa.array([1, 2, 3], pa.int64()),
"struct_fields": pa.array(
[
{"a": 1, "b": "tom"},
{"a": 2, "b": "jerry"},
{"a": 3, "b": "jack"},
],
type=struct_ab,
),
}
)
ds = lance.write_dataset(base, tmp_path / "add_struct_column_only.lance")
assert ds.schema == pa.schema(
[pa.field("id", pa.int64()), pa.field("struct_fields", struct_ab)]
)

ds.add_columns(pa.field("embedding", pa.list_(pa.float32(), 128)))
expected = base.append_column(
"embedding", pa.array([None, None, None], pa.list_(pa.float32(), 128))
)
assert ds.schema == pa.schema(
[
pa.field("id", pa.int64()),
pa.field("struct_fields", struct_ab),
pa.field("embedding", pa.list_(pa.float32(), 128)),
]
)
assert ds.to_table() == expected

new_schema = pa.schema([("label", pa.string()), ("score", pa.float32())])
ds.add_columns(new_schema)
expected = expected.append_column(
"label", pa.array([None, None, None], pa.string())
).append_column("score", pa.array([None, None, None], pa.float32()))
assert ds.schema == pa.schema(
[
pa.field("id", pa.int64()),
pa.field("struct_fields", struct_ab),
pa.field("embedding", pa.list_(pa.float32(), 128)),
pa.field("label", pa.string()),
pa.field("score", pa.float32()),
]
)
assert ds.to_table() == expected

new_schema_2 = pa.schema(
[("struct_fields", pa.struct([("c", pa.string()), ("d", pa.string())]))]
)
ds.add_columns(new_schema_2)
struct_abcd = pa.struct(
[("a", pa.int64()), ("b", pa.string()), ("c", pa.string()), ("d", pa.string())]
)
struct_vals_abcd = pa.array(
[{"a": 1, "b": "tom"}, {"a": 2, "b": "jerry"}, {"a": 3, "b": "jack"}],
type=struct_abcd,
)
expected = pa.table(
{
"id": pa.array([1, 2, 3], pa.int64()),
"struct_fields": struct_vals_abcd,
"embedding": pa.array([None, None, None], pa.list_(pa.float32(), 128)),
"label": pa.array([None, None, None], pa.string()),
"score": pa.array([None, None, None], pa.float32()),
}
)
assert ds.schema == pa.schema(
[
pa.field("id", pa.int64()),
pa.field("struct_fields", struct_abcd),
pa.field("embedding", pa.list_(pa.float32(), 128)),
pa.field("label", pa.string()),
pa.field("score", pa.float32()),
]
)
assert ds.to_table() == expected

insert_vals = pa.array(
[
{"a": 11, "b": "tom11", "c": "jerry11", "d": "jack11"},
{"a": 22, "b": "tom22", "c": "jerry22", "d": "jack22"},
{"a": 33, "b": "tom33", "c": "jerry33", "d": "jack33"},
],
type=struct_abcd,
)
insert_tab = pa.table(
{"id": pa.array([11, 22, 33], pa.int64()), "struct_fields": insert_vals}
)
ds.insert(insert_tab)

final_expected = pa.table(
{
"id": pa.array([1, 2, 3, 11, 22, 33], pa.int64()),
"struct_fields": pa.concat_arrays([struct_vals_abcd, insert_vals]),
"embedding": pa.array(
[None, None, None, None, None, None], pa.list_(pa.float32(), 128)
),
"label": pa.array([None, None, None, None, None, None], pa.string()),
"score": pa.array([None, None, None, None, None, None], pa.float32()),
}
)
assert ds.schema == pa.schema(
[
pa.field("id", pa.int64()),
pa.field("struct_fields", struct_abcd),
pa.field("embedding", pa.list_(pa.float32(), 128)),
pa.field("label", pa.string()),
pa.field("score", pa.float32()),
]
)
assert ds.to_table() == final_expected
40 changes: 40 additions & 0 deletions rust/lance-core/src/datatypes/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,40 @@ impl<'a> Iterator for SchemaFieldIterPreOrder<'a> {
}
}

struct SchemaLeafFieldIterPreOrder<'a> {
leaf_field_stack: Vec<&'a Field>,
}

impl<'a> SchemaLeafFieldIterPreOrder<'a> {
#[allow(dead_code)]
fn new(schema: &'a Schema) -> Self {
let mut field_stack = Vec::with_capacity(schema.fields.len() * 2);
for field in schema.fields.iter().rev() {
field_stack.push(field);
}
Self {
leaf_field_stack: field_stack,
}
}
}

/// Iterator implementation for a pre-order traversal of leaf fields
impl<'a> Iterator for SchemaLeafFieldIterPreOrder<'a> {
type Item = &'a Field;

fn next(&mut self) -> Option<Self::Item> {
while let Some(next_field) = self.leaf_field_stack.pop() {
for child in next_field.children.iter().rev() {
self.leaf_field_stack.push(child);
}
if next_field.children.is_empty() {
return Some(next_field);
}
}
None
}
}

impl Schema {
/// The unenforced primary key fields in the schema
pub fn unenforced_primary_key(&self) -> Vec<&Field> {
Expand Down Expand Up @@ -331,6 +365,12 @@ impl Schema {
SchemaFieldIterPreOrder::new(self)
}

/// Iterates over the fields using a pre-order traversal
/// Only leaf fields (fields that don't have any children) are visited.
pub fn leaf_fields_pre_order(&self) -> impl Iterator<Item = &Field> {
SchemaLeafFieldIterPreOrder::new(self)
}

/// Returns a new schema that only contains the fields in `column_ids`.
///
/// This projection can filter out both top-level and nested fields
Expand Down
Loading
Loading