18 changes: 3 additions & 15 deletions native/core/src/execution/datafusion/planner.rs
@@ -949,21 +949,15 @@ impl PhysicalPlanner {
))
}
OpStruct::NativeScan(scan) => {
let data_types = scan.fields.iter().map(to_arrow_datatype).collect_vec();

println!("NATIVE: SCAN: {:?}", scan);
let data_schema = parse_message_type(&*scan.data_schema).unwrap();
let required_schema = parse_message_type(&*scan.required_schema).unwrap();
println!("data_schema: {:?}", data_schema);
println!("required_schema: {:?}", required_schema);

let data_schema_descriptor =
parquet::schema::types::SchemaDescriptor::new(Arc::new(data_schema));
let data_schema_arrow = Arc::new(
parquet::arrow::schema::parquet_to_arrow_schema(&data_schema_descriptor, None)
.unwrap(),
);
println!("data_schema_arrow: {:?}", data_schema_arrow);

let required_schema_descriptor =
parquet::schema::types::SchemaDescriptor::new(Arc::new(required_schema));
@@ -974,8 +968,6 @@ impl PhysicalPlanner {
)
.unwrap(),
);
println!("required_schema_arrow: {:?}", required_schema_arrow);

assert!(!required_schema_arrow.fields.is_empty());

let mut projection_vector: Vec<usize> =
@@ -984,7 +976,6 @@ impl PhysicalPlanner {
required_schema_arrow.fields.iter().for_each(|field| {
projection_vector.push(data_schema_arrow.index_of(field.name()).unwrap());
});
println!("projection_vector: {:?}", projection_vector);

assert_eq!(projection_vector.len(), required_schema_arrow.fields.len());

@@ -1006,9 +997,6 @@ impl PhysicalPlanner {
))
});

println!("data_filters: {:?}", data_filters);
println!("test_data_filters: {:?}", test_data_filters);

let object_store_url = ObjectStoreUrl::local_filesystem();
let paths: Vec<Url> = scan
.path
@@ -2300,7 +2288,7 @@ mod tests {
let input_array = DictionaryArray::new(keys, Arc::new(values));
let input_batch = InputBatch::Batch(vec![Arc::new(input_array)], row_count);

let (mut scans, datafusion_plan) = planner.create_plan(&op, &mut vec![]).unwrap();
let (mut scans, datafusion_plan) = planner.create_plan(&op, &mut vec![], 1).unwrap();
scans[0].set_input_batch(input_batch);

let session_ctx = SessionContext::new();
@@ -2381,7 +2369,7 @@ mod tests {
let input_array = DictionaryArray::new(keys, Arc::new(values));
let input_batch = InputBatch::Batch(vec![Arc::new(input_array)], row_count);

let (mut scans, datafusion_plan) = planner.create_plan(&op, &mut vec![]).unwrap();
let (mut scans, datafusion_plan) = planner.create_plan(&op, &mut vec![], 1).unwrap();

// Scan's schema is determined by the input batch, so we need to set it before execution.
scans[0].set_input_batch(input_batch);
@@ -2453,7 +2441,7 @@ mod tests {
let op = create_filter(op_scan, 0);
let planner = PhysicalPlanner::default();

let (mut scans, datafusion_plan) = planner.create_plan(&op, &mut vec![]).unwrap();
let (mut scans, datafusion_plan) = planner.create_plan(&op, &mut vec![], 1).unwrap();

let scan = &mut scans[0];
scan.set_input_batch(InputBatch::EOF);
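For context on the NativeScan planning step shown above: the planner parses the Parquet message types sent from the JVM side, converts them to Arrow schemas, and builds a projection vector by locating each required column in the full data schema. The sketch below is a minimal, hypothetical helper (the function name and signature are not part of this PR) that follows the same calls used in the diff; it assumes the same `parquet` crate APIs (`parse_message_type`, `SchemaDescriptor`, `parquet_to_arrow_schema`) available to this crate.

```rust
use std::sync::Arc;

use parquet::arrow::schema::parquet_to_arrow_schema;
use parquet::schema::parser::parse_message_type;
use parquet::schema::types::SchemaDescriptor;

// Hypothetical helper mirroring the NativeScan planning step: parse the
// serialized Parquet message types, convert them to Arrow schemas, and build
// the projection vector by looking up each required field in the data schema.
fn build_projection(data_schema: &str, required_schema: &str) -> Vec<usize> {
    let data_schema = parse_message_type(data_schema).unwrap();
    let required_schema = parse_message_type(required_schema).unwrap();

    let data_descriptor = SchemaDescriptor::new(Arc::new(data_schema));
    let data_schema_arrow = parquet_to_arrow_schema(&data_descriptor, None).unwrap();

    let required_descriptor = SchemaDescriptor::new(Arc::new(required_schema));
    let required_schema_arrow = parquet_to_arrow_schema(&required_descriptor, None).unwrap();
    assert!(!required_schema_arrow.fields().is_empty());

    // Each required column is looked up by name; the resulting indices form the
    // projection pushed into the Parquet reader. Its length therefore equals the
    // number of required fields, matching the assert in the planner.
    required_schema_arrow
        .fields()
        .iter()
        .map(|field| data_schema_arrow.index_of(field.name()).unwrap())
        .collect()
}
```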
7 changes: 0 additions & 7 deletions native/core/src/execution/jni_api.rs
@@ -20,7 +20,6 @@
use super::{serde, utils::SparkArrowConvert, CometMemoryPool};
use arrow::datatypes::DataType as ArrowDataType;
use arrow_array::RecordBatch;
use datafusion::physical_plan::ExecutionPlanProperties;
use datafusion::{
execution::{
disk_manager::DiskManagerConfig,
@@ -379,12 +378,6 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(

let plan = exec_context.root_op.as_ref().unwrap();

println!(
"Executing partition {} of {}",
partition_id,
plan.output_partitioning().partition_count()
);

let stream = plan.execute(partition_id as usize, task_ctx)?;
exec_context.stream = Some(stream);
} else {
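The removed debug print in `executePlan` was the only user of `output_partitioning()`, which is why the `ExecutionPlanProperties` import could be dropped as well. As a rough sketch of that execution path (the wrapper function and its name are hypothetical, not part of this PR), assuming current DataFusion APIs:

```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::{
    ExecutionPlan, ExecutionPlanProperties, SendableRecordBatchStream,
};

// Hypothetical wrapper: execute one partition of a physical plan and return its
// record-batch stream. `output_partitioning()` comes from the
// `ExecutionPlanProperties` trait, so the trait import is only needed while the
// partition count is inspected.
fn execute_partition(
    plan: &Arc<dyn ExecutionPlan>,
    partition_id: usize,
    task_ctx: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
    debug_assert!(partition_id < plan.output_partitioning().partition_count());
    plan.execute(partition_id, task_ctx)
}
```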
24 changes: 0 additions & 24 deletions spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -2493,32 +2493,8 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
// Sink operators don't have children
result.clearChildren()

// scalastyle:off println
// System.out.println(op.simpleStringWithNodeId())
// System.out.println(scanTypes.asJava) // Spark types for output.
System.out.println(scan.output) // This is the names of the output columns.
// System.out.println(cometScan.requiredSchema); // This is the projected columns.
System.out.println(
scan.dataFilters
); // This is the filter expressions that have been pushed down.

val dataFilters = scan.dataFilters.map(exprToProto(_, scan.output))
nativeScanBuilder.addAllDataFilters(dataFilters.map(_.get).asJava)
// System.out.println(cometScan.relation.location.inputFiles(0))
// System.out.println(cometScan.partitionFilters);
// System.out.println(cometScan.relation.partitionSchema)
// System.out.println(cometScan.metadata);

// System.out.println("requiredSchema:")
// cometScan.requiredSchema.fields.foreach(field => {
// System.out.println(field.dataType)
// })

// System.out.println("relation.dataSchema:")
// cometScan.relation.dataSchema.fields.foreach(field => {
// System.out.println(field.dataType)
// })
// scalastyle:on println

val requiredSchemaParquet =
new SparkToParquetSchemaConverter(conf).convert(scan.requiredSchema)