diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs
index b2940eabc5..c709021f1b 100644
--- a/native/core/src/execution/datafusion/planner.rs
+++ b/native/core/src/execution/datafusion/planner.rs
@@ -949,13 +949,8 @@ impl PhysicalPlanner {
                 ))
             }
             OpStruct::NativeScan(scan) => {
-                let data_types = scan.fields.iter().map(to_arrow_datatype).collect_vec();
-
-                println!("NATIVE: SCAN: {:?}", scan);
                 let data_schema = parse_message_type(&*scan.data_schema).unwrap();
                 let required_schema = parse_message_type(&*scan.required_schema).unwrap();
-                println!("data_schema: {:?}", data_schema);
-                println!("required_schema: {:?}", required_schema);
 
                 let data_schema_descriptor =
                     parquet::schema::types::SchemaDescriptor::new(Arc::new(data_schema));
@@ -963,7 +958,6 @@ impl PhysicalPlanner {
                     parquet::arrow::schema::parquet_to_arrow_schema(&data_schema_descriptor, None)
                         .unwrap(),
                 );
-                println!("data_schema_arrow: {:?}", data_schema_arrow);
 
                 let required_schema_descriptor =
                     parquet::schema::types::SchemaDescriptor::new(Arc::new(required_schema));
@@ -974,8 +968,6 @@ impl PhysicalPlanner {
                     )
                     .unwrap(),
                 );
-                println!("required_schema_arrow: {:?}", required_schema_arrow);
-
                 assert!(!required_schema_arrow.fields.is_empty());
 
                 let mut projection_vector: Vec<usize> =
@@ -984,7 +976,6 @@ impl PhysicalPlanner {
                 required_schema_arrow.fields.iter().for_each(|field| {
                     projection_vector.push(data_schema_arrow.index_of(field.name()).unwrap());
                 });
-                println!("projection_vector: {:?}", projection_vector);
 
                 assert_eq!(projection_vector.len(), required_schema_arrow.fields.len());
 
@@ -1006,9 +997,6 @@ impl PhysicalPlanner {
                     ))
                 });
 
-                println!("data_filters: {:?}", data_filters);
-                println!("test_data_filters: {:?}", test_data_filters);
-
                 let object_store_url = ObjectStoreUrl::local_filesystem();
                 let paths: Vec<Url> = scan
                     .path
@@ -2300,7 +2288,7 @@ mod tests {
         let input_array = DictionaryArray::new(keys, Arc::new(values));
         let input_batch = InputBatch::Batch(vec![Arc::new(input_array)], row_count);
 
-        let (mut scans, datafusion_plan) = planner.create_plan(&op, &mut vec![]).unwrap();
+        let (mut scans, datafusion_plan) = planner.create_plan(&op, &mut vec![], 1).unwrap();
         scans[0].set_input_batch(input_batch);
 
         let session_ctx = SessionContext::new();
@@ -2381,7 +2369,7 @@ mod tests {
         let input_array = DictionaryArray::new(keys, Arc::new(values));
         let input_batch = InputBatch::Batch(vec![Arc::new(input_array)], row_count);
 
-        let (mut scans, datafusion_plan) = planner.create_plan(&op, &mut vec![]).unwrap();
+        let (mut scans, datafusion_plan) = planner.create_plan(&op, &mut vec![], 1).unwrap();
         // Scan's schema is determined by the input batch, so we need to set it before execution.
         scans[0].set_input_batch(input_batch);
 
@@ -2453,7 +2441,7 @@ mod tests {
        let op = create_filter(op_scan, 0);
        let planner = PhysicalPlanner::default();
 
-        let (mut scans, datafusion_plan) = planner.create_plan(&op, &mut vec![]).unwrap();
+        let (mut scans, datafusion_plan) = planner.create_plan(&op, &mut vec![], 1).unwrap();
 
        let scan = &mut scans[0];
        scan.set_input_batch(InputBatch::EOF);
diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs
index 83f9570e32..1dec5173b0 100644
--- a/native/core/src/execution/jni_api.rs
+++ b/native/core/src/execution/jni_api.rs
@@ -20,7 +20,6 @@
 use super::{serde, utils::SparkArrowConvert, CometMemoryPool};
 use arrow::datatypes::DataType as ArrowDataType;
 use arrow_array::RecordBatch;
-use datafusion::physical_plan::ExecutionPlanProperties;
 use datafusion::{
     execution::{
         disk_manager::DiskManagerConfig,
@@ -379,12 +378,6 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
 
             let plan = exec_context.root_op.as_ref().unwrap();
 
-            println!(
-                "Executing partition {} of {}",
-                partition_id,
-                plan.output_partitioning().partition_count()
-            );
-
             let stream = plan.execute(partition_id as usize, task_ctx)?;
             exec_context.stream = Some(stream);
         } else {
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index 058c809c38..348f9c54cc 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -2493,32 +2493,8 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
         // Sink operators don't have children
         result.clearChildren()
 
-        // scalastyle:off println
-        // System.out.println(op.simpleStringWithNodeId())
-        // System.out.println(scanTypes.asJava) // Spark types for output.
-        System.out.println(scan.output) // This is the names of the output columns.
-        // System.out.println(cometScan.requiredSchema); // This is the projected columns.
-        System.out.println(
-          scan.dataFilters
-        ); // This is the filter expressions that have been pushed down.
         val dataFilters = scan.dataFilters.map(exprToProto(_, scan.output))
         nativeScanBuilder.addAllDataFilters(dataFilters.map(_.get).asJava)
-        // System.out.println(cometScan.relation.location.inputFiles(0))
-        // System.out.println(cometScan.partitionFilters);
-        // System.out.println(cometScan.relation.partitionSchema)
-        // System.out.println(cometScan.metadata);
-
-        // System.out.println("requiredSchema:")
-        // cometScan.requiredSchema.fields.foreach(field => {
-        //   System.out.println(field.dataType)
-        // })
-
-        // System.out.println("relation.dataSchema:")
-        // cometScan.relation.dataSchema.fields.foreach(field => {
-        //   System.out.println(field.dataType)
-        // })
-        // scalastyle:on println
 
         val requiredSchemaParquet =
           new SparkToParquetSchemaConverter(conf).convert(scan.requiredSchema)