feat: add method for async read bloom filter #4917

Merged · 4 commits · Oct 12, 2023
147 changes: 142 additions & 5 deletions parquet/src/arrow/async_reader/mod.rs
@@ -77,7 +77,6 @@

use std::collections::VecDeque;
use std::fmt::Formatter;

use std::io::SeekFrom;
use std::ops::Range;
use std::pin::Pin;
@@ -88,7 +87,6 @@ use bytes::{Buf, Bytes};
use futures::future::{BoxFuture, FutureExt};
use futures::ready;
use futures::stream::Stream;

use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};

use arrow_array::RecordBatch;
@@ -102,15 +100,18 @@ use crate::arrow::arrow_reader::{
};
use crate::arrow::ProjectionMask;

use crate::bloom_filter::{
    chunk_read_bloom_filter_header_and_offset, Sbbf, SBBF_HEADER_SIZE_ESTIMATE,
};
use crate::column::page::{PageIterator, PageReader};

use crate::errors::{ParquetError, Result};
use crate::file::footer::{decode_footer, decode_metadata};
use crate::file::metadata::{ParquetMetaData, RowGroupMetaData};
use crate::file::reader::{ChunkReader, Length, SerializedPageReader};
use crate::format::PageLocation;

use crate::file::FOOTER_SIZE;
use crate::format::{
    BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash, PageLocation,
};

mod metadata;
pub use metadata::*;
@@ -302,6 +303,71 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
        Self::new_builder(AsyncReader(input), metadata)
    }

    /// Read the bloom filter for a column in a row group
    ///
    /// Returns `None` if the column does not have a bloom filter
    ///
    /// This should be called after other forms of pruning, such as projection
    /// and predicate pushdown, since reading a bloom filter costs additional I/O.
    pub async fn get_row_group_column_bloom_filter(
        &mut self,
        row_group_idx: usize,
        column_idx: usize,
    ) -> Result<Option<Sbbf>> {
        let metadata = self.metadata.row_group(row_group_idx);
        let column_metadata = metadata.column(column_idx);

        let offset: usize = if let Some(offset) = column_metadata.bloom_filter_offset() {
            offset.try_into().map_err(|_| {
                ParquetError::General("Bloom filter offset is invalid".to_string())
            })?
        } else {
            return Ok(None);
        };

        // If the metadata records the exact filter length, fetch the header and
        // bitset in a single read; otherwise fetch a conservative estimate of
        // the header size and read the bitset separately below.
        let buffer = match column_metadata.bloom_filter_length() {
            Some(length) => self.input.0.get_bytes(offset..offset + length as usize),
            None => self
                .input
                .0
                .get_bytes(offset..offset + SBBF_HEADER_SIZE_ESTIMATE),
        }
        .await?;

        let (header, bitset_offset) =
            chunk_read_bloom_filter_header_and_offset(offset as u64, buffer.clone())?;

        match header.algorithm {
            BloomFilterAlgorithm::BLOCK(_) => {
                // this match exists to future proof the singleton algorithm enum
            }
        }
        match header.compression {
            BloomFilterCompression::UNCOMPRESSED(_) => {
                // this match exists to future proof the singleton compression enum
            }
        }
        match header.hash {
            BloomFilterHash::XXHASH(_) => {
                // this match exists to future proof the singleton hash enum
            }
        }

        // When the length was known, the bitset is already in the buffer;
        // otherwise issue a second read of exactly `num_bytes`.
        let bitset = match column_metadata.bloom_filter_length() {
            Some(_) => buffer.slice((bitset_offset as usize - offset)..),
            None => {
                let bitset_length: usize = header.num_bytes.try_into().map_err(|_| {
                    ParquetError::General("Bloom filter length is invalid".to_string())
                })?;
                self.input
                    .0
                    .get_bytes(
                        bitset_offset as usize..bitset_offset as usize + bitset_length,
                    )
                    .await?
            }
        };
        Ok(Some(Sbbf::new(&bitset)))
    }

    /// Build a new [`ParquetRecordBatchStream`]
    pub fn build(self) -> Result<ParquetRecordBatchStream<T>> {
        let num_row_groups = self.metadata.row_groups().len();
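
For context (not part of the diff): a minimal sketch of how a caller might use the new method to prune a row group before scanning it. The file path, the probe value, the fixed indices, and the hypothetical `may_contain` helper are illustrative assumptions; `tokio::fs::File` gets `AsyncFileReader` through the crate's blanket impl for `AsyncRead + AsyncSeek`, and this requires the parquet crate's `async` feature.

use parquet::arrow::async_reader::ParquetRecordBatchStreamBuilder;

// Hypothetical helper: returns true if `value` may appear in column 0 of
// row group 0. Bloom filters can yield false positives, never false negatives.
async fn may_contain(path: &str, value: &str) -> Result<bool, Box<dyn std::error::Error>> {
    let file = tokio::fs::File::open(path).await?;
    let mut builder = ParquetRecordBatchStreamBuilder::new(file).await?;
    match builder.get_row_group_column_bloom_filter(0, 0).await? {
        // A miss proves the value is absent; a hit only says "maybe present"
        Some(sbbf) => Ok(sbbf.check(&value)),
        // No bloom filter was written, so the row group cannot be ruled out
        None => Ok(true),
    }
}

A caller would typically loop this over the row groups that survive projection and predicate pushdown, skipping any group whose filter reports a definite miss.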
@@ -1540,4 +1606,75 @@ mod tests {
        assert_ne!(1024, file_rows);
        assert_eq!(stream.batch_size, file_rows);
    }

    #[tokio::test]
    async fn test_get_row_group_column_bloom_filter_without_length() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
        let data = Bytes::from(std::fs::read(path).unwrap());
        test_get_row_group_column_bloom_filter(data, false).await;
    }

    #[tokio::test]
    async fn test_get_row_group_column_bloom_filter_with_length() {
        // convert to new parquet file with bloom_filter_length
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
        let data = Bytes::from(std::fs::read(path).unwrap());
        let metadata = parse_metadata(&data).unwrap();
        let metadata = Arc::new(metadata);
        let async_reader = TestReader {
            data: data.clone(),
            metadata: metadata.clone(),
            requests: Default::default(),
        };
        let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
            .await
            .unwrap();
        let schema = builder.schema().clone();
        let stream = builder.build().unwrap();
        let batches = stream.try_collect::<Vec<_>>().await.unwrap();

        let mut parquet_data = Vec::new();
        let props = WriterProperties::builder()
            .set_bloom_filter_enabled(true)
            .build();
        let mut writer =
            ArrowWriter::try_new(&mut parquet_data, schema, Some(props)).unwrap();
        for batch in batches {
            writer.write(&batch).unwrap();
        }
        writer.close().unwrap();

        // test the new parquet file
        test_get_row_group_column_bloom_filter(parquet_data.into(), true).await;
    }

    async fn test_get_row_group_column_bloom_filter(data: Bytes, with_length: bool) {
        let metadata = parse_metadata(&data).unwrap();
        let metadata = Arc::new(metadata);

        assert_eq!(metadata.num_row_groups(), 1);
        let row_group = metadata.row_group(0);
        let column = row_group.column(0);
        assert_eq!(column.bloom_filter_length().is_some(), with_length);

        let async_reader = TestReader {
            data: data.clone(),
            metadata: metadata.clone(),
            requests: Default::default(),
        };

        let mut builder = ParquetRecordBatchStreamBuilder::new(async_reader)
            .await
            .unwrap();

        let sbbf = builder
            .get_row_group_column_bloom_filter(0, 0)
            .await
            .unwrap()
            .unwrap();
        assert!(sbbf.check(&"Hello"));
        assert!(!sbbf.check(&"Hello_Not_Exists"));
    }
}
8 changes: 4 additions & 4 deletions parquet/src/bloom_filter/mod.rs
@@ -132,11 +132,11 @@ impl std::ops::IndexMut<usize> for Block {
#[derive(Debug, Clone)]
pub struct Sbbf(Vec<Block>);

const SBBF_HEADER_SIZE_ESTIMATE: usize = 20;
pub(crate) const SBBF_HEADER_SIZE_ESTIMATE: usize = 20;

/// given an initial offset, and a byte buffer, try to read out a bloom filter header and return
/// both the header and the offset after it (for bitset).
fn chunk_read_bloom_filter_header_and_offset(
pub(crate) fn chunk_read_bloom_filter_header_and_offset(
    offset: u64,
    buffer: Bytes,
) -> Result<(BloomFilterHeader, u64), ParquetError> {
@@ -147,7 +147,7 @@ fn chunk_read_bloom_filter_header_and_offset(
/// given a [Bytes] buffer, try to read out a bloom filter header and return both the header and
/// length of the header.
#[inline]
fn read_bloom_filter_header_and_length(
pub(crate) fn read_bloom_filter_header_and_length(
    buffer: Bytes,
) -> Result<(BloomFilterHeader, u64), ParquetError> {
    let total_length = buffer.len();
@@ -199,7 +199,7 @@ impl Sbbf {
        Self::new(&bitset)
    }

    fn new(bitset: &[u8]) -> Self {
    pub(crate) fn new(bitset: &[u8]) -> Self {
        let data = bitset
            .chunks_exact(4 * 8)
            .map(|chunk| {
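
For context (not part of the diff): the body of `new` is truncated above, but the visible `chunks_exact(4 * 8)` shows how the bitset maps onto blocks. Below is a hypothetical standalone sketch of that conversion, assuming each `Block` holds eight little-endian `u32` words (32 bytes), per the split block bloom filter layout; `blocks_from_bitset` is not part of the crate's API.

// Hypothetical free-standing version of the conversion Sbbf::new performs:
// split the bitset into 32-byte chunks and decode each as 8 little-endian u32s.
fn blocks_from_bitset(bitset: &[u8]) -> Vec<[u32; 8]> {
    bitset
        .chunks_exact(4 * 8)
        .map(|chunk| {
            let mut block = [0u32; 8];
            for (word, bytes) in block.iter_mut().zip(chunk.chunks_exact(4)) {
                *word = u32::from_le_bytes(bytes.try_into().unwrap());
            }
            block
        })
        .collect()
}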