Skip to content

Commit 374fcec

Browse files
authored
cherry-pick inlist fix (#17254)
1 parent d1a6e9a commit 374fcec

File tree

2 files changed

+45
-24
lines changed

2 files changed

+45
-24
lines changed

datafusion/proto/src/physical_plan/mod.rs

Lines changed: 12 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -611,29 +611,6 @@ impl protobuf::PhysicalPlanNode {
611611
) -> Result<Arc<dyn ExecutionPlan>> {
612612
let input: Arc<dyn ExecutionPlan> =
613613
into_physical_plan(&filter.input, registry, runtime, extension_codec)?;
614-
let projection = if !filter.projection.is_empty() {
615-
Some(
616-
filter
617-
.projection
618-
.iter()
619-
.map(|i| *i as usize)
620-
.collect::<Vec<_>>(),
621-
)
622-
} else {
623-
None
624-
};
625-
626-
// Use the projected schema if projection is present, otherwise use the full schema
627-
let predicate_schema = if let Some(ref proj_indices) = projection {
628-
// Create projected schema for parsing the predicate
629-
let projected_fields: Vec<_> = proj_indices
630-
.iter()
631-
.map(|&i| input.schema().field(i).clone())
632-
.collect();
633-
Arc::new(Schema::new(projected_fields))
634-
} else {
635-
input.schema()
636-
};
637614

638615
let predicate = filter
639616
.expr
@@ -642,7 +619,7 @@ impl protobuf::PhysicalPlanNode {
642619
parse_physical_expr(
643620
expr,
644621
registry,
645-
predicate_schema.as_ref(),
622+
input.schema().as_ref(),
646623
extension_codec,
647624
)
648625
})
@@ -653,6 +630,17 @@ impl protobuf::PhysicalPlanNode {
653630
)
654631
})?;
655632
let filter_selectivity = filter.default_filter_selectivity.try_into();
633+
let projection = if !filter.projection.is_empty() {
634+
Some(
635+
filter
636+
.projection
637+
.iter()
638+
.map(|i| *i as usize)
639+
.collect::<Vec<_>>(),
640+
)
641+
} else {
642+
None
643+
};
656644
let filter =
657645
FilterExec::try_new(predicate, input)?.with_projection(projection)?;
658646
match filter_selectivity {

datafusion/proto/tests/cases/roundtrip_physical_plan.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1839,3 +1839,36 @@ async fn test_round_trip_tpch_queries() -> Result<()> {
18391839

18401840
Ok(())
18411841
}
1842+
1843+
#[tokio::test]
1844+
async fn test_tpch_part_in_list_query_with_real_parquet_data() -> Result<()> {
1845+
use datafusion_common::test_util::datafusion_test_data;
1846+
1847+
let ctx = SessionContext::new();
1848+
1849+
// Register the TPC-H part table using the local test data
1850+
let test_data = datafusion_test_data();
1851+
let table_sql = format!(
1852+
"CREATE EXTERNAL TABLE part STORED AS PARQUET LOCATION '{test_data}/tpch_part_small.parquet'"
1853+
);
1854+
ctx.sql(&table_sql).await.map_err(|e| {
1855+
DataFusionError::External(format!("Failed to create part table: {e}").into())
1856+
})?;
1857+
1858+
// Test the exact problematic query
1859+
let sql =
1860+
"SELECT p_size FROM part WHERE p_size IN (14, 6, 5, 31) and p_partkey > 1000";
1861+
1862+
let logical_plan = ctx.sql(sql).await?.into_unoptimized_plan();
1863+
let optimized_plan = ctx.state().optimize(&logical_plan)?;
1864+
let physical_plan = ctx.state().create_physical_plan(&optimized_plan).await?;
1865+
1866+
// Serialize the physical plan - bug may happen here already but not necessarily manifests
1867+
let codec = DefaultPhysicalExtensionCodec {};
1868+
let proto = PhysicalPlanNode::try_from_physical_plan(physical_plan.clone(), &codec)?;
1869+
1870+
// This will fail with the bug, but should succeed when fixed
1871+
let _deserialized_plan =
1872+
proto.try_into_physical_plan(&ctx, ctx.runtime_env().as_ref(), &codec)?;
1873+
Ok(())
1874+
}

0 commit comments

Comments
 (0)