diff --git a/rust/arrow/src/bitmap.rs b/rust/arrow/src/bitmap.rs index cd05b595531..f82f773e6ba 100644 --- a/rust/arrow/src/bitmap.rs +++ b/rust/arrow/src/bitmap.rs @@ -55,6 +55,10 @@ impl Bitmap { assert!(i < (self.bits.len() << 3)); unsafe { bit_util::get_bit_raw(self.bits.raw_data(), i) } } + + pub fn to_buffer(self) -> Buffer { + self.bits + } } impl<'a, 'b> BitAnd<&'b Bitmap> for &'a Bitmap { diff --git a/rust/parquet/src/arrow/array_reader.rs b/rust/parquet/src/arrow/array_reader.rs new file mode 100644 index 00000000000..3a4a7864cbf --- /dev/null +++ b/rust/parquet/src/arrow/array_reader.rs @@ -0,0 +1,1044 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::cmp::min; +use std::collections::{HashMap, HashSet}; +use std::marker::PhantomData; +use std::mem::size_of; +use std::mem::transmute; +use std::rc::Rc; +use std::result::Result::Ok; +use std::slice::from_raw_parts_mut; +use std::sync::Arc; +use std::vec::Vec; + +use arrow::array::{ + ArrayDataBuilder, ArrayDataRef, ArrayRef, BooleanBufferBuilder, BufferBuilderTrait, + Int16BufferBuilder, StructArray, +}; +use arrow::buffer::{Buffer, MutableBuffer}; +use arrow::datatypes::{DataType as ArrowType, Field}; + +use crate::arrow::converter::{ + BooleanConverter, Converter, Float32Converter, Float64Converter, Int16Converter, + Int32Converter, Int64Converter, Int8Converter, UInt16Converter, UInt32Converter, + UInt64Converter, UInt8Converter, +}; +use crate::arrow::record_reader::RecordReader; +use crate::arrow::schema::parquet_to_arrow_field; +use crate::basic::{Repetition, Type as PhysicalType}; +use crate::column::page::PageIterator; +use crate::data_type::{ + BoolType, ByteArrayType, DataType, DoubleType, FloatType, Int32Type, Int64Type, + Int96Type, +}; +use crate::errors::{ParquetError, ParquetError::ArrowError, Result}; +use crate::file::reader::{FilePageIterator, FileReader}; +use crate::schema::types::{ + ColumnDescPtr, ColumnDescriptor, ColumnPath, SchemaDescPtr, Type, TypePtr, +}; +use crate::schema::visitor::TypeVisitor; + +/// Array reader reads parquet data into arrow array. +pub trait ArrayReader { + /// Returns the arrow type of this array reader. + fn get_data_type(&self) -> &ArrowType; + + /// Reads at most `batch_size` records into an arrow array and return it. + fn next_batch(&mut self, batch_size: usize) -> Result; + + /// Returns the definition levels of data from last call of `next_batch`. + /// The result is used by parent array reader to calculate its own definition + /// levels and repetition levels, so that its parent can calculate null bitmap. 
+ fn get_def_levels(&self) -> Option<&[i16]>; + + /// Return the repetition levels of data from last call of `next_batch`. + /// The result is used by parent array reader to calculate its own definition + /// levels and repetition levels, so that its parent can calculate null bitmap. + fn get_rep_levels(&self) -> Option<&[i16]>; +} + +/// Primitive array readers are leaves of array reader tree. They accept page iterator +/// and read them into primitive arrays. +pub struct PrimitiveArrayReader { + data_type: ArrowType, + pages: Box, + def_levels_buffer: Option, + rep_levels_buffer: Option, + column_desc: ColumnDescPtr, + record_reader: RecordReader, + _type_marker: PhantomData, +} + +impl PrimitiveArrayReader { + /// Construct primitive array reader. + pub fn new( + mut pages: Box, + column_desc: ColumnDescPtr, + ) -> Result { + let data_type = parquet_to_arrow_field(column_desc.clone())? + .data_type() + .clone(); + + let mut record_reader = RecordReader::::new(column_desc.clone()); + record_reader.set_page_reader( + pages + .next() + .ok_or_else(|| general_err!("Can't build array without pages!"))??, + )?; + + Ok(Self { + data_type, + pages, + def_levels_buffer: None, + rep_levels_buffer: None, + column_desc, + record_reader, + _type_marker: PhantomData, + }) + } +} + +/// Implementation of primitive array reader. +impl ArrayReader for PrimitiveArrayReader { + /// Returns data type of primitive array. + fn get_data_type(&self) -> &ArrowType { + &self.data_type + } + + /// Reads at most `batch_size` records into array. 
+ fn next_batch(&mut self, batch_size: usize) -> Result { + let mut records_read = 0usize; + while records_read < batch_size { + let records_to_read = batch_size - records_read; + + let records_read_once = self.record_reader.read_records(records_to_read)?; + records_read = records_read + records_read_once; + + // Record reader exhausted + if records_read_once < records_to_read { + if let Some(page_reader) = self.pages.next() { + // Read from new page reader + self.record_reader.set_page_reader(page_reader?)?; + } else { + // Page reader also exhausted + break; + } + } + } + + // convert to arrays + let array = match (&self.data_type, T::get_physical_type()) { + (ArrowType::Boolean, PhysicalType::BOOLEAN) => unsafe { + BooleanConverter::convert(transmute::< + &mut RecordReader, + &mut RecordReader, + >(&mut self.record_reader)) + }, + (ArrowType::Int8, PhysicalType::INT32) => unsafe { + Int8Converter::convert(transmute::< + &mut RecordReader, + &mut RecordReader, + >(&mut self.record_reader)) + }, + (ArrowType::Int16, PhysicalType::INT32) => unsafe { + Int16Converter::convert(transmute::< + &mut RecordReader, + &mut RecordReader, + >(&mut self.record_reader)) + }, + (ArrowType::Int32, PhysicalType::INT32) => unsafe { + Int32Converter::convert(transmute::< + &mut RecordReader, + &mut RecordReader, + >(&mut self.record_reader)) + }, + (ArrowType::UInt8, PhysicalType::INT32) => unsafe { + UInt8Converter::convert(transmute::< + &mut RecordReader, + &mut RecordReader, + >(&mut self.record_reader)) + }, + (ArrowType::UInt16, PhysicalType::INT32) => unsafe { + UInt16Converter::convert(transmute::< + &mut RecordReader, + &mut RecordReader, + >(&mut self.record_reader)) + }, + (ArrowType::UInt32, PhysicalType::INT32) => unsafe { + UInt32Converter::convert(transmute::< + &mut RecordReader, + &mut RecordReader, + >(&mut self.record_reader)) + }, + (ArrowType::Int64, PhysicalType::INT64) => unsafe { + Int64Converter::convert(transmute::< + &mut RecordReader, + &mut 
RecordReader, + >(&mut self.record_reader)) + }, + (ArrowType::UInt64, PhysicalType::INT64) => unsafe { + UInt64Converter::convert(transmute::< + &mut RecordReader, + &mut RecordReader, + >(&mut self.record_reader)) + }, + (ArrowType::Float32, PhysicalType::FLOAT) => unsafe { + Float32Converter::convert(transmute::< + &mut RecordReader, + &mut RecordReader, + >(&mut self.record_reader)) + }, + (ArrowType::Float64, PhysicalType::DOUBLE) => unsafe { + Float64Converter::convert(transmute::< + &mut RecordReader, + &mut RecordReader, + >(&mut self.record_reader)) + }, + (arrow_type, _) => Err(general_err!( + "Reading {:?} type from parquet is not supported yet.", + arrow_type + )), + }?; + + // save definition and repetition buffers + self.def_levels_buffer = self.record_reader.consume_def_levels()?; + self.rep_levels_buffer = self.record_reader.consume_rep_levels()?; + self.record_reader.reset(); + Ok(array) + } + + fn get_def_levels(&self) -> Option<&[i16]> { + self.def_levels_buffer.as_ref().map(|buf| buf.typed_data()) + } + + fn get_rep_levels(&self) -> Option<&[i16]> { + self.rep_levels_buffer.as_ref().map(|buf| buf.typed_data()) + } +} + +/// Implementation of struct array reader. +struct StructArrayReader { + children: Vec>, + data_type: ArrowType, + struct_def_level: i16, + struct_rep_level: i16, + def_level_buffer: Option, + rep_level_buffer: Option, +} + +impl StructArrayReader { + /// Construct struct array reader. + pub fn new( + data_type: ArrowType, + children: Vec>, + def_level: i16, + rep_level: i16, + ) -> Self { + Self { + data_type, + children, + struct_def_level: def_level, + struct_rep_level: rep_level, + def_level_buffer: None, + rep_level_buffer: None, + } + } +} + +impl ArrayReader for StructArrayReader { + /// Returns data type. + /// This must be a struct. + fn get_data_type(&self) -> &ArrowType { + &self.data_type + } + + /// Read `batch_size` struct records. 
+ /// + /// Definition levels of struct array is calculated as following: + /// ```ignore + /// def_levels[i] = min(child1_def_levels[i], child2_def_levels[i], ..., + /// childn_def_levels[i]); + /// ``` + /// + /// Repetition levels of struct array is calculated as following: + /// ```ignore + /// rep_levels[i] = child1_rep_levels[i]; + /// ``` + /// + /// The null bitmap of struct array is calculated from def_levels: + /// ```ignore + /// null_bitmap[i] = (def_levels[i] >= self.def_level); + /// ``` + fn next_batch(&mut self, batch_size: usize) -> Result { + if self.children.len() == 0 { + self.def_level_buffer = None; + self.rep_level_buffer = None; + return Ok(Arc::new(StructArray::from(Vec::new()))); + } + + let children_array = self + .children + .iter_mut() + .map(|reader| reader.next_batch(batch_size)) + .try_fold( + Vec::new(), + |mut result, child_array| -> Result> { + result.push(child_array?); + Ok(result) + }, + )?; + + // check that array child data has same size + let children_array_len = + children_array.first().map(|arr| arr.len()).ok_or_else(|| { + general_err!("Struct array reader should have at least one child!") + })?; + + let all_children_len_eq = children_array + .iter() + .all(|arr| arr.len() == children_array_len); + if !all_children_len_eq { + return Err(general_err!("Not all children array length are the same!")); + } + + // calculate struct def level data + let buffer_size = children_array_len * size_of::(); + let mut def_level_data_buffer = MutableBuffer::new(buffer_size); + def_level_data_buffer.resize(buffer_size)?; + + let def_level_data = unsafe { + let ptr = transmute::<*const u8, *mut i16>(def_level_data_buffer.raw_data()); + from_raw_parts_mut(ptr, children_array_len) + }; + + def_level_data + .iter_mut() + .for_each(|v| *v = self.struct_def_level); + + for child in &self.children { + if let Some(current_child_def_levels) = child.get_def_levels() { + if current_child_def_levels.len() != children_array_len { + return 
Err(general_err!("Child array length are not equal!")); + } else { + for i in 0..children_array_len { + def_level_data[i] = + min(def_level_data[i], current_child_def_levels[i]); + } + } + } + } + + // calculate bitmap for current array + let mut bitmap_builder = BooleanBufferBuilder::new(children_array_len); + let mut null_count = 0; + for def_level in def_level_data { + let not_null = *def_level >= self.struct_def_level; + if !not_null { + null_count += 1; + } + bitmap_builder.append(not_null)?; + } + + // Now we can build array data + let array_data = ArrayDataBuilder::new(self.data_type.clone()) + .len(children_array_len) + .null_count(null_count) + .null_bit_buffer(bitmap_builder.finish()) + .child_data( + children_array + .iter() + .map(|x| x.data()) + .collect::>(), + ) + .build(); + + // calculate struct rep level data, since struct doesn't add to repetition + // levels, here we just need to keep repetition levels of first array + // TODO: Verify that all children array reader has same repetition levels + let rep_level_data = self + .children + .first() + .ok_or_else(|| { + general_err!("Struct array reader should have at least one child!") + })? + .get_rep_levels() + .map(|data| -> Result { + let mut buffer = Int16BufferBuilder::new(children_array_len); + buffer.append_slice(data)?; + Ok(buffer.finish()) + }) + .transpose()?; + + self.def_level_buffer = Some(def_level_data_buffer.freeze()); + self.rep_level_buffer = rep_level_data; + Ok(Arc::new(StructArray::from(array_data))) + } + + fn get_def_levels(&self) -> Option<&[i16]> { + self.def_level_buffer.as_ref().map(|buf| buf.typed_data()) + } + + fn get_rep_levels(&self) -> Option<&[i16]> { + self.rep_level_buffer.as_ref().map(|buf| buf.typed_data()) + } +} + +/// Create array reader from parquet schema, column indices, and parquet file reader. 
+pub fn build_array_reader( + parquet_schema: SchemaDescPtr, + column_indices: T, + file_reader: Rc, +) -> Result> +where + T: IntoIterator, +{ + let mut base_nodes = Vec::new(); + let mut base_nodes_set = HashSet::new(); + let mut leaves = HashMap::<*const Type, usize>::new(); + + for c in column_indices { + let column = parquet_schema.column(c).self_type() as *const Type; + let root = parquet_schema.get_column_root_ptr(c); + let root_raw_ptr = root.clone().as_ref() as *const Type; + + leaves.insert(column, c); + if !base_nodes_set.contains(&root_raw_ptr) { + base_nodes.push(root); + base_nodes_set.insert(root_raw_ptr); + } + } + + if leaves.is_empty() { + return Err(general_err!("Can't build array reader without columns!")); + } + + ArrayReaderBuilder::new( + Rc::new(parquet_schema.root_schema().clone()), + Rc::new(leaves), + file_reader, + ) + .build_array_reader() +} + +/// Used to build array reader. +struct ArrayReaderBuilder { + root_schema: TypePtr, + // Key: columns that need to be included in final array builder + // Value: column index in schema + columns_included: Rc>, + file_reader: Rc, +} + +/// Used in type visitor. +#[derive(Clone)] +struct ArrayReaderBuilderContext { + def_level: i16, + rep_level: i16, + path: ColumnPath, +} + +impl Default for ArrayReaderBuilderContext { + fn default() -> Self { + Self { + def_level: 0i16, + rep_level: 0i16, + path: ColumnPath::new(Vec::new()), + } + } +} + +/// Create array reader by visiting schema. +impl<'a> TypeVisitor>, &'a ArrayReaderBuilderContext> + for ArrayReaderBuilder +{ + /// Build array reader for primitive type. + /// Currently we don't have a list reader implementation, so repeated type is not + /// supported yet. 
+ fn visit_primitive( + &mut self, + cur_type: TypePtr, + context: &'a ArrayReaderBuilderContext, + ) -> Result>> { + if self.is_included(cur_type.as_ref()) { + let mut new_context = context.clone(); + new_context.path.append(vec![cur_type.name().to_string()]); + + match cur_type.get_basic_info().repetition() { + Repetition::REPEATED => { + new_context.def_level += 1; + new_context.rep_level += 1; + } + Repetition::OPTIONAL => { + new_context.def_level += 1; + } + _ => (), + } + + let reader = + self.build_for_primitive_type_inner(cur_type.clone(), &new_context)?; + + if cur_type.get_basic_info().repetition() == Repetition::REPEATED { + Err(ArrowError( + "Reading repeated field is not supported yet!".to_string(), + )) + } else { + Ok(Some(reader)) + } + } else { + Ok(None) + } + } + + /// Build array reader for struct type. + fn visit_struct( + &mut self, + cur_type: Rc, + context: &'a ArrayReaderBuilderContext, + ) -> Result>> { + let mut new_context = context.clone(); + new_context.path.append(vec![cur_type.name().to_string()]); + + if cur_type.get_basic_info().has_repetition() { + match cur_type.get_basic_info().repetition() { + Repetition::REPEATED => { + new_context.def_level += 1; + new_context.rep_level += 1; + } + Repetition::OPTIONAL => { + new_context.def_level += 1; + } + _ => (), + } + } + + if let Some(reader) = + self.build_for_struct_type_inner(cur_type.clone(), &new_context)? + { + if cur_type.get_basic_info().has_repetition() + && cur_type.get_basic_info().repetition() == Repetition::REPEATED + { + Err(ArrowError( + "Reading repeated field is not supported yet!".to_string(), + )) + } else { + Ok(Some(reader)) + } + } else { + Ok(None) + } + } + + /// Build array reader for map type. + /// Currently this is not supported. 
+ fn visit_map( + &mut self, + _cur_type: Rc, + _context: &'a ArrayReaderBuilderContext, + ) -> Result>> { + Err(ArrowError( + "Reading parquet map array into arrow is not supported yet!".to_string(), + )) + } + + /// Build array reader for list type. + /// Currently this is not supported. + fn visit_list_with_item( + &mut self, + _list_type: Rc, + _item_type: &Type, + _context: &'a ArrayReaderBuilderContext, + ) -> Result>> { + Err(ArrowError( + "Reading parquet list array into arrow is not supported yet!".to_string(), + )) + } +} + +impl<'a> ArrayReaderBuilder { + /// Construct array reader builder. + fn new( + root_schema: TypePtr, + columns_included: Rc>, + file_reader: Rc, + ) -> Self { + Self { + root_schema, + columns_included, + file_reader, + } + } + + /// Main entry point. + fn build_array_reader(&mut self) -> Result> { + let context = ArrayReaderBuilderContext::default(); + + self.visit_struct(self.root_schema.clone(), &context) + .and_then(|reader_opt| { + reader_opt.ok_or_else(|| general_err!("Failed to build array reader!")) + }) + } + + // Utility functions + + /// Check whether one column in included in this array reader builder. + fn is_included(&self, t: &Type) -> bool { + self.columns_included.contains_key(&(t as *const Type)) + } + + /// Creates primitive array reader for each primitive type. 
+ fn build_for_primitive_type_inner( + &self, + cur_type: TypePtr, + context: &'a ArrayReaderBuilderContext, + ) -> Result> { + let column_desc = Rc::new(ColumnDescriptor::new( + cur_type.clone(), + Some(self.root_schema.clone()), + context.def_level, + context.rep_level, + context.path.clone(), + )); + let page_iterator = Box::new(FilePageIterator::new( + self.columns_included[&(cur_type.as_ref() as *const Type)], + self.file_reader.clone(), + )?); + + match cur_type.get_physical_type() { + PhysicalType::BOOLEAN => Ok(Box::new(PrimitiveArrayReader::::new( + page_iterator, + column_desc, + )?)), + PhysicalType::INT32 => Ok(Box::new(PrimitiveArrayReader::::new( + page_iterator, + column_desc, + )?)), + PhysicalType::INT64 => Ok(Box::new(PrimitiveArrayReader::::new( + page_iterator, + column_desc, + )?)), + PhysicalType::INT96 => Ok(Box::new(PrimitiveArrayReader::::new( + page_iterator, + column_desc, + )?)), + PhysicalType::FLOAT => Ok(Box::new(PrimitiveArrayReader::::new( + page_iterator, + column_desc, + )?)), + PhysicalType::DOUBLE => Ok(Box::new( + PrimitiveArrayReader::::new(page_iterator, column_desc)?, + )), + PhysicalType::BYTE_ARRAY => Ok(Box::new(PrimitiveArrayReader::< + ByteArrayType, + >::new( + page_iterator, column_desc + )?)), + other => Err(ArrowError(format!( + "Unable to create primite array reader for parquet physical type {}", + other + ))), + } + } + + /// Constructs struct array reader without considering repetition. + fn build_for_struct_type_inner( + &mut self, + cur_type: TypePtr, + context: &'a ArrayReaderBuilderContext, + ) -> Result>> { + let mut fields = Vec::with_capacity(cur_type.get_fields().len()); + let mut children_reader = Vec::with_capacity(cur_type.get_fields().len()); + + for child in cur_type.get_fields() { + if let Some(child_reader) = self.dispatch(child.clone(), context)? 
{ + fields.push(Field::new( + child.name(), + child_reader.get_data_type().clone(), + child.is_optional(), + )); + children_reader.push(child_reader); + } + } + + if !fields.is_empty() { + let arrow_type = ArrowType::Struct(fields); + Ok(Some(Box::new(StructArrayReader::new( + arrow_type, + children_reader, + context.def_level, + context.rep_level, + )))) + } else { + Ok(None) + } + } +} + +#[cfg(test)] +mod tests { + use crate::arrow::array_reader::{ + build_array_reader, ArrayReader, PrimitiveArrayReader, StructArrayReader, + }; + use crate::basic::Encoding; + use crate::column::page::Page; + use crate::data_type::{DataType, Int32Type}; + use crate::errors::Result; + use crate::file::reader::{FileReader, SerializedFileReader}; + use crate::schema::parser::parse_message_type; + use crate::schema::types::{ColumnDescPtr, SchemaDescriptor}; + use crate::util::test_common::page_util::InMemoryPageIterator; + use crate::util::test_common::{get_test_file, make_pages}; + use arrow::array::{Array, ArrayRef, PrimitiveArray, StructArray}; + use arrow::datatypes::{DataType as ArrowType, Field, Int32Type as ArrowInt32}; + use rand::distributions::range::SampleRange; + use std::collections::VecDeque; + use std::rc::Rc; + use std::sync::Arc; + + fn make_column_chuncks( + column_desc: ColumnDescPtr, + encoding: Encoding, + num_levels: usize, + min_value: T::T, + max_value: T::T, + def_levels: &mut Vec, + rep_levels: &mut Vec, + values: &mut Vec, + page_lists: &mut Vec>, + use_v2: bool, + num_chuncks: usize, + ) where + T::T: PartialOrd + SampleRange + Copy, + { + for _i in 0..num_chuncks { + let mut pages = VecDeque::new(); + let mut data = Vec::new(); + let mut page_def_levels = Vec::new(); + let mut page_rep_levels = Vec::new(); + + make_pages::( + column_desc.clone(), + encoding, + 1, + num_levels, + min_value, + max_value, + &mut page_def_levels, + &mut page_rep_levels, + &mut data, + &mut pages, + use_v2, + ); + + def_levels.append(&mut page_def_levels); + 
rep_levels.append(&mut page_rep_levels); + values.append(&mut data); + page_lists.push(Vec::from(pages)); + } + } + + #[test] + fn test_primitive_array_reader_data() { + // Construct column schema + let message_type = " + message test_schema { + REQUIRED INT32 leaf; + } + "; + + let schema = parse_message_type(message_type) + .map(|t| Rc::new(SchemaDescriptor::new(Rc::new(t)))) + .unwrap(); + + let column_desc = schema.column(0); + + // Construct page iterator + { + let mut data = Vec::new(); + let mut page_lists = Vec::new(); + make_column_chuncks::( + column_desc.clone(), + Encoding::PLAIN, + 100, + 1, + 200, + &mut Vec::new(), + &mut Vec::new(), + &mut data, + &mut page_lists, + true, + 2, + ); + let page_iterator = InMemoryPageIterator::new( + schema.clone(), + column_desc.clone(), + page_lists, + ); + + let mut array_reader = PrimitiveArrayReader::::new( + Box::new(page_iterator), + column_desc.clone(), + ) + .unwrap(); + + // Read first 50 values, which are all from the first column chunck + let array = array_reader.next_batch(50).unwrap(); + let array = array + .as_any() + .downcast_ref::>() + .unwrap(); + + assert_eq!( + &PrimitiveArray::::from( + data[0..50].iter().cloned().collect::>() + ), + array + ); + + // Read next 100 values, the first 50 ones are from the first column chunk, + // and the last 50 ones are from the second column chunk + let array = array_reader.next_batch(100).unwrap(); + let array = array + .as_any() + .downcast_ref::>() + .unwrap(); + + assert_eq!( + &PrimitiveArray::::from( + data[50..150].iter().cloned().collect::>() + ), + array + ); + + // Try to read 100 values, however there are only 50 values + let array = array_reader.next_batch(100).unwrap(); + let array = array + .as_any() + .downcast_ref::>() + .unwrap(); + + assert_eq!( + &PrimitiveArray::::from( + data[150..200].iter().cloned().collect::>() + ), + array + ); + } + } + + #[test] + fn test_primitive_array_reader_def_and_rep_levels() { + // Construct column schema + let 
message_type = " + message test_schema { + REPEATED Group test_mid { + OPTIONAL INT32 leaf; + } + } + "; + + let schema = parse_message_type(message_type) + .map(|t| Rc::new(SchemaDescriptor::new(Rc::new(t)))) + .unwrap(); + + let column_desc = schema.column(0); + + // Construct page iterator + { + let mut def_levels = Vec::new(); + let mut rep_levels = Vec::new(); + let mut page_lists = Vec::new(); + make_column_chuncks::( + column_desc.clone(), + Encoding::PLAIN, + 100, + 1, + 200, + &mut def_levels, + &mut rep_levels, + &mut Vec::new(), + &mut page_lists, + true, + 2, + ); + + let page_iterator = InMemoryPageIterator::new( + schema.clone(), + column_desc.clone(), + page_lists, + ); + + let mut array_reader = PrimitiveArrayReader::::new( + Box::new(page_iterator), + column_desc.clone(), + ) + .unwrap(); + + let mut accu_len: usize = 0; + + // Read first 50 values, which are all from the first column chunck + let array = array_reader.next_batch(50).unwrap(); + assert_eq!( + Some(&def_levels[accu_len..(accu_len + array.len())]), + array_reader.get_def_levels() + ); + assert_eq!( + Some(&rep_levels[accu_len..(accu_len + array.len())]), + array_reader.get_rep_levels() + ); + accu_len += array.len(); + + // Read next 100 values, the first 50 ones are from the first column chunk, + // and the last 50 ones are from the second column chunk + let array = array_reader.next_batch(100).unwrap(); + assert_eq!( + Some(&def_levels[accu_len..(accu_len + array.len())]), + array_reader.get_def_levels() + ); + assert_eq!( + Some(&rep_levels[accu_len..(accu_len + array.len())]), + array_reader.get_rep_levels() + ); + accu_len += array.len(); + + // Try to read 100 values, however there are only 50 values + let array = array_reader.next_batch(100).unwrap(); + assert_eq!( + Some(&def_levels[accu_len..(accu_len + array.len())]), + array_reader.get_def_levels() + ); + assert_eq!( + Some(&rep_levels[accu_len..(accu_len + array.len())]), + array_reader.get_rep_levels() + ); + } + } + + 
/// Array reader for test. + struct InMemoryArrayReader { + data_type: ArrowType, + array: ArrayRef, + def_levels: Option>, + rep_levels: Option>, + } + + impl InMemoryArrayReader { + pub fn new( + data_type: ArrowType, + array: ArrayRef, + def_levels: Option>, + rep_levels: Option>, + ) -> Self { + Self { + data_type, + array, + def_levels, + rep_levels, + } + } + } + + impl ArrayReader for InMemoryArrayReader { + fn get_data_type(&self) -> &ArrowType { + &self.data_type + } + + fn next_batch(&mut self, _batch_size: usize) -> Result { + Ok(self.array.clone()) + } + + fn get_def_levels(&self) -> Option<&[i16]> { + self.def_levels.as_ref().map(|v| v.as_slice()) + } + + fn get_rep_levels(&self) -> Option<&[i16]> { + self.rep_levels.as_ref().map(|v| v.as_slice()) + } + } + + #[test] + fn test_struct_array_reader() { + let array_1 = Arc::new(PrimitiveArray::::from(vec![1, 2, 3, 4, 5])); + let array_reader_1 = InMemoryArrayReader::new( + ArrowType::Int32, + array_1.clone(), + Some(vec![0, 1, 2, 3, 1]), + Some(vec![1, 1, 1, 1, 1]), + ); + + let array_2 = Arc::new(PrimitiveArray::::from(vec![5, 4, 3, 2, 1])); + let array_reader_2 = InMemoryArrayReader::new( + ArrowType::Int32, + array_2.clone(), + Some(vec![0, 1, 3, 1, 2]), + Some(vec![1, 1, 1, 1, 1]), + ); + + let struct_type = ArrowType::Struct(vec![ + Field::new("f1", array_1.data_type().clone(), true), + Field::new("f2", array_2.data_type().clone(), true), + ]); + + let mut struct_array_reader = StructArrayReader::new( + struct_type, + vec![Box::new(array_reader_1), Box::new(array_reader_2)], + 1, + 1, + ); + + let struct_array = struct_array_reader.next_batch(5).unwrap(); + let struct_array = struct_array.as_any().downcast_ref::().unwrap(); + + assert_eq!(5, struct_array.len()); + assert_eq!( + vec![true, false, false, false, false], + (0..5) + .map(|idx| struct_array.data_ref().is_null(idx)) + .collect::>() + ); + assert_eq!( + Some(vec![0, 1, 1, 1, 1].as_slice()), + struct_array_reader.get_def_levels() + ); + 
assert_eq!( + Some(vec![1, 1, 1, 1, 1].as_slice()), + struct_array_reader.get_rep_levels() + ); + } + + #[test] + fn test_create_array_reader() { + let file = get_test_file("nulls.snappy.parquet"); + let file_reader = Rc::new(SerializedFileReader::new(file).unwrap()); + + let array_reader = build_array_reader( + file_reader.metadata().file_metadata().schema_descr_ptr(), + vec![0usize].into_iter(), + file_reader, + ) + .unwrap(); + + // Create arrow types + let arrow_type = ArrowType::Struct(vec![Field::new( + "b_struct", + ArrowType::Struct(vec![Field::new("b_c_int", ArrowType::Int32, true)]), + true, + )]); + + assert_eq!(array_reader.get_data_type(), &arrow_type); + } +} diff --git a/rust/parquet/src/arrow/converter.rs b/rust/parquet/src/arrow/converter.rs index 263e78a7fe6..6056271a759 100644 --- a/rust/parquet/src/arrow/converter.rs +++ b/rust/parquet/src/arrow/converter.rs @@ -67,9 +67,9 @@ where let mut array_data = ArrayDataBuilder::new(ArrowSourceType::get_data_type()) .len(record_reader.num_values()) - .add_buffer(record_data); + .add_buffer(record_data?); - if let Some(b) = record_reader.consume_bitmap_buffer() { + if let Some(b) = record_reader.consume_bitmap_buffer()? { array_data = array_data.null_bit_buffer(b); } diff --git a/rust/parquet/src/arrow/mod.rs b/rust/parquet/src/arrow/mod.rs index af1d00c91b2..a2c6031cfb8 100644 --- a/rust/parquet/src/arrow/mod.rs +++ b/rust/parquet/src/arrow/mod.rs @@ -20,6 +20,7 @@ //! //! This mod provides API for converting between arrow and parquet. +pub(in crate::arrow) mod array_reader; pub(in crate::arrow) mod converter; pub(in crate::arrow) mod record_reader; pub mod schema; diff --git a/rust/parquet/src/arrow/record_reader.rs b/rust/parquet/src/arrow/record_reader.rs index 803f4a0d2f5..de42ae7f953 100644 --- a/rust/parquet/src/arrow/record_reader.rs +++ b/rust/parquet/src/arrow/record_reader.rs @@ -16,9 +16,9 @@ // under the License. 
use std::cmp::{max, min}; -use std::mem::replace; use std::mem::size_of; use std::mem::transmute; +use std::mem::{replace, swap}; use std::slice; use crate::column::{page::PageReader, reader::ColumnReaderImpl}; @@ -187,46 +187,138 @@ impl RecordReader { } /// Returns definition level data. - pub fn consume_def_levels(&mut self) -> Option { - let empty_def_buffer = if self.column_desc.max_def_level() > 0 { - Some(MutableBuffer::new(MIN_BATCH_SIZE)) + /// The implementation has side effects. It will create a new buffer to hold those + /// definition level values that have already been read into memory but not counted + /// as record values, e.g. those from `self.num_values` to `self.values_written`. + pub fn consume_def_levels(&mut self) -> Result> { + let new_buffer = if let Some(ref mut def_levels_buf) = &mut self.def_levels { + let num_left_values = self.values_written - self.num_values; + let mut new_buffer = MutableBuffer::new( + size_of::() * max(MIN_BATCH_SIZE, num_left_values), + ); + new_buffer.resize(num_left_values * size_of::())?; + + let new_def_levels = FatPtr::::with_offset(&new_buffer, 0); + let new_def_levels = new_def_levels.to_slice_mut(); + let left_def_levels = + FatPtr::::with_offset(&def_levels_buf, self.num_values); + let left_def_levels = left_def_levels.to_slice(); + + new_def_levels[0..num_left_values] + .copy_from_slice(&left_def_levels[0..num_left_values]); + + def_levels_buf.resize(self.num_values * size_of::())?; + Some(new_buffer) } else { None }; - replace(&mut self.def_levels, empty_def_buffer).map(|x| x.freeze()) + Ok(replace(&mut self.def_levels, new_buffer).map(|x| x.freeze())) } - /// Return repetition level data - pub fn consume_rep_levels(&mut self) -> Option { - let empty_def_buffer = if self.column_desc.max_rep_level() > 0 { - Some(MutableBuffer::new(MIN_BATCH_SIZE)) + /// Return repetition level data. + /// The side effect is similar to `consume_def_levels`. 
+ pub fn consume_rep_levels(&mut self) -> Result> { + // TODO: Optimize to reduce the copy + let new_buffer = if let Some(ref mut rep_levels_buf) = &mut self.rep_levels { + let num_left_values = self.values_written - self.num_values; + let mut new_buffer = MutableBuffer::new( + size_of::() * max(MIN_BATCH_SIZE, num_left_values), + ); + new_buffer.resize(num_left_values * size_of::())?; + + let new_rep_levels = FatPtr::::with_offset(&new_buffer, 0); + let new_rep_levels = new_rep_levels.to_slice_mut(); + let left_rep_levels = + FatPtr::::with_offset(&rep_levels_buf, self.num_values); + let left_rep_levels = left_rep_levels.to_slice(); + + new_rep_levels[0..num_left_values] + .copy_from_slice(&left_rep_levels[0..num_left_values]); + + rep_levels_buf.resize(self.num_values * size_of::())?; + + Some(new_buffer) } else { None }; - replace(&mut self.rep_levels, empty_def_buffer).map(|x| x.freeze()) + Ok(replace(&mut self.rep_levels, new_buffer).map(|x| x.freeze())) } /// Returns currently stored buffer data. - pub fn consume_record_data(&mut self) -> Buffer { - replace(&mut self.records, MutableBuffer::new(MIN_BATCH_SIZE)).freeze() + /// The side effect is similar to `consume_def_levels`. 
+ pub fn consume_record_data(&mut self) -> Result { + // TODO: Optimize to reduce the copy + let num_left_values = self.values_written - self.num_values; + let mut new_buffer = MutableBuffer::new(max(MIN_BATCH_SIZE, num_left_values)); + new_buffer.resize(num_left_values * T::get_type_size())?; + + let new_records = + FatPtr::::with_offset_and_size(&new_buffer, 0, T::get_type_size()); + let new_records = new_records.to_slice_mut(); + let left_records = FatPtr::::with_offset_and_size( + &self.records, + self.num_values, + T::get_type_size(), + ); + let left_records = left_records.to_slice_mut(); + + for idx in 0..num_left_values { + swap(&mut new_records[idx], &mut left_records[idx]); + } + + self.records.resize(self.num_values * T::get_type_size())?; + + Ok(replace(&mut self.records, new_buffer).freeze()) } - pub fn consume_bitmap_buffer(&mut self) -> Option { - let bitmap_builder = if self.column_desc.max_def_level() > 0 { - Some(BooleanBufferBuilder::new(MIN_BATCH_SIZE)) + /// Returns currently stored null bitmap data. + /// The side effect is similar to `consume_def_levels`. + pub fn consume_bitmap_buffer(&mut self) -> Result> { + // TODO: Optimize to reduce the copy + if self.column_desc.max_def_level() > 0 { + assert!(self.null_bitmap.is_some()); + let num_left_values = self.values_written - self.num_values; + let new_bitmap_builder = Some(BooleanBufferBuilder::new(max( + MIN_BATCH_SIZE, + num_left_values, + ))); + + let old_bitmap = replace(&mut self.null_bitmap, new_bitmap_builder) + .map(|mut builder| builder.finish()) + .unwrap(); + + let old_bitmap = Bitmap::from(old_bitmap); + + for i in self.num_values..self.values_written { + self.null_bitmap + .as_mut() + .unwrap() + .append(old_bitmap.is_set(i))?; + } + + Ok(Some(old_bitmap.to_buffer())) } else { - None - }; + Ok(None) + } + } - replace(&mut self.null_bitmap, bitmap_builder).map(|mut builder| builder.finish()) + /// Reset state of record reader. + /// Should be called after consuming data, e.g. 
`consume_def_levels`, + /// `consume_rep_levels`, `consume_record_data` and `consume_bitmap_buffer`. + pub fn reset(&mut self) { + self.values_written = self.values_written - self.num_values; + self.num_records = 0; + self.num_values = 0; + self.values_seen = 0; + self.in_middle_of_record = false; + } /// Returns bitmap data. - pub fn consume_bitmap(&mut self) -> Option<Bitmap> { + pub fn consume_bitmap(&mut self) -> Result<Option<Bitmap>> { self.consume_bitmap_buffer() - .map(|buffer| Bitmap::from(buffer)) + .map(|buffer| buffer.map(|b| Bitmap::from(b))) } /// Try to read one batch of data. @@ -488,9 +580,12 @@ mod tests { let mut bb = Int32BufferBuilder::new(7); bb.append_slice(&[4, 7, 6, 3, 2, 8, 9]).unwrap(); let expected_buffer = bb.finish(); - assert_eq!(expected_buffer, record_reader.consume_record_data()); - assert_eq!(None, record_reader.consume_def_levels()); - assert_eq!(None, record_reader.consume_bitmap()); + assert_eq!( + expected_buffer, + record_reader.consume_record_data().unwrap() + ); + assert_eq!(None, record_reader.consume_def_levels().unwrap()); + assert_eq!(None, record_reader.consume_bitmap().unwrap()); } #[test] @@ -573,7 +668,10 @@ mod tests { let mut bb = Int32BufferBuilder::new(7); bb.append_slice(&[0, 7, 0, 6, 3, 0, 8]).unwrap(); let expected_buffer = bb.finish(); - assert_eq!(expected_buffer, record_reader.consume_record_data()); + assert_eq!( + expected_buffer, + record_reader.consume_record_data().unwrap() + ); // Verify result def levels let mut bb = Int16BufferBuilder::new(7); @@ -582,7 +680,7 @@ mod tests { let expected_def_levels = bb.finish(); assert_eq!( Some(expected_def_levels), - record_reader.consume_def_levels() + record_reader.consume_def_levels().unwrap() ); // Verify bitmap @@ -590,7 +688,10 @@ mod tests { bb.append_slice(&[false, true, false, true, true, false, true]) .unwrap(); let expected_bitmap = Bitmap::from(bb.finish()); - assert_eq!(Some(expected_bitmap), record_reader.consume_bitmap()); + assert_eq!( + Some(expected_bitmap), +
record_reader.consume_bitmap().unwrap() + ); } #[test] @@ -677,7 +778,10 @@ mod tests { let mut bb = Int32BufferBuilder::new(9); bb.append_slice(&[4, 0, 0, 7, 6, 3, 2, 8, 9]).unwrap(); let expected_buffer = bb.finish(); - assert_eq!(expected_buffer, record_reader.consume_record_data()); + assert_eq!( + expected_buffer, + record_reader.consume_record_data().unwrap() + ); // Verify result def levels let mut bb = Int16BufferBuilder::new(9); @@ -686,7 +790,7 @@ mod tests { let expected_def_levels = bb.finish(); assert_eq!( Some(expected_def_levels), - record_reader.consume_def_levels() + record_reader.consume_def_levels().unwrap() ); // Verify bitmap @@ -694,7 +798,10 @@ mod tests { bb.append_slice(&[true, false, false, true, true, true, true, true, true]) .unwrap(); let expected_bitmap = Bitmap::from(bb.finish()); - assert_eq!(Some(expected_bitmap), record_reader.consume_bitmap()); + assert_eq!( + Some(expected_bitmap), + record_reader.consume_bitmap().unwrap() + ); } #[test] diff --git a/rust/parquet/src/column/reader.rs b/rust/parquet/src/column/reader.rs index 8f7c7a3061a..cc3c26f72cd 100644 --- a/rust/parquet/src/column/reader.rs +++ b/rust/parquet/src/column/reader.rs @@ -517,13 +517,8 @@ mod tests { use crate::basic::Type as PhysicalType; use crate::column::page::Page; - use crate::encodings::encoding::{DictEncoder, Encoder}; use crate::schema::types::{ColumnDescriptor, ColumnPath, Type as SchemaType}; - use crate::util::{ - memory::MemTracker, - test_common::page_util::{DataPageBuilder, DataPageBuilderImpl}, - test_common::random_numbers_range, - }; + use crate::util::test_common::make_pages; const NUM_LEVELS: usize = 128; const NUM_PAGES: usize = 2; @@ -1383,94 +1378,4 @@ mod tests { Ok(self.pages.next()) } } - - fn make_pages( - desc: ColumnDescPtr, - encoding: Encoding, - num_pages: usize, - levels_per_page: usize, - min: T::T, - max: T::T, - def_levels: &mut Vec, - rep_levels: &mut Vec, - values: &mut Vec, - pages: &mut VecDeque, - use_v2: bool, - ) where - 
T::T: PartialOrd + SampleRange + Copy, - { - let mut num_values = 0; - let max_def_level = desc.max_def_level(); - let max_rep_level = desc.max_rep_level(); - - let mem_tracker = Rc::new(MemTracker::new()); - let mut dict_encoder = DictEncoder::::new(desc.clone(), mem_tracker); - - for i in 0..num_pages { - let mut num_values_cur_page = 0; - let level_range = i * levels_per_page..(i + 1) * levels_per_page; - - if max_def_level > 0 { - random_numbers_range(levels_per_page, 0, max_def_level + 1, def_levels); - for dl in &def_levels[level_range.clone()] { - if *dl == max_def_level { - num_values_cur_page += 1; - } - } - } else { - num_values_cur_page = levels_per_page; - } - if max_rep_level > 0 { - random_numbers_range(levels_per_page, 0, max_rep_level + 1, rep_levels); - } - random_numbers_range(num_values_cur_page, min, max, values); - - // Generate the current page - - let mut pb = DataPageBuilderImpl::new( - desc.clone(), - num_values_cur_page as u32, - use_v2, - ); - if max_rep_level > 0 { - pb.add_rep_levels(max_rep_level, &rep_levels[level_range.clone()]); - } - if max_def_level > 0 { - pb.add_def_levels(max_def_level, &def_levels[level_range]); - } - - let value_range = num_values..num_values + num_values_cur_page; - match encoding { - Encoding::PLAIN_DICTIONARY | Encoding::RLE_DICTIONARY => { - let _ = dict_encoder.put(&values[value_range.clone()]); - let indices = dict_encoder - .write_indices() - .expect("write_indices() should be OK"); - pb.add_indices(indices); - } - Encoding::PLAIN => { - pb.add_values::(encoding, &values[value_range]); - } - enc @ _ => panic!("Unexpected encoding {}", enc), - } - - let data_page = pb.consume(); - pages.push_back(data_page); - num_values += num_values_cur_page; - } - - if encoding == Encoding::PLAIN_DICTIONARY || encoding == Encoding::RLE_DICTIONARY - { - let dict = dict_encoder - .write_dict() - .expect("write_dict() should be OK"); - let dict_page = Page::DictionaryPage { - buf: dict, - num_values: 
dict_encoder.num_entries() as u32, - encoding: Encoding::RLE_DICTIONARY, - is_sorted: false, - }; - pages.push_front(dict_page); - } - } } diff --git a/rust/parquet/src/compression.rs b/rust/parquet/src/compression.rs index bdc9729b155..d29024ed5c8 100644 --- a/rust/parquet/src/compression.rs +++ b/rust/parquet/src/compression.rs @@ -338,5 +338,4 @@ mod tests { fn test_codec_zstd() { test_codec(CodecType::ZSTD); } - } diff --git a/rust/parquet/src/schema/types.rs b/rust/parquet/src/schema/types.rs index 08dd2e14e6d..4d4d54969d7 100644 --- a/rust/parquet/src/schema/types.rs +++ b/rust/parquet/src/schema/types.rs @@ -164,6 +164,13 @@ impl Type { _ => false, } } + + /// Returns `true` if this type is repeated or optional. + /// If this type doesn't have repetition defined, we still treat it as optional. + pub fn is_optional(&self) -> bool { + self.get_basic_info().has_repetition() + && self.get_basic_info().repetition() != Repetition::REQUIRED + } } /// A builder for primitive types. All attributes are optional @@ -527,6 +534,21 @@ impl ColumnPath { pub fn string(&self) -> String { self.parts.join(".") } + + /// Appends more components to end of column path. 
+ /// ```rust + /// use parquet::schema::types::ColumnPath; + /// + /// let mut path = ColumnPath::new(vec!["a".to_string(), "b".to_string(), "c" + /// .to_string()]); + /// assert_eq!(&path.string(), "a.b.c"); + /// + /// path.append(vec!["d".to_string(), "e".to_string()]); + /// assert_eq!(&path.string(), "a.b.c.d.e"); + /// ``` + pub fn append(&mut self, mut tail: Vec<String>) -> () { + self.parts.append(&mut tail); + } } impl fmt::Display for ColumnPath { diff --git a/rust/parquet/src/util/test_common/mod.rs index c24afdf40ab..79a970e0a82 100644 --- a/rust/parquet/src/util/test_common/mod.rs +++ b/rust/parquet/src/util/test_common/mod.rs @@ -28,3 +28,5 @@ pub use self::rand_gen::RandGen; pub use self::file_util::get_temp_file; pub use self::file_util::get_test_file; pub use self::file_util::get_test_path; + +pub use self::page_util::make_pages; diff --git a/rust/parquet/src/util/test_common/page_util.rs index d12b734f2d5..f8316d6f2c4 100644 --- a/rust/parquet/src/util/test_common/page_util.rs +++ b/rust/parquet/src/util/test_common/page_util.rs @@ -16,19 +16,23 @@ // under the License.
use crate::basic::Encoding; -use crate::column::page::Page; use crate::column::page::PageReader; +use crate::column::page::{Page, PageIterator}; use crate::data_type::DataType; -use crate::encodings::encoding::{get_encoder, Encoder}; +use crate::encodings::encoding::{get_encoder, DictEncoder, Encoder}; use crate::encodings::levels::max_buffer_size; use crate::encodings::levels::LevelEncoder; use crate::errors::Result; -use crate::schema::types::ColumnDescPtr; +use crate::schema::types::{ColumnDescPtr, SchemaDescPtr}; use crate::util::memory::ByteBufferPtr; use crate::util::memory::MemTracker; use crate::util::memory::MemTrackerPtr; +use crate::util::test_common::random_numbers_range; +use rand::distributions::range::SampleRange; +use std::collections::VecDeque; use std::mem; use std::rc::Rc; +use std::vec::IntoIter; pub trait DataPageBuilder { fn add_rep_levels(&mut self, max_level: i16, rep_levels: &[i16]); @@ -176,3 +180,134 @@ impl PageReader for InMemoryPageReader { Ok(self.pages.next()) } } + +/// A utility page iterator which stores page readers in memory, used for tests. 
+pub struct InMemoryPageIterator { + schema: SchemaDescPtr, + column_desc: ColumnDescPtr, + page_readers: IntoIter>, +} + +impl InMemoryPageIterator { + pub fn new( + schema: SchemaDescPtr, + column_desc: ColumnDescPtr, + pages: Vec>, + ) -> Self { + let page_readers = pages + .into_iter() + .map(|pages| Box::new(InMemoryPageReader::new(pages)) as Box) + .collect::>>() + .into_iter(); + + Self { + schema, + column_desc, + page_readers, + } + } +} + +impl Iterator for InMemoryPageIterator { + type Item = Result>; + + fn next(&mut self) -> Option { + self.page_readers.next().map(|page_reader| Ok(page_reader)) + } +} + +impl PageIterator for InMemoryPageIterator { + fn schema(&mut self) -> Result { + Ok(self.schema.clone()) + } + + fn column_schema(&mut self) -> Result { + Ok(self.column_desc.clone()) + } +} + +pub fn make_pages( + desc: ColumnDescPtr, + encoding: Encoding, + num_pages: usize, + levels_per_page: usize, + min: T::T, + max: T::T, + def_levels: &mut Vec, + rep_levels: &mut Vec, + values: &mut Vec, + pages: &mut VecDeque, + use_v2: bool, +) where + T::T: PartialOrd + SampleRange + Copy, +{ + let mut num_values = 0; + let max_def_level = desc.max_def_level(); + let max_rep_level = desc.max_rep_level(); + + let mem_tracker = Rc::new(MemTracker::new()); + let mut dict_encoder = DictEncoder::::new(desc.clone(), mem_tracker); + + for i in 0..num_pages { + let mut num_values_cur_page = 0; + let level_range = i * levels_per_page..(i + 1) * levels_per_page; + + if max_def_level > 0 { + random_numbers_range(levels_per_page, 0, max_def_level + 1, def_levels); + for dl in &def_levels[level_range.clone()] { + if *dl == max_def_level { + num_values_cur_page += 1; + } + } + } else { + num_values_cur_page = levels_per_page; + } + if max_rep_level > 0 { + random_numbers_range(levels_per_page, 0, max_rep_level + 1, rep_levels); + } + random_numbers_range(num_values_cur_page, min, max, values); + + // Generate the current page + + let mut pb = + 
DataPageBuilderImpl::new(desc.clone(), num_values_cur_page as u32, use_v2); + if max_rep_level > 0 { + pb.add_rep_levels(max_rep_level, &rep_levels[level_range.clone()]); + } + if max_def_level > 0 { + pb.add_def_levels(max_def_level, &def_levels[level_range]); + } + + let value_range = num_values..num_values + num_values_cur_page; + match encoding { + Encoding::PLAIN_DICTIONARY | Encoding::RLE_DICTIONARY => { + let _ = dict_encoder.put(&values[value_range.clone()]); + let indices = dict_encoder + .write_indices() + .expect("write_indices() should be OK"); + pb.add_indices(indices); + } + Encoding::PLAIN => { + pb.add_values::(encoding, &values[value_range]); + } + enc @ _ => panic!("Unexpected encoding {}", enc), + } + + let data_page = pb.consume(); + pages.push_back(data_page); + num_values += num_values_cur_page; + } + + if encoding == Encoding::PLAIN_DICTIONARY || encoding == Encoding::RLE_DICTIONARY { + let dict = dict_encoder + .write_dict() + .expect("write_dict() should be OK"); + let dict_page = Page::DictionaryPage { + buf: dict, + num_values: dict_encoder.num_entries() as u32, + encoding: Encoding::RLE_DICTIONARY, + is_sorted: false, + }; + pages.push_front(dict_page); + } +}