MicroBatchExecution.scala

@@ -801,6 +801,16 @@ class MicroBatchExecution(
       case _ => false
     }
     val finalDataPlan = dataPlan transformUp {
+      // SPARK-53625: Propagate metadata columns through Projects
+      case p: Project if hasFileMetadata =>
+        // Check if there are any metadata fields not in the output list
+        val newMetadata = p.metadataOutput.filterNot(p.outputSet.contains)
+        if (newMetadata.nonEmpty) {
+          // If so, add them to the projection
+          p.copy(projectList = p.projectList ++ newMetadata)
+        } else {
+          p
+        }
       case l: LogicalRelation =>
         var newRelation = l
         if (hasFileMetadata) {
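For context, a minimal repro sketch of the query shape this rule targets (the local session setup and paths below are illustrative, not part of the patch): with read-side char padding enabled, the analyzer inserts a padding Project on top of the streaming file relation, and before this change that Project did not carry the relation's metadata output, so _metadata could not be resolved downstream.

// Hypothetical standalone repro; session setup and paths are assumptions.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("metadata-through-project")
  .getOrCreate()

// A CHAR(1) column makes the analyzer wrap the relation in a read-side
// padding Project when spark.sql.readSideCharPadding is enabled.
val streamDf = spark.readStream
  .format("json")
  .schema("char_col CHAR(1)")   // DDL-string schema with a char column
  .load("/tmp/source")          // illustrative input directory
  .select("*", "_metadata")     // resolves only if the Project propagates
                                // the relation's metadata columns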
FileMetadataStructSuite.scala

@@ -31,7 +31,7 @@ import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.Trigger
 import org.apache.spark.sql.test.SharedSparkSession
-import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType}
+import org.apache.spark.sql.types.{IntegerType, LongType, MetadataBuilder, StringType, StructField, StructType}
 
 class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
 
@@ -1133,4 +1133,98 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
     assert(selectSingleRowDf.count() === 1)
   }
 }
+
+  Seq("true", "false").foreach { sideCharPadding =>
+    test(s"SPARK-53625: file metadata in streaming with char type, " +
+      s"sideCharPadding=$sideCharPadding") {
+      withSQLConf(SQLConf.READ_SIDE_CHAR_PADDING.key -> sideCharPadding) {
+        withTempDir { dir =>
+          import scala.jdk.CollectionConverters._
+
+          val metadata = new MetadataBuilder()
+            .putString("__CHAR_VARCHAR_TYPE_STRING", "char(1)")
+            .build()
+          val charSchemaStruct = new StructType()
+            .add(StructField("char_col", StringType, metadata = metadata))
+
+          val data = Seq(Row("A"), Row("B"))
+          val df = spark.createDataFrame(data.asJava, charSchemaStruct)
+          df.coalesce(1).write.format("json")
+            .save(dir.getCanonicalPath + "/source/new-streaming-data")
+
+          val streamDf = spark.readStream.format("json")
+            .schema(charSchemaStruct)
+            .load(dir.getCanonicalPath + "/source/new-streaming-data")
+            .select("*", "_metadata")
+
+          val streamQuery0 = streamDf
+            .writeStream.format("json")
+            .option("checkpointLocation", dir.getCanonicalPath + "/target/checkpoint")
+            .trigger(Trigger.AvailableNow())
+            .start(dir.getCanonicalPath + "/target/new-streaming-data")
+
+          streamQuery0.awaitTermination()
+          assert(streamQuery0.lastProgress.numInputRows == 2L)
+
+          val newDF = spark.read.format("json")
+            .load(dir.getCanonicalPath + "/target/new-streaming-data")
+
+          val sourceFile = new File(dir, "/source/new-streaming-data").listFiles()
+            .filter(_.getName.endsWith(".json")).head
+          val sourceFileMetadata = Map(
+            METADATA_FILE_PATH -> sourceFile.toURI.toString,
+            METADATA_FILE_NAME -> sourceFile.getName,
+            METADATA_FILE_SIZE -> sourceFile.length(),
+            METADATA_FILE_BLOCK_START -> 0,
+            METADATA_FILE_BLOCK_LENGTH -> sourceFile.length(),
+            METADATA_FILE_MODIFICATION_TIME -> new Timestamp(sourceFile.lastModified())
+          )
+
+          // SELECT * will have: char_col, _metadata of /source/new-streaming-data
+          assert(newDF.select("*").columns.toSet == Set("char_col", "_metadata"))
+          // Verify the data is as expected
+          checkAnswer(
+            newDF.select(col("char_col"),
+              col(METADATA_FILE_PATH), col(METADATA_FILE_NAME),
+              col(METADATA_FILE_SIZE), col(METADATA_FILE_BLOCK_START),
+              col(METADATA_FILE_BLOCK_LENGTH),
+              // since we are writing _metadata to a JSON file,
+              // we should explicitly cast the column to timestamp type
+              to_timestamp(col(METADATA_FILE_MODIFICATION_TIME))),
+            Seq(
+              Row(
+                "A",
+                sourceFileMetadata(METADATA_FILE_PATH),
+                sourceFileMetadata(METADATA_FILE_NAME),
+                sourceFileMetadata(METADATA_FILE_SIZE),
+                sourceFileMetadata(METADATA_FILE_BLOCK_START),
+                sourceFileMetadata(METADATA_FILE_BLOCK_LENGTH),
+                sourceFileMetadata(METADATA_FILE_MODIFICATION_TIME)),
+              Row(
+                "B",
+                sourceFileMetadata(METADATA_FILE_PATH),
+                sourceFileMetadata(METADATA_FILE_NAME),
+                sourceFileMetadata(METADATA_FILE_SIZE),
+                sourceFileMetadata(METADATA_FILE_BLOCK_START),
+                sourceFileMetadata(METADATA_FILE_BLOCK_LENGTH),
+                sourceFileMetadata(METADATA_FILE_MODIFICATION_TIME))
+            )
+          )
+
+          checkAnswer(
+            newDF.where(s"$METADATA_FILE_SIZE > 0").select(METADATA_FILE_SIZE),

[Inline review thread on the line .where(s"$METADATA_FILE_SIZE > 0")]

Contributor: I guess this is just a sanity check, right? Is it ever actually possible for a row to be mapped to some file while that file has size 0?

Contributor (author): Yeah, just a sanity check.

+            Seq(
+              Row(sourceFileMetadata(METADATA_FILE_SIZE)),
+              Row(sourceFileMetadata(METADATA_FILE_SIZE)))
+          )
+          checkAnswer(
+            newDF.where(s"$METADATA_FILE_SIZE > 0").select(METADATA_FILE_PATH),
+            Seq(
+              Row(sourceFileMetadata(METADATA_FILE_PATH)),
+              Row(sourceFileMetadata(METADATA_FILE_PATH)))
+          )
+        }
+      }
+    }
+  }
 }
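Note: the METADATA_* constants referenced throughout the test are helpers defined elsewhere in this suite; this excerpt does not show their definitions. Assuming the standard file-source metadata struct, they map to nested _metadata column paths, roughly as sketched below; see the suite itself for the authoritative values.

// Assumed definitions (matching the file-source _metadata struct fields);
// illustrative only, not part of this diff.
val METADATA_FILE_PATH = "_metadata.file_path"
val METADATA_FILE_NAME = "_metadata.file_name"
val METADATA_FILE_SIZE = "_metadata.file_size"
val METADATA_FILE_BLOCK_START = "_metadata.file_block_start"
val METADATA_FILE_BLOCK_LENGTH = "_metadata.file_block_length"
val METADATA_FILE_MODIFICATION_TIME = "_metadata.file_modification_time"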