7 changes: 7 additions & 0 deletions R/pkg/R/sparkR.R
@@ -363,6 +363,7 @@ sparkR.session <- function(
...) {

sparkConfigMap <- convertNamedListToEnv(sparkConfig)

namedParams <- list(...)
if (length(namedParams) > 0) {
paramMap <- convertNamedListToEnv(namedParams)
@@ -376,6 +377,12 @@ sparkR.session <- function(
overrideEnvs(sparkConfigMap, paramMap)
}

# NOTE(shivaram): Set default warehouse dir to tmpdir to meet CRAN requirements
# See SPARK-18817 for more details
if (!exists("spark.sql.default.warehouse.dir", envir = sparkConfigMap)) {

Member:
After rethinking it, we might not need to add an extra SQL conf. We just need to know whether the value of spark.sql.warehouse.dir comes from the user or is the original default. If it is the default, R can simply change it.

Maybe it would be a good-to-have feature for users to know whether a SQLConf value comes from the user or from the default. cc @cloud-fan

Contributor:
Actually we can: SessionState.conf.settings contains all the user-set entries.

Contributor Author:
Ah, I see - I will try to use SessionState and see if it can avoid having to create a new option.
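
For context, a minimal illustrative sketch of the mechanism suggested above - using SQLConf.contains to tell a user-set spark.sql.warehouse.dir apart from a defaulted one. This is only a sketch; it assumes it runs inside Spark's own org.apache.spark.sql code (as this PR's tests do), since sessionState is not a public API:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.internal.StaticSQLConf

val spark = SparkSession.builder().master("local").getOrCreate()
// SQLConf.contains only reports keys that were explicitly set, so a false
// result here means warehousePath would fall back to a default value.
val userSetWarehouse =
  spark.sessionState.conf.contains(StaticSQLConf.WAREHOUSE_PATH.key)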

assign("spark.sql.default.warehouse.dir", tempdir(), envir = sparkConfigMap)
}

deployMode <- ""
if (exists("spark.submit.deployMode", envir = sparkConfigMap)) {
deployMode <- sparkConfigMap[["spark.submit.deployMode"]]
14 changes: 14 additions & 0 deletions R/pkg/inst/tests/testthat/test_context.R
@@ -72,6 +72,20 @@ test_that("repeatedly starting and stopping SparkSession", {
}
})

test_that("Default warehouse dir should be set to tempdir", {
sparkR.session.stop()
sparkR.session(enableHiveSupport = FALSE)

# Create a temporary table
sql("CREATE TABLE people_warehouse_test")
# spark-warehouse should be written only to tempdir() and not to the current working directory
res <- list.files(path = ".", pattern = ".*spark-warehouse.*",

Contributor:
Should we test to make sure that no files are created during this process, instead of only checking for spark-warehouse?

Contributor Author:
Well - given that this PR is only changing the warehouse dir, I think it's only fair to test for that. Or, in other words, adding such a test would fail right now because of derby.log etc. (per our JIRA discussion)?

Member:
I was testing this, and it looked like the current directory (.) was SPARK_HOME/R/pkg/inst/tests/testthat, and spark-warehouse would have been in SPARK_HOME/R.

Contributor Author:
Ah, I see - will check this today. I think if SPARK_HOME is accessible I can just call list.files with that as the path.

felixcheung (Member), Dec 18, 2016:
I think a couple of other test files would have called sparkR.session already (binary_function, binaryFile, broadcast), so I'd propose adding a new test explicitly named to make sure it is run the very first, i.e. https://github.com/apache/spark/pull/16330/files#diff-5ff1ba5d1751f3b1cc96a567e9ab25ffR18

Contributor Author:
I'm not sure why it needs to run first? Because the default warehouse dir is in tempdir, even if sparkR.session is called before it, it shouldn't create any warehouse dir in SPARK_HOME/, should it?

felixcheung (Member), Dec 19, 2016:
You are right.
In this case we are specifically looking for spark-warehouse; I guess I was referring to a general check to make sure that, going forward, the list of files before running anything in the package equals the list of files after.

Contributor Author:

I think my bigger concern is that tests are usually run all at once - i.e. core, sql, hive and then python, R - and there are no guarantees that other modules' tests won't create files inside SPARK_HOME, AFAIK. So while we can check some basic things with our test, I don't think verifying a global property is always possible.

Member:
That I agree completely

recursive = TRUE, include.dirs = TRUE)
expect_equal(length(res), 0)
result <- sql("DROP TABLE people_warehouse_test")
sparkR.session.stop()
})

test_that("rdd GC across sparkR.stop", {
sc <- sparkR.sparkContext() # sc should get id 0
rdd1 <- parallelize(sc, 1:20, 2L) # rdd1 should get id 1
@@ -819,7 +819,13 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {

def variableSubstituteDepth: Int = getConf(VARIABLE_SUBSTITUTE_DEPTH)

def warehousePath: String = new Path(getConf(StaticSQLConf.WAREHOUSE_PATH)).toString
def warehousePath: String = {
if (contains(StaticSQLConf.WAREHOUSE_PATH.key)) {

Member:
What is the reason we are not doing the same check here, as in the other place?

Contributor Author:
Nice catch - Added the same check here as well

new Path(getConf(StaticSQLConf.WAREHOUSE_PATH).get).toString
} else {
new Path(getConf(StaticSQLConf.DEFAULT_WAREHOUSE_PATH)).toString
}
}

def ignoreCorruptFiles: Boolean = getConf(IGNORE_CORRUPT_FILES)

@@ -964,11 +970,17 @@ object StaticSQLConf {
}
}

val WAREHOUSE_PATH = buildConf("spark.sql.warehouse.dir")
.doc("The default location for managed databases and tables.")
val DEFAULT_WAREHOUSE_PATH = buildConf("spark.sql.default.warehouse.dir")

Member:
Should we make it internal?

Contributor Author:
I am not familiar with this part of the code base - what are the consequences of making it internal? Is it just in terms of what shows up in the documentation, or does it affect how users can use it?

Member:
For an internal configuration, it will not be printed out. For example, you can try something like:

spark.sql("SET -v").show(numRows = 200, truncate = false)

.doc("The default location for managed databases and tables. " +
"Used if spark.sql.warehouse.dir is not set")
.stringConf
.createWithDefault(Utils.resolveURI("spark-warehouse").toString)
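
For illustration only (not part of this diff), a hedged sketch of what marking the new key internal might look like inside the StaticSQLConf object (where buildConf and Utils are already in scope), modeled on the .internal() call used by CATALOG_IMPLEMENTATION below:

val DEFAULT_WAREHOUSE_PATH = buildConf("spark.sql.default.warehouse.dir")
  .internal()  // hidden from SET -v output; illustrative, not in this PR
  .doc("The default location for managed databases and tables. " +
    "Used if spark.sql.warehouse.dir is not set")
  .stringConf
  .createWithDefault(Utils.resolveURI("spark-warehouse").toString)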

val WAREHOUSE_PATH = buildConf("spark.sql.warehouse.dir")
.doc("The location for managed databases and tables.")

Member:
The description is not right. spark.sql.warehouse.dir is still the default location when we create a database/table without providing a location value.

Contributor Author:
That's a good point. I misunderstood the meaning of "default" there. Will fix this now.

.stringConf
.createOptional

val CATALOG_IMPLEMENTATION = buildConf("spark.sql.catalogImplementation")
.internal()
.stringConf
@@ -55,14 +55,19 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
s"is set. Setting ${WAREHOUSE_PATH.key} to the value of " +
s"hive.metastore.warehouse.dir ('$hiveWarehouseDir').")
hiveWarehouseDir
} else {
} else if (sparkContext.conf.contains(WAREHOUSE_PATH.key) &&
sparkContext.conf.get(WAREHOUSE_PATH).isDefined) {

Member:
Nit: indent is not right.

Contributor Author:
Indented 4 spaces now

// If spark.sql.warehouse.dir is set, we will override hive.metastore.warehouse.dir using
// the value of spark.sql.warehouse.dir.
// When neither spark.sql.warehouse.dir nor hive.metastore.warehouse.dir is set,
// we will set hive.metastore.warehouse.dir to the default value of spark.sql.warehouse.dir.
val sparkWarehouseDir = sparkContext.conf.get(WAREHOUSE_PATH)
val sparkWarehouseDir = sparkContext.conf.get(WAREHOUSE_PATH).get
sparkContext.conf.set("hive.metastore.warehouse.dir", sparkWarehouseDir)
sparkWarehouseDir
} else {
// When neither spark.sql.warehouse.dir nor hive.metastore.warehouse.dir is set,
// we will set hive.metastore.warehouse.dir to the value of spark.sql.default.warehouse.dir.
val sparkDefaultWarehouseDir = sparkContext.conf.get(DEFAULT_WAREHOUSE_PATH)
sparkContext.conf.set("hive.metastore.warehouse.dir", sparkDefaultWarehouseDir)
sparkDefaultWarehouseDir
}

}
@@ -221,6 +221,19 @@ class SQLConfSuite extends QueryTest with SharedSQLContext {
.sessionState.conf.warehousePath.stripSuffix("/"))
}

test("changing default value of warehouse path") {

Member:
Currently, this test case only covers one of the four cases: spark.sql.default.warehouse.dir is set and spark.sql.warehouse.dir is not set. We also need to check the other three cases:

  • spark.sql.default.warehouse.dir is not set and spark.sql.warehouse.dir is not set
  • spark.sql.default.warehouse.dir is set and spark.sql.warehouse.dir is set
  • spark.sql.default.warehouse.dir is not set and spark.sql.warehouse.dir is set

Contributor Author:
Good point. Added tests for all 4 cases now

try {
val newWarehouseDefault = "spark-warehouse2"
val newWarehouseDefaultPath = new Path(Utils.resolveURI(newWarehouseDefault)).toString
sparkContext.conf.set("spark.sql.default.warehouse.dir", newWarehouseDefaultPath)
val spark = new SparkSession(sparkContext)
assert(newWarehouseDefaultPath.stripSuffix("/") === spark
.sessionState.conf.warehousePath.stripSuffix("/"))

Member:
We also need a check for spark.sharedState.warehousePath, because we made the logic changes there.

Contributor Author:
Done

} finally {
sparkContext.conf.remove("spark.sql.default.warehouse.dir")
}
}
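
To make the four combinations from the review discussion above concrete, a small self-contained Scala sketch of the expected precedence (inferred from the warehousePath change in this PR; not the suite's actual code):

// spark.sql.warehouse.dir wins when explicitly set; otherwise
// spark.sql.default.warehouse.dir applies; otherwise the built-in default.
def resolveWarehousePath(userDir: Option[String], defaultDir: Option[String]): String =
  userDir.orElse(defaultDir).getOrElse("spark-warehouse")

assert(resolveWarehousePath(None, None) == "spark-warehouse")                 // neither set
assert(resolveWarehousePath(None, Some("/tmp/wh")) == "/tmp/wh")              // only the default dir set
assert(resolveWarehousePath(Some("/data/wh"), None) == "/data/wh")            // only warehouse dir set
assert(resolveWarehousePath(Some("/data/wh"), Some("/tmp/wh")) == "/data/wh") // both set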

test("MAX_CASES_BRANCHES") {
withTable("tab1") {
spark.range(10).write.saveAsTable("tab1")