Skip to content

Commit

Permalink
add to async ReadParquetDataSource
Browse files Browse the repository at this point in the history
  • Loading branch information
xudong963 committed Nov 30, 2023
1 parent ebcc1ea commit 271e9fb
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,14 @@ impl SyncSource for ReadParquetDataSource<true> {
match self.partitions.steal_one(self.id) {
None => Ok(None),
Some(part) => {
if runtime_filter_pruner(
&part,
&self.runtime_filters,
&self.partitions.ctx.get_function_context()?,
)? {
return Ok(None);
}

if let Some(index_reader) = self.index_reader.as_ref() {
let fuse_part = FusePartInfo::from_part(&part)?;
let loc =
Expand Down Expand Up @@ -148,14 +156,6 @@ impl SyncSource for ReadParquetDataSource<true> {
&None
};

if runtime_filter_pruner(
&part,
&self.runtime_filters,
&self.partitions.ctx.get_function_context()?,
)? {
return Ok(None);
}

let source = self.block_reader.sync_read_columns_data_by_merge_io(
&ReadSettings::from_ctx(&self.partitions.ctx)?,
&part,
Expand Down Expand Up @@ -223,12 +223,18 @@ impl Processor for ReadParquetDataSource<false> {

#[async_backtrace::framed]
async fn async_process(&mut self) -> Result<()> {
dbg!(&self.runtime_filters);
let parts = self.partitions.steal(self.id, self.batch_size);

if !parts.is_empty() {
let mut chunks = Vec::with_capacity(parts.len());
for part in &parts {
if runtime_filter_pruner(
part,
&self.runtime_filters,
&self.partitions.ctx.get_function_context()?,
)? {
continue;
}
let part = part.clone();
let block_reader = self.block_reader.clone();
let settings = ReadSettings::from_ctx(&self.partitions.ctx)?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ pub fn runtime_filter_pruner(
let part = FusePartInfo::from_part(part)?;
Ok(filters.iter().any(|(id, filter)| {
let column_refs = filter.column_refs();
// Currently only support filter with one column(probe key).
assert_debug!(column_refs.len() == 1);
let ty = column_refs.values().last().unwrap();
let name = column_refs.keys().last().unwrap();
if let Some(stats) = &part.columns_stat {
Expand All @@ -52,7 +54,6 @@ pub fn runtime_filter_pruner(
func_ctx,
&BUILTIN_FUNCTIONS,
);
dbg!(&new_expr);
matches!(new_expr, Expr::Constant {
scalar: Scalar::Boolean(false),
..
Expand Down

0 comments on commit 271e9fb

Please sign in to comment.