Skip to content

Commit c4ed4fe

Browse files
committed
Bug fixes and a new test suite
1 parent a29e663 commit c4ed4fe

File tree

4 files changed

+591
-485
lines changed

4 files changed

+591
-485
lines changed

sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala

Lines changed: 48 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -142,25 +142,31 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
142142

143143
// The table scan operator (PhysicalRDD) which retrieves required columns from data files.
144144
// Notice that the schema of data files, represented by `relation.dataSchema`, may contain
145-
// some partition column(s). Those partition columns that are only encoded in partition
146-
// directory paths are not covered by this table scan operator.
145+
// some partition column(s).
147146
val scan =
148147
pruneFilterProject(
149148
logicalRelation,
150149
projections,
151150
filters,
152151
(requiredColumns, filters) => {
153-
// Don't require any partition columns to save I/O. Note that here we are being
154-
// optimistic and assuming partition columns data stored in data files are always
155-
// consistent with those encoded in partition directory paths.
156-
relation.buildScan(
157-
requiredColumns.filterNot(partitionColumns.fieldNames.contains),
158-
filters,
159-
dataFilePaths)
152+
val partitionColNames = partitionColumns.fieldNames
153+
154+
// Don't scan any partition columns to save I/O. Here we are being optimistic and
155+
// assuming partition columns data stored in data files are always consistent with those
156+
// partition values encoded in partition directory paths.
157+
val nonPartitionColumns = requiredColumns.filterNot(partitionColNames.contains)
158+
val dataRows = relation.buildScan(nonPartitionColumns, filters, dataFilePaths)
159+
160+
// Merges data values with partition values.
161+
mergeWithPartitionValues(
162+
relation.schema,
163+
requiredColumns,
164+
partitionColNames,
165+
partitionValues,
166+
dataRows)
160167
})
161168

162-
// Merges in those partition values that are not contained in data rows.
163-
mergePartitionValues(output, partitionValues, scan)
169+
scan.execute()
164170
}
165171

166172
val unionedRows = perPartitionRows.reduceOption(_ ++ _).getOrElse {
@@ -170,41 +176,48 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
170176
createPhysicalRDD(logicalRelation.relation, output, unionedRows)
171177
}
172178

173-
private def mergePartitionValues(
174-
output: Seq[Attribute],
179+
private def mergeWithPartitionValues(
180+
schema: StructType,
181+
requiredColumns: Array[String],
182+
partitionColumns: Array[String],
175183
partitionValues: Row,
176-
scan: SparkPlan): RDD[Row] = {
177-
val mergeWithPartitionValues = {
178-
val outputColNames = output.map(_.name)
179-
val outputDataColNames = scan.schema.fieldNames
180-
181-
outputColNames.zipWithIndex.map { case (name, index) =>
182-
val i = outputDataColNames.indexOf(name)
183-
if (i > -1) {
184-
// Column appears in data files, retrieve it from data rows
184+
dataRows: RDD[Row]): RDD[Row] = {
185+
val nonPartitionColumns = requiredColumns.filterNot(partitionColumns.contains)
186+
187+
// If output columns contain any partition column(s), we need to merge scanned data
188+
// columns and requested partition columns to form the final result.
189+
if (!requiredColumns.sameElements(nonPartitionColumns)) {
190+
val mergers = requiredColumns.zipWithIndex.map { case (name, index) =>
191+
// To see whether the `index`-th column is a partition column...
192+
val i = partitionColumns.indexOf(name)
193+
if (i != -1) {
194+
// If yes, gets column value from partition values.
185195
(mutableRow: MutableRow, dataRow: expressions.Row, ordinal: Int) => {
186-
mutableRow(ordinal) = dataRow(i)
196+
mutableRow(ordinal) = partitionValues(i)
187197
}
188198
} else {
189-
// Column doesn't appear in data file (must be a partition column), retrieve it from
190-
// partition values of this partition.
199+
// Otherwise, inherits the value from scanned data.
200+
val i = nonPartitionColumns.indexOf(name)
191201
(mutableRow: MutableRow, dataRow: expressions.Row, ordinal: Int) => {
192-
mutableRow(ordinal) = partitionValues(i)
202+
mutableRow(ordinal) = dataRow(i)
193203
}
194204
}
195205
}
196-
}
197206

198-
scan.execute().mapPartitions { iterator =>
199-
val mutableRow = new SpecificMutableRow(output.map(_.dataType))
200-
iterator.map { row =>
201-
var i = 0
202-
while (i < mutableRow.length) {
203-
mergeWithPartitionValues(i)(mutableRow, row, i)
204-
i += 1
207+
dataRows.mapPartitions { iterator =>
208+
val dataTypes = requiredColumns.map(schema(_).dataType)
209+
val mutableRow = new SpecificMutableRow(dataTypes)
210+
iterator.map { dataRow =>
211+
var i = 0
212+
while (i < mutableRow.length) {
213+
mergers(i)(mutableRow, dataRow, i)
214+
i += 1
215+
}
216+
mutableRow.asInstanceOf[expressions.Row]
205217
}
206-
mutableRow.asInstanceOf[expressions.Row]
207218
}
219+
} else {
220+
dataRows
208221
}
209222
}
210223

sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,10 @@ abstract class OutputWriter {
260260
/**
261261
* Initializes this [[OutputWriter]] before any rows are persisted.
262262
*
263-
* @param path The file path to which this [[OutputWriter]] is supposed to write.
263+
* @param path Path of the file to which this [[OutputWriter]] is supposed to write. Note that
264+
* this may not point to the final output file. For example, `FileOutputFormat` writes to
265+
* temporary directories and then merge written files back to the final destination. In
266+
* this case, `path` points to a temporary output file under the temporary directory.
264267
* @param dataSchema Schema of the rows to be written. Partition columns are not included in the
265268
* schema if the corresponding relation is partitioned.
266269
* @param context The Hadoop MapReduce task context.
@@ -310,6 +313,19 @@ abstract class FSBasedRelation private[sql](
310313
maybePartitionSpec: Option[PartitionSpec])
311314
extends BaseRelation {
312315

316+
/**
317+
* Constructs an [[FSBasedRelation]].
318+
*
319+
* @param paths Base paths of this relation. For partitioned relations, it should be either root
320+
* directories of all partition directories.
321+
* @param partitionColumns Partition columns of this relation.
322+
*/
323+
def this(paths: Array[String], partitionColumns: StructType) =
324+
this(paths, {
325+
if (partitionColumns.isEmpty) None
326+
else Some(PartitionSpec(partitionColumns, Array.empty[Partition]))
327+
})
328+
313329
/**
314330
* Constructs an [[FSBasedRelation]].
315331
*

0 commit comments

Comments
 (0)