Skip to content

Commit

Permalink
push down filter to partition listing (apache#10693)
Browse files Browse the repository at this point in the history
  • Loading branch information
houqp authored and findepi committed Jul 16, 2024
1 parent 6e54e13 commit 306c3d9
Showing 1 changed file with 202 additions and 3 deletions.
205 changes: 202 additions & 3 deletions datafusion/core/src/datasource/listing/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@

//! Helper functions for the table implementation

use std::collections::HashMap;
use std::sync::Arc;

use super::PartitionedFile;
use crate::datasource::listing::ListingTableUrl;
use crate::execution::context::SessionState;
use crate::logical_expr::{BinaryExpr, Operator};
use crate::{error::Result, scalar::ScalarValue};

use arrow::{
Expand Down Expand Up @@ -169,9 +171,17 @@ async fn list_partitions(
store: &dyn ObjectStore,
table_path: &ListingTableUrl,
max_depth: usize,
partition_prefix: Option<Path>,
) -> Result<Vec<Partition>> {
let partition = Partition {
path: table_path.prefix().clone(),
path: match partition_prefix {
Some(prefix) => Path::from_iter(
Path::from(table_path.prefix().as_ref())
.parts()
.chain(Path::from(prefix.as_ref()).parts()),
),
None => table_path.prefix().clone(),
},
depth: 0,
files: None,
};
Expand Down Expand Up @@ -305,6 +315,80 @@ async fn prune_partitions(
Ok(filtered)
}

#[derive(Debug)]
enum PartitionValue {
Single(String),
Multi,
}

fn populate_partition_values<'a>(
partition_values: &mut HashMap<&'a str, PartitionValue>,
filter: &'a Expr,
) {
if let Expr::BinaryExpr(BinaryExpr {
ref left,
op,
ref right,
}) = filter
{
match op {
Operator::Eq => match (left.as_ref(), right.as_ref()) {
(Expr::Column(Column { ref name, .. }), Expr::Literal(val))
| (Expr::Literal(val), Expr::Column(Column { ref name, .. })) => {
if partition_values
.insert(name, PartitionValue::Single(val.to_string()))
.is_some()
{
partition_values.insert(name, PartitionValue::Multi);
}
}
_ => {}
},
Operator::And => {
populate_partition_values(partition_values, left);
populate_partition_values(partition_values, right);
}
_ => {}
}
}
}

fn evaluate_partition_prefix<'a>(
partition_cols: &'a [(String, DataType)],
filters: &'a [Expr],
) -> Option<Path> {
let mut partition_values = HashMap::new();
for filter in filters {
populate_partition_values(&mut partition_values, filter);
}

if partition_values.is_empty() {
return None;
}

let mut parts = vec![];
for (p, _) in partition_cols {
match partition_values.get(p.as_str()) {
Some(PartitionValue::Single(val)) => {
// if a partition only has a single literal value, then it can be added to the
// prefix
parts.push(format!("{p}={val}"));
}
_ => {
// break on the first unconstrainted partition to create a common prefix
// for all covered partitions.
break;
}
}
}

if parts.is_empty() {
None
} else {
Some(Path::from_iter(parts))
}
}

/// Discover the partitions on the given path and prune out files
/// that belong to irrelevant partitions using `filters` expressions.
/// `filters` might contain expressions that can be resolved only at the
Expand All @@ -327,7 +411,10 @@ pub async fn pruned_partition_list<'a>(
));
}

let partitions = list_partitions(store, table_path, partition_cols.len()).await?;
let partition_prefix = evaluate_partition_prefix(partition_cols, filters);
let partitions =
list_partitions(store, table_path, partition_cols.len(), partition_prefix)
.await?;
debug!("Listed {} partitions", partitions.len());

let pruned =
Expand Down Expand Up @@ -416,7 +503,9 @@ where
mod tests {
use std::ops::Not;

use crate::logical_expr::{case, col, lit};
use futures::StreamExt;

use crate::logical_expr::{case, col, lit, Expr};
use crate::test::object_store::make_test_store_and_state;

use super::*;
Expand Down Expand Up @@ -675,4 +764,114 @@ mod tests {
// this helper function
assert!(expr_applicable_for_cols(&[], &lit(true)));
}

#[test]
fn test_evaluate_partition_prefix() {
let partitions = &[
("a".to_string(), DataType::Utf8),
("b".to_string(), DataType::Int16),
("c".to_string(), DataType::Boolean),
];

assert_eq!(
evaluate_partition_prefix(partitions, &[col("a").eq(lit("foo"))]),
Some(Path::from("a=foo")),
);

assert_eq!(
evaluate_partition_prefix(partitions, &[lit("foo").eq(col("a"))]),
Some(Path::from("a=foo")),
);

assert_eq!(
evaluate_partition_prefix(
partitions,
&[col("a").eq(lit("foo")).and((col("b").eq(lit("bar"))))],
),
Some(Path::from("a=foo/b=bar")),
);

assert_eq!(
evaluate_partition_prefix(
partitions,
// list of filters should be evaluated as AND
&[col("a").eq(lit("foo")), col("b").eq(lit("bar")),],
),
Some(Path::from("a=foo/b=bar")),
);

assert_eq!(
evaluate_partition_prefix(
partitions,
&[col("a")
.eq(lit("foo"))
.and(col("b").eq(lit("1")))
.and(col("c").eq(lit("true")))],
),
Some(Path::from("a=foo/b=1/c=true")),
);

// no prefix when filter is empty
assert_eq!(evaluate_partition_prefix(partitions, &[]), None);

// b=foo results in no prefix because a is not restricted
assert_eq!(
evaluate_partition_prefix(partitions, &[Expr::eq(col("b"), lit("foo"))]),
None,
);

// a=foo and c=baz only results in preifx a=foo because b is not restricted
assert_eq!(
evaluate_partition_prefix(
partitions,
&[col("a").eq(lit("foo")).and(col("c").eq(lit("baz")))],
),
Some(Path::from("a=foo")),
);

// partition with multiple values results in no prefix
assert_eq!(
evaluate_partition_prefix(
partitions,
&[Expr::and(col("a").eq(lit("foo")), col("a").eq(lit("bar")))],
),
None,
);

// no prefix because partition a is not restricted to a single literal
assert_eq!(
evaluate_partition_prefix(
partitions,
&[Expr::or(col("a").eq(lit("foo")), col("a").eq(lit("bar")))],
),
None,
);
assert_eq!(
evaluate_partition_prefix(partitions, &[col("b").lt(lit(5))],),
None,
);
}

#[test]
fn test_evaluate_date_partition_prefix() {
let partitions = &[("a".to_string(), DataType::Date32)];
assert_eq!(
evaluate_partition_prefix(
partitions,
&[col("a").eq(Expr::Literal(ScalarValue::Date32(Some(3))))],
),
Some(Path::from("a=1970-01-04")),
);

let partitions = &[("a".to_string(), DataType::Date64)];
assert_eq!(
evaluate_partition_prefix(
partitions,
&[col("a").eq(Expr::Literal(ScalarValue::Date64(Some(
4 * 24 * 60 * 60 * 1000
)))),],
),
Some(Path::from("a=1970-01-05")),
);
}
}

0 comments on commit 306c3d9

Please sign in to comment.