@@ -972,7 +972,7 @@ class Analyzer(override val catalogManager: CatalogManager)
}

private def addMetadataCol(plan: LogicalPlan): LogicalPlan = plan match {
- case r: DataSourceV2Relation => r.withMetadataColumns()
+ case s: ExposesMetadataColumns => s.withMetadataColumns()
case p: Project =>
p.copy(
projectList = p.metadataOutput ++ p.projectList,
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
import org.apache.spark.sql.catalyst.trees.TreePattern
import org.apache.spark.sql.catalyst.trees.TreePattern._
- import org.apache.spark.sql.catalyst.util.quoteIfNeeded
+ import org.apache.spark.sql.catalyst.util.{quoteIfNeeded, METADATA_COL_ATTR_KEY}
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.types._
import org.apache.spark.util.collection.BitSet
@@ -432,3 +432,22 @@ object VirtualColumn {
val groupingIdName: String = "spark_grouping_id"
val groupingIdAttribute: UnresolvedAttribute = UnresolvedAttribute(groupingIdName)
}

/**
* The internal representation of the hidden metadata struct:
* an AttributeReference whose metadata has `__metadata_col` set to `true`.
* - apply() creates a metadata attribute reference
* - unapply() checks whether an attribute reference is a metadata attribute reference
*/
object MetadataAttribute {
def apply(name: String, dataType: DataType, nullable: Boolean = true): AttributeReference =
AttributeReference(name, dataType, nullable,
new MetadataBuilder().putBoolean(METADATA_COL_ATTR_KEY, value = true).build())()

def unapply(attr: AttributeReference): Option[AttributeReference] = {
if (attr.metadata.contains(METADATA_COL_ATTR_KEY)
&& attr.metadata.getBoolean(METADATA_COL_ATTR_KEY)) {
Some(attr)
} else None
}
}
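
For illustration only (not part of the diff): a minimal sketch of how this apply/unapply pair is meant to be used. The attribute names here are made up.

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, MetadataAttribute}
import org.apache.spark.sql.types.LongType

// apply() tags the attribute's metadata with __metadata_col = true
val fileSize = MetadataAttribute("file_size", LongType)

// unapply() lets analyzer rules pattern-match metadata attributes and skip ordinary ones
def isMetadataCol(attr: AttributeReference): Boolean = attr match {
  case MetadataAttribute(_) => true
  case _ => false
}

isMetadataCol(fileSize)                               // true
isMetadataCol(AttributeReference("id", LongType)())   // false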
@@ -277,3 +277,10 @@ object LogicalPlanIntegrity {
checkIfSameExprIdNotReused(plan) && hasUniqueExprIdsForOutput(plan)
}
}

/**
* A logical plan node that can generate metadata columns
*/
trait ExposesMetadataColumns extends LogicalPlan {
def withMetadataColumns(): LogicalPlan
}
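
A toy sketch (hypothetical, not from this PR) of what implementing the contract looks like: the node returns a copy of itself whose output also carries its metadata columns, so the analyzer can resolve references to them.

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, MetadataAttribute}
import org.apache.spark.sql.catalyst.plans.logical.{ExposesMetadataColumns, LeafNode, LogicalPlan}
import org.apache.spark.sql.types.StringType

case class ToyFileRelation(dataOutput: Seq[AttributeReference])
  extends LeafNode with ExposesMetadataColumns {

  // the hypothetical metadata column this relation can surface on demand
  private val fileNameCol = MetadataAttribute("_toy_file_name", StringType)

  override def output: Seq[AttributeReference] = dataOutput

  // append the metadata column to the output, mirroring what addMetadataCol in the Analyzer expects
  override def withMetadataColumns(): LogicalPlan =
    copy(dataOutput = dataOutput :+ fileNameCol)
}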
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.v2

import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, NamedRelation}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
- import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
+ import org.apache.spark.sql.catalyst.plans.logical.{ExposesMetadataColumns, LeafNode, LogicalPlan, Statistics}
import org.apache.spark.sql.catalyst.util.{truncatedString, CharVarcharUtils}
import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Identifier, MetadataColumn, SupportsMetadataColumns, Table, TableCapability}
import org.apache.spark.sql.connector.read.{Scan, Statistics => V2Statistics, SupportsReportStatistics}
@@ -44,7 +44,7 @@ case class DataSourceV2Relation(
catalog: Option[CatalogPlugin],
identifier: Option[Identifier],
options: CaseInsensitiveStringMap)
- extends LeafNode with MultiInstanceRelation with NamedRelation {
+ extends LeafNode with MultiInstanceRelation with NamedRelation with ExposesMetadataColumns {

import DataSourceV2Implicits._

@@ -35,6 +35,7 @@ import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => ParquetSource}
import org.apache.spark.sql.execution.datasources.v2.PushedDownOperators
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.{BaseRelation, Filter}
import org.apache.spark.sql.types.StructType
@@ -198,6 +199,9 @@ case class FileSourceScanExec(
disableBucketedScan: Boolean = false)
extends DataSourceScanExec {

lazy val metadataColumns: Seq[AttributeReference] =
output.collect { case MetadataAttribute(attr) => attr }

// Note that some vals referring the file-based relation are lazy intentionally
// so that this plan can be canonicalized on executor side too. See SPARK-23731.
override lazy val supportsColumnar: Boolean = {
@@ -216,7 +220,10 @@ case class FileSourceScanExec(
relation.fileFormat.vectorTypes(
requiredSchema = requiredSchema,
partitionSchema = relation.partitionSchema,
- relation.sparkSession.sessionState.conf)
+ relation.sparkSession.sessionState.conf).map { vectorTypes =>
+   // for column-based file format, append metadata struct column's vector type classes if any
+   vectorTypes ++ Seq.fill(metadataColumns.size)(classOf[OnHeapColumnVector].getName)
+ }

private lazy val driverMetrics: HashMap[String, Long] = HashMap.empty

@@ -359,7 +366,11 @@ case class FileSourceScanExec(
@transient
private lazy val pushedDownFilters = {
val supportNestedPredicatePushdown = DataSourceUtils.supportNestedPredicatePushdown(relation)
- dataFilters.flatMap(DataSourceStrategy.translateFilter(_, supportNestedPredicatePushdown))
+ // TODO: should be able to push filters containing metadata columns down to skip files
+ dataFilters.filterNot(_.references.exists {
+   case MetadataAttribute(_) => true
+   case _ => false
+ }).flatMap(DataSourceStrategy.translateFilter(_, supportNestedPredicatePushdown))
}

override lazy val metadata: Map[String, String] = {
@@ -601,7 +612,8 @@ case class FileSourceScanExec(
}
}

- new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions)
+ new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions,
+   requiredSchema, metadataColumns)
}

/**
@@ -657,7 +669,8 @@ case class FileSourceScanExec(
val partitions =
FilePartition.getFilePartitions(relation.sparkSession, splitFiles, maxSplitBytes)

- new FileScanRDD(fsRelation.sparkSession, readFile, partitions)
+ new FileScanRDD(fsRelation.sparkSession, readFile, partitions,
+   requiredSchema, metadataColumns)
}

// Filters unused DynamicPruningExpression expressions - one which has been replaced
@@ -36,7 +36,8 @@ object PartitionedFileUtil {
val remaining = file.getLen - offset
val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
val hosts = getBlockHosts(getBlockLocations(file), offset, size)
- PartitionedFile(partitionValues, filePath.toUri.toString, offset, size, hosts)
+ PartitionedFile(partitionValues, filePath.toUri.toString, offset, size, hosts,
+   file.getModificationTime, file.getLen)
}
} else {
Seq(getPartitionedFile(file, filePath, partitionValues))
@@ -48,7 +49,8 @@ def getPartitionedFile(
filePath: Path,
partitionValues: InternalRow): PartitionedFile = {
val hosts = getBlockHosts(getBlockLocations(file), 0, file.getLen)
- PartitionedFile(partitionValues, filePath.toUri.toString, 0, file.getLen, hosts)
+ PartitionedFile(partitionValues, filePath.toUri.toString, 0, file.getLen, hosts,
+   file.getModificationTime, file.getLen)
}

private def getBlockLocations(file: FileStatus): Array[BlockLocation] = file match {
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjectio
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.Filter
- import org.apache.spark.sql.types.{DataType, StructType}
+ import org.apache.spark.sql.types.{DataType, LongType, StringType, StructField, StructType, TimestampType}


/**
@@ -171,6 +171,29 @@ trait FileFormat {
def supportFieldName(name: String): Boolean = true
}

object FileFormat {

val FILE_PATH = "file_path"

val FILE_NAME = "file_name"
[Review thread]
Contributor: Wondering, do we also plan to deprecate the existing InputFileName expression in Spark?

Contributor: Good point. I think we should, as InputFileName is really fragile and can't be used with a join, for example.


val FILE_SIZE = "file_size"

val FILE_MODIFICATION_TIME = "file_modification_time"

val METADATA_NAME = "_metadata"

// supported metadata struct fields for hadoop fs relation
val METADATA_STRUCT: StructType = new StructType()
.add(StructField(FILE_PATH, StringType))
.add(StructField(FILE_NAME, StringType))
.add(StructField(FILE_SIZE, LongType))
.add(StructField(FILE_MODIFICATION_TIME, TimestampType))

// create a file metadata struct col
def createFileMetadataCol: AttributeReference = MetadataAttribute(METADATA_NAME, METADATA_STRUCT)
}
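
For context, a hypothetical sketch of how the hidden metadata struct is meant to surface to users once the read path below wires it in (assumes a SparkSession named spark; the path, the id column, and otherDf are illustrative):

import org.apache.spark.sql.functions.col

// _metadata is hidden: it does not appear in printSchema() or SELECT *,
// but it can be selected explicitly by name.
val df = spark.read.format("parquet").load("/tmp/events")
df.select(col("id"), col("_metadata.file_name"), col("_metadata.file_size")).show()

// Unlike input_file_name(), the struct behaves like a normal column once selected,
// so it can be captured before a join and carried through it.
val tagged = df.select(col("id"), col("_metadata.file_path").as("source_file"))
tagged.join(otherDf, "id").show()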

/**
* The base class file format that is based on text file.
*/
@@ -21,13 +21,20 @@ import java.io.{Closeable, FileNotFoundException, IOException}

import scala.util.control.NonFatal

import org.apache.hadoop.fs.Path

import org.apache.spark.{Partition => RDDPartition, SparkUpgradeException, TaskContext}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.{InputFileBlockHolder, RDD}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GenericInternalRow, JoinedRow, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.datasources.FileFormat._
import org.apache.spark.sql.execution.vectorized.{OnHeapColumnVector, WritableColumnVector}
import org.apache.spark.sql.types.{LongType, StringType, StructType}
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.NextIterator

/**
@@ -38,14 +45,17 @@ import org.apache.spark.util.NextIterator
* @param filePath URI of the file to read
* @param start the beginning offset (in bytes) of the block.
* @param length number of bytes to read.
* @param locations locality information (list of nodes that have the data).
* @param modificationTime The modification time of the input file, in milliseconds.
* @param fileSize The length of the input file (not the block), in bytes.
*/
case class PartitionedFile(
partitionValues: InternalRow,
filePath: String,
start: Long,
length: Long,
- @transient locations: Array[String] = Array.empty) {
+ @transient locations: Array[String] = Array.empty,
+ modificationTime: Long = 0L,
+ fileSize: Long = 0L) {
override def toString: String = {
s"path: $filePath, range: $start-${start + length}, partition values: $partitionValues"
}
@@ -57,7 +67,9 @@ case class PartitionedFile(
class FileScanRDD(
@transient private val sparkSession: SparkSession,
readFunction: (PartitionedFile) => Iterator[InternalRow],
- @transient val filePartitions: Seq[FilePartition])
+ @transient val filePartitions: Seq[FilePartition],
+ val readDataSchema: StructType,
+ val metadataColumns: Seq[AttributeReference] = Seq.empty)
extends RDD[InternalRow](sparkSession.sparkContext, Nil) {

private val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
@@ -103,6 +115,101 @@ class FileScanRDD(
context.killTaskIfInterrupted()
(currentIterator != null && currentIterator.hasNext) || nextIterator()
}

///////////////////////////
// FILE METADATA METHODS //
///////////////////////////

// a metadata internal row, will only be updated when the current file is changed
val metadataRow: InternalRow = new GenericInternalRow(metadataColumns.length)

// an unsafe projection to convert a joined internal row to an unsafe row
private lazy val projection = {
val joinedExpressions =
readDataSchema.fields.map(_.dataType) ++ metadataColumns.map(_.dataType)
UnsafeProjection.create(joinedExpressions)
}

/**
* For each partitioned file, the metadata columns are exactly the same for every record in the file.
* Only update the metadata row when `currentFile` changes.
*/
private def updateMetadataRow(): Unit = {
if (metadataColumns.nonEmpty && currentFile != null) {
val path = new Path(currentFile.filePath)
metadataColumns.zipWithIndex.foreach { case (attr, i) =>
attr.name match {
case FILE_PATH => metadataRow.update(i, UTF8String.fromString(path.toString))
case FILE_NAME => metadataRow.update(i, UTF8String.fromString(path.getName))
case FILE_SIZE => metadataRow.update(i, currentFile.fileSize)
case FILE_MODIFICATION_TIME =>
// the modificationTime from the file is in milliseconds,
// while internally, the TimestampType is stored in microseconds
metadataRow.update(i, currentFile.modificationTime * 1000L)
}
}
}
}

/**
* Create a writable column vector containing all required metadata columns
*/
private def createMetadataColumnVector(c: ColumnarBatch): Array[WritableColumnVector] = {
val path = new Path(currentFile.filePath)
val filePathBytes = path.toString.getBytes
val fileNameBytes = path.getName.getBytes
var rowId = 0
metadataColumns.map(_.name).map {
[Review thread]
Contributor: We should already know how to fill the column vector for each metadata column, so the pattern matching can be done outside of execution; it does not need to happen per batch here.

Contributor: Per-batch pattern matching should be fine; the overhead is small.

Contributor: Yeah, I agree it's not a huge issue since it's per batch, not per row. But I also think it's not hard to organize the code in the most efficient way.

Contributor (author): Thanks for the comments, really appreciate that! But we have to do something per batch, right? We cannot be sure of c.numRows() (small file or last batch), and different file formats can have different configurable max rows per batch: Parquet, ORC. Unless we could have a ConstantColumnVector, as you mentioned, or something else, and refer only to c.numRows() for each batch?

case FILE_PATH =>
val columnVector = new OnHeapColumnVector(c.numRows(), StringType)
rowId = 0
// use a tight-loop for better performance
while (rowId < c.numRows()) {
columnVector.putByteArray(rowId, filePathBytes)
rowId += 1
}
columnVector
[Review thread on lines +164 to +171]
Contributor: It looks like for each batch of input rows we need to recreate a new on-heap column vector and write the same constant values for every row (i.e. file path, file name, file size, etc.). Just wondering about the performance penalty when reading a large table; how big a table have we tested? Maybe a simple optimization here is to come up with something like a ConstantColumnVector, where all rows share the same value and we only need to save one copy of it.

Contributor (author): Thanks for reviewing! Makes sense; Bart also mentioned something about optimizing putByteArray for all rows: #34575 (comment)

[A hypothetical ConstantColumnVector sketch follows the end of this method.]

case FILE_NAME =>
val columnVector = new OnHeapColumnVector(c.numRows(), StringType)
rowId = 0
// use a tight-loop for better performance
while (rowId < c.numRows()) {
columnVector.putByteArray(rowId, fileNameBytes)
rowId += 1
}
columnVector
case FILE_SIZE =>
val columnVector = new OnHeapColumnVector(c.numRows(), LongType)
columnVector.putLongs(0, c.numRows(), currentFile.fileSize)
columnVector
case FILE_MODIFICATION_TIME =>
val columnVector = new OnHeapColumnVector(c.numRows(), LongType)
// the modificationTime from the file is in milliseconds,
// while internally, the TimestampType is stored in microseconds
columnVector.putLongs(0, c.numRows(), currentFile.modificationTime * 1000L)
columnVector
}.toArray
}
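
Following up on the review threads above, a purely hypothetical sketch of the ConstantColumnVector idea (no such class exists in this PR): a read-only vector that hands back one shared value for every row, so filling per-batch metadata would not need a per-row loop. It implements the public ColumnVector API; nulls and nested types are deliberately left unsupported.

import org.apache.spark.sql.types.{DataType, Decimal}
import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarArray, ColumnarMap}
import org.apache.spark.unsafe.types.UTF8String

// Hypothetical: every row of this vector resolves to the same constant value.
class ConstantColumnVector(dt: DataType, value: Any) extends ColumnVector(dt) {
  override def close(): Unit = {}
  // this sketch assumes a non-null constant
  override def hasNull(): Boolean = false
  override def numNulls(): Int = 0
  override def isNullAt(rowId: Int): Boolean = false
  override def getBoolean(rowId: Int): Boolean = value.asInstanceOf[Boolean]
  override def getByte(rowId: Int): Byte = value.asInstanceOf[Byte]
  override def getShort(rowId: Int): Short = value.asInstanceOf[Short]
  override def getInt(rowId: Int): Int = value.asInstanceOf[Int]
  override def getLong(rowId: Int): Long = value.asInstanceOf[Long]
  override def getFloat(rowId: Int): Float = value.asInstanceOf[Float]
  override def getDouble(rowId: Int): Double = value.asInstanceOf[Double]
  override def getDecimal(rowId: Int, precision: Int, scale: Int): Decimal = value.asInstanceOf[Decimal]
  override def getUTF8String(rowId: Int): UTF8String = value.asInstanceOf[UTF8String]
  override def getBinary(rowId: Int): Array[Byte] = value.asInstanceOf[Array[Byte]]
  override def getArray(rowId: Int): ColumnarArray = throw new UnsupportedOperationException()
  override def getMap(ordinal: Int): ColumnarMap = throw new UnsupportedOperationException()
  override def getChild(ordinal: Int): ColumnVector = throw new UnsupportedOperationException()
}

With something like this, the FILE_PATH branch above could return new ConstantColumnVector(StringType, UTF8String.fromString(path.toString)) and skip the putByteArray loop; the enclosing ColumnarBatch still controls how many rows are read from it.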

/**
* Add metadata columns at the end of nextElement if needed.
* For different row implementations, use different methods to update and append.
*/
private def addMetadataColumnsIfNeeded(nextElement: Object): Object = {
if (metadataColumns.nonEmpty) {
nextElement match {
case c: ColumnarBatch =>
new ColumnarBatch(
Array.tabulate(c.numCols())(c.column) ++ createMetadataColumnVector(c),
c.numRows())
case u: UnsafeRow => projection.apply(new JoinedRow(u, metadataRow))
case i: InternalRow => new JoinedRow(i, metadataRow)
}
} else {
nextElement
}
}

def next(): Object = {
val nextElement = currentIterator.next()
// TODO: we should have a better separation of row based and batch based scan, so that we
@@ -118,7 +225,7 @@ class FileScanRDD(
}
inputMetrics.incRecordsRead(1)
}
- nextElement
+ addMetadataColumnsIfNeeded(nextElement)
}

private def readCurrentFile(): Iterator[InternalRow] = {
@@ -134,6 +241,7 @@ class FileScanRDD(
private def nextIterator(): Boolean = {
if (files.hasNext) {
currentFile = files.next()
updateMetadataRow()
logInfo(s"Reading File $currentFile")
// Sets InputFileBlockHolder for the file block's information
InputFileBlockHolder.set(currentFile.filePath, currentFile.start, currentFile.length)
@@ -201,6 +309,7 @@ class FileScanRDD(
}
} else {
currentFile = null
updateMetadataRow()
InputFileBlockHolder.unset()
false
}