@@ -27,10 +27,11 @@ use arrow_array::RecordBatch;
 use arrow_schema::{Schema, SchemaRef, SortOptions};
 use bytes::Bytes;
 use chrono::{DateTime, NaiveDateTime, Timelike, Utc};
+use datafusion::catalog::Session;
 use datafusion::common::stats::Precision;
 use datafusion::logical_expr::utils::conjunction;
 use datafusion::{
-    catalog::schema::SchemaProvider,
+    catalog::SchemaProvider,
     common::{
         tree_node::{TreeNode, TreeNodeRecursion},
         ToDFSchema,
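
Note on the import moves (a hedged aside, since the exact DataFusion release isn't stated in this diff): `SchemaProvider` is now re-exported directly under `datafusion::catalog`, which is also where the new `Session` trait lives, so both can be pulled in from the same module:

    use datafusion::catalog::{SchemaProvider, Session};
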
@@ -122,7 +123,7 @@ async fn create_parquet_physical_plan(
     projection: Option<&Vec<usize>>,
     filters: &[Expr],
     limit: Option<usize>,
-    state: &SessionState,
+    state: &dyn Session,
     time_partition: Option<String>,
 ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
     let filters = if let Some(expr) = conjunction(filters.to_vec()) {
@@ -149,7 +150,7 @@ async fn create_parquet_physical_plan(
     // create the execution plan
     let plan = file_format
         .create_physical_plan(
-            state,
+            state.as_any().downcast_ref::<SessionState>().unwrap(), // Remove this when ParquetFormat catches up
             FileScanConfig {
                 object_store_url,
                 file_schema: schema.clone(),
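
The inline comment above flags the temporary bridge: `scan` now receives a `&dyn Session`, but `ParquetFormat::create_physical_plan` still expects the concrete `SessionState`, so the trait object is downcast through `as_any()`. A minimal sketch of that pattern, assuming the usual `SessionState` re-export path (adjust the import to whatever your DataFusion version exposes):

    use datafusion::catalog::Session;
    use datafusion::execution::session_state::SessionState;

    // Illustrative helper, not part of the diff: recover the concrete state from the
    // trait object for APIs that have not migrated to `&dyn Session` yet.
    fn as_session_state(state: &dyn Session) -> &SessionState {
        state
            .as_any()
            .downcast_ref::<SessionState>()
            .expect("state should be a SessionState")
    }
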
@@ -216,8 +217,8 @@ async fn collect_from_snapshot(
 fn partitioned_files(
     manifest_files: Vec<catalog::manifest::File>,
     table_schema: &Schema,
-    target_partition: usize,
 ) -> (Vec<Vec<PartitionedFile>>, datafusion::common::Statistics) {
+    let target_partition = num_cpus::get();
     let mut partitioned_files = Vec::from_iter((0..target_partition).map(|_| Vec::new()));
     let mut column_statistics = HashMap::<String, Option<catalog::column::TypedStatistics>>::new();
     let mut count = 0;
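
With `target_partition` dropped from the signature, the partition count comes from `num_cpus::get()` and every call site loses its trailing argument (the later hunks show the `, 1` being removed). A self-contained sketch of the effect, assuming files are spread round-robin across CPU-count buckets as in the surrounding code:

    use datafusion::datasource::listing::PartitionedFile;

    // Hypothetical helper, illustrative only: distribute files across as many
    // partitions as there are logical CPUs.
    fn spread_across_cpus(files: Vec<PartitionedFile>) -> Vec<Vec<PartitionedFile>> {
        let target_partition = num_cpus::get();
        let mut buckets: Vec<Vec<PartitionedFile>> =
            (0..target_partition).map(|_| Vec::new()).collect();
        for (i, file) in files.into_iter().enumerate() {
            buckets[i % target_partition].push(file);
        }
        buckets
    }
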
@@ -288,7 +289,7 @@ impl TableProvider for StandardTableProvider {

     async fn scan(
         &self,
-        state: &SessionState,
+        state: &dyn Session,
         projection: Option<&Vec<usize>>,
         filters: &[Expr],
         limit: Option<usize>,
@@ -435,7 +436,7 @@ impl TableProvider for StandardTableProvider {
             );
         }

-        let (partitioned_files, statistics) = partitioned_files(manifest_files, &self.schema, 1);
+        let (partitioned_files, statistics) = partitioned_files(manifest_files, &self.schema);
         let remote_exec = create_parquet_physical_plan(
             ObjectStoreUrl::parse(glob_storage.store_url()).unwrap(),
             partitioned_files,
@@ -496,7 +497,7 @@ async fn get_cache_exectuion_plan(
     projection: Option<&Vec<usize>>,
     filters: &[Expr],
     limit: Option<usize>,
-    state: &SessionState,
+    state: &dyn Session,
     time_partition: Option<String>,
 ) -> Result<Option<Arc<dyn ExecutionPlan>>, DataFusionError> {
     let (cached, remainder) = cache_manager
@@ -519,7 +520,7 @@ async fn get_cache_exectuion_plan(
         })
         .collect();

-    let (partitioned_files, statistics) = partitioned_files(cached, &schema, 1);
+    let (partitioned_files, statistics) = partitioned_files(cached, &schema);
     let plan = create_parquet_physical_plan(
         ObjectStoreUrl::parse("file:///").unwrap(),
         partitioned_files,
@@ -545,7 +546,7 @@ async fn get_hottier_exectuion_plan(
     projection: Option<&Vec<usize>>,
     filters: &[Expr],
     limit: Option<usize>,
-    state: &SessionState,
+    state: &dyn Session,
     time_partition: Option<String>,
 ) -> Result<Option<Arc<dyn ExecutionPlan>>, DataFusionError> {
     let (hot_tier_files, remainder) = hot_tier_manager
@@ -570,7 +571,7 @@ async fn get_hottier_exectuion_plan(
         })
         .collect();

-    let (partitioned_files, statistics) = partitioned_files(hot_tier_files, &schema, 1);
+    let (partitioned_files, statistics) = partitioned_files(hot_tier_files, &schema);
     let plan = create_parquet_physical_plan(
         ObjectStoreUrl::parse("file:///").unwrap(),
         partitioned_files,
@@ -594,7 +595,7 @@ async fn legacy_listing_table(
     object_store: Arc<dyn ObjectStore>,
     time_filters: &[PartialTimeFilter],
     schema: Arc<Schema>,
-    state: &SessionState,
+    state: &dyn Session,
     projection: Option<&Vec<usize>>,
     filters: &[Expr],
     limit: Option<usize>,
@@ -868,10 +869,7 @@ fn extract_timestamp_bound(
     binexpr: BinaryExpr,
     time_partition: Option<String>,
 ) -> Option<(Operator, NaiveDateTime)> {
-    Some((
-        binexpr.op.clone(),
-        extract_from_lit(binexpr, time_partition)?,
-    ))
+    Some((binexpr.op, extract_from_lit(binexpr, time_partition)?))
 }

 async fn collect_manifest_files(
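
The `.clone()` disappears here (and in the `ManifestExt` hunk below) because `Operator` is a plain `Copy` enum; only the enclosing `BinaryExpr` lacks `Copy`. Illustrative snippet, not part of the diff:

    use datafusion::logical_expr::{BinaryExpr, Operator};

    // `expr.op` is copied out of the borrowed expression; no clone needed.
    fn operator_of(expr: &BinaryExpr) -> Operator {
        expr.op
    }
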
@@ -942,7 +940,7 @@ trait ManifestExt: ManifestFile {
                 return None;
             };
             /* `BinaryExp` doesn't implement `Copy` */
-            Some((expr.op.clone(), value))
+            Some((expr.op, value))
         }

         let Some(col) = self.find_matching_column(partial_filter) else {