Skip to content

Commit

Permalink
Auto-detect page compression (#125)
Browse files Browse the repository at this point in the history
This removes the explicit compression setting in `WriteOptions`.

The setting was somewhat confusing, as callers still had to compress the pages
appropriately themselves, and limiting, as it forced the same codec for the whole file.
  • Loading branch information
kornholi authored Apr 14, 2022
1 parent cc8f95e commit e32fce0
Show file tree
Hide file tree
Showing 11 changed files with 54 additions and 34 deletions.
7 changes: 7 additions & 0 deletions src/page/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,13 @@ impl CompressedPage {
}
}

/// Returns the compression codec reported by the wrapped page,
/// whether it is a data page or a dictionary page.
pub(crate) fn compression(&self) -> Compression {
    match self {
        CompressedPage::Data(data_page) => data_page.compression(),
        CompressedPage::Dict(dict_page) => dict_page.compression(),
    }
}

pub(crate) fn num_values(&self) -> usize {
match self {
CompressedPage::Data(page) => page.num_values(),
Expand Down
9 changes: 8 additions & 1 deletion src/page/page_dict/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,25 @@ impl EncodedDictPage {
/// A dictionary page held as a byte buffer, together with the codec recorded
/// for that buffer and the sizes needed to describe it.
#[derive(Debug)]
pub struct CompressedDictPage {
/// The page's bytes; `compress_dict` stores its compressed output here.
pub(crate) buffer: Vec<u8>,
/// Codec recorded for `buffer`; read back through `compression()`.
compression: Compression,
/// Number of values in the dictionary. The tests in `write/page.rs` expect
/// header assembly to fail when this exceeds `i32::MAX`.
pub(crate) num_values: usize,
/// Size of the page before compression (presumably in bytes — confirm against
/// the header writer). The tests in `write/page.rs` expect header assembly to
/// fail when this exceeds `i32::MAX`.
pub(crate) uncompressed_page_size: usize,
}

impl CompressedDictPage {
pub fn new(buffer: Vec<u8>, uncompressed_page_size: usize, num_values: usize) -> Self {
pub fn new(buffer: Vec<u8>, compression: Compression, uncompressed_page_size: usize, num_values: usize) -> Self {
Self {
buffer,
compression,
uncompressed_page_size,
num_values,
}
}

/// Returns the compression codec stored alongside this page's buffer.
///
/// This is the value that was passed to the constructor; it is returned as-is
/// and is not derived from the buffer contents.
pub fn compression(&self) -> Compression {
self.compression
}
}

pub fn read_dict_page(
Expand Down
21 changes: 16 additions & 5 deletions src/write/column_chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ pub fn write_column_chunk<'a, W, E>(
writer: &mut W,
mut offset: u64,
descriptor: &ColumnDescriptor,
compression: Compression,
mut compressed_pages: DynStreamingIterator<'a, CompressedPage, E>,
) -> Result<(ColumnChunk, Vec<PageWriteSpec>, u64)>
where
Expand All @@ -45,7 +44,7 @@ where
}
let mut bytes_written = offset - initial;

let column_chunk = build_column_chunk(&specs, descriptor, compression)?;
let column_chunk = build_column_chunk(&specs, descriptor)?;

// write metadata
let mut protocol = TCompactOutputProtocol::new(writer);
Expand All @@ -63,7 +62,6 @@ pub async fn write_column_chunk_async<W, E>(
writer: &mut W,
mut offset: u64,
descriptor: &ColumnDescriptor,
compression: Compression,
mut compressed_pages: DynStreamingIterator<'_, CompressedPage, E>,
) -> Result<(ColumnChunk, Vec<PageWriteSpec>, u64)>
where
Expand All @@ -81,7 +79,7 @@ where
}
let mut bytes_written = offset - initial;

let column_chunk = build_column_chunk(&specs, descriptor, compression)?;
let column_chunk = build_column_chunk(&specs, descriptor)?;

// write metadata
let mut protocol = TCompactOutputStreamProtocol::new(writer);
Expand All @@ -99,10 +97,23 @@ where
fn build_column_chunk(
specs: &[PageWriteSpec],
descriptor: &ColumnDescriptor,
compression: Compression,
) -> Result<ColumnChunk> {
// compute stats to build header at the end of the chunk

let compression = specs
.iter()
.map(|spec| spec.compression)
.collect::<HashSet<_>>();
if compression.len() > 1 {
return Err(crate::error::Error::OutOfSpec(
"All pages within a column chunk must be compressed with the same codec".to_string(),
));
}
let compression = compression
.into_iter()
.next()
.unwrap_or(Compression::Uncompressed);

// SPEC: the total compressed size is the total compressed size of each page + the header size
let total_compressed_size = specs
.iter()
Expand Down
1 change: 1 addition & 0 deletions src/write/compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ fn compress_dict(
}
Ok(CompressedDictPage::new(
compressed_buffer,
compression,
uncompressed_page_size,
num_values,
))
Expand Down
1 change: 0 additions & 1 deletion src/write/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ impl<W: Write> FileWriter<W> {
&mut self.writer,
self.offset,
self.schema.columns(),
self.options.compression,
row_group,
ordinal,
)?;
Expand Down
3 changes: 0 additions & 3 deletions src/write/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ pub use file::FileWriter;

pub use row_group::ColumnOffsetsMetadata;

use crate::compression::Compression;
use crate::page::CompressedPage;

pub type RowGroupIter<'a, E> =
Expand All @@ -31,8 +30,6 @@ pub type RowGroupIter<'a, E> =
pub struct WriteOptions {
/// Whether to write statistics, including indexes
pub write_statistics: bool,
/// Whether to use compression
pub compression: Compression,
/// Which Parquet version to use
pub version: Version,
}
Expand Down
26 changes: 16 additions & 10 deletions src/write/page.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use parquet_format_async_temp::thrift::protocol::{
};
use parquet_format_async_temp::{DictionaryPageHeader, Encoding, PageType};

use crate::compression::Compression;
use crate::error::{Error, Result};
use crate::page::{
CompressedDataPage, CompressedDictPage, CompressedPage, DataPageHeader, ParquetPageHeader,
Expand Down Expand Up @@ -44,6 +45,7 @@ pub struct PageWriteSpec {
pub header_size: u64,
pub offset: u64,
pub bytes_written: u64,
pub compression: Compression,
pub statistics: Option<Arc<dyn Statistics>>,
}

Expand Down Expand Up @@ -84,6 +86,7 @@ pub fn write_page<W: Write>(
header_size,
offset,
bytes_written,
compression: compressed_page.compression(),
statistics,
num_rows: selected_rows.map(|x| x.last().unwrap().length),
num_values,
Expand Down Expand Up @@ -127,6 +130,7 @@ pub async fn write_page_async<W: AsyncWrite + Unpin + Send>(
header_size,
offset,
bytes_written,
compression: compressed_page.compression(),
statistics,
num_rows: selected_rows.map(|x| x.last().unwrap().length),
num_values,
Expand Down Expand Up @@ -210,21 +214,23 @@ mod tests {

#[test]
fn dict_too_large() {
let page = CompressedDictPage {
buffer: vec![],
uncompressed_page_size: i32::MAX as usize + 1,
num_values: 100,
};
let page = CompressedDictPage::new(
vec![],
Compression::Uncompressed,
i32::MAX as usize + 1,
100,
);
assert!(assemble_dict_page_header(&page).is_err());
}

#[test]
fn dict_too_many_values() {
let page = CompressedDictPage {
buffer: vec![],
uncompressed_page_size: 0,
num_values: i32::MAX as usize + 1,
};
let page = CompressedDictPage::new(
vec![],
Compression::Uncompressed,
0,
i32::MAX as usize + 1,
);
assert!(assemble_dict_page_header(&page).is_err());
}
}
7 changes: 2 additions & 5 deletions src/write/row_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use futures::AsyncWrite;
use parquet_format_async_temp::{ColumnChunk, RowGroup};

use crate::{
compression::Compression,
error::{Error, Result},
metadata::{ColumnChunkMetaData, ColumnDescriptor},
page::CompressedPage,
Expand Down Expand Up @@ -81,7 +80,6 @@ pub fn write_row_group<
writer: &mut W,
mut offset: u64,
descriptors: &[ColumnDescriptor],
compression: Compression,
columns: DynIter<'a, std::result::Result<DynStreamingIterator<'a, CompressedPage, E>, E>>,
ordinal: usize,
) -> Result<(RowGroup, Vec<Vec<PageWriteSpec>>, u64)>
Expand All @@ -96,7 +94,7 @@ where
let columns = column_iter
.map(|(descriptor, page_iter)| {
let (column, page_specs, size) =
write_column_chunk(writer, offset, descriptor, compression, page_iter?)?;
write_column_chunk(writer, offset, descriptor, page_iter?)?;
offset += size;
Ok((column, page_specs))
})
Expand Down Expand Up @@ -147,7 +145,6 @@ pub async fn write_row_group_async<
writer: &mut W,
mut offset: u64,
descriptors: &[ColumnDescriptor],
compression: Compression,
columns: DynIter<'a, std::result::Result<DynStreamingIterator<'a, CompressedPage, E>, E>>,
) -> Result<(RowGroup, Vec<Vec<PageWriteSpec>>, u64)>
where
Expand All @@ -161,7 +158,7 @@ where
let mut columns = vec![];
for (descriptor, page_iter) in column_iter {
let (column, page_specs, size) =
write_column_chunk_async(writer, offset, descriptor, compression, page_iter?).await?;
write_column_chunk_async(writer, offset, descriptor, page_iter?).await?;
offset += size;
columns.push((column, page_specs));
}
Expand Down
1 change: 0 additions & 1 deletion src/write/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ impl<W: AsyncWrite + Unpin + Send> FileStreamer<W> {
&mut self.writer,
self.offset,
self.schema.columns(),
self.options.compression,
row_group,
)
.await?;
Expand Down
3 changes: 1 addition & 2 deletions tests/it/write/indexes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ fn write_file() -> Result<Vec<u8>> {

let options = WriteOptions {
write_statistics: true,
compression: Compression::Uncompressed,
version: Version::V1,
};

Expand All @@ -44,7 +43,7 @@ fn write_file() -> Result<Vec<u8>> {

let pages = DynStreamingIterator::new(Compressor::new(
DynIter::new(pages.into_iter()),
options.compression,
Compression::Uncompressed,
vec![],
));
let columns = std::iter::once(Ok(pages));
Expand Down
9 changes: 3 additions & 6 deletions tests/it/write/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ fn test_column(column: &str, compression: Compression) -> Result<()> {

let options = WriteOptions {
write_statistics: true,
compression,
version: Version::V1,
};

Expand Down Expand Up @@ -83,7 +82,7 @@ fn test_column(column: &str, compression: Compression) -> Result<()> {
&options,
&a[0].descriptor,
))),
options.compression,
compression,
vec![],
));
let columns = std::iter::once(Ok(pages));
Expand Down Expand Up @@ -187,7 +186,6 @@ fn basic() -> Result<()> {

let options = WriteOptions {
write_statistics: false,
compression: Compression::Uncompressed,
version: Version::V1,
};

Expand All @@ -205,7 +203,7 @@ fn basic() -> Result<()> {
&options,
&schema.columns()[0].descriptor,
))),
options.compression,
Compression::Uncompressed,
vec![],
));
let columns = std::iter::once(Ok(pages));
Expand Down Expand Up @@ -237,7 +235,6 @@ async fn test_column_async(column: &str) -> Result<()> {

let options = WriteOptions {
write_statistics: true,
compression: Compression::Uncompressed,
version: Version::V1,
};

Expand All @@ -264,7 +261,7 @@ async fn test_column_async(column: &str) -> Result<()> {
&options,
&a[0].descriptor,
))),
options.compression,
Compression::Uncompressed,
vec![],
));
let columns = std::iter::once(Ok(pages));
Expand Down

0 comments on commit e32fce0

Please sign in to comment.