Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions python/python/tests/test_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -511,3 +511,59 @@ def test_json_filter_append_missing_json_cast(tmp_path: Path):
"PLoS One",
"Nature",
]


def test_json_limit_offset_batch_transfer_preserves_extension_metadata(tmp_path: Path):
    """Check that the JSON column keeps its type across limit/offset scans.

    The scenario rebuilds a table by paging through a source dataset with
    ``limit``/``offset`` and appending every page to a second dataset.
    """

    def assert_json_or_utf8(field):
        # Accept either the Arrow JSON extension type or plain UTF-8.
        assert (
            str(field.type) == "extension<arrow.json>" or field.type == pa.utf8()
        )

    total_rows = 25
    page_size = 10
    src_uri = tmp_path / "json_source.lance"
    dst_uri = tmp_path / "json_dest.lance"

    payloads = [json.dumps({"i": i}) for i in range(total_rows)]
    data = pa.table(
        {
            "id": pa.array(range(total_rows), type=pa.int32()),
            "meta": pa.array(payloads, type=pa.json_()),
        }
    )

    lance.write_dataset(data, src_uri)
    source = lance.dataset(src_uri)

    # The first page seeds the destination dataset.
    head = source.to_table(limit=page_size)
    assert_json_or_utf8(head.schema.field("meta"))
    lance.write_dataset(head, dst_uri, mode="overwrite")

    # Every following page is appended until a scan comes back empty.
    offset = page_size
    page = source.to_table(limit=page_size, offset=offset)
    while page.num_rows != 0:
        assert page.schema == head.schema
        assert_json_or_utf8(page.schema.field("meta"))

        lance.write_dataset(page, dst_uri, mode="append")
        offset += page_size
        page = source.to_table(limit=page_size, offset=offset)

    dest = lance.dataset(dst_uri)
    assert dest.count_rows() == total_rows

    # JSON functions must still treat the column as JSON.
    matched = dest.to_table(filter="json_get(meta, 'i') IS NOT NULL")
    assert matched.num_rows == total_rows
37 changes: 34 additions & 3 deletions rust/lance-datafusion/src/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -408,19 +408,24 @@ impl ProjectionPlan {
}

/// Builds the Arrow schema described by this plan's projection expressions.
///
/// The expressions are resolved against the Arrow schema of
/// `physical_projection`. Each `(expr, name)` pair becomes one field named
/// `name`, whose data type and nullability are computed by the expression
/// against that physical schema.
///
/// # Errors
///
/// Propagates any error from `to_physical_exprs` or from evaluating an
/// expression's return field, data type, or nullability.
pub fn output_schema(&self) -> Result<ArrowSchema> {
let exprs = self.to_physical_exprs(&self.physical_projection.to_arrow_schema())?;
let physical_schema = self.physical_projection.to_arrow_schema();
let exprs = self.to_physical_exprs(&physical_schema)?;
let fields = exprs
.iter()
.map(|(expr, name)| {
// Field-level metadata is taken from the expression's return field.
let metadata = expr.return_field(&physical_schema)?.metadata().clone();
Ok(ArrowField::new(
name,
expr.data_type(&physical_schema)?,
expr.nullable(&physical_schema)?,
))
)
.with_metadata(metadata))
})
.collect::<Result<Vec<_>>>()?;
Ok(ArrowSchema::new(fields))
// Schema-level metadata is cloned from the physical schema.
Ok(ArrowSchema::new_with_metadata(
fields,
physical_schema.metadata().clone(),
))
}

#[instrument(skip_all, level = "debug")]
Expand All @@ -447,3 +452,29 @@ impl ProjectionPlan {
}
}
}

#[cfg(test)]
mod tests {
    use super::*;

    use lance_arrow::json::{is_json_field, json_field};

    /// A JSON column must be recognised as JSON both in the physical
    /// projection schema and in the plan's output schema.
    #[test]
    fn test_output_schema_preserves_json_extension_metadata() {
        let fields = vec![
            ArrowField::new("id", DataType::Int32, false),
            json_field("meta", true),
        ];
        let lance_schema = Schema::try_from(&ArrowSchema::new(fields)).unwrap();

        let plan = ProjectionPlan::from_schema(
            Arc::new(lance_schema.clone()),
            &lance_schema,
            BlobVersion::default(),
        )
        .unwrap();

        // The physical projection is checked first...
        let physical_schema = plan.physical_projection.to_arrow_schema();
        let physical_meta = physical_schema.field_with_name("meta").unwrap();
        assert!(is_json_field(physical_meta));

        // ...then the output schema derived from it.
        let output_schema = plan.output_schema().unwrap();
        assert!(is_json_field(
            output_schema.field_with_name("meta").unwrap()
        ));
    }
}
39 changes: 38 additions & 1 deletion rust/lance-namespace/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,11 @@ pub fn convert_json_arrow_field(json_field: &JsonArrowField) -> Result<Field> {
let data_type = convert_json_arrow_type(&json_field.r#type)?;
let nullable = json_field.nullable;

Ok(Field::new(&json_field.name, data_type, nullable))
let field = Field::new(&json_field.name, data_type, nullable);
Ok(match json_field.metadata.as_ref() {
Some(metadata) => field.with_metadata(metadata.clone()),
None => field,
})
}

/// Convert JsonArrowDataType to Arrow DataType
Expand Down Expand Up @@ -291,6 +295,39 @@ mod tests {
use std::collections::HashMap;
use std::sync::Arc;

/// Field-level extension metadata must survive an Arrow -> JSON -> Arrow
/// schema roundtrip.
#[test]
fn test_extension_metadata_preserved_in_json_roundtrip() {
    const ARROW_EXT_NAME_KEY: &str = "ARROW:extension:name";
    const LANCE_JSON_EXT_NAME: &str = "lance.json";

    let mut ext_metadata = HashMap::new();
    ext_metadata.insert(
        ARROW_EXT_NAME_KEY.to_string(),
        LANCE_JSON_EXT_NAME.to_string(),
    );
    let original = ArrowSchema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("meta", DataType::Binary, true).with_metadata(ext_metadata),
    ]);

    // Arrow -> JSON: the serialized field must keep the extension key.
    let as_json = arrow_schema_to_json(&original).unwrap();
    let json_meta = as_json
        .fields
        .iter()
        .find(|field| field.name == "meta")
        .unwrap();
    let json_metadata = json_meta.metadata.as_ref().unwrap();
    assert!(json_metadata.contains_key(ARROW_EXT_NAME_KEY));

    // JSON -> Arrow: the extension name must come back unchanged.
    let restored = convert_json_arrow_schema(&as_json).unwrap();
    let restored_meta = restored.field_with_name("meta").unwrap();
    assert_eq!(
        restored_meta.metadata().get(ARROW_EXT_NAME_KEY),
        Some(&LANCE_JSON_EXT_NAME.to_string())
    );
}

#[test]
fn test_convert_basic_types() {
// Test int32
Expand Down
39 changes: 39 additions & 0 deletions rust/lance/src/dataset/tests/dataset_scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::sync::Arc;
use std::vec;

use crate::index::vector::VectorIndexParams;
use lance_arrow::json::{is_arrow_json_field, json_field, JsonArray};
use lance_arrow::FixedSizeListArrayExt;

use arrow::compute::concat_batches;
Expand Down Expand Up @@ -327,6 +328,44 @@ async fn test_fts_filter_vector_search() {
.await;
}

/// Scans with and without an offset must both report the `meta` column as an
/// Arrow JSON field, and must agree on the overall schema.
#[tokio::test]
async fn test_scan_limit_offset_preserves_json_extension_metadata() {
    const NUM_ROWS: i32 = 50;

    let fields = vec![
        ArrowField::new("id", DataType::Int32, false),
        json_field("meta", true),
    ];
    let schema = Arc::new(ArrowSchema::new(fields));

    let json_strings = (0..NUM_ROWS).map(|i| Some(format!(r#"{{"i":{i}}}"#)));
    let documents = JsonArray::try_from_iter(json_strings)
        .unwrap()
        .into_inner();
    let ids = Int32Array::from_iter_values(0..NUM_ROWS);
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(ids), Arc::new(documents)],
    )
    .unwrap();

    let source = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());
    let dataset = Dataset::write(source, "memory://", None).await.unwrap();

    // Limit only.
    let mut plain_scan = dataset.scan();
    plain_scan.limit(Some(10), None).unwrap();
    let first_page = plain_scan.try_into_batch().await.unwrap();
    let first_schema = first_page.schema();
    assert!(is_arrow_json_field(
        first_schema.field_with_name("meta").unwrap()
    ));

    // Limit plus offset.
    let mut offset_scan = dataset.scan();
    offset_scan.limit(Some(10), Some(10)).unwrap();
    let second_page = offset_scan.try_into_batch().await.unwrap();
    let second_schema = second_page.schema();
    assert!(is_arrow_json_field(
        second_schema.field_with_name("meta").unwrap()
    ));

    assert_eq!(first_schema, second_schema);
}

async fn prepare_query_filter_dataset() -> Dataset {
let schema = Arc::new(ArrowSchema::new(vec![
ArrowField::new("id", DataType::Int32, false),
Expand Down
54 changes: 48 additions & 6 deletions rust/lance/src/io/exec/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ fn project_field(field: &FieldRef, selection: &Selection) -> FieldRef {
match selection {
Selection::FullField(_) => {
// If we project, it's always null (for some reason).
Arc::new(Field::new(field.name(), field.data_type().clone(), true))
Arc::new(field.as_ref().clone().with_nullable(true))
}
Selection::StructProjection(_, sub_selections) => {
if let DataType::Struct(fields) = field.data_type() {
Expand All @@ -131,11 +131,14 @@ fn project_field(field: &FieldRef, selection: &Selection) -> FieldRef {
let projected_field = project_field(field, sub_selection);
projected_fields.push(projected_field);
}
Arc::new(Field::new(
field.name(),
DataType::Struct(projected_fields.into()),
true,
))
Arc::new(
Field::new(
field.name(),
DataType::Struct(projected_fields.into()),
true,
)
.with_metadata(field.metadata().clone()),
)
} else {
panic!("Expected struct")
}
Expand Down Expand Up @@ -311,6 +314,45 @@ mod tests {
Ok(batches.into_iter().next().unwrap())
}

/// Projecting a struct column down to one child must yield exactly the
/// requested schema, including the child's field metadata.
#[tokio::test]
async fn test_project_preserves_field_metadata() {
    use arrow_array::LargeBinaryArray;

    // Child field tagged with an Arrow extension name in its metadata.
    let mut ext_metadata = std::collections::HashMap::new();
    ext_metadata.insert(
        lance_arrow::ARROW_EXT_NAME_KEY.to_string(),
        "lance.json".to_string(),
    );
    let meta_field = Field::new("meta", DataType::LargeBinary, true).with_metadata(ext_metadata);
    let x_field = Field::new("x", DataType::Int32, true);

    // Input: a single struct column `b` with children `meta` and `x`.
    let input_struct = DataType::Struct(vec![meta_field.clone(), x_field.clone()].into());
    let input_schema = Arc::new(ArrowSchema::new(vec![Field::new("b", input_struct, true)]));

    let meta_values: ArrayRef = Arc::new(LargeBinaryArray::from(vec![Some(b"{}".as_slice())]));
    let x_values: ArrayRef = Arc::new(Int32Array::from(vec![1]));
    let struct_column = StructArray::from(vec![
        (Arc::new(meta_field.clone()), meta_values),
        (Arc::new(x_field), x_values),
    ]);
    let batch = RecordBatch::try_new(input_schema, vec![Arc::new(struct_column)]).unwrap();

    // Projection keeps only `b.meta`.
    let projection = ArrowSchema::new(vec![Field::new(
        "b",
        DataType::Struct(vec![meta_field].into()),
        true,
    )]);
    let projected = apply_to_batch(batch, &projection).await.unwrap();
    assert_eq!(projected.schema().as_ref(), &projection);
}

#[tokio::test]
async fn test_project_node() {
let sample_data = sample_nested_data();
Expand Down
Loading