
Commit c71ac6c

Addresses comments from @marmbrus
1 parent 7552168 commit c71ac6c

File tree

5 files changed: +23 -24 lines

sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala

Lines changed: 3 additions & 0 deletions

@@ -238,6 +238,9 @@ private[sql] class SQLConf extends Serializable with CatalystConf {
   private[spark] def defaultDataSourceName: String =
     getConf(DEFAULT_DATA_SOURCE_NAME, "org.apache.spark.sql.parquet")
 
+  private[spark] def partitionDiscoveryEnabled() =
+    getConf(SQLConf.PARTITION_DISCOVERY_ENABLED, "true").toBoolean
+
   // Do not use a value larger than 4000 as the default value of this property.
   // See the comments of SCHEMA_STRING_LENGTH_THRESHOLD above for more information.
   private[spark] def schemaStringLengthThreshold: Int =
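
For orientation, a minimal sketch of how the new accessor ties back to the underlying configuration entry. SQLConf and the conf handle on SQLContext are package-private, so this only compiles inside Spark SQL itself; the object name is hypothetical and the snippet illustrates the wiring rather than a public API.

// Sketch only: must live inside Spark SQL, because SQLConf and SQLContext.conf are package-private.
package org.apache.spark.sql

object PartitionDiscoveryFlagSketch {
  def disablePartitionDiscovery(sqlContext: SQLContext): Unit = {
    // Partition discovery defaults to enabled ("true"), matching the default passed to getConf above.
    assert(sqlContext.conf.partitionDiscoveryEnabled())

    // With the flag off, FSBasedRelation falls back to an empty PartitionSpec
    // instead of calling discoverPartitions() (see interfaces.scala below).
    sqlContext.conf.setConf(SQLConf.PARTITION_DISCOVERY_ENABLED, "false")
    assert(!sqlContext.conf.partitionDiscoveryEnabled())
  }
}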

sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala

Lines changed: 3 additions & 3 deletions

@@ -364,13 +364,13 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
        translate(child).map(sources.Not)
 
      case expressions.StartsWith(a: Attribute, Literal(v: UTF8String, StringType)) =>
-        Some(sources.StringStartsWith(a.name, v.toString()))
+        Some(sources.StringStartsWith(a.name, v.toString))
 
      case expressions.EndsWith(a: Attribute, Literal(v: UTF8String, StringType)) =>
-        Some(sources.StringEndsWith(a.name, v.toString()))
+        Some(sources.StringEndsWith(a.name, v.toString))
 
      case expressions.Contains(a: Attribute, Literal(v: UTF8String, StringType)) =>
-        Some(sources.StringContains(a.name, v.toString()))
+        Some(sources.StringContains(a.name, v.toString))
 
      case _ => None
    }
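
To illustrate what the translated filters look like on the data source side, here is a hedged sketch that evaluates them against an in-memory record. StringStartsWith, StringEndsWith, and StringContains are the public filter classes from org.apache.spark.sql.sources; the evaluate helper and the map-based row model are made up for illustration.

import org.apache.spark.sql.sources.{Filter, StringContains, StringEndsWith, StringStartsWith}

object StringFilterSketch {
  // Hypothetical helper: decides whether a record (modeled as a column-name -> value map)
  // satisfies one of the string filters that DataSourceStrategy now produces.
  def evaluate(filter: Filter, row: Map[String, String]): Boolean = filter match {
    case StringStartsWith(attr, prefix) => row.get(attr).exists(_.startsWith(prefix))
    case StringEndsWith(attr, suffix)   => row.get(attr).exists(_.endsWith(suffix))
    case StringContains(attr, fragment) => row.get(attr).exists(_.contains(fragment))
    case _ => true // conservatively keep the row; Spark re-applies the original predicate anyway
  }

  def main(args: Array[String]): Unit = {
    // A filter translated from a Catalyst expression such as `name LIKE 'Sp%'`.
    println(evaluate(StringStartsWith("name", "Sp"), Map("name" -> "Spark"))) // true
  }
}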

sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala

Lines changed: 14 additions & 15 deletions

@@ -17,16 +17,18 @@
 
 package org.apache.spark.sql.sources
 
+import scala.util.Try
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
 
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.types.{StructField, StructType}
-import org.apache.spark.sql._
 
 /**
  * ::DeveloperApi::
@@ -87,7 +89,7 @@ trait SchemaRelationProvider {
  * ::DeveloperApi::
  * Implemented by objects that produce relations for a specific kind of data source
  * with a given schema and partitioned columns. When Spark SQL is given a DDL operation with a
- * USING clause specified (to specify the implemented SchemaRelationProvider), a user defined
+ * USING clause specified (to specify the implemented [[FSBasedRelationProvider]]), a user defined
  * schema, and an optional list of partition columns, this interface is used to pass in the
  * parameters specified by a user.
  *
@@ -114,7 +116,7 @@ trait FSBasedRelationProvider {
       sqlContext: SQLContext,
       schema: Option[StructType],
       partitionColumns: Option[StructType],
-      parameters: Map[String, String]): BaseRelation
+      parameters: Map[String, String]): FSBasedRelation
 }
 
 @DeveloperApi
@@ -282,12 +284,13 @@ abstract class OutputWriter {
    * Closes the [[OutputWriter]]. Invoked on the executor side after all rows are persisted, before
    * the task output is committed.
    */
-  def close(): Unit = ()
+  def close(): Unit
 }
 
 /**
  * ::Experimental::
- * A [[BaseRelation]] that abstracts file system based data sources.
+ * A [[BaseRelation]] that provides much of the common code required for formats that store their
+ * data to an HDFS compatible filesystem.
  *
  * For the read path, similar to [[PrunedFilteredScan]], it can eliminate unneeded columns and
  * filter using selected predicates before producing an RDD containing all matching tuples as
@@ -338,16 +341,13 @@ abstract class FSBasedRelation private[sql](
   private var _partitionSpec: PartitionSpec = maybePartitionSpec.map { spec =>
     spec.copy(partitionColumns = spec.partitionColumns.asNullable)
   }.getOrElse {
-    if (partitionDiscoverEnabled()) {
+    if (sqlContext.conf.partitionDiscoveryEnabled()) {
       discoverPartitions()
     } else {
       PartitionSpec(StructType(Nil), Array.empty[Partition])
     }
   }
 
-  private def partitionDiscoverEnabled() =
-    sqlContext.conf.getConf(SQLConf.PARTITION_DISCOVERY_ENABLED, "true").toBoolean
-
   private[sql] def partitionSpec: PartitionSpec = _partitionSpec
 
   /**
@@ -356,7 +356,7 @@ abstract class FSBasedRelation private[sql](
   def partitionColumns: StructType = partitionSpec.partitionColumns
 
   private[sql] def refresh(): Unit = {
-    if (partitionDiscoverEnabled()) {
+    if (sqlContext.conf.partitionDiscoveryEnabled()) {
       _partitionSpec = discoverPartitions()
     }
   }
@@ -365,11 +365,10 @@ abstract class FSBasedRelation private[sql](
     val basePaths = paths.map(new Path(_))
     val leafDirs = basePaths.flatMap { path =>
       val fs = path.getFileSystem(hadoopConf)
-      if (fs.exists(path)) {
-        SparkHadoopUtil.get.listLeafDirStatuses(fs, fs.makeQualified(path))
-      } else {
-        Seq.empty[FileStatus]
-      }
+      Try(fs.getFileStatus(path.makeQualified(fs.getUri, fs.getWorkingDirectory)))
+        .filter(_.isDir)
+        .map(SparkHadoopUtil.get.listLeafDirStatuses(fs, _))
+        .getOrElse(Seq.empty[FileStatus])
     }.map(_.getPath)
 
     if (leafDirs.nonEmpty) {
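
The last hunk replaces the explicit fs.exists check with a single getFileStatus call wrapped in Try, so a missing path and a path that points at a plain file both collapse to an empty result. A standalone sketch of the same pattern, assuming only the Hadoop FileSystem API; the object name and the HDFS path are made up, and the recursion that SparkHadoopUtil.listLeafDirStatuses performs is stubbed out:

import scala.util.Try
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}

object LeafDirListingSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical path; in FSBasedRelation this comes from the user-supplied `paths`.
    val path = new Path("hdfs://namenode:8020/data/events")
    val fs = path.getFileSystem(new Configuration())

    // getFileStatus throws FileNotFoundException for a missing path; Try turns that,
    // as well as a path that exists but is not a directory, into an empty result.
    val leafDirs: Seq[FileStatus] =
      Try(fs.getFileStatus(path.makeQualified(fs.getUri, fs.getWorkingDirectory)))
        .filter(_.isDir)
        .map(status => Seq(status)) // the real code recurses via SparkHadoopUtil.listLeafDirStatuses
        .getOrElse(Seq.empty[FileStatus])

    println(leafDirs.map(_.getPath).mkString(", "))
  }
}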

sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala

Lines changed: 2 additions & 6 deletions

@@ -101,12 +101,8 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>
         }
       }
 
-      case logical.InsertIntoTable(LogicalRelation(_: InsertableRelation), _, _, _, _) =>
-        // OK
-
-      case logical.InsertIntoTable(LogicalRelation(_: FSBasedRelation), _, _, _, _) =>
-        // OK
-
+      case logical.InsertIntoTable(LogicalRelation(_: InsertableRelation), _, _, _, _) => // OK
+      case logical.InsertIntoTable(LogicalRelation(_: FSBasedRelation), _, _, _, _) => // OK
       case logical.InsertIntoTable(l: LogicalRelation, _, _, _, _) =>
         // The relation in l is not an InsertableRelation.
         failAnalysis(s"$l does not allow insertion.")

sql/hive/src/test/scala/org/apache/spark/sql/sources/FSBasedRelationSuite.scala

Lines changed: 1 addition & 0 deletions

@@ -25,6 +25,7 @@ import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.parquet.ParquetTest
 import org.apache.spark.sql.types._
 
+// TODO Don't extend ParquetTest
 // This test suite extends ParquetTest for some convenient utility methods. These methods should be
 // moved to some more general places, maybe QueryTest.
 class FSBasedRelationSuite extends QueryTest with ParquetTest {
