@@ -249,7 +249,7 @@ object SQLConf {
val CONSTRAINT_PROPAGATION_ENABLED = buildConf("spark.sql.constraintPropagation.enabled")
.internal()
.doc("When true, the query optimizer will infer and propagate data constraints in the query " +
"plan to optimize them. Constraint propagation can sometimes be computationally expensive" +
"plan to optimize them. Constraint propagation can sometimes be computationally expensive " +
"for certain kinds of query plans (such as those with a large number of predicates and " +
"aliases) which might negatively impact overall runtime.")
.booleanConf
@@ -263,6 +263,17 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

val DISK_TO_MEMORY_SIZE_FACTOR = buildConf(
Contributor: Rename this too, FILE_COMPRESSION_FACTOR

"spark.sql.sources.compressionFactor")
Contributor: merge this with the previous line

Contributor: BTW, how about fileCompressionFactor? Since it works only for file-based data sources.

.internal()
.doc("The result of multiplying this factor with the size of data source files is propagated " +
"to serve as the stats to choose the best execution plan. In the case where the " +
Contributor: When estimating the output data size of a table scan, multiply the file size by this factor to get the estimated data size, in case the data is compressed in the file, which would otherwise lead to a heavily underestimated result.

"in-disk and in-memory size of data is significantly different, users can adjust this " +
"factor for a better choice of the execution plan. The default value is 1.0.")
.doubleConf
.checkValue(_ > 0, "the value of fileDataSizeFactor must be larger than 0")
Contributor: maybe >= 1.0? It's weird to see a compression factor less than 1.

Contributor: BTW fileDataSizeFactor -> compressionFactor

Contributor (Author): It is not necessarily the case that Parquet is always smaller than the in-memory size. E.g., for some simple datasets (like the one used in the test), Parquet's overhead makes the overall size larger than the in-memory size. But with the TPC-DS dataset, I observed that the Parquet size is much smaller than the in-memory size.

.createWithDefault(1.0)
Member: checkValue > 0.0


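For context (not part of this patch), a minimal usage sketch, assuming the conf keeps the name spark.sql.sources.compressionFactor used in this diff (reviewers suggest fileCompressionFactor); the local path and app name below are hypothetical:

import org.apache.spark.sql.SparkSession

object CompressionFactorSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("compression-factor-sketch") // hypothetical app name
      // Scale file-based size estimates: estimated size = file size * factor.
      .config("spark.sql.sources.compressionFactor", "0.5")
      .getOrCreate()
    import spark.implicits._

    val path = "/tmp/compression-factor-demo" // hypothetical path
    Seq(1, 2, 3, 4).toDF("id").write.mode("overwrite").parquet(path)

    // The scan's stats now report half of the on-disk size, which can tip the
    // optimizer towards a broadcast join when the estimate falls under
    // spark.sql.autoBroadcastJoinThreshold.
    val df = spark.read.parquet(path)
    df.join(df, Seq("id")).explain()

    spark.stop()
  }
}
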
val PARQUET_SCHEMA_MERGING_ENABLED = buildConf("spark.sql.parquet.mergeSchema")
.doc("When true, the Parquet data source merges schemas collected from all data files, " +
"otherwise the schema is picked from the summary file or a random data file " +
@@ -1241,6 +1252,8 @@ class SQLConf extends Serializable with Logging {

def escapedStringLiterals: Boolean = getConf(ESCAPED_STRING_LITERALS)

def diskToMemorySizeFactor: Double = getConf(DISK_TO_MEMORY_SIZE_FACTOR)

def stringRedationPattern: Option[Regex] = SQL_STRING_REDACTION_PATTERN.readFrom(reader)

/**
@@ -82,7 +82,11 @@ case class HadoopFsRelation(
}
}

override def sizeInBytes: Long = location.sizeInBytes
override def sizeInBytes: Long = {
val sizeFactor = sqlContext.conf.diskToMemorySizeFactor
Contributor: compressionFactor

(location.sizeInBytes * sizeFactor).toLong
Contributor: we should add a safe check for overflow.

Contributor (Author): Before the latest commit there was a safe check: e6065c7#diff-fcb68cd3c7630f337ce9a3b479b6d0c4R88. However, since sizeFactor is a double, any overflow with positive double numbers is capped at Double.PositiveInfinity, and as @wzhfy indicated, any double number larger than Long.MaxValue returns Long.MaxValue from its toLong method. So it should be safe here.

Contributor: Ah, good to know.

}


override def inputFiles: Array[String] = location.inputFiles
}
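As an aside (not part of this patch), a self-contained sketch of the Double-to-Long saturation behavior that the overflow discussion above relies on; the object name is made up for illustration:

object DoubleToLongSaturation extends App {
  // JVM narrowing from Double to Long saturates instead of wrapping around,
  // so (location.sizeInBytes * sizeFactor).toLong cannot silently overflow.
  val huge: Double = Long.MaxValue.toDouble * 2.0 // finite, larger than Long.MaxValue
  assert(huge.toLong == Long.MaxValue)
  assert(Double.PositiveInfinity.toLong == Long.MaxValue)
  println(s"huge.toLong = ${huge.toLong}")
}
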
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources
import java.io.{File, FilenameFilter}

import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoinExec}
import org.apache.spark.sql.test.SharedSQLContext

class HadoopFsRelationSuite extends QueryTest with SharedSQLContext {
@@ -39,4 +40,44 @@ class HadoopFsRelationSuite extends QueryTest with SharedSQLContext {
assert(df.queryExecution.logical.stats.sizeInBytes === BigInt(totalSize))
}
}

test("SPARK-22790: spark.sql.sources.compressionFactor takes effect") {
import testImplicits._
Seq(1.0, 0.5).foreach { compressionFactor =>
withSQLConf("spark.sql.sources.compressionFactor" -> compressionFactor.toString,
"spark.sql.autoBroadcastJoinThreshold" -> "400") {
withTempPath { workDir =>
// the file size is 740 bytes
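// With compressionFactor = 0.5 the estimated scan size (~370 bytes) falls below the
// 400-byte autoBroadcastJoinThreshold, so a broadcast join is expected; with 1.0 it is not.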
val workDirPath = workDir.getAbsolutePath
val data1 = Seq(100, 200, 300, 400).toDF("count")
data1.write.parquet(workDirPath + "/data1")
val df1FromFile = spark.read.parquet(workDirPath + "/data1")
val data2 = Seq(100, 200, 300, 400).toDF("count")
data2.write.parquet(workDirPath + "/data2")
val df2FromFile = spark.read.parquet(workDirPath + "/data2")
val joinedDF = df1FromFile.join(df2FromFile, Seq("count"))
if (compressionFactor == 0.5) {
val bJoinExec = joinedDF.queryExecution.executedPlan.collect {
case bJoin: BroadcastHashJoinExec => bJoin
}
assert(bJoinExec.nonEmpty)
val smJoinExec = joinedDF.queryExecution.executedPlan.collect {
case smJoin: SortMergeJoinExec => smJoin
}
assert(smJoinExec.isEmpty)
} else {
// compressionFactor is 1.0
val bJoinExec = joinedDF.queryExecution.executedPlan.collect {
case bJoin: BroadcastHashJoinExec => bJoin
}
assert(bJoinExec.isEmpty)
val smJoinExec = joinedDF.queryExecution.executedPlan.collect {
case smJoin: SortMergeJoinExec => smJoin
}
assert(smJoinExec.nonEmpty)
}
}
}
}
}
}