Commit 770b5ba

Adds tests for FSBasedRelation
1 parent 3ba9bbf commit 770b5ba

2 files changed: +458 −18 lines

sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala

Lines changed: 52 additions & 18 deletions
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.sources
 
-import org.apache.hadoop.conf.Configuration
-
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
@@ -251,17 +249,20 @@ trait CatalystScan {
  * opened. This instance is used to persist rows to this single output file.
  */
 @Experimental
-trait OutputWriter {
+abstract class OutputWriter {
+  def init(): Unit = ()
+
   /**
-   * Persists a single row. Invoked on the executor side.
+   * Persists a single row. Invoked on the executor side. When writing to dynamically partitioned
+   * tables, dynamic partition columns are not included in rows to be written.
    */
   def write(row: Row): Unit
 
   /**
    * Closes the [[OutputWriter]]. Invoked on the executor side after all rows are persisted, before
    * the task output is committed.
    */
-  def close(): Unit
+  def close(): Unit = ()
 }
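To make the new OutputWriter contract concrete, here is a minimal sketch of a subclass. The CsvOutputWriter name and the plain java.io file handling are assumptions for illustration only, not part of this commit; a production writer would go through Hadoop's FileSystem API and the task's output committer.

    import java.io.PrintWriter

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.sources.OutputWriter

    // Hypothetical writer that persists each Row as one comma-separated line.
    class CsvOutputWriter(path: String) extends OutputWriter {
      private var writer: PrintWriter = _

      // Opens the output file; assumed here to be invoked once before any
      // rows are written.
      override def init(): Unit = {
        writer = new PrintWriter(path)
      }

      // Persists a single row. Dynamic partition column values have already
      // been stripped from `row` when writing to a partitioned table.
      override def write(row: Row): Unit = {
        writer.println(row.toSeq.mkString(","))
      }

      // Releases the file handle after all rows are persisted, before the
      // task output is committed.
      override def close(): Unit = {
        if (writer != null) writer.close()
      }
    }

Note that init() and close() now have no-op default implementations, so a subclass that needs no setup or teardown only has to override write(row).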
@@ -270,17 +271,54 @@ trait OutputWriter {
  *
  * For the read path, similar to [[PrunedFilteredScan]], it can eliminate unneeded columns and
  * filter using selected predicates before producing an RDD containing all matching tuples as
- * [[Row]] objects.
- *
- * In addition, when reading from Hive style partitioned tables stored in file systems, it's able to
- * discover partitioning information from the paths of input directories, and perform partition
- * pruning before start reading the data.
+ * [[Row]] objects. In addition, when reading from Hive style partitioned tables stored in file
+ * systems, it's able to discover partitioning information from the paths of input directories, and
+ * perform partition pruning before starting to read the data. Subclasses of [[FSBasedRelation]]
+ * must override one of the three `buildScan` methods to implement the read path.
  *
  * For the write path, it provides the ability to write to both non-partitioned and partitioned
  * tables. Directory layout of the partitioned tables is compatible with Hive.
  */
 @Experimental
-trait FSBasedRelation extends BaseRelation {
+abstract class FSBasedRelation extends BaseRelation {
+  // Discovers partition columns and merges them with `dataSchema`. All partition columns not
+  // already present in `dataSchema` are appended to it.
+  override val schema: StructType = ???
+
+  /**
+   * Base path of this relation. For partitioned relations, `path` should be the root directory of
+   * all partition directories.
+   */
+  def path: String
+
+  /**
+   * Specifies the schema of actual data files. For partitioned relations, if one or more
+   * partition columns are contained in the data files, they should also appear in `dataSchema`.
+   */
+  def dataSchema: StructType
+
+  /**
+   * Builds an `RDD[Row]` containing all rows within this relation.
+   *
+   * @param inputPaths Data files to be read. If the underlying relation is partitioned, only data
+   *        files within required partition directories are included.
+   */
+  def buildScan(inputPaths: Array[String]): RDD[Row] = {
+    throw new RuntimeException(
+      "At least one buildScan() method should be overridden to read the relation.")
+  }
+
+  /**
+   * Builds an `RDD[Row]` containing all rows within this relation.
+   *
+   * @param requiredColumns Required columns.
+   * @param inputPaths Data files to be read. If the underlying relation is partitioned, only data
+   *        files within required partition directories are included.
+   */
+  def buildScan(requiredColumns: Array[String], inputPaths: Array[String]): RDD[Row] = {
+    buildScan(inputPaths)
+  }
+
   /**
    * Builds an `RDD[Row]` containing all rows within this relation.
    *
@@ -295,13 +333,9 @@ trait FSBasedRelation extends BaseRelation {
   def buildScan(
       requiredColumns: Array[String],
       filters: Array[Filter],
-      inputPaths: Array[String]): RDD[Row]
-
-  /**
-   * When writing rows to this relation, this method is invoked on the driver side before the actual
-   * write job is issued. It provides an opportunity to configure the write job to be performed.
-   */
-  def prepareForWrite(conf: Configuration): Unit
+      inputPaths: Array[String]): RDD[Row] = {
+    buildScan(requiredColumns, inputPaths)
+  }
 
   /**
    * This method is responsible for producing a new [[OutputWriter]] for each newly opened output
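To illustrate the read-path contract, here is a minimal hypothetical subclass. The TextRelation name and its constructor wiring are assumptions for illustration, and the write-side method whose definition is truncated above is omitted from the sketch. Overriding the simplest buildScan variant is sufficient, because each richer overload falls back to the next simpler one.

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{Row, SQLContext}
    import org.apache.spark.sql.sources.FSBasedRelation
    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    // Hypothetical relation exposing each line of the input text files as a
    // single-column Row. Illustration only; not part of this commit.
    class TextRelation(override val path: String)(
        @transient val sqlContext: SQLContext)
      extends FSBasedRelation {

      // One non-partition column holding the raw line of text.
      override def dataSchema: StructType =
        StructType(StructField("line", StringType, nullable = false) :: Nil)

      // The column- and filter-aware buildScan overloads delegate here by
      // default, so this single override implements the whole read path.
      override def buildScan(inputPaths: Array[String]): RDD[Row] = {
        sqlContext.sparkContext
          .textFile(inputPaths.mkString(","))
          .map(Row(_))
      }
    }

The fallback chain means buildScan(requiredColumns, filters, inputPaths) still answers pruned and filtered scans against this relation, just without pushing the pruning down into the data source.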
