Skip to content

Commit 5849dd0

Browse files
committed
Fixes doc typos. Fixes partition discovery refresh.
1 parent 51be443 commit 5849dd0

File tree

4 files changed

+42
-58
lines changed

4 files changed

+42
-58
lines changed

sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -224,10 +224,16 @@ private[sql] object ResolvedDataSource {
224224
case dataSource: SchemaRelationProvider =>
225225
dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options), schema)
226226
case dataSource: FSBasedRelationProvider =>
227+
val maybePartitionsSchema = if (partitionColumns.isEmpty) {
228+
None
229+
} else {
230+
Some(partitionColumnsSchema(schema, partitionColumns))
231+
}
232+
227233
dataSource.createRelation(
228234
sqlContext,
229235
Some(schema),
230-
Some(partitionColumnsSchema(schema, partitionColumns)),
236+
maybePartitionsSchema,
231237
new CaseInsensitiveMap(options))
232238
case dataSource: org.apache.spark.sql.sources.RelationProvider =>
233239
throw new AnalysisException(s"$className does not allow user-specified schemas.")

sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala

Lines changed: 26 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ package org.apache.spark.sql.sources
2020
import org.apache.hadoop.conf.Configuration
2121
import org.apache.hadoop.fs.{FileStatus, Path}
2222
import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
23-
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
2423

2524
import org.apache.spark.annotation.{DeveloperApi, Experimental}
2625
import org.apache.spark.deploy.SparkHadoopUtil
@@ -294,7 +293,7 @@ abstract class OutputWriter {
294293
* filter using selected predicates before producing an RDD containing all matching tuples as
295294
* [[Row]] objects. In addition, when reading from Hive style partitioned tables stored in file
296295
* systems, it's able to discover partitioning information from the paths of input directories, and
297-
* perform partition pruning before start reading the data.Subclasses of [[FSBasedRelation()]] must
296+
* perform partition pruning before starting to read the data. Subclasses of [[FSBasedRelation]] must
298297
* override one of the three `buildScan` methods to implement the read path.
299298
*
300299
* For the write path, it provides the ability to write to both non-partitioned and partitioned
@@ -329,39 +328,45 @@ abstract class FSBasedRelation private[sql](
329328
/**
330329
* Constructs an [[FSBasedRelation]].
331330
*
332-
* @param paths Base paths of this relation. For partitioned relations, it should be either root
331+
* @param paths Base paths of this relation. For partitioned relations, it should be the root
333332
* directories of all partition directories.
334333
*/
335334
def this(paths: Array[String]) = this(paths, None)
336335

337336
private val hadoopConf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
338337

339-
private var _partitionSpec: PartitionSpec = _
340-
refreshPartitions()
338+
private var _partitionSpec: PartitionSpec = maybePartitionSpec.map { spec =>
339+
spec.copy(partitionColumns = spec.partitionColumns.asNullable)
340+
}.getOrElse {
341+
discoverPartitions()
342+
}
341343

342344
private[sql] def partitionSpec: PartitionSpec = _partitionSpec
343345

346+
/**
347+
* Partition columns. Note that they are always nullable.
348+
*/
349+
def partitionColumns: StructType = partitionSpec.partitionColumns
350+
344351
private[sql] def refresh(): Unit = {
345-
refreshPartitions()
352+
_partitionSpec = discoverPartitions()
346353
}
347354

348-
private def refreshPartitions(): Unit = {
349-
_partitionSpec = maybePartitionSpec.getOrElse {
350-
val basePaths = paths.map(new Path(_))
351-
val leafDirs = basePaths.flatMap { path =>
352-
val fs = path.getFileSystem(hadoopConf)
353-
if (fs.exists(path)) {
354-
SparkHadoopUtil.get.listLeafDirStatuses(fs, fs.makeQualified(path))
355-
} else {
356-
Seq.empty[FileStatus]
357-
}
358-
}.map(_.getPath)
359-
360-
if (leafDirs.nonEmpty) {
361-
PartitioningUtils.parsePartitions(leafDirs, "__HIVE_DEFAULT_PARTITION__")
355+
private def discoverPartitions(): PartitionSpec = {
356+
val basePaths = paths.map(new Path(_))
357+
val leafDirs = basePaths.flatMap { path =>
358+
val fs = path.getFileSystem(hadoopConf)
359+
if (fs.exists(path)) {
360+
SparkHadoopUtil.get.listLeafDirStatuses(fs, fs.makeQualified(path))
362361
} else {
363-
PartitionSpec(StructType(Array.empty[StructField]), Array.empty[Partition])
362+
Seq.empty[FileStatus]
364363
}
364+
}.map(_.getPath)
365+
366+
if (leafDirs.nonEmpty) {
367+
PartitioningUtils.parsePartitions(leafDirs, "__HIVE_DEFAULT_PARTITION__")
368+
} else {
369+
PartitionSpec(StructType(Array.empty[StructField]), Array.empty[Partition])
365370
}
366371
}
367372

sql/hive/src/test/scala/org/apache/spark/sql/sources/FSBasedRelationSuite.scala

Lines changed: 5 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -269,34 +269,6 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
269269
}
270270
}
271271

272-
ignore("save()/load() - partitioned table - Append - mismatched partition columns") {
273-
withTempPath { file =>
274-
partitionedTestDF1.save(
275-
source = classOf[SimpleTextSource].getCanonicalName,
276-
mode = SaveMode.Overwrite,
277-
options = Map("path" -> file.getCanonicalPath),
278-
partitionColumns = Seq("p1", "p2"))
279-
280-
// Using only a subset of all partition columns
281-
intercept[IllegalArgumentException] {
282-
partitionedTestDF2.save(
283-
source = classOf[SimpleTextSource].getCanonicalName,
284-
mode = SaveMode.Append,
285-
options = Map("path" -> file.getCanonicalPath),
286-
partitionColumns = Seq("p1"))
287-
}
288-
289-
// Using different order of partition columns
290-
intercept[IllegalArgumentException] {
291-
partitionedTestDF2.save(
292-
source = classOf[SimpleTextSource].getCanonicalName,
293-
mode = SaveMode.Append,
294-
options = Map("path" -> file.getCanonicalPath),
295-
partitionColumns = Seq("p2", "p1"))
296-
}
297-
}
298-
}
299-
300272
test("save()/load() - partitioned table - ErrorIfExists") {
301273
withTempDir { file =>
302274
intercept[RuntimeException] {
@@ -452,7 +424,7 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
452424
}
453425
}
454426

455-
ignore("saveAsTable()/load() - partitioned table - Append - mismatched partition columns") {
427+
test("saveAsTable()/load() - partitioned table - Append - mismatched partition columns") {
456428
partitionedTestDF1.saveAsTable(
457429
tableName = "t",
458430
source = classOf[SimpleTextSource].getCanonicalName,
@@ -461,21 +433,21 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
461433
partitionColumns = Seq("p1", "p2"))
462434

463435
// Using only a subset of all partition columns
464-
intercept[IllegalArgumentException] {
436+
intercept[Throwable] {
465437
partitionedTestDF2.saveAsTable(
466438
tableName = "t",
467439
source = classOf[SimpleTextSource].getCanonicalName,
468-
mode = SaveMode.Overwrite,
440+
mode = SaveMode.Append,
469441
options = Map("dataSchema" -> dataSchema.json),
470442
partitionColumns = Seq("p1"))
471443
}
472444

473445
// Using different order of partition columns
474-
intercept[IllegalArgumentException] {
446+
intercept[Throwable] {
475447
partitionedTestDF2.saveAsTable(
476448
tableName = "t",
477449
source = classOf[SimpleTextSource].getCanonicalName,
478-
mode = SaveMode.Overwrite,
450+
mode = SaveMode.Append,
479451
options = Map("dataSchema" -> dataSchema.json),
480452
partitionColumns = Seq("p2", "p1"))
481453
}

sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -92,10 +92,10 @@ class SimpleTextOutputWriter extends OutputWriter {
9292
class SimpleTextRelation(
9393
paths: Array[String],
9494
val maybeDataSchema: Option[StructType],
95-
val partitionColumns: StructType,
95+
partitionsSchema: StructType,
9696
parameters: Map[String, String])(
9797
@transient val sqlContext: SQLContext)
98-
extends FSBasedRelation(paths, partitionColumns) {
98+
extends FSBasedRelation(paths, partitionsSchema) {
9999

100100
import sqlContext.sparkContext
101101

@@ -106,7 +106,8 @@ class SimpleTextRelation(
106106
case that: SimpleTextRelation =>
107107
this.paths.sameElements(that.paths) &&
108108
this.maybeDataSchema == that.maybeDataSchema &&
109-
this.dataSchema == that.dataSchema
109+
this.dataSchema == that.dataSchema &&
110+
this.partitionColumns == that.partitionColumns
110111

111112
case _ => false
112113
}

0 commit comments

Comments
 (0)