From 306c3d90d60c75f2e3e222b8511b8abd58add303 Mon Sep 17 00:00:00 2001 From: QP Hou Date: Thu, 30 May 2024 04:34:18 -0700 Subject: [PATCH] push down filter to partition listing (#10693) --- .../core/src/datasource/listing/helpers.rs | 205 +++++++++++++++++- 1 file changed, 202 insertions(+), 3 deletions(-) diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs index 5b8709009665..b531cf8369cf 100644 --- a/datafusion/core/src/datasource/listing/helpers.rs +++ b/datafusion/core/src/datasource/listing/helpers.rs @@ -17,11 +17,13 @@ //! Helper functions for the table implementation +use std::collections::HashMap; use std::sync::Arc; use super::PartitionedFile; use crate::datasource::listing::ListingTableUrl; use crate::execution::context::SessionState; +use crate::logical_expr::{BinaryExpr, Operator}; use crate::{error::Result, scalar::ScalarValue}; use arrow::{ @@ -169,9 +171,17 @@ async fn list_partitions( store: &dyn ObjectStore, table_path: &ListingTableUrl, max_depth: usize, + partition_prefix: Option, ) -> Result> { let partition = Partition { - path: table_path.prefix().clone(), + path: match partition_prefix { + Some(prefix) => Path::from_iter( + Path::from(table_path.prefix().as_ref()) + .parts() + .chain(Path::from(prefix.as_ref()).parts()), + ), + None => table_path.prefix().clone(), + }, depth: 0, files: None, }; @@ -305,6 +315,80 @@ async fn prune_partitions( Ok(filtered) } +#[derive(Debug)] +enum PartitionValue { + Single(String), + Multi, +} + +fn populate_partition_values<'a>( + partition_values: &mut HashMap<&'a str, PartitionValue>, + filter: &'a Expr, +) { + if let Expr::BinaryExpr(BinaryExpr { + ref left, + op, + ref right, + }) = filter + { + match op { + Operator::Eq => match (left.as_ref(), right.as_ref()) { + (Expr::Column(Column { ref name, .. }), Expr::Literal(val)) + | (Expr::Literal(val), Expr::Column(Column { ref name, .. })) => { + if partition_values + .insert(name, PartitionValue::Single(val.to_string())) + .is_some() + { + partition_values.insert(name, PartitionValue::Multi); + } + } + _ => {} + }, + Operator::And => { + populate_partition_values(partition_values, left); + populate_partition_values(partition_values, right); + } + _ => {} + } + } +} + +fn evaluate_partition_prefix<'a>( + partition_cols: &'a [(String, DataType)], + filters: &'a [Expr], +) -> Option { + let mut partition_values = HashMap::new(); + for filter in filters { + populate_partition_values(&mut partition_values, filter); + } + + if partition_values.is_empty() { + return None; + } + + let mut parts = vec![]; + for (p, _) in partition_cols { + match partition_values.get(p.as_str()) { + Some(PartitionValue::Single(val)) => { + // if a partition only has a single literal value, then it can be added to the + // prefix + parts.push(format!("{p}={val}")); + } + _ => { + // break on the first unconstrainted partition to create a common prefix + // for all covered partitions. + break; + } + } + } + + if parts.is_empty() { + None + } else { + Some(Path::from_iter(parts)) + } +} + /// Discover the partitions on the given path and prune out files /// that belong to irrelevant partitions using `filters` expressions. /// `filters` might contain expressions that can be resolved only at the @@ -327,7 +411,10 @@ pub async fn pruned_partition_list<'a>( )); } - let partitions = list_partitions(store, table_path, partition_cols.len()).await?; + let partition_prefix = evaluate_partition_prefix(partition_cols, filters); + let partitions = + list_partitions(store, table_path, partition_cols.len(), partition_prefix) + .await?; debug!("Listed {} partitions", partitions.len()); let pruned = @@ -416,7 +503,9 @@ where mod tests { use std::ops::Not; - use crate::logical_expr::{case, col, lit}; + use futures::StreamExt; + + use crate::logical_expr::{case, col, lit, Expr}; use crate::test::object_store::make_test_store_and_state; use super::*; @@ -675,4 +764,114 @@ mod tests { // this helper function assert!(expr_applicable_for_cols(&[], &lit(true))); } + + #[test] + fn test_evaluate_partition_prefix() { + let partitions = &[ + ("a".to_string(), DataType::Utf8), + ("b".to_string(), DataType::Int16), + ("c".to_string(), DataType::Boolean), + ]; + + assert_eq!( + evaluate_partition_prefix(partitions, &[col("a").eq(lit("foo"))]), + Some(Path::from("a=foo")), + ); + + assert_eq!( + evaluate_partition_prefix(partitions, &[lit("foo").eq(col("a"))]), + Some(Path::from("a=foo")), + ); + + assert_eq!( + evaluate_partition_prefix( + partitions, + &[col("a").eq(lit("foo")).and((col("b").eq(lit("bar"))))], + ), + Some(Path::from("a=foo/b=bar")), + ); + + assert_eq!( + evaluate_partition_prefix( + partitions, + // list of filters should be evaluated as AND + &[col("a").eq(lit("foo")), col("b").eq(lit("bar")),], + ), + Some(Path::from("a=foo/b=bar")), + ); + + assert_eq!( + evaluate_partition_prefix( + partitions, + &[col("a") + .eq(lit("foo")) + .and(col("b").eq(lit("1"))) + .and(col("c").eq(lit("true")))], + ), + Some(Path::from("a=foo/b=1/c=true")), + ); + + // no prefix when filter is empty + assert_eq!(evaluate_partition_prefix(partitions, &[]), None); + + // b=foo results in no prefix because a is not restricted + assert_eq!( + evaluate_partition_prefix(partitions, &[Expr::eq(col("b"), lit("foo"))]), + None, + ); + + // a=foo and c=baz only results in preifx a=foo because b is not restricted + assert_eq!( + evaluate_partition_prefix( + partitions, + &[col("a").eq(lit("foo")).and(col("c").eq(lit("baz")))], + ), + Some(Path::from("a=foo")), + ); + + // partition with multiple values results in no prefix + assert_eq!( + evaluate_partition_prefix( + partitions, + &[Expr::and(col("a").eq(lit("foo")), col("a").eq(lit("bar")))], + ), + None, + ); + + // no prefix because partition a is not restricted to a single literal + assert_eq!( + evaluate_partition_prefix( + partitions, + &[Expr::or(col("a").eq(lit("foo")), col("a").eq(lit("bar")))], + ), + None, + ); + assert_eq!( + evaluate_partition_prefix(partitions, &[col("b").lt(lit(5))],), + None, + ); + } + + #[test] + fn test_evaluate_date_partition_prefix() { + let partitions = &[("a".to_string(), DataType::Date32)]; + assert_eq!( + evaluate_partition_prefix( + partitions, + &[col("a").eq(Expr::Literal(ScalarValue::Date32(Some(3))))], + ), + Some(Path::from("a=1970-01-04")), + ); + + let partitions = &[("a".to_string(), DataType::Date64)]; + assert_eq!( + evaluate_partition_prefix( + partitions, + &[col("a").eq(Expr::Literal(ScalarValue::Date64(Some( + 4 * 24 * 60 * 60 * 1000 + )))),], + ), + Some(Path::from("a=1970-01-05")), + ); + } }