diff --git a/rust/lance/src/dataset/scanner.rs b/rust/lance/src/dataset/scanner.rs
index 3e53c83545c..0f4287d02a7 100644
--- a/rust/lance/src/dataset/scanner.rs
+++ b/rust/lance/src/dataset/scanner.rs
@@ -75,7 +75,7 @@ use tracing::{info_span, instrument, Span};
 
 use super::Dataset;
 use crate::dataset::row_offsets_to_row_addresses;
-use crate::dataset::utils::wrap_json_stream_for_reading;
+use crate::dataset::utils::SchemaAdapter;
 use crate::index::vector::utils::{get_vector_dim, get_vector_type};
 use crate::index::DatasetIndexInternalExt;
 use crate::io::exec::filtered_read::{FilteredReadExec, FilteredReadOptions};
@@ -3647,10 +3647,13 @@ pub struct DatasetRecordBatchStream {
 
 impl DatasetRecordBatchStream {
     pub fn new(exec_node: SendableRecordBatchStream) -> Self {
-        // Convert lance.json (JSONB) back to arrow.json (strings) for reading
-        //
-        // This is so bad, we need to find a way to remove this.
-        let exec_node = wrap_json_stream_for_reading(exec_node);
+        let schema = exec_node.schema();
+        let adapter = SchemaAdapter::new(schema.clone());
+        let exec_node = if SchemaAdapter::requires_logical_conversion(&schema) {
+            adapter.to_logical_stream(exec_node)
+        } else {
+            exec_node
+        };
 
         let span = info_span!("DatasetRecordBatchStream");
         Self { exec_node, span }
diff --git a/rust/lance/src/dataset/utils.rs b/rust/lance/src/dataset/utils.rs
index e26de9a8811..56792a9317d 100644
--- a/rust/lance/src/dataset/utils.rs
+++ b/rust/lance/src/dataset/utils.rs
@@ -3,7 +3,7 @@
 
 use crate::Result;
 use arrow_array::{RecordBatch, UInt64Array};
-use arrow_schema::Schema as ArrowSchema;
+use arrow_schema::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef};
 use datafusion::error::Result as DFResult;
 use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
 use datafusion::physical_plan::SendableRecordBatchStream;
@@ -138,111 +138,121 @@ impl Default for CapturedRowIds {
     }
 }
 
-/// Wrap a stream to convert arrow.json to lance.json for writing
-///
-// FIXME: this is bad, really bad, we need to find a way to remove this.
-pub fn wrap_json_stream_for_writing(
-    stream: SendableRecordBatchStream,
-) -> SendableRecordBatchStream {
-    // Check if any fields need conversion
-    let needs_conversion = stream
-        .schema()
-        .fields()
-        .iter()
-        .any(|f| is_arrow_json_field(f));
-
-    if !needs_conversion {
-        return stream;
+/// Adapter around the existing JSON conversion utilities.
+#[derive(Debug, Clone)]
+pub struct SchemaAdapter {
+    logical_schema: ArrowSchemaRef,
+}
+
+impl SchemaAdapter {
+    /// Create a new adapter given the logical Arrow schema.
+    pub fn new(logical_schema: ArrowSchemaRef) -> Self {
+        Self { logical_schema }
     }
-    // Convert the schema
-    let arrow_schema = stream.schema();
-    let mut new_fields = Vec::with_capacity(arrow_schema.fields().len());
-    for field in arrow_schema.fields() {
-        if is_arrow_json_field(field) {
-            new_fields.push(Arc::new(arrow_json_to_lance_json(field)));
-        } else {
-            new_fields.push(Arc::clone(field));
-        }
+
+    /// Determine if the logical schema includes Arrow JSON fields that require conversion.
+    pub fn requires_physical_conversion(&self) -> bool {
+        self.logical_schema
+            .fields()
+            .iter()
+            .any(|field| is_arrow_json_field(field))
     }
-    let converted_schema = Arc::new(ArrowSchema::new_with_metadata(
-        new_fields,
-        arrow_schema.metadata().clone(),
-    ));
-
-    // Convert the stream
-    let converted_stream = stream.map(move |batch_result| {
-        batch_result.and_then(|batch| {
-            convert_json_columns(&batch)
-                .map_err(|e| datafusion::error::DataFusionError::ArrowError(Box::new(e), None))
-        })
-    });
-    Box::pin(RecordBatchStreamAdapter::new(
-        converted_schema,
-        converted_stream,
-    ))
-}
+
+    /// Determine if the physical schema includes Lance JSON fields that must be converted back.
+    pub fn requires_logical_conversion(schema: &ArrowSchemaRef) -> bool {
+        schema.fields().iter().any(|field| is_json_field(field))
+    }
 
-/// Wrap a stream to convert lance.json (JSONB) back to arrow.json (strings) for reading
-///
-// FIXME: this is bad, really bad, we need to find a way to remove this.
-pub fn wrap_json_stream_for_reading(
-    stream: SendableRecordBatchStream,
-) -> SendableRecordBatchStream {
-    use lance_arrow::json::ARROW_JSON_EXT_NAME;
-    use lance_arrow::ARROW_EXT_NAME_KEY;
+    /// Convert a logical stream into a physical stream.
+    pub fn to_physical_stream(
+        &self,
+        stream: SendableRecordBatchStream,
+    ) -> SendableRecordBatchStream {
+        // Check if any fields need conversion
+        if !self.requires_physical_conversion() {
+            return stream;
+        }
 
-    // Check if any fields need conversion
-    let needs_conversion = stream.schema().fields().iter().any(|f| is_json_field(f));
+        let arrow_schema = stream.schema();
+        let mut new_fields = Vec::with_capacity(arrow_schema.fields().len());
+        for field in arrow_schema.fields() {
+            if is_arrow_json_field(field) {
+                new_fields.push(Arc::new(arrow_json_to_lance_json(field)));
+            } else {
+                new_fields.push(Arc::clone(field));
+            }
+        }
+        let converted_schema = Arc::new(ArrowSchema::new_with_metadata(
+            new_fields,
+            arrow_schema.metadata().clone(),
+        ));
 
-    if !needs_conversion {
-        return stream;
+        let converted_stream = stream.map(move |batch_result| {
+            batch_result.and_then(|batch| {
+                convert_json_columns(&batch)
+                    .map_err(|e| datafusion::error::DataFusionError::ArrowError(Box::new(e), None))
+            })
+        });
+
+        Box::pin(RecordBatchStreamAdapter::new(
+            converted_schema,
+            converted_stream,
+        ))
     }
-    // Convert the schema
-    let arrow_schema = stream.schema();
-    let mut new_fields = Vec::with_capacity(arrow_schema.fields().len());
-    for field in arrow_schema.fields() {
-        if is_json_field(field) {
-            // Convert lance.json (LargeBinary) to arrow.json (Utf8)
-            let mut new_field = arrow_schema::Field::new(
-                field.name(),
-                arrow_schema::DataType::Utf8,
-                field.is_nullable(),
-            );
-            let mut metadata = field.metadata().clone();
-            metadata.insert(
-                ARROW_EXT_NAME_KEY.to_string(),
-                ARROW_JSON_EXT_NAME.to_string(),
-            );
-            new_field.set_metadata(metadata);
-            new_fields.push(new_field);
-        } else {
-            new_fields.push(field.as_ref().clone());
+
+    /// Convert a physical stream into a logical stream.
+    pub fn to_logical_stream(
+        &self,
+        stream: SendableRecordBatchStream,
+    ) -> SendableRecordBatchStream {
+        use lance_arrow::json::ARROW_JSON_EXT_NAME;
+        use lance_arrow::ARROW_EXT_NAME_KEY;
+
+        if !Self::requires_logical_conversion(&stream.schema()) {
+            return stream;
         }
-    }
-    let converted_schema = Arc::new(ArrowSchema::new_with_metadata(
-        new_fields,
-        arrow_schema.metadata().clone(),
-    ));
-
-    // Convert the stream
-    let converted_stream = stream.map(move |batch_result| {
-        batch_result.and_then(|batch| {
-            convert_lance_json_to_arrow(&batch).map_err(|e| {
-                datafusion::error::DataFusionError::ArrowError(
-                    Box::new(arrow_schema::ArrowError::InvalidArgumentError(
-                        e.to_string(),
-                    )),
-                    None,
-                )
+
+        let arrow_schema = stream.schema();
+        let mut new_fields = Vec::with_capacity(arrow_schema.fields().len());
+        for field in arrow_schema.fields() {
+            if is_json_field(field) {
+                let mut new_field = arrow_schema::Field::new(
+                    field.name(),
+                    arrow_schema::DataType::Utf8,
+                    field.is_nullable(),
+                );
+                let mut metadata = field.metadata().clone();
+                metadata.insert(
+                    ARROW_EXT_NAME_KEY.to_string(),
+                    ARROW_JSON_EXT_NAME.to_string(),
+                );
+                new_field.set_metadata(metadata);
+                new_fields.push(new_field);
+            } else {
+                new_fields.push(field.as_ref().clone());
+            }
+        }
+        let converted_schema = Arc::new(ArrowSchema::new_with_metadata(
+            new_fields,
+            arrow_schema.metadata().clone(),
+        ));
+
+        let converted_stream = stream.map(move |batch_result| {
+            batch_result.and_then(|batch| {
+                convert_lance_json_to_arrow(&batch).map_err(|e| {
+                    datafusion::error::DataFusionError::ArrowError(
+                        Box::new(arrow_schema::ArrowError::InvalidArgumentError(
+                            e.to_string(),
+                        )),
+                        None,
+                    )
+                })
             })
-        })
-    });
+        });
 
-    Box::pin(RecordBatchStreamAdapter::new(
-        converted_schema,
-        converted_stream,
-    ))
+        Box::pin(RecordBatchStreamAdapter::new(
+            converted_schema,
+            converted_stream,
+        ))
+    }
 }
diff --git a/rust/lance/src/dataset/write.rs b/rust/lance/src/dataset/write.rs
index e4250e1e106..defb647e012 100644
--- a/rust/lance/src/dataset/write.rs
+++ b/rust/lance/src/dataset/write.rs
@@ -39,11 +39,9 @@ use super::blob::BlobStreamExt;
 use super::fragment::write::generate_random_filename;
 use super::progress::{NoopFragmentWriteProgress, WriteFragmentProgress};
 use super::transaction::Transaction;
-use super::utils::wrap_json_stream_for_writing;
+use super::utils::SchemaAdapter;
 use super::DATA_DIR;
 
-use lance_arrow::json::is_arrow_json_field;
-
 mod commit;
 pub mod delete;
 mod insert;
@@ -378,10 +376,8 @@ pub async fn do_write_fragments(
     storage_version: LanceFileVersion,
     target_bases_info: Option>,
 ) -> Result> {
-    // Convert arrow.json to lance.json (JSONB) for storage if needed
-    //
-    // FIXME: this is bad, really bad, we need to find a way to remove this.
-    let data = wrap_json_stream_for_writing(data);
+    let adapter = SchemaAdapter::new(data.schema());
+    let data = adapter.to_physical_stream(data);
 
     let mut buffered_reader = if storage_version == LanceFileVersion::Legacy {
         // In v1 we split the stream into row group sized batches
@@ -576,17 +572,10 @@ pub async fn write_fragments_internal(
     mut params: WriteParams,
     target_bases_info: Option>,
 ) -> Result {
-    // Convert Arrow JSON columns to Lance JSON (JSONB) format
-    //
-    // FIXME: this is bad, really bad, we need to find a way to remove this.
-    let needs_conversion = data
-        .schema()
-        .fields()
-        .iter()
-        .any(|f| is_arrow_json_field(f));
-
-    let (data, converted_schema) = if needs_conversion {
-        let data = wrap_json_stream_for_writing(data);
+    let adapter = SchemaAdapter::new(data.schema());
+
+    let (data, converted_schema) = if adapter.requires_physical_conversion() {
+        let data = adapter.to_physical_stream(data);
         // Update the schema to match the converted data
         let arrow_schema = data.schema();
         let converted_schema = Schema::try_from(arrow_schema.as_ref())?;
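
Usage sketch (not part of the patch): the call sites above reduce to the pattern below. The variable name `stream` is illustrative; both conversions hand back the input stream unchanged when the schema carries no JSON extension fields.

// Write path (do_write_fragments / write_fragments_internal):
// convert arrow.json (Utf8) columns to lance.json (JSONB) before storage.
let adapter = SchemaAdapter::new(stream.schema());
let stream = adapter.to_physical_stream(stream); // early-returns the input if no conversion is needed

// Read path (DatasetRecordBatchStream::new):
// convert lance.json (JSONB) back to arrow.json strings.
let schema = stream.schema();
let adapter = SchemaAdapter::new(schema.clone());
let stream = if SchemaAdapter::requires_logical_conversion(&schema) {
    adapter.to_logical_stream(stream)
} else {
    stream
};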