Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ object PartitionedFileUtil {
val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
val hosts = getBlockHosts(getBlockLocations(file), offset, size)
PartitionedFile(partitionValues, filePath.toUri.toString, offset, size, hosts,
file.getModificationTime, file.getLen)
file.getModificationTime * 1000L, file.getLen)
}
} else {
Seq(getPartitionedFile(file, filePath, partitionValues))
Expand All @@ -50,7 +50,7 @@ object PartitionedFileUtil {
partitionValues: InternalRow): PartitionedFile = {
val hosts = getBlockHosts(getBlockLocations(file), 0, file.getLen)
PartitionedFile(partitionValues, filePath.toUri.toString, 0, file.getLen, hosts,
file.getModificationTime, file.getLen)
file.getModificationTime * 1000L, file.getLen)
}

private def getBlockLocations(file: FileStatus): Array[BlockLocation] = file match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjectio
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.{DataType, LongType, StringType, StructField, StructType}
import org.apache.spark.sql.types.{DataType, LongType, StringType, StructField, StructType, TimestampType}


/**
Expand Down Expand Up @@ -188,7 +188,7 @@ object FileFormat {
.add(StructField(FILE_PATH, StringType))
.add(StructField(FILE_NAME, StringType))
.add(StructField(FILE_SIZE, LongType))
.add(StructField(FILE_MODIFICATION_TIME, LongType))
.add(StructField(FILE_MODIFICATION_TIME, TimestampType))

// create a file metadata struct col
def createFileMetadataCol: AttributeReference = MetadataAttribute(METADATA_NAME, METADATA_STRUCT)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ import org.apache.spark.util.NextIterator
* @param filePath URI of the file to read
* @param start the beginning offset (in bytes) of the block.
* @param length number of bytes to read.
* @param modificationTime The modification time of the input file, in milliseconds.
* @param modificationTime The modification time of the input file, in microseconds.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I think we can keep this documented as milliseconds, since it matches file.getModificationTime. We can multiply by 1000 (milliseconds to microseconds) in FileScanRDD when we set the value in the internal row.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure! done

* @param fileSize The length of the input file (not the block), in bytes.
*/
case class PartitionedFile(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.sql.execution.datasources

import java.io.File
import java.sql.Timestamp
import java.text.SimpleDateFormat

import org.apache.spark.sql.{AnalysisException, Column, DataFrame, QueryTest, Row}
Expand Down Expand Up @@ -101,13 +102,13 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
METADATA_FILE_PATH -> realF0.toURI.toString,
METADATA_FILE_NAME -> realF0.getName,
METADATA_FILE_SIZE -> realF0.length(),
METADATA_FILE_MODIFICATION_TIME -> realF0.lastModified()
METADATA_FILE_MODIFICATION_TIME -> new Timestamp(realF0.lastModified())
)
val f1Metadata = Map(
METADATA_FILE_PATH -> realF1.toURI.toString,
METADATA_FILE_NAME -> realF1.getName,
METADATA_FILE_SIZE -> realF1.length(),
METADATA_FILE_MODIFICATION_TIME -> realF1.lastModified()
METADATA_FILE_MODIFICATION_TIME -> new Timestamp(realF1.lastModified())
)

f(df, f0Metadata, f1Metadata)
Expand Down Expand Up @@ -158,9 +159,9 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
df.select(
// substring of file name
substring(col(METADATA_FILE_NAME), 1, 3),
// convert timestamp in millis to unixtime and to date format
from_unixtime(col(METADATA_FILE_MODIFICATION_TIME).divide(lit(1000)), "yyyy-MM")
.as("_file_modification_date"),
// format timestamp
date_format(col(METADATA_FILE_MODIFICATION_TIME), "yyyy-MM")
.as("_file_modification_year_month"),
// convert to kb
col(METADATA_FILE_SIZE).divide(lit(1024)).as("_file_size_kb"),
// get the file format
Expand Down