SQLConf.scala
@@ -113,6 +113,13 @@ object SQLConf {
.longConf
.createWithDefault(10L * 1024 * 1024)

val ENABLE_FALL_BACK_TO_HDFS_FOR_STATS =
SQLConfigBuilder("spark.sql.enableFallBackToHdfsForStats")
.doc("If the table statistics are not available from table metadata enable fall back to hdfs." +
" This is useful in determining if a table is small enough to use auto broadcast joins.")
.booleanConf
.createWithDefault(false)

val DEFAULT_SIZE_IN_BYTES = SQLConfigBuilder("spark.sql.defaultSizeInBytes")
.internal()
.doc("The default table size used in query planning. By default, it is set to a larger " +
@@ -596,6 +603,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {

def autoBroadcastJoinThreshold: Long = getConf(AUTO_BROADCASTJOIN_THRESHOLD)

def fallBackToHdfsForStatsEnabled: Boolean = getConf(ENABLE_FALL_BACK_TO_HDFS_FOR_STATS)

def preferSortMergeJoin: Boolean = getConf(PREFER_SORTMERGEJOIN)

def enableRadixSort: Boolean = getConf(RADIX_SORT_ENABLED)
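For illustration, a minimal sketch of how the new flag could be toggled from an application, assuming an existing SparkSession named `spark`; the 10 MB value mirrors the broadcast threshold default visible in the context lines above:

// Turn on the HDFS fallback so tables without metastore statistics can still be sized.
spark.conf.set("spark.sql.enableFallBackToHdfsForStats", "true")
// The resulting estimate is compared against the existing broadcast threshold (10 MB by default).
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10L * 1024 * 1024)
// The same flag can also be set from SQL:
spark.sql("SET spark.sql.enableFallBackToHdfsForStats=true")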
MetastoreRelation.scala
@@ -17,9 +17,12 @@

package org.apache.spark.sql.hive

import java.io.IOException

import scala.collection.JavaConverters._

import com.google.common.base.Objects
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.hive.common.StatsSetupConst
import org.apache.hadoop.hive.metastore.{TableType => HiveTableType}
import org.apache.hadoop.hive.metastore.api.FieldSchema
@@ -114,17 +117,31 @@ private[hive] case class MetastoreRelation(
val rawDataSize = hiveQlTable.getParameters.get(StatsSetupConst.RAW_DATA_SIZE)
// TODO: check if this estimate is valid for tables after partition pruning.
// NOTE: getting `totalSize` directly from params is kind of hacky, but this should be
// relatively cheap if parameters for the table are populated into the metastore. An
// alternative would be going through Hadoop's FileSystem API, which can be expensive if a lot
// of RPCs are involved. Besides `totalSize`, there are also `numFiles`, `numRows`,
// `rawDataSize` keys (see StatsSetupConst in Hive) that we can look at in the future.
// relatively cheap if parameters for the table are populated into the metastore.
// Besides `totalSize`, there are also `numFiles`, `numRows`, `rawDataSize` keys
// (see StatsSetupConst in Hive) that we can look at in the future.
BigInt(
// When table is external,`totalSize` is always zero, which will influence join strategy
// so when `totalSize` is zero, use `rawDataSize` instead
// if the size is still less than zero, we use default size
Option(totalSize).map(_.toLong).filter(_ > 0)
.getOrElse(Option(rawDataSize).map(_.toLong).filter(_ > 0)
.getOrElse(sparkSession.sessionState.conf.defaultSizeInBytes)))
// if the size is still less than zero, we try to get the file size from HDFS.
// given this is only needed for optimization, if the HDFS call fails we return the default.
if (totalSize != null && totalSize.toLong > 0L) {
totalSize.toLong
} else if (rawDataSize != null && rawDataSize.toLong > 0) {
rawDataSize.toLong
} else if (sparkSession.sessionState.conf.fallBackToHdfsForStatsEnabled) {
try {
val hadoopConf = sparkSession.sessionState.newHadoopConf()
val fs: FileSystem = hiveQlTable.getPath.getFileSystem(hadoopConf)
fs.getContentSummary(hiveQlTable.getPath).getLength
} catch {
case e: IOException =>
logWarning("Failed to get table size from hdfs.", e)
sparkSession.sessionState.conf.defaultSizeInBytes
}
} else {
sparkSession.sessionState.conf.defaultSizeInBytes
})
}
)

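The HDFS branch above is the only place that touches the filesystem. A self-contained sketch of the same idea, with a hypothetical `estimateSizeFromHdfs` helper standing in for the inline code; `tablePath` and `defaultSizeInBytes` correspond to `hiveQlTable.getPath` and `conf.defaultSizeInBytes`:

import java.io.IOException

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical helper mirroring the fallback branch: sum the size of a table's files on
// HDFS, and return a caller-supplied default if the filesystem call fails.
def estimateSizeFromHdfs(tablePath: Path, hadoopConf: Configuration, defaultSizeInBytes: Long): Long = {
  try {
    val fs: FileSystem = tablePath.getFileSystem(hadoopConf)
    // getContentSummary walks the directory tree, so this can be expensive for many files.
    fs.getContentSummary(tablePath).getLength
  } catch {
    case _: IOException =>
      // Statistics are only an optimization hint, so a failure here is not fatal.
      defaultSizeInBytes
  }
}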
StatisticsSuite.scala
@@ -17,6 +17,8 @@

package org.apache.spark.sql.hive

import java.io.{File, PrintWriter}

import scala.reflect.ClassTag

import org.apache.spark.sql.{QueryTest, Row}
@@ -25,8 +27,9 @@ import org.apache.spark.sql.execution.command.AnalyzeTableCommand
import org.apache.spark.sql.execution.joins._
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SQLTestUtils

class StatisticsSuite extends QueryTest with TestHiveSingleton {
class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
import hiveContext.sql

test("parse analyze commands") {
@@ -68,6 +71,51 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton {
classOf[AnalyzeTableCommand])
}

test("MetastoreRelations fallback to HDFS for size estimation") {
val enableFallBackToHdfsForStats = hiveContext.conf.fallBackToHdfsForStatsEnabled
try {
withTempDir { tempDir =>

// EXTERNAL OpenCSVSerde table pointing to LOCATION

val file1 = new File(tempDir + "/data1")
val writer1 = new PrintWriter(file1)
writer1.write("1,2")
writer1.close()

val file2 = new File(tempDir + "/data2")
val writer2 = new PrintWriter(file2)
writer2.write("1,2")
writer2.close()

sql(
s"""CREATE EXTERNAL TABLE csv_table(page_id INT, impressions INT)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
[Review comment from a Contributor: need to indent this properly. I can fix this when I merge if Jenkins passes.]
WITH SERDEPROPERTIES (
\"separatorChar\" = \",\",
\"quoteChar\" = \"\\\"\",
\"escapeChar\" = \"\\\\\")
LOCATION '$tempDir'
""")

hiveContext.setConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS, true)

val relation = hiveContext.sessionState.catalog.lookupRelation(TableIdentifier("csv_table"))
.asInstanceOf[MetastoreRelation]

val properties = relation.hiveQlTable.getParameters
assert(properties.get("totalSize").toLong <= 0, "external table totalSize must be <= 0")
assert(properties.get("rawDataSize").toLong <= 0, "external table rawDataSize must be <= 0")

val sizeInBytes = relation.statistics.sizeInBytes
assert(sizeInBytes === BigInt(file1.length() + file2.length()))
}
} finally {
hiveContext.setConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS, enableFallBackToHdfsForStats)
sql("DROP TABLE csv_table ")
}
}

ignore("analyze MetastoreRelations") {
def queryTotalSize(tableName: String): BigInt =
hiveContext.sessionState.catalog.lookupRelation(
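As a rough end-to-end check outside the test harness, one could read the estimate back through the analyzed plan. This is a hedged sketch assuming a Hive-enabled SparkSession named `spark` and the external `csv_table` created in the test above:

// With the fallback enabled, the estimate should reflect the files under LOCATION rather
// than the zero-valued metastore counters, so small tables can qualify for broadcast joins.
spark.conf.set("spark.sql.enableFallBackToHdfsForStats", "true")
val sizeEstimate = spark.table("csv_table").queryExecution.analyzed.statistics.sizeInBytes
println(s"estimated size in bytes: $sizeEstimate")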