62 commits
112ce2d
Checks column names are compatible to provided schema
MaxGekk Mar 20, 2018
a85ccce
Checking header is matched to schema in per-line mode
MaxGekk Mar 20, 2018
75e1534
Extract header and check that it is matched to schema
MaxGekk Mar 20, 2018
8eb45b8
Checking column names in header in multiLine mode
MaxGekk Mar 21, 2018
9b1a986
Adding the checkHeader option with true by default
MaxGekk Mar 21, 2018
6442633
Fix csv test by changing headers or disabling header checking
MaxGekk Mar 21, 2018
9440d8a
Adding comment for the checkHeader option
MaxGekk Mar 21, 2018
9f91ce7
Added comments
MaxGekk Mar 21, 2018
0878f7a
Adding a space between column names
MaxGekk Mar 21, 2018
a341dd7
Fix a test: checking name duplication in schemas
MaxGekk Mar 21, 2018
98c27ea
Fixing the test and adding ticket number to test's title
MaxGekk Mar 23, 2018
811df6f
Refactoring - removing unneeded parameter
MaxGekk Mar 23, 2018
691cfbc
Output filename in the exception
MaxGekk Mar 25, 2018
efb0105
PySpark: adding a test and checkHeader parameter
MaxGekk Mar 25, 2018
c9f5e14
Removing unneeded parameter - fileName
MaxGekk Mar 25, 2018
e195838
Fix for pycodestyle checks
MaxGekk Mar 25, 2018
d6d370d
Adding description of the checkHeader option
MaxGekk Mar 25, 2018
acd6d2e
Improving error messages and handling the case when header size is no…
MaxGekk Mar 26, 2018
13892fd
Refactoring: check header by calling an uniVocity method
MaxGekk Mar 26, 2018
476b517
Refactoring: convert val to def
MaxGekk Mar 26, 2018
f8167e4
Parse header only if the checkHeader option is true
MaxGekk Mar 26, 2018
d068f6c
Moving header checks to CSVDataSource
MaxGekk Mar 26, 2018
08cfcf4
Making uniVocity wrapper unaware of header
MaxGekk Mar 26, 2018
f6a1694
Fix the test: error mesage was changed
MaxGekk Mar 27, 2018
adbedf3
Revert CSV tests as it was before the option was introduced
MaxGekk Mar 31, 2018
0904daf
Renaming checkHeader to enforceSchema
MaxGekk Mar 31, 2018
191b415
Pass required parameter
MaxGekk Apr 1, 2018
718f7ca
Merge branch 'master' of github.com:apache/spark into check-column-names
MaxGekk Apr 5, 2018
75c1ce6
Merge remote-tracking branch 'origin/master' into check-column-names
MaxGekk Apr 13, 2018
ab9c514
Addressing Xiao Li's review comments
MaxGekk Apr 13, 2018
0405863
Making header validation case sensitive
MaxGekk Apr 13, 2018
714c66d
Describing enforceSchema in PySpark's csv method
MaxGekk Apr 13, 2018
78d9f66
Respect to caseSensitive parameter
MaxGekk Apr 13, 2018
b43a7c7
Check header on csv parsing from dataset of strings
MaxGekk Apr 13, 2018
a5f2916
Merge branch 'master' of github.com:apache/spark into check-column-names
MaxGekk Apr 18, 2018
9b2d403
Make Scala style checker happy
MaxGekk Apr 18, 2018
1fffc16
Merge remote-tracking branch 'origin/master' into check-column-names
MaxGekk Apr 27, 2018
ad6cda4
Merge remote-tracking branch 'origin/master' into check-column-names
MaxGekk May 1, 2018
4bdabe2
Merge branch 'master' of github.com:apache/spark into check-column-names
MaxGekk May 4, 2018
2bd2713
Merge branch 'master' into check-column-names
MaxGekk May 14, 2018
b4bfd1d
Merge branch 'check-column-names' of github.com:MaxGekk/spark-1 into …
MaxGekk May 14, 2018
21f8b10
Removing a space to make Scala style checker happy.
MaxGekk May 14, 2018
aca4db9
Merge branch 'master' of github.com:apache/spark into check-column-names
MaxGekk May 16, 2018
e3b4275
Addressing review comments
MaxGekk May 16, 2018
d704766
Removing unnecessary empty checks
MaxGekk May 17, 2018
04199e0
Addressing review comments
MaxGekk May 17, 2018
d5fde52
Merge remote-tracking branch 'origin/master' into check-column-names
MaxGekk May 17, 2018
795a878
Addressing Hyukjin Kwon's review comments
MaxGekk May 17, 2018
05fc7cd
Improving description of the option
MaxGekk May 18, 2018
9606711
Merge remote-tracking branch 'origin/master' into check-column-names
MaxGekk May 18, 2018
11c7591
Addressing Wenchen Fan's review comment
MaxGekk May 18, 2018
7dce1e7
Merge remote-tracking branch 'origin/master' into check-column-names
MaxGekk May 22, 2018
c008328
Output warnings when enforceSchema is enabled and the schema is not c…
MaxGekk May 25, 2018
9f7c440
Added tests for inferSchema is true and enforceSchema is false
MaxGekk May 25, 2018
e83ad60
Rename dropFirstRecord to shouldDropHeader
MaxGekk May 25, 2018
26ae4f9
Merge remote-tracking branch 'origin/master' into check-column-names
MaxGekk May 25, 2018
4b6495b
Merge remote-tracking branch 'origin/master' into check-column-names
MaxGekk Jun 1, 2018
c5ee207
Renaming of 'is not conform' to 'does not conform'
MaxGekk Jun 1, 2018
a2cbb7b
Fix Scala coding style
MaxGekk Jun 1, 2018
70e2b75
Added description of checkHeaderColumnNames's arguments
MaxGekk Jun 1, 2018
e7c3ace
Test checks a warning presents in logs
MaxGekk Jun 1, 2018
3b37712
fix python tests
MaxGekk Jun 1, 2018
12 changes: 10 additions & 2 deletions python/pyspark/sql/readwriter.py
@@ -346,7 +346,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None,
maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None,
columnNameOfCorruptRecord=None, multiLine=None, charToEscapeQuoteEscaping=None,
samplingRatio=None):
samplingRatio=None, enforceSchema=None):
"""Loads a CSV file and returns the result as a :class:`DataFrame`.

This function will go through the input once to determine the input schema if
@@ -373,6 +373,13 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
default value, ``false``.
:param inferSchema: infers the input schema automatically from data. It requires one extra
pass over the data. If None is set, it uses the default value, ``false``.
:param enforceSchema: If it is set to ``true``, the specified or inferred schema will be
[Member] can you add this option to streaming reader and writer?
[Member, author] added
forcibly applied to datasource files, and headers in CSV files will be
ignored. If the option is set to ``false``, the schema will be
[Member] Is it ignored?
[Member, @gatorsmile, May 16, 2018] We should not silently ignore the option. When header is false and enforceSchema is true, we should issue an exception.
validated against headers in CSV files if the ``header`` option is set
to ``true``. The validation is performed in a column-order-aware
manner, taking ``spark.sql.caseSensitive`` into account.
If None is set, ``true`` is used by default.
:param ignoreLeadingWhiteSpace: A flag indicating whether or not leading whitespaces from
values being read should be skipped. If None is set, it
uses the default value, ``false``.
@@ -449,7 +456,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
maxCharsPerColumn=maxCharsPerColumn,
maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode,
columnNameOfCorruptRecord=columnNameOfCorruptRecord, multiLine=multiLine,
charToEscapeQuoteEscaping=charToEscapeQuoteEscaping, samplingRatio=samplingRatio)
charToEscapeQuoteEscaping=charToEscapeQuoteEscaping, samplingRatio=samplingRatio,
enforceSchema=enforceSchema)
if isinstance(path, basestring):
path = [path]
if type(path) == list:
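For illustration, a minimal PySpark sketch of how the new parameter is meant to be used; the `spark` session, the file path, and the column names are assumptions for this example, not part of the change:

from pyspark.sql.types import StructType, StructField, IntegerType

# Hypothetical file /tmp/marina.csv whose first line is "temperature,depth".
schema = StructType([
    StructField("temperature", IntegerType()),
    StructField("depth", IntegerType()),
])

# With header=True and enforceSchema=False the reader validates the schema against
# the file header (same names, same order, case handling per spark.sql.caseSensitive)
# and fails on a mismatch; with the default enforceSchema=True the header is ignored
# and the schema is applied as-is.
df = spark.read.schema(schema) \
    .option("header", True) \
    .csv("/tmp/marina.csv", enforceSchema=False)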
12 changes: 10 additions & 2 deletions python/pyspark/sql/streaming.py
@@ -564,7 +564,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None,
negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None,
maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None,
columnNameOfCorruptRecord=None, multiLine=None, charToEscapeQuoteEscaping=None):
columnNameOfCorruptRecord=None, multiLine=None, charToEscapeQuoteEscaping=None,
enforceSchema=None):
"""Loads a CSV file stream and returns the result as a :class:`DataFrame`.

This function will go through the input once to determine the input schema if
@@ -592,6 +593,13 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
default value, ``false``.
:param inferSchema: infers the input schema automatically from data. It requires one extra
pass over the data. If None is set, it uses the default value, ``false``.
:param enforceSchema: If it is set to ``true``, the specified or inferred schema will be
forcibly applied to datasource files, and headers in CSV files will be
ignored. If the option is set to ``false``, the schema will be
validated against headers in CSV files if the ``header`` option is set
to ``true``. The validation is performed in a column-order-aware
manner, taking ``spark.sql.caseSensitive`` into account.
If None is set, ``true`` is used by default.
:param ignoreLeadingWhiteSpace: a flag indicating whether or not leading whitespaces from
values being read should be skipped. If None is set, it
uses the default value, ``false``.
@@ -664,7 +672,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
maxCharsPerColumn=maxCharsPerColumn,
maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode,
columnNameOfCorruptRecord=columnNameOfCorruptRecord, multiLine=multiLine,
charToEscapeQuoteEscaping=charToEscapeQuoteEscaping)
charToEscapeQuoteEscaping=charToEscapeQuoteEscaping, enforceSchema=enforceSchema)
if isinstance(path, basestring):
return self._df(self._jreader.csv(path))
else:
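The streaming reader always requires a user-specified schema, so the header check is what catches files whose column order does not match it. A hedged sketch (directory path and column names are assumptions; the schema is given as a DDL string):

# Expected to surface a header/schema mismatch in an arriving file as a query error
# instead of silently mapping columns by position.
stream_df = spark.readStream \
    .schema("temperature INT, depth INT") \
    .option("header", True) \
    .option("enforceSchema", False) \
    .csv("/tmp/csv-input-dir/")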
18 changes: 18 additions & 0 deletions python/pyspark/sql/tests.py
@@ -3040,6 +3040,24 @@ def test_csv_sampling_ratio(self):
.csv(rdd, samplingRatio=0.5).schema
self.assertEquals(schema, StructType([StructField("_c0", IntegerType(), True)]))

def test_checking_csv_header(self):
tmpPath = tempfile.mkdtemp()
[Member] try-except
[Member] tmpPath -> tmp_path or path
shutil.rmtree(tmpPath)
try:
self.spark.createDataFrame([[1, 1000], [2000, 2]])\
.toDF('f1', 'f2').write.option("header", "true").csv(tmpPath)
schema = StructType([
StructField('f2', IntegerType(), nullable=True),
StructField('f1', IntegerType(), nullable=True)])
df = self.spark.read.option('header', 'true').schema(schema)\
.csv(tmpPath, enforceSchema=False)
self.assertRaisesRegexp(
Exception,
"CSV file header does not contain the expected fields",
[Contributor] the error message is a little confusing. We do contain the fields, but in different order.
[Member, author @MaxGekk, May 17, 2018] eh, I have already changed the error message as @hvanhovell suggested. What about:

CSV header is not conform to the schema.

So, the error message will look like:

java.lang.IllegalArgumentException: CSV header is not conform to the schema.
 Header: depth, temperature
 Schema: temperature, depth
Expected: temperature but found: depth
CSV file: marina.csv

Is it ok for you?
[Contributor] yea this looks good.
lambda: df.collect())
finally:
shutil.rmtree(tmpPath)


class HiveSparkSubmitTests(SparkSubmitTests):

11 changes: 11 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -22,6 +22,7 @@ import java.util.{Locale, Properties}
import scala.collection.JavaConverters._

import com.fasterxml.jackson.databind.ObjectMapper
import com.univocity.parsers.csv.CsvParser

import org.apache.spark.Partition
import org.apache.spark.annotation.InterfaceStability
@@ -497,6 +498,11 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
StructType(schema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))

val linesWithoutHeader: RDD[String] = maybeFirstLine.map { firstLine =>
if (!parsedOptions.enforceSchema) {
CSVDataSource.checkHeader(firstLine, new CsvParser(parsedOptions.asParserSettings),
[Member] Does this logic ever validate each file's header at all?
[Member, author] It should check the CSV header each time it is removed. Do you think I missed some cases?
[Member] I mean we could, for example, make a dataset from spark.read.text("tmp/*.csv"), preprocess it and then convert it via spark.read.csv(dataset). In this case, every file would have the header. This doesn't validate each file's header. Shall we document this if it's hard to fix?
[Member, author] Thank you for explaining the use case. I didn't realize that the input dataset can contain multiple headers. So, potentially the headers can contain column names in a different order or even incorrect column names. It seems pretty tough to fix that case.
[Member, author] I will add comments for csv() in DataFrameReader and in readwrite.py as well as in streaming.py.

actualSchema, csvDataset.getClass.getCanonicalName, checkHeaderFlag = true,
[Member] please make them consistent. rename it to enforceSchema
sparkSession.sessionState.conf.caseSensitiveAnalysis)
}
filteredLines.rdd.mapPartitions(CSVUtils.filterHeaderLine(_, firstLine, parsedOptions))
}.getOrElse(filteredLines.rdd)
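The review thread above points out a limitation of this code path: when csv() receives a dataset of strings assembled from several files, only the first line of the whole dataset can be recognized and checked as the header, so header lines embedded later are treated as data. A rough PySpark sketch of that scenario (paths and column names are assumptions):

from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([StructField("temperature", StringType()),
                     StructField("depth", StringType())])

# Each matched file contributes its own header line to the combined input.
lines = spark.read.text("/tmp/csvs/*.csv").rdd.map(lambda row: row[0])

# Only the first line is dropped and, with enforceSchema=False, validated as the
# header; headers coming from the remaining files are parsed as ordinary rows and
# are not checked against the schema.
df = spark.read.schema(schema).option("header", True).csv(lines, enforceSchema=False)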

@@ -537,6 +543,11 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* <li>`comment` (default empty string): sets a single character used for skipping lines
* beginning with this character. By default, it is disabled.</li>
* <li>`header` (default `false`): uses the first line as names of columns.</li>
* <li>`enforceSchema` (default `true`): If it is set to `true`, the specified or inferred schema
* will be forcibly applied to datasource files, and headers in CSV files will be ignored.
* If the option is set to `false`, the schema will be validated against headers in CSV files
* in the case when the `header` option is set to `true`. The validation is performed in a
* column-order-aware manner, taking `spark.sql.caseSensitive` into account.</li>
* <li>`inferSchema` (default `false`): infers the input schema automatically from data. It
* requires one extra pass over the data.</li>
* <li>`samplingRatio` (default is 1.0): defines fraction of rows used for schema inferring.</li>
CSVDataSource.scala
@@ -50,7 +50,10 @@ abstract class CSVDataSource extends Serializable {
conf: Configuration,
file: PartitionedFile,
parser: UnivocityParser,
schema: StructType): Iterator[InternalRow]
requiredSchema: StructType,
// Actual schema of data in the csv file
dataSchema: StructType,
caseSensitive: Boolean): Iterator[InternalRow]

/**
* Infers the schema from `inputPaths` files.
@@ -118,6 +121,64 @@ object CSVDataSource {
TextInputCSVDataSource
}
}

/**
* Checks that column names in a CSV header and field names in the schema are the same
* by taking into account case sensitivity.
*/
[Member] To help readability, how about at least adding @param columnNames and @param schema?
def checkHeaderColumnNames(
schema: StructType,
columnNames: Array[String],
fileName: String,
checkHeaderFlag: Boolean,
[Member] also rename it too.
caseSensitive: Boolean): Unit = {
if (checkHeaderFlag && columnNames != null) {
val fieldNames = schema.map(_.name).toIndexedSeq
val (headerLen, schemaSize) = (columnNames.size, fieldNames.length)

if (headerLen == schemaSize) {
var i = 0
while (i < headerLen) {
var (nameInSchema, nameInHeader) = (fieldNames(i), columnNames(i))
if (!caseSensitive) {
nameInSchema = nameInSchema.toLowerCase
nameInHeader = nameInHeader.toLowerCase
}
if (nameInHeader != nameInSchema) {
throw new IllegalArgumentException(
s"""|CSV file header does not contain the expected fields.
| Header: ${columnNames.mkString(", ")}
| Schema: ${fieldNames.mkString(", ")}
|Expected: ${fieldNames(i)} but found: ${columnNames(i)}
|CSV file: $fileName""".stripMargin)
}
i += 1
}
} else {
throw new IllegalArgumentException(
s"""|Number of column in CSV header is not equal to number of fields in the schema:
| Header length: $headerLen, schema size: $schemaSize
|CSV file: $fileName""".stripMargin)
}
}
}

/**
* Checks that CSV header contains the same column names as field names in the given schema
* by taking into account case sensitivity.
*/
def checkHeader(
[Member] Do we need to define this in CSVDataSource? It sounds like this is not being shared with the CSV multi-line reader
[Member, author] checkHeader is used in the csv method of DataFrameReader. I think the CSVDataSource object can encapsulate the checking.
header: String,
parser: CsvParser,
schema: StructType,
fileName: String,
checkHeaderFlag: Boolean,
caseSensitive: Boolean): Unit = {
if (checkHeaderFlag) {
checkHeaderColumnNames(schema, parser.parseLine(header), fileName, checkHeaderFlag,
[Member] style
[Member, author] Should be like this?

checkHeaderColumnNames(
    schema,
    parser.parseLine(header),
    fileName,
    checkHeaderFlag,
    caseSensitive)

caseSensitive)
}
}
}
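A hedged sketch of the length-mismatch branch above, where the header has more columns than the schema declares; the file path and contents are assumptions, and the exception is only raised once the data is actually read:

from pyspark.sql.types import StructType, StructField, StringType

# Hypothetical file whose header has three columns.
with open("/tmp/extra_column.csv", "w") as f:
    f.write("temperature,depth,salinity\n10,300,35\n")

schema = StructType([StructField("temperature", StringType()),
                     StructField("depth", StringType())])

# With enforceSchema=False this is expected to fail with an IllegalArgumentException
# reporting header length 3 versus schema size 2; the per-field comparison never runs.
spark.read.schema(schema).option("header", True) \
    .csv("/tmp/extra_column.csv", enforceSchema=False).show()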

object TextInputCSVDataSource extends CSVDataSource {
@@ -127,7 +188,9 @@ object TextInputCSVDataSource extends CSVDataSource {
conf: Configuration,
file: PartitionedFile,
parser: UnivocityParser,
schema: StructType): Iterator[InternalRow] = {
requiredSchema: StructType,
dataSchema: StructType,
caseSensitive: Boolean): Iterator[InternalRow] = {
val lines = {
val linesReader = new HadoopFileLinesReader(file, conf)
Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => linesReader.close()))
@@ -136,8 +199,19 @@ object TextInputCSVDataSource extends CSVDataSource {
}
}

val shouldDropHeader = parser.options.headerFlag && file.start == 0
UnivocityParser.parseIterator(lines, shouldDropHeader, parser, schema)
val hasHeader = parser.options.headerFlag && file.start == 0
if (hasHeader) {
// Check that column names in the header match the field names of the schema.
// The header will be removed from lines.
// Note: if there are only comments in the first block, the header might not
// be extracted.
CSVUtils.extractHeader(lines, parser.options).foreach { header =>
CSVDataSource.checkHeader(header, parser.tokenizer, dataSchema, file.filePath,
checkHeaderFlag = !parser.options.enforceSchema, caseSensitive)
}
}

UnivocityParser.parseIterator(lines, parser, requiredSchema)
}

override def infer(
@@ -206,24 +280,33 @@ object MultiLineCSVDataSource extends CSVDataSource {
conf: Configuration,
file: PartitionedFile,
parser: UnivocityParser,
schema: StructType): Iterator[InternalRow] = {
requiredSchema: StructType,
dataSchema: StructType,
caseSensitive: Boolean): Iterator[InternalRow] = {
def checkHeader(header: Array[String]): Unit = {
CSVDataSource.checkHeaderColumnNames(dataSchema, header, file.filePath,
checkHeaderFlag = !parser.options.enforceSchema, caseSensitive)
}

UnivocityParser.parseStream(
CodecStreams.createInputStreamWithCloseResource(conf, new Path(new URI(file.filePath))),
parser.options.headerFlag,
parser,
schema)
parser.options.headerFlag, parser, requiredSchema, checkHeader)
[Member] Nit: indents. We should follow the original way
}

override def infer(
sparkSession: SparkSession,
inputPaths: Seq[FileStatus],
parsedOptions: CSVOptions): StructType = {
val csv = createBaseRdd(sparkSession, inputPaths, parsedOptions)
// The header is not checked because there is no schema against which it could be checked
def checkHeader(header: Array[String]): Unit = ()
[Member] This line should be removed?
[Member, author] removed
[Member] tiny nit but shall we revert this newline while we are here?
csv.flatMap { lines =>
val path = new Path(lines.getPath())
UnivocityParser.tokenizeStream(
CodecStreams.createInputStreamWithCloseResource(lines.getConfiguration, path),
shouldDropHeader = false,
dropFirstRecord = false,
checkHeader,
[Member] It seems that the checkHeader is doing nothing here. Why need this?
[Member, author] It is defined here because the header is an entity of the CSV level. The stream tokenizer (in UnivocityParser) works on a lower level.
[Member, @gengliangwang, May 17, 2018] No, I mean the one in line 302 is doing nothing.
new CsvParser(parsedOptions.asParserSettings))
}.take(1).headOption match {
case Some(firstRow) =>
@@ -235,6 +318,7 @@ object MultiLineCSVDataSource extends CSVDataSource {
lines.getConfiguration,
new Path(lines.getPath())),
parsedOptions.headerFlag,
checkHeader,
new CsvParser(parsedOptions.asParserSettings))
}
val sampled = CSVUtils.sample(tokenRDD, parsedOptions)
CSVFileFormat.scala
@@ -122,14 +122,16 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
"df.filter($\"_corrupt_record\".isNotNull).count()."
)
}
val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis

(file: PartitionedFile) => {
val conf = broadcastedHadoopConf.value.value
val parser = new UnivocityParser(
StructType(dataSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord)),
StructType(requiredSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord)),
parsedOptions)
CSVDataSource(parsedOptions).readFile(conf, file, parser, requiredSchema)
CSVDataSource(parsedOptions).readFile(conf, file, parser, requiredSchema, dataSchema,
caseSensitive)
[Member] Nit: the same here.
}
}

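Since readFile now receives the session's caseSensitiveAnalysis flag, the header check follows spark.sql.caseSensitive. A hedged PySpark sketch (file path, contents, and column names are assumptions):

from pyspark.sql.types import StructType, StructField, StringType

with open("/tmp/mixed_case.csv", "w") as f:
    f.write("Temperature,Depth\n10,300\n")

schema = StructType([StructField("temperature", StringType()),
                     StructField("depth", StringType())])

# Default, case-insensitive analysis: "Temperature" is accepted for "temperature".
spark.conf.set("spark.sql.caseSensitive", "false")
spark.read.schema(schema).option("header", True) \
    .csv("/tmp/mixed_case.csv", enforceSchema=False).show()

# Case-sensitive analysis: the same header is expected to fail the check.
spark.conf.set("spark.sql.caseSensitive", "true")
spark.read.schema(schema).option("header", True) \
    .csv("/tmp/mixed_case.csv", enforceSchema=False).show()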
CSVOptions.scala
@@ -153,6 +153,12 @@ class CSVOptions(
val samplingRatio =
parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)

/**
* Forcibly apply the specified or inferred schema to datasource files.
* If the option is enabled, headers of CSV files will be ignored.
*/
val enforceSchema = getBool("enforceSchema", true)
[Member] migration guide for behaviour change
[Member, author] It doesn't change existing behavior. By default, the user's or inferred schema is forcibly applied, and CSV headers are ignored completely.
[Member] Sorry, I missed:

checkHeaderFlag = !parser.options.enforceSchema, caseSensitive)

Can't we simply name the option checkHeader?
[Member, author] Initially, it was checkHeader (see 9b1a986#diff-eb708fbebdf6d20d1ab1b109f5f2cd56R153) but @rxin proposed other names, enforceSchema or enforceHeaders, and I renamed it to enforceSchema.
[Member] enforceSchema is from Reynold. I am also fine with the name. This name can be applied to the other data sources later.
[Member] Enforcing a schema sounds like it covers the names and also the types, whereas the current PR only deals with the names.
[Member, author] As @gatorsmile wrote, the name was selected in the perspective that a similar option can be applied to other datasources that contain type info explicitly. CSV doesn't have types. How would you propose to check types?
[Member] I didn't know 'schema' implies the source's schema. Yup, if there's a plan for it, I'm okay. We can talk about this later too for the naming.
[Member] For booleans, we need to name boolean parameters:

getBool("enforceSchema", default = true)

def asWriterSettings: CsvWriterSettings = {
val writerSettings = new CsvWriterSettings()
val format = writerSettings.getFormat
CSVUtils.scala
@@ -68,24 +68,28 @@ object CSVUtils {
}
}

/**
* Drop header line so that only data can remain.
* This is similar with `filterHeaderLine` above and currently being used in CSV reading path.
*/
def dropHeaderLine(iter: Iterator[String], options: CSVOptions): Iterator[String] = {
val nonEmptyLines = if (options.isCommentSet) {
def skipComments(iter: Iterator[String], options: CSVOptions): Iterator[String] = {
if (options.isCommentSet) {
val commentPrefix = options.comment.toString
iter.dropWhile { line =>
line.trim.isEmpty || line.trim.startsWith(commentPrefix)
}
} else {
iter.dropWhile(_.trim.isEmpty)
}

if (nonEmptyLines.hasNext) nonEmptyLines.drop(1)
iter
}

/**
* Extracts header and moves iterator forward so that only data remains in it
*/
def extractHeader(iter: Iterator[String], options: CSVOptions): Option[String] = {
val nonEmptyLines = skipComments(iter, options)
if (nonEmptyLines.hasNext) {
Some(nonEmptyLines.next())
} else {
None
}
}
/**
* Helper method that converts string representation of a character to actual character.
* It handles some Java escaped strings and throws exception if given string is longer than one
Expand Down
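A hedged sketch of the skipComments/extractHeader behaviour above: blank lines and comment lines before the header are skipped, so the header of a file that starts with comments is still extracted and, with enforceSchema=False, validated. Path, comment character, and column names are assumptions:

from pyspark.sql.types import StructType, StructField, StringType

with open("/tmp/commented.csv", "w") as f:
    f.write("# sensor dump\n\n# units: C, m\ntemperature,depth\n10,300\n")

schema = StructType([StructField("temperature", StringType()),
                     StructField("depth", StringType())])

# The two comment lines and the blank line are skipped before the header
# "temperature,depth" is taken and checked against the schema.
spark.read.schema(schema).option("header", True).option("comment", "#") \
    .csv("/tmp/commented.csv", enforceSchema=False).show()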