diff --git a/crates/polars-io/src/parquet/read/read_impl.rs b/crates/polars-io/src/parquet/read/read_impl.rs index 997008d58cd1..14fd0c279139 100644 --- a/crates/polars-io/src/parquet/read/read_impl.rs +++ b/crates/polars-io/src/parquet/read/read_impl.rs @@ -1435,7 +1435,7 @@ impl PrefilterMaskSetting { let is_nested = dtype.is_nested(); // We empirically selected these numbers. - is_nested && prefilter_cost <= 0.01 + !is_nested && prefilter_cost <= 0.01 }, Self::Pre => true, Self::Post => false, diff --git a/crates/polars-io/src/predicates.rs b/crates/polars-io/src/predicates.rs index 1bf97daeceaf..920afae9a160 100644 --- a/crates/polars-io/src/predicates.rs +++ b/crates/polars-io/src/predicates.rs @@ -65,18 +65,10 @@ impl ParquetColumnExpr for ColumnPredicateExpr { fn evaluate_mut(&self, values: &dyn Array, bm: &mut MutableBitmap) { // We should never evaluate nulls with this. assert!(values.validity().is_none_or(|v| v.set_bits() == 0)); - assert_eq!( - &self.dtype.to_physical().to_arrow(CompatLevel::newest()), - values.dtype() - ); - - let series = unsafe { - Series::from_chunks_and_dtype_unchecked( - self.column_name.clone(), - vec![values.to_boxed()], - &self.dtype, - ) - }; + + // @NOTE: This is okay because we don't have Enums, Categoricals, or Decimals + let series = + Series::from_arrow_chunks(self.column_name.clone(), vec![values.to_boxed()]).unwrap(); let column = series.into_column(); let df = unsafe { DataFrame::new_no_checks(values.len(), vec![column]) }; diff --git a/crates/polars-plan/src/plans/aexpr/predicates/column_expr.rs b/crates/polars-plan/src/plans/aexpr/predicates/column_expr.rs index 30d73b4fe79f..4daaddfd0fc5 100644 --- a/crates/polars-plan/src/plans/aexpr/predicates/column_expr.rs +++ b/crates/polars-plan/src/plans/aexpr/predicates/column_expr.rs @@ -1,11 +1,14 @@ //! This module creates predicates splits predicates into partial per-column predicates. +use polars_core::datatypes::DataType; +use polars_core::scalar::Scalar; use polars_core::schema::Schema; use polars_io::predicates::SpecializedColumnPredicateExpr; use polars_utils::aliases::PlHashMap; use polars_utils::arena::{Arena, Node}; use polars_utils::pl_str::PlSmallStr; +use super::get_binary_expr_col_and_lv; use crate::dsl::Operator; use crate::plans::{aexpr_to_leaf_names_iter, AExpr, MintermIter}; @@ -19,7 +22,7 @@ pub struct ColumnPredicates { pub fn aexpr_to_column_predicates( root: Node, expr_arena: &mut Arena, - _schema: &Schema, + schema: &Schema, ) -> ColumnPredicates { let mut predicates = PlHashMap::)>::default(); @@ -38,6 +41,32 @@ pub fn aexpr_to_column_predicates( } let column = leaf_names.pop().unwrap(); + let Some(dtype) = schema.get(&column) else { + is_sumwise_complete = false; + continue; + }; + + // We really don't want to deal with these types. + use DataType as D; + match dtype { + #[cfg(feature = "dtype-categorical")] + D::Enum(_, _) | D::Categorical(_, _) => { + is_sumwise_complete = false; + continue; + }, + #[cfg(feature = "dtype-decimal")] + D::Decimal(_, _) => { + is_sumwise_complete = false; + continue; + }, + _ if dtype.is_nested() => { + is_sumwise_complete = false; + continue; + }, + _ => {}, + } + + let dtype = dtype.clone(); let entry = predicates.entry(column); entry @@ -50,7 +79,36 @@ pub fn aexpr_to_column_predicates( }); n.1 = None; }) - .or_insert((minterm, None)); + .or_insert_with(|| { + ( + minterm, + Some(()).and_then(|_| { + if std::env::var("POLARS_SPECIALIZED_COLUMN_PRED").as_deref() != Ok("1") { + return None; + } + + let aexpr = expr_arena.get(minterm); + + let AExpr::BinaryExpr { left, op, right } = aexpr else { + return None; + }; + let ((_, _), (lv, _)) = + get_binary_expr_col_and_lv(*left, *right, expr_arena, schema)?; + let av = lv.to_any_value()?; + if av.dtype() != dtype { + return None; + } + let scalar = Scalar::new(dtype, av.into_static()); + use Operator as O; + match op { + O::Eq | O::EqValidity => { + Some(SpecializedColumnPredicateExpr::Eq(scalar)) + }, + _ => None, + } + }), + ) + }); } ColumnPredicates { diff --git a/crates/polars-plan/src/plans/aexpr/predicates/mod.rs b/crates/polars-plan/src/plans/aexpr/predicates/mod.rs index 07d2717ecf88..88f74ae679f3 100644 --- a/crates/polars-plan/src/plans/aexpr/predicates/mod.rs +++ b/crates/polars-plan/src/plans/aexpr/predicates/mod.rs @@ -1,5 +1,32 @@ mod column_expr; mod skip_batches; +use std::borrow::Cow; + pub use column_expr::*; +use polars_core::schema::Schema; +use polars_utils::arena::{Arena, Node}; +use polars_utils::pl_str::PlSmallStr; pub use skip_batches::*; + +use super::evaluate::{constant_evaluate, into_column}; +use super::{AExpr, LiteralValue}; + +#[allow(clippy::type_complexity)] +fn get_binary_expr_col_and_lv<'a>( + left: Node, + right: Node, + expr_arena: &'a Arena, + schema: &Schema, +) -> Option<((&'a PlSmallStr, Node), (Cow<'a, LiteralValue>, Node))> { + match ( + into_column(left, expr_arena, schema, 0), + into_column(right, expr_arena, schema, 0), + constant_evaluate(left, expr_arena, schema, 0), + constant_evaluate(right, expr_arena, schema, 0), + ) { + (Some(col), _, _, Some(lv)) => Some(((col, left), (lv, right))), + (_, Some(col), Some(lv), _) => Some(((col, right), (lv, left))), + _ => None, + } +} diff --git a/crates/polars-plan/src/plans/aexpr/predicates/skip_batches.rs b/crates/polars-plan/src/plans/aexpr/predicates/skip_batches.rs index 2e48a9132c7b..8af03de1efb8 100644 --- a/crates/polars-plan/src/plans/aexpr/predicates/skip_batches.rs +++ b/crates/polars-plan/src/plans/aexpr/predicates/skip_batches.rs @@ -1,8 +1,6 @@ //! This module creates predicates that can skip record batches of rows based on statistics about //! that record batch. -use std::borrow::Cow; - use polars_core::prelude::AnyValue; use polars_core::schema::Schema; use polars_utils::arena::{Arena, Node}; @@ -12,6 +10,7 @@ use polars_utils::pl_str::PlSmallStr; use super::super::evaluate::{constant_evaluate, into_column}; use super::super::{AExpr, BooleanFunction, Operator, OutputName}; use crate::dsl::FunctionExpr; +use crate::plans::predicates::get_binary_expr_col_and_lv; use crate::plans::{ExprIR, LiteralValue}; use crate::prelude::FunctionOptions; @@ -436,22 +435,3 @@ fn aexpr_to_skip_batch_predicate_rec( AExpr::Len => None, } } - -#[allow(clippy::type_complexity)] -fn get_binary_expr_col_and_lv<'a>( - left: Node, - right: Node, - expr_arena: &'a Arena, - schema: &Schema, -) -> Option<((&'a PlSmallStr, Node), (Cow<'a, LiteralValue>, Node))> { - match ( - into_column(left, expr_arena, schema, 0), - into_column(right, expr_arena, schema, 0), - constant_evaluate(left, expr_arena, schema, 0), - constant_evaluate(right, expr_arena, schema, 0), - ) { - (Some(col), _, _, Some(lv)) => Some(((col, left), (lv, right))), - (_, Some(col), Some(lv), _) => Some(((col, right), (lv, left))), - _ => None, - } -} diff --git a/crates/polars-stream/src/nodes/io_sources/parquet/row_group_decode.rs b/crates/polars-stream/src/nodes/io_sources/parquet/row_group_decode.rs index 1b6c75fc619c..8d31d025b3a6 100644 --- a/crates/polars-stream/src/nodes/io_sources/parquet/row_group_decode.rs +++ b/crates/polars-stream/src/nodes/io_sources/parquet/row_group_decode.rs @@ -781,8 +781,8 @@ impl RowGroupDecoder { fn decode_column_prefiltered( arrow_field: &ArrowField, row_group_data: &RowGroupData, - prefilter_cost: f64, - prefilter_setting: &PrefilterMaskSetting, + _prefilter_cost: f64, + _prefilter_setting: &PrefilterMaskSetting, mask: &BooleanChunked, mask_bitmap: &Bitmap, expected_num_rows: usize, @@ -811,7 +811,7 @@ fn decode_column_prefiltered( }) .collect::>(); - let prefilter = prefilter_setting.should_prefilter(prefilter_cost, &arrow_field.dtype); + let prefilter = !arrow_field.dtype.is_nested(); let deserialize_filter = prefilter.then(|| polars_parquet::read::Filter::Mask(mask_bitmap.clone()));