diff --git a/java/lance-jni/src/file_reader.rs b/java/lance-jni/src/file_reader.rs index 6f9425dce82..dca3c858699 100644 --- a/java/lance-jni/src/file_reader.rs +++ b/java/lance-jni/src/file_reader.rs @@ -22,7 +22,7 @@ use lance::io::ObjectStore; use lance_core::cache::LanceCache; use lance_core::datatypes::Schema; use lance_encoding::decoder::{DecoderPlugins, FilterExpression}; -use lance_file::v2::reader::{FileReader, FileReaderOptions, ReaderProjection}; +use lance_file::reader::{FileReader, FileReaderOptions, ReaderProjection}; use lance_io::object_store::{ObjectStoreParams, ObjectStoreRegistry}; use lance_io::{ scheduler::{ScanScheduler, SchedulerConfig}, diff --git a/java/lance-jni/src/file_writer.rs b/java/lance-jni/src/file_writer.rs index dd76b88d8bd..ab2501124e9 100644 --- a/java/lance-jni/src/file_writer.rs +++ b/java/lance-jni/src/file_writer.rs @@ -22,8 +22,8 @@ use jni::{ }; use lance::io::ObjectStore; use lance_file::{ - v2::writer::{FileWriter, FileWriterOptions}, version::LanceFileVersion, + writer::{FileWriter, FileWriterOptions}, }; use lance_io::object_store::{ObjectStoreParams, ObjectStoreRegistry}; diff --git a/python/src/dataset.rs b/python/src/dataset.rs index e31071bb345..7f80766dfba 100644 --- a/python/src/dataset.rs +++ b/python/src/dataset.rs @@ -61,7 +61,7 @@ use lance_arrow::as_fixed_size_list_array; use lance_core::Error; use lance_datafusion::utils::reader_to_stream; use lance_encoding::decoder::DecoderConfig; -use lance_file::v2::reader::FileReaderOptions; +use lance_file::reader::FileReaderOptions; use lance_index::scalar::inverted::query::{ BooleanQuery, BoostQuery, FtsQuery, MatchQuery, MultiMatchQuery, Operator, PhraseQuery, }; diff --git a/python/src/file.rs b/python/src/file.rs index c8be45bcf2c..4dc596166a6 100644 --- a/python/src/file.rs +++ b/python/src/file.rs @@ -21,17 +21,12 @@ use futures::stream::StreamExt; use lance::io::{ObjectStore, RecordBatchStream}; use lance_core::cache::LanceCache; use 
lance_encoding::decoder::{DecoderPlugins, FilterExpression}; -use lance_file::v2::reader::ReaderProjection; -use lance_file::v2::LanceEncodingsIo; -use lance_file::{ - v2::{ - reader::{ - BufferDescriptor, CachedFileMetadata, FileReader, FileReaderOptions, FileStatistics, - }, - writer::{FileWriter, FileWriterOptions}, - }, - version::LanceFileVersion, +use lance_file::reader::{ + BufferDescriptor, CachedFileMetadata, FileReader, FileReaderOptions, FileStatistics, + ReaderProjection, }; +use lance_file::writer::{FileWriter, FileWriterOptions}; +use lance_file::{version::LanceFileVersion, LanceEncodingsIo}; use lance_io::object_store::ObjectStoreParams; use lance_io::{ scheduler::{ScanScheduler, SchedulerConfig}, @@ -91,7 +86,7 @@ impl LancePageMetadata { .collect(); Self { buffers, - encoding: lance_file::v2::reader::describe_encoding(inner), + encoding: lance_file::reader::describe_encoding(inner), } } } diff --git a/python/src/utils.rs b/python/src/utils.rs index 6a72e094408..948811b54a0 100644 --- a/python/src/utils.rs +++ b/python/src/utils.rs @@ -23,7 +23,7 @@ use arrow_schema::DataType; use lance::datatypes::Schema; use lance::Result; use lance_arrow::FixedSizeListArrayExt; -use lance_file::writer::FileWriter; +use lance_file::previous::writer::FileWriter as PreviousFileWriter; use lance_index::scalar::IndexWriter; use lance_index::vector::hnsw::{builder::HnswBuildParams, HNSW}; use lance_index::vector::kmeans::{ @@ -223,7 +223,7 @@ impl Hnsw { let mut writer = rt() .block_on( Some(py), - FileWriter::::try_new( + PreviousFileWriter::::try_new( &object_store, &path, Schema::try_from(HNSW::schema().as_ref()) diff --git a/rust/lance-file/benches/reader.rs b/rust/lance-file/benches/reader.rs index 11c3f31b505..889cce80b54 100644 --- a/rust/lance-file/benches/reader.rs +++ b/rust/lance-file/benches/reader.rs @@ -9,12 +9,10 @@ use futures::{FutureExt, StreamExt}; use lance_datagen::ArrayGeneratorExt; use lance_encoding::decoder::{DecoderPlugins, FilterExpression}; 
use lance_file::{ - v2::{ - reader::{FileReader, FileReaderOptions}, - testing::test_cache, - writer::{FileWriter, FileWriterOptions}, - }, + reader::{FileReader, FileReaderOptions}, + testing::test_cache, version::LanceFileVersion, + writer::{FileWriter, FileWriterOptions}, }; use lance_io::{ object_store::ObjectStore, diff --git a/rust/lance-file/src/format.rs b/rust/lance-file/src/format.rs index 5b8a7146654..d7bc9c4236e 100644 --- a/rust/lance-file/src/format.rs +++ b/rust/lance-file/src/format.rs @@ -27,8 +27,6 @@ pub mod pbfile { include!(concat!(env!("OUT_DIR"), "/lance.file.v2.rs")); } -pub mod metadata; - /// These version/magic values are written at the end of Lance files (e.g. versions/1.version) pub const MAJOR_VERSION: i16 = 0; pub const MINOR_VERSION: i16 = 2; diff --git a/rust/lance-file/src/v2/io.rs b/rust/lance-file/src/io.rs similarity index 100% rename from rust/lance-file/src/v2/io.rs rename to rust/lance-file/src/io.rs diff --git a/rust/lance-file/src/lib.rs b/rust/lance-file/src/lib.rs index f423d281bb2..b69f21f7cf0 100644 --- a/rust/lance-file/src/lib.rs +++ b/rust/lance-file/src/lib.rs @@ -3,11 +3,14 @@ pub mod datatypes; pub mod format; -pub mod page_table; +pub(crate) mod io; +pub mod previous; pub mod reader; -pub mod v2; +pub mod testing; pub mod writer; +pub use io::LanceEncodingsIo; + use format::MAGIC; pub use lance_encoding::version; diff --git a/rust/lance-file/src/format/metadata.rs b/rust/lance-file/src/previous/format/metadata.rs similarity index 99% rename from rust/lance-file/src/format/metadata.rs rename to rust/lance-file/src/previous/format/metadata.rs index 32108702392..025ed33d427 100644 --- a/rust/lance-file/src/format/metadata.rs +++ b/rust/lance-file/src/previous/format/metadata.rs @@ -11,6 +11,7 @@ use lance_core::datatypes::Schema; use lance_core::{Error, Result}; use lance_io::traits::ProtoStruct; use snafu::location; + /// Data File Metadata #[derive(Debug, Default, DeepSizeOf, PartialEq)] pub struct Metadata { diff 
--git a/rust/lance-file/src/previous/format/mod.rs b/rust/lance-file/src/previous/format/mod.rs new file mode 100644 index 00000000000..c83016dff5e --- /dev/null +++ b/rust/lance-file/src/previous/format/mod.rs @@ -0,0 +1,4 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +pub mod metadata; diff --git a/rust/lance-file/src/v2.rs b/rust/lance-file/src/previous/mod.rs similarity index 53% rename from rust/lance-file/src/v2.rs rename to rust/lance-file/src/previous/mod.rs index 72f93c21826..9031d2b4992 100644 --- a/rust/lance-file/src/v2.rs +++ b/rust/lance-file/src/previous/mod.rs @@ -1,9 +1,9 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors -pub(crate) mod io; +//! Legacy Lance file v1 implementation kept for backwards compatibility. + +pub mod format; +pub mod page_table; pub mod reader; -pub mod testing; pub mod writer; - -pub use io::LanceEncodingsIo; diff --git a/rust/lance-file/src/page_table.rs b/rust/lance-file/src/previous/page_table.rs similarity index 100% rename from rust/lance-file/src/page_table.rs rename to rust/lance-file/src/previous/page_table.rs diff --git a/rust/lance-file/src/previous/reader.rs b/rust/lance-file/src/previous/reader.rs new file mode 100644 index 00000000000..985906698b2 --- /dev/null +++ b/rust/lance-file/src/previous/reader.rs @@ -0,0 +1,1511 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +//! 
Lance Data File Reader + +// Standard +use std::ops::{Range, RangeTo}; +use std::sync::Arc; + +use arrow_arith::numeric::sub; +use arrow_array::{ + builder::PrimitiveBuilder, + cast::AsArray, + types::{Int32Type, Int64Type}, + ArrayRef, ArrowNativeTypeOp, ArrowNumericType, NullArray, OffsetSizeTrait, PrimitiveArray, + RecordBatch, StructArray, UInt32Array, +}; +use arrow_buffer::ArrowNativeType; +use arrow_schema::{DataType, FieldRef, Schema as ArrowSchema}; +use arrow_select::concat::{self, concat_batches}; +use async_recursion::async_recursion; +use deepsize::DeepSizeOf; +use futures::{stream, Future, FutureExt, StreamExt, TryStreamExt}; +use lance_arrow::*; +use lance_core::cache::{CacheKey, LanceCache}; +use lance_core::datatypes::{Field, Schema}; +use lance_core::{Error, Result}; +use lance_io::encodings::dictionary::DictionaryDecoder; +use lance_io::encodings::AsyncIndex; +use lance_io::stream::{RecordBatchStream, RecordBatchStreamAdapter}; +use lance_io::traits::Reader; +use lance_io::utils::{ + read_fixed_stride_array, read_metadata_offset, read_struct, read_struct_from_buf, +}; +use lance_io::{object_store::ObjectStore, ReadBatchParams}; +use std::borrow::Cow; + +use object_store::path::Path; +use snafu::location; +use tracing::instrument; + +use crate::previous::format::metadata::Metadata; +use crate::previous::page_table::{PageInfo, PageTable}; + +/// Lance File Reader. +/// +/// It reads arrow data from one data file. +#[derive(Clone, DeepSizeOf)] +pub struct FileReader { + pub object_reader: Arc, + metadata: Arc, + page_table: Arc, + schema: Schema, + + /// The id of the fragment which this file belong to. + /// For simple file access, this can just be zero. 
+ fragment_id: u64, + + /// Page table for statistics + stats_page_table: Arc>, +} + +impl std::fmt::Debug for FileReader { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("FileReader") + .field("fragment", &self.fragment_id) + .field("path", &self.object_reader.path()) + .finish() + } +} + +// Generic cache key for string-based keys +struct StringCacheKey<'a, T> { + key: &'a str, + _phantom: std::marker::PhantomData, +} + +impl<'a, T> StringCacheKey<'a, T> { + fn new(key: &'a str) -> Self { + Self { + key, + _phantom: std::marker::PhantomData, + } + } +} + +impl CacheKey for StringCacheKey<'_, T> { + type ValueType = T; + + fn key(&self) -> Cow<'_, str> { + self.key.into() + } +} + +impl FileReader { + /// Open file reader + /// + /// Open the file at the given path using the provided object store. + /// + /// The passed fragment ID determines the first 32-bits of the row IDs. + /// + /// If a manifest is passed in, it will be used to load the schema and dictionary. + /// This is typically done if the file is part of a dataset fragment. If no manifest + /// is passed in, then it is read from the file itself. + /// + /// The session passed in is used to cache metadata about the file. If no session + /// is passed in, there will be no caching. 
+ #[instrument(level = "debug", skip(object_store, schema, session))] + pub async fn try_new_with_fragment_id( + object_store: &ObjectStore, + path: &Path, + schema: Schema, + fragment_id: u32, + field_id_offset: i32, + max_field_id: i32, + session: Option<&LanceCache>, + ) -> Result { + let object_reader = object_store.open(path).await?; + + let metadata = Self::read_metadata(object_reader.as_ref(), session).await?; + + Self::try_new_from_reader( + path, + object_reader.into(), + Some(metadata), + schema, + fragment_id, + field_id_offset, + max_field_id, + session, + ) + .await + } + + #[allow(clippy::too_many_arguments)] + pub async fn try_new_from_reader( + path: &Path, + object_reader: Arc, + metadata: Option>, + schema: Schema, + fragment_id: u32, + field_id_offset: i32, + max_field_id: i32, + session: Option<&LanceCache>, + ) -> Result { + let metadata = match metadata { + Some(metadata) => metadata, + None => Self::read_metadata(object_reader.as_ref(), session).await?, + }; + + let page_table = async { + Self::load_from_cache(session, path.to_string(), |_| async { + PageTable::load( + object_reader.as_ref(), + metadata.page_table_position, + field_id_offset, + max_field_id, + metadata.num_batches() as i32, + ) + .await + }) + .await + }; + + let stats_page_table = Self::read_stats_page_table(object_reader.as_ref(), session); + + // Can concurrently load page tables + let (page_table, stats_page_table) = futures::try_join!(page_table, stats_page_table)?; + + Ok(Self { + object_reader, + metadata, + schema, + page_table, + fragment_id: fragment_id as u64, + stats_page_table, + }) + } + + pub async fn read_metadata( + object_reader: &dyn Reader, + cache: Option<&LanceCache>, + ) -> Result> { + Self::load_from_cache(cache, object_reader.path().to_string(), |_| async { + let file_size = object_reader.size().await?; + let begin = if file_size < object_reader.block_size() { + 0 + } else { + file_size - object_reader.block_size() + }; + let tail_bytes = 
object_reader.get_range(begin..file_size).await?; + let metadata_pos = read_metadata_offset(&tail_bytes)?; + + let metadata: Metadata = if metadata_pos < file_size - tail_bytes.len() { + // We have not read the metadata bytes yet. + read_struct(object_reader, metadata_pos).await? + } else { + let offset = tail_bytes.len() - (file_size - metadata_pos); + read_struct_from_buf(&tail_bytes.slice(offset..))? + }; + Ok(metadata) + }) + .await + } + + /// Get the statistics page table. This will read the metadata if it is not cached. + /// + /// The page table is cached. + async fn read_stats_page_table( + reader: &dyn Reader, + cache: Option<&LanceCache>, + ) -> Result>> { + // To prevent collisions, we cache this at a child path + Self::load_from_cache(cache, reader.path().child("stats").to_string(), |_| async { + let metadata = Self::read_metadata(reader, cache).await?; + + if let Some(stats_meta) = metadata.stats_metadata.as_ref() { + Ok(Some( + PageTable::load( + reader, + stats_meta.page_table_position, + /*min_field_id=*/ 0, + /*max_field_id=*/ *stats_meta.leaf_field_ids.iter().max().unwrap(), + /*num_batches=*/ 1, + ) + .await?, + )) + } else { + Ok(None) + } + }) + .await + } + + /// Load some metadata about the fragment from the cache, if there is one. + async fn load_from_cache( + cache: Option<&LanceCache>, + key: String, + loader: F, + ) -> Result> + where + F: Fn(&str) -> Fut, + Fut: Future> + Send, + { + if let Some(cache) = cache { + let cache_key = StringCacheKey::::new(key.as_str()); + cache + .get_or_insert_with_key(cache_key, || loader(key.as_str())) + .await + } else { + Ok(Arc::new(loader(key.as_str()).await?)) + } + } + + /// Open one Lance data file for read. 
+ pub async fn try_new(object_store: &ObjectStore, path: &Path, schema: Schema) -> Result { + // If just reading a lance data file we assume the schema is the schema of the data file + let max_field_id = schema.max_field_id().unwrap_or_default(); + Self::try_new_with_fragment_id(object_store, path, schema, 0, 0, max_field_id, None).await + } + + fn io_parallelism(&self) -> usize { + self.object_reader.io_parallelism() + } + + /// Requested projection of the data in this file, excluding the row id column. + pub fn schema(&self) -> &Schema { + &self.schema + } + + pub fn num_batches(&self) -> usize { + self.metadata.num_batches() + } + + /// Get the number of rows in this batch + pub fn num_rows_in_batch(&self, batch_id: i32) -> usize { + self.metadata.get_batch_length(batch_id).unwrap_or_default() as usize + } + + /// Count the number of rows in this file. + pub fn len(&self) -> usize { + self.metadata.len() + } + + pub fn is_empty(&self) -> bool { + self.metadata.is_empty() + } + + /// Read a batch of data from the file. + /// + /// The schema of the returned [RecordBatch] is set by [`FileReader::schema()`]. + #[instrument(level = "debug", skip(self, params, projection))] + pub async fn read_batch( + &self, + batch_id: i32, + params: impl Into, + projection: &Schema, + ) -> Result { + read_batch(self, ¶ms.into(), projection, batch_id).await + } + + /// Read a range of records into one batch. + /// + /// Note that it might call concat if the range is crossing multiple batches, which + /// makes it less efficient than [`FileReader::read_batch()`]. 
+ #[instrument(level = "debug", skip(self, projection))] + pub async fn read_range( + &self, + range: Range, + projection: &Schema, + ) -> Result { + if range.is_empty() { + return Ok(RecordBatch::new_empty(Arc::new(projection.into()))); + } + let range_in_batches = self.metadata.range_to_batches(range)?; + let batches = + stream::iter(range_in_batches) + .map(|(batch_id, range)| async move { + self.read_batch(batch_id, range, projection).await + }) + .buffered(self.io_parallelism()) + .try_collect::>() + .await?; + if batches.len() == 1 { + return Ok(batches[0].clone()); + } + let schema = batches[0].schema(); + Ok(tokio::task::spawn_blocking(move || concat_batches(&schema, &batches)).await??) + } + + /// Take by records by indices within the file. + /// + /// The indices must be sorted. + #[instrument(level = "debug", skip_all)] + pub async fn take(&self, indices: &[u32], projection: &Schema) -> Result { + let num_batches = self.num_batches(); + let num_rows = self.len() as u32; + let indices_in_batches = self.metadata.group_indices_to_batches(indices); + let batches = stream::iter(indices_in_batches) + .map(|batch| async move { + if batch.batch_id >= num_batches as i32 { + Err(Error::InvalidInput { + source: format!("batch_id: {} out of bounds", batch.batch_id).into(), + location: location!(), + }) + } else if *batch.offsets.last().expect("got empty batch") > num_rows { + Err(Error::InvalidInput { + source: format!("indices: {:?} out of bounds", batch.offsets).into(), + location: location!(), + }) + } else { + self.read_batch(batch.batch_id, batch.offsets.as_slice(), projection) + .await + } + }) + .buffered(self.io_parallelism()) + .try_collect::>() + .await?; + + let schema = Arc::new(ArrowSchema::from(projection)); + + Ok(tokio::task::spawn_blocking(move || concat_batches(&schema, &batches)).await??) + } + + /// Get the schema of the statistics page table, for the given data field ids. 
+ pub fn page_stats_schema(&self, field_ids: &[i32]) -> Option { + self.metadata.stats_metadata.as_ref().map(|meta| { + let mut stats_field_ids = vec![]; + for stats_field in &meta.schema.fields { + if let Ok(stats_field_id) = stats_field.name.parse::() { + if field_ids.contains(&stats_field_id) { + stats_field_ids.push(stats_field.id); + for child in &stats_field.children { + stats_field_ids.push(child.id); + } + } + } + } + meta.schema.project_by_ids(&stats_field_ids, true) + }) + } + + /// Get the page statistics for the given data field ids. + pub async fn read_page_stats(&self, field_ids: &[i32]) -> Result> { + if let Some(stats_page_table) = self.stats_page_table.as_ref() { + let projection = self.page_stats_schema(field_ids).unwrap(); + // It's possible none of the requested fields have stats. + if projection.fields.is_empty() { + return Ok(None); + } + let arrays = futures::stream::iter(projection.fields.iter().cloned()) + .map(|field| async move { + read_array( + self, + &field, + 0, + stats_page_table, + &ReadBatchParams::RangeFull, + ) + .await + }) + .buffered(self.io_parallelism()) + .try_collect::>() + .await?; + + let schema = ArrowSchema::from(&projection); + let batch = RecordBatch::try_new(Arc::new(schema), arrays)?; + Ok(Some(batch)) + } else { + Ok(None) + } + } +} + +/// Stream desired full batches from the file. +/// +/// Parameters: +/// - **reader**: An opened file reader. +/// - **projection**: The schema of the returning [RecordBatch]. +/// - **predicate**: A function that takes a batch ID and returns true if the batch should be +/// returned. +/// +/// Returns: +/// - A stream of [RecordBatch]s, each one corresponding to one full batch in the file. +pub fn batches_stream( + reader: FileReader, + projection: Schema, + predicate: impl FnMut(&i32) -> bool + Send + Sync + 'static, +) -> impl RecordBatchStream { + // Make projection an Arc so we can clone it and pass between threads. 
+ let projection = Arc::new(projection); + let arrow_schema = ArrowSchema::from(projection.as_ref()); + + let total_batches = reader.num_batches() as i32; + let batches = (0..total_batches).filter(predicate); + // Make another copy of self so we can clone it and pass between threads. + let this = Arc::new(reader); + let inner = stream::iter(batches) + .zip(stream::repeat_with(move || { + (this.clone(), projection.clone()) + })) + .map(move |(batch_id, (reader, projection))| async move { + reader + .read_batch(batch_id, ReadBatchParams::RangeFull, &projection) + .await + }) + .buffered(2) + .boxed(); + RecordBatchStreamAdapter::new(Arc::new(arrow_schema), inner) +} + +/// Read a batch. +/// +/// `schema` may only be empty if `with_row_id` is also true. This function +/// panics otherwise. +pub async fn read_batch( + reader: &FileReader, + params: &ReadBatchParams, + schema: &Schema, + batch_id: i32, +) -> Result { + if !schema.fields.is_empty() { + // We box this because otherwise we get a higher-order lifetime error. + let arrs = stream::iter(&schema.fields) + .map(|f| async { read_array(reader, f, batch_id, &reader.page_table, params).await }) + .buffered(reader.io_parallelism()) + .try_collect::>() + .boxed(); + let arrs = arrs.await?; + Ok(RecordBatch::try_new(Arc::new(schema.into()), arrs)?) 
+ } else { + Err(Error::invalid_input("no fields requested", location!())) + } +} + +#[async_recursion] +async fn read_array( + reader: &FileReader, + field: &Field, + batch_id: i32, + page_table: &PageTable, + params: &ReadBatchParams, +) -> Result { + let data_type = field.data_type(); + + use DataType::*; + + if data_type.is_fixed_stride() { + _read_fixed_stride_array(reader, field, batch_id, page_table, params).await + } else { + match data_type { + Null => read_null_array(field, batch_id, page_table, params), + Utf8 | LargeUtf8 | Binary | LargeBinary => { + read_binary_array(reader, field, batch_id, page_table, params).await + } + Struct(_) => read_struct_array(reader, field, batch_id, page_table, params).await, + Dictionary(_, _) => { + read_dictionary_array(reader, field, batch_id, page_table, params).await + } + List(_) => { + read_list_array::(reader, field, batch_id, page_table, params).await + } + LargeList(_) => { + read_list_array::(reader, field, batch_id, page_table, params).await + } + _ => { + unimplemented!("{}", format!("No support for {data_type} yet")); + } + } + } +} + +fn get_page_info<'a>( + page_table: &'a PageTable, + field: &'a Field, + batch_id: i32, +) -> Result<&'a PageInfo> { + page_table.get(field.id, batch_id).ok_or_else(|| { + Error::io( + format!( + "No page info found for field: {}, field_id={} batch={}", + field.name, field.id, batch_id + ), + location!(), + ) + }) +} + +/// Read primitive array for batch `batch_idx`. 
+async fn _read_fixed_stride_array( + reader: &FileReader, + field: &Field, + batch_id: i32, + page_table: &PageTable, + params: &ReadBatchParams, +) -> Result { + let page_info = get_page_info(page_table, field, batch_id)?; + read_fixed_stride_array( + reader.object_reader.as_ref(), + &field.data_type(), + page_info.position, + page_info.length, + params.clone(), + ) + .await +} + +fn read_null_array( + field: &Field, + batch_id: i32, + page_table: &PageTable, + params: &ReadBatchParams, +) -> Result { + let page_info = get_page_info(page_table, field, batch_id)?; + + let length_output = match params { + ReadBatchParams::Indices(indices) => { + if indices.is_empty() { + 0 + } else { + let idx_max = *indices.values().iter().max().unwrap() as u64; + if idx_max >= page_info.length as u64 { + return Err(Error::io( + format!( + "NullArray Reader: request([{}]) out of range: [0..{}]", + idx_max, page_info.length + ), + location!(), + )); + } + indices.len() + } + } + _ => { + let (idx_start, idx_end) = match params { + ReadBatchParams::Range(r) => (r.start, r.end), + ReadBatchParams::RangeFull => (0, page_info.length), + ReadBatchParams::RangeTo(r) => (0, r.end), + ReadBatchParams::RangeFrom(r) => (r.start, page_info.length), + _ => unreachable!(), + }; + if idx_end > page_info.length { + return Err(Error::io( + format!( + "NullArray Reader: request([{}..{}]) out of range: [0..{}]", + // and wrap it in here. 
+ idx_start, + idx_end, + page_info.length + ), + location!(), + )); + } + idx_end - idx_start + } + }; + + Ok(Arc::new(NullArray::new(length_output))) +} + +async fn read_binary_array( + reader: &FileReader, + field: &Field, + batch_id: i32, + page_table: &PageTable, + params: &ReadBatchParams, +) -> Result { + let page_info = get_page_info(page_table, field, batch_id)?; + + lance_io::utils::read_binary_array( + reader.object_reader.as_ref(), + &field.data_type(), + field.nullable, + page_info.position, + page_info.length, + params, + ) + .await +} + +async fn read_dictionary_array( + reader: &FileReader, + field: &Field, + batch_id: i32, + page_table: &PageTable, + params: &ReadBatchParams, +) -> Result { + let page_info = get_page_info(page_table, field, batch_id)?; + let data_type = field.data_type(); + let decoder = DictionaryDecoder::new( + reader.object_reader.as_ref(), + page_info.position, + page_info.length, + &data_type, + field + .dictionary + .as_ref() + .unwrap() + .values + .as_ref() + .unwrap() + .clone(), + ); + decoder.get(params.clone()).await +} + +async fn read_struct_array( + reader: &FileReader, + field: &Field, + batch_id: i32, + page_table: &PageTable, + params: &ReadBatchParams, +) -> Result { + // TODO: use tokio to make the reads in parallel. 
+ let mut sub_arrays: Vec<(FieldRef, ArrayRef)> = vec![]; + + for child in field.children.as_slice() { + let arr = read_array(reader, child, batch_id, page_table, params).await?; + sub_arrays.push((Arc::new(child.into()), arr)); + } + + Ok(Arc::new(StructArray::from(sub_arrays))) +} + +async fn take_list_array( + reader: &FileReader, + field: &Field, + batch_id: i32, + page_table: &PageTable, + positions: &PrimitiveArray, + indices: &UInt32Array, +) -> Result +where + T::Native: ArrowNativeTypeOp + OffsetSizeTrait, +{ + let first_idx = indices.value(0); + // Range of values for each index + let ranges = indices + .values() + .iter() + .map(|i| (*i - first_idx).as_usize()) + .map(|idx| positions.value(idx).as_usize()..positions.value(idx + 1).as_usize()) + .collect::>(); + let field = field.clone(); + let mut list_values: Vec = vec![]; + // TODO: read them in parallel. + for range in ranges.iter() { + list_values.push( + read_array( + reader, + &field.children[0], + batch_id, + page_table, + &(range.clone()).into(), + ) + .await?, + ); + } + + let value_refs = list_values + .iter() + .map(|arr| arr.as_ref()) + .collect::>(); + let mut offsets_builder = PrimitiveBuilder::::new(); + offsets_builder.append_value(T::Native::usize_as(0)); + let mut off = 0_usize; + for range in ranges { + off += range.len(); + offsets_builder.append_value(T::Native::usize_as(off)); + } + let all_values = concat::concat(value_refs.as_slice())?; + let offset_arr = offsets_builder.finish(); + let arr = try_new_generic_list_array(all_values, &offset_arr)?; + Ok(Arc::new(arr) as ArrayRef) +} + +async fn read_list_array( + reader: &FileReader, + field: &Field, + batch_id: i32, + page_table: &PageTable, + params: &ReadBatchParams, +) -> Result +where + T::Native: ArrowNativeTypeOp + OffsetSizeTrait, +{ + // Offset the position array by 1 in order to include the upper bound of the last element + let positions_params = match params { + ReadBatchParams::Range(range) => 
ReadBatchParams::from(range.start..(range.end + 1)), + ReadBatchParams::RangeTo(range) => ReadBatchParams::from(..range.end + 1), + ReadBatchParams::Indices(indices) => { + (indices.value(0).as_usize()..indices.value(indices.len() - 1).as_usize() + 2).into() + } + p => p.clone(), + }; + + let page_info = get_page_info(&reader.page_table, field, batch_id)?; + let position_arr = read_fixed_stride_array( + reader.object_reader.as_ref(), + &T::DATA_TYPE, + page_info.position, + page_info.length, + positions_params, + ) + .await?; + + let positions: &PrimitiveArray = position_arr.as_primitive(); + + // Recompute params so they align with the offset array + let value_params = match params { + ReadBatchParams::Range(range) => ReadBatchParams::from( + positions.value(0).as_usize()..positions.value(range.end - range.start).as_usize(), + ), + ReadBatchParams::Ranges(_) => { + return Err(Error::Internal { + message: "ReadBatchParams::Ranges should not be used in v1 files".to_string(), + location: location!(), + }) + } + ReadBatchParams::RangeTo(RangeTo { end }) => { + ReadBatchParams::from(..positions.value(*end).as_usize()) + } + ReadBatchParams::RangeFrom(_) => ReadBatchParams::from(positions.value(0).as_usize()..), + ReadBatchParams::RangeFull => ReadBatchParams::from( + positions.value(0).as_usize()..positions.value(positions.len() - 1).as_usize(), + ), + ReadBatchParams::Indices(indices) => { + return take_list_array(reader, field, batch_id, page_table, positions, indices).await; + } + }; + + let start_position = PrimitiveArray::::new_scalar(positions.value(0)); + let offset_arr = sub(positions, &start_position)?; + let offset_arr_ref = offset_arr.as_primitive::(); + let value_arrs = read_array( + reader, + &field.children[0], + batch_id, + page_table, + &value_params, + ) + .await?; + let arr = try_new_generic_list_array(value_arrs, offset_arr_ref)?; + Ok(Arc::new(arr) as ArrayRef) +} + +#[cfg(test)] +mod tests { + use crate::previous::writer::{FileWriter as 
PreviousFileWriter, NotSelfDescribing}; + + use super::*; + + use arrow_array::{ + builder::{Int32Builder, LargeListBuilder, ListBuilder, StringBuilder}, + cast::{as_string_array, as_struct_array}, + types::UInt8Type, + Array, DictionaryArray, Float32Array, Int64Array, LargeListArray, ListArray, StringArray, + UInt8Array, + }; + use arrow_array::{BooleanArray, Int32Array}; + use arrow_schema::{Field as ArrowField, Fields as ArrowFields, Schema as ArrowSchema}; + use lance_io::object_store::ObjectStoreParams; + + #[tokio::test] + async fn test_take() { + let arrow_schema = ArrowSchema::new(vec![ + ArrowField::new("i", DataType::Int64, true), + ArrowField::new("f", DataType::Float32, false), + ArrowField::new("s", DataType::Utf8, false), + ArrowField::new( + "d", + DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)), + false, + ), + ]); + let mut schema = Schema::try_from(&arrow_schema).unwrap(); + + let store = ObjectStore::memory(); + let path = Path::from("/take_test"); + + // Write 10 batches. 
+ let values = StringArray::from_iter_values(["a", "b", "c", "d", "e", "f", "g"]); + let values_ref = Arc::new(values); + let mut batches = vec![]; + for batch_id in 0..10 { + let value_range: Range = batch_id * 10..batch_id * 10 + 10; + let keys = UInt8Array::from_iter_values(value_range.clone().map(|v| (v % 7) as u8)); + let columns: Vec = vec![ + Arc::new(Int64Array::from_iter( + value_range.clone().collect::>(), + )), + Arc::new(Float32Array::from_iter( + value_range.clone().map(|n| n as f32).collect::>(), + )), + Arc::new(StringArray::from_iter_values( + value_range.clone().map(|n| format!("str-{}", n)), + )), + Arc::new(DictionaryArray::::try_new(keys, values_ref.clone()).unwrap()), + ]; + batches.push(RecordBatch::try_new(Arc::new(arrow_schema.clone()), columns).unwrap()); + } + schema.set_dictionary(&batches[0]).unwrap(); + + let mut file_writer = PreviousFileWriter::::try_new( + &store, + &path, + schema.clone(), + &Default::default(), + ) + .await + .unwrap(); + for batch in batches.iter() { + file_writer + .write(std::slice::from_ref(batch)) + .await + .unwrap(); + } + file_writer.finish().await.unwrap(); + + let reader = FileReader::try_new(&store, &path, schema).await.unwrap(); + let batch = reader + .take(&[1, 15, 20, 25, 30, 48, 90], reader.schema()) + .await + .unwrap(); + let dict_keys = UInt8Array::from_iter_values([1, 1, 6, 4, 2, 6, 6]); + assert_eq!( + batch, + RecordBatch::try_new( + batch.schema(), + vec![ + Arc::new(Int64Array::from_iter_values([1, 15, 20, 25, 30, 48, 90])), + Arc::new(Float32Array::from_iter_values([ + 1.0, 15.0, 20.0, 25.0, 30.0, 48.0, 90.0 + ])), + Arc::new(StringArray::from_iter_values([ + "str-1", "str-15", "str-20", "str-25", "str-30", "str-48", "str-90" + ])), + Arc::new(DictionaryArray::try_new(dict_keys, values_ref.clone()).unwrap()), + ] + ) + .unwrap() + ); + } + + async fn test_write_null_string_in_struct(field_nullable: bool) { + let arrow_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( + "parent", + 
DataType::Struct(ArrowFields::from(vec![ArrowField::new( + "str", + DataType::Utf8, + field_nullable, + )])), + true, + )])); + + let schema = Schema::try_from(arrow_schema.as_ref()).unwrap(); + + let store = ObjectStore::memory(); + let path = Path::from("/null_strings"); + + let string_arr = Arc::new(StringArray::from_iter([Some("a"), Some(""), Some("b")])); + let struct_arr = Arc::new(StructArray::from(vec![( + Arc::new(ArrowField::new("str", DataType::Utf8, field_nullable)), + string_arr.clone() as ArrayRef, + )])); + let batch = RecordBatch::try_new(arrow_schema.clone(), vec![struct_arr]).unwrap(); + + let mut file_writer = PreviousFileWriter::::try_new( + &store, + &path, + schema.clone(), + &Default::default(), + ) + .await + .unwrap(); + file_writer + .write(std::slice::from_ref(&batch)) + .await + .unwrap(); + file_writer.finish().await.unwrap(); + + let reader = FileReader::try_new(&store, &path, schema).await.unwrap(); + let actual_batch = reader.read_batch(0, .., reader.schema()).await.unwrap(); + + if field_nullable { + assert_eq!( + &StringArray::from_iter(vec![Some("a"), None, Some("b")]), + as_string_array( + as_struct_array(actual_batch.column_by_name("parent").unwrap().as_ref()) + .column_by_name("str") + .unwrap() + .as_ref() + ) + ); + } else { + assert_eq!(actual_batch, batch); + } + } + + #[tokio::test] + async fn read_nullable_string_in_struct() { + test_write_null_string_in_struct(true).await; + test_write_null_string_in_struct(false).await; + } + + #[tokio::test] + async fn test_read_struct_of_list_arrays() { + let store = ObjectStore::memory(); + let path = Path::from("/null_strings"); + + let arrow_schema = make_schema_of_list_array(); + let schema: Schema = Schema::try_from(arrow_schema.as_ref()).unwrap(); + + let batches = (0..3) + .map(|_| { + let struct_array = make_struct_of_list_array(10, 10); + RecordBatch::try_new(arrow_schema.clone(), vec![struct_array]).unwrap() + }) + .collect::>(); + let batches_ref = 
batches.iter().collect::>(); + + let mut file_writer = PreviousFileWriter::::try_new( + &store, + &path, + schema.clone(), + &Default::default(), + ) + .await + .unwrap(); + file_writer.write(&batches).await.unwrap(); + file_writer.finish().await.unwrap(); + + let reader = FileReader::try_new(&store, &path, schema).await.unwrap(); + let actual_batch = reader.read_batch(0, .., reader.schema()).await.unwrap(); + let expected = concat_batches(&arrow_schema, batches_ref).unwrap(); + assert_eq!(expected, actual_batch); + } + + #[tokio::test] + async fn test_scan_struct_of_list_arrays() { + let store = ObjectStore::memory(); + let path = Path::from("/null_strings"); + + let arrow_schema = make_schema_of_list_array(); + let struct_array = make_struct_of_list_array(3, 10); + let schema: Schema = Schema::try_from(arrow_schema.as_ref()).unwrap(); + let batch = RecordBatch::try_new(arrow_schema.clone(), vec![struct_array.clone()]).unwrap(); + + let mut file_writer = PreviousFileWriter::::try_new( + &store, + &path, + schema.clone(), + &Default::default(), + ) + .await + .unwrap(); + file_writer.write(&[batch]).await.unwrap(); + file_writer.finish().await.unwrap(); + + let mut expected_columns: Vec = Vec::new(); + for c in struct_array.columns().iter() { + expected_columns.push(c.slice(1, 1)); + } + + let expected_struct = match arrow_schema.fields[0].data_type() { + DataType::Struct(subfields) => subfields + .iter() + .zip(expected_columns) + .map(|(f, d)| (f.clone(), d)) + .collect::>(), + _ => panic!("unexpected field"), + }; + + let expected_struct_array = StructArray::from(expected_struct); + let expected_batch = RecordBatch::from(&StructArray::from(vec![( + Arc::new(arrow_schema.fields[0].as_ref().clone()), + Arc::new(expected_struct_array) as ArrayRef, + )])); + + let reader = FileReader::try_new(&store, &path, schema).await.unwrap(); + let params = ReadBatchParams::Range(1..2); + let slice_of_batch = reader.read_batch(0, params, reader.schema()).await.unwrap(); + 
assert_eq!(expected_batch, slice_of_batch); + } + + fn make_schema_of_list_array() -> Arc { + Arc::new(ArrowSchema::new(vec![ArrowField::new( + "s", + DataType::Struct(ArrowFields::from(vec![ + ArrowField::new( + "li", + DataType::List(Arc::new(ArrowField::new("item", DataType::Int32, true))), + true, + ), + ArrowField::new( + "ls", + DataType::List(Arc::new(ArrowField::new("item", DataType::Utf8, true))), + true, + ), + ArrowField::new( + "ll", + DataType::LargeList(Arc::new(ArrowField::new("item", DataType::Int32, true))), + false, + ), + ])), + true, + )])) + } + + fn make_struct_of_list_array(rows: i32, num_items: i32) -> Arc { + let mut li_builder = ListBuilder::new(Int32Builder::new()); + let mut ls_builder = ListBuilder::new(StringBuilder::new()); + let ll_value_builder = Int32Builder::new(); + let mut large_list_builder = LargeListBuilder::new(ll_value_builder); + for i in 0..rows { + for j in 0..num_items { + li_builder.values().append_value(i * 10 + j); + ls_builder + .values() + .append_value(format!("str-{}", i * 10 + j)); + large_list_builder.values().append_value(i * 10 + j); + } + li_builder.append(true); + ls_builder.append(true); + large_list_builder.append(true); + } + Arc::new(StructArray::from(vec![ + ( + Arc::new(ArrowField::new( + "li", + DataType::List(Arc::new(ArrowField::new("item", DataType::Int32, true))), + true, + )), + Arc::new(li_builder.finish()) as ArrayRef, + ), + ( + Arc::new(ArrowField::new( + "ls", + DataType::List(Arc::new(ArrowField::new("item", DataType::Utf8, true))), + true, + )), + Arc::new(ls_builder.finish()) as ArrayRef, + ), + ( + Arc::new(ArrowField::new( + "ll", + DataType::LargeList(Arc::new(ArrowField::new("item", DataType::Int32, true))), + false, + )), + Arc::new(large_list_builder.finish()) as ArrayRef, + ), + ])) + } + + #[tokio::test] + async fn test_read_nullable_arrays() { + use arrow_array::Array; + + // create a record batch with a null array column + let arrow_schema = ArrowSchema::new(vec![ + 
ArrowField::new("i", DataType::Int64, false), + ArrowField::new("n", DataType::Null, true), + ]); + let schema = Schema::try_from(&arrow_schema).unwrap(); + let columns: Vec = vec![ + Arc::new(Int64Array::from_iter_values(0..100)), + Arc::new(NullArray::new(100)), + ]; + let batch = RecordBatch::try_new(Arc::new(arrow_schema), columns).unwrap(); + + // write to a lance file + let store = ObjectStore::memory(); + let path = Path::from("/takes"); + let mut file_writer = PreviousFileWriter::::try_new( + &store, + &path, + schema.clone(), + &Default::default(), + ) + .await + .unwrap(); + file_writer.write(&[batch]).await.unwrap(); + file_writer.finish().await.unwrap(); + + // read the file back + let reader = FileReader::try_new(&store, &path, schema.clone()) + .await + .unwrap(); + + async fn read_array_w_params( + reader: &FileReader, + field: &Field, + params: ReadBatchParams, + ) -> ArrayRef { + read_array(reader, field, 0, reader.page_table.as_ref(), ¶ms) + .await + .expect("Error reading back the null array from file") as _ + } + + let arr = read_array_w_params(&reader, &schema.fields[1], ReadBatchParams::RangeFull).await; + assert_eq!(100, arr.len()); + assert_eq!(arr.data_type(), &DataType::Null); + + let arr = + read_array_w_params(&reader, &schema.fields[1], ReadBatchParams::Range(10..25)).await; + assert_eq!(15, arr.len()); + assert_eq!(arr.data_type(), &DataType::Null); + + let arr = + read_array_w_params(&reader, &schema.fields[1], ReadBatchParams::RangeFrom(60..)).await; + assert_eq!(40, arr.len()); + assert_eq!(arr.data_type(), &DataType::Null); + + let arr = + read_array_w_params(&reader, &schema.fields[1], ReadBatchParams::RangeTo(..25)).await; + assert_eq!(25, arr.len()); + assert_eq!(arr.data_type(), &DataType::Null); + + let arr = read_array_w_params( + &reader, + &schema.fields[1], + ReadBatchParams::Indices(UInt32Array::from(vec![1, 9, 30, 72])), + ) + .await; + assert_eq!(4, arr.len()); + assert_eq!(arr.data_type(), &DataType::Null); + + // 
raise error if take indices are out of bounds + let params = ReadBatchParams::Indices(UInt32Array::from(vec![1, 9, 30, 72, 100])); + let arr = read_array( + &reader, + &schema.fields[1], + 0, + reader.page_table.as_ref(), + ¶ms, + ); + assert!(arr.await.is_err()); + + // raise error if range indices are out of bounds + let params = ReadBatchParams::RangeTo(..107); + let arr = read_array( + &reader, + &schema.fields[1], + 0, + reader.page_table.as_ref(), + ¶ms, + ); + assert!(arr.await.is_err()); + } + + #[tokio::test] + async fn test_take_lists() { + let arrow_schema = ArrowSchema::new(vec![ + ArrowField::new( + "l", + DataType::List(Arc::new(ArrowField::new("item", DataType::Int32, true))), + false, + ), + ArrowField::new( + "ll", + DataType::LargeList(Arc::new(ArrowField::new("item", DataType::Int32, true))), + false, + ), + ]); + + let value_builder = Int32Builder::new(); + let mut list_builder = ListBuilder::new(value_builder); + let ll_value_builder = Int32Builder::new(); + let mut large_list_builder = LargeListBuilder::new(ll_value_builder); + for i in 0..100 { + list_builder.values().append_value(i); + large_list_builder.values().append_value(i); + if (i + 1) % 10 == 0 { + list_builder.append(true); + large_list_builder.append(true); + } + } + let list_arr = Arc::new(list_builder.finish()); + let large_list_arr = Arc::new(large_list_builder.finish()); + + let batch = RecordBatch::try_new( + Arc::new(arrow_schema.clone()), + vec![list_arr as ArrayRef, large_list_arr as ArrayRef], + ) + .unwrap(); + + // write to a lance file + let store = ObjectStore::memory(); + let path = Path::from("/take_list"); + let schema: Schema = (&arrow_schema).try_into().unwrap(); + let mut file_writer = PreviousFileWriter::::try_new( + &store, + &path, + schema.clone(), + &Default::default(), + ) + .await + .unwrap(); + file_writer.write(&[batch]).await.unwrap(); + file_writer.finish().await.unwrap(); + + // read the file back + let reader = FileReader::try_new(&store, &path, 
schema.clone()) + .await + .unwrap(); + let actual = reader.take(&[1, 3, 5, 9], &schema).await.unwrap(); + + let value_builder = Int32Builder::new(); + let mut list_builder = ListBuilder::new(value_builder); + let ll_value_builder = Int32Builder::new(); + let mut large_list_builder = LargeListBuilder::new(ll_value_builder); + for i in [1, 3, 5, 9] { + for j in 0..10 { + list_builder.values().append_value(i * 10 + j); + large_list_builder.values().append_value(i * 10 + j); + } + list_builder.append(true); + large_list_builder.append(true); + } + let expected_list = list_builder.finish(); + let expected_large_list = large_list_builder.finish(); + + assert_eq!(actual.column_by_name("l").unwrap().as_ref(), &expected_list); + assert_eq!( + actual.column_by_name("ll").unwrap().as_ref(), + &expected_large_list + ); + } + + #[tokio::test] + async fn test_list_array_with_offsets() { + let arrow_schema = ArrowSchema::new(vec![ + ArrowField::new( + "l", + DataType::List(Arc::new(ArrowField::new("item", DataType::Int32, true))), + false, + ), + ArrowField::new( + "ll", + DataType::LargeList(Arc::new(ArrowField::new("item", DataType::Int32, true))), + false, + ), + ]); + + let store = ObjectStore::memory(); + let path = Path::from("/lists"); + + let list_array = ListArray::from_iter_primitive::(vec![ + Some(vec![Some(1), Some(2)]), + Some(vec![Some(3), Some(4)]), + Some((0..2_000).map(Some).collect::>()), + ]) + .slice(1, 1); + let large_list_array = LargeListArray::from_iter_primitive::(vec![ + Some(vec![Some(10), Some(11)]), + Some(vec![Some(12), Some(13)]), + Some((0..2_000).map(Some).collect::>()), + ]) + .slice(1, 1); + + let batch = RecordBatch::try_new( + Arc::new(arrow_schema.clone()), + vec![Arc::new(list_array), Arc::new(large_list_array)], + ) + .unwrap(); + + let schema: Schema = (&arrow_schema).try_into().unwrap(); + let mut file_writer = PreviousFileWriter::::try_new( + &store, + &path, + schema.clone(), + &Default::default(), + ) + .await + .unwrap(); + 
file_writer + .write(std::slice::from_ref(&batch)) + .await + .unwrap(); + file_writer.finish().await.unwrap(); + + // Make sure the big array was not written to the file + let file_size_bytes = store.size(&path).await.unwrap(); + assert!(file_size_bytes < 1_000); + + let reader = FileReader::try_new(&store, &path, schema).await.unwrap(); + let actual_batch = reader.read_batch(0, .., reader.schema()).await.unwrap(); + assert_eq!(batch, actual_batch); + } + + #[tokio::test] + async fn test_read_ranges() { + // create a record batch with a null array column + let arrow_schema = ArrowSchema::new(vec![ArrowField::new("i", DataType::Int64, false)]); + let schema = Schema::try_from(&arrow_schema).unwrap(); + let columns: Vec = vec![Arc::new(Int64Array::from_iter_values(0..100))]; + let batch = RecordBatch::try_new(Arc::new(arrow_schema), columns).unwrap(); + + // write to a lance file + let store = ObjectStore::memory(); + let path = Path::from("/read_range"); + let mut file_writer = PreviousFileWriter::::try_new( + &store, + &path, + schema.clone(), + &Default::default(), + ) + .await + .unwrap(); + file_writer.write(&[batch]).await.unwrap(); + file_writer.finish().await.unwrap(); + + let reader = FileReader::try_new(&store, &path, schema).await.unwrap(); + let actual_batch = reader.read_range(7..25, reader.schema()).await.unwrap(); + + assert_eq!( + actual_batch.column_by_name("i").unwrap().as_ref(), + &Int64Array::from_iter_values(7..25) + ); + } + + #[tokio::test] + async fn test_batches_stream() { + let store = ObjectStore::memory(); + let path = Path::from("/batch_stream"); + + let arrow_schema = ArrowSchema::new(vec![ArrowField::new("i", DataType::Int32, true)]); + let schema = Schema::try_from(&arrow_schema).unwrap(); + let mut writer = PreviousFileWriter::::try_new( + &store, + &path, + schema.clone(), + &Default::default(), + ) + .await + .unwrap(); + for i in 0..10 { + let batch = RecordBatch::try_new( + Arc::new(arrow_schema.clone()), + 
vec![Arc::new(Int32Array::from_iter_values(i * 10..(i + 1) * 10))], + ) + .unwrap(); + writer.write(&[batch]).await.unwrap(); + } + writer.finish().await.unwrap(); + + let reader = FileReader::try_new(&store, &path, schema.clone()) + .await + .unwrap(); + let stream = batches_stream(reader, schema, |id| id % 2 == 0); + let batches = stream.try_collect::>().await.unwrap(); + + assert_eq!(batches.len(), 5); + for (i, batch) in batches.iter().enumerate() { + assert_eq!( + batch, + &RecordBatch::try_new( + Arc::new(arrow_schema.clone()), + vec![Arc::new(Int32Array::from_iter_values( + i as i32 * 2 * 10..(i as i32 * 2 + 1) * 10 + ))], + ) + .unwrap() + ) + } + } + + #[tokio::test] + async fn test_take_boolean_beyond_chunk() { + let store = ObjectStore::from_uri_and_params( + Arc::new(Default::default()), + "memory://", + &ObjectStoreParams { + block_size: Some(256), + ..Default::default() + }, + ) + .await + .unwrap() + .0; + let path = Path::from("/take_bools"); + + let arrow_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( + "b", + DataType::Boolean, + false, + )])); + let schema = Schema::try_from(arrow_schema.as_ref()).unwrap(); + let mut file_writer = PreviousFileWriter::::try_new( + &store, + &path, + schema.clone(), + &Default::default(), + ) + .await + .unwrap(); + + let array = BooleanArray::from((0..5000).map(|v| v % 5 == 0).collect::>()); + let batch = + RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(array.clone())]).unwrap(); + file_writer.write(&[batch]).await.unwrap(); + file_writer.finish().await.unwrap(); + + let reader = FileReader::try_new(&store, &path, schema.clone()) + .await + .unwrap(); + let actual = reader.take(&[2, 4, 5, 8, 4555], &schema).await.unwrap(); + + assert_eq!( + actual.column_by_name("b").unwrap().as_ref(), + &BooleanArray::from(vec![false, false, true, false, true]) + ); + } + + #[tokio::test] + async fn test_read_projection() { + // The dataset schema may be very large. 
The file reader should support reading + // a small projection of that schema (this just tests the field_offset / num_fields + // parameters) + let store = ObjectStore::memory(); + let path = Path::from("/partial_read"); + + // Create a large schema + let mut fields = vec![]; + for i in 0..100 { + fields.push(ArrowField::new(format!("f{}", i), DataType::Int32, false)); + } + let arrow_schema = ArrowSchema::new(fields); + let schema = Schema::try_from(&arrow_schema).unwrap(); + + let partial_schema = schema.project(&["f50"]).unwrap(); + let partial_arrow: ArrowSchema = (&partial_schema).into(); + + let mut file_writer = PreviousFileWriter::::try_new( + &store, + &path, + partial_schema.clone(), + &Default::default(), + ) + .await + .unwrap(); + + let array = Int32Array::from(vec![0; 15]); + let batch = + RecordBatch::try_new(Arc::new(partial_arrow), vec![Arc::new(array.clone())]).unwrap(); + file_writer + .write(std::slice::from_ref(&batch)) + .await + .unwrap(); + file_writer.finish().await.unwrap(); + + let field_id = partial_schema.fields.first().unwrap().id; + let reader = FileReader::try_new_with_fragment_id( + &store, + &path, + schema.clone(), + 0, + /*min_field_id=*/ field_id, + /*max_field_id=*/ field_id, + None, + ) + .await + .unwrap(); + let actual = reader + .read_batch(0, ReadBatchParams::RangeFull, &partial_schema) + .await + .unwrap(); + + assert_eq!(actual, batch); + } +} diff --git a/rust/lance-file/src/previous/writer/mod.rs b/rust/lance-file/src/previous/writer/mod.rs new file mode 100644 index 00000000000..3bef0a73455 --- /dev/null +++ b/rust/lance-file/src/previous/writer/mod.rs @@ -0,0 +1,1331 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +mod statistics; + +use std::collections::HashMap; +use std::marker::PhantomData; + +use arrow_array::builder::{ArrayBuilder, PrimitiveBuilder}; +use arrow_array::cast::{as_large_list_array, as_list_array, as_struct_array}; +use 
arrow_array::types::{Int32Type, Int64Type}; +use arrow_array::{Array, ArrayRef, RecordBatch, StructArray}; +use arrow_buffer::ArrowNativeType; +use arrow_data::ArrayData; +use arrow_schema::DataType; +use async_recursion::async_recursion; +use async_trait::async_trait; +use lance_arrow::*; +use lance_core::datatypes::{Encoding, Field, NullabilityComparison, Schema, SchemaCompareOptions}; +use lance_core::{Error, Result}; +use lance_io::encodings::{ + binary::BinaryEncoder, dictionary::DictionaryEncoder, plain::PlainEncoder, Encoder, +}; +use lance_io::object_store::ObjectStore; +use lance_io::object_writer::ObjectWriter; +use lance_io::traits::{WriteExt, Writer}; +use object_store::path::Path; +use snafu::location; +use tokio::io::AsyncWriteExt; + +use crate::format::{MAGIC, MAJOR_VERSION, MINOR_VERSION}; +use crate::previous::format::metadata::{Metadata, StatisticsMetadata}; +use crate::previous::page_table::{PageInfo, PageTable}; + +/// The file format currently includes a "manifest" where it stores the schema for +/// self-describing files. Historically this has been a table format manifest that +/// is empty except for the schema field. +/// +/// Since this crate is not aware of the table format we need this to be provided +/// externally. You should always use lance_table::io::manifest::ManifestDescribing +/// for this today. 
+#[async_trait] +pub trait ManifestProvider { + /// Store the schema in the file + /// + /// This should just require writing the schema (or a manifest wrapper) as a proto struct + /// + /// Note: the dictionaries have already been written by this point and the schema should + /// be populated with the dictionary lengths/offsets + async fn store_schema( + object_writer: &mut ObjectWriter, + schema: &Schema, + ) -> Result>; +} + +/// Implementation of ManifestProvider that does not store the schema +#[cfg(test)] +pub(crate) struct NotSelfDescribing {} + +#[cfg(test)] +#[async_trait] +impl ManifestProvider for NotSelfDescribing { + async fn store_schema(_: &mut ObjectWriter, _: &Schema) -> Result> { + Ok(None) + } +} + +/// [FileWriter] writes Arrow [RecordBatch] to one Lance file. +/// +/// ```ignored +/// use lance::io::FileWriter; +/// use futures::stream::Stream; +/// +/// let mut file_writer = FileWriter::new(object_store, &path, &schema); +/// while let Ok(batch) = stream.next().await { +/// file_writer.write(&batch).unwrap(); +/// } +/// // Need to close file writer to flush buffer and footer. +/// file_writer.finish().await; +/// ``` +pub struct FileWriter { + pub object_writer: ObjectWriter, + schema: Schema, + batch_id: i32, + page_table: PageTable, + metadata: Metadata, + stats_collector: Option, + manifest_provider: PhantomData, +} + +#[derive(Debug, Clone, Default)] +pub struct FileWriterOptions { + /// The field ids to collect statistics for. + /// + /// If None, will collect for all fields in the schema (that support stats). + /// If an empty vector, will not collect any statistics.
+ pub collect_stats_for_fields: Option>, +} + +impl FileWriter { + pub async fn try_new( + object_store: &ObjectStore, + path: &Path, + schema: Schema, + options: &FileWriterOptions, + ) -> Result { + let object_writer = object_store.create(path).await?; + Self::with_object_writer(object_writer, schema, options) + } + + pub fn with_object_writer( + object_writer: ObjectWriter, + schema: Schema, + options: &FileWriterOptions, + ) -> Result { + let collect_stats_for_fields = if let Some(stats_fields) = &options.collect_stats_for_fields + { + stats_fields.clone() + } else { + schema.field_ids() + }; + + let stats_collector = if !collect_stats_for_fields.is_empty() { + let stats_schema = schema.project_by_ids(&collect_stats_for_fields, true); + statistics::StatisticsCollector::try_new(&stats_schema) + } else { + None + }; + + Ok(Self { + object_writer, + schema, + batch_id: 0, + page_table: PageTable::default(), + metadata: Metadata::default(), + stats_collector, + manifest_provider: PhantomData, + }) + } + + /// Return the schema of the file writer. + pub fn schema(&self) -> &Schema { + &self.schema + } + + fn verify_field_nullability(arr: &ArrayData, field: &Field) -> Result<()> { + if !field.nullable && arr.null_count() > 0 { + return Err(Error::invalid_input(format!("The field `{}` contained null values even though the field is marked non-null in the schema", field.name), location!())); + } + + for (child_field, child_arr) in field.children.iter().zip(arr.child_data()) { + Self::verify_field_nullability(child_arr, child_field)?; + } + + Ok(()) + } + + fn verify_nullability_constraints(&self, batch: &RecordBatch) -> Result<()> { + for (col, field) in batch.columns().iter().zip(self.schema.fields.iter()) { + Self::verify_field_nullability(&col.to_data(), field)?; + } + Ok(()) + } + + /// Write a [RecordBatch] to the open file. + /// All RecordBatch will be treated as one RecordBatch on disk + /// + /// Returns [Err] if the schema does not match with the batch. 
+ pub async fn write(&mut self, batches: &[RecordBatch]) -> Result<()> { + if batches.is_empty() { + return Ok(()); + } + + for batch in batches { + // Compare, ignore metadata and dictionary + // dictionary should have been checked earlier and could be an expensive check + let schema = Schema::try_from(batch.schema().as_ref())?; + schema.check_compatible( + &self.schema, + &SchemaCompareOptions { + compare_nullability: NullabilityComparison::Ignore, + ..Default::default() + }, + )?; + self.verify_nullability_constraints(batch)?; + } + + // If we are collecting stats for this column, collect them. + // Statistics need to traverse nested arrays, so it's a separate loop + // from writing which is done on top-level arrays. + if let Some(stats_collector) = &mut self.stats_collector { + for (field, arrays) in fields_in_batches(batches, &self.schema) { + if let Some(stats_builder) = stats_collector.get_builder(field.id) { + let stats_row = statistics::collect_statistics(&arrays); + stats_builder.append(stats_row); + } + } + } + + // Copy a list of fields to avoid borrow checker error. + let fields = self.schema.fields.clone(); + for field in fields.iter() { + let arrs = batches + .iter() + .map(|batch| { + batch.column_by_name(&field.name).ok_or_else(|| { + Error::io( + format!("FileWriter::write: Field '{}' not found", field.name), + location!(), + ) + }) + }) + .collect::>>()?; + + Self::write_array( + &mut self.object_writer, + field, + &arrs, + self.batch_id, + &mut self.page_table, + ) + .await?; + } + let batch_length = batches.iter().map(|b| b.num_rows() as i32).sum(); + self.metadata.push_batch_length(batch_length); + + // It's imperative we complete any in-flight requests, since we are + // returning control to the caller. If the caller takes a long time to + // write the next batch, the in-flight requests will not be polled and + // may time out. 
+ self.object_writer.flush().await?; + + self.batch_id += 1; + Ok(()) + } + + /// Add schema metadata, as (key, value) pair to the file. + pub fn add_metadata(&mut self, key: &str, value: &str) { + self.schema + .metadata + .insert(key.to_string(), value.to_string()); + } + + pub async fn finish_with_metadata( + &mut self, + metadata: &HashMap, + ) -> Result { + self.schema + .metadata + .extend(metadata.iter().map(|(k, y)| (k.clone(), y.clone()))); + self.finish().await + } + + pub async fn finish(&mut self) -> Result { + self.write_footer().await?; + self.object_writer.shutdown().await?; + let num_rows = self + .metadata + .batch_offsets + .last() + .cloned() + .unwrap_or_default(); + Ok(num_rows as usize) + } + + /// Total records written in this file. + pub fn len(&self) -> usize { + self.metadata.len() + } + + /// Total bytes written so far + pub async fn tell(&mut self) -> Result { + self.object_writer.tell().await + } + + /// Return the id of the next batch to be written. + pub fn next_batch_id(&self) -> i32 { + self.batch_id + } + + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + #[async_recursion] + async fn write_array( + object_writer: &mut ObjectWriter, + field: &Field, + arrs: &[&ArrayRef], + batch_id: i32, + page_table: &mut PageTable, + ) -> Result<()> { + assert!(!arrs.is_empty()); + let data_type = arrs[0].data_type(); + let arrs_ref = arrs.iter().map(|a| a.as_ref()).collect::>(); + + match data_type { + DataType::Null => { + Self::write_null_array( + object_writer, + field, + arrs_ref.as_slice(), + batch_id, + page_table, + ) + .await + } + dt if dt.is_fixed_stride() => { + Self::write_fixed_stride_array( + object_writer, + field, + arrs_ref.as_slice(), + batch_id, + page_table, + ) + .await + } + dt if dt.is_binary_like() => { + Self::write_binary_array( + object_writer, + field, + arrs_ref.as_slice(), + batch_id, + page_table, + ) + .await + } + DataType::Dictionary(key_type, _) => { + Self::write_dictionary_arr( + object_writer, + 
field, + arrs_ref.as_slice(), + key_type, + batch_id, + page_table, + ) + .await + } + dt if dt.is_struct() => { + let struct_arrays = arrs.iter().map(|a| as_struct_array(a)).collect::>(); + Self::write_struct_array( + object_writer, + field, + struct_arrays.as_slice(), + batch_id, + page_table, + ) + .await + } + DataType::FixedSizeList(_, _) | DataType::FixedSizeBinary(_) => { + Self::write_fixed_stride_array( + object_writer, + field, + arrs_ref.as_slice(), + batch_id, + page_table, + ) + .await + } + DataType::List(_) => { + Self::write_list_array( + object_writer, + field, + arrs_ref.as_slice(), + batch_id, + page_table, + ) + .await + } + DataType::LargeList(_) => { + Self::write_large_list_array( + object_writer, + field, + arrs_ref.as_slice(), + batch_id, + page_table, + ) + .await + } + _ => Err(Error::Schema { + message: format!("FileWriter::write: unsupported data type: {data_type}"), + location: location!(), + }), + } + } + + async fn write_null_array( + object_writer: &mut ObjectWriter, + field: &Field, + arrs: &[&dyn Array], + batch_id: i32, + page_table: &mut PageTable, + ) -> Result<()> { + let arrs_length: i32 = arrs.iter().map(|a| a.len() as i32).sum(); + let page_info = PageInfo::new(object_writer.tell().await?, arrs_length as usize); + page_table.set(field.id, batch_id, page_info); + Ok(()) + } + + /// Write fixed size array, including primitives, fixed size binary, and fixed size list. 
+ async fn write_fixed_stride_array( + object_writer: &mut ObjectWriter, + field: &Field, + arrs: &[&dyn Array], + batch_id: i32, + page_table: &mut PageTable, + ) -> Result<()> { + assert_eq!(field.encoding, Some(Encoding::Plain)); + assert!(!arrs.is_empty()); + let data_type = arrs[0].data_type(); + + let mut encoder = PlainEncoder::new(object_writer, data_type); + let pos = encoder.encode(arrs).await?; + let arrs_length: i32 = arrs.iter().map(|a| a.len() as i32).sum(); + let page_info = PageInfo::new(pos, arrs_length as usize); + page_table.set(field.id, batch_id, page_info); + Ok(()) + } + + /// Write var-length binary arrays. + async fn write_binary_array( + object_writer: &mut ObjectWriter, + field: &Field, + arrs: &[&dyn Array], + batch_id: i32, + page_table: &mut PageTable, + ) -> Result<()> { + assert_eq!(field.encoding, Some(Encoding::VarBinary)); + let mut encoder = BinaryEncoder::new(object_writer); + let pos = encoder.encode(arrs).await?; + let arrs_length: i32 = arrs.iter().map(|a| a.len() as i32).sum(); + let page_info = PageInfo::new(pos, arrs_length as usize); + page_table.set(field.id, batch_id, page_info); + Ok(()) + } + + async fn write_dictionary_arr( + object_writer: &mut ObjectWriter, + field: &Field, + arrs: &[&dyn Array], + key_type: &DataType, + batch_id: i32, + page_table: &mut PageTable, + ) -> Result<()> { + assert_eq!(field.encoding, Some(Encoding::Dictionary)); + + // Write the dictionary keys. 
+ let mut encoder = DictionaryEncoder::new(object_writer, key_type); + let pos = encoder.encode(arrs).await?; + let arrs_length: i32 = arrs.iter().map(|a| a.len() as i32).sum(); + let page_info = PageInfo::new(pos, arrs_length as usize); + page_table.set(field.id, batch_id, page_info); + Ok(()) + } + + #[async_recursion] + async fn write_struct_array( + object_writer: &mut ObjectWriter, + field: &Field, + arrays: &[&StructArray], + batch_id: i32, + page_table: &mut PageTable, + ) -> Result<()> { + arrays + .iter() + .for_each(|a| assert_eq!(a.num_columns(), field.children.len())); + + for child in &field.children { + let mut arrs: Vec<&ArrayRef> = Vec::new(); + for struct_array in arrays { + let arr = struct_array + .column_by_name(&child.name) + .ok_or(Error::Schema { + message: format!( + "FileWriter: schema mismatch: column {} does not exist in array: {:?}", + child.name, + struct_array.data_type() + ), + location: location!(), + })?; + arrs.push(arr); + } + Self::write_array(object_writer, child, arrs.as_slice(), batch_id, page_table).await?; + } + Ok(()) + } + + async fn write_list_array( + object_writer: &mut ObjectWriter, + field: &Field, + arrs: &[&dyn Array], + batch_id: i32, + page_table: &mut PageTable, + ) -> Result<()> { + let capacity: usize = arrs.iter().map(|a| a.len()).sum(); + let mut list_arrs: Vec = Vec::new(); + let mut pos_builder: PrimitiveBuilder = + PrimitiveBuilder::with_capacity(capacity); + + let mut last_offset: usize = 0; + pos_builder.append_value(last_offset as i32); + for array in arrs.iter() { + let list_arr = as_list_array(*array); + let offsets = list_arr.value_offsets(); + + assert!(!offsets.is_empty()); + let start_offset = offsets[0].as_usize(); + let end_offset = offsets[offsets.len() - 1].as_usize(); + + let list_values = list_arr.values(); + let sliced_values = list_values.slice(start_offset, end_offset - start_offset); + list_arrs.push(sliced_values); + + offsets + .iter() + .skip(1) + .map(|b| b.as_usize() - start_offset 
+ last_offset) + .for_each(|o| pos_builder.append_value(o as i32)); + last_offset = pos_builder.values_slice()[pos_builder.len() - 1_usize] as usize; + } + + let positions: &dyn Array = &pos_builder.finish(); + Self::write_fixed_stride_array(object_writer, field, &[positions], batch_id, page_table) + .await?; + let arrs = list_arrs.iter().collect::>(); + Self::write_array( + object_writer, + &field.children[0], + arrs.as_slice(), + batch_id, + page_table, + ) + .await + } + + async fn write_large_list_array( + object_writer: &mut ObjectWriter, + field: &Field, + arrs: &[&dyn Array], + batch_id: i32, + page_table: &mut PageTable, + ) -> Result<()> { + let capacity: usize = arrs.iter().map(|a| a.len()).sum(); + let mut list_arrs: Vec = Vec::new(); + let mut pos_builder: PrimitiveBuilder = + PrimitiveBuilder::with_capacity(capacity); + + let mut last_offset: usize = 0; + pos_builder.append_value(last_offset as i64); + for array in arrs.iter() { + let list_arr = as_large_list_array(*array); + let offsets = list_arr.value_offsets(); + + assert!(!offsets.is_empty()); + let start_offset = offsets[0].as_usize(); + let end_offset = offsets[offsets.len() - 1].as_usize(); + + let sliced_values = list_arr + .values() + .slice(start_offset, end_offset - start_offset); + list_arrs.push(sliced_values); + + offsets + .iter() + .skip(1) + .map(|b| b.as_usize() - start_offset + last_offset) + .for_each(|o| pos_builder.append_value(o as i64)); + last_offset = pos_builder.values_slice()[pos_builder.len() - 1_usize] as usize; + } + + let positions: &dyn Array = &pos_builder.finish(); + Self::write_fixed_stride_array(object_writer, field, &[positions], batch_id, page_table) + .await?; + let arrs = list_arrs.iter().collect::>(); + Self::write_array( + object_writer, + &field.children[0], + arrs.as_slice(), + batch_id, + page_table, + ) + .await + } + + async fn write_statistics(&mut self) -> Result> { + let statistics = self + .stats_collector + .as_mut() + .map(|collector| 
collector.finish()); + + match statistics { + Some(Ok(stats_batch)) if stats_batch.num_rows() > 0 => { + debug_assert_eq!(self.next_batch_id() as usize, stats_batch.num_rows()); + let schema = Schema::try_from(stats_batch.schema().as_ref())?; + let leaf_field_ids = schema.field_ids(); + + let mut stats_page_table = PageTable::default(); + for (i, field) in schema.fields.iter().enumerate() { + Self::write_array( + &mut self.object_writer, + field, + &[stats_batch.column(i)], + 0, // Only one batch for statistics. + &mut stats_page_table, + ) + .await?; + } + + let page_table_position = + stats_page_table.write(&mut self.object_writer, 0).await?; + + Ok(Some(StatisticsMetadata { + schema, + leaf_field_ids, + page_table_position, + })) + } + Some(Err(e)) => Err(e), + _ => Ok(None), + } + } + + /// Writes the dictionaries (using plain/binary encoding) into the file + /// + /// The offsets and lengths of the written buffers are stored in the given + /// schema so that the dictionaries can be loaded in the future. + async fn write_dictionaries(writer: &mut ObjectWriter, schema: &mut Schema) -> Result<()> { + // Write dictionary values. + let max_field_id = schema.max_field_id().unwrap_or(-1); + for field_id in 0..max_field_id + 1 { + if let Some(field) = schema.mut_field_by_id(field_id) { + if field.data_type().is_dictionary() { + let dict_info = field.dictionary.as_mut().ok_or_else(|| { + Error::io( + format!("Lance field {} misses dictionary info", field.name), + // and wrap it in here. + location!(), + ) + })?; + + let value_arr = dict_info.values.as_ref().ok_or_else(|| { + Error::io( + format!( + "Lance field {} is dictionary type, but misses the dictionary value array", + field.name), + location!(), + ) + })?; + + let data_type = value_arr.data_type(); + let pos = match data_type { + dt if dt.is_numeric() => { + let mut encoder = PlainEncoder::new(writer, dt); + encoder.encode(&[value_arr]).await? 
+ } + dt if dt.is_binary_like() => { + let mut encoder = BinaryEncoder::new(writer); + encoder.encode(&[value_arr]).await? + } + _ => { + return Err(Error::io( + format!( + "Does not support {} as dictionary value type", + value_arr.data_type() + ), + location!(), + )); + } + }; + dict_info.offset = pos; + dict_info.length = value_arr.len(); + } + } + } + Ok(()) + } + + async fn write_footer(&mut self) -> Result<()> { + // Step 1. Write page table. + let field_id_offset = *self.schema.field_ids().iter().min().unwrap(); + let pos = self + .page_table + .write(&mut self.object_writer, field_id_offset) + .await?; + self.metadata.page_table_position = pos; + + // Step 2. Write statistics. + self.metadata.stats_metadata = self.write_statistics().await?; + + // Step 3. Write manifest and dictionary values. + Self::write_dictionaries(&mut self.object_writer, &mut self.schema).await?; + let pos = M::store_schema(&mut self.object_writer, &self.schema).await?; + + // Step 4. Write metadata. + self.metadata.manifest_position = pos; + let pos = self.object_writer.write_struct(&self.metadata).await?; + + // Step 5. Write magics. + self.object_writer + .write_magics(pos, MAJOR_VERSION, MINOR_VERSION, MAGIC) + .await + } +} + +/// Walk through the schema and return arrays with their Lance field. +/// +/// This skips over nested arrays and fields within list arrays. It does walk +/// over the children of structs. 
+fn fields_in_batches<'a>( + batches: &'a [RecordBatch], + schema: &'a Schema, +) -> impl Iterator)> { + let num_columns = batches[0].num_columns(); + let array_iters = (0..num_columns).map(|col_i| { + batches + .iter() + .map(|batch| batch.column(col_i)) + .collect::>() + }); + let mut to_visit: Vec<(&'a Field, Vec<&'a ArrayRef>)> = + schema.fields.iter().zip(array_iters).collect(); + + std::iter::from_fn(move || { + loop { + let (field, arrays): (_, Vec<&'a ArrayRef>) = to_visit.pop()?; + match field.data_type() { + DataType::Struct(_) => { + for (i, child_field) in field.children.iter().enumerate() { + let child_arrays = arrays + .iter() + .map(|arr| as_struct_array(*arr).column(i)) + .collect::>(); + to_visit.push((child_field, child_arrays)); + } + continue; + } + // We only walk structs right now. + _ if field.data_type().is_nested() => continue, + _ => return Some((field, arrays)), + } + } + }) +} + +#[cfg(test)] +mod tests { + use super::*; + + use std::sync::Arc; + + use arrow_array::{ + types::UInt32Type, BooleanArray, Decimal128Array, Decimal256Array, DictionaryArray, + DurationMicrosecondArray, DurationMillisecondArray, DurationNanosecondArray, + DurationSecondArray, FixedSizeBinaryArray, FixedSizeListArray, Float32Array, Int32Array, + Int64Array, ListArray, NullArray, StringArray, TimestampMicrosecondArray, + TimestampSecondArray, UInt8Array, + }; + use arrow_buffer::i256; + use arrow_schema::{ + Field as ArrowField, Fields as ArrowFields, Schema as ArrowSchema, TimeUnit, + }; + use arrow_select::concat::concat_batches; + + use crate::previous::reader::FileReader; + + #[tokio::test] + async fn test_write_file() { + let arrow_schema = ArrowSchema::new(vec![ + ArrowField::new("null", DataType::Null, true), + ArrowField::new("bool", DataType::Boolean, true), + ArrowField::new("i", DataType::Int64, true), + ArrowField::new("f", DataType::Float32, false), + ArrowField::new("b", DataType::Utf8, true), + ArrowField::new("decimal128", DataType::Decimal128(7, 
3), false), + ArrowField::new("decimal256", DataType::Decimal256(7, 3), false), + ArrowField::new("duration_sec", DataType::Duration(TimeUnit::Second), false), + ArrowField::new( + "duration_msec", + DataType::Duration(TimeUnit::Millisecond), + false, + ), + ArrowField::new( + "duration_usec", + DataType::Duration(TimeUnit::Microsecond), + false, + ), + ArrowField::new( + "duration_nsec", + DataType::Duration(TimeUnit::Nanosecond), + false, + ), + ArrowField::new( + "d", + DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Utf8)), + true, + ), + ArrowField::new( + "fixed_size_list", + DataType::FixedSizeList( + Arc::new(ArrowField::new("item", DataType::Float32, true)), + 16, + ), + true, + ), + ArrowField::new("fixed_size_binary", DataType::FixedSizeBinary(8), true), + ArrowField::new( + "l", + DataType::List(Arc::new(ArrowField::new("item", DataType::Utf8, true))), + true, + ), + ArrowField::new( + "large_l", + DataType::LargeList(Arc::new(ArrowField::new("item", DataType::Utf8, true))), + true, + ), + ArrowField::new( + "l_dict", + DataType::List(Arc::new(ArrowField::new( + "item", + DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Utf8)), + true, + ))), + true, + ), + ArrowField::new( + "large_l_dict", + DataType::LargeList(Arc::new(ArrowField::new( + "item", + DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Utf8)), + true, + ))), + true, + ), + ArrowField::new( + "s", + DataType::Struct(ArrowFields::from(vec![ + ArrowField::new("si", DataType::Int64, true), + ArrowField::new("sb", DataType::Utf8, true), + ])), + true, + ), + ]); + let mut schema = Schema::try_from(&arrow_schema).unwrap(); + + let dict_vec = (0..100).map(|n| ["a", "b", "c"][n % 3]).collect::>(); + let dict_arr: DictionaryArray = dict_vec.into_iter().collect(); + + let fixed_size_list_arr = FixedSizeListArray::try_new_from_values( + Float32Array::from_iter((0..1600).map(|n| n as f32).collect::>()), + 16, + ) + .unwrap(); + + let 
binary_data: [u8; 800] = [123; 800]; + let fixed_size_binary_arr = + FixedSizeBinaryArray::try_new_from_values(&UInt8Array::from_iter(binary_data), 8) + .unwrap(); + + let list_offsets: Int32Array = (0..202).step_by(2).collect(); + let list_values = + StringArray::from((0..200).map(|n| format!("str-{}", n)).collect::>()); + let list_arr: arrow_array::GenericListArray = + try_new_generic_list_array(list_values, &list_offsets).unwrap(); + + let large_list_offsets: Int64Array = (0..202).step_by(2).collect(); + let large_list_values = + StringArray::from((0..200).map(|n| format!("str-{}", n)).collect::>()); + let large_list_arr: arrow_array::GenericListArray = + try_new_generic_list_array(large_list_values, &large_list_offsets).unwrap(); + + let list_dict_offsets: Int32Array = (0..202).step_by(2).collect(); + let list_dict_vec = (0..200).map(|n| ["a", "b", "c"][n % 3]).collect::>(); + let list_dict_arr: DictionaryArray = list_dict_vec.into_iter().collect(); + let list_dict_arr: arrow_array::GenericListArray = + try_new_generic_list_array(list_dict_arr, &list_dict_offsets).unwrap(); + + let large_list_dict_offsets: Int64Array = (0..202).step_by(2).collect(); + let large_list_dict_vec = (0..200).map(|n| ["a", "b", "c"][n % 3]).collect::>(); + let large_list_dict_arr: DictionaryArray = + large_list_dict_vec.into_iter().collect(); + let large_list_dict_arr: arrow_array::GenericListArray = + try_new_generic_list_array(large_list_dict_arr, &large_list_dict_offsets).unwrap(); + + let columns: Vec = vec![ + Arc::new(NullArray::new(100)), + Arc::new(BooleanArray::from_iter( + (0..100).map(|f| Some(f % 3 == 0)).collect::>(), + )), + Arc::new(Int64Array::from_iter((0..100).collect::>())), + Arc::new(Float32Array::from_iter( + (0..100).map(|n| n as f32).collect::>(), + )), + Arc::new(StringArray::from( + (0..100).map(|n| n.to_string()).collect::>(), + )), + Arc::new( + Decimal128Array::from_iter_values(0..100) + .with_precision_and_scale(7, 3) + .unwrap(), + ), + Arc::new( + 
Decimal256Array::from_iter_values((0..100).map(|v| i256::from_i128(v as i128))) + .with_precision_and_scale(7, 3) + .unwrap(), + ), + Arc::new(DurationSecondArray::from_iter_values(0..100)), + Arc::new(DurationMillisecondArray::from_iter_values(0..100)), + Arc::new(DurationMicrosecondArray::from_iter_values(0..100)), + Arc::new(DurationNanosecondArray::from_iter_values(0..100)), + Arc::new(dict_arr), + Arc::new(fixed_size_list_arr), + Arc::new(fixed_size_binary_arr), + Arc::new(list_arr), + Arc::new(large_list_arr), + Arc::new(list_dict_arr), + Arc::new(large_list_dict_arr), + Arc::new(StructArray::from(vec![ + ( + Arc::new(ArrowField::new("si", DataType::Int64, true)), + Arc::new(Int64Array::from_iter((100..200).collect::>())) as ArrayRef, + ), + ( + Arc::new(ArrowField::new("sb", DataType::Utf8, true)), + Arc::new(StringArray::from( + (0..100).map(|n| n.to_string()).collect::>(), + )) as ArrayRef, + ), + ])), + ]; + let batch = RecordBatch::try_new(Arc::new(arrow_schema), columns).unwrap(); + schema.set_dictionary(&batch).unwrap(); + + let store = ObjectStore::memory(); + let path = Path::from("/foo"); + let mut file_writer = FileWriter::::try_new( + &store, + &path, + schema.clone(), + &Default::default(), + ) + .await + .unwrap(); + file_writer + .write(std::slice::from_ref(&batch)) + .await + .unwrap(); + file_writer.finish().await.unwrap(); + + let reader = FileReader::try_new(&store, &path, schema).await.unwrap(); + let actual = reader.read_batch(0, .., reader.schema()).await.unwrap(); + assert_eq!(actual, batch); + } + + #[tokio::test] + async fn test_dictionary_first_element_file() { + let arrow_schema = ArrowSchema::new(vec![ArrowField::new( + "d", + DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Utf8)), + true, + )]); + let mut schema = Schema::try_from(&arrow_schema).unwrap(); + + let dict_vec = (0..100).map(|n| ["a", "b", "c"][n % 3]).collect::>(); + let dict_arr: DictionaryArray = dict_vec.into_iter().collect(); + + let columns: 
Vec = vec![Arc::new(dict_arr)]; + let batch = RecordBatch::try_new(Arc::new(arrow_schema), columns).unwrap(); + schema.set_dictionary(&batch).unwrap(); + + let store = ObjectStore::memory(); + let path = Path::from("/foo"); + let mut file_writer = FileWriter::::try_new( + &store, + &path, + schema.clone(), + &Default::default(), + ) + .await + .unwrap(); + file_writer + .write(std::slice::from_ref(&batch)) + .await + .unwrap(); + file_writer.finish().await.unwrap(); + + let reader = FileReader::try_new(&store, &path, schema).await.unwrap(); + let actual = reader.read_batch(0, .., reader.schema()).await.unwrap(); + assert_eq!(actual, batch); + } + + #[tokio::test] + async fn test_write_temporal_types() { + let arrow_schema = Arc::new(ArrowSchema::new(vec![ + ArrowField::new( + "ts_notz", + DataType::Timestamp(TimeUnit::Second, None), + false, + ), + ArrowField::new( + "ts_tz", + DataType::Timestamp(TimeUnit::Microsecond, Some("America/Los_Angeles".into())), + false, + ), + ])); + let columns: Vec = vec![ + Arc::new(TimestampSecondArray::from(vec![11111111, 22222222])), + Arc::new( + TimestampMicrosecondArray::from(vec![3333333, 4444444]) + .with_timezone("America/Los_Angeles"), + ), + ]; + let batch = RecordBatch::try_new(arrow_schema.clone(), columns).unwrap(); + + let schema = Schema::try_from(arrow_schema.as_ref()).unwrap(); + let store = ObjectStore::memory(); + let path = Path::from("/foo"); + let mut file_writer = FileWriter::::try_new( + &store, + &path, + schema.clone(), + &Default::default(), + ) + .await + .unwrap(); + file_writer + .write(std::slice::from_ref(&batch)) + .await + .unwrap(); + file_writer.finish().await.unwrap(); + + let reader = FileReader::try_new(&store, &path, schema).await.unwrap(); + let actual = reader.read_batch(0, .., reader.schema()).await.unwrap(); + assert_eq!(actual, batch); + } + + #[tokio::test] + async fn test_collect_stats() { + // Validate: + // Only collects stats for requested columns + // Can collect stats in nested 
structs + // Won't collect stats for list columns (for now) + + let arrow_schema = ArrowSchema::new(vec![ + ArrowField::new("i", DataType::Int64, true), + ArrowField::new("i2", DataType::Int64, true), + ArrowField::new( + "l", + DataType::List(Arc::new(ArrowField::new("item", DataType::Int32, true))), + true, + ), + ArrowField::new( + "s", + DataType::Struct(ArrowFields::from(vec![ + ArrowField::new("si", DataType::Int64, true), + ArrowField::new("sb", DataType::Utf8, true), + ])), + true, + ), + ]); + + let schema = Schema::try_from(&arrow_schema).unwrap(); + + let store = ObjectStore::memory(); + let path = Path::from("/foo"); + + let options = FileWriterOptions { + collect_stats_for_fields: Some(vec![0, 1, 5, 6]), + }; + let mut file_writer = + FileWriter::::try_new(&store, &path, schema.clone(), &options) + .await + .unwrap(); + + let batch1 = RecordBatch::try_new( + Arc::new(arrow_schema.clone()), + vec![ + Arc::new(Int64Array::from(vec![1, 2, 3])), + Arc::new(Int64Array::from(vec![4, 5, 6])), + Arc::new(ListArray::from_iter_primitive::(vec![ + Some(vec![Some(1i32), Some(2), Some(3)]), + Some(vec![Some(4), Some(5)]), + Some(vec![]), + ])), + Arc::new(StructArray::from(vec![ + ( + Arc::new(ArrowField::new("si", DataType::Int64, true)), + Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef, + ), + ( + Arc::new(ArrowField::new("sb", DataType::Utf8, true)), + Arc::new(StringArray::from(vec!["a", "b", "c"])) as ArrayRef, + ), + ])), + ], + ) + .unwrap(); + file_writer.write(&[batch1]).await.unwrap(); + + let batch2 = RecordBatch::try_new( + Arc::new(arrow_schema.clone()), + vec![ + Arc::new(Int64Array::from(vec![5, 6])), + Arc::new(Int64Array::from(vec![10, 11])), + Arc::new(ListArray::from_iter_primitive::(vec![ + Some(vec![Some(1i32), Some(2), Some(3)]), + Some(vec![]), + ])), + Arc::new(StructArray::from(vec![ + ( + Arc::new(ArrowField::new("si", DataType::Int64, true)), + Arc::new(Int64Array::from(vec![4, 5])) as ArrayRef, + ), + ( + 
Arc::new(ArrowField::new("sb", DataType::Utf8, true)), + Arc::new(StringArray::from(vec!["d", "e"])) as ArrayRef, + ), + ])), + ], + ) + .unwrap(); + file_writer.write(&[batch2]).await.unwrap(); + + file_writer.finish().await.unwrap(); + + let reader = FileReader::try_new(&store, &path, schema).await.unwrap(); + + let read_stats = reader.read_page_stats(&[0, 1, 5, 6]).await.unwrap(); + assert!(read_stats.is_some()); + let read_stats = read_stats.unwrap(); + + let expected_stats_schema = stats_schema([ + (0, DataType::Int64), + (1, DataType::Int64), + (5, DataType::Int64), + (6, DataType::Utf8), + ]); + + assert_eq!(read_stats.schema().as_ref(), &expected_stats_schema); + + let expected_stats = stats_batch(&[ + Stats { + field_id: 0, + null_counts: vec![0, 0], + min_values: Arc::new(Int64Array::from(vec![1, 5])), + max_values: Arc::new(Int64Array::from(vec![3, 6])), + }, + Stats { + field_id: 1, + null_counts: vec![0, 0], + min_values: Arc::new(Int64Array::from(vec![4, 10])), + max_values: Arc::new(Int64Array::from(vec![6, 11])), + }, + Stats { + field_id: 5, + null_counts: vec![0, 0], + min_values: Arc::new(Int64Array::from(vec![1, 4])), + max_values: Arc::new(Int64Array::from(vec![3, 5])), + }, + // FIXME: these max values shouldn't be incremented + // https://github.com/lancedb/lance/issues/1517 + Stats { + field_id: 6, + null_counts: vec![0, 0], + min_values: Arc::new(StringArray::from(vec!["a", "d"])), + max_values: Arc::new(StringArray::from(vec!["c", "e"])), + }, + ]); + + assert_eq!(read_stats, expected_stats); + } + + fn stats_schema(data_fields: impl IntoIterator) -> ArrowSchema { + let fields = data_fields + .into_iter() + .map(|(field_id, data_type)| { + Arc::new(ArrowField::new( + format!("{}", field_id), + DataType::Struct( + vec![ + Arc::new(ArrowField::new("null_count", DataType::Int64, false)), + Arc::new(ArrowField::new("min_value", data_type.clone(), true)), + Arc::new(ArrowField::new("max_value", data_type, true)), + ] + .into(), + ), + false, + 
)) + }) + .collect::>(); + ArrowSchema::new(fields) + } + + struct Stats { + field_id: i32, + null_counts: Vec, + min_values: ArrayRef, + max_values: ArrayRef, + } + + fn stats_batch(stats: &[Stats]) -> RecordBatch { + let schema = stats_schema( + stats + .iter() + .map(|s| (s.field_id, s.min_values.data_type().clone())), + ); + + let columns = stats + .iter() + .map(|s| { + let data_type = s.min_values.data_type().clone(); + let fields = vec![ + Arc::new(ArrowField::new("null_count", DataType::Int64, false)), + Arc::new(ArrowField::new("min_value", data_type.clone(), true)), + Arc::new(ArrowField::new("max_value", data_type, true)), + ]; + let arrays = vec![ + Arc::new(Int64Array::from(s.null_counts.clone())), + s.min_values.clone(), + s.max_values.clone(), + ]; + Arc::new(StructArray::new(fields.into(), arrays, None)) as ArrayRef + }) + .collect(); + + RecordBatch::try_new(Arc::new(schema), columns).unwrap() + } + + async fn read_file_as_one_batch( + object_store: &ObjectStore, + path: &Path, + schema: Schema, + ) -> RecordBatch { + let reader = FileReader::try_new(object_store, path, schema) + .await + .unwrap(); + let mut batches = vec![]; + for i in 0..reader.num_batches() { + batches.push( + reader + .read_batch(i as i32, .., reader.schema()) + .await + .unwrap(), + ); + } + let arrow_schema = Arc::new(reader.schema().into()); + concat_batches(&arrow_schema, &batches).unwrap() + } + + /// Test encoding arrays that share the same underneath buffer. 
+ #[tokio::test] + async fn test_encode_slice() { + let store = ObjectStore::memory(); + let path = Path::from("/shared_slice"); + + let arrow_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( + "i", + DataType::Int32, + false, + )])); + let schema = Schema::try_from(arrow_schema.as_ref()).unwrap(); + let mut file_writer = FileWriter::::try_new( + &store, + &path, + schema.clone(), + &Default::default(), + ) + .await + .unwrap(); + + let array = Int32Array::from_iter_values(0..1000); + + for i in (0..1000).step_by(4) { + let data = array.slice(i, 4); + file_writer + .write(&[RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(data)]).unwrap()]) + .await + .unwrap(); + } + file_writer.finish().await.unwrap(); + assert!(store.size(&path).await.unwrap() < 2 * 8 * 1000); + + let batch = read_file_as_one_batch(&store, &path, schema).await; + assert_eq!(batch.column_by_name("i").unwrap().as_ref(), &array); + } + + #[tokio::test] + async fn test_write_schema_with_holes() { + let store = ObjectStore::memory(); + let path = Path::from("test"); + + let mut field0 = Field::try_from(&ArrowField::new("a", DataType::Int32, true)).unwrap(); + field0.set_id(-1, &mut 0); + assert_eq!(field0.id, 0); + let mut field2 = Field::try_from(&ArrowField::new("b", DataType::Int32, true)).unwrap(); + field2.set_id(-1, &mut 2); + assert_eq!(field2.id, 2); + // There is a hole at field id 1. 
+ let schema = Schema { + fields: vec![field0, field2], + metadata: Default::default(), + }; + + let arrow_schema = Arc::new(ArrowSchema::new(vec![ + ArrowField::new("a", DataType::Int32, true), + ArrowField::new("b", DataType::Int32, true), + ])); + let data = RecordBatch::try_new( + arrow_schema.clone(), + vec![ + Arc::new(Int32Array::from_iter_values(0..10)), + Arc::new(Int32Array::from_iter_values(10..20)), + ], + ) + .unwrap(); + + let mut file_writer = FileWriter::::try_new( + &store, + &path, + schema.clone(), + &Default::default(), + ) + .await + .unwrap(); + file_writer.write(&[data]).await.unwrap(); + file_writer.finish().await.unwrap(); + + let page_table = file_writer.page_table; + assert!(page_table.get(0, 0).is_some()); + assert!(page_table.get(2, 0).is_some()); + } +} diff --git a/rust/lance-file/src/writer/statistics.rs b/rust/lance-file/src/previous/writer/statistics.rs similarity index 100% rename from rust/lance-file/src/writer/statistics.rs rename to rust/lance-file/src/previous/writer/statistics.rs diff --git a/rust/lance-file/src/reader.rs b/rust/lance-file/src/reader.rs index c70a30dd29f..bd491f923a8 100644 --- a/rust/lance-file/src/reader.rs +++ b/rust/lance-file/src/reader.rs @@ -1,1511 +1,2274 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors -//! 
Lance Data File Reader - -// Standard -use std::ops::{Range, RangeTo}; -use std::sync::Arc; - -use arrow_arith::numeric::sub; -use arrow_array::{ - builder::PrimitiveBuilder, - cast::AsArray, - types::{Int32Type, Int64Type}, - ArrayRef, ArrowNativeTypeOp, ArrowNumericType, NullArray, OffsetSizeTrait, PrimitiveArray, - RecordBatch, StructArray, UInt32Array, +use std::{ + collections::{BTreeMap, BTreeSet}, + io::Cursor, + ops::Range, + pin::Pin, + sync::Arc, }; -use arrow_buffer::ArrowNativeType; -use arrow_schema::{DataType, FieldRef, Schema as ArrowSchema}; -use arrow_select::concat::{self, concat_batches}; -use async_recursion::async_recursion; -use deepsize::DeepSizeOf; -use futures::{stream, Future, FutureExt, StreamExt, TryStreamExt}; -use lance_arrow::*; -use lance_core::cache::{CacheKey, LanceCache}; -use lance_core::datatypes::{Field, Schema}; -use lance_core::{Error, Result}; -use lance_io::encodings::dictionary::DictionaryDecoder; -use lance_io::encodings::AsyncIndex; -use lance_io::stream::{RecordBatchStream, RecordBatchStreamAdapter}; -use lance_io::traits::Reader; -use lance_io::utils::{ - read_fixed_stride_array, read_metadata_offset, read_struct, read_struct_from_buf, -}; -use lance_io::{object_store::ObjectStore, ReadBatchParams}; -use std::borrow::Cow; +use arrow_array::RecordBatchReader; +use arrow_schema::Schema as ArrowSchema; +use byteorder::{ByteOrder, LittleEndian, ReadBytesExt}; +use bytes::{Bytes, BytesMut}; +use deepsize::{Context, DeepSizeOf}; +use futures::{stream::BoxStream, Stream, StreamExt}; +use lance_encoding::{ + decoder::{ + schedule_and_decode, schedule_and_decode_blocking, ColumnInfo, DecoderConfig, + DecoderPlugins, FilterExpression, PageEncoding, PageInfo, ReadBatchTask, RequestedRows, + SchedulerDecoderConfig, + }, + encoder::EncodedBatch, + version::LanceFileVersion, + EncodingsIo, +}; +use log::debug; use object_store::path::Path; +use prost::{Message, Name}; use snafu::location; -use tracing::instrument; -use 
crate::format::metadata::Metadata; -use crate::page_table::{PageInfo, PageTable}; +use lance_core::{ + cache::LanceCache, + datatypes::{Field, Schema}, + Error, Result, +}; +use lance_encoding::format::pb as pbenc; +use lance_encoding::format::pb21 as pbenc21; +use lance_io::{ + scheduler::FileScheduler, + stream::{RecordBatchStream, RecordBatchStreamAdapter}, + ReadBatchParams, +}; -/// Lance File Reader. -/// -/// It reads arrow data from one data file. -#[derive(Clone, DeepSizeOf)] -pub struct FileReader { - pub object_reader: Arc, - metadata: Arc, - page_table: Arc, - schema: Schema, +use crate::{ + datatypes::{Fields, FieldsWithMeta}, + format::{pb, pbfile, MAGIC, MAJOR_VERSION, MINOR_VERSION}, + io::LanceEncodingsIo, + writer::PAGE_BUFFER_ALIGNMENT, +}; - /// The id of the fragment which this file belong to. - /// For simple file access, this can just be zero. - fragment_id: u64, +/// Default chunk size for reading large pages (8MiB) +/// Pages larger than this will be split into multiple chunks during read +pub const DEFAULT_READ_CHUNK_SIZE: u64 = 8 * 1024 * 1024; + +// For now, we don't use global buffers for anything other than schema. If we +// use these later we should make them lazily loaded and then cached once loaded. 
+// +// We store their position / length for debugging purposes +#[derive(Debug, DeepSizeOf)] +pub struct BufferDescriptor { + pub position: u64, + pub size: u64, +} - /// Page table for statistics - stats_page_table: Arc>, +/// Statistics summarize some of the file metadata for quick summary info +#[derive(Debug)] +pub struct FileStatistics { + /// Statistics about each of the columns in the file + pub columns: Vec, } -impl std::fmt::Debug for FileReader { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("FileReader") - .field("fragment", &self.fragment_id) - .field("path", &self.object_reader.path()) - .finish() - } +/// Summary information describing a column +#[derive(Debug)] +pub struct ColumnStatistics { + /// The number of pages in the column + pub num_pages: usize, + /// The total number of data & metadata bytes in the column + /// + /// This is the compressed on-disk size + pub size_bytes: u64, } -// Generic cache key for string-based keys -struct StringCacheKey<'a, T> { - key: &'a str, - _phantom: std::marker::PhantomData, +// TODO: Caching +#[derive(Debug)] +pub struct CachedFileMetadata { + /// The schema of the file + pub file_schema: Arc, + /// The column metadatas + pub column_metadatas: Vec, + pub column_infos: Vec>, + /// The number of rows in the file + pub num_rows: u64, + pub file_buffers: Vec, + /// The number of bytes contained in the data page section of the file + pub num_data_bytes: u64, + /// The number of bytes contained in the column metadata (not including buffers + /// referenced by the metadata) + pub num_column_metadata_bytes: u64, + /// The number of bytes contained in global buffers + pub num_global_buffer_bytes: u64, + /// The number of bytes contained in the CMO and GBO tables + pub num_footer_bytes: u64, + pub major_version: u16, + pub minor_version: u16, } -impl<'a, T> StringCacheKey<'a, T> { - fn new(key: &'a str) -> Self { - Self { - key, - _phantom: std::marker::PhantomData, - } +impl 
DeepSizeOf for CachedFileMetadata { + // TODO: include size for `column_metadatas` and `column_infos`. + fn deep_size_of_children(&self, context: &mut Context) -> usize { + self.file_schema.deep_size_of_children(context) + + self + .file_buffers + .iter() + .map(|file_buffer| file_buffer.deep_size_of_children(context)) + .sum::() } } -impl CacheKey for StringCacheKey<'_, T> { - type ValueType = T; - - fn key(&self) -> Cow<'_, str> { - self.key.into() +impl CachedFileMetadata { + pub fn version(&self) -> LanceFileVersion { + match (self.major_version, self.minor_version) { + (0, 3) => LanceFileVersion::V2_0, + (2, 1) => LanceFileVersion::V2_1, + (2, 2) => LanceFileVersion::V2_2, + _ => panic!( + "Unsupported version: {}.{}", + self.major_version, self.minor_version + ), + } } } -impl FileReader { - /// Open file reader +/// Selecting columns from a lance file requires specifying both the +/// index of the column and the data type of the column +/// +/// Partly, this is because it is not strictly required that columns +/// be read into the same type. For example, a string column may be +/// read as a string, large_string or string_view type. +/// +/// A read will only succeed if the decoder for a column is capable +/// of decoding into the requested type. +/// +/// Note that this should generally be limited to different in-memory +/// representations of the same semantic type. An encoding could +/// theoretically support "casting" (e.g. int to string, etc.) but +/// there is little advantage in doing so here. +/// +/// Note: in order to specify a projection the user will need some way +/// to figure out the column indices. In the table format we do this +/// using field IDs and keeping track of the field id->column index mapping. +/// +/// If users are not using the table format then they will need to figure +/// out some way to do this themselves. +#[derive(Debug, Clone)] +pub struct ReaderProjection { + /// The data types (schema) of the selected columns. 
The names + /// of the schema are arbitrary and ignored. + pub schema: Arc, + /// The indices of the columns to load. /// - /// Open the file at the given path using the provided object store. + /// The content of this vector depends on the file version. /// - /// The passed fragment ID determines the first 32-bits of the row IDs. + /// In Lance File Version 2.0 we need ids for structural fields as + /// well as leaf fields: /// - /// If a manifest is passed in, it will be used to load the schema and dictionary. - /// This is typically done if the file is part of a dataset fragment. If no manifest - /// is passed in, then it is read from the file itself. + /// - Primitive: the index of the column in the schema + /// - List: the index of the list column in the schema + /// followed by the column indices of the children + /// - FixedSizeList (of primitive): the index of the column in the schema + /// (this case is not nested) + /// - FixedSizeList (of non-primitive): not yet implemented + /// - Dictionary: same as primitive + /// - Struct: the index of the struct column in the schema + /// followed by the column indices of the children /// - /// The session passed in is used to cache metadata about the file. If no session - /// is passed in, there will be no caching. - #[instrument(level = "debug", skip(object_store, schema, session))] - pub async fn try_new_with_fragment_id( - object_store: &ObjectStore, - path: &Path, - schema: Schema, - fragment_id: u32, - field_id_offset: i32, - max_field_id: i32, - session: Option<&LanceCache>, - ) -> Result { - let object_reader = object_store.open(path).await?; + /// In other words, this should be a DFS listing of the desired schema. + /// + /// In Lance File Version 2.1 we only need ids for leaf fields. Any structural + /// fields are completely transparent. 
+ /// + /// For example, if the goal is to load: + /// + /// x: int32 + /// y: struct + /// z: list + /// + /// and the schema originally used to store the data was: + /// + /// a: struct + /// b: int64 + /// y: struct + /// z: list + /// + /// Then the column_indices should be: + /// + /// - 2.0: [1, 3, 4, 6, 7, 8] + /// - 2.1: [0, 2, 4, 5] + pub column_indices: Vec, +} - let metadata = Self::read_metadata(object_reader.as_ref(), session).await?; +impl ReaderProjection { + fn from_field_ids_helper<'a>( + file_version: LanceFileVersion, + fields: impl Iterator, + field_id_to_column_index: &BTreeMap, + column_indices: &mut Vec, + ) -> Result<()> { + for field in fields { + let is_structural = file_version >= LanceFileVersion::V2_1; + // In the 2.0 system we needed ids for intermediate fields. In 2.1+ + // we only need ids for leaf fields. + if !is_structural + || field.children.is_empty() + || field.is_blob() + || field.is_packed_struct() + { + if let Some(column_idx) = field_id_to_column_index.get(&(field.id as u32)).copied() + { + column_indices.push(column_idx); + } + } + // Don't recurse into children if the field is a blob or packed struct in 2.1 + if !is_structural || (!field.is_blob() && !field.is_packed_struct()) { + Self::from_field_ids_helper( + file_version, + field.children.iter(), + field_id_to_column_index, + column_indices, + )?; + } + } + Ok(()) + } - Self::try_new_from_reader( - path, - object_reader.into(), - Some(metadata), + /// Creates a projection using a mapping from field IDs to column indices + /// + /// You can obtain such a mapping when the file is written using the + /// [`crate::writer::FileWriter::field_id_to_column_indices`] method. 
+ pub fn from_field_ids( + file_version: LanceFileVersion, + schema: &Schema, + field_id_to_column_index: &BTreeMap, + ) -> Result { + let mut column_indices = Vec::new(); + Self::from_field_ids_helper( + file_version, + schema.fields.iter(), + field_id_to_column_index, + &mut column_indices, + )?; + Ok(Self { + schema: Arc::new(schema.clone()), + column_indices, + }) + } + + /// Creates a projection that reads the entire file + /// + /// If the schema provided is not the schema of the entire file then + /// the projection will be invalid and the read will fail. + /// If the field is a `struct datatype` with `packed` set to true in the field metadata, + /// the whole struct has one column index. + /// To support nested `packed-struct encoding`, this method need to be further adjusted. + pub fn from_whole_schema(schema: &Schema, version: LanceFileVersion) -> Self { + let schema = Arc::new(schema.clone()); + let is_structural = version >= LanceFileVersion::V2_1; + let mut column_indices = vec![]; + let mut curr_column_idx = 0; + let mut packed_struct_fields_num = 0; + for field in schema.fields_pre_order() { + if packed_struct_fields_num > 0 { + packed_struct_fields_num -= 1; + continue; + } + if field.is_packed_struct() { + column_indices.push(curr_column_idx); + curr_column_idx += 1; + packed_struct_fields_num = field.children.len(); + } else if field.children.is_empty() || !is_structural { + column_indices.push(curr_column_idx); + curr_column_idx += 1; + } + } + Self { schema, - fragment_id, - field_id_offset, - max_field_id, - session, - ) - .await + column_indices, + } } - #[allow(clippy::too_many_arguments)] - pub async fn try_new_from_reader( - path: &Path, - object_reader: Arc, - metadata: Option>, - schema: Schema, - fragment_id: u32, - field_id_offset: i32, - max_field_id: i32, - session: Option<&LanceCache>, + /// Creates a projection that reads the specified columns provided by name + /// + /// The syntax for column names is the same as 
[`lance_core::datatypes::Schema::project`] + /// + /// If the schema provided is not the schema of the entire file then + /// the projection will be invalid and the read will fail. + pub fn from_column_names( + file_version: LanceFileVersion, + schema: &Schema, + column_names: &[&str], ) -> Result { - let metadata = match metadata { - Some(metadata) => metadata, - None => Self::read_metadata(object_reader.as_ref(), session).await?, - }; + let field_id_to_column_index = schema + .fields_pre_order() + // In the 2.0 system we needed ids for intermediate fields. In 2.1+ + // we only need ids for leaf fields. + .filter(|field| { + file_version < LanceFileVersion::V2_1 || field.is_leaf() || field.is_packed_struct() + }) + .enumerate() + .map(|(idx, field)| (field.id as u32, idx as u32)) + .collect::>(); + let projected = schema.project(column_names)?; + let mut column_indices = Vec::new(); + Self::from_field_ids_helper( + file_version, + projected.fields.iter(), + &field_id_to_column_index, + &mut column_indices, + )?; + Ok(Self { + schema: Arc::new(projected), + column_indices, + }) + } +} - let page_table = async { - Self::load_from_cache(session, path.to_string(), |_| async { - PageTable::load( - object_reader.as_ref(), - metadata.page_table_position, - field_id_offset, - max_field_id, - metadata.num_batches() as i32, - ) - .await +/// File Reader Options that can control reading behaviors, such as whether to enable caching on repetition indices +#[derive(Clone, Debug)] +pub struct FileReaderOptions { + pub decoder_config: DecoderConfig, + /// Size of chunks when reading large pages. Pages larger than this + /// will be read in multiple chunks to control memory usage. 
+ /// Default: 8MB (DEFAULT_READ_CHUNK_SIZE) + pub read_chunk_size: u64, +} + +impl Default for FileReaderOptions { + fn default() -> Self { + Self { + decoder_config: DecoderConfig::default(), + read_chunk_size: DEFAULT_READ_CHUNK_SIZE, + } + } +} + +#[derive(Debug)] +pub struct FileReader { + scheduler: Arc, + // The default projection to be applied to all reads + base_projection: ReaderProjection, + num_rows: u64, + metadata: Arc, + decoder_plugins: Arc, + cache: Arc, + options: FileReaderOptions, +} +#[derive(Debug)] +struct Footer { + #[allow(dead_code)] + column_meta_start: u64, + // We don't use this today because we always load metadata for every column + // and don't yet support "metadata projection" + #[allow(dead_code)] + column_meta_offsets_start: u64, + global_buff_offsets_start: u64, + num_global_buffers: u32, + num_columns: u32, + major_version: u16, + minor_version: u16, +} + +const FOOTER_LEN: usize = 40; + +impl FileReader { + pub fn with_scheduler(&self, scheduler: Arc) -> Self { + Self { + scheduler, + base_projection: self.base_projection.clone(), + cache: self.cache.clone(), + decoder_plugins: self.decoder_plugins.clone(), + metadata: self.metadata.clone(), + options: self.options.clone(), + num_rows: self.num_rows, + } + } + + pub fn num_rows(&self) -> u64 { + self.num_rows + } + + pub fn metadata(&self) -> &Arc { + &self.metadata + } + + pub fn file_statistics(&self) -> FileStatistics { + let column_metadatas = &self.metadata().column_metadatas; + + let column_stats = column_metadatas + .iter() + .map(|col_metadata| { + let num_pages = col_metadata.pages.len(); + let size_bytes = col_metadata + .pages + .iter() + .map(|page| page.buffer_sizes.iter().sum::()) + .sum::(); + ColumnStatistics { + num_pages, + size_bytes, + } }) - .await - }; + .collect(); - let stats_page_table = Self::read_stats_page_table(object_reader.as_ref(), session); + FileStatistics { + columns: column_stats, + } + } - // Can concurrently load page tables - let 
(page_table, stats_page_table) = futures::try_join!(page_table, stats_page_table)?; + pub async fn read_global_buffer(&self, index: u32) -> Result { + let buffer_desc = self.metadata.file_buffers.get(index as usize).ok_or_else(||Error::invalid_input(format!("request for global buffer at index {} but there were only {} global buffers in the file", index, self.metadata.file_buffers.len()), location!()))?; + self.scheduler + .submit_single( + buffer_desc.position..buffer_desc.position + buffer_desc.size, + 0, + ) + .await + } - Ok(Self { - object_reader, - metadata, - schema, - page_table, - fragment_id: fragment_id as u64, - stats_page_table, - }) + async fn read_tail(scheduler: &FileScheduler) -> Result<(Bytes, u64)> { + let file_size = scheduler.reader().size().await? as u64; + let begin = if file_size < scheduler.reader().block_size() as u64 { + 0 + } else { + file_size - scheduler.reader().block_size() as u64 + }; + let tail_bytes = scheduler.submit_single(begin..file_size, 0).await?; + Ok((tail_bytes, file_size)) } - pub async fn read_metadata( - object_reader: &dyn Reader, - cache: Option<&LanceCache>, - ) -> Result> { - Self::load_from_cache(cache, object_reader.path().to_string(), |_| async { - let file_size = object_reader.size().await?; - let begin = if file_size < object_reader.block_size() { - 0 - } else { - file_size - object_reader.block_size() - }; - let tail_bytes = object_reader.get_range(begin..file_size).await?; - let metadata_pos = read_metadata_offset(&tail_bytes)?; - - let metadata: Metadata = if metadata_pos < file_size - tail_bytes.len() { - // We have not read the metadata bytes yet. - read_struct(object_reader, metadata_pos).await? - } else { - let offset = tail_bytes.len() - (file_size - metadata_pos); - read_struct_from_buf(&tail_bytes.slice(offset..))? 
- }; - Ok(metadata) + // Checks to make sure the footer is written correctly and returns the + // position of the file descriptor (which comes from the footer) + fn decode_footer(footer_bytes: &Bytes) -> Result