26 changes: 20 additions & 6 deletions core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
Expand Up @@ -24,6 +24,7 @@ import java.util.{Arrays, Comparator, Date, Locale}

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.HashMap
import scala.util.control.NonFatal

import com.google.common.primitives.Longs
Expand Down Expand Up @@ -99,17 +100,30 @@ class SparkHadoopUtil extends Logging {
hadoopConf.set("fs.s3a.session.token", sessionToken)
}
}
// Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar"
conf.getAll.foreach { case (key, value) =>
if (key.startsWith("spark.hadoop.")) {
hadoopConf.set(key.substring("spark.hadoop.".length), value)
}
}
appendSparkHadoopConfigs(conf, hadoopConf)
val bufferSize = conf.get("spark.buffer.size", "65536")
hadoopConf.set("io.file.buffer.size", bufferSize)
}
}

def appendSparkHadoopConfigs(conf: SparkConf, hadoopConf: Configuration): Unit = {
// Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar"
conf.getAll.foreach { case (key, value) =>
if (key.startsWith("spark.hadoop.")) {
hadoopConf.set(key.substring("spark.hadoop.".length), value)
}
}
}

def appendSparkHadoopConfigs(propMap: HashMap[String, String]): Unit = {
// Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar"
sys.props.foreach { case (key, value) =>
if (key.startsWith("spark.hadoop.")) {
propMap.put(key.substring("spark.hadoop.".length), value)
}
Member

We can shorten it to something like

    for ((key, value) <- conf if key.startsWith("spark.hadoop.")) {
      propMap.put(key.substring("spark.hadoop.".length), value)
    }

Member Author

ok

}
Member

Your change is different from what I posted before

    for ((key, value) <- conf if key.startsWith("spark.hadoop.")) {
      propMap.put(key.substring("spark.hadoop.".length), value)
    }

Member

Your solution requires another case _ =>
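
The difference between the two forms can be sketched as follows. A `for` comprehension with an `if` guard filters entries before the body runs, whereas a `foreach { case (k, v) if ... => }` literal has no branch for keys that fail the guard and throws `scala.MatchError` on the first such key unless a `case _ =>` is added. The object and method names below are hypothetical, and a plain `Map` stands in for the real property source.

```scala
import scala.collection.mutable

// Hypothetical helper names; a plain Map stands in for SparkConf or sys.props.
object PrefixCopySketch {
  private val Prefix = "spark.hadoop."

  // Guarded for comprehension: entries without the prefix are skipped.
  def copyWithFor(src: Map[String, String]): mutable.HashMap[String, String] = {
    val dest = mutable.HashMap.empty[String, String]
    for ((key, value) <- src if key.startsWith(Prefix)) {
      dest.put(key.substring(Prefix.length), value)
    }
    dest
  }

  // Guarded case inside foreach: the fallback case is required, otherwise a
  // key without the prefix raises scala.MatchError.
  def copyWithCase(src: Map[String, String]): mutable.HashMap[String, String] = {
    val dest = mutable.HashMap.empty[String, String]
    src.foreach {
      case (key, value) if key.startsWith(Prefix) =>
        dest.put(key.substring(Prefix.length), value)
      case _ => // ignore everything else
    }
    dest
  }
}
```

Both methods produce the same result; the `for` form is shorter because the filter and the tuple destructuring are expressed in one place.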

Member Author

ok

}

/**
* Return an appropriate (subclass) of Configuration. Creating a config can initialize some Hadoop
* subsystems.
Expand Down
34 changes: 33 additions & 1 deletion docs/configuration.md
Expand Up @@ -2335,5 +2335,37 @@ The location of these configuration files varies across Hadoop versions, but
a common location is inside of `/etc/hadoop/conf`. Some tools create
configurations on-the-fly, but offer a mechanism to download copies of them.

To make these files visible to Spark, set `HADOOP_CONF_DIR` in `$SPARK_HOME/spark-env.sh`
To make these files visible to Spark, set `HADOOP_CONF_DIR` in `$SPARK_HOME/conf/spark-env.sh`
Member

@zsxwing @liancheng Could you please take a look at the documentation? Is anything missing or inaccurate?

to a location containing the configuration files.

# Custom Hadoop/Hive Configuration

If your Spark application is interacting with Hadoop, Hive, or both, there are probably Hadoop/Hive
configuration files in Spark's classpath.

Multiple running applications might require different Hadoop/Hive client-side configurations.
You can copy and modify `hdfs-site.xml`, `core-site.xml`, `yarn-site.xml`, `hive-site.xml` in
Spark's classpath for each application. In a Spark cluster running on YARN, these configuration
files are set cluster-wide, and cannot safely be changed by the application.

A better choice is to use Spark Hadoop properties in the form of `spark.hadoop.*`.
They can be treated the same as normal Spark properties, which can be set in `$SPARK_HOME/conf/spark-defaults.conf`.

In some cases, you may want to avoid hard-coding certain configurations in a `SparkConf`. For
instance, Spark allows you to simply create an empty conf and set Spark and Spark Hadoop properties.

{% highlight scala %}
val conf = new SparkConf().set("spark.hadoop.abc.def", "xyz")
val sc = new SparkContext(conf)
{% endhighlight %}

Also, you can modify or add configurations at runtime:
{% highlight bash %}
./bin/spark-submit \
--name "My app" \
--master local[4] \
--conf spark.eventLog.enabled=false \
--conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \
--conf spark.hadoop.abc.def=xyz \
myApp.jar
{% endhighlight %}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ private[hive] object SparkSQLCLIDriver extends Logging {
private val prompt = "spark-sql"
private val continuedPrompt = "".padTo(prompt.length, ' ')
private var transport: TSocket = _
private final val SPARK_HADOOP_PROP_PREFIX = "spark.hadoop."
Member

Just a question: why does the prefix have to be spark.hadoop.?

See the related PR: #2379

Contributor

Good point. I see spark.hive in some of my configs.

Contributor

spark.hadoop. was tribal knowledge and was a sneaky way to stick values into the Hadoop Configuration object (which can later also be passed on to HiveConf). What does spark.hive. do? I have never seen such configs and would like to know.

Keeping that aside, are you proposing to drop that prefix at L145?

Member

After thinking about it more, I think we should just consider spark.hadoop. in this PR, unless we get other feedback from the community.


installSignalHandler()

Expand Down Expand Up @@ -134,6 +135,16 @@ private[hive] object SparkSQLCLIDriver extends Logging {
// Hive 1.2 + not supported in CLI
throw new RuntimeException("Remote operations not supported")
}
// Respect the configurations set by --hiveconf from the command line
// (based on Hive's CliDriver).
val hiveConfFromCmd = sessionState.getOverriddenConfigurations.entrySet().asScala
val newHiveConf = hiveConfFromCmd.map { kv =>
// If the same property is configured by spark.hadoop.xxx, we ignore it and
// obey settings from spark properties
val k = kv.getKey
val v = sys.props.getOrElseUpdate(SPARK_HADOOP_PROP_PREFIX + k, kv.getValue)
Member

Let me try to summarize the impact of these changes. The initial call of newTemporaryConfiguration happens before we set sys.props. The subsequent call of newTemporaryConfiguration in newClientForExecution will be used for Hive execution clients. Thus, the changes will affect Hive execution clients.

Could you check all the code in Spark that uses sys.props? Will this change impact it?

Member

When we build SparkConf in SparkSQLEnv, we get the conf from system prop because loadDefaults is set to true. That is the way we pass -hiveconf values to sc.hadoopConfiguration.

Member Author

newClientForExecution is used ONLY in HiveThriftServer2, where it is used to get a hiveconf. There is no longer an execution Hive client, so IMO this method should be removed. This happens after SparkSQLEnv.init, so it is OK for spark.hadoop. properties.

I realize that --hiveconf should be added to sys.props as spark.hadoop.xxx before SparkSQLEnv.init.

Member

newClientForExecution is used for us to read/write hive serde tables. This is the major concern I have. Let us add another parameter to newTemporaryConfiguration. When newClientForExecution calls newTemporaryConfiguration, we should not get the hive conf from sys.props.

Member Author (@yaooqinn, Aug 4, 2017)

I have checked the whole project: newClientForExecution is only used at HiveThriftServer2.scala#L58 and HiveThriftServer2.scala#L86.

Member

Then, that sounds ok to me.

(k, v)
}

val cli = new SparkSQLCLIDriver
cli.setHiveVariables(oproc.getHiveVariables)
Expand All @@ -157,12 +168,8 @@ private[hive] object SparkSQLCLIDriver extends Logging {
// Execute -i init files (always in silent mode)
cli.processInitFiles(sessionState)

// Respect the configurations set by --hiveconf from the command line
// (based on Hive's CliDriver).
val it = sessionState.getOverriddenConfigurations.entrySet().iterator()
Member

What is the reason you moved it to line 140?

Member Author

--hiveconf abc.def will be added to the system properties as spark.hadoop.abc.def, if it does not already exist, before SparkSQLEnv.init.
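
The precedence this gives can be sketched with `getOrElseUpdate`, which returns the existing value for a key or, when the key is absent, stores the supplied value and returns that instead. The sketch below is a simplified stand-in, not the PR's code: `props` plays the role of `sys.props`, `hiveConfFromCmd` plays the role of the `--hiveconf` overrides, and the object and method names are hypothetical.

```scala
import scala.collection.mutable

// Hypothetical sketch: `props` stands in for sys.props, `hiveConfFromCmd` for
// the --hiveconf overrides parsed from the command line.
object HiveConfMergeSketch {
  private val Prefix = "spark.hadoop."

  def merge(
      props: mutable.Map[String, String],
      hiveConfFromCmd: Map[String, String]): Map[String, String] = {
    hiveConfFromCmd.map { case (k, v) =>
      // Keeps an existing spark.hadoop.<k> value; otherwise stores v under
      // that key and returns it.
      k -> props.getOrElseUpdate(Prefix + k, v)
    }
  }
}
```

With `props` containing `spark.hadoop.abc.def` already, a `--hiveconf abc.def=...` value is ignored in favour of the Spark property; a key that is only given through `--hiveconf` is written into `props` under the prefixed name.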

while (it.hasNext) {
val kv = it.next()
SparkSQLEnv.sqlContext.setConf(kv.getKey, kv.getValue)
newHiveConf.foreach { kv =>
SparkSQLEnv.sqlContext.setConf(kv._1, kv._2)
}

if (sessionState.execString != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,4 +283,17 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging {
"SET conf3;" -> "conftest"
)
}

test("SPARK-21451: spark.sql.warehouse.dir should respect options in --hiveconf") {
runCliWithin(1.minute)("set spark.sql.warehouse.dir;" -> warehousePath.getAbsolutePath)
}

test("SPARK-21451: Apply spark.hadoop.* configurations") {
Member

Without the fix, this test case can still succeed.

Member Author (@yaooqinn, Aug 2, 2017)

@gatorsmile Yes, after sc initialized, spark.hadoop.hive.metastore.warehouse.dir will be translated into a hadoop conf hive.metastore.warehouse.dir as an alternative of warehouse dir. This test case couldn't tell whether this pr works. CliSuite may not see these values only if we explicitly set them to SqlConf.

The original code did break another test case anyway.

val tmpDir = Utils.createTempDir(namePrefix = "SPARK-21451")
runCliWithin(
1.minute,
Seq(s"--conf", s"spark.hadoop.${ConfVars.METASTOREWAREHOUSE}=$tmpDir"))(
"set spark.sql.warehouse.dir;" -> tmpDir.getAbsolutePath)
tmpDir.delete()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import org.apache.hadoop.hive.serde2.io.{DateWritable, TimestampWritable}
import org.apache.hadoop.util.VersionInfo

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog.CatalogTable
Expand Down Expand Up @@ -404,6 +405,8 @@ private[spark] object HiveUtils extends Logging {
propMap.put(ConfVars.METASTORE_EVENT_LISTENERS.varname, "")
propMap.put(ConfVars.METASTORE_END_FUNCTION_LISTENERS.varname, "")

SparkHadoopUtil.get.appendSparkHadoopConfigs(propMap)
Member

It is not obvious from this call that the values come from sys.props. How about changing the interface to the following?

// xyz
SparkHadoopUtil.get.appendSparkHadoopConfigs(sys.props.toMap, propMap)
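
A minimal sketch of what such a two-argument version could look like is given below. It is a stand-alone illustration under assumptions: the enclosing object name and the parameter names `srcMap` and `destMap` are hypothetical, and in the PR the method lives on `SparkHadoopUtil`.

```scala
import scala.collection.mutable.HashMap

// Hypothetical stand-alone version of the suggested two-argument signature;
// in the PR the method lives on SparkHadoopUtil.
object AppendConfigsSketch {
  def appendSparkHadoopConfigs(
      srcMap: Map[String, String],
      destMap: HashMap[String, String]): Unit = {
    // Copy any "spark.hadoop.foo=bar" entry of srcMap into destMap as "foo=bar".
    for ((key, value) <- srcMap if key.startsWith("spark.hadoop.")) {
      destMap.put(key.substring("spark.hadoop.".length), value)
    }
  }
}
```

Passing the source map explicitly makes the dependency on `sys.props` visible at the call site, and lets a test supply a literal map instead of mutating global system properties.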

Member Author

ok


propMap.toMap
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,13 @@ class HiveUtilsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton
assert(conf(ConfVars.METASTORE_END_FUNCTION_LISTENERS.varname) === "")
}
}

test("newTemporaryConfiguration respect spark.hadoop.foo=bar in SparkConf") {
sys.props.put("spark.hadoop.foo", "bar")
Contributor

The test says we should respect hadoop conf in SparkConf, so why do we handle system properties?

Member Author

@cloud-fan At the very beginning, spark-submit does the same thing: it adds properties from --conf and spark-defaults.conf to sys.props.

Seq(true, false) foreach { useInMemoryDerby =>
val hiveConf = HiveUtils.newTemporaryConfiguration(useInMemoryDerby)
assert(!hiveConf.contains("spark.hadoop.foo"))
assert(hiveConf("foo") === "bar")
}
}
}