diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala
index 461936b40218..174421fcf835 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala
@@ -801,6 +801,16 @@ class MicroBatchExecution(
             case _ => false
           }
           val finalDataPlan = dataPlan transformUp {
+            // SPARK-53625: Propagate metadata columns through Projects
+            case p: Project if hasFileMetadata =>
+              // Check if there are any metadata fields not in the output list
+              val newMetadata = p.metadataOutput.filterNot(p.outputSet.contains)
+              if (newMetadata.nonEmpty) {
+                // If so, add them to the projection
+                p.copy(projectList = p.projectList ++ newMetadata)
+              } else {
+                p
+              }
             case l: LogicalRelation =>
               var newRelation = l
               if (hasFileMetadata) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
index 880f1dd9af8f..c38113f50558 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.Trigger
 import org.apache.spark.sql.test.SharedSparkSession
-import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType}
+import org.apache.spark.sql.types.{IntegerType, LongType, MetadataBuilder, StringType, StructField, StructType}
 
 class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
 
@@ -1133,4 +1133,98 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
       assert(selectSingleRowDf.count() === 1)
     }
   }
+
+  Seq("true", "false").foreach { sideCharPadding =>
+    test(s"SPARK-53625: file metadata in streaming with char type, " +
+      s"sideCharPadding=$sideCharPadding") {
+      withSQLConf(SQLConf.READ_SIDE_CHAR_PADDING.key -> sideCharPadding) {
+        withTempDir { dir =>
+          import scala.jdk.CollectionConverters._
+
+          val metadata = new MetadataBuilder()
+            .putString("__CHAR_VARCHAR_TYPE_STRING", "char(1)")
+            .build()
+          val charSchemaStruct = new StructType()
+            .add(StructField("char_col", StringType, metadata = metadata))
+
+          val data = Seq(Row("A"), Row("B"))
+          val df = spark.createDataFrame(data.asJava, charSchemaStruct)
+          df.coalesce(1).write.format("json")
+            .save(dir.getCanonicalPath + "/source/new-streaming-data")
+
+          val streamDf = spark.readStream.format("json")
+            .schema(charSchemaStruct)
+            .load(dir.getCanonicalPath + "/source/new-streaming-data")
+            .select("*", "_metadata")
+
+          val streamQuery0 = streamDf
+            .writeStream.format("json")
+            .option("checkpointLocation", dir.getCanonicalPath + "/target/checkpoint")
+            .trigger(Trigger.AvailableNow())
+            .start(dir.getCanonicalPath + "/target/new-streaming-data")
+
+          streamQuery0.awaitTermination()
+          assert(streamQuery0.lastProgress.numInputRows == 2L)
+
+          val newDF = spark.read.format("json")
+            .load(dir.getCanonicalPath + "/target/new-streaming-data")
+
+          val sourceFile = new File(dir, "/source/new-streaming-data").listFiles()
+            .filter(_.getName.endsWith(".json")).head
+          val sourceFileMetadata = Map(
+            METADATA_FILE_PATH -> sourceFile.toURI.toString,
+            METADATA_FILE_NAME -> sourceFile.getName,
+            METADATA_FILE_SIZE -> sourceFile.length(),
+            METADATA_FILE_BLOCK_START -> 0,
+            METADATA_FILE_BLOCK_LENGTH -> sourceFile.length(),
+            METADATA_FILE_MODIFICATION_TIME -> new Timestamp(sourceFile.lastModified())
+          )
+
+          // SELECT * will have: char_col, _metadata of /source/new-streaming-data
+          assert(newDF.select("*").columns.toSet == Set("char_col", "_metadata"))
+          // Verify the data is expected
+          checkAnswer(
+            newDF.select(col("char_col"),
+              col(METADATA_FILE_PATH), col(METADATA_FILE_NAME),
+              col(METADATA_FILE_SIZE), col(METADATA_FILE_BLOCK_START),
+              col(METADATA_FILE_BLOCK_LENGTH),
+              // since we are writing _metadata to a json file,
+              // we should explicitly cast the column to timestamp type
+              to_timestamp(col(METADATA_FILE_MODIFICATION_TIME))),
+            Seq(
+              Row(
+                "A",
+                sourceFileMetadata(METADATA_FILE_PATH),
+                sourceFileMetadata(METADATA_FILE_NAME),
+                sourceFileMetadata(METADATA_FILE_SIZE),
+                sourceFileMetadata(METADATA_FILE_BLOCK_START),
+                sourceFileMetadata(METADATA_FILE_BLOCK_LENGTH),
+                sourceFileMetadata(METADATA_FILE_MODIFICATION_TIME)),
+              Row(
+                "B",
+                sourceFileMetadata(METADATA_FILE_PATH),
+                sourceFileMetadata(METADATA_FILE_NAME),
+                sourceFileMetadata(METADATA_FILE_SIZE),
+                sourceFileMetadata(METADATA_FILE_BLOCK_START),
+                sourceFileMetadata(METADATA_FILE_BLOCK_LENGTH),
+                sourceFileMetadata(METADATA_FILE_MODIFICATION_TIME))
+            )
+          )
+
+          checkAnswer(
+            newDF.where(s"$METADATA_FILE_SIZE > 0").select(METADATA_FILE_SIZE),
+            Seq(
+              Row(sourceFileMetadata(METADATA_FILE_SIZE)),
+              Row(sourceFileMetadata(METADATA_FILE_SIZE)))
+          )
+          checkAnswer(
+            newDF.where(s"$METADATA_FILE_SIZE > 0").select(METADATA_FILE_PATH),
+            Seq(
+              Row(sourceFileMetadata(METADATA_FILE_PATH)),
+              Row(sourceFileMetadata(METADATA_FILE_PATH)))
+          )
+        }
+      }
+    }
+  }
 }