From 34a9a2523cccb49279db317263282204720593b3 Mon Sep 17 00:00:00 2001
From: WeichenXu
Date: Fri, 12 Jul 2019 18:33:27 +0800
Subject: [PATCH 01/13] init pr

---
 .../org/apache/spark/rdd/HadoopRDD.scala      | 22 +++++++++++++++++++
 1 file changed, 22 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 3974580cfaa11..e6b4fbd3cf4fd 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -25,6 +25,7 @@ import scala.collection.immutable.Map
 import scala.reflect.ClassTag
 
 import org.apache.hadoop.conf.{Configurable, Configuration}
+import org.apache.hadoop.io.compress.{CompressionCodecFactory, SplittableCompressionCodec}
 import org.apache.hadoop.mapred._
 import org.apache.hadoop.mapred.lib.CombineFileSplit
 import org.apache.hadoop.mapreduce.TaskType
@@ -196,6 +197,24 @@ class HadoopRDD[K, V](
     newInputFormat
   }
 
+  private val UNSPLITTABLE_FILE_SIZE_LOG_THRESHOLD = 1024 * 1024 * 1024
+
+  @transient private lazy val compressionCodecs = new CompressionCodecFactory(getJobConf())
+
+  private def checkAndLogUnsplittableLargeFile(split: InputSplit): Unit = {
+    if (split.isInstanceOf[FileSplit]) {
+      val fileSplit = split.asInstanceOf[FileSplit]
+      val path = fileSplit.getPath
+      val codec = compressionCodecs.getCodec(path)
+      if (codec != null && !codec.isInstanceOf[SplittableCompressionCodec]) {
+        if (fileSplit.getLength > UNSPLITTABLE_FILE_SIZE_LOG_THRESHOLD) {
+          logWarning(s"File ${path.toString} is large and unsplittable so the corresponding " +
+            s"rdd partition have to deal with the whole file and consume large time.")
+        }
+      }
+    }
+  }
+
   override def getPartitions: Array[Partition] = {
     val jobConf = getJobConf()
     // add the credentials here as this can be called before SparkContext initialized
@@ -207,6 +226,9 @@ class HadoopRDD[K, V](
     } else {
       allInputSplits
     }
+    if (inputSplits.length == 1) {
+      checkAndLogUnsplittableLargeFile(inputSplits(0))
+    }
     val array = new Array[Partition](inputSplits.size)
     for (i <- 0 until inputSplits.size) {
       array(i) = new HadoopPartition(id, i, inputSplits(i))

From b48ced168f68c02414c6e02ef3c04b1d676e367a Mon Sep 17 00:00:00 2001
From: WeichenXu
Date: Wed, 17 Jul 2019 01:18:25 +0800
Subject: [PATCH 02/13] update

---
 .../spark/internal/config/package.scala       |  7 +++++
 .../org/apache/spark/rdd/HadoopRDD.scala      | 26 +++++++------------
 .../scala/org/apache/spark/util/Utils.scala   |  7 +++++
 .../execution/datasources/v2/FileScan.scala   | 16 +++++++++++-
 .../datasources/v2/TextBasedFileScan.scala    | 15 ++++-------
 5 files changed, 43 insertions(+), 28 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 7c332fdb85721..c8173fb78c5e6 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1180,6 +1180,13 @@ package object config {
       .intConf
       .createWithDefault(1)
 
+  private[spark] val IO_FILE_UNSPLITTABLE_WARNING_THRESHOLD =
+    ConfigBuilder("spark.io.file.unsplittable.warning.threshold")
+      .doc("When spark loading one single large unsplittable file, if file size exceed this " +
+        "threshold, then log warning.")
+      .longConf
+      .createWithDefault(1024 * 1024 * 1024)
+
   private[spark] val EVENT_LOG_COMPRESSION_CODEC =
     ConfigBuilder("spark.eventLog.compression.codec")
       .doc("The codec used to compress event log. By default, Spark provides four codecs: " +
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index e6b4fbd3cf4fd..87942b96b1abc 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -41,7 +41,7 @@ import org.apache.spark.internal.config._
 import org.apache.spark.rdd.HadoopRDD.HadoopMapPartitionsWithSplitRDD
 import org.apache.spark.scheduler.{HDFSCacheTaskLocation, HostTaskLocation}
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.{NextIterator, SerializableConfiguration, ShutdownHookManager}
+import org.apache.spark.util.{NextIterator, SerializableConfiguration, ShutdownHookManager, Utils}
 
 /**
  * A Spark split class that wraps around a Hadoop InputSplit.
  */
@@ -199,21 +199,7 @@ class HadoopRDD[K, V](
   private val UNSPLITTABLE_FILE_SIZE_LOG_THRESHOLD = 1024 * 1024 * 1024
 
-  @transient private lazy val compressionCodecs = new CompressionCodecFactory(getJobConf())
-
-  private def checkAndLogUnsplittableLargeFile(split: InputSplit): Unit = {
-    if (split.isInstanceOf[FileSplit]) {
-      val fileSplit = split.asInstanceOf[FileSplit]
-      val path = fileSplit.getPath
-      val codec = compressionCodecs.getCodec(path)
-      if (codec != null && !codec.isInstanceOf[SplittableCompressionCodec]) {
-        if (fileSplit.getLength > UNSPLITTABLE_FILE_SIZE_LOG_THRESHOLD) {
-          logWarning(s"File ${path.toString} is large and unsplittable so the corresponding " +
-            s"rdd partition have to deal with the whole file and consume large time.")
-        }
-      }
-    }
-  }
+  @transient private lazy val codecFactory = new CompressionCodecFactory(getJobConf())
 
   override def getPartitions: Array[Partition] = {
     val jobConf = getJobConf()
     // add the credentials here as this can be called before SparkContext initialized
@@ -227,7 +213,13 @@ class HadoopRDD[K, V](
     } else {
       allInputSplits
     }
     if (inputSplits.length == 1) {
-      checkAndLogUnsplittableLargeFile(inputSplits(0))
+      val fileSplit = inputSplits(0).asInstanceOf[FileSplit]
+      val path = fileSplit.getPath
+      if (Utils.isFileSplittable(path, codecFactory)
+        && fileSplit.getLength > conf.get(IO_FILE_UNSPLITTABLE_WARNING_THRESHOLD)) {
+        logWarning(s"File ${path.toString} is large and unsplittable so the corresponding " +
+          s"rdd partition have to deal with the whole file and consume large time.")
+      }
     }
     val array = new Array[Partition](inputSplits.size)
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 80d70a1d48504..faacb14419fc9 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -51,6 +51,7 @@ import com.google.common.net.InetAddresses
 import org.apache.commons.lang3.SystemUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
+import org.apache.hadoop.io.compress.{CompressionCodecFactory, SplittableCompressionCodec}
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.eclipse.jetty.util.MultiException
@@ -2895,6 +2896,12 @@ private[spark] object Utils extends Logging {
   def isLocalUri(uri: String): Boolean = {
     uri.startsWith(s"$LOCAL_SCHEME:")
   }
+
+  /** Check whether the file of the path is splittable. */
+  def isFileSplittable(path: Path, codecFactory: CompressionCodecFactory): Boolean = {
+    val codec = codecFactory.getCodec(path)
+    codec == null || codec.isInstanceOf[SplittableCompressionCodec]
+  }
 }
 
 private[util] object CallerContext extends Logging {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
index b2f3c4d256448..55a32146c75b4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
@@ -21,6 +21,8 @@ import java.util.{Locale, OptionalLong}
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.fs.Path
 
+import org.apache.spark.internal.config.IO_FILE_UNSPLITTABLE_WARNING_THRESHOLD
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
@@ -36,7 +38,9 @@ abstract class FileScan(
     sparkSession: SparkSession,
     fileIndex: PartitioningAwareFileIndex,
     readDataSchema: StructType,
-    readPartitionSchema: StructType) extends Scan with Batch with SupportsReportStatistics {
+    readPartitionSchema: StructType)
+  extends Scan
+  with Batch with SupportsReportStatistics with Logging {
   /**
    * Returns whether a file with `path` could be split or not.
    */
@@ -91,6 +95,16 @@ abstract class FileScan(
         )
       }.toArray.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
     }
+
+    if (splitFiles.length == 1) {
+      val path = new Path(splitFiles(0).filePath)
+      if (isSplitable(path) && splitFiles(0).length >
+        sparkSession.sparkContext.getConf.get(IO_FILE_UNSPLITTABLE_WARNING_THRESHOLD)) {
+        logWarning(s"File ${path.toString} is large and unsplittable so the corresponding " +
+          s"rdd partition have to deal with the whole file and consume large time.")
+      }
+    }
+
     FilePartition.getFilePartitions(sparkSession, splitFiles, maxSplitBytes)
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TextBasedFileScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TextBasedFileScan.scala
index d6b84dcdfd15d..e85af891e3e11 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TextBasedFileScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TextBasedFileScan.scala
@@ -19,12 +19,13 @@ package org.apache.spark.sql.execution.datasources.v2
 import scala.collection.JavaConverters._
 
 import org.apache.hadoop.fs.Path
-import org.apache.hadoop.io.compress.{CompressionCodecFactory, SplittableCompressionCodec}
+import org.apache.hadoop.io.compress.CompressionCodecFactory
 
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.util.Utils
 
 abstract class TextBasedFileScan(
     sparkSession: SparkSession,
@@ -33,14 +34,8 @@ abstract class TextBasedFileScan(
     readPartitionSchema: StructType,
     options: CaseInsensitiveStringMap)
   extends FileScan(sparkSession, fileIndex, readDataSchema, readPartitionSchema) {
-  private var codecFactory: CompressionCodecFactory = _
+  @transient private lazy val codecFactory: CompressionCodecFactory = new CompressionCodecFactory(
+    sparkSession.sessionState.newHadoopConfWithOptions(options.asScala.toMap))
 
-  override def isSplitable(path: Path): Boolean = {
-    if (codecFactory == null) {
-      codecFactory = new CompressionCodecFactory(
-        sparkSession.sessionState.newHadoopConfWithOptions(options.asScala.toMap))
-    }
-    val codec = codecFactory.getCodec(path)
-    codec == null || codec.isInstanceOf[SplittableCompressionCodec]
-  }
+  override def isSplitable(path: Path): Boolean = Utils.isFileSplittable(path, codecFactory)
 }

From fafba7be512918b7730db7bdd86238d01b369eef Mon Sep 17 00:00:00 2001
From: WeichenXu
Date: Wed, 17 Jul 2019 01:24:04 +0800
Subject: [PATCH 03/13] fix

---
 .../apache/spark/sql/execution/datasources/v2/FileScan.scala | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
index 55a32146c75b4..d2e6731eb533a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
@@ -21,17 +21,15 @@ import java.util.{Locale, OptionalLong}
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.fs.Path
 
-import org.apache.spark.internal.config.IO_FILE_UNSPLITTABLE_WARNING_THRESHOLD
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.IO_FILE_UNSPLITTABLE_WARNING_THRESHOLD
 import org.apache.spark.sql.{AnalysisException, SparkSession}
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.execution.PartitionedFileUtil
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.sources.v2.reader._
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.util.Utils

From 4ee25d6a4610e99b4bc5f3d2b57408abe6af569c Mon Sep 17 00:00:00 2001
From: WeichenXu
Date: Wed, 17 Jul 2019 01:26:40 +0800
Subject: [PATCH 04/13] remove useless code

---
 core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala | 2 --
 1 file changed, 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 87942b96b1abc..7e1df97a9f9fd 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -197,8 +197,6 @@ class HadoopRDD[K, V](
     newInputFormat
   }
 
-  private val UNSPLITTABLE_FILE_SIZE_LOG_THRESHOLD = 1024 * 1024 * 1024
-
   @transient private lazy val codecFactory = new CompressionCodecFactory(getJobConf())
 
   override def getPartitions: Array[Partition] = {

From 736587beada1d3796f376d41bfec625b3d3837af Mon Sep 17 00:00:00 2001
From: WeichenXu
Date: Wed, 17 Jul 2019 09:37:47 +0800
Subject: [PATCH 05/13] refine logging message with detail unsplittable reason

---
 .../scala/org/apache/spark/rdd/HadoopRDD.scala     |  6 +++---
 .../sql/execution/datasources/v2/FileScan.scala    |  8 ++++++--
 .../datasources/v2/TextBasedFileScan.scala         |  8 ++++++++
 .../sql/execution/datasources/v2/csv/CSVScan.scala | 14 +++++++++++++-
 .../execution/datasources/v2/json/JsonScan.scala   | 12 ++++++++++++
 .../execution/datasources/v2/text/TextScan.scala   | 12 ++++++++++++
 6 files changed, 54 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 7e1df97a9f9fd..c763eabe8abba 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -210,13 +210,13 @@ class HadoopRDD[K, V](
     } else {
       allInputSplits
     }
-    if (inputSplits.length == 1) {
+    if (inputSplits.length == 1 && inputSplits(0).isInstanceOf[FileSplit]) {
       val fileSplit = inputSplits(0).asInstanceOf[FileSplit]
       val path = fileSplit.getPath
       if (Utils.isFileSplittable(path, codecFactory)
         && fileSplit.getLength > conf.get(IO_FILE_UNSPLITTABLE_WARNING_THRESHOLD)) {
-        logWarning(s"File ${path.toString} is large and unsplittable so the corresponding " +
-          s"rdd partition have to deal with the whole file and consume large time.")
+        logWarning(s"Loading one large unsplittable File ${path.toString} with only one " +
+          s"partition, because the file is compressed by unsplittable compression codec.")
       }
     }
     val array = new Array[Partition](inputSplits.size)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
index d2e6731eb533a..c8a9a7fd23c2c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
@@ -46,6 +46,10 @@ abstract class FileScan(
     false
   }
 
+  def getFileUnSplittableReason(path: Path): String = {
+    "Unknown"
+  }
+
   override def description(): String = {
     val locationDesc =
       fileIndex.getClass.getSimpleName + fileIndex.rootPaths.mkString("[", ", ", "]")
@@ -98,8 +102,8 @@ abstract class FileScan(
       val path = new Path(splitFiles(0).filePath)
       if (isSplitable(path) && splitFiles(0).length >
         sparkSession.sparkContext.getConf.get(IO_FILE_UNSPLITTABLE_WARNING_THRESHOLD)) {
-        logWarning(s"File ${path.toString} is large and unsplittable so the corresponding " +
-          s"rdd partition have to deal with the whole file and consume large time.")
+        logWarning(s"Loading one large unsplittable File ${path.toString} with only one " +
+          s"partition, the reason is: ${getFileUnSplittableReason(path)}")
       }
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TextBasedFileScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TextBasedFileScan.scala
index e85af891e3e11..6271be9de1fda 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TextBasedFileScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TextBasedFileScan.scala
@@ -38,4 +38,12 @@ abstract class TextBasedFileScan(
     sparkSession.sessionState.newHadoopConfWithOptions(options.asScala.toMap))
 
   override def isSplitable(path: Path): Boolean = Utils.isFileSplittable(path, codecFactory)
+
+  override def getFileUnSplittableReason(path: Path): String = {
+    if (!isSplitable(path)) {
+      "the file is compressed by unsplittable compression codec"
+    } else {
+      null
+    }
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScan.scala
index 5bc8029b4068a..71cb45842446b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScan.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.catalyst.csv.CSVOptions
 import org.apache.spark.sql.catalyst.expressions.ExprUtils
 import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
-import org.apache.spark.sql.execution.datasources.csv.CSVDataSource
+import org.apache.spark.sql.execution.datasources.csv.{CSVDataSource, MultiLineCSVDataSource}
 import org.apache.spark.sql.execution.datasources.v2.TextBasedFileScan
 import org.apache.spark.sql.sources.v2.reader.PartitionReaderFactory
 import org.apache.spark.sql.types.{DataType, StructType}
@@ -50,6 +50,18 @@ case class CSVScan(
     CSVDataSource(parsedOptions).isSplitable && super.isSplitable(path)
   }
 
+  override def getFileUnSplittableReason(path: Path): String = {
+    if (!super.isSplitable(path)) {
+      super.getFileUnSplittableReason(path)
+    } else {
+      if (!CSVDataSource(parsedOptions).isSplitable) {
+        "the csv datasource is set multiLine mode"
+      } else {
+        ""
+      }
+    }
+  }
+
   override def createReaderFactory(): PartitionReaderFactory = {
     // Check a field requirement for corrupt records here to throw an exception in a driver side
     ExprUtils.verifyColumnNameOfCorruptRecord(dataSchema, parsedOptions.columnNameOfCorruptRecord)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonScan.scala
index 201572b4338b6..c4a7f5f8ddc1d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonScan.scala
@@ -50,6 +50,18 @@ case class JsonScan(
     JsonDataSource(parsedOptions).isSplitable && super.isSplitable(path)
   }
 
+  override def getFileUnSplittableReason(path: Path): String = {
+    if (!super.isSplitable(path)) {
+      super.getFileUnSplittableReason(path)
+    } else {
+      if (!JsonDataSource(parsedOptions).isSplitable) {
+        "the json datasource is set multiLine mode"
+      } else {
+        ""
+      }
+    }
+  }
+
   override def createReaderFactory(): PartitionReaderFactory = {
     // Check a field requirement for corrupt records here to throw an exception in a driver side
     ExprUtils.verifyColumnNameOfCorruptRecord(dataSchema, parsedOptions.columnNameOfCorruptRecord)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextScan.scala
index 202723db27421..1f9bf78ad7f5c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextScan.scala
@@ -44,6 +44,18 @@ case class TextScan(
     super.isSplitable(path) && !textOptions.wholeText
   }
 
+  override def getFileUnSplittableReason(path: Path): String = {
+    if (!isSplitable(path)) {
+      super.getFileUnSplittableReason(path)
+    } else {
+      if (textOptions.wholeText) {
+        "the text datasource is set wholetext mode"
+      } else {
+        null
+      }
+    }
+  }
+
   override def createReaderFactory(): PartitionReaderFactory = {
     assert(
       readDataSchema.length <= 1,

From 3da440b68b884d76e0081803f6ee1bcb37c3bd68 Mon Sep 17 00:00:00 2001
From: WeichenXu
Date: Thu, 18 Jul 2019 11:39:33 +0800
Subject: [PATCH 06/13] update

---
 .../spark/sql/execution/datasources/v2/TextBasedFileScan.scala  | 2 +-
 .../apache/spark/sql/execution/datasources/v2/csv/CSVScan.scala | 2 +-
 .../spark/sql/execution/datasources/v2/json/JsonScan.scala      | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TextBasedFileScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TextBasedFileScan.scala
index 6271be9de1fda..dfda330dcf31a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TextBasedFileScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TextBasedFileScan.scala
@@ -43,7 +43,7 @@ abstract class TextBasedFileScan(
     if (!isSplitable(path)) {
       "the file is compressed by unsplittable compression codec"
     } else {
-      null
+      throw new UnsupportedOperationException("Undefined method getFileUnSplittableReason")
     }
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScan.scala
index 71cb45842446b..053f9b60232c8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScan.scala
@@ -57,7 +57,7 @@ case class CSVScan(
       if (!CSVDataSource(parsedOptions).isSplitable) {
         "the csv datasource is set multiLine mode"
       } else {
-        ""
+        null
       }
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonScan.scala
index c4a7f5f8ddc1d..594595caf8112 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonScan.scala
@@ -57,7 +57,7 @@ case class JsonScan(
       if (!JsonDataSource(parsedOptions).isSplitable) {
         "the json datasource is set multiLine mode"
       } else {
-        ""
+        null
      }
    }
  }

From 0c2ce85766f170410c9a97f44dfa6a1d86fad29a Mon Sep 17 00:00:00 2001
From: WeichenXu
Date: Thu, 18 Jul 2019 16:59:43 +0800
Subject: [PATCH 07/13] change to internal config

---
 .../scala/org/apache/spark/internal/config/package.scala | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index c8173fb78c5e6..548699f42dcec 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1182,10 +1182,11 @@ package object config {
 
   private[spark] val IO_FILE_UNSPLITTABLE_WARNING_THRESHOLD =
     ConfigBuilder("spark.io.file.unsplittable.warning.threshold")
-      .doc("When spark loading one single large unsplittable file, if file size exceed this " +
-        "threshold, then log warning.")
-      .longConf
-      .createWithDefault(1024 * 1024 * 1024)
+      .internal()
+      .doc("When spark loading one single large unsplittable file, if file size exceed this " +
+        "threshold, then log warning.")
+      .longConf
+      .createWithDefault(1024 * 1024 * 1024)
 
   private[spark] val EVENT_LOG_COMPRESSION_CODEC =
     ConfigBuilder("spark.eventLog.compression.codec")

From e6cf714137e3d86ae7041136f88403c8745a7cd8 Mon Sep 17 00:00:00 2001
From: WeichenXu
Date: Tue, 23 Jul 2019 10:23:17 +0800
Subject: [PATCH 08/13] update

---
 .../org/apache/spark/internal/config/package.scala   |  8 ++++----
 .../main/scala/org/apache/spark/rdd/HadoopRDD.scala  | 12 ++++++++----
 .../sql/execution/datasources/v2/FileScan.scala      | 10 +++++-----
 .../execution/datasources/v2/TextBasedFileScan.scala |  2 +-
 4 files changed, 18 insertions(+), 14 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 548699f42dcec..2f2afc4301cbf 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1180,11 +1180,11 @@ package object config {
       .intConf
       .createWithDefault(1)
 
-  private[spark] val IO_FILE_UNSPLITTABLE_WARNING_THRESHOLD =
-    ConfigBuilder("spark.io.file.unsplittable.warning.threshold")
+  private[spark] val IO_LOAD_LARGE_FILE_WARNING_THRESHOLD =
+    ConfigBuilder("spark.io.loadLargeFile.warning.threshold")
       .internal()
-      .doc("When spark loading one single large unsplittable file, if file size exceed this " +
-        "threshold, then log warning.")
+      .doc("When spark loading one single large file, if file size exceed this " +
+        "threshold, then log warning with possible reasons.")
       .longConf
       .createWithDefault(1024 * 1024 * 1024)
 
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index c763eabe8abba..8b175f04e7251 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -213,10 +213,14 @@ class HadoopRDD[K, V](
     if (inputSplits.length == 1 && inputSplits(0).isInstanceOf[FileSplit]) {
       val fileSplit = inputSplits(0).asInstanceOf[FileSplit]
       val path = fileSplit.getPath
-      if (Utils.isFileSplittable(path, codecFactory)
-        && fileSplit.getLength > conf.get(IO_FILE_UNSPLITTABLE_WARNING_THRESHOLD)) {
-        logWarning(s"Loading one large unsplittable File ${path.toString} with only one " +
-          s"partition, because the file is compressed by unsplittable compression codec.")
+      if (fileSplit.getLength > conf.get(IO_LOAD_LARGE_FILE_WARNING_THRESHOLD)) {
+        if (Utils.isFileSplittable(path, codecFactory)) {
+          logWarning("Loading one large file by one partition is slow, we can increase " +
+            "partition numbers by the `minPartitions` argument in method `sc.textFile`")
+        } else {
+          logWarning(s"Loading one large unsplittable file ${path.toString} with only one " +
+            s"partition, because the file is compressed by unsplittable compression codec.")
+        }
       }
     }
     val array = new Array[Partition](inputSplits.size)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
index c8a9a7fd23c2c..71e85907de3ba 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
@@ -22,7 +22,7 @@ import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.IO_FILE_UNSPLITTABLE_WARNING_THRESHOLD
+import org.apache.spark.internal.config.IO_LOAD_LARGE_FILE_WARNING_THRESHOLD
 import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.execution.PartitionedFileUtil
@@ -47,7 +47,7 @@ abstract class FileScan(
   }
 
   def getFileUnSplittableReason(path: Path): String = {
-    "Unknown"
+    "Undefined"
   }
 
   override def description(): String = {
@@ -100,9 +100,9 @@ abstract class FileScan(
     if (splitFiles.length == 1) {
       val path = new Path(splitFiles(0).filePath)
-      if (isSplitable(path) && splitFiles(0).length >
-        sparkSession.sparkContext.getConf.get(IO_FILE_UNSPLITTABLE_WARNING_THRESHOLD)) {
-        logWarning(s"Loading one large unsplittable File ${path.toString} with only one " +
+      if (!isSplitable(path) && splitFiles(0).length >
+        sparkSession.sparkContext.getConf.get(IO_LOAD_LARGE_FILE_WARNING_THRESHOLD)) {
+        logWarning(s"Loading one large unsplittable file ${path.toString} with only one " +
           s"partition, the reason is: ${getFileUnSplittableReason(path)}")
       }
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TextBasedFileScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TextBasedFileScan.scala
index dfda330dcf31a..6271be9de1fda 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TextBasedFileScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TextBasedFileScan.scala
@@ -43,7 +43,7 @@ abstract class TextBasedFileScan(
     if (!isSplitable(path)) {
       "the file is compressed by unsplittable compression codec"
     } else {
-      throw new UnsupportedOperationException("Undefined method getFileUnSplittableReason")
+      null
     }
   }
 }

From feb8dd0c9489c8d9a6b0dd6f4243081510cafda6 Mon Sep 17 00:00:00 2001
From: WeichenXu
Date: Tue, 23 Jul 2019 11:34:10 +0800
Subject: [PATCH 09/13] update

---
 core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 8b175f04e7251..3e581f29aa7cf 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -215,8 +215,9 @@ class HadoopRDD[K, V](
       val path = fileSplit.getPath
       if (fileSplit.getLength > conf.get(IO_LOAD_LARGE_FILE_WARNING_THRESHOLD)) {
         if (Utils.isFileSplittable(path, codecFactory)) {
-          logWarning("Loading one large file by one partition is slow, we can increase " +
-            "partition numbers by the `minPartitions` argument in method `sc.textFile`")
+          logWarning(s"Loading one large file ${path.toString} with only one partition, " +
+            s"we can increase partition numbers by the `minPartitions` argument in method " +
+            "`sc.textFile`")
         } else {
           logWarning(s"Loading one large unsplittable file ${path.toString} with only one " +
             s"partition, because the file is compressed by unsplittable compression codec.")

From 597114f44a4a199a09295488c0fbc3bd12eab375 Mon Sep 17 00:00:00 2001
From: WeichenXu
Date: Wed, 24 Jul 2019 22:42:51 +0800
Subject: [PATCH 10/13] address comments

---
 .../scala/org/apache/spark/internal/config/package.scala |  4 ++--
 core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala |  5 ++---
 .../spark/sql/execution/datasources/v2/FileScan.scala    |  4 ++--
 3 files changed, 6 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 2f2afc4301cbf..b55412b3c3ae7 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1180,8 +1180,8 @@ package object config {
       .intConf
       .createWithDefault(1)
 
-  private[spark] val IO_LOAD_LARGE_FILE_WARNING_THRESHOLD =
-    ConfigBuilder("spark.io.loadLargeFile.warning.threshold")
+  private[spark] val IO_WARNING_LARGEFILETHRESHOLD =
+    ConfigBuilder("spark.io.warning.largeFileThreshold")
       .internal()
       .doc("When spark loading one single large file, if file size exceed this " +
         "threshold, then log warning with possible reasons.")
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 3e581f29aa7cf..eea3c697cf219 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -197,8 +197,6 @@ class HadoopRDD[K, V](
     newInputFormat
   }
 
-  @transient private lazy val codecFactory = new CompressionCodecFactory(getJobConf())
-
   override def getPartitions: Array[Partition] = {
     val jobConf = getJobConf()
     // add the credentials here as this can be called before SparkContext initialized
@@ -213,7 +211,8 @@ class HadoopRDD[K, V](
     if (inputSplits.length == 1 && inputSplits(0).isInstanceOf[FileSplit]) {
       val fileSplit = inputSplits(0).asInstanceOf[FileSplit]
       val path = fileSplit.getPath
-      if (fileSplit.getLength > conf.get(IO_LOAD_LARGE_FILE_WARNING_THRESHOLD)) {
+      if (fileSplit.getLength > conf.get(IO_WARNING_LARGEFILETHRESHOLD)) {
+        val codecFactory = new CompressionCodecFactory(jobConf)
         if (Utils.isFileSplittable(path, codecFactory)) {
           logWarning(s"Loading one large file ${path.toString} with only one partition, " +
             s"we can increase partition numbers by the `minPartitions` argument in method " +
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
index 71e85907de3ba..2d020293a3fab 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
@@ -22,7 +22,7 @@ import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.IO_LOAD_LARGE_FILE_WARNING_THRESHOLD
+import org.apache.spark.internal.config.IO_WARNING_LARGEFILETHRESHOLD
 import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.execution.PartitionedFileUtil
@@ -101,7 +101,7 @@ abstract class FileScan(
     if (splitFiles.length == 1) {
       val path = new Path(splitFiles(0).filePath)
       if (!isSplitable(path) && splitFiles(0).length >
-        sparkSession.sparkContext.getConf.get(IO_LOAD_LARGE_FILE_WARNING_THRESHOLD)) {
+        sparkSession.sparkContext.getConf.get(IO_WARNING_LARGEFILETHRESHOLD)) {
         logWarning(s"Loading one large unsplittable file ${path.toString} with only one " +
           s"partition, the reason is: ${getFileUnSplittableReason(path)}")
       }

From 4ce0d335287249a89457a57f4ff34d7f39690f21 Mon Sep 17 00:00:00 2001
From: WeichenXu
Date: Mon, 29 Jul 2019 20:10:39 +0800
Subject: [PATCH 11/13] update

---
 .../spark/sql/execution/datasources/v2/FileScan.scala  | 10 +++++++---
 .../execution/datasources/v2/TextBasedFileScan.scala   |  6 +++---
 .../sql/execution/datasources/v2/csv/CSVScan.scala     |  6 +++---
 .../sql/execution/datasources/v2/json/JsonScan.scala   |  6 +++---
 .../sql/execution/datasources/v2/text/TextScan.scala   |  8 ++++----
 5 files changed, 20 insertions(+), 16 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
index 2d020293a3fab..d7bac64c977f1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
@@ -46,8 +46,12 @@ abstract class FileScan(
     false
   }
 
-  def getFileUnSplittableReason(path: Path): String = {
-    "Undefined"
+  /**
+   * If a file with `path` is unsplittable, return the unsplittable reason,
+   * otherwise return `None`.
+   */
+  def getFileUnSplittableReason(path: Path): Option[String] = {
+    if (!isSplitable(path)) Some("Undefined") else None
   }
 
   override def description(): String = {
@@ -103,7 +107,7 @@ abstract class FileScan(
       if (!isSplitable(path) && splitFiles(0).length >
         sparkSession.sparkContext.getConf.get(IO_WARNING_LARGEFILETHRESHOLD)) {
         logWarning(s"Loading one large unsplittable file ${path.toString} with only one " +
-          s"partition, the reason is: ${getFileUnSplittableReason(path)}")
+          s"partition, the reason is: ${getFileUnSplittableReason(path).get}")
       }
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TextBasedFileScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TextBasedFileScan.scala
index 6271be9de1fda..4ad72b7b9fb73 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TextBasedFileScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TextBasedFileScan.scala
@@ -39,11 +39,11 @@ abstract class TextBasedFileScan(
 
   override def isSplitable(path: Path): Boolean = Utils.isFileSplittable(path, codecFactory)
 
-  override def getFileUnSplittableReason(path: Path): String = {
+  override def getFileUnSplittableReason(path: Path): Option[String] = {
     if (!isSplitable(path)) {
-      "the file is compressed by unsplittable compression codec"
+      Some("the file is compressed by unsplittable compression codec")
     } else {
-      null
+      None
     }
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScan.scala
index 053f9b60232c8..77085b9d330d8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScan.scala
@@ -50,14 +50,14 @@ case class CSVScan(
     CSVDataSource(parsedOptions).isSplitable && super.isSplitable(path)
   }
 
-  override def getFileUnSplittableReason(path: Path): String = {
+  override def getFileUnSplittableReason(path: Path): Option[String] = {
     if (!super.isSplitable(path)) {
       super.getFileUnSplittableReason(path)
     } else {
       if (!CSVDataSource(parsedOptions).isSplitable) {
-        "the csv datasource is set multiLine mode"
+        Some("the csv datasource is set multiLine mode")
       } else {
-        null
+        None
       }
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonScan.scala
index 594595caf8112..6965f345647c1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonScan.scala
@@ -50,14 +50,14 @@ case class JsonScan(
     JsonDataSource(parsedOptions).isSplitable && super.isSplitable(path)
   }
 
-  override def getFileUnSplittableReason(path: Path): String = {
+  override def getFileUnSplittableReason(path: Path): Option[String] = {
     if (!super.isSplitable(path)) {
       super.getFileUnSplittableReason(path)
     } else {
       if (!JsonDataSource(parsedOptions).isSplitable) {
-        "the json datasource is set multiLine mode"
+        Some("the json datasource is set multiLine mode")
      } else {
-        null
+        None
      }
    }
  }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextScan.scala
index 1f9bf78ad7f5c..2482a473058a5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextScan.scala
@@ -44,14 +44,14 @@ case class TextScan(
     super.isSplitable(path) && !textOptions.wholeText
   }
 
-  override def getFileUnSplittableReason(path: Path): String = {
-    if (!isSplitable(path)) {
+  override def getFileUnSplittableReason(path: Path): Option[String] = {
+    if (!super.isSplitable(path)) {
       super.getFileUnSplittableReason(path)
     } else {
       if (textOptions.wholeText) {
-        "the text datasource is set wholetext mode"
+        Some("the text datasource is set wholetext mode")
       } else {
-        null
+        None
       }
     }
   }

From 944294893f5a4d4cf5086539a8cf287d397e9aba Mon Sep 17 00:00:00 2001
From: WeichenXu
Date: Tue, 30 Jul 2019 20:48:28 +0800
Subject: [PATCH 12/13] address comments

---
 .../apache/spark/sql/execution/datasources/v2/FileScan.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
index d7bac64c977f1..f193876a5d630 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
@@ -51,7 +51,8 @@ abstract class FileScan(
    * otherwise return `None`.
    */
   def getFileUnSplittableReason(path: Path): Option[String] = {
-    if (!isSplitable(path)) Some("Undefined") else None
+    assert(!isSplitable(path))
+    Some("undefined")
   }
 
   override def description(): String = {

From 801c6e30d785142dd5d03716b619dc48b87d125a Mon Sep 17 00:00:00 2001
From: WeichenXu
Date: Thu, 1 Aug 2019 00:14:50 +0800
Subject: [PATCH 13/13] update

---
 .../spark/sql/execution/datasources/v2/FileScan.scala     | 6 +++---
 .../sql/execution/datasources/v2/TextBasedFileScan.scala  | 9 +++------
 .../spark/sql/execution/datasources/v2/csv/CSVScan.scala  | 9 +++------
 .../sql/execution/datasources/v2/json/JsonScan.scala      | 9 +++------
 .../sql/execution/datasources/v2/text/TextScan.scala      | 9 +++------
 5 files changed, 15 insertions(+), 27 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
index f193876a5d630..0438bd0430da1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
@@ -50,9 +50,9 @@ abstract class FileScan(
    * If a file with `path` is unsplittable, return the unsplittable reason,
    * otherwise return `None`.
    */
-  def getFileUnSplittableReason(path: Path): Option[String] = {
+  def getFileUnSplittableReason(path: Path): String = {
     assert(!isSplitable(path))
-    Some("undefined")
+    "undefined"
   }
 
   override def description(): String = {
@@ -108,7 +108,7 @@ abstract class FileScan(
       if (!isSplitable(path) && splitFiles(0).length >
         sparkSession.sparkContext.getConf.get(IO_WARNING_LARGEFILETHRESHOLD)) {
         logWarning(s"Loading one large unsplittable file ${path.toString} with only one " +
-          s"partition, the reason is: ${getFileUnSplittableReason(path).get}")
+          s"partition, the reason is: ${getFileUnSplittableReason(path)}")
       }
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TextBasedFileScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TextBasedFileScan.scala
index 4ad72b7b9fb73..7ddd99a0293b1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TextBasedFileScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TextBasedFileScan.scala
@@ -39,11 +39,8 @@ abstract class TextBasedFileScan(
 
   override def isSplitable(path: Path): Boolean = Utils.isFileSplittable(path, codecFactory)
 
-  override def getFileUnSplittableReason(path: Path): Option[String] = {
-    if (!isSplitable(path)) {
-      Some("the file is compressed by unsplittable compression codec")
-    } else {
-      None
-    }
+  override def getFileUnSplittableReason(path: Path): String = {
+    assert(!isSplitable(path))
+    "the file is compressed by unsplittable compression codec"
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScan.scala
index 77085b9d330d8..3cbcfca01a9c3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVScan.scala
@@ -50,15 +50,12 @@ case class CSVScan(
     CSVDataSource(parsedOptions).isSplitable && super.isSplitable(path)
   }
 
-  override def getFileUnSplittableReason(path: Path): Option[String] = {
+  override def getFileUnSplittableReason(path: Path): String = {
+    assert(!isSplitable(path))
     if (!super.isSplitable(path)) {
       super.getFileUnSplittableReason(path)
     } else {
-      if (!CSVDataSource(parsedOptions).isSplitable) {
-        Some("the csv datasource is set multiLine mode")
-      } else {
-        None
-      }
+      "the csv datasource is set multiLine mode"
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonScan.scala
index 6965f345647c1..5c41bbd931982 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonScan.scala
@@ -50,15 +50,12 @@ case class JsonScan(
     JsonDataSource(parsedOptions).isSplitable && super.isSplitable(path)
   }
 
-  override def getFileUnSplittableReason(path: Path): Option[String] = {
+  override def getFileUnSplittableReason(path: Path): String = {
+    assert(!isSplitable(path))
     if (!super.isSplitable(path)) {
       super.getFileUnSplittableReason(path)
     } else {
-      if (!JsonDataSource(parsedOptions).isSplitable) {
-        Some("the json datasource is set multiLine mode")
-      } else {
-        None
-      }
+      "the json datasource is set multiLine mode"
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextScan.scala
index 2482a473058a5..89b0511442d4a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextScan.scala
@@ -44,15 +44,12 @@ case class TextScan(
     super.isSplitable(path) && !textOptions.wholeText
   }
 
-  override def getFileUnSplittableReason(path: Path): Option[String] = {
+  override def getFileUnSplittableReason(path: Path): String = {
+    assert(!isSplitable(path))
     if (!super.isSplitable(path)) {
       super.getFileUnSplittableReason(path)
     } else {
-      if (textOptions.wholeText) {
-        Some("the text datasource is set wholetext mode")
-      } else {
-        None
-      }
+      "the text datasource is set wholetext mode"
     }
   }
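
A minimal sketch of how the RDD-side warning added in this series surfaces at runtime, assuming the final state of the patches above. The app name, local[2] master, lowered threshold value, and file paths are hypothetical values chosen only to make the warning easy to trigger; spark.io.warning.largeFileThreshold is the internal conf from PATCH 10 and defaults to 1 GiB.

import org.apache.spark.{SparkConf, SparkContext}

object LargeFileWarningDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("large-file-warning-demo")
      .setMaster("local[2]")
      // Lower the internal threshold so a small test file triggers the warning.
      .set("spark.io.warning.largeFileThreshold", (16 * 1024 * 1024).toString)
    val sc = new SparkContext(conf)

    // A gzipped file is compressed by an unsplittable codec, so
    // HadoopRDD.getPartitions computes a single partition and logs the
    // "unsplittable" warning added in PATCH 08/10.
    val lines = sc.textFile("/tmp/big.log.gz") // hypothetical path
    assert(lines.getNumPartitions == 1)

    // For a splittable file that still lands in one large partition, the
    // sibling warning points at the remedy directly:
    val split = sc.textFile("/tmp/big.log", minPartitions = 16) // hypothetical path

    sc.stop()
  }
}

With the default 1 GiB threshold, the same warnings appear for any single-split file larger than 1 GiB.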
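
A corresponding sketch for the DataSource V2 scan path, where getFileUnSplittableReason supplies the reason string in the FileScan warning. The paths and threshold are again hypothetical, and whether the v2 CSVScan is actually exercised can depend on the build's spark.sql.sources.useV1SourceList setting.

import org.apache.spark.sql.SparkSession

object MultiLineCsvWarningDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("filescan-warning-demo")
      .master("local[2]")
      .config("spark.io.warning.largeFileThreshold", 16 * 1024 * 1024)
      .getOrCreate()

    // multiLine mode makes CSVScan.isSplitable return false, so planning a
    // scan over a single CSV file above the threshold logs:
    //   "... the reason is: the csv datasource is set multiLine mode"
    val df = spark.read
      .option("multiLine", "true")
      .csv("/tmp/big.csv") // hypothetical path
    df.count()

    spark.stop()
  }
}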