13 changes: 8 additions & 5 deletions rust/lance/src/dataset/scanner.rs
@@ -75,7 +75,7 @@ use tracing::{info_span, instrument, Span};

use super::Dataset;
use crate::dataset::row_offsets_to_row_addresses;
use crate::dataset::utils::wrap_json_stream_for_reading;
use crate::dataset::utils::SchemaAdapter;
use crate::index::vector::utils::{get_vector_dim, get_vector_type};
use crate::index::DatasetIndexInternalExt;
use crate::io::exec::filtered_read::{FilteredReadExec, FilteredReadOptions};
@@ -3647,10 +3647,13 @@ pub struct DatasetRecordBatchStream {

impl DatasetRecordBatchStream {
pub fn new(exec_node: SendableRecordBatchStream) -> Self {
// Convert lance.json (JSONB) back to arrow.json (strings) for reading
//
// This is so bad, we need to find a way to remove this.
let exec_node = wrap_json_stream_for_reading(exec_node);
let schema = exec_node.schema();
let adapter = SchemaAdapter::new(schema.clone());
let exec_node = if SchemaAdapter::requires_logical_conversion(&schema) {
adapter.to_logical_stream(exec_node)
} else {
exec_node
};

let span = info_span!("DatasetRecordBatchStream");
Self { exec_node, span }
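For reference, a minimal sketch of the read-side wiring this hunk introduces, assuming only the SchemaAdapter methods shown in utils.rs below; the free function itself is hypothetical:

    use datafusion::physical_plan::SendableRecordBatchStream;

    // Hypothetical helper mirroring DatasetRecordBatchStream::new above:
    // convert physical lance.json (JSONB) columns back to logical arrow.json
    // strings, but only when the schema actually contains such columns.
    fn wrap_for_reading(stream: SendableRecordBatchStream) -> SendableRecordBatchStream {
        let schema = stream.schema();
        if SchemaAdapter::requires_logical_conversion(&schema) {
            SchemaAdapter::new(schema).to_logical_stream(stream)
        } else {
            // No JSON extension columns: pass the stream through untouched.
            stream
        }
    }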
202 changes: 106 additions & 96 deletions rust/lance/src/dataset/utils.rs
@@ -3,7 +3,7 @@

use crate::Result;
use arrow_array::{RecordBatch, UInt64Array};
use arrow_schema::Schema as ArrowSchema;
use arrow_schema::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef};
use datafusion::error::Result as DFResult;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::SendableRecordBatchStream;
@@ -138,111 +138,121 @@ impl Default for CapturedRowIds {
}
}

/// Wrap a stream to convert arrow.json to lance.json for writing
///
// FIXME: this is bad, really bad, we need to find a way to remove this.
pub fn wrap_json_stream_for_writing(
stream: SendableRecordBatchStream,
) -> SendableRecordBatchStream {
// Check if any fields need conversion
let needs_conversion = stream
.schema()
.fields()
.iter()
.any(|f| is_arrow_json_field(f));

if !needs_conversion {
return stream;
/// Adapter around the existing JSON conversion utilities.
#[derive(Debug, Clone)]
pub struct SchemaAdapter {
logical_schema: ArrowSchemaRef,
}

impl SchemaAdapter {
/// Create a new adapter given the logical Arrow schema.
pub fn new(logical_schema: ArrowSchemaRef) -> Self {
Self { logical_schema }
}

// Convert the schema
let arrow_schema = stream.schema();
let mut new_fields = Vec::with_capacity(arrow_schema.fields().len());
for field in arrow_schema.fields() {
if is_arrow_json_field(field) {
new_fields.push(Arc::new(arrow_json_to_lance_json(field)));
} else {
new_fields.push(Arc::clone(field));
}
/// Determine if the logical schema includes Arrow JSON fields that require conversion.
pub fn requires_physical_conversion(&self) -> bool {
self.logical_schema
.fields()
.iter()
.any(|field| is_arrow_json_field(field))
}
let converted_schema = Arc::new(ArrowSchema::new_with_metadata(
new_fields,
arrow_schema.metadata().clone(),
));

// Convert the stream
let converted_stream = stream.map(move |batch_result| {
batch_result.and_then(|batch| {
convert_json_columns(&batch)
.map_err(|e| datafusion::error::DataFusionError::ArrowError(Box::new(e), None))
})
});

Box::pin(RecordBatchStreamAdapter::new(
converted_schema,
converted_stream,
))
}
/// Determine if the physical schema includes Lance JSON fields that must be converted back.
pub fn requires_logical_conversion(schema: &ArrowSchemaRef) -> bool {
schema.fields().iter().any(|field| is_json_field(field))
}

/// Wrap a stream to convert lance.json (JSONB) back to arrow.json (strings) for reading
///
// FIXME: this is bad, really bad, we need to find a way to remove this.
pub fn wrap_json_stream_for_reading(
stream: SendableRecordBatchStream,
) -> SendableRecordBatchStream {
use lance_arrow::json::ARROW_JSON_EXT_NAME;
use lance_arrow::ARROW_EXT_NAME_KEY;
/// Convert a logical stream into a physical stream.
pub fn to_physical_stream(
&self,
stream: SendableRecordBatchStream,
) -> SendableRecordBatchStream {
// Check if any fields need conversion
if !self.requires_physical_conversion() {
return stream;
}

// Check if any fields need conversion
let needs_conversion = stream.schema().fields().iter().any(|f| is_json_field(f));
let arrow_schema = stream.schema();
let mut new_fields = Vec::with_capacity(arrow_schema.fields().len());
for field in arrow_schema.fields() {
if is_arrow_json_field(field) {
new_fields.push(Arc::new(arrow_json_to_lance_json(field)));
} else {
new_fields.push(Arc::clone(field));
}
}
let converted_schema = Arc::new(ArrowSchema::new_with_metadata(
new_fields,
arrow_schema.metadata().clone(),
));

if !needs_conversion {
return stream;
let converted_stream = stream.map(move |batch_result| {
batch_result.and_then(|batch| {
convert_json_columns(&batch)
.map_err(|e| datafusion::error::DataFusionError::ArrowError(Box::new(e), None))
})
});

Box::pin(RecordBatchStreamAdapter::new(
converted_schema,
converted_stream,
))
}

// Convert the schema
let arrow_schema = stream.schema();
let mut new_fields = Vec::with_capacity(arrow_schema.fields().len());
for field in arrow_schema.fields() {
if is_json_field(field) {
// Convert lance.json (LargeBinary) to arrow.json (Utf8)
let mut new_field = arrow_schema::Field::new(
field.name(),
arrow_schema::DataType::Utf8,
field.is_nullable(),
);
let mut metadata = field.metadata().clone();
metadata.insert(
ARROW_EXT_NAME_KEY.to_string(),
ARROW_JSON_EXT_NAME.to_string(),
);
new_field.set_metadata(metadata);
new_fields.push(new_field);
} else {
new_fields.push(field.as_ref().clone());
/// Convert a physical stream into a logical stream.
pub fn to_logical_stream(
&self,
stream: SendableRecordBatchStream,
) -> SendableRecordBatchStream {
use lance_arrow::json::ARROW_JSON_EXT_NAME;
use lance_arrow::ARROW_EXT_NAME_KEY;

if !Self::requires_logical_conversion(&stream.schema()) {
return stream;
}
}
let converted_schema = Arc::new(ArrowSchema::new_with_metadata(
new_fields,
arrow_schema.metadata().clone(),
));

// Convert the stream
let converted_stream = stream.map(move |batch_result| {
batch_result.and_then(|batch| {
convert_lance_json_to_arrow(&batch).map_err(|e| {
datafusion::error::DataFusionError::ArrowError(
Box::new(arrow_schema::ArrowError::InvalidArgumentError(
e.to_string(),
)),
None,
)

let arrow_schema = stream.schema();
let mut new_fields = Vec::with_capacity(arrow_schema.fields().len());
for field in arrow_schema.fields() {
if is_json_field(field) {
let mut new_field = arrow_schema::Field::new(
field.name(),
arrow_schema::DataType::Utf8,
field.is_nullable(),
);
let mut metadata = field.metadata().clone();
metadata.insert(
ARROW_EXT_NAME_KEY.to_string(),
ARROW_JSON_EXT_NAME.to_string(),
);
new_field.set_metadata(metadata);
new_fields.push(new_field);
} else {
new_fields.push(field.as_ref().clone());
}
}
let converted_schema = Arc::new(ArrowSchema::new_with_metadata(
new_fields,
arrow_schema.metadata().clone(),
));

let converted_stream = stream.map(move |batch_result| {
batch_result.and_then(|batch| {
convert_lance_json_to_arrow(&batch).map_err(|e| {
datafusion::error::DataFusionError::ArrowError(
Box::new(arrow_schema::ArrowError::InvalidArgumentError(
e.to_string(),
)),
None,
)
})
})
})
});
});

Box::pin(RecordBatchStreamAdapter::new(
converted_schema,
converted_stream,
))
Box::pin(RecordBatchStreamAdapter::new(
converted_schema,
converted_stream,
))
}
}
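A minimal sketch of what the conversion predicates key off, assuming is_arrow_json_field matches a Utf8 field tagged with the arrow.json extension name (the constants come from lance_arrow, as imported above); the field and schema here are illustrative only:

    use std::{collections::HashMap, sync::Arc};
    use arrow_schema::{DataType, Field, Schema as ArrowSchema};
    use lance_arrow::{json::ARROW_JSON_EXT_NAME, ARROW_EXT_NAME_KEY};

    // A logical Utf8 column tagged as arrow.json via extension metadata.
    let metadata = HashMap::from([(
        ARROW_EXT_NAME_KEY.to_string(),
        ARROW_JSON_EXT_NAME.to_string(),
    )]);
    let json_field = Field::new("payload", DataType::Utf8, true).with_metadata(metadata);
    let logical_schema = Arc::new(ArrowSchema::new(vec![json_field]));

    // With such a field present, the adapter should report that the write
    // path needs to rewrite the stream into the physical (lance.json) form.
    let adapter = SchemaAdapter::new(logical_schema);
    assert!(adapter.requires_physical_conversion());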
25 changes: 7 additions & 18 deletions rust/lance/src/dataset/write.rs
@@ -39,11 +39,9 @@ use super::blob::BlobStreamExt;
use super::fragment::write::generate_random_filename;
use super::progress::{NoopFragmentWriteProgress, WriteFragmentProgress};
use super::transaction::Transaction;
use super::utils::wrap_json_stream_for_writing;
use super::utils::SchemaAdapter;
use super::DATA_DIR;

use lance_arrow::json::is_arrow_json_field;

mod commit;
pub mod delete;
mod insert;
@@ -378,10 +376,8 @@ pub async fn do_write_fragments(
storage_version: LanceFileVersion,
target_bases_info: Option<Vec<TargetBaseInfo>>,
) -> Result<Vec<Fragment>> {
// Convert arrow.json to lance.json (JSONB) for storage if needed
//
// FIXME: this is bad, really bad, we need to find a way to remove this.
let data = wrap_json_stream_for_writing(data);
let adapter = SchemaAdapter::new(data.schema());
let data = adapter.to_physical_stream(data);

let mut buffered_reader = if storage_version == LanceFileVersion::Legacy {
// In v1 we split the stream into row group sized batches
@@ -576,17 +572,10 @@ pub async fn write_fragments_internal(
mut params: WriteParams,
target_bases_info: Option<Vec<TargetBaseInfo>>,
) -> Result<WrittenFragments> {
// Convert Arrow JSON columns to Lance JSON (JSONB) format
//
// FIXME: this is bad, really bad, we need to find a way to remove this.
let needs_conversion = data
.schema()
.fields()
.iter()
.any(|f| is_arrow_json_field(f));

let (data, converted_schema) = if needs_conversion {
let data = wrap_json_stream_for_writing(data);
let adapter = SchemaAdapter::new(data.schema());

let (data, converted_schema) = if adapter.requires_physical_conversion() {
let data = adapter.to_physical_stream(data);
// Update the schema to match the converted data
let arrow_schema = data.schema();
let converted_schema = Schema::try_from(arrow_schema.as_ref())?;
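And a minimal sketch of the write-side pattern used in do_write_fragments above; to_physical_stream passes the stream through unchanged when no arrow.json fields are present, so it can be applied unconditionally (the helper name is hypothetical):

    use datafusion::physical_plan::SendableRecordBatchStream;

    // Hypothetical helper mirroring do_write_fragments above: arrow.json
    // (Utf8 string) columns become lance.json (JSONB) columns for storage.
    fn prepare_for_storage(data: SendableRecordBatchStream) -> SendableRecordBatchStream {
        let adapter = SchemaAdapter::new(data.schema());
        adapter.to_physical_stream(data)
    }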