diff --git a/python/python/tests/test_json.py b/python/python/tests/test_json.py index c13322afe0f..d6a911c6aec 100644 --- a/python/python/tests/test_json.py +++ b/python/python/tests/test_json.py @@ -40,16 +40,15 @@ def test_json_basic_write_read(): # Read back the dataset dataset = lance.dataset(dataset_path) - # Verify storage schema - assert len(dataset.schema) == 2 - assert dataset.schema.field("id").type == pa.int32() - - # Check that JSON field is stored as JSONB internally - storage_field = dataset.schema.field("data") - assert storage_field.type == pa.large_binary() - assert storage_field.metadata is not None - assert b"ARROW:extension:name" in storage_field.metadata - assert storage_field.metadata[b"ARROW:extension:name"] == b"lance.json" + # Verify logical schema exposed to users + logical_schema = dataset.schema + assert len(logical_schema) == 2 + assert logical_schema.field("id").type == pa.int32() + logical_field = logical_schema.field("data") + assert ( + str(logical_field.type) == "extension" + or logical_field.type == pa.utf8() + ) # Read data back result_table = dataset.to_table() @@ -355,3 +354,56 @@ def test_json_array_operations(): result = dataset.to_table(filter="json_array_length(data, '$.items') = 0") assert result.num_rows == 1 assert result["id"][0].as_py() == 3 + + +def test_json_filter_append_missing_json_cast(tmp_path: Path): + """Ensure appending via dataset.schema keeps JSON columns valid.""" + + dataset_path = tmp_path / "json_append_issue.lance" + + initial_table = pa.table( + { + "article_metadata": pa.array( + [json.dumps({"article_journal": "Cell"})], type=pa.json_() + ), + "article_journal": pa.array(["Cell"], type=pa.string()), + } + ) + + lance.write_dataset(initial_table, dataset_path) + dataset = lance.dataset(dataset_path) + schema = dataset.schema + field = schema.field("article_metadata") + assert str(field.type) == "extension" or field.type == pa.utf8() + + append_table = pa.table( + { + "article_metadata": pa.array( + [ + json.dumps({"article_journal": "PLoS One"}), + json.dumps({"article_journal": "Nature"}), + ], + type=pa.json_(), + ), + "article_journal": pa.array(["PLoS One", "Nature"], type=pa.string()), + } + ) + + append_cast = append_table.cast(schema) + first_value = append_cast.column("article_metadata").to_pylist()[0] + assert isinstance(first_value, str) + + lance.write_dataset(append_cast, dataset_path, mode="append") + dataset = lance.dataset(dataset_path) + assert dataset.count_rows() == 3 + + result = dataset.to_table( + filter="json_get(article_metadata, 'article_journal') IS NOT NULL" + ) + + assert result.num_rows == 3 + assert result.column("article_journal").to_pylist() == [ + "Cell", + "PLoS One", + "Nature", + ] diff --git a/python/src/dataset.rs b/python/src/dataset.rs index a0bd8eb0767..be25991929a 100644 --- a/python/src/dataset.rs +++ b/python/src/dataset.rs @@ -89,7 +89,7 @@ use crate::fragment::FileFragment; use crate::indices::PyIndexConfig; use crate::rt; use crate::scanner::ScanStatistics; -use crate::schema::LanceSchema; +use crate::schema::{logical_schema_from_lance, LanceSchema}; use crate::session::Session; use crate::utils::PyLance; use crate::{LanceReader, Scanner}; @@ -592,8 +592,8 @@ impl Dataset { #[getter(schema)] fn schema(self_: PyRef<'_, Self>) -> PyResult { - let arrow_schema = ArrowSchema::from(self_.ds.schema()); - arrow_schema.to_pyarrow(self_.py()) + let logical_schema = logical_schema_from_lance(self_.ds.schema()); + logical_schema.to_pyarrow(self_.py()) } #[getter(lance_schema)] diff --git a/python/src/fragment.rs b/python/src/fragment.rs index 1bc864b9027..933f231d263 100644 --- a/python/src/fragment.rs +++ b/python/src/fragment.rs @@ -18,7 +18,6 @@ use std::sync::Arc; use arrow::ffi_stream::ArrowArrayStreamReader; use arrow::pyarrow::{FromPyArrow, PyArrowType, ToPyArrow}; use arrow_array::RecordBatchReader; -use arrow_schema::Schema as ArrowSchema; use futures::TryFutureExt; use lance::dataset::fragment::FileFragment as LanceFragment; use lance::dataset::scanner::ColumnOrdering; @@ -39,7 +38,7 @@ use snafu::location; use crate::dataset::{get_write_params, transforms_from_python, PyWriteDest}; use crate::error::PythonErrorExt; -use crate::schema::LanceSchema; +use crate::schema::{logical_schema_from_lance, LanceSchema}; use crate::utils::{export_vec, extract_vec, PyLance}; use crate::{rt, Dataset, Scanner}; @@ -354,8 +353,8 @@ impl FileFragment { fn schema(self_: PyRef<'_, Self>) -> PyResult { let schema = self_.fragment.dataset().schema(); - let arrow_schema: ArrowSchema = schema.into(); - arrow_schema.to_pyarrow(self_.py()) + let logical_schema = logical_schema_from_lance(schema); + logical_schema.to_pyarrow(self_.py()) } /// Returns the data file objects associated with this fragment. diff --git a/python/src/scanner.rs b/python/src/scanner.rs index 8c6fd2f7ac0..747212a8423 100644 --- a/python/src/scanner.rs +++ b/python/src/scanner.rs @@ -29,6 +29,7 @@ use pyo3::exceptions::PyValueError; use crate::reader::LanceReader; use crate::rt; +use crate::schema::logical_arrow_schema; /// This will be wrapped by a python class to provide /// additional functionality @@ -99,9 +100,11 @@ impl Scanner { #[getter(schema)] fn schema(self_: PyRef<'_, Self>) -> PyResult { let scanner = self_.scanner.clone(); - rt().spawn(Some(self_.py()), async move { scanner.schema().await })? - .map(|s| s.to_pyarrow(self_.py())) - .map_err(|err| PyValueError::new_err(err.to_string()))? + let schema = rt() + .spawn(Some(self_.py()), async move { scanner.schema().await })? + .map_err(|err| PyValueError::new_err(err.to_string()))?; + let logical_schema = logical_arrow_schema(schema.as_ref()); + logical_schema.to_pyarrow(self_.py()) } #[pyo3(signature = (*, verbose = false))] diff --git a/python/src/schema.rs b/python/src/schema.rs index 0c8ef6870fb..107232f1a2b 100644 --- a/python/src/schema.rs +++ b/python/src/schema.rs @@ -2,8 +2,10 @@ // SPDX-FileCopyrightText: Copyright The Lance Authors use arrow::pyarrow::PyArrowType; +use arrow_array::RecordBatch; use arrow_schema::Schema as ArrowSchema; use lance::datatypes::{Field, Schema}; +use lance_arrow::json::{convert_lance_json_to_arrow, has_json_fields}; use lance_file::datatypes::{Fields, FieldsWithMeta}; use lance_file::format::pb; use prost::Message; @@ -165,3 +167,22 @@ impl LanceSchema { Ok(self.0.field(name).map(|f| LanceField(f.clone()))) } } + +pub(crate) fn logical_arrow_schema(schema: &ArrowSchema) -> ArrowSchema { + use std::sync::Arc; + + if !schema.fields().iter().any(|f| has_json_fields(f.as_ref())) { + return schema.clone(); + } + + let schema_ref = Arc::new(schema.clone()); + let empty_batch = RecordBatch::new_empty(schema_ref.clone()); + match convert_lance_json_to_arrow(&empty_batch) { + Ok(converted) => converted.schema().as_ref().clone(), + Err(_) => schema.clone(), + } +} + +pub(crate) fn logical_schema_from_lance(schema: &Schema) -> ArrowSchema { + logical_arrow_schema(&ArrowSchema::from(schema)) +}