feat: add method for async read bloom filter (#4917)
* feat: add method for async read bloom filter

* fix: handle files with and without bloom_filter_length

* test: add unit tests for read bloom filter

* fix: format code for unit tests
hengfeiyang authored Oct 12, 2023
1 parent 0503d65 commit 6e49f31
Showing 2 changed files with 146 additions and 9 deletions.
147 changes: 142 additions & 5 deletions parquet/src/arrow/async_reader/mod.rs
@@ -77,7 +77,6 @@

use std::collections::VecDeque;
use std::fmt::Formatter;

use std::io::SeekFrom;
use std::ops::Range;
use std::pin::Pin;
@@ -88,7 +87,6 @@ use bytes::{Buf, Bytes};
use futures::future::{BoxFuture, FutureExt};
use futures::ready;
use futures::stream::Stream;

use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};

use arrow_array::RecordBatch;
@@ -102,15 +100,18 @@ use crate::arrow::arrow_reader::{
};
use crate::arrow::ProjectionMask;

use crate::bloom_filter::{
chunk_read_bloom_filter_header_and_offset, Sbbf, SBBF_HEADER_SIZE_ESTIMATE,
};
use crate::column::page::{PageIterator, PageReader};

use crate::errors::{ParquetError, Result};
use crate::file::footer::{decode_footer, decode_metadata};
use crate::file::metadata::{ParquetMetaData, RowGroupMetaData};
use crate::file::reader::{ChunkReader, Length, SerializedPageReader};
use crate::format::PageLocation;

use crate::file::FOOTER_SIZE;
use crate::format::{
BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash, PageLocation,
};

mod metadata;
pub use metadata::*;
@@ -302,6 +303,71 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
Self::new_builder(AsyncReader(input), metadata)
}

/// Read the bloom filter for a column in a row group
/// Returns `None` if the column does not have a bloom filter
///
/// We should call this function after other forms of pruning, such as projection and predicate pushdown.
pub async fn get_row_group_column_bloom_filter(
&mut self,
row_group_idx: usize,
column_idx: usize,
) -> Result<Option<Sbbf>> {
let metadata = self.metadata.row_group(row_group_idx);
let column_metadata = metadata.column(column_idx);

let offset: usize = if let Some(offset) = column_metadata.bloom_filter_offset() {
offset.try_into().map_err(|_| {
ParquetError::General("Bloom filter offset is invalid".to_string())
})?
} else {
return Ok(None);
};

let buffer = match column_metadata.bloom_filter_length() {
Some(length) => self.input.0.get_bytes(offset..offset + length as usize),
None => self
.input
.0
.get_bytes(offset..offset + SBBF_HEADER_SIZE_ESTIMATE),
}
.await?;

let (header, bitset_offset) =
chunk_read_bloom_filter_header_and_offset(offset as u64, buffer.clone())?;

match header.algorithm {
BloomFilterAlgorithm::BLOCK(_) => {
// this match exists to future proof the singleton algorithm enum
}
}
match header.compression {
BloomFilterCompression::UNCOMPRESSED(_) => {
// this match exists to future proof the singleton compression enum
}
}
match header.hash {
BloomFilterHash::XXHASH(_) => {
// this match exists to future proof the singleton hash enum
}
}

let bitset = match column_metadata.bloom_filter_length() {
Some(_) => buffer.slice((bitset_offset as usize - offset)..),
None => {
let bitset_length: usize = header.num_bytes.try_into().map_err(|_| {
ParquetError::General("Bloom filter length is invalid".to_string())
})?;
self.input
.0
.get_bytes(
bitset_offset as usize..bitset_offset as usize + bitset_length,
)
.await?
}
};
Ok(Some(Sbbf::new(&bitset)))
}

/// Build a new [`ParquetRecordBatchStream`]
pub fn build(self) -> Result<ParquetRecordBatchStream<T>> {
let num_row_groups = self.metadata.row_groups().len();
@@ -1540,4 +1606,75 @@ mod tests {
assert_ne!(1024, file_rows);
assert_eq!(stream.batch_size, file_rows);
}

#[tokio::test]
async fn test_get_row_group_column_bloom_filter_without_length() {
let testdata = arrow::util::test_util::parquet_test_data();
let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
let data = Bytes::from(std::fs::read(path).unwrap());
test_get_row_group_column_bloom_filter(data, false).await;
}

#[tokio::test]
async fn test_get_row_group_column_bloom_filter_with_length() {
// convert to a new parquet file with bloom_filter_length
let testdata = arrow::util::test_util::parquet_test_data();
let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
let data = Bytes::from(std::fs::read(path).unwrap());
let metadata = parse_metadata(&data).unwrap();
let metadata = Arc::new(metadata);
let async_reader = TestReader {
data: data.clone(),
metadata: metadata.clone(),
requests: Default::default(),
};
let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
.await
.unwrap();
let schema = builder.schema().clone();
let stream = builder.build().unwrap();
let batches = stream.try_collect::<Vec<_>>().await.unwrap();

let mut parquet_data = Vec::new();
let props = WriterProperties::builder()
.set_bloom_filter_enabled(true)
.build();
let mut writer =
ArrowWriter::try_new(&mut parquet_data, schema, Some(props)).unwrap();
for batch in batches {
writer.write(&batch).unwrap();
}
writer.close().unwrap();

// test the new parquet file
test_get_row_group_column_bloom_filter(parquet_data.into(), true).await;
}

async fn test_get_row_group_column_bloom_filter(data: Bytes, with_length: bool) {
let metadata = parse_metadata(&data).unwrap();
let metadata = Arc::new(metadata);

assert_eq!(metadata.num_row_groups(), 1);
let row_group = metadata.row_group(0);
let column = row_group.column(0);
assert_eq!(column.bloom_filter_length().is_some(), with_length);

let async_reader = TestReader {
data: data.clone(),
metadata: metadata.clone(),
requests: Default::default(),
};

let mut builder = ParquetRecordBatchStreamBuilder::new(async_reader)
.await
.unwrap();

let sbbf = builder
.get_row_group_column_bloom_filter(0, 0)
.await
.unwrap()
.unwrap();
assert!(sbbf.check(&"Hello"));
assert!(!sbbf.check(&"Hello_Not_Exists"));
}
}
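Not part of the commit, but a minimal usage sketch of the new `get_row_group_column_bloom_filter` method from application code: probe each row group's bloom filter for column 0 and skip groups that provably do not contain the value. The path `data.parquet`, the column index, and the probe string `"Hello"` are placeholders; `tokio::fs::File` works as the input because the crate provides a blanket `AsyncFileReader` implementation for async read + seek types (requires the `async` feature).

```rust
use futures::TryStreamExt;
use parquet::arrow::ParquetRecordBatchStreamBuilder;
use tokio::fs::File;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Open the file and build the async reader (placeholder path).
    let file = File::open("data.parquet").await?;
    let mut builder = ParquetRecordBatchStreamBuilder::new(file).await?;

    // Keep only row groups whose bloom filter may contain the probe value.
    let mut keep = Vec::new();
    for i in 0..builder.metadata().num_row_groups() {
        match builder.get_row_group_column_bloom_filter(i, 0).await? {
            // No bloom filter for this column: cannot prune, keep the row group.
            None => keep.push(i),
            // A bloom filter can prove absence, so keep only groups where the check passes.
            Some(sbbf) if sbbf.check(&"Hello") => keep.push(i),
            Some(_) => {}
        }
    }

    // Read batches from the surviving row groups only.
    let stream = builder.with_row_groups(keep).build()?;
    let batches: Vec<_> = stream.try_collect().await?;
    println!("read {} batches", batches.len());
    Ok(())
}
```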
8 changes: 4 additions & 4 deletions parquet/src/bloom_filter/mod.rs
@@ -132,11 +132,11 @@ impl std::ops::IndexMut<usize> for Block {
#[derive(Debug, Clone)]
pub struct Sbbf(Vec<Block>);

const SBBF_HEADER_SIZE_ESTIMATE: usize = 20;
pub(crate) const SBBF_HEADER_SIZE_ESTIMATE: usize = 20;

/// given an initial offset, and a byte buffer, try to read out a bloom filter header and return
/// both the header and the offset after it (for bitset).
fn chunk_read_bloom_filter_header_and_offset(
pub(crate) fn chunk_read_bloom_filter_header_and_offset(
offset: u64,
buffer: Bytes,
) -> Result<(BloomFilterHeader, u64), ParquetError> {
@@ -147,7 +147,7 @@ fn chunk_read_bloom_filter_header_and_offset(
/// given a [Bytes] buffer, try to read out a bloom filter header and return both the header and
/// length of the header.
#[inline]
fn read_bloom_filter_header_and_length(
pub(crate) fn read_bloom_filter_header_and_length(
buffer: Bytes,
) -> Result<(BloomFilterHeader, u64), ParquetError> {
let total_length = buffer.len();
@@ -199,7 +199,7 @@ impl Sbbf {
Self::new(&bitset)
}

fn new(bitset: &[u8]) -> Self {
pub(crate) fn new(bitset: &[u8]) -> Self {
let data = bitset
.chunks_exact(4 * 8)
.map(|chunk| {

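The bloom_filter module changes above only widen visibility to `pub(crate)` so the async reader can reuse the existing header and bitset decoding. As a rough illustration of how those helpers compose — essentially the `bloom_filter_length` branch of the new method — here is a hypothetical in-crate sketch; the function name and the assumption that `buffer` begins at file position `offset` and holds the complete filter are illustrative only.

```rust
use bytes::Bytes;

use crate::bloom_filter::{chunk_read_bloom_filter_header_and_offset, Sbbf};
use crate::errors::Result;

/// Hypothetical helper: decode an Sbbf from a buffer that starts at `offset`
/// in the file and contains the encoded header followed by the full bitset.
pub(crate) fn sbbf_from_buffer(offset: u64, buffer: Bytes) -> Result<Sbbf> {
    // Decode the thrift-encoded header; `bitset_offset` is an absolute file
    // offset pointing just past the header.
    let (header, bitset_offset) =
        chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;

    // Convert the absolute offset back into an index into `buffer` and take
    // exactly `num_bytes` of bitset.
    let start = (bitset_offset - offset) as usize;
    let bitset = buffer.slice(start..start + header.num_bytes as usize);
    Ok(Sbbf::new(&bitset))
}
```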