Skip to content
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,13 @@ object SQLConf {
.longConf
.createWithDefault(10L * 1024 * 1024)

val ENABLE_FALL_BACK_TO_HDFS_FOR_STATS =
SQLConfigBuilder("spark.sql.enableFallBackToHdfsForStats")
.doc("If the table statistics are not available from table metadata enable fall back to hdfs" +
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: missing period after hdfs

" This is useful in determining if a table is small enough to use auto broadcast joins.")
.booleanConf
.createWithDefault(false)

val DEFAULT_SIZE_IN_BYTES = SQLConfigBuilder("spark.sql.defaultSizeInBytes")
.internal()
.doc("The default table size used in query planning. By default, it is set to a larger " +
Expand Down Expand Up @@ -596,6 +603,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {

def autoBroadcastJoinThreshold: Long = getConf(AUTO_BROADCASTJOIN_THRESHOLD)

def fallBackToHdfsForStatsEnabled: Boolean = getConf(ENABLE_FALL_BACK_TO_HDFS_FOR_STATS)

def preferSortMergeJoin: Boolean = getConf(PREFER_SORTMERGEJOIN)

def enableRadixSort: Boolean = getConf(RADIX_SORT_ENABLED)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@

package org.apache.spark.sql.hive

import java.io.IOException

import scala.collection.JavaConverters._

import com.google.common.base.Objects
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.hive.common.StatsSetupConst
import org.apache.hadoop.hive.metastore.{TableType => HiveTableType}
import org.apache.hadoop.hive.metastore.api.FieldSchema
Expand Down Expand Up @@ -114,17 +117,31 @@ private[hive] case class MetastoreRelation(
val rawDataSize = hiveQlTable.getParameters.get(StatsSetupConst.RAW_DATA_SIZE)
// TODO: check if this estimate is valid for tables after partition pruning.
// NOTE: getting `totalSize` directly from params is kind of hacky, but this should be
// relatively cheap if parameters for the table are populated into the metastore. An
// alternative would be going through Hadoop's FileSystem API, which can be expensive if a lot
// of RPCs are involved. Besides `totalSize`, there are also `numFiles`, `numRows`,
// `rawDataSize` keys (see StatsSetupConst in Hive) that we can look at in the future.
// relatively cheap if parameters for the table are populated into the metastore.
// Besides `totalSize`, there are also `numFiles`, `numRows`, `rawDataSize` keys
// (see StatsSetupConst in Hive) that we can look at in the future.
BigInt(
// When table is external,`totalSize` is always zero, which will influence join strategy
// so when `totalSize` is zero, use `rawDataSize` instead
// if the size is still less than zero, we use default size
Option(totalSize).map(_.toLong).filter(_ > 0)
.getOrElse(Option(rawDataSize).map(_.toLong).filter(_ > 0)
.getOrElse(sparkSession.sessionState.conf.defaultSizeInBytes)))
// if the size is still less than zero, we try to get the file size from HDFS.
// given this is only needed for optimization, if the HDFS call fails we return the default.
if (Option(totalSize).map(_.toLong).getOrElse(0L) > 0) {
Copy link
Contributor

@rxin rxin May 24, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we write something like this to make it easier to read?

if (totalSize != null && totalSize.toLong > 0L) {
  totalSize.toLong
} else if (rawDataSize != null && rawDataSize.toLong > 0) {
  rawDataSize.toLong
} else if (sparkSession.sessionState.conf.fallBackToHdfsForStatsEnabled) {
  ...
} else {
  sparkSession.sessionState.conf.defaultSizeInBytes
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

totalSize.toLong
} else if (Option(rawDataSize).map(_.toLong).getOrElse(0L) > 0) {
rawDataSize.toLong
} else if (sparkSession.sessionState.conf.fallBackToHdfsForStatsEnabled) {
try {
val hadoopConf = sparkSession.sessionState.newHadoopConf()
val fs: FileSystem = hiveQlTable.getPath.getFileSystem(hadoopConf)
fs.getContentSummary(hiveQlTable.getPath).getLength
} catch {
case e: IOException =>
logWarning("Failed to get table size from hdfs.", e)
sparkSession.sessionState.conf.defaultSizeInBytes
}
} else {
sparkSession.sessionState.conf.defaultSizeInBytes
})
}
)

Expand Down