Get basic tests passing

adriangb committed Jul 12, 2024
1 parent dd3a80e commit d7a4156
Showing 2 changed files with 217 additions and 23 deletions.
35 changes: 35 additions & 0 deletions parquet/src/file/page_index/index.rs
@@ -168,6 +168,41 @@ impl<T: ParquetValueType> NativeIndex<T> {
boundary_order: index.boundary_order,
})
}

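/// Convert this in-memory page index back into the Thrift [`ColumnIndex`]
/// so it can be re-serialized. Note that if statistics are missing for any
/// page, the min/max vectors fall back to empty byte strings and the null
/// counts fall back to zero for every page.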
pub(crate) fn to_column_index(&self) -> ColumnIndex {
let min_values = self
.indexes
.iter()
.map(|x| x.min_bytes().map(|x| x.to_vec()))
.collect::<Option<Vec<_>>>()
.unwrap_or_else(|| vec![vec![]; self.indexes.len()]);

let max_values = self
.indexes
.iter()
.map(|x| x.max_bytes().map(|x| x.to_vec()))
.collect::<Option<Vec<_>>>()
.unwrap_or_else(|| vec![vec![]; self.indexes.len()]);

let null_counts = self
.indexes
.iter()
.map(|x| x.null_count())
.collect::<Option<Vec<_>>>()
.unwrap_or_else(|| vec![0; self.indexes.len()]);

ColumnIndex {
min_values,
max_values,
null_pages: self
.indexes
.iter()
.map(|x| x.min().is_none())
.collect(),
null_counts: Some(null_counts),
boundary_order: self.boundary_order,
}
}
}

#[cfg(test)]
205 changes: 182 additions & 23 deletions parquet/src/file/writer.rs
@@ -19,6 +19,8 @@
//! using row group writers and column writers respectively.

use crate::bloom_filter::Sbbf;
use crate::data_type::private::ParquetValueType;
use crate::file::page_index::index::{Index, NativeIndex};
use crate::format as parquet;
use crate::format::{ColumnIndex, OffsetIndex, RowGroup};
use crate::thrift::TSerializable;
@@ -260,7 +262,7 @@ impl<W: Write + Send> SerializedFileWriter<W> {
None => Some(self.kv_metadatas.clone()),
};

-ParquetMetadataEncoder::write(
+let mut encoder = ParquetMetadataWriter::new(
&mut self.buf,
&self.schema,
&self.descr,
@@ -269,9 +271,10 @@ impl<W: Write + Send> SerializedFileWriter<W> {
&self.column_indexes,
&self.offset_indexes,
&key_value_metadata,
-self.props.created_by(),
+Some(self.props.created_by().to_string()),
self.props.writer_version(),
-)
+);
+encoder.finish()
}

#[inline]
@@ -675,20 +678,20 @@ impl<'a, W: Write + Send> PageWriter for SerializedPageWriter<'a, W> {
}
}

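/// Writes a Parquet file footer (page indexes, bloom filters, and the final
/// Thrift `FileMetaData`) to the underlying `TrackedWrite`, given metadata
/// parts that have already been computed.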
-pub struct ParquetMetadataEncoder<'a, W: Write> {
+struct ParquetMetadataWriter<'a, W: Write> {
buf: &'a mut TrackedWrite<W>,
schema: &'a TypePtr,
schema_descr: &'a SchemaDescPtr,
-row_groups: &'a Vec<RowGroupMetaDataPtr>,
-bloom_filters: &'a Vec<Vec<Option<Sbbf>>>,
-column_indexes: &'a Vec<Vec<Option<ColumnIndex>>>,
-offset_indexes: &'a Vec<Vec<Option<OffsetIndex>>>,
+row_groups: &'a [RowGroupMetaDataPtr],
+bloom_filters: &'a [Vec<Option<Sbbf>>],
+column_indexes: &'a [Vec<Option<ColumnIndex>>],
+offset_indexes: &'a [Vec<Option<OffsetIndex>>],
key_value_metadata: &'a Option<Vec<KeyValue>>,
-created_by: &'a str,
+created_by: Option<String>,
writer_version: WriterVersion,
}

-impl<'a, W: Write> ParquetMetadataEncoder<'a, W> {
+impl<'a, W: Write> ParquetMetadataWriter<'a, W> {
/// Serialize all the offset indexes to the file
fn write_offset_indexes(&mut self, row_groups: &mut [RowGroup]) -> Result<()> {
// iter row group
@@ -768,12 +771,11 @@ impl<'a, W: Write> ParquetMetadataEncoder<'a, W> {
}

/// Assembles and writes metadata at the end of the file.
-fn write_metadata(&mut self) -> Result<parquet::FileMetaData> {
+pub fn finish(&mut self) -> Result<parquet::FileMetaData> {
let num_rows = self.row_groups.iter().map(|x| x.num_rows()).sum();

let mut row_groups = self
.row_groups
-.as_slice()
.iter()
.map(|v| v.to_thrift())
.collect::<Vec<_>>();
@@ -802,7 +804,7 @@ impl<'a, W: Write> ParquetMetadataEncoder<'a, W> {
key_value_metadata: self.key_value_metadata.clone(),
version: self.writer_version.as_num(),
schema: types::to_thrift(self.schema.as_ref())?,
-created_by: Some(self.created_by.to_string()),
+created_by: self.created_by.clone(),
column_orders,
encryption_algorithm: None,
footer_signing_key_metadata: None,
@@ -824,19 +826,19 @@ impl<'a, W: Write> ParquetMetadataEncoder<'a, W> {
Ok(file_metadata)
}

-pub(self) fn write(
+pub(self) fn new(
buf: &'a mut TrackedWrite<W>,
schema: &'a TypePtr,
schema_descr: &'a SchemaDescPtr,
-row_groups: &'a Vec<RowGroupMetaDataPtr>,
-bloom_filters: &'a Vec<Vec<Option<Sbbf>>>,
-column_indexes: &'a Vec<Vec<Option<ColumnIndex>>>,
-offset_indexes: &'a Vec<Vec<Option<OffsetIndex>>>,
+row_groups: &'a [RowGroupMetaDataPtr],
+bloom_filters: &'a [Vec<Option<Sbbf>>],
+column_indexes: &'a [Vec<Option<ColumnIndex>>],
+offset_indexes: &'a [Vec<Option<OffsetIndex>>],
key_value_metadata: &'a Option<Vec<KeyValue>>,
-created_by: &'a str,
+created_by: Option<String>,
writer_version: WriterVersion,
-) -> Result<parquet::FileMetaData> {
-let mut encoder = Self {
+) -> Self {
+Self {
buf,
schema,
schema_descr,
@@ -847,25 +849,140 @@ impl<'a, W: Write> ParquetMetadataEncoder<'a, W> {
key_value_metadata,
created_by,
writer_version,
}
}
}

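/// Re-encodes a decoded [`ParquetMetaData`] into a writer by converting the
/// in-memory page indexes (and, optionally, bloom filters) back into their
/// Thrift representations and delegating to [`ParquetMetadataWriter`].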
struct ParquetMetadataEncoderBuilderOpt<'a, W: Write> {
buf: TrackedWrite<W>,
write_page_index: bool,
bloom_filters: Option<&'a [Vec<Option<Sbbf>>]>,
metadata: &'a ParquetMetaData,
}

impl<'a, W: Write> ParquetMetadataEncoderBuilderOpt<'a, W> {
pub fn new(buf: W, metadata: &'a ParquetMetaData) -> Self {
Self {
buf: TrackedWrite::new(buf),
write_page_index: true,
bloom_filters: None,
metadata,
}
}

fn write_page_index(&mut self, write_page_index: bool) -> &mut Self {
self.write_page_index = write_page_index;
self
}

fn with_bloom_filters(&mut self, bloom_filters: &'a [Vec<Option<Sbbf>>]) -> &mut Self {
self.bloom_filters = Some(bloom_filters);
self
}

fn finish(&mut self) -> Result<()> {
let file_metadata = self.metadata.file_metadata();

let schema = Arc::new(file_metadata.schema().clone());
let schema_descr = Arc::new(SchemaDescriptor::new(schema.clone()));
let created_by = file_metadata.created_by().map(str::to_string);
let writer_version = WriterVersion::PARQUET_2_0; // TODO: how can we get this from ParquetMetaData?

let row_groups = self.metadata.row_groups().iter().map(|rg| Arc::new(rg.clone())).collect::<Vec<_>>();

let key_value_metadata = file_metadata.key_value_metadata().cloned();

let column_indexes = self.convert_column_indexes();
let offset_indexes = self.convert_offset_index();

// If no bloom filters were provided, build a `None` entry per column for
// each row group so the writer sees a uniform shape. The fallback is bound
// outside the match so the borrow outlives the statement.
let default_bloom_filters: Vec<Vec<Option<Sbbf>>>;
let bloom_filters = match self.bloom_filters {
    Some(bloom_filters) => bloom_filters,
    None => {
        default_bloom_filters = row_groups
            .iter()
            .map(|rg| vec![None; rg.columns().len()])
            .collect::<Vec<_>>();
        default_bloom_filters.as_slice()
    }
};
-encoder.write_metadata()

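// Hand everything to the low-level metadata writer, which serializes the
// page indexes and the footer in the order Parquet expects.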
let mut encoder = ParquetMetadataWriter::new(
&mut self.buf,
&schema,
&schema_descr,
&row_groups,
bloom_filters,
&column_indexes,
&offset_indexes,
&key_value_metadata,
created_by,
writer_version,
);
encoder.finish()?;

Ok(())
}

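/// Convert the decoded [`Index`] for each column of each row group back into
/// a Thrift [`ColumnIndex`], or `None` where no column index was read.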
fn convert_column_indexes(&self) -> Vec<Vec<Option<ColumnIndex>>> {
    if let Some(row_group_column_indexes) = self.metadata.column_index() {
        self.metadata
            .row_groups()
            .iter()
            .enumerate()
            .map(|(rg_idx, _)| {
                let column_indexes = &row_group_column_indexes[rg_idx];
                column_indexes
                    .iter()
                    .map(|column_index| match column_index {
                        Index::NONE => None,
                        Index::BOOLEAN(column_index) => Some(column_index.to_column_index()),
                        Index::BYTE_ARRAY(column_index) => Some(column_index.to_column_index()),
                        Index::DOUBLE(column_index) => Some(column_index.to_column_index()),
                        Index::FIXED_LEN_BYTE_ARRAY(column_index) => {
                            Some(column_index.to_column_index())
                        }
                        Index::FLOAT(column_index) => Some(column_index.to_column_index()),
                        Index::INT32(column_index) => Some(column_index.to_column_index()),
                        Index::INT64(column_index) => Some(column_index.to_column_index()),
                        Index::INT96(column_index) => Some(column_index.to_column_index()),
                    })
                    .collect()
            })
            .collect()
    } else {
        // Make a `None` entry for each column of each row group
        self.metadata
            .row_groups()
            .iter()
            .map(|rg| vec![None; rg.columns().len()])
            .collect()
    }
}

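/// Convert the decoded page locations for each column of each row group back
/// into a Thrift [`OffsetIndex`], or `None` where no offset index was read.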
fn convert_offset_index(&self) -> Vec<Vec<Option<OffsetIndex>>> {
    if let Some(row_group_offset_indexes) = self.metadata.offset_index() {
        self.metadata
            .row_groups()
            .iter()
            .enumerate()
            .map(|(rg_idx, _)| {
                let offset_indexes = &row_group_offset_indexes[rg_idx];
                offset_indexes
                    .iter()
                    .map(|page_locations| Some(OffsetIndex::new(page_locations.clone())))
                    .collect()
            })
            .collect()
    } else {
        // Make a `None` entry for each column of each row group
        self.metadata
            .row_groups()
            .iter()
            .map(|rg| vec![None; rg.columns().len()])
            .collect()
    }
}
}

#[cfg(test)]
mod tests {
use super::*;

-use bytes::Bytes;
+use arrow_array::{ArrayRef, Int32Array, RecordBatch};
+use arrow_schema::{DataType as ArrowDataType, Field, Schema};
+use bytes::{Buf, BufMut, Bytes, BytesMut};
use std::fs::File;

use crate::arrow::ArrowWriter;
use crate::basic::{
ColumnOrder, Compression, ConvertedType, Encoding, LogicalType, Repetition, SortOrder, Type,
};
use crate::column::page::{Page, PageReader};
use crate::column::reader::get_typed_column_reader;
use crate::compression::{create_codec, Codec, CodecOptionsBuilder};
use crate::data_type::{BoolType, Int32Type};
use crate::file::footer::decode_metadata;
use crate::file::page_index::index::Index;
use crate::file::properties::EnabledStatistics;
use crate::file::serialized_reader::ReadOptionsBuilder;
@@ -1889,4 +2006,46 @@ mod tests {
let b_idx = &column_index[0][1];
assert!(matches!(b_idx, Index::NONE), "{b_idx:?}");
}

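// Write a single-column Arrow batch to an in-memory Parquet file and
// return its decoded metadata.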
fn get_test_metadata() -> ParquetMetaData {
let mut buf = BytesMut::new().writer();
let schema: Arc<Schema> = Arc::new(
Schema::new(vec![Field::new("a", ArrowDataType::Int32, true)]),
);

let a: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(2)]));

let batch = RecordBatch::try_from_iter(vec![("a", a)]).unwrap();

let mut writer = ArrowWriter::try_new(&mut buf, schema, None).unwrap();
writer.write(&batch).unwrap();
writer.close().unwrap();

let data = buf.into_inner().freeze();

let reader = SerializedFileReader::new(data).unwrap();
reader.metadata().clone()
}

#[test]
fn test_encode_parquet_metadata() {
let metadata = get_test_metadata();

let mut buf = BytesMut::new().writer();
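// Scope the writer so its mutable borrow of `buf` ends before the
// bytes are inspected.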
{
let mut writer = ParquetMetadataEncoderBuilderOpt::new(&mut buf, &metadata);
writer.finish().unwrap();
}

let data = buf.into_inner().freeze();

let _decoded_metadata = decode_metadata(&data).unwrap();

// TODO: check that the decoded metadata matches the original `metadata`.
// This requires a way to compare two `ParquetMetaData` instances.
}
}
