Conversation

@HyukjinKwon (Member) commented Jun 30, 2016

What changes were proposed in this pull request?

This PR refactors the CSV data source to be consistent with the JSON data source.

It removes the class CSVParser and introduces the new classes UnivocityParser and UnivocityGenerator, mirroring the JSON data source (JacksonParser, JacksonGenerator). Also, CSVRelation is merged into CSVFileFormat, just like JsonFileFormat.

Here is a rough overview of the change:

CSVOptions - reading/writing settings that can be created from the user-supplied options; moved out of CSVParser.

UnivocityGenerator - the writing logic from CSVRelation and CSVParser.

UnivocityParser - the parsing logic from CSVTypeCast, CSVParser and CSVRelation.

CSVFileFormat - now holds CSVOutputWriterFactory and CsvOutputWriter, previously in CSVRelation.

This PR also gives the methods in these classes arguments consistent with their JSON counterparts.

  • UnivocityGenerator and JacksonGenerator

    // UnivocityGenerator
    private[csv] class UnivocityGenerator(
        schema: StructType,
        writer: Writer,
        options: CSVOptions = new CSVOptions(Map.empty[String, String])) {
      ...
      def write ...
      def close ...
      def flush ...
    }

    // JacksonGenerator
    private[sql] class JacksonGenerator(
        schema: StructType,
        writer: Writer,
        options: JSONOptions = new JSONOptions(Map.empty[String, String])) {
      ...
      def write ...
      def close ...
      def flush ...
    }
  • UnivocityParser resembles JacksonParser.

    // UnivocityParser
    private[csv] class UnivocityParser(
        schema: StructType,
        requiredSchema: StructType,
        options: CSVOptions) extends Logging {
      def this(schema: StructType, options: CSVOptions) = this(schema, schema, options)
      ...
      def parse(input: String) ...
    }

    // JacksonParser
    class JacksonParser(
        schema: StructType,
        columnNameOfCorruptRecord: String,
        options: JSONOptions) extends Logging {
      ...
      def parse(input: String) ...
    }
  • csv.CSVInferSchema and json.InferSchema

    csv.CSVInferSchema.infer(csv: Dataset[String], caseSensitive: Boolean, options: CSVOptions)

    json.InferSchema.infer(json: RDD[String], columnNameOfCorruptRecord: String, configOptions: JSONOptions)
  • This PR also organizes the classes in a manner consistent with JSON:

    • CsvFileFormat

      CsvFileFormat
      CsvOutputWriter
    • JsonFileFormat

      JsonFileFormat
      JsonOutputWriter

Also, this PR rewrites the existing CSV parsing logic so that a single row object is reused across records, and separates the parse-mode handling from the value-conversion logic (see the sketch below).
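
For illustration, here is a minimal sketch of how the refactored parser is meant to be driven. This is not code from the PR; the schema and input lines are made up, and only the UnivocityParser and CSVOptions signatures above are taken from the description.

    import org.apache.spark.sql.types._

    // Hypothetical driver code for the new parser.
    val schema = StructType(Seq(
      StructField("name", StringType),
      StructField("age", IntegerType)))
    val options = new CSVOptions(Map.empty[String, String])

    val parser = new UnivocityParser(schema, options)

    // parse() yields Some(row) for a well-formed record and, depending on
    // the parse mode, None for a malformed one.
    val rows = Seq("alice,30", "bob,29").flatMap(line => parser.parse(line))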

How was this patch tested?

Existing tests should cover this.

@HyukjinKwon (Member, Author) commented Jun 30, 2016

I still need to fix some nits and double-check consistency with the JSON data source, but I opened this early to see whether it breaks anything. I will push some more commits soon (and will also flesh out the PR description).

@SparkQA commented Jun 30, 2016

Test build #61523 has finished for PR 13988 at commit 211bfb4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon changed the title from [WIP][SPARK-16101][SQL] Refactoring CSV data source to be consistent with JSON data source to [SPARK-16101][SQL][WIP] Refactoring CSV data source to be consistent with JSON data source on Jun 30, 2016
@HyukjinKwon changed the title from [SPARK-16101][SQL][WIP] Refactoring CSV data source to be consistent with JSON data source to [SPARK-16101][SQL] Refactoring CSV data source to be consistent with JSON data source on Jul 1, 2016
@HyukjinKwon (Member, Author) commented Jul 1, 2016

Hi @rxin, the change in this PR is still pretty big. Should I split it into two PRs, one for the reading path and one for the writing path?

@SparkQA commented Jul 1, 2016

Test build #61587 has finished for PR 13988 at commit b1050b5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jul 1, 2016

Test build #61588 has finished for PR 13988 at commit 0d60c57.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon (Member, Author) commented:

(@rxin gentle ping..)

@SparkQA commented Jul 9, 2016

Test build #62011 has finished for PR 13988 at commit f0d1512.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jul 9, 2016

Test build #62014 has finished for PR 13988 at commit 97d7bd4.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon (Member, Author) commented:

retest this please

@HyukjinKwon (Member, Author) commented:

I updated the PR description. I hope this is helpful for reviewing.

@SparkQA commented Jul 9, 2016

Test build #62015 has finished for PR 13988 at commit 97d7bd4.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jul 9, 2016

Test build #62016 has finished for PR 13988 at commit 97d7bd4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor commented:

@HyukjinKwon we ran into an issue where CSV writes ints for DateType instead of date strings (https://issues.apache.org/jira/browse/SPARK-16597).

@HyukjinKwon (Member, Author) commented Jul 18, 2016

Actually, I opened another PR for that: #13912. The issue is probably about that PR.

Contributor commented:

Ah, thanks. I commented on that PR. Glad to see someone showing some love to Spark's CSV data source!

@HyukjinKwon (Member, Author) commented:

@rxin Could you take a look please?

@SparkQA commented Aug 9, 2016

Test build #63425 has finished for PR 13988 at commit a634435.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Aug 10, 2016

Test build #63482 has finished for PR 13988 at commit c143a01.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class CachedData(plan: LogicalPlan, cachedRepresentation: InMemoryRelation)
    • class CacheManager extends Logging
    • trait DataSourceScanExec extends LeafExecNode with CodegenSupport
    • case class RowDataSourceScanExec(
    • case class FileSourceScanExec(
    • case class ExternalRDD[T](
    • case class ExternalRDDScanExec[T](
    • case class LogicalRDD(
    • case class RDDScanExec(
    • trait FileRelation
    • case class LocalTableScanExec(
    • abstract class RowIterator
    • trait LeafExecNode extends SparkPlan
    • trait UnaryExecNode extends SparkPlan
    • trait BinaryExecNode extends SparkPlan
    • case class PlanLater(plan: LogicalPlan) extends LeafExecNode
    • abstract class SparkStrategies extends QueryPlanner[SparkPlan]
    • class UnsafeRowSerializer(
    • case class ScalaUDAF(
    • case class InMemoryRelation(
    • case class InMemoryTableScanExec(
    • trait RunnableCommand extends LogicalPlan with logical.Command
    • case class ExecutedCommandExec(cmd: RunnableCommand) extends SparkPlan
    • case class AlterTableRecoverPartitionsCommand(
    • case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan]
    • class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
    • case class InsertIntoDataSourceCommand(
    • case class InsertIntoHadoopFsRelationCommand(
    • case class PartitionDirectory(values: InternalRow, path: Path)
    • case class PartitionSpec(
    • class CSVOutputWriterFactory(options: CSVOptions) extends OutputWriterFactory
    • class CsvOutputWriter(
    • case class JDBCPartition(whereClause: String, idx: Int) extends Partition
    • class ResolveDataSource(sparkSession: SparkSession) extends Rule[LogicalPlan]
    • case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan]
    • case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog)
    • case class DebugExec(child: SparkPlan) extends UnaryExecNode with CodegenSupport
    • class ExchangeCoordinator(
    • case class MapPartitionsRWrapper(
    • class IncrementalExecution(
    • class ExecutionPage(parent: SQLTab) extends WebUIPage("execution") with Logging
    • class SQLHistoryListenerFactory extends SparkHistoryListenerFactory
    • class SQLListener(conf: SparkConf) extends SparkListener with Logging
    • class SQLHistoryListener(conf: SparkConf, sparkUI: SparkUI)
    • class SQLTab(val listener: SQLListener, sparkUI: SparkUI)
    • case class SparkPlanGraph(

@SparkQA commented Aug 23, 2016

Test build #64254 has finished for PR 13988 at commit 346c4a6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Aug 30, 2016

Test build #64635 has finished for PR 13988 at commit 52ca52b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon (Member, Author) commented:

cc @hvanhovell Would you mind reviewing this? I remember you reviewed the initial proposal. If it seems too big to review, I can split it into the reading path and the writing path.

@HyukjinKwon (Member, Author) commented:

This is also loosely related to https://issues.apache.org/jira/browse/SPARK-15463. Once this PR is merged, we can easily mirror the JSON implementation there rather than introducing yet another refactoring.

@SparkQA commented Sep 21, 2016

Test build #65716 has finished for PR 13988 at commit 015567e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon force-pushed the SPARK-16101 branch 2 times, most recently from a6d85b6 to ac94e67, on September 28, 2016 at 10:25
@SparkQA commented Sep 28, 2016

Test build #66037 has finished for PR 13988 at commit a6d85b6.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Sep 28, 2016

Test build #66035 has finished for PR 13988 at commit df69c8d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Sep 28, 2016

Test build #66038 has finished for PR 13988 at commit ac94e67.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon (Member, Author) commented:

@hvanhovell If this change looks too big, just confirm and I will split it into the reading path and the writing path.

@SparkQA commented Oct 14, 2016

Test build #66939 has finished for PR 13988 at commit 697f276.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Oct 14, 2016

Test build #66936 has finished for PR 13988 at commit 97eec87.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Oct 22, 2016

Test build #67391 has finished for PR 13988 at commit 346b1d2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon (Member, Author) commented:

I will try to split this into two PRs, one for the read path and one for the write path. Does that sound okay to you both, @rxin and @hvanhovell?

schema.foreach(field => verifyType(field.dataType))
}
}

@HyukjinKwon (Member, Author) commented Jan 5, 2017

The lines below just came from CSVRelation.

* 2. Merge row types to find common type
* 3. Replace any null types with string type
*/
def infer(
@HyukjinKwon (Member, Author) commented Jan 5, 2017

This argument change is an important one for introducing functionality similar to JSON's (e.g., creating a DataFrame from an RDD[String] or Dataset[String]); see the sketch below.
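
For illustration, a minimal sketch of what this signature enables. The data is made up, and only the infer signature itself comes from the PR description; the return type is presumed to be a StructType.

    import org.apache.spark.sql.{Dataset, SparkSession}

    // Hypothetical usage of the new CSVInferSchema.infer signature.
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    // CSV lines held in a Dataset[String] instead of files on disk.
    val csv: Dataset[String] = Seq("1,foo", "2,bar").toDS()

    val options = new CSVOptions(Map.empty[String, String])
    // Merges row types to find a common type; null types fall back to string.
    val inferredSchema = CSVInferSchema.infer(csv, caseSensitive = false, options)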

case datum =>
Try(datum.toDouble)
.getOrElse(NumberFormat.getInstance(Locale.US).parse(datum).doubleValue())
private def makeSafeHeader(
@HyukjinKwon (Member, Author) commented:

This just came from CSVFileFormat.


val isCommentSet = this.comment != '\u0000'

def asWriterSettings: CsvWriterSettings = {
@HyukjinKwon (Member, Author) commented:

These just came from CSVParser.
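
For context, a rough sketch of what a writer-settings mapping along these lines can look like with the univocity-parsers API. The option fields mirrored here (delimiter, quote, escape, comment, nullValue) and the class name are assumptions for illustration, not a copy of the PR's code:

    import com.univocity.parsers.csv.CsvWriterSettings

    // Hypothetical options holder mapping Spark CSV options to univocity settings.
    class SketchCsvOptions(
        delimiter: Char = ',',
        quote: Char = '"',
        escape: Char = '\\',
        comment: Char = '\u0000',   // '\u0000' means "comment not set"
        nullValue: String = "") {

      def asWriterSettings: CsvWriterSettings = {
        val writerSettings = new CsvWriterSettings()
        val format = writerSettings.getFormat
        format.setDelimiter(delimiter)
        format.setQuote(quote)
        format.setQuoteEscape(escape)
        format.setComment(comment)
        writerSettings.setNullValue(nullValue)
        writerSettings.setSkipEmptyLines(true)
        writerSettings
      }
    }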

writerSettings.setHeaders(schema.fieldNames: _*)
private val gen = new CsvWriter(writer, writerSettings)

// A `ValueConverter` is responsible for converting a value of an `InternalRow` to `String`.
@HyukjinKwon (Member, Author) commented Jan 5, 2017

The lines below mostly just came from CSVRelation.

private type ValueConverter = String => Any

var numMalformedRecords = 0
val row = new GenericInternalRow(requiredSchema.length)
@HyukjinKwon (Member, Author) commented:

Now we reuse a single row instance across records (see the sketch below).
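
A minimal sketch of the row-reuse pattern, assuming a converter array like the one shown later in this diff; the class name and details are illustrative, not the PR's exact code:

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
    import org.apache.spark.sql.types.StructType

    // Hypothetical parser fragment showing the row-reuse pattern.
    class RowReusingParser(
        requiredSchema: StructType,
        valueConverters: Array[String => Any]) {

      // One mutable row, allocated once and refilled for every record,
      // instead of allocating a fresh row per record.
      private val row = new GenericInternalRow(requiredSchema.length)

      def convert(tokens: Array[String]): InternalRow = {
        var i = 0
        while (i < requiredSchema.length) {
          row(i) = valueConverters(i)(tokens(i))
          i += 1
        }
        // Callers must consume or copy the row before the next convert() call.
        row
      }
    }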

@HyukjinKwon (Member, Author) commented Jan 5, 2017

Also, this separates the numMalformedRecords bookkeeping from the parse(...) call itself, which looked weird before.

* each element represents a column) and turns it into either one resulting row or no row (if
* the record is malformed).
*/
def parse(input: String): Option[InternalRow] = {
@HyukjinKwon (Member, Author) commented Jan 5, 2017

Here, I separate the parse-mode logic (withParseMode) from the actual conversion logic (parse); see the sketch below.
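
A rough sketch of what such a separation can look like. The mode names follow Spark's PERMISSIVE/DROPMALFORMED/FAILFAST options; the class shape and helpers are assumptions, not the PR's exact code:

    import org.apache.spark.sql.catalyst.InternalRow

    // Hypothetical split of parse-mode handling from conversion; `convert`
    // and the null-padded fallback row are supplied by the surrounding parser.
    class ModeAwareParser(
        parseMode: String,
        convert: String => InternalRow,
        nullPaddedRow: InternalRow) {

      def parse(input: String): Option[InternalRow] =
        try {
          Some(convert(input)) // happy path: one resulting row
        } catch {
          case e: RuntimeException => handleMalformed(e)
        }

      // The mode decision lives here, outside the conversion logic itself.
      private def handleMalformed(e: RuntimeException): Option[InternalRow] =
        parseMode match {
          case "PERMISSIVE"    => Some(nullPaddedRow) // keep a row of nulls
          case "DROPMALFORMED" => None                // drop the malformed record
          case _               => throw e             // FAILFAST: surface the error
        }
    }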

@HyukjinKwon (Member, Author) commented Jan 5, 2017

Also, the argument change (matching JacksonParser) is important: it lets us avoid additional refactoring when introducing the same functionality that JacksonParser supports (e.g., loading from an RDD[String] or Dataset[String], or from_json/to_json-style functions).

@HyukjinKwon (Member, Author) commented:

For example, PR #13300 introduced that kind of refactoring.
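
To make the motivation concrete, here is a hypothetical sketch of the kind of entry point this parity enables. This API did not exist at the time of this PR; it is the CSV analogue of spark.read.json(Dataset[String]), in the spirit of SPARK-15463 mentioned above:

    import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
    import org.apache.spark.sql.types._

    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    // CSV records that already live in a Dataset[String].
    val lines: Dataset[String] = Seq("1,alice", "2,bob").toDS()

    val schema = StructType(Seq(
      StructField("id", IntegerType),
      StructField("name", StringType)))

    // With UnivocityParser taking (schema, options) like JacksonParser does,
    // a csv(Dataset[String]) reader becomes a thin wrapper around the parser.
    val df: DataFrame = spark.read.schema(schema).csv(lines)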

@SparkQA commented Jan 5, 2017

Test build #70917 has finished for PR 13988 at commit 72e7dcc.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jan 5, 2017

Test build #70919 has finished for PR 13988 at commit 34fc6ca.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jan 5, 2017

Test build #70920 has finished for PR 13988 at commit 08e5fe7.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jan 5, 2017

Test build #70923 has finished for PR 13988 at commit 4c6666d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon (Member, Author) commented:

Hi @cloud-fan, would you mind checking whether this makes sense?

options: CSVOptions) extends Logging {
def this(schema: StructType, options: CSVOptions) = this(schema, schema, options)

val valueConverters = makeConverters(schema, options)
@HyukjinKwon (Member, Author) commented:

Some of the conversion changes here came from CSVTypeCast; a sketch of the converter shape follows.
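
For orientation, a minimal sketch of type-directed value converters in the style of the `valueConverters` line above. The object name and the set of types covered are assumptions for illustration:

    import org.apache.spark.sql.types._
    import org.apache.spark.unsafe.types.UTF8String

    // Hypothetical sketch: build one String => Any converter per field, chosen
    // once from the field's Catalyst type, so the per-record loop stays simple.
    object ConverterSketch {
      type ValueConverter = String => Any

      def makeConverters(schema: StructType): Array[ValueConverter] =
        schema.fields.map(field => makeConverter(field.dataType))

      private def makeConverter(dataType: DataType): ValueConverter = dataType match {
        case IntegerType => (datum: String) => datum.toInt
        case LongType    => (datum: String) => datum.toLong
        case DoubleType  => (datum: String) => datum.toDouble
        case BooleanType => (datum: String) => datum.toBoolean
        case _           => (datum: String) => UTF8String.fromString(datum)
      }
    }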

@cloud-fan (Contributor) commented:

Can you split this into smaller PRs? It's really painful to review such a big refactor-only PR.

@HyukjinKwon (Member, Author) commented:

Sure! Let me split this into reading and writing PRs. Thank you for your comments. Let me close this for now.

@HyukjinKwon closed this Jan 6, 2017
@HyukjinKwon deleted the SPARK-16101 branch January 2, 2018 03:44