diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index a82314c77ede..99efd40f70b2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -972,7 +972,7 @@ class Analyzer(override val catalogManager: CatalogManager) } private def addMetadataCol(plan: LogicalPlan): LogicalPlan = plan match { - case r: DataSourceV2Relation => r.withMetadataColumns() + case s: ExposesMetadataColumns => s.withMetadataColumns() case p: Project => p.copy( projectList = p.metadataOutput ++ p.projectList, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala index 0dd7dc131812..c51030fdd640 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen._ import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark import org.apache.spark.sql.catalyst.trees.TreePattern import org.apache.spark.sql.catalyst.trees.TreePattern._ -import org.apache.spark.sql.catalyst.util.quoteIfNeeded +import org.apache.spark.sql.catalyst.util.{quoteIfNeeded, METADATA_COL_ATTR_KEY} import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.types._ import org.apache.spark.util.collection.BitSet @@ -432,3 +432,22 @@ object VirtualColumn { val groupingIdName: String = "spark_grouping_id" val groupingIdAttribute: UnresolvedAttribute = UnresolvedAttribute(groupingIdName) } + +/** + * The internal representation of the hidden metadata struct: + * set `__metadata_col` to `true` in AttributeReference metadata + * - apply() will create a metadata attribute reference + * - unapply() will check if an attribute reference is the metadata attribute reference + */ +object MetadataAttribute { + def apply(name: String, dataType: DataType, nullable: Boolean = true): AttributeReference = + AttributeReference(name, dataType, nullable, + new MetadataBuilder().putBoolean(METADATA_COL_ATTR_KEY, value = true).build())() + + def unapply(attr: AttributeReference): Option[AttributeReference] = { + if (attr.metadata.contains(METADATA_COL_ATTR_KEY) + && attr.metadata.getBoolean(METADATA_COL_ATTR_KEY)) { + Some(attr) + } else None + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index 4aa7bf1c4f9a..49634a2a0eb8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -277,3 +277,10 @@ object LogicalPlanIntegrity { checkIfSameExprIdNotReused(plan) && hasUniqueExprIdsForOutput(plan) } } + +/** + * A logical plan node that can generate metadata columns + */ +trait ExposesMetadataColumns extends LogicalPlan { + def withMetadataColumns(): LogicalPlan +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala index eff0cd8364a8..d93864991fc3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.v2 import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, NamedRelation} import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} -import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} +import org.apache.spark.sql.catalyst.plans.logical.{ExposesMetadataColumns, LeafNode, LogicalPlan, Statistics} import org.apache.spark.sql.catalyst.util.{truncatedString, CharVarcharUtils} import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Identifier, MetadataColumn, SupportsMetadataColumns, Table, TableCapability} import org.apache.spark.sql.connector.read.{Scan, Statistics => V2Statistics, SupportsReportStatistics} @@ -44,7 +44,7 @@ case class DataSourceV2Relation( catalog: Option[CatalogPlugin], identifier: Option[Identifier], options: CaseInsensitiveStringMap) - extends LeafNode with MultiInstanceRelation with NamedRelation { + extends LeafNode with MultiInstanceRelation with NamedRelation with ExposesMetadataColumns { import DataSourceV2Implicits._ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index 930456680127..79f2b981b649 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -35,6 +35,7 @@ import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => ParquetSource} import org.apache.spark.sql.execution.datasources.v2.PushedDownOperators import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} +import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.{BaseRelation, Filter} import org.apache.spark.sql.types.StructType @@ -198,6 +199,9 @@ case class FileSourceScanExec( disableBucketedScan: Boolean = false) extends DataSourceScanExec { + lazy val metadataColumns: Seq[AttributeReference] = + output.collect { case MetadataAttribute(attr) => attr } + // Note that some vals referring the file-based relation are lazy intentionally // so that this plan can be canonicalized on executor side too. See SPARK-23731. 
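// Editor's aside (illustrative sketch, not part of the patch): how the MetadataAttribute
// extractor introduced above is meant to be used. apply() stores the `__metadata_col` flag in
// the attribute's metadata, and unapply() only matches attributes carrying that flag, which is
// how FileSourceScanExec.metadataColumns (defined a few lines above) picks the metadata
// attributes out of the scan's output.
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, MetadataAttribute}
import org.apache.spark.sql.types.{LongType, StringType}

val fileSizeCol = MetadataAttribute("file_size", LongType)   // tagged as a hidden metadata column
val userCol = AttributeReference("name", StringType)()       // ordinary data column

// only the tagged attribute is matched by the extractor
Seq(fileSizeCol, userCol).collect { case MetadataAttribute(attr) => attr.name }
// => Seq("file_size")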
override lazy val supportsColumnar: Boolean = { @@ -216,7 +220,10 @@ case class FileSourceScanExec( relation.fileFormat.vectorTypes( requiredSchema = requiredSchema, partitionSchema = relation.partitionSchema, - relation.sparkSession.sessionState.conf) + relation.sparkSession.sessionState.conf).map { vectorTypes => + // for column-based file format, append metadata struct column's vector type classes if any + vectorTypes ++ Seq.fill(metadataColumns.size)(classOf[OnHeapColumnVector].getName) + } private lazy val driverMetrics: HashMap[String, Long] = HashMap.empty @@ -359,7 +366,11 @@ case class FileSourceScanExec( @transient private lazy val pushedDownFilters = { val supportNestedPredicatePushdown = DataSourceUtils.supportNestedPredicatePushdown(relation) - dataFilters.flatMap(DataSourceStrategy.translateFilter(_, supportNestedPredicatePushdown)) + // TODO: should be able to push filters containing metadata columns down to skip files + dataFilters.filterNot(_.references.exists { + case MetadataAttribute(_) => true + case _ => false + }).flatMap(DataSourceStrategy.translateFilter(_, supportNestedPredicatePushdown)) } override lazy val metadata: Map[String, String] = { @@ -601,7 +612,8 @@ case class FileSourceScanExec( } } - new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions) + new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions, + requiredSchema, metadataColumns) } /** @@ -657,7 +669,8 @@ case class FileSourceScanExec( val partitions = FilePartition.getFilePartitions(relation.sparkSession, splitFiles, maxSplitBytes) - new FileScanRDD(fsRelation.sparkSession, readFile, partitions) + new FileScanRDD(fsRelation.sparkSession, readFile, partitions, + requiredSchema, metadataColumns) } // Filters unused DynamicPruningExpression expressions - one which has been replaced diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/PartitionedFileUtil.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/PartitionedFileUtil.scala index 7dece29eb021..4cccd4132e91 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/PartitionedFileUtil.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/PartitionedFileUtil.scala @@ -36,7 +36,8 @@ object PartitionedFileUtil { val remaining = file.getLen - offset val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining val hosts = getBlockHosts(getBlockLocations(file), offset, size) - PartitionedFile(partitionValues, filePath.toUri.toString, offset, size, hosts) + PartitionedFile(partitionValues, filePath.toUri.toString, offset, size, hosts, + file.getModificationTime, file.getLen) } } else { Seq(getPartitionedFile(file, filePath, partitionValues)) @@ -48,7 +49,8 @@ object PartitionedFileUtil { filePath: Path, partitionValues: InternalRow): PartitionedFile = { val hosts = getBlockHosts(getBlockLocations(file), 0, file.getLen) - PartitionedFile(partitionValues, filePath.toUri.toString, 0, file.getLen, hosts) + PartitionedFile(partitionValues, filePath.toUri.toString, 0, file.getLen, hosts, + file.getModificationTime, file.getLen) } private def getBlockLocations(file: FileStatus): Array[BlockLocation] = file match { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala index beb1f4d38e8b..c3bcf06b6e5f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjectio import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.Filter -import org.apache.spark.sql.types.{DataType, StructType} +import org.apache.spark.sql.types.{DataType, LongType, StringType, StructField, StructType, TimestampType} /** @@ -171,6 +171,29 @@ trait FileFormat { def supportFieldName(name: String): Boolean = true } +object FileFormat { + + val FILE_PATH = "file_path" + + val FILE_NAME = "file_name" + + val FILE_SIZE = "file_size" + + val FILE_MODIFICATION_TIME = "file_modification_time" + + val METADATA_NAME = "_metadata" + + // supported metadata struct fields for hadoop fs relation + val METADATA_STRUCT: StructType = new StructType() + .add(StructField(FILE_PATH, StringType)) + .add(StructField(FILE_NAME, StringType)) + .add(StructField(FILE_SIZE, LongType)) + .add(StructField(FILE_MODIFICATION_TIME, TimestampType)) + + // create a file metadata struct col + def createFileMetadataCol: AttributeReference = MetadataAttribute(METADATA_NAME, METADATA_STRUCT) +} + /** * The base class file format that is based on text file. */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala index 2268a24b5cce..47f279babef5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala @@ -21,13 +21,20 @@ import java.io.{Closeable, FileNotFoundException, IOException} import scala.util.control.NonFatal +import org.apache.hadoop.fs.Path + import org.apache.spark.{Partition => RDDPartition, SparkUpgradeException, TaskContext} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.rdd.{InputFileBlockHolder, RDD} import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GenericInternalRow, JoinedRow, UnsafeProjection, UnsafeRow} import org.apache.spark.sql.errors.QueryExecutionErrors +import org.apache.spark.sql.execution.datasources.FileFormat._ +import org.apache.spark.sql.execution.vectorized.{OnHeapColumnVector, WritableColumnVector} +import org.apache.spark.sql.types.{LongType, StringType, StructType} import org.apache.spark.sql.vectorized.ColumnarBatch +import org.apache.spark.unsafe.types.UTF8String import org.apache.spark.util.NextIterator /** @@ -38,14 +45,17 @@ import org.apache.spark.util.NextIterator * @param filePath URI of the file to read * @param start the beginning offset (in bytes) of the block. * @param length number of bytes to read. - * @param locations locality information (list of nodes that have the data). + * @param modificationTime The modification time of the input file, in milliseconds. + * @param fileSize The length of the input file (not the block), in bytes. 
 */
 case class PartitionedFile(
     partitionValues: InternalRow,
     filePath: String,
     start: Long,
     length: Long,
-    @transient locations: Array[String] = Array.empty) {
+    @transient locations: Array[String] = Array.empty,
+    modificationTime: Long = 0L,
+    fileSize: Long = 0L) {
   override def toString: String = {
     s"path: $filePath, range: $start-${start + length}, partition values: $partitionValues"
   }
@@ -57,7 +67,9 @@ case class PartitionedFile(
 class FileScanRDD(
     @transient private val sparkSession: SparkSession,
     readFunction: (PartitionedFile) => Iterator[InternalRow],
-    @transient val filePartitions: Seq[FilePartition])
+    @transient val filePartitions: Seq[FilePartition],
+    val readDataSchema: StructType,
+    val metadataColumns: Seq[AttributeReference] = Seq.empty)
   extends RDD[InternalRow](sparkSession.sparkContext, Nil) {
 
   private val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
@@ -103,6 +115,101 @@ class FileScanRDD(
         context.killTaskIfInterrupted()
         (currentIterator != null && currentIterator.hasNext) || nextIterator()
       }
+
+      ///////////////////////////
+      // FILE METADATA METHODS //
+      ///////////////////////////
+
+      // a metadata internal row; only updated when the current file changes
+      val metadataRow: InternalRow = new GenericInternalRow(metadataColumns.length)
+
+      // an unsafe projection to convert a joined internal row to an unsafe row
+      private lazy val projection = {
+        val joinedExpressions =
+          readDataSchema.fields.map(_.dataType) ++ metadataColumns.map(_.dataType)
+        UnsafeProjection.create(joinedExpressions)
+      }
+
+      /**
+       * For each partitioned file, the metadata columns of every record in the file are
+       * exactly the same. Only update the metadata row when `currentFile` changes.
+       */
+      private def updateMetadataRow(): Unit = {
+        if (metadataColumns.nonEmpty && currentFile != null) {
+          val path = new Path(currentFile.filePath)
+          metadataColumns.zipWithIndex.foreach { case (attr, i) =>
+            attr.name match {
+              case FILE_PATH => metadataRow.update(i, UTF8String.fromString(path.toString))
+              case FILE_NAME => metadataRow.update(i, UTF8String.fromString(path.getName))
+              case FILE_SIZE => metadataRow.update(i, currentFile.fileSize)
+              case FILE_MODIFICATION_TIME =>
+                // the file's modificationTime is in milliseconds,
+                // while TimestampType is internally stored in microseconds
+                metadataRow.update(i, currentFile.modificationTime * 1000L)
+            }
+          }
+        }
+      }
+
+      /**
+       * Create writable column vectors containing all the required metadata columns
+       */
+      private def createMetadataColumnVector(c: ColumnarBatch): Array[WritableColumnVector] = {
+        val path = new Path(currentFile.filePath)
+        val filePathBytes = path.toString.getBytes
+        val fileNameBytes = path.getName.getBytes
+        var rowId = 0
+        metadataColumns.map(_.name).map {
+          case FILE_PATH =>
+            val columnVector = new OnHeapColumnVector(c.numRows(), StringType)
+            rowId = 0
+            // use a tight loop for better performance
+            while (rowId < c.numRows()) {
+              columnVector.putByteArray(rowId, filePathBytes)
+              rowId += 1
+            }
+            columnVector
+          case FILE_NAME =>
+            val columnVector = new OnHeapColumnVector(c.numRows(), StringType)
+            rowId = 0
+            // use a tight loop for better performance
+            while (rowId < c.numRows()) {
+              columnVector.putByteArray(rowId, fileNameBytes)
+              rowId += 1
+            }
+            columnVector
+          case FILE_SIZE =>
+            val columnVector = new OnHeapColumnVector(c.numRows(), LongType)
+            columnVector.putLongs(0, c.numRows(), currentFile.fileSize)
+            columnVector
+          case FILE_MODIFICATION_TIME =>
+            val columnVector = new OnHeapColumnVector(c.numRows(), LongType)
+            // the file's modificationTime is in milliseconds,
+            // while TimestampType is internally stored in microseconds
+            columnVector.putLongs(0, c.numRows(), currentFile.modificationTime * 1000L)
+            columnVector
+        }.toArray
+      }
+
+      /**
+       * Append metadata columns to the end of nextElement if needed.
+       * Different row implementations require different ways to update and append.
+       */
+      private def addMetadataColumnsIfNeeded(nextElement: Object): Object = {
+        if (metadataColumns.nonEmpty) {
+          nextElement match {
+            case c: ColumnarBatch =>
+              new ColumnarBatch(
+                Array.tabulate(c.numCols())(c.column) ++ createMetadataColumnVector(c),
+                c.numRows())
+            case u: UnsafeRow => projection.apply(new JoinedRow(u, metadataRow))
+            case i: InternalRow => new JoinedRow(i, metadataRow)
+          }
+        } else {
+          nextElement
+        }
+      }
+
       def next(): Object = {
         val nextElement = currentIterator.next()
         // TODO: we should have a better separation of row based and batch based scan, so that we
@@ -118,7 +225,7 @@ class FileScanRDD(
           }
           inputMetrics.incRecordsRead(1)
         }
-        nextElement
+        addMetadataColumnsIfNeeded(nextElement)
       }
 
       private def readCurrentFile(): Iterator[InternalRow] = {
@@ -134,6 +241,7 @@ class FileScanRDD(
     private def nextIterator(): Boolean = {
       if (files.hasNext) {
         currentFile = files.next()
+        updateMetadataRow()
         logInfo(s"Reading File $currentFile")
         // Sets InputFileBlockHolder for the file block's information
         InputFileBlockHolder.set(currentFile.filePath, currentFile.start, currentFile.length)
@@ -201,6 +309,7 @@ class FileScanRDD(
         }
       } else {
         currentFile = null
+        updateMetadataRow()
         InputFileBlockHolder.unset()
         false
       }
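// Editor's aside (illustrative sketch, not part of the patch): the row-based branch of
// addMetadataColumnsIfNeeded above appends the per-file constant values by joining each data
// row with the shared metadataRow and, for UnsafeRow inputs, re-projecting the joined row.
// Roughly:
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, JoinedRow, UnsafeProjection}
import org.apache.spark.sql.types.{DataType, IntegerType, LongType, StringType}
import org.apache.spark.unsafe.types.UTF8String

val dataRow = InternalRow(UTF8String.fromString("jack"), 24)      // name, age
val fileMetadataRow = new GenericInternalRow(Array[Any](1024L))   // e.g. a file_size value
val projection =
  UnsafeProjection.create(Array[DataType](StringType, IntegerType, LongType))

// the joined row carries the data fields first, then the metadata fields
val unsafeRow = projection(new JoinedRow(dataRow, fileMetadataRow))
unsafeRow.getLong(2)  // => 1024, the appended metadata value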
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 1bfde7515dc9..c1282fa69ca8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -25,7 +25,8 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.ScanOperation
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
-import org.apache.spark.sql.types.{DoubleType, FloatType}
+import org.apache.spark.sql.execution.datasources.FileFormat.METADATA_NAME
+import org.apache.spark.sql.types.{DoubleType, FloatType, StructType}
 import org.apache.spark.util.collection.BitSet
 
 /**
@@ -212,7 +213,19 @@ object FileSourceStrategy extends Strategy with PredicateHelper with Logging {
       val outputSchema = readDataColumns.toStructType
       logInfo(s"Output Data Schema: ${outputSchema.simpleString(5)}")
 
-      val outputAttributes = readDataColumns ++ partitionColumns
+      val metadataStructOpt = requiredAttributes.collectFirst {
+        case MetadataAttribute(attr) => attr
+      }
+
+      // TODO (yaohua): should be able to prune the metadata struct to contain only the referenced fields
+      val metadataColumns = metadataStructOpt.map { metadataStruct =>
+        metadataStruct.dataType.asInstanceOf[StructType].fields.map { field =>
+          MetadataAttribute(field.name, field.dataType)
+        }.toSeq
+      }.getOrElse(Seq.empty)
+
+      // outputAttributes should also include the metadata columns at the very end
+      val outputAttributes = readDataColumns ++ partitionColumns ++ metadataColumns
 
       val scan =
         FileSourceScanExec(
@@ -225,8 +238,18 @@ object FileSourceStrategy extends Strategy with PredicateHelper with Logging {
           dataFilters,
           table.map(_.identifier))
 
+      // extra Project node: wrap the flat metadata columns into a metadata struct
+      val withMetadataProjections = metadataStructOpt.map { metadataStruct =>
+        val metadataAlias =
+          Alias(CreateStruct(metadataColumns), METADATA_NAME)(exprId = metadataStruct.exprId)
+        execution.ProjectExec(
+          scan.output.dropRight(metadataColumns.length) :+ metadataAlias, scan)
+      }.getOrElse(scan)
+
       val afterScanFilter = afterScanFilters.toSeq.reduceOption(expressions.And)
-      val withFilter = afterScanFilter.map(execution.FilterExec(_, scan)).getOrElse(scan)
+      val withFilter = afterScanFilter
+        .map(execution.FilterExec(_, withMetadataProjections))
+        .getOrElse(withMetadataProjections)
       val withProjections = if (projects == withFilter.output) {
         withFilter
       } else {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
index cfe5f046bd7b..291b98fb37c9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
@@ -20,7 +20,7 @@ import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions.{AttributeMap, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
+import org.apache.spark.sql.catalyst.plans.logical.{ExposesMetadataColumns, LeafNode, LogicalPlan, Statistics}
 import org.apache.spark.sql.catalyst.util.{truncatedString, CharVarcharUtils}
 import org.apache.spark.sql.sources.BaseRelation
 
@@ -32,7 +32,7 @@ case class LogicalRelation(
     output: Seq[AttributeReference],
     catalogTable: Option[CatalogTable],
     override val isStreaming: Boolean)
-  extends LeafNode with MultiInstanceRelation {
+  extends LeafNode with MultiInstanceRelation with ExposesMetadataColumns {
 
   // Only care about relation when canonicalizing.
   override def doCanonicalize(): LogicalPlan = copy(
@@ -67,6 +67,28 @@ case class LogicalRelation(
     s"Relation ${catalogTable.map(_.identifier.unquotedString).getOrElse("")}" +
       s"[${truncatedString(output, ",", maxFields)}] $relation"
   }
+
+  override lazy val metadataOutput: Seq[AttributeReference] = relation match {
+    case _: HadoopFsRelation =>
+      val resolve = conf.resolver
+      val outputNames = outputSet.map(_.name)
+      def isOutputColumn(col: AttributeReference): Boolean = {
+        outputNames.exists(name => resolve(col.name, name))
+      }
+      // filter out the metadata struct column if its name conflicts with an output column:
+      // if the file already has a "_metadata" column,
+      // the data column should be returned, not the metadata struct column
+      Seq(FileFormat.createFileMetadataCol).filterNot(isOutputColumn)
+    case _ => Nil
+  }
+
+  override def withMetadataColumns(): LogicalRelation = {
+    if (metadataOutput.nonEmpty) {
+      this.copy(output = output ++ metadataOutput)
+    } else {
+      this
+    }
+  }
 }
 
 object LogicalRelation {
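// Editor's aside (illustrative sketch; the path and session are hypothetical): once
// LogicalRelation exposes the column through metadataOutput, `_metadata` resolves on demand
// but is NOT included in `select *`, matching the behavior exercised by the test suite added
// further below.
val df = spark.read.format("parquet").load("/tmp/users")   // assumed input path

df.select("*").columns
// => e.g. Array("name", "age", "info") -- no hidden _metadata column

df.select("name", "_metadata.file_name", "_metadata.file_size", "_metadata.file_modification_time")
  .show()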
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaPruning.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaPruning.scala
index 4f331c7bf487..93bd1acc7377 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaPruning.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaPruning.scala
@@ -164,7 +164,12 @@ object SchemaPruning extends Rule[LogicalPlan] {
       outputRelation: LogicalRelation,
       prunedBaseRelation: HadoopFsRelation) = {
     val prunedOutput = getPrunedOutput(outputRelation.output, prunedBaseRelation.schema)
-    outputRelation.copy(relation = prunedBaseRelation, output = prunedOutput)
+    // also add the metadata output if any
+    // TODO: should be able to prune the metadata schema
+    val metaOutput = outputRelation.output.collect {
+      case MetadataAttribute(attr) => attr
+    }
+    outputRelation.copy(relation = prunedBaseRelation, output = prunedOutput ++ metaOutput)
   }
 
   // Prune the given output to make it consistent with `requiredSchema`.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
new file mode 100644
index 000000000000..fffac885da5f
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.spark.sql.execution.datasources + +import java.io.File +import java.sql.Timestamp +import java.text.SimpleDateFormat + +import org.apache.spark.sql.{AnalysisException, Column, DataFrame, QueryTest, Row} +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType} + +class FileMetadataStructSuite extends QueryTest with SharedSparkSession { + + val data0: Seq[Row] = Seq(Row("jack", 24, Row(12345L, "uom"))) + + val data1: Seq[Row] = Seq(Row("lily", 31, Row(54321L, "ucb"))) + + val schema: StructType = new StructType() + .add(StructField("name", StringType)) + .add(StructField("age", IntegerType)) + .add(StructField("info", new StructType() + .add(StructField("id", LongType)) + .add(StructField("university", StringType)))) + + val schemaWithNameConflicts: StructType = new StructType() + .add(StructField("name", StringType)) + .add(StructField("age", IntegerType)) + .add(StructField("_METADATA", new StructType() + .add(StructField("id", LongType)) + .add(StructField("university", StringType)))) + + private val METADATA_FILE_PATH = "_metadata.file_path" + + private val METADATA_FILE_NAME = "_metadata.file_name" + + private val METADATA_FILE_SIZE = "_metadata.file_size" + + private val METADATA_FILE_MODIFICATION_TIME = "_metadata.file_modification_time" + + /** + * This test wrapper will test for both row-based and column-based file formats: + * (json and parquet) with nested schema: + * 1. create df0 and df1 and save them as testFileFormat under /data/f0 and /data/f1 + * 2. read the path /data, return the df for further testing + * 3. create actual metadata maps for both files under /data/f0 and /data/f1 for further testing + * + * The final df will have data: + * jack | 24 | {12345, uom} + * lily | 31 | {54321, ucb} + * + * The schema of the df will be the `fileSchema` provided to this method + * + * This test wrapper will provide a `df` and actual metadata map `f0`, `f1` + */ + private def metadataColumnsTest( + testName: String, fileSchema: StructType) + (f: (DataFrame, Map[String, Any], Map[String, Any]) => Unit): Unit = { + Seq("json", "parquet").foreach { testFileFormat => + test(s"metadata struct ($testFileFormat): " + testName) { + withTempDir { dir => + import scala.collection.JavaConverters._ + + // 1. create df0 and df1 and save under /data/f0 and /data/f1 + val df0 = spark.createDataFrame(data0.asJava, fileSchema) + val f0 = new File(dir, "data/f0").getCanonicalPath + df0.coalesce(1).write.format(testFileFormat).save(f0) + + val df1 = spark.createDataFrame(data1.asJava, fileSchema) + val f1 = new File(dir, "data/f1").getCanonicalPath + df1.coalesce(1).write.format(testFileFormat).save(f1) + + // 2. read both f0 and f1 + val df = spark.read.format(testFileFormat).schema(fileSchema) + .load(new File(dir, "data").getCanonicalPath + "/*") + + val realF0 = new File(dir, "data/f0").listFiles() + .filter(_.getName.endsWith(s".$testFileFormat")).head + + val realF1 = new File(dir, "data/f1").listFiles() + .filter(_.getName.endsWith(s".$testFileFormat")).head + + // 3. 
create f0 and f1 metadata data + val f0Metadata = Map( + METADATA_FILE_PATH -> realF0.toURI.toString, + METADATA_FILE_NAME -> realF0.getName, + METADATA_FILE_SIZE -> realF0.length(), + METADATA_FILE_MODIFICATION_TIME -> new Timestamp(realF0.lastModified()) + ) + val f1Metadata = Map( + METADATA_FILE_PATH -> realF1.toURI.toString, + METADATA_FILE_NAME -> realF1.getName, + METADATA_FILE_SIZE -> realF1.length(), + METADATA_FILE_MODIFICATION_TIME -> new Timestamp(realF1.lastModified()) + ) + + f(df, f0Metadata, f1Metadata) + } + } + } + } + + metadataColumnsTest("read partial/all metadata struct fields", schema) { (df, f0, f1) => + // read all available metadata struct fields + checkAnswer( + df.select("name", "age", "info", + METADATA_FILE_NAME, METADATA_FILE_PATH, + METADATA_FILE_SIZE, METADATA_FILE_MODIFICATION_TIME), + Seq( + Row("jack", 24, Row(12345L, "uom"), + f0(METADATA_FILE_NAME), f0(METADATA_FILE_PATH), + f0(METADATA_FILE_SIZE), f0(METADATA_FILE_MODIFICATION_TIME)), + Row("lily", 31, Row(54321L, "ucb"), + f1(METADATA_FILE_NAME), f1(METADATA_FILE_PATH), + f1(METADATA_FILE_SIZE), f1(METADATA_FILE_MODIFICATION_TIME)) + ) + ) + + // read a part of metadata struct fields + checkAnswer( + df.select("name", "info.university", METADATA_FILE_NAME, METADATA_FILE_SIZE), + Seq( + Row("jack", "uom", f0(METADATA_FILE_NAME), f0(METADATA_FILE_SIZE)), + Row("lily", "ucb", f1(METADATA_FILE_NAME), f1(METADATA_FILE_SIZE)) + ) + ) + } + + metadataColumnsTest("read metadata struct fields with random ordering", schema) { (df, f0, f1) => + // read a part of metadata struct fields with random ordering + checkAnswer( + df.select(METADATA_FILE_NAME, "name", METADATA_FILE_SIZE, "info.university"), + Seq( + Row(f0(METADATA_FILE_NAME), "jack", f0(METADATA_FILE_SIZE), "uom"), + Row(f1(METADATA_FILE_NAME), "lily", f1(METADATA_FILE_SIZE), "ucb") + ) + ) + } + + metadataColumnsTest("read metadata struct fields with expressions", schema) { (df, f0, f1) => + checkAnswer( + df.select( + // substring of file name + substring(col(METADATA_FILE_NAME), 1, 3), + // format timestamp + date_format(col(METADATA_FILE_MODIFICATION_TIME), "yyyy-MM") + .as("_file_modification_year_month"), + // convert to kb + col(METADATA_FILE_SIZE).divide(lit(1024)).as("_file_size_kb"), + // get the file format + substring_index(col(METADATA_FILE_PATH), ".", -1).as("_file_format") + ), + Seq( + Row( + f0(METADATA_FILE_NAME).toString.substring(0, 3), // sql substring vs scala substring + new SimpleDateFormat("yyyy-MM").format(f0(METADATA_FILE_MODIFICATION_TIME)), + f0(METADATA_FILE_SIZE).asInstanceOf[Long] / 1024.toDouble, + f0(METADATA_FILE_PATH).toString.split("\\.").takeRight(1).head + ), + Row( + f1(METADATA_FILE_NAME).toString.substring(0, 3), // sql substring vs scala substring + new SimpleDateFormat("yyyy-MM").format(f1(METADATA_FILE_MODIFICATION_TIME)), + f1(METADATA_FILE_SIZE).asInstanceOf[Long] / 1024.toDouble, + f1(METADATA_FILE_PATH).toString.split("\\.").takeRight(1).head + ) + ) + ) + } + + metadataColumnsTest("select all will not select metadata struct fields", schema) { (df, _, _) => + checkAnswer( + df.select("*"), + Seq( + Row("jack", 24, Row(12345L, "uom")), + Row("lily", 31, Row(54321L, "ucb")) + ) + ) + } + + metadataColumnsTest("metadata will not overwrite user data", + schemaWithNameConflicts) { (df, _, _) => + // the user data has the schema: name, age, _metadata.id, _metadata.university + + // select user data + checkAnswer( + df.select("name", "age", "_METADATA", "_metadata"), + Seq( + Row("jack", 24, Row(12345L, "uom"), 
Row(12345L, "uom")), + Row("lily", 31, Row(54321L, "ucb"), Row(54321L, "ucb")) + ) + ) + + // select metadata will fail when analysis + val ex = intercept[AnalysisException] { + df.select("name", METADATA_FILE_NAME).collect() + } + assert(ex.getMessage.contains("No such struct field file_name in id, university")) + } + + metadataColumnsTest("select only metadata", schema) { (df, f0, f1) => + checkAnswer( + df.select(METADATA_FILE_NAME, METADATA_FILE_PATH, + METADATA_FILE_SIZE, METADATA_FILE_MODIFICATION_TIME), + Seq( + Row(f0(METADATA_FILE_NAME), f0(METADATA_FILE_PATH), + f0(METADATA_FILE_SIZE), f0(METADATA_FILE_MODIFICATION_TIME)), + Row(f1(METADATA_FILE_NAME), f1(METADATA_FILE_PATH), + f1(METADATA_FILE_SIZE), f1(METADATA_FILE_MODIFICATION_TIME)) + ) + ) + checkAnswer( + df.select("_metadata"), + Seq( + Row(Row(f0(METADATA_FILE_PATH), f0(METADATA_FILE_NAME), + f0(METADATA_FILE_SIZE), f0(METADATA_FILE_MODIFICATION_TIME))), + Row(Row(f1(METADATA_FILE_PATH), f1(METADATA_FILE_NAME), + f1(METADATA_FILE_SIZE), f1(METADATA_FILE_MODIFICATION_TIME))) + ) + ) + } + + metadataColumnsTest("select and re-select", schema) { (df, f0, f1) => + checkAnswer( + df.select("name", "age", "info", + METADATA_FILE_NAME, METADATA_FILE_PATH, + METADATA_FILE_SIZE, METADATA_FILE_MODIFICATION_TIME) + .select("name", "file_path"), // cast _metadata.file_path as file_path + Seq( + Row("jack", f0(METADATA_FILE_PATH)), + Row("lily", f1(METADATA_FILE_PATH)) + ) + ) + } + + metadataColumnsTest("alias", schema) { (df, f0, f1) => + + val aliasDF = df.select( + Column("name").as("myName"), + Column("age").as("myAge"), + Column(METADATA_FILE_NAME).as("myFileName"), + Column(METADATA_FILE_SIZE).as("myFileSize") + ) + + // check schema + val expectedSchema = new StructType() + .add(StructField("myName", StringType)) + .add(StructField("myAge", IntegerType)) + .add(StructField("myFileName", StringType)) + .add(StructField("myFileSize", LongType)) + + assert(aliasDF.schema.fields.toSet == expectedSchema.fields.toSet) + + // check data + checkAnswer( + aliasDF, + Seq( + Row("jack", 24, f0(METADATA_FILE_NAME), f0(METADATA_FILE_SIZE)), + Row("lily", 31, f1(METADATA_FILE_NAME), f1(METADATA_FILE_SIZE)) + ) + ) + } + + metadataColumnsTest("filter", schema) { (df, f0, _) => + checkAnswer( + df.select("name", "age", METADATA_FILE_NAME) + .where(Column(METADATA_FILE_NAME) === f0(METADATA_FILE_NAME)), + Seq( + // _file_name == f0's name, so we will only have 1 row + Row("jack", 24, f0(METADATA_FILE_NAME)) + ) + ) + } + + Seq(true, false).foreach { caseSensitive => + metadataColumnsTest(s"upper/lower case when case " + + s"sensitive is $caseSensitive", schemaWithNameConflicts) { (df, f0, f1) => + withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) { + // file schema: name, age, _METADATA.id, _METADATA.university + + if (caseSensitive) { + // for case sensitive mode: + // _METADATA is user data + // _metadata is metadata + checkAnswer( + df.select("name", "age", "_METADATA", "_metadata"), + Seq( + Row("jack", 24, Row(12345L, "uom"), + Row(f0(METADATA_FILE_PATH), f0(METADATA_FILE_NAME), + f0(METADATA_FILE_SIZE), f0(METADATA_FILE_MODIFICATION_TIME))), + Row("lily", 31, Row(54321L, "ucb"), + Row(f1(METADATA_FILE_PATH), f1(METADATA_FILE_NAME), + f1(METADATA_FILE_SIZE), f1(METADATA_FILE_MODIFICATION_TIME))) + ) + ) + } else { + // for case insensitive mode: + // _METADATA and _metadata are both user data + + // select user data + checkAnswer( + df.select("name", "age", + // user columns + "_METADATA", "_metadata", + "_metadata.ID", 
"_METADATA.UniVerSity"), + Seq( + Row("jack", 24, Row(12345L, "uom"), Row(12345L, "uom"), 12345L, "uom"), + Row("lily", 31, Row(54321L, "ucb"), Row(54321L, "ucb"), 54321L, "ucb") + ) + ) + + // select metadata will fail when analysis - metadata cannot overwrite user data + val ex = intercept[AnalysisException] { + df.select("name", "_metadata.file_name").collect() + } + assert(ex.getMessage.contains("No such struct field file_name in id, university")) + + val ex1 = intercept[AnalysisException] { + df.select("name", "_METADATA.file_NAME").collect() + } + assert(ex1.getMessage.contains("No such struct field file_NAME in id, university")) + } + } + } + } + + Seq("true", "false").foreach { offHeapColumnVectorEnabled => + withSQLConf("spark.sql.columnVector.offheap.enabled" -> offHeapColumnVectorEnabled) { + metadataColumnsTest(s"read metadata with " + + s"offheap set to $offHeapColumnVectorEnabled", schema) { (df, f0, f1) => + // read all available metadata struct fields + checkAnswer( + df.select("name", "age", "info", + METADATA_FILE_NAME, METADATA_FILE_PATH, + METADATA_FILE_SIZE, METADATA_FILE_MODIFICATION_TIME), + Seq( + Row("jack", 24, Row(12345L, "uom"), f0(METADATA_FILE_NAME), f0(METADATA_FILE_PATH), + f0(METADATA_FILE_SIZE), f0(METADATA_FILE_MODIFICATION_TIME)), + Row("lily", 31, Row(54321L, "ucb"), f1(METADATA_FILE_NAME), f1(METADATA_FILE_PATH), + f1(METADATA_FILE_SIZE), f1(METADATA_FILE_MODIFICATION_TIME)) + ) + ) + + // read a part of metadata struct fields + checkAnswer( + df.select("name", "info.university", METADATA_FILE_NAME, METADATA_FILE_SIZE), + Seq( + Row("jack", "uom", f0(METADATA_FILE_NAME), f0(METADATA_FILE_SIZE)), + Row("lily", "ucb", f1(METADATA_FILE_NAME), f1(METADATA_FILE_SIZE)) + ) + ) + } + } + } + + Seq("true", "false").foreach { enabled => + withSQLConf("spark.sql.optimizer.nestedSchemaPruning.enabled" -> enabled) { + metadataColumnsTest(s"read metadata with" + + s"nestedSchemaPruning set to $enabled", schema) { (df, f0, f1) => + // read a part of data: schema pruning + checkAnswer( + df.select("name", "info.university", METADATA_FILE_NAME, METADATA_FILE_SIZE), + Seq( + Row("jack", "uom", f0(METADATA_FILE_NAME), f0(METADATA_FILE_SIZE)), + Row("lily", "ucb", f1(METADATA_FILE_NAME), f1(METADATA_FILE_SIZE)) + ) + ) + } + } + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala index 4bfb7cf59d08..634016664dfb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala @@ -292,7 +292,8 @@ class FileSourceStrategySuite extends QueryTest with SharedSparkSession with Pre val fakeRDD = new FileScanRDD( spark, (file: PartitionedFile) => Iterator.empty, - Seq(partition) + Seq(partition), + StructType(Seq.empty) ) assertResult(Set("host0", "host1", "host2")) {