5 changes: 2 additions & 3 deletions R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -1354,9 +1354,8 @@ test_that("column functions", {
# passing option
df <- as.DataFrame(list(list("col" = "{\"date\":\"21/10/2014\"}")))
schema2 <- structType(structField("date", "date"))
expect_error(tryCatch(collect(select(df, from_json(df$col, schema2))),
error = function(e) { stop(e) }),
paste0(".*(java.lang.NumberFormatException: For input string:).*"))
s <- collect(select(df, from_json(df$col, schema2)))
expect_equal(s[[1]][[1]], NA)
Member:
This also sounds like a behavior change. Could you add another test case here to trigger the exception?

Member:
or a bug fix?

Contributor Author:
Yeah, it's a minor bug fix, see cloud-fan#4.

I'm not sure if it's worth a ticket.

Member:
uh, I see.

s <- collect(select(df, from_json(df$col, schema2, dateFormat = "dd/MM/yyyy")))
expect_is(s[[1]][[1]]$date, "Date")
expect_equal(as.character(s[[1]][[1]]$date), "2014-10-21")
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.json._
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData, ParseModes}
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, BadRecordException, GenericArrayData, ParseModes}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.Utils
@@ -555,7 +555,7 @@ case class JsonToStruct(
CreateJacksonParser.utf8String,
identity[UTF8String]))
} catch {
case _: SparkSQLJsonProcessingException => null
case _: BadRecordException => null
}
}
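For context, the user-visible effect of this hunk is that a record the Jackson parser cannot convert now surfaces as a null struct from from_json rather than an exception, which is what the updated R test above asserts. A minimal Scala illustration (a sketch assuming a SparkSession named spark and Spark's default dateFormat):

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{DateType, StructType}
import spark.implicits._

val schema = new StructType().add("date", DateType)
val df = Seq("""{"date":"21/10/2014"}""").toDF("col")

// The default dateFormat cannot parse "21/10/2014", so the parsed struct is null
// instead of the query failing with a NumberFormatException.
df.select(from_json($"col", schema)).show()

// Supplying a matching format parses the value, as in the R test above.
df.select(from_json($"col", schema, Map("dateFormat" -> "dd/MM/yyyy"))).show()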

@@ -65,7 +65,7 @@ private[sql] class JSONOptions(
val allowBackslashEscapingAnyCharacter =
parameters.get("allowBackslashEscapingAnyCharacter").map(_.toBoolean).getOrElse(false)
val compressionCodec = parameters.get("compression").map(CompressionCodecs.getCodecClassName)
private val parseMode = parameters.getOrElse("mode", "PERMISSIVE")
val parseMode = parameters.getOrElse("mode", "PERMISSIVE")
Member:
How about creating an enum, like what we are doing for SaveMode?

Contributor Author:
Yeah, this could be a good follow-up.
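For reference, a rough sketch of what such an enum-style follow-up could look like, modelled on the SaveMode suggestion above; the names and the fallback behavior here are illustrative assumptions, not the committed follow-up:

sealed trait ParseMode { def name: String }
case object PermissiveMode extends ParseMode { val name = "PERMISSIVE" }
case object DropMalformedMode extends ParseMode { val name = "DROPMALFORMED" }
case object FailFastMode extends ParseMode { val name = "FAILFAST" }

object ParseMode {
  // Maps the user-facing option string (case-insensitively) to a mode,
  // falling back to PERMISSIVE, the current default.
  def fromString(mode: String): ParseMode = mode.toUpperCase match {
    case "PERMISSIVE" => PermissiveMode
    case "DROPMALFORMED" => DropMalformedMode
    case "FAILFAST" => FailFastMode
    case _ => PermissiveMode
  }
}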

val columnNameOfCorruptRecord =
parameters.getOrElse("columnNameOfCorruptRecord", defaultColumnNameOfCorruptRecord)

@@ -32,17 +32,14 @@ import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.Utils

private[sql] class SparkSQLJsonProcessingException(msg: String) extends RuntimeException(msg)

/**
* Constructs a parser for a given schema that translates a json string to an [[InternalRow]].
*/
class JacksonParser(
schema: StructType,
options: JSONOptions) extends Logging {
val options: JSONOptions) extends Logging {

import JacksonUtils._
import ParseModes._
import com.fasterxml.jackson.core.JsonToken._

// A `ValueConverter` is responsible for converting a value from `JsonParser`
@@ -55,108 +52,6 @@ class JacksonParser(
private val factory = new JsonFactory()
options.setJacksonOptions(factory)

private val emptyRow: Seq[InternalRow] = Seq(new GenericInternalRow(schema.length))

private val corruptFieldIndex = schema.getFieldIndex(options.columnNameOfCorruptRecord)
corruptFieldIndex.foreach { corrFieldIndex =>
require(schema(corrFieldIndex).dataType == StringType)
require(schema(corrFieldIndex).nullable)
}
Member:
The above check seems to be missing in the new code.

Contributor Author:
This is just a sanity check; actually this check is already done in DataFrameReader.csv/json and JsonFileFormat/CSVFileFormat.
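For readers following along, the sanity check being referred to is roughly of this shape (a sketch; the real call sites live in DataFrameReader and the file formats and may word the error differently):

import org.apache.spark.sql.types.{StringType, StructType}

// If the user-supplied schema contains the corrupt-record column, it must be a
// nullable string column, otherwise the raw text of bad records cannot be stored in it.
def verifyColumnNameOfCorruptRecord(
    schema: StructType,
    columnNameOfCorruptRecord: String): Unit = {
  schema.getFieldIndex(columnNameOfCorruptRecord).foreach { idx =>
    val f = schema(idx)
    require(f.dataType == StringType && f.nullable,
      s"The field for corrupt records must be a nullable string column: $columnNameOfCorruptRecord")
  }
}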


@transient
private[this] var isWarningPrinted: Boolean = false

@transient
private def printWarningForMalformedRecord(record: () => UTF8String): Unit = {
def sampleRecord: String = {
if (options.wholeFile) {
""
} else {
s"Sample record: ${record()}\n"
}
}

def footer: String = {
s"""Code example to print all malformed records (scala):
|===================================================
|// The corrupted record exists in column ${options.columnNameOfCorruptRecord}.
|val parsedJson = spark.read.json("/path/to/json/file/test.json")
|
""".stripMargin
}

if (options.permissive) {
logWarning(
s"""Found at least one malformed record. The JSON reader will replace
|all malformed records with placeholder null in current $PERMISSIVE_MODE parser mode.
|To find out which corrupted records have been replaced with null, please use the
|default inferred schema instead of providing a custom schema.
|
|${sampleRecord ++ footer}
|
""".stripMargin)
} else if (options.dropMalformed) {
logWarning(
s"""Found at least one malformed record. The JSON reader will drop
|all malformed records in current $DROP_MALFORMED_MODE parser mode. To find out which
|corrupted records have been dropped, please switch the parser mode to $PERMISSIVE_MODE
|mode and use the default inferred schema.
|
|${sampleRecord ++ footer}
|
""".stripMargin)
}
}

@transient
private def printWarningIfWholeFile(): Unit = {
if (options.wholeFile && corruptFieldIndex.isDefined) {
logWarning(
s"""Enabling wholeFile mode and defining columnNameOfCorruptRecord may result
|in very large allocations or OutOfMemoryExceptions being raised.
|
""".stripMargin)
}
}

/**
* This function deals with the cases it fails to parse. This function will be called
* when exceptions are caught during converting. This functions also deals with `mode` option.
*/
private def failedRecord(record: () => UTF8String): Seq[InternalRow] = {
corruptFieldIndex match {
case _ if options.failFast =>
if (options.wholeFile) {
throw new SparkSQLJsonProcessingException("Malformed line in FAILFAST mode")
} else {
throw new SparkSQLJsonProcessingException(s"Malformed line in FAILFAST mode: ${record()}")
}

case _ if options.dropMalformed =>
if (!isWarningPrinted) {
printWarningForMalformedRecord(record)
isWarningPrinted = true
}
Nil

case None =>
if (!isWarningPrinted) {
printWarningForMalformedRecord(record)
isWarningPrinted = true
}
emptyRow

case Some(corruptIndex) =>
if (!isWarningPrinted) {
printWarningIfWholeFile()
isWarningPrinted = true
}
val row = new GenericInternalRow(schema.length)
row.update(corruptIndex, record())
Seq(row)
}
}

/**
* Create a converter which converts the JSON documents held by the `JsonParser`
* to a value according to a desired schema. This is a wrapper for the method
@@ -239,7 +134,7 @@ class JacksonParser(
lowerCaseValue.equals("-inf")) {
value.toFloat
} else {
throw new SparkSQLJsonProcessingException(s"Cannot parse $value as FloatType.")
throw new RuntimeException(s"Cannot parse $value as FloatType.")
}
}

@@ -259,7 +154,7 @@
lowerCaseValue.equals("-inf")) {
value.toDouble
} else {
throw new SparkSQLJsonProcessingException(s"Cannot parse $value as DoubleType.")
throw new RuntimeException(s"Cannot parse $value as DoubleType.")
}
}

@@ -391,9 +286,8 @@ class JacksonParser(

case token =>
// We cannot parse this token based on the given data type. So, we throw a
// SparkSQLJsonProcessingException and this exception will be caught by
// `parse` method.
throw new SparkSQLJsonProcessingException(
// RuntimeException and this exception will be caught by `parse` method.
throw new RuntimeException(
s"Failed to parse a value for data type $dataType (current token: $token).")
}

@@ -466,14 +360,14 @@
parser.nextToken() match {
case null => Nil
case _ => rootConverter.apply(parser) match {
case null => throw new SparkSQLJsonProcessingException("Root converter returned null")
case null => throw new RuntimeException("Root converter returned null")
case rows => rows
}
}
}
} catch {
case _: JsonProcessingException | _: SparkSQLJsonProcessingException =>
failedRecord(() => recordLiteral(record))
case e @ (_: RuntimeException | _: JsonProcessingException) =>
throw BadRecordException(() => recordLiteral(record), () => None, e)
}
}
}
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.util

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.types.StructType
import org.apache.spark.unsafe.types.UTF8String

class FailureSafeParser[IN](
func: IN => Seq[InternalRow],
Member:
rename it? rawParser?

mode: String,
schema: StructType,
columnNameOfCorruptRecord: String) {

private val corruptFieldIndex = schema.getFieldIndex(columnNameOfCorruptRecord)
private val actualSchema = StructType(schema.filterNot(_.name == columnNameOfCorruptRecord))
private val resultRow = new GenericInternalRow(schema.length)

private val toResultRow: (Option[InternalRow], () => UTF8String) => InternalRow = {
Member (@HyukjinKwon, Mar 17, 2017):
Maybe we should explain what happens within toResultRow with some comments. This might also be okay to include in the follow-up.

if (corruptFieldIndex.isDefined) {
(row, badRecord) => {
for ((f, i) <- actualSchema.zipWithIndex) {
resultRow(schema.fieldIndex(f.name)) = row.map(_.get(i, f.dataType)).orNull
}
resultRow(corruptFieldIndex.get) = badRecord()
resultRow
}
} else {
(row, badRecord) => row.getOrElse {
for (i <- schema.indices) resultRow.setNullAt(i)
Member:
Up to my understanding, the last commit focuses on simplification. I like that, but I think we should use while here instead, unless we are very sure the bytecode is virtually the same or more efficient in the critical path. This change would not harm the simplification much here.

Member:
I ran some tests with the code below to help.

object ForWhile {
  def forloop = {
    val l = Array[Int](1,2,3)
    for (i <- l) {
    }
  }

  def whileloop = {
    val arr = Array[Int](1,2,3)
    var idx = 0
    while(idx < arr.length) {
      idx += 1
    }
  }
}
Compiled from "ForWhile.scala"
public final class ForWhile {
  public static void whileloop();
    Code:
       0: getstatic     #16                 // Field ForWhile$.MODULE$:LForWhile$;
       3: invokevirtual #18                 // Method ForWhile$.whileloop:()V
       6: return

  public static void forloop();
    Code:
       0: getstatic     #16                 // Field ForWhile$.MODULE$:LForWhile$;
       3: invokevirtual #21                 // Method ForWhile$.forloop:()V
       6: return
}

Compiled from "ForWhile.scala"
public final class ForWhile$ {
  public static final ForWhile$ MODULE$;

  public static {};
    Code:
       0: new           #2                  // class ForWhile$
       3: invokespecial #12                 // Method "<init>":()V
       6: return

  public void forloop();
    Code:
       0: getstatic     #18                 // Field scala/Array$.MODULE$:Lscala/Array$;
       3: getstatic     #23                 // Field scala/Predef$.MODULE$:Lscala/Predef$;
       6: iconst_3
       7: newarray       int
       9: dup
      10: iconst_0
      11: iconst_1
      12: iastore
      13: dup
      14: iconst_1
      15: iconst_2
      16: iastore
      17: dup
      18: iconst_2
      19: iconst_3
      20: iastore
      21: invokevirtual #27                 // Method scala/Predef$.wrapIntArray:([I)Lscala/collection/mutable/WrappedArray;
      24: getstatic     #32                 // Field scala/reflect/ClassTag$.MODULE$:Lscala/reflect/ClassTag$;
      27: invokevirtual #36                 // Method scala/reflect/ClassTag$.Int:()Lscala/reflect/ClassTag;
      30: invokevirtual #40                 // Method scala/Array$.apply:(Lscala/collection/Seq;Lscala/reflect/ClassTag;)Ljava/lang/Object;
      33: checkcast     #42                 // class "[I"
      36: astore_1
      37: getstatic     #23                 // Field scala/Predef$.MODULE$:Lscala/Predef$;
      40: aload_1
      41: invokevirtual #46                 // Method scala/Predef$.intArrayOps:([I)Lscala/collection/mutable/ArrayOps;
      44: new           #48                 // class ForWhile$$anonfun$forloop$1
      47: dup
      48: invokespecial #49                 // Method ForWhile$$anonfun$forloop$1."<init>":()V
      51: invokeinterface #55,  2           // InterfaceMethod scala/collection/mutable/ArrayOps.foreach:(Lscala/Function1;)V
      56: return

  public void whileloop();
    Code:
       0: getstatic     #18                 // Field scala/Array$.MODULE$:Lscala/Array$;
       3: getstatic     #23                 // Field scala/Predef$.MODULE$:Lscala/Predef$;
       6: iconst_3
       7: newarray       int
       9: dup
      10: iconst_0
      11: iconst_1
      12: iastore
      13: dup
      14: iconst_1
      15: iconst_2
      16: iastore
      17: dup
      18: iconst_2
      19: iconst_3
      20: iastore
      21: invokevirtual #27                 // Method scala/Predef$.wrapIntArray:([I)Lscala/collection/mutable/WrappedArray;
      24: getstatic     #32                 // Field scala/reflect/ClassTag$.MODULE$:Lscala/reflect/ClassTag$;
      27: invokevirtual #36                 // Method scala/reflect/ClassTag$.Int:()Lscala/reflect/ClassTag;
      30: invokevirtual #40                 // Method scala/Array$.apply:(Lscala/collection/Seq;Lscala/reflect/ClassTag;)Ljava/lang/Object;
      33: checkcast     #42                 // class "[I"
      36: astore_1
      37: iconst_0
      38: istore_2
      39: iload_2
      40: aload_1
      41: arraylength
      42: if_icmpge     52
      45: iload_2
      46: iconst_1
      47: iadd
      48: istore_2
      49: goto          39
      52: return
}

Compiled from "ForWhile.scala"
public final class ForWhile$$anonfun$forloop$1 extends scala.runtime.AbstractFunction1$mcVI$sp implements scala.Serializable {
  public static final long serialVersionUID;

  public final void apply(int);
    Code:
       0: aload_0
       1: iload_1
       2: invokevirtual #21                 // Method apply$mcVI$sp:(I)V
       5: return

  public void apply$mcVI$sp(int);
    Code:
       0: return

  public final java.lang.Object apply(java.lang.Object);
    Code:
       0: aload_0
       1: aload_1
       2: invokestatic  #32                 // Method scala/runtime/BoxesRunTime.unboxToInt:(Ljava/lang/Object;)I
       5: invokevirtual #34                 // Method apply:(I)V
       8: getstatic     #40                 // Field scala/runtime/BoxedUnit.UNIT:Lscala/runtime/BoxedUnit;
      11: areturn

  public ForWhile$$anonfun$forloop$1();
    Code:
       0: aload_0
       1: invokespecial #45                 // Method scala/runtime/AbstractFunction1$mcVI$sp."<init>":()V
       4: return
}

resultRow
}
}
}
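Tying the two threads above together, here is a commented variant of toResultRow that also uses while loops; this is a sketch of the intent, not necessarily the code of any follow-up commit:

// Converts the raw parser's output (or lack of it) into a row of the full result schema.
//  - If the schema contains the corrupt-record column, copy each successfully parsed field
//    into its position in the result schema and store the raw record text (null for good
//    records) in the corrupt-record column.
//  - Otherwise, pass good rows through unchanged and emit an all-null row for bad records.
private val toResultRow: (Option[InternalRow], () => UTF8String) => InternalRow = {
  if (corruptFieldIndex.isDefined) {
    (row, badRecord) => {
      var i = 0
      while (i < actualSchema.length) {
        val f = actualSchema(i)
        resultRow(schema.fieldIndex(f.name)) = row.map(_.get(i, f.dataType)).orNull
        i += 1
      }
      resultRow(corruptFieldIndex.get) = badRecord()
      resultRow
    }
  } else {
    (row, badRecord) => row.getOrElse {
      var i = 0
      while (i < schema.length) {
        resultRow.setNullAt(i)
        i += 1
      }
      resultRow
    }
  }
}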

def parse(input: IN): Iterator[InternalRow] = {
try {
func(input).toIterator.map(row => toResultRow(Some(row), () => null))
} catch {
case e: BadRecordException if ParseModes.isPermissiveMode(mode) =>
Iterator(toResultRow(e.partialResult(), e.record))
case _: BadRecordException if ParseModes.isDropMalformedMode(mode) =>
Iterator.empty
// If the parse mode is FAIL FAST, do not catch the exception.
}
}
}

case class BadRecordException(
record: () => UTF8String,
partialResult: () => Option[InternalRow],
Member:
I think we could leave some comments for partialResult too.

cause: Throwable) extends Exception(cause)
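Along the lines of the comment above, a possible doc comment for the two callbacks; a sketch of the intent rather than committed wording (the laziness rationale is my reading of why both parameters are thunks):

/**
 * Thrown when the underlying parser cannot parse an input record.
 *
 * @param record returns the raw text of the record that failed to parse; wrapped in a
 *               function so the (possibly large) text is only materialized when needed.
 * @param partialResult returns whatever fields could still be recovered from the record,
 *                      or None if nothing useful was parsed before the failure.
 * @param cause the underlying exception raised by the parser.
 */
case class BadRecordException(
    record: () => UTF8String,
    partialResult: () => Option[InternalRow],
    cause: Throwable) extends Exception(cause)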
21 changes: 17 additions & 4 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -27,6 +27,7 @@ import org.apache.spark.Partition
import org.apache.spark.annotation.InterfaceStability
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser, JSONOptions}
import org.apache.spark.sql.catalyst.util.FailureSafeParser
import org.apache.spark.sql.execution.LogicalRDD
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.csv._
@@ -382,11 +383,17 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
}

verifyColumnNameOfCorruptRecord(schema, parsedOptions.columnNameOfCorruptRecord)
val dataSchema = StructType(schema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))
Member:
Nit: dataSchema -> actualSchema? Be consistent with what we did in the other place?


val createParser = CreateJacksonParser.string _
val parsed = jsonDataset.rdd.mapPartitions { iter =>
val parser = new JacksonParser(schema, parsedOptions)
iter.flatMap(parser.parse(_, createParser, UTF8String.fromString))
val rawParser = new JacksonParser(dataSchema, parsedOptions)
val parser = new FailureSafeParser[String](
input => rawParser.parse(input, createParser, UTF8String.fromString),
parsedOptions.parseMode,
schema,
Member:
How about passing dataSchema to FailureSafeParser? It could simplify the code in FailureSafeParser.

Contributor Author:
We need schema anyway; passing dataSchema only saves one line in FailureSafeParser...

parsedOptions.columnNameOfCorruptRecord)
iter.flatMap(parser.parse)
}

Dataset.ofRows(
@@ -435,14 +442,20 @@
}

verifyColumnNameOfCorruptRecord(schema, parsedOptions.columnNameOfCorruptRecord)
val dataSchema = StructType(schema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))
Member:
Nit: dataSchema -> actualSchema? Be consistent with what we did in the other place?


val linesWithoutHeader: RDD[String] = maybeFirstLine.map { firstLine =>
filteredLines.rdd.mapPartitions(CSVUtils.filterHeaderLine(_, firstLine, parsedOptions))
}.getOrElse(filteredLines.rdd)

val parsed = linesWithoutHeader.mapPartitions { iter =>
val parser = new UnivocityParser(schema, parsedOptions)
iter.flatMap(line => parser.parse(line))
val rawParser = new UnivocityParser(dataSchema, parsedOptions)
val parser = new FailureSafeParser[String](
input => Seq(rawParser.parse(input)),
parsedOptions.parseMode,
schema,
parsedOptions.columnNameOfCorruptRecord)
iter.flatMap(parser.parse)
}

Dataset.ofRows(
@@ -49,7 +49,7 @@ abstract class CSVDataSource extends Serializable {
conf: Configuration,
file: PartitionedFile,
parser: UnivocityParser,
parsedOptions: CSVOptions): Iterator[InternalRow]
schema: StructType): Iterator[InternalRow]

/**
* Infers the schema from `inputPaths` files.
@@ -115,17 +115,17 @@ object TextInputCSVDataSource extends CSVDataSource {
conf: Configuration,
file: PartitionedFile,
parser: UnivocityParser,
parsedOptions: CSVOptions): Iterator[InternalRow] = {
schema: StructType): Iterator[InternalRow] = {
val lines = {
val linesReader = new HadoopFileLinesReader(file, conf)
Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => linesReader.close()))
linesReader.map { line =>
new String(line.getBytes, 0, line.getLength, parsedOptions.charset)
new String(line.getBytes, 0, line.getLength, parser.options.charset)
}
}

val shouldDropHeader = parsedOptions.headerFlag && file.start == 0
UnivocityParser.parseIterator(lines, shouldDropHeader, parser)
val shouldDropHeader = parser.options.headerFlag && file.start == 0
UnivocityParser.parseIterator(lines, shouldDropHeader, parser, schema)
}

override def infer(
@@ -192,11 +192,12 @@ object WholeFileCSVDataSource extends CSVDataSource {
conf: Configuration,
file: PartitionedFile,
parser: UnivocityParser,
parsedOptions: CSVOptions): Iterator[InternalRow] = {
schema: StructType): Iterator[InternalRow] = {
UnivocityParser.parseStream(
CodecStreams.createInputStreamWithCloseResource(conf, file.filePath),
parsedOptions.headerFlag,
parser)
parser.options.headerFlag,
parser,
schema)
}

override def infer(
@@ -113,8 +113,11 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {

(file: PartitionedFile) => {
val conf = broadcastedHadoopConf.value.value
val parser = new UnivocityParser(dataSchema, requiredSchema, parsedOptions)
CSVDataSource(parsedOptions).readFile(conf, file, parser, parsedOptions)
val parser = new UnivocityParser(
StructType(dataSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord)),
StructType(requiredSchema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord)),
parsedOptions)
CSVDataSource(parsedOptions).readFile(conf, file, parser, requiredSchema)
Member:
We do not need FailureSafeParser in this case?

Contributor Author:
We do need it; the logic is in readFile.
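To make the reply concrete, the wrapping presumably happens where readFile ultimately walks the lines, i.e. inside UnivocityParser.parseIterator / parseStream, which now receive the full schema. A hypothetical sketch of that wiring, assuming CSVOptions exposes parseMode the way JSONOptions now does and simplifying the header handling; not the literal code in this PR:

def parseIterator(
    lines: Iterator[String],
    shouldDropHeader: Boolean,
    rawParser: UnivocityParser,
    schema: StructType): Iterator[InternalRow] = {
  val options = rawParser.options
  val safeParser = new FailureSafeParser[String](
    input => Seq(rawParser.parse(input)),   // delegate the actual parsing
    options.parseMode,                      // assumed public on CSVOptions, like JSONOptions
    schema,                                 // full schema, including the corrupt-record column
    options.columnNameOfCorruptRecord)
  val content = if (shouldDropHeader) lines.drop(1) else lines  // simplified header skip
  content.flatMap(safeParser.parse)
}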

}
}
