60 changes: 58 additions & 2 deletions docs/configuration.md
@@ -2326,7 +2326,7 @@ from this directory.
# Inheriting Hadoop Cluster Configuration

If you plan to read and write from HDFS using Spark, there are two Hadoop configuration files that
should be included on Spark's classpath:
should be included on Spark's class path:
@tejasapatil (Contributor), Aug 3, 2017:

nit: everywhere in the documentation "classpath" is being used, so changing just one instance will make the doc inconsistent. Let's keep this as it was.

* `hdfs-site.xml`, which provides default behaviors for the HDFS client.
* `core-site.xml`, which sets the default filesystem name.
@@ -2335,5 +2335,61 @@ The location of these configuration files varies across Hadoop versions, but
a common location is inside of `/etc/hadoop/conf`. Some tools create
configurations on-the-fly, but offer a mechanism to download copies of them.

To make these files visible to Spark, set `HADOOP_CONF_DIR` in `$SPARK_HOME/spark-env.sh`
To make these files visible to Spark, set `HADOOP_CONF_DIR` in `$SPARK_HOME/conf/spark-env.sh`
Member:

@zsxwing @liancheng Could you please take a look at the documentation? Is anything missing or inaccurate?

to a location containing the configuration files.
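
For example, a minimal sketch of the relevant line in `spark-env.sh` (the exact path depends on your Hadoop distribution; `/etc/hadoop/conf` is only the common default mentioned above):

{% highlight bash %}
# $SPARK_HOME/conf/spark-env.sh
export HADOOP_CONF_DIR=/etc/hadoop/conf
{% endhighlight %}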

# Custom Hadoop/Hive Configuration

If your Spark application interacts with Hadoop, Hive, or both, there are probably Hadoop/Hive
Contributor:

s/applications/application is/

configuration files in Spark's class path.

Multiple running applications might require different Hadoop/Hive client-side configurations.
You can copy and modify `hdfs-site.xml`, `core-site.xml`, `yarn-site.xml`, `hive-site.xml` in
Spark's class path for each application, but this is not very convenient, and these
files are best shared with common properties to avoid hard-coding certain configurations.
Contributor:

"best shared"

You can't do that anyway on a production Spark on YARN cluster; if you did, lots of other things would break. How about:

In a Spark cluster running on YARN, these configuration files are set cluster-wide, and cannot safely be changed by the application.


The better choice is to use Spark Hadoop properties in the form of `spark.hadoop.*`.
They can be considered the same as normal Spark properties, which can be set in `$SPARK_HOME/conf/spark-defaults.conf`.
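
For example, such a property can sit alongside ordinary Spark properties (a sketch only; `fs.defaultFS` and its value here are illustrative, not a recommendation):

{% highlight bash %}
# $SPARK_HOME/conf/spark-defaults.conf
spark.master                  yarn
spark.hadoop.fs.defaultFS     hdfs://namenode:8020
{% endhighlight %}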

In some cases, you may want to avoid hard-coding certain configurations in a `SparkConf`. For
instance, Spark allows you to simply create an empty conf and set Spark and Spark Hadoop properties:

{% highlight scala %}
val conf = new SparkConf().set("spark.hadoop.abc.def","xyz")
val sc = new SparkContext(conf)
{% endhighlight %}

Also, you can modify or add configurations at runtime:
{% highlight bash %}
./bin/spark-submit \
--name "My app" \
--master local[4] \
--conf spark.eventLog.enabled=false \
--conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \
--conf spark.hadoop.abc.def=xyz \
myApp.jar
{% endhighlight %}

## Typical Hadoop/Hive Configurations
Contributor:

Curious: what's the motive behind having this section? I feel that we should not get into suggesting these configs external to Spark.


<table>
<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
<tr>
<td><code>spark.hadoop.<br />mapreduce.fileoutputcommitter.algorithm.version</code></td>
<td>1</td>
<td>
The file output committer algorithm version, valid algorithm version number: 1 or 2.
Version 2 may have better performance, but version 1 may handle failures better in certain situations,
as per <a href="https://issues.apache.org/jira/browse/MAPREDUCE-4815">MAPREDUCE-4815</a>.
</td>
</tr>

<tr>
<td><code>spark.hadoop.<br />fs.hdfs.impl.disable.cache</code></td>
@steveloughran (Contributor), Aug 3, 2017:

this is a pretty dangerous one to point people at, especially since it's fixed in future Hadoop versions and backported to some distros, and the cost of creating a new HDFS client on every worker can get very expensive if you have a Spark process with many threads, all fielding work from the same user (thread pools, IPC connections, ...)

How about spark.hadoop.parquet.enable.summary-metadata=false? That's a good one for cloud deployments, and goes well with using commit V2

<td>false</td>
<td>
When true, return a fresh HDFS filesystem instance, bypassing the HDFS cache mechanism.
This is to prevent the DFSClient from using an old cached token to connect to the NameNode,
which might fail long-running Spark applications. See <a href="https://issues.apache.org/jira/browse/HDFS-9276">HDFS-9276</a>.
</td>
</tr>
</table>
@@ -50,6 +50,7 @@ private[hive] object SparkSQLCLIDriver extends Logging {
private val prompt = "spark-sql"
private val continuedPrompt = "".padTo(prompt.length, ' ')
private var transport: TSocket = _
private final val SPARK_HADOOP_PROP_PREFIX = "spark.hadoop."
Member:

Just a question: why does the prefix have to be spark.hadoop.?

See the related PR: #2379

Contributor:

good point. I see spark.hive in some of my configs

Contributor:

spark.hadoop. was tribal knowledge and was a sneaky way to stick values into the Hadoop Configuration object (which can later also be passed on to HiveConf). What does spark.hive. do? I have never seen such configs and would like to know.

Keeping that aside, are you proposing to drop that prefix at L145?

Member:

After thinking more, I think we should just consider spark.hadoop. in this PR, unless we get other feedback from the community.


installSignalHandler()

@@ -134,6 +135,16 @@ private[hive] object SparkSQLCLIDriver extends Logging {
// Hive 1.2 + not supported in CLI
throw new RuntimeException("Remote operations not supported")
}
// Respect the configurations set by --hiveconf from the command line
// (based on Hive's CliDriver).
val hiveConfFromCmd = sessionState.getOverriddenConfigurations.entrySet().asScala
val newHiveConf = hiveConfFromCmd.map { kv =>
// If the same property is configured by spark.hadoop.xxx, we ignore it and
// obey settings from spark properties
val k = kv.getKey
val v = sys.props.getOrElseUpdate(SPARK_HADOOP_PROP_PREFIX + k, kv.getValue)
Member:

Let me try to summarize the impacts of these changes. The initial call of newTemporaryConfiguration is before we set sys.props. The subsequent call of newTemporaryConfiguration in newClientForExecution will be used for Hive execution clients. Thus, the changes will affect Hive execution clients.

Could you check all the code in Spark that uses sys.props? Will this change impact it?

Member:

When we build SparkConf in SparkSQLEnv, we get the conf from system properties because loadDefaults is set to true. That is the way we pass --hiveconf values to sc.hadoopConfiguration.

Member Author:

newClientForExecution is used ONLY in HiveThriftServer2, where it is used to get a HiveConf. There is no longer an execution Hive client; IMO this method could be removed. This activity happens after SparkSQLEnv.init, so it is OK for spark.hadoop. properties.

I realize that --hiveconf should be added to sys.props as spark.hadoop.xxx before SparkSQLEnv.init

Member:

newClientForExecution is used for us to read/write Hive SerDe tables. This is the major concern I have. Let us add another parameter in newTemporaryConfiguration. When newClientForExecution is calling newTemporaryConfiguration, we should not get the Hive conf from sys.props.

Member Author (@yaooqinn), Aug 4, 2017:

I have checked the whole project: newClientForExecution is only used at HiveThriftServer2.scala#L58 and HiveThriftServer2.scala#L86

Member:

Then, that sounds ok to me.

(k, v)
}
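
The precedence rule in the hunk above can be sketched in plain Scala, outside Spark and Hive (the keys and values below are illustrative, not real configuration names): `getOrElseUpdate` returns a pre-existing `spark.hadoop.*` system property when one is set, and otherwise records the `--hiveconf` value so `SparkConf` can pick it up later.

```scala
object PrecedenceSketch {
  val prefix = "spark.hadoop."

  // Mirrors the map over getOverriddenConfigurations: spark.hadoop.* wins,
  // otherwise the --hiveconf value is written into sys.props.
  def resolve(hiveConfFromCmd: Seq[(String, String)]): Seq[(String, String)] =
    hiveConfFromCmd.map { case (k, v) =>
      (k, sys.props.getOrElseUpdate(prefix + k, v))
    }

  def main(args: Array[String]): Unit = {
    sys.props.put(prefix + "hive.a", "fromSparkConf") // as if set via --conf
    val merged = resolve(Seq("hive.a" -> "fromHiveconf", "hive.b" -> "fromHiveconf"))
    assert(merged == Seq("hive.a" -> "fromSparkConf", "hive.b" -> "fromHiveconf"))
    assert(sys.props(prefix + "hive.b") == "fromHiveconf") // now visible to SparkConf
  }
}
```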

val cli = new SparkSQLCLIDriver
cli.setHiveVariables(oproc.getHiveVariables)
@@ -157,12 +168,8 @@ private[hive] object SparkSQLCLIDriver extends Logging {
// Execute -i init files (always in silent mode)
cli.processInitFiles(sessionState)

// Respect the configurations set by --hiveconf from the command line
// (based on Hive's CliDriver).
val it = sessionState.getOverriddenConfigurations.entrySet().iterator()
Member:

What is the reason you move it to line 140?

Member Author:

--hiveconf abc.def will be added to system properties as spark.hadoop.abc.def if it does not already exist, before SparkSQLEnv.init

while (it.hasNext) {
val kv = it.next()
SparkSQLEnv.sqlContext.setConf(kv.getKey, kv.getValue)
newHiveConf.foreach { kv =>
Member:

foreach{ -> foreach {

Member Author:

thanks

SparkSQLEnv.sqlContext.setConf(kv._1, kv._2)
}

if (sessionState.execString != null) {
Expand Down
@@ -283,4 +283,17 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging {
"SET conf3;" -> "conftest"
)
}

test("SPARK-21451: spark.sql.warehouse.dir should respect options in --hiveconf") {
runCliWithin(1.minute)("set spark.sql.warehouse.dir;" -> warehousePath.getAbsolutePath)
}

test("SPARK-21451: Apply spark.hadoop.* configurations") {
Member:

Without the fix, this test case can still succeed.

Member Author (@yaooqinn), Aug 2, 2017:

@gatorsmile Yes, after sc is initialized, spark.hadoop.hive.metastore.warehouse.dir will be translated into a hadoop conf hive.metastore.warehouse.dir as an alternative of the warehouse dir. This test case couldn't tell whether this PR works. CliSuite may only see these values if we explicitly set them in SQLConf.

The original code did break another test case anyway.

val tmpDir = Utils.createTempDir(namePrefix = "SPARK-21451")
runCliWithin(
1.minute,
Seq(s"--conf", s"spark.hadoop.${ConfVars.METASTOREWAREHOUSE}=$tmpDir"))(
"set spark.sql.warehouse.dir;" -> tmpDir.getAbsolutePath)
tmpDir.delete()
}
}
@@ -404,6 +404,13 @@ private[spark] object HiveUtils extends Logging {
propMap.put(ConfVars.METASTORE_EVENT_LISTENERS.varname, "")
propMap.put(ConfVars.METASTORE_END_FUNCTION_LISTENERS.varname, "")

// Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar"
Contributor:

why should we do so?

Member Author (@yaooqinn), Jul 19, 2017:

@cloud-fan if we run bin/spark-sql --conf spark.hadoop.hive.exec.strachdir=/some/dir, or set it in spark-defaults.conf, SessionState.start(cliSessionState) in SparkSQLCLIDriver will not use this dir but the default

Contributor:

do we have documents saying that spark.hadoop.xxx is supported? or are you proposing a new feature?

Contributor:

let's move this to a util method so that we know this is done in 2 places

Member (@gatorsmile), Aug 4, 2017:

@yaooqinn Please follow what @tejasapatil said and create a util function.

Member Author:

ok

sys.props.foreach { case (key, value) =>
Member:

As I mentioned above, we should not do this for newClientForExecution.
if (key.startsWith("spark.hadoop.")) {
propMap.put(key.substring("spark.hadoop.".length), value)
}
}

propMap.toMap
}
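
The prefix-stripping step above can be sketched with plain Scala collections, without Hive's configuration classes (the map contents here are illustrative):

```scala
object PrefixStripSketch {
  // Mirrors the "spark.hadoop.foo=bar" -> "foo=bar" copy above, using a plain
  // Map in place of sys.props and Hive's propMap.
  def stripPrefix(props: Map[String, String]): Map[String, String] = {
    val prefix = "spark.hadoop."
    val propMap = scala.collection.mutable.Map[String, String]()
    props.foreach { case (key, value) =>
      if (key.startsWith(prefix)) propMap.put(key.substring(prefix.length), value)
    }
    propMap.toMap
  }

  def main(args: Array[String]): Unit = {
    val out = stripPrefix(Map("spark.hadoop.foo" -> "bar", "spark.app.name" -> "demo"))
    assert(out == Map("foo" -> "bar")) // prefix removed; non-matching keys dropped
  }
}
```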

@@ -33,4 +33,13 @@ class HiveUtilsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton
assert(conf(ConfVars.METASTORE_END_FUNCTION_LISTENERS.varname) === "")
}
}

test("newTemporaryConfiguration respect spark.hadoop.foo=bar in SparkConf") {
sys.props.put("spark.hadoop.foo", "bar")
Contributor:

The test says we should respect the hadoop conf in SparkConf, but why do we handle system properties?

Member Author:

@cloud-fan at the very beginning, spark-submit does the same thing: it adds properties from --conf and spark-defaults.conf to sys.props.

Seq(true, false) foreach { useInMemoryDerby =>
val hiveConf = HiveUtils.newTemporaryConfiguration(useInMemoryDerby)
assert(!hiveConf.contains("spark.hadoop.foo"))
assert(hiveConf("foo") === "bar")
}
}
}