diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs
index c5147d772e..a2a75bf985 100644
--- a/native/core/src/execution/datafusion/planner.rs
+++ b/native/core/src/execution/datafusion/planner.rs
@@ -1095,11 +1095,9 @@ impl PhysicalPlanner {
                 table_parquet_options.global.pushdown_filters = true;
                 table_parquet_options.global.reorder_filters = true;
 
-                let mut builder = ParquetExecBuilder::new(file_scan_config)
-                    .with_table_parquet_options(table_parquet_options)
-                    .with_schema_adapter_factory(
-                        Arc::new(CometSchemaAdapterFactory::default()),
-                    );
+                let mut builder = ParquetExecBuilder::new(file_scan_config)
+                    .with_table_parquet_options(table_parquet_options)
+                    .with_schema_adapter_factory(Arc::new(CometSchemaAdapterFactory::default()));
 
                 if let Some(filter) = test_data_filters {
                     builder = builder.with_predicate(filter);
diff --git a/native/core/src/execution/datafusion/schema_adapter.rs b/native/core/src/execution/datafusion/schema_adapter.rs
index 16d4b9d678..664f92d4ce 100644
--- a/native/core/src/execution/datafusion/schema_adapter.rs
+++ b/native/core/src/execution/datafusion/schema_adapter.rs
@@ -120,9 +120,9 @@ impl SchemaAdapter for CometSchemaAdapter {
 
         Ok((
             Arc::new(SchemaMapping {
-                projected_table_schema: self.projected_table_schema.clone(),
+                projected_table_schema: Arc::<Schema>::clone(&self.projected_table_schema),
                 field_mappings,
-                table_schema: self.table_schema.clone(),
+                table_schema: Arc::<Schema>::clone(&self.table_schema),
             }),
             projection,
         ))
@@ -218,7 +218,7 @@ impl SchemaMapper for SchemaMapping {
         // Necessary to handle empty batches
         let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
 
-        let schema = self.projected_table_schema.clone();
+        let schema = Arc::<Schema>::clone(&self.projected_table_schema);
         let record_batch = RecordBatch::try_new_with_options(schema, cols, &options)?;
         Ok(record_batch)
     }
@@ -259,7 +259,8 @@ impl SchemaMapper for SchemaMapping {
                         EvalMode::Legacy,
                         "UTC",
                         false,
-                    )?.into_array(batch_col.len())
+                    )?
+                    .into_array(batch_col.len())
                     // and if that works, return the field and column.
                     .map(|new_col| (new_col, table_field.clone()))
                 })
diff --git a/native/core/src/parquet/mod.rs b/native/core/src/parquet/mod.rs
index ffd3421671..afca606624 100644
--- a/native/core/src/parquet/mod.rs
+++ b/native/core/src/parquet/mod.rs
@@ -621,8 +621,10 @@ fn get_batch_reader<'a>(handle: jlong) -> Result<&'a mut ParquetRecordBatchReade
     Ok(&mut get_batch_context(handle)?.batch_reader)
 }
 
+/// # Safety
+/// This function is inherently unsafe since it deals with raw pointers passed from JNI.
 #[no_mangle]
-pub extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBatchReader(
+pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBatchReader(
     e: JNIEnv,
     _jclass: JClass,
     file_path: jstring,
@@ -646,62 +648,66 @@ pub extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBatchReade
             .unwrap()
             .with_batch_size(8192); // TODO: (ARROW NATIVE) Use batch size configured in JVM
 
+        let num_row_groups;
+        let mut total_rows: i64 = 0;
         //TODO: (ARROW NATIVE) if we can get the ParquetMetadata serialized, we need not do this.
-        let metadata = builder.metadata().clone();
-
-        let mut columns_to_read: Vec<usize> = Vec::new();
-        let columns_to_read_array = JObjectArray::from_raw(required_columns);
-        let array_len = env.get_array_length(&columns_to_read_array)?;
-        let mut required_columns: Vec<String> = Vec::new();
-        for i in 0..array_len {
-            let p: JString = env
-                .get_object_array_element(&columns_to_read_array, i)?
-                .into();
-            required_columns.push(env.get_string(&p)?.into());
-        }
-        for (i, col) in metadata
-            .file_metadata()
-            .schema_descr()
-            .columns()
-            .iter()
-            .enumerate() {
-            for (_, required) in required_columns.iter().enumerate() {
-                if col.name().to_uppercase().eq(&required.to_uppercase()) {
-                    columns_to_read.push(i);
-                    break;
-                }
-            }
-        }
-        //TODO: (ARROW NATIVE) make this work for complex types (especially deeply nested structs)
-        let mask = ProjectionMask::leaves(metadata.file_metadata().schema_descr(), columns_to_read);
-        // Set projection mask to read only root columns 1 and 2.
-        builder = builder.with_projection(mask);
-
-        let mut row_groups_to_read: Vec<usize> = Vec::new();
-        let mut total_rows: i64 = 0;
-        // get row groups -
-        for (i, rg) in metadata.row_groups().into_iter().enumerate() {
-            let rg_start = rg.file_offset().unwrap();
-            let rg_end = rg_start + rg.compressed_size();
-            if rg_start >= start && rg_end <= start + length {
-                row_groups_to_read.push(i);
-                total_rows += rg.num_rows();
-            }
-        }
+        {
+            let metadata = builder.metadata();
+
+            let mut columns_to_read: Vec<usize> = Vec::new();
+            let columns_to_read_array = JObjectArray::from_raw(required_columns);
+            let array_len = env.get_array_length(&columns_to_read_array)?;
+            let mut required_columns: Vec<String> = Vec::new();
+            for i in 0..array_len {
+                let p: JString = env
+                    .get_object_array_element(&columns_to_read_array, i)?
+                    .into();
+                required_columns.push(env.get_string(&p)?.into());
+            }
+            for (i, col) in metadata
+                .file_metadata()
+                .schema_descr()
+                .columns()
+                .iter()
+                .enumerate()
+            {
+                for required in required_columns.iter() {
+                    if col.name().to_uppercase().eq(&required.to_uppercase()) {
+                        columns_to_read.push(i);
+                        break;
+                    }
+                }
+            }
+            //TODO: (ARROW NATIVE) make this work for complex types (especially deeply nested structs)
+            let mask =
+                ProjectionMask::leaves(metadata.file_metadata().schema_descr(), columns_to_read);
+            // Set projection mask to read only root columns 1 and 2.
+
+            let mut row_groups_to_read: Vec<usize> = Vec::new();
+            // get row groups -
+            for (i, rg) in metadata.row_groups().iter().enumerate() {
+                let rg_start = rg.file_offset().unwrap();
+                let rg_end = rg_start + rg.compressed_size();
+                if rg_start >= start && rg_end <= start + length {
+                    row_groups_to_read.push(i);
+                    total_rows += rg.num_rows();
+                }
+            }
+            num_row_groups = row_groups_to_read.len();
+            builder = builder
+                .with_projection(mask)
+                .with_row_groups(row_groups_to_read.clone())
+        }
 
         // Build a sync parquet reader.
-        let batch_reader = builder
-            .with_row_groups(row_groups_to_read.clone())
-            .build()
-            .unwrap();
+        let batch_reader = builder.build().unwrap();
 
         let ctx = BatchContext {
             batch_reader,
             current_batch: None,
             reader_state: ParquetReaderState::Init,
-            num_row_groups: row_groups_to_read.len() as i32,
-            total_rows: total_rows,
+            num_row_groups: num_row_groups as i32,
+            total_rows,
         };
         let res = Box::new(ctx);
         Ok(Box::into_raw(res) as i64)
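Reviewer note: two patterns in this diff are worth calling out. The `Arc::<Schema>::clone(&...)` spelling in `schema_adapter.rs` is the form suggested by clippy's `clone_on_ref_ptr` lint; it makes explicit that only the `Arc` reference count is bumped, not the underlying `Schema`. In `parquet/mod.rs`, the new block scope exists because `builder.metadata()` now hands out a borrow of the builder instead of a deep `clone()`: `num_row_groups` and `total_rows` are declared before the block and assigned inside it, so everything derived from the metadata is finished before `builder` is consumed by `with_projection`/`with_row_groups`/`build()`. Below is a minimal, self-contained sketch of that scoping pattern; the `Builder`/`Metadata` types are toy stand-ins for illustration, not the parquet crate's API.

```rust
/// Toy stand-in for `ParquetMetadata` (illustrative only).
struct Metadata {
    row_group_rows: Vec<i64>,
}

/// Toy stand-in for the reader builder (illustrative only).
struct Builder {
    metadata: Metadata,
    row_groups: Vec<usize>,
}

impl Builder {
    /// Borrows `self`, like the real builder's `metadata()` accessor.
    fn metadata(&self) -> &Metadata {
        &self.metadata
    }

    /// Consumes `self`, like `with_projection()` / `with_row_groups()`.
    fn with_row_groups(mut self, row_groups: Vec<usize>) -> Builder {
        self.row_groups = row_groups;
        self
    }
}

fn main() {
    let mut builder = Builder {
        metadata: Metadata {
            row_group_rows: vec![100, 200, 300],
        },
        row_groups: vec![],
    };

    // Declared before the block, assigned inside it, read after it --
    // the same shape as `num_row_groups` / `total_rows` in the diff.
    let num_row_groups;
    let mut total_rows: i64 = 0;
    {
        // `metadata` borrows `builder`, so all metadata-derived work is
        // done inside this scope; `builder` is only moved by the
        // consuming `with_row_groups` call after the borrow's last use.
        let metadata = builder.metadata();
        let selected: Vec<usize> = (0..metadata.row_group_rows.len()).collect();
        for &i in &selected {
            total_rows += metadata.row_group_rows[i];
        }
        num_row_groups = selected.len();
        builder = builder.with_row_groups(selected)
    }

    assert_eq!(num_row_groups, 3);
    assert_eq!(total_rows, 600);
    assert_eq!(builder.row_groups, vec![0, 1, 2]);
}
```

Strictly speaking, non-lexical lifetimes already end the borrow at its last use, so the explicit block is about keeping the borrow's extent obvious and letting `num_row_groups` stay immutable after its single assignment, which matches the structure the diff adopts.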