diff --git a/native/core/src/execution/operators/parquet_writer.rs b/native/core/src/execution/operators/parquet_writer.rs index 4a53ff51b8..b7b45aef99 100644 --- a/native/core/src/execution/operators/parquet_writer.rs +++ b/native/core/src/execution/operators/parquet_writer.rs @@ -17,6 +17,11 @@ //! Parquet writer operator for writing RecordBatches to Parquet files +use arrow::array::{ArrayRef, AsArray}; +use arrow::compute::{ + cast, lexsort_to_indices, partition, take, Partitions, SortColumn, SortOptions, +}; +use opendal::Operator; use std::{ any::Any, collections::HashMap, @@ -27,13 +32,11 @@ use std::{ sync::Arc, }; -use opendal::Operator; - use crate::execution::shuffle::CompressionCodec; use crate::parquet::parquet_support::{ create_hdfs_operator, is_hdfs_scheme, prepare_object_store_with_configs, }; -use arrow::datatypes::{Schema, SchemaRef}; +use arrow::datatypes::{DataType, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use async_trait::async_trait; use datafusion::{ @@ -72,6 +75,54 @@ enum ParquetWriter { ), } +fn needs_escaping(c: char) -> bool { + matches!( + c, + '"' | '#' | '%' | '\'' | '*' | '/' | ':' | '=' | '?' | '\\' | '\x7F' + ) || c.is_control() +} + +fn escape_partition_value(value: &str) -> String { + let mut result = String::with_capacity(value.len()); + for c in value.chars() { + if needs_escaping(c) { + result.push_str(&format!("%{:02X}", c as u32)); + } else { + result.push(c); + } + } + result +} + +fn build_partition_path( + batch: &RecordBatch, + row: usize, + partition_columns: &[String], + partition_indices: &[usize], +) -> String { + let mut path = String::new(); + for (name, &idx) in partition_columns.iter().zip(partition_indices.iter()) { + let value = get_partition_value(batch.column(idx), row); + if !path.is_empty() { + path.push('/'); + } + path.push_str(name); + path.push('='); + path.push_str(&escape_partition_value(&value.unwrap())); + } + path +} + +fn get_partition_value(array: &ArrayRef, row: usize) -> Result { + if array.is_null(row) { + Ok("__HIVE_DEFAULT_PARTITION__".to_string()) + } else { + // relying on arrow's cast op to get string representation + let string_array = cast(array, &DataType::Utf8)?; + Ok(string_array.as_string::().value(row).to_string()) + } +} + impl ParquetWriter { /// Write a RecordBatch to the underlying writer async fn write( @@ -209,6 +260,8 @@ pub struct ParquetWriterExec { metrics: ExecutionPlanMetricsSet, /// Cache for plan properties cache: PlanProperties, + // partition columns + partition_columns: Vec, } impl ParquetWriterExec { @@ -223,6 +276,7 @@ impl ParquetWriterExec { compression: CompressionCodec, partition_id: i32, column_names: Vec, + partition_columns: Vec, object_store_options: HashMap, ) -> Result { // Preserve the input's partitioning so each partition writes its own file @@ -247,6 +301,7 @@ impl ParquetWriterExec { object_store_options, metrics: ExecutionPlanMetricsSet::new(), cache, + partition_columns, }) } @@ -435,6 +490,7 @@ impl ExecutionPlan for ParquetWriterExec { self.compression.clone(), self.partition_id, self.column_names.clone(), + self.partition_columns.clone(), self.object_store_options.clone(), )?)), _ => Err(DataFusionError::Internal( @@ -445,23 +501,28 @@ impl ExecutionPlan for ParquetWriterExec { fn execute( &self, - partition: usize, + partition_size: usize, context: Arc, ) -> Result { use datafusion::physical_plan::metrics::MetricBuilder; // Create metrics for tracking write statistics - let files_written = MetricBuilder::new(&self.metrics).counter("files_written", 
partition); - let bytes_written = MetricBuilder::new(&self.metrics).counter("bytes_written", partition); - let rows_written = MetricBuilder::new(&self.metrics).counter("rows_written", partition); + let files_written = + MetricBuilder::new(&self.metrics).counter("files_written", partition_size); + let bytes_written = + MetricBuilder::new(&self.metrics).counter("bytes_written", partition_size); + let rows_written = + MetricBuilder::new(&self.metrics).counter("rows_written", partition_size); let runtime_env = context.runtime_env(); - let input = self.input.execute(partition, context)?; + let input = self.input.execute(partition_size, context)?; let input_schema = self.input.schema(); let work_dir = self.work_dir.clone(); let task_attempt_id = self.task_attempt_id; let compression = self.compression_to_parquet()?; let column_names = self.column_names.clone(); + let partition_cols = self.partition_columns.clone(); + let partition_id = self.partition_id; assert_eq!(input_schema.fields().len(), column_names.len()); @@ -474,96 +535,222 @@ impl ExecutionPlan for ParquetWriterExec { .collect(); let output_schema = Arc::new(arrow::datatypes::Schema::new(fields)); - // Generate part file name for this partition - // If using FileCommitProtocol (work_dir is set), include task_attempt_id in the filename - let part_file = if let Some(attempt_id) = task_attempt_id { - format!( - "{}/part-{:05}-{:05}.parquet", - work_dir, self.partition_id, attempt_id - ) - } else { - format!("{}/part-{:05}.parquet", work_dir, self.partition_id) - }; - // Configure writer properties let props = WriterProperties::builder() .set_compression(compression) .build(); - - let object_store_options = self.object_store_options.clone(); - let mut writer = Self::create_arrow_writer( - &part_file, - Arc::clone(&output_schema), - props, - runtime_env, - &object_store_options, - )?; - + // Clone schema for use in async closure let schema_for_write = Arc::clone(&output_schema); - // Write batches - let write_task = async move { - let mut stream = input; - let mut total_rows = 0i64; + if !self.partition_columns.is_empty() { + // get partition col idx + let partition_indices: Vec = self + .partition_columns + .iter() + .map(|name| schema_for_write.index_of(name).unwrap()) + .collect(); + + // get all other col idx + let non_partition_indices: Vec = (0..schema_for_write.fields().len()) + .filter(|i| !partition_indices.contains(i)) + .collect(); + + let props = props.clone(); + + let write_task = async move { + let mut stream = input; + let mut total_rows = 0i64; + let mut writers: HashMap = HashMap::new(); + + while let Some(batch_result) = stream.try_next().await.transpose() { + let batch = batch_result?; + + total_rows += batch.num_rows() as i64; + + let renamed_batch = RecordBatch::try_new( + Arc::clone(&schema_for_write), + batch.columns().to_vec(), + )?; + + // sort batch by the partition columns and split them later to write into separate files + let sort_columns: Vec = partition_indices + .iter() + .map(|&idx| SortColumn { + values: Arc::clone(renamed_batch.column(idx)), + options: Some(SortOptions::default()), + }) + .collect(); + + // TODO : benchmark against row comparator + let sorted_indices = lexsort_to_indices(&sort_columns, None)?; + let sorted_batch = RecordBatch::try_new( + Arc::clone(&schema_for_write), + renamed_batch + .columns() + .iter() + .map(|col| take(col.as_ref(), &sorted_indices, None).unwrap()) + .collect(), + )?; + + let partition_columns: Vec = partition_indices + .iter() + .map(|&idx| 
Arc::clone(sorted_batch.column(idx))) + .collect(); + + let partition_ranges: Partitions = partition(&partition_columns)?; + + for partition_batch in partition_ranges.ranges() { + let partition_path: String = build_partition_path( + &sorted_batch, + partition_batch.start, + &partition_cols, + &partition_indices, + ); + + let record_batch: RecordBatch = sorted_batch + .slice( + partition_batch.start, + partition_batch.end - partition_batch.start, + ) + .project(&non_partition_indices) + .expect("cannot project partition columns"); + eprintln!("Partition path: {:?}", partition_path); + + let full_path_part_file = if let Some(attempt_id) = task_attempt_id { + format!( + "{}/{}/part-{:05}-{:05}.parquet", + work_dir, partition_path, partition_id, attempt_id + ) + } else { + format!( + "{}/{}/part-{:05}.parquet", + work_dir, partition_path, partition_id + ) + }; + eprintln!("full path: {:?}", full_path_part_file); + let write_schema = Arc::new(output_schema.project(&non_partition_indices)?); + + if !writers.contains_key(&partition_path) { + let writer = Self::create_arrow_writer( + &full_path_part_file, + Arc::clone(&write_schema), + props.clone(), + runtime_env.clone(), + &HashMap::new(), + )?; + writers.insert(partition_path.clone(), writer); + } + + // write data now + // TODO : Write success file in base dir after the writes are completed and also support dynamic partition overwrite + writers + .get_mut(&partition_path) + .unwrap() + .write(&record_batch) + .await + .map_err(|e| { + DataFusionError::Execution(format!("Failed to write batch: {}", e)) + })?; + + let file_size = std::fs::metadata(&full_path_part_file) + .map(|m| m.len() as i64) + .unwrap_or(0); + + files_written.add(1); + bytes_written.add(file_size as usize); + rows_written.add(total_rows as usize); + } + } - while let Some(batch_result) = stream.try_next().await.transpose() { - let batch = batch_result?; + for (_, writer) in writers { + writer.close().await?; + } - // Track row count - total_rows += batch.num_rows() as i64; + Ok::<_, DataFusionError>(futures::stream::empty()) + }; - // Rename columns in the batch to match output schema - let renamed_batch = if !column_names.is_empty() { - RecordBatch::try_new(Arc::clone(&schema_for_write), batch.columns().to_vec()) + Ok(Box::pin(RecordBatchStreamAdapter::new( + self.schema(), + futures::stream::once(write_task).try_flatten(), + ))) + } else { + // Generate part file name for this partition + // If using FileCommitProtocol (work_dir is set), include task_attempt_id in the filename + let part_file = if let Some(attempt_id) = task_attempt_id { + format!( + "{}/part-{:05}-{:05}.parquet", + work_dir, self.partition_id, attempt_id + ) + } else { + format!("{}/part-{:05}.parquet", work_dir, self.partition_id) + }; + let mut writer = + Self::create_arrow_writer(&part_file, Arc::clone(&output_schema), props,runtime_env, + &HashMap::new(),)?; + + // Write batches + let write_task = async move { + let mut stream = input; + let mut total_rows = 0i64; + + while let Some(batch_result) = stream.try_next().await.transpose() { + let batch = batch_result?; + + // Track row count + total_rows += batch.num_rows() as i64; + + // Rename columns in the batch to match output schema + let renamed_batch = if !column_names.is_empty() { + RecordBatch::try_new( + Arc::clone(&schema_for_write), + batch.columns().to_vec(), + ) .map_err(|e| { DataFusionError::Execution(format!( "Failed to rename batch columns: {}", e )) })? 
- } else { - batch - }; - - writer.write(&renamed_batch).await.map_err(|e| { - DataFusionError::Execution(format!("Failed to write batch: {}", e)) - })?; - } - - writer.close().await.map_err(|e| { - DataFusionError::Execution(format!("Failed to close writer: {}", e)) - })?; + } else { + batch + }; - // Get file size - strip file:// prefix if present for local filesystem access - let local_path = part_file - .strip_prefix("file://") - .or_else(|| part_file.strip_prefix("file:")) - .unwrap_or(&part_file); - let file_size = std::fs::metadata(local_path) - .map(|m| m.len() as i64) - .unwrap_or(0); - - // Update metrics with write statistics - files_written.add(1); - bytes_written.add(file_size as usize); - rows_written.add(total_rows as usize); - - // Log metadata for debugging - eprintln!( - "Wrote Parquet file: path={}, size={}, rows={}", - part_file, file_size, total_rows - ); + writer.write(&renamed_batch).await.map_err(|e| { + DataFusionError::Execution(format!("Failed to write batch: {}", e)) + })?; + } - // Return empty stream to indicate completion - Ok::<_, DataFusionError>(futures::stream::empty()) - }; + writer.close().await.map_err(|e| { + DataFusionError::Execution(format!("Failed to close writer: {}", e)) + })?; - // Execute the write task and create a stream that does not return any batches - Ok(Box::pin(RecordBatchStreamAdapter::new( - self.schema(), - futures::stream::once(write_task).try_flatten(), - ))) + // Get file size + let file_size = std::fs::metadata(&part_file) + .map(|m| m.len() as i64) + .unwrap_or(0); + + // Update metrics with write statistics + files_written.add(1); + bytes_written.add(file_size as usize); + rows_written.add(total_rows as usize); + + // Log metadata for debugging + eprintln!( + "Wrote Parquet file: path={}, size={}, rows={}", + part_file, file_size, total_rows + ); + + // Return empty stream to indicate completion + Ok::<_, DataFusionError>(futures::stream::empty()) + }; + + // Execute the write task and create a stream that does not return any batches + Ok(Box::pin(RecordBatchStreamAdapter::new( + self.schema(), + futures::stream::once(write_task).try_flatten(), + ))) + } } } @@ -809,6 +996,7 @@ mod tests { let output_path = "unused".to_string(); let work_dir = "hdfs://namenode:9000/user/test_parquet_writer_exec".to_string(); let column_names = vec!["id".to_string(), "name".to_string()]; + let partition_columns = vec!["id".to_string(), "name".to_string()]; let parquet_writer = ParquetWriterExec::try_new( memory_exec, @@ -819,6 +1007,7 @@ mod tests { CompressionCodec::None, 0, // partition_id column_names, + partition_columns, HashMap::new(), // object_store_options )?; diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 44ff20a44f..c815148ef7 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -1246,6 +1246,7 @@ impl PhysicalPlanner { codec, self.partition, writer.column_names.clone(), + writer.partition_columns.clone(), object_store_options, )?); diff --git a/native/proto/src/proto/operator.proto b/native/proto/src/proto/operator.proto index 73c087cf36..ef98c3bc16 100644 --- a/native/proto/src/proto/operator.proto +++ b/native/proto/src/proto/operator.proto @@ -298,6 +298,8 @@ message ParquetWriter { // configuration value "spark.hadoop.fs.s3a.access.key" will be stored as "fs.s3a.access.key" in // the map. 
map object_store_options = 8; + // set of partition columns + repeated string partition_columns = 9; } enum AggregateMode { diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala index c98f8314a7..f86797f5b2 100644 --- a/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala +++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometDataWritingCommand.scala @@ -25,11 +25,13 @@ import java.util.Locale import scala.jdk.CollectionConverters._ import org.apache.spark.SparkException +import org.apache.spark.sql.{SaveMode, SparkSession} import org.apache.spark.sql.comet.{CometNativeExec, CometNativeWriteExec} import org.apache.spark.sql.execution.command.DataWritingCommandExec import org.apache.spark.sql.execution.datasources.{InsertIntoHadoopFsRelationCommand, WriteFilesExec} import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode import org.apache.comet.{CometConf, ConfigEntry} import org.apache.comet.CometSparkSessionExtensions.withInfo @@ -63,8 +65,12 @@ object CometDataWritingCommand extends CometOperatorSerde[DataWritingCommandExec return Unsupported(Some("Bucketed writes are not supported")) } + if (SQLConf.get.partitionOverwriteMode == PartitionOverwriteMode.DYNAMIC) { + return Unsupported(Some("Dynamic partition overwrite is not supported")) + } + if (cmd.partitionColumns.nonEmpty || cmd.staticPartitions.nonEmpty) { - return Unsupported(Some("Partitioned writes are not supported")) + return Incompatible(Some("Partitioned writes are highly experimental")) } val codec = parseCompressionCodec(cmd) @@ -129,6 +135,7 @@ object CometDataWritingCommand extends CometOperatorSerde[DataWritingCommandExec .setOutputPath(outputPath) .setCompression(codec) .addAllColumnNames(cmd.query.output.map(_.name).asJava) + .addAllPartitionColumns(cmd.partitionColumns.map(_.name).asJava) // Note: work_dir, job_id, and task_attempt_id will be set at execution time // in CometNativeWriteExec, as they depend on the Spark task context @@ -165,6 +172,14 @@ object CometDataWritingCommand extends CometOperatorSerde[DataWritingCommandExec val cmd = op.cmd.asInstanceOf[InsertIntoHadoopFsRelationCommand] val outputPath = cmd.outputPath.toString +// TODO : support dynamic partition overwrite + if (cmd.mode == SaveMode.Overwrite) { + val fs = cmd.outputPath.getFileSystem(SparkSession.active.sparkContext.hadoopConfiguration) + if (fs.exists(cmd.outputPath)) { + fs.delete(cmd.outputPath, true) + } + } + // Get the child plan from the WriteFilesExec or use the child directly val childPlan = op.child match { case writeFiles: WriteFilesExec => @@ -186,11 +201,7 @@ object CometDataWritingCommand extends CometOperatorSerde[DataWritingCommandExec committerClass.getConstructor(classOf[String], classOf[String], classOf[Boolean]) Some( constructor - .newInstance( - jobId, - outputPath, - java.lang.Boolean.FALSE // dynamicPartitionOverwrite = false for now - ) + .newInstance(jobId, outputPath, false: java.lang.Boolean) .asInstanceOf[org.apache.spark.internal.io.FileCommitProtocol]) } catch { case e: Exception => diff --git a/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala index c4856c3cc2..7144cd9fb3 100644 --- 
a/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala +++ b/spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala @@ -27,6 +27,7 @@ import org.apache.spark.sql.{CometTestBase, DataFrame, Row} import org.apache.spark.sql.comet.{CometBatchScanExec, CometNativeScanExec, CometNativeWriteExec, CometScanExec} import org.apache.spark.sql.execution.{FileSourceScanExec, QueryExecution, SparkPlan} import org.apache.spark.sql.execution.command.DataWritingCommandExec +import org.apache.spark.sql.functions.{col, lit} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.StructType @@ -533,4 +534,95 @@ class CometParquetWriterSuite extends CometTestBase { rows } + test("partitioned parquet write") { + withTempPath { dir => + val outputPath = dir.getAbsolutePath + withTempPath { inputDir => + val inputPath = createTestData(inputDir) + + withSQLConf( + CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true", + SQLConf.SESSION_LOCAL_TIMEZONE.key -> "America/Halifax", + CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true", + CometConf.COMET_EXEC_ENABLED.key -> "true") { + + val df = spark.read.parquet(inputPath) + // Pick first column to partition by + val partCols = df.columns.take(3) + + val uniquePartitions = df.select(partCols.map(col): _*).distinct().collect() + val expectedPaths = uniquePartitions.map { row => + partCols.zipWithIndex + .map { case (colName, i) => + val value = + if (row.isNullAt(i)) "__HIVE_DEFAULT_PARTITION__" else row.get(i).toString + s"$colName=$value" + } + .mkString("/") + }.toSet + + df.write.partitionBy(partCols: _*).parquet(outputPath) + + val result = spark.read.parquet(outputPath) + val actualFiles = result.inputFiles + actualFiles.foreach { filePath => + val matchesPartition = expectedPaths.exists(p => filePath.contains(p)) + assert( + matchesPartition, + s"File $filePath doesn't match any expected partition: $expectedPaths") + } + // Verify data + assert(result.count() == 1000) + } + } + } + } + + test("partitioned write - data correctness per partition") { + withTempPath { dir => + val outputPath = new File(dir, "output").getAbsolutePath + + withTempPath { inputDir => + val inputPath = createTestData(inputDir) + + withSQLConf( + CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true", + CometConf.getOperatorAllowIncompatConfigKey( + classOf[DataWritingCommandExec]) -> "true") { + + val inputDf = spark.read.parquet(inputPath).filter(col("c1") <= lit(10)) + val partCols = inputDf.columns.take(2) + val col1 = partCols(0) + val col2 = partCols(1) + + inputDf.write.partitionBy(partCols: _*).parquet(outputPath) + + // unique combinations + val combinations = inputDf + .select(partCols.head, partCols.last) + .distinct() + .collect() + .map(r => (r.getBoolean(0), r.getByte(1))) + + combinations.foreach { tuple => + val val1 = tuple._1 + val val2 = tuple._2 + + val partitionPath = s"$outputPath/${partCols.head}=$val1/${partCols.last}=$val2" + + val actualDf = spark.read.parquet(partitionPath) + val expectedDf = inputDf + .filter(col(col1) === val1) + .filter(col(col2) === val2) + .drop(col1, col2) + + checkAnswer(actualDf, expectedDf) + } + + // Verify total count as well + checkAnswer(spark.read.parquet(outputPath), inputDf) + } + } + } + } }
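
A few illustrative sketches of the techniques introduced above. First, a self-contained, std-only sketch of the Hive-style partition-value escaping that the new `needs_escaping`/`escape_partition_value` helpers perform when building `col=value` directory names. The character set is taken from this diff; it is close to, but not necessarily byte-for-byte identical with, Spark's `ExternalCatalogUtils.escapePathName`.

```rust
// Characters that are unsafe inside a single path segment (plus all control characters)
// are percent-encoded so a partition value can never escape its directory level.
fn needs_escaping(c: char) -> bool {
    matches!(
        c,
        '"' | '#' | '%' | '\'' | '*' | '/' | ':' | '=' | '?' | '\\' | '\x7F'
    ) || c.is_control()
}

fn escape_partition_value(value: &str) -> String {
    let mut out = String::with_capacity(value.len());
    for c in value.chars() {
        if needs_escaping(c) {
            // Same "%XX" upper-case hex encoding as in the operator.
            out.push_str(&format!("%{:02X}", c as u32));
        } else {
            out.push(c);
        }
    }
    out
}

fn main() {
    // '/' would otherwise add a directory level, '=' would break key=value parsing.
    assert_eq!(escape_partition_value("2024/01"), "2024%2F01");
    assert_eq!(escape_partition_value("a=b"), "a%3Db");
    assert_eq!(escape_partition_value("plain"), "plain");
}
```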
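
Next, a minimal sketch of how `get_partition_value` renders a single cell for the directory name: a null becomes `__HIVE_DEFAULT_PARTITION__`, and any other value is routed through arrow's `cast` kernel to `Utf8` so no per-type formatting code is needed. The column and values below are invented for illustration; the sketch assumes only the `arrow` crate this file already depends on.

```rust
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, AsArray, Int32Array};
use arrow::compute::cast;
use arrow::datatypes::DataType;
use arrow::error::ArrowError;

fn partition_value(array: &ArrayRef, row: usize) -> Result<String, ArrowError> {
    if array.is_null(row) {
        // Spark's name for the "null" partition directory.
        Ok("__HIVE_DEFAULT_PARTITION__".to_string())
    } else {
        // Rely on arrow's cast kernel for the string rendering of any input type.
        let utf8 = cast(array, &DataType::Utf8)?;
        Ok(utf8.as_string::<i32>().value(row).to_string())
    }
}

fn main() -> Result<(), ArrowError> {
    let col: ArrayRef = Arc::new(Int32Array::from(vec![Some(7), None]));
    assert_eq!(partition_value(&col, 0)?, "7");
    assert_eq!(partition_value(&col, 1)?, "__HIVE_DEFAULT_PARTITION__");
    Ok(())
}
```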
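
A std-only sketch of how the per-partition part-file path is assembled in the partitioned branch. The `work_dir`, partition segments, ids, and the `i64` attempt-id type used here are hypothetical placeholders; the operator fills these in from the Spark task context and from `build_partition_path`.

```rust
// Escaped `col=value` segments are joined with '/' and placed between the task work
// directory and the part-file name.
fn part_file_path(
    work_dir: &str,
    partition_path: &str,
    partition_id: i32,
    task_attempt_id: Option<i64>,
) -> String {
    match task_attempt_id {
        Some(attempt_id) => format!(
            "{}/{}/part-{:05}-{:05}.parquet",
            work_dir, partition_path, partition_id, attempt_id
        ),
        None => format!(
            "{}/{}/part-{:05}.parquet",
            work_dir, partition_path, partition_id
        ),
    }
}

fn main() {
    assert_eq!(
        part_file_path("/tmp/_temporary/0", "country=US/year=2024", 3, Some(7)),
        "/tmp/_temporary/0/country=US/year=2024/part-00003-00007.parquet"
    );
    // Null partition values land in the Hive default partition directory.
    println!(
        "{}",
        part_file_path("file:///out", "country=__HIVE_DEFAULT_PARTITION__", 0, None)
    );
}
```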
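
Finally, a sketch of the sort-then-split technique the partitioned write path relies on: sort the batch lexicographically on its partition columns with `lexsort_to_indices`/`take`, then let arrow's `partition` kernel report the contiguous row ranges that share a partition key; each range becomes one slice written to that partition's writer. Column names and data are invented for the example, which again assumes only the `arrow` crate already imported by this file.

```rust
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, Int32Array, StringArray};
use arrow::compute::{lexsort_to_indices, partition, take, SortColumn, SortOptions};
use arrow::error::ArrowError;

fn main() -> Result<(), ArrowError> {
    // Hypothetical batch: one partition column ("country") and one data column ("amount").
    let country: ArrayRef = Arc::new(StringArray::from(vec!["US", "DE", "US", "DE"]));
    let amount: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));

    // 1. Sort by the partition column(s) so equal keys become contiguous.
    let sort_cols = vec![SortColumn {
        values: Arc::clone(&country),
        options: Some(SortOptions::default()),
    }];
    let indices = lexsort_to_indices(&sort_cols, None)?;
    let sorted_country = take(country.as_ref(), &indices, None)?;
    let sorted_amount = take(amount.as_ref(), &indices, None)?;

    // 2. `partition` returns the row ranges with identical partition-key values.
    let ranges = partition(&[Arc::clone(&sorted_country)])?;
    for r in ranges.ranges() {
        // In the operator each range is sliced out of the sorted batch, the partition
        // columns are projected away, and the slice goes to that partition's writer.
        let slice = sorted_amount.slice(r.start, r.end - r.start);
        println!("rows {}..{} -> {:?}", r.start, r.end, slice);
    }
    Ok(())
}
```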