Commit 4b19d6c

Tighten the visibility of various SQLConf methods and rename the setters/getters.
Parent: 1aad911

6 files changed: +54 −61 lines
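
At a glance: the public `set`/`get` family on SQLConf becomes `setConf`/`getConf`, `getAll` becomes `getAllConfs`, and `getOption`, `contains`, and `toDebugString` are removed. A minimal usage sketch of the renamed surface, assuming only an already-constructed SparkContext `sc` (all method and key names below come from the hunks that follow):

  // Sketch of the renamed SQLConf API; `sc` is an assumed SparkContext.
  val sqlContext = new org.apache.spark.sql.SQLContext(sc)

  sqlContext.setConf("spark.sql.shuffle.partitions", "10") // was set(key, value)
  sqlContext.getConf("spark.sql.shuffle.partitions")       // was get(key); still throws NoSuchElementException if unset
  sqlContext.getConf("spark.sql.codegen", "false")         // was get(key, default)
  sqlContext.getAllConfs                                   // was getAll

  // Rough stand-ins for the removed helpers:
  val asMap = sqlContext.getAllConfs.toMap
  asMap.get("spark.sql.dialect")      // was getOption(key)
  asMap.contains("spark.sql.dialect") // was contains(key)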

sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala

Lines changed: 20 additions & 24 deletions

@@ -21,13 +21,11 @@ import java.util.Properties

 import scala.collection.JavaConverters._

-object SQLConf {
+private[spark] object SQLConf {
   val COMPRESS_CACHED = "spark.sql.inMemoryColumnarStorage.compressed"
   val AUTO_BROADCASTJOIN_THRESHOLD = "spark.sql.autoBroadcastJoinThreshold"
   val DEFAULT_SIZE_IN_BYTES = "spark.sql.defaultSizeInBytes"
-  val AUTO_CONVERT_JOIN_SIZE = "spark.sql.auto.convert.join.size"
   val SHUFFLE_PARTITIONS = "spark.sql.shuffle.partitions"
-  val JOIN_BROADCAST_TABLES = "spark.sql.join.broadcastTables"
   val CODEGEN_ENABLED = "spark.sql.codegen"
   val DIALECT = "spark.sql.dialect"

@@ -66,13 +64,13 @@ trait SQLConf {
    * Note that the choice of dialect does not affect things like what tables are available or
    * how query execution is performed.
    */
-  private[spark] def dialect: String = get(DIALECT, "sql")
+  private[spark] def dialect: String = getConf(DIALECT, "sql")

   /** When true tables cached using the in-memory columnar caching will be compressed. */
-  private[spark] def useCompression: Boolean = get(COMPRESS_CACHED, "false").toBoolean
+  private[spark] def useCompression: Boolean = getConf(COMPRESS_CACHED, "false").toBoolean

   /** Number of partitions to use for shuffle operators. */
-  private[spark] def numShufflePartitions: Int = get(SHUFFLE_PARTITIONS, "200").toInt
+  private[spark] def numShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS, "200").toInt

   /**
    * When set to true, Spark SQL will use the Scala compiler at runtime to generate custom bytecode
@@ -84,7 +82,7 @@ trait SQLConf {
    * Defaults to false as this feature is currently experimental.
    */
   private[spark] def codegenEnabled: Boolean =
-    if (get(CODEGEN_ENABLED, "false") == "true") true else false
+    if (getConf(CODEGEN_ENABLED, "false") == "true") true else false

   /**
    * Upper bound on the sizes (in bytes) of the tables qualified for the auto conversion to
@@ -94,49 +92,47 @@ trait SQLConf {
    * Hive setting: hive.auto.convert.join.noconditionaltask.size, whose default value is also 10000.
    */
   private[spark] def autoBroadcastJoinThreshold: Int =
-    get(AUTO_BROADCASTJOIN_THRESHOLD, "10000").toInt
+    getConf(AUTO_BROADCASTJOIN_THRESHOLD, "10000").toInt

   /**
    * The default size in bytes to assign to a logical operator's estimation statistics. By default,
    * it is set to a larger value than `autoConvertJoinSize`, hence any logical operator without a
    * properly implemented estimation of this statistic will not be incorrectly broadcasted in joins.
    */
   private[spark] def defaultSizeInBytes: Long =
-    getOption(DEFAULT_SIZE_IN_BYTES).map(_.toLong).getOrElse(autoBroadcastJoinThreshold + 1)
+    getConf(DEFAULT_SIZE_IN_BYTES, (autoBroadcastJoinThreshold + 1).toString).toLong

   /** ********************** SQLConf functionality methods ************ */

-  def set(props: Properties): Unit = {
+  /** Set Spark SQL configuration properties. */
+  def setConf(props: Properties): Unit = {
     settings.synchronized {
       props.asScala.foreach { case (k, v) => settings.put(k, v) }
     }
   }

-  def set(key: String, value: String): Unit = {
+  /** Set the given Spark SQL configuration property. */
+  def setConf(key: String, value: String): Unit = {
     require(key != null, "key cannot be null")
     require(value != null, s"value cannot be null for key: $key")
     settings.put(key, value)
   }

-  def get(key: String): String = {
+  /** Return the value of Spark SQL configuration property for the given key. */
+  def getConf(key: String): String = {
     Option(settings.get(key)).getOrElse(throw new NoSuchElementException(key))
   }

-  def get(key: String, defaultValue: String): String = {
+  /**
+   * Return the value of Spark SQL configuration property for the given key. If the key is not set
+   * yet, return `defaultValue`.
+   */
+  def getConf(key: String, defaultValue: String): String = {
     Option(settings.get(key)).getOrElse(defaultValue)
   }

-  def getAll: Array[(String, String)] = settings.synchronized { settings.asScala.toArray }
-
-  def getOption(key: String): Option[String] = Option(settings.get(key))
-
-  def contains(key: String): Boolean = settings.containsKey(key)
-
-  def toDebugString: String = {
-    settings.synchronized {
-      settings.asScala.toArray.sorted.map{ case (k, v) => s"$k=$v" }.mkString("\n")
-    }
-  }
+  /** Return all the configuration properties that have been set (i.e. not the default). */
+  def getAllConfs: Array[(String, String)] = settings.synchronized { settings.asScala.toArray }

   private[spark] def clear() {
     settings.clear()
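
Aside from the renames, the one genuinely rewritten expression above is `defaultSizeInBytes`, which now routes through the defaulting `getConf` overload instead of `getOption`. A self-contained sketch of why the two forms agree, using a hypothetical plain Scala Map in place of the real settings store:

  // Hypothetical stand-in for SQLConf's settings map.
  val settings = Map("spark.sql.defaultSizeInBytes" -> "10001")
  val threshold = 10000 // plays the role of autoBroadcastJoinThreshold

  val before = settings.get("spark.sql.defaultSizeInBytes")
    .map(_.toLong).getOrElse((threshold + 1).toLong)
  val after = settings.getOrElse(
    "spark.sql.defaultSizeInBytes", (threshold + 1).toString).toLong

  assert(before == after) // both 10001L

When the key is unset, both forms yield threshold + 1; when it is set to a malformed value, both throw NumberFormatException, so behavior is unchanged.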

sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala

Lines changed: 4 additions & 4 deletions

@@ -53,10 +53,10 @@ case class SetCommand(
       if (k == SQLConf.Deprecated.MAPRED_REDUCE_TASKS) {
         logWarning(s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " +
           s"automatically converted to ${SQLConf.SHUFFLE_PARTITIONS} instead.")
-        context.set(SQLConf.SHUFFLE_PARTITIONS, v)
+        context.setConf(SQLConf.SHUFFLE_PARTITIONS, v)
         Array(s"${SQLConf.SHUFFLE_PARTITIONS}=$v")
       } else {
-        context.set(k, v)
+        context.setConf(k, v)
         Array(s"$k=$v")
       }

@@ -77,12 +77,12 @@ case class SetCommand(
           "system:sun.java.command=shark.SharkServer2")
       }
       else {
-        Array(s"$k=${context.getOption(k).getOrElse("<undefined>")}")
+        Array(s"$k=${context.getConf(k, "<undefined>")}")
       }

     // Query all key-value pairs that are set in the SQLConf of the context.
     case (None, None) =>
-      context.getAll.map { case (k, v) =>
+      context.getAllConfs.map { case (k, v) =>
        s"$k=$v"
      }
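
For reference, the three branches of SetCommand touched here correspond to the three shapes of the SQL SET statement; roughly, against an assumed SQLContext `ctx`:

  ctx.sql("SET spark.sql.shuffle.partitions=10") // key=value: stored via ctx.setConf
  ctx.sql("SET spark.sql.shuffle.partitions")    // key only: yields "key=value", or "key=<undefined>" if unset
  ctx.sql("SET")                                 // bare SET: yields every pair from ctx.getAllConfs as "k=v" lines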

sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala

Lines changed: 15 additions & 18 deletions

@@ -29,50 +29,47 @@ class SQLConfSuite extends QueryTest {

   test("programmatic ways of basic setting and getting") {
     clear()
-    assert(getOption(testKey).isEmpty)
-    assert(getAll.toSet === Set())
+    assert(getAllConfs.toSet === Set())

-    set(testKey, testVal)
-    assert(get(testKey) == testVal)
-    assert(get(testKey, testVal + "_") == testVal)
-    assert(getOption(testKey) == Some(testVal))
-    assert(contains(testKey))
+    setConf(testKey, testVal)
+    assert(getConf(testKey) == testVal)
+    assert(getConf(testKey, testVal + "_") == testVal)
+    assert(getAllConfs.contains(testKey))

     // Tests SQLConf as accessed from a SQLContext is mutable after
     // the latter is initialized, unlike SparkConf inside a SparkContext.
-    assert(TestSQLContext.get(testKey) == testVal)
-    assert(TestSQLContext.get(testKey, testVal + "_") == testVal)
-    assert(TestSQLContext.getOption(testKey) == Some(testVal))
-    assert(TestSQLContext.contains(testKey))
+    assert(TestSQLContext.getConf(testKey) == testVal)
+    assert(TestSQLContext.getConf(testKey, testVal + "_") == testVal)
+    assert(TestSQLContext.getAllConfs.contains(testKey))

     clear()
   }

   test("parse SQL set commands") {
     clear()
     sql(s"set $testKey=$testVal")
-    assert(get(testKey, testVal + "_") == testVal)
-    assert(TestSQLContext.get(testKey, testVal + "_") == testVal)
+    assert(getConf(testKey, testVal + "_") == testVal)
+    assert(TestSQLContext.getConf(testKey, testVal + "_") == testVal)

     sql("set some.property=20")
-    assert(get("some.property", "0") == "20")
+    assert(getConf("some.property", "0") == "20")
     sql("set some.property = 40")
-    assert(get("some.property", "0") == "40")
+    assert(getConf("some.property", "0") == "40")

     val key = "spark.sql.key"
     val vs = "val0,val_1,val2.3,my_table"
     sql(s"set $key=$vs")
-    assert(get(key, "0") == vs)
+    assert(getConf(key, "0") == vs)

     sql(s"set $key=")
-    assert(get(key, "0") == "")
+    assert(getConf(key, "0") == "")

     clear()
   }

   test("deprecated property") {
     clear()
     sql(s"set ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS}=10")
-    assert(get(SQLConf.SHUFFLE_PARTITIONS) == "10")
+    assert(getConf(SQLConf.SHUFFLE_PARTITIONS) == "10")
   }
 }
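
The deprecated-property test exercises the shim in commands.scala above: writes to the old Hadoop key are redirected, with a warning, to the new Spark SQL key. Sketched against an assumed SQLContext `ctx` (and note that SQLConf itself becomes private[spark] in this commit, so code like this can live only inside Spark's own packages and tests):

  ctx.sql(s"SET ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS}=10") // logs the deprecation warning
  assert(ctx.getConf(SQLConf.SHUFFLE_PARTITIONS) == "10")      // value lands on spark.sql.shuffle.partitions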

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala

Lines changed: 6 additions & 6 deletions

@@ -60,9 +60,9 @@ class LocalHiveContext(sc: SparkContext) extends HiveContext(sc) {

   /** Sets up the system initially or after a RESET command */
   protected def configure() {
-    set("javax.jdo.option.ConnectionURL",
+    setConf("javax.jdo.option.ConnectionURL",
       s"jdbc:derby:;databaseName=$metastorePath;create=true")
-    set("hive.metastore.warehouse.dir", warehousePath)
+    setConf("hive.metastore.warehouse.dir", warehousePath)
   }

   configure() // Must be called before initializing the catalog below.

@@ -76,7 +76,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
   self =>

   // Change the default SQL dialect to HiveQL
-  override private[spark] def dialect: String = get(SQLConf.DIALECT, "hiveql")
+  override private[spark] def dialect: String = getConf(SQLConf.DIALECT, "hiveql")

   override protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution =
     new this.QueryExecution { val logical = plan }

@@ -224,15 +224,15 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
   @transient protected[hive] lazy val hiveconf = new HiveConf(classOf[SessionState])
   @transient protected[hive] lazy val sessionState = {
     val ss = new SessionState(hiveconf)
-    set(hiveconf.getAllProperties) // Have SQLConf pick up the initial set of HiveConf.
+    setConf(hiveconf.getAllProperties) // Have SQLConf pick up the initial set of HiveConf.
     ss
   }

   sessionState.err = new PrintStream(outputBuffer, true, "UTF-8")
   sessionState.out = new PrintStream(outputBuffer, true, "UTF-8")

-  override def set(key: String, value: String): Unit = {
-    super.set(key, value)
+  override def setConf(key: String, value: String): Unit = {
+    super.setConf(key, value)
     runSqlHive(s"SET $key=$value")
   }
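
The overridden `setConf` is the synchronization point between the two configuration stores: every programmatic write lands in SQLConf via `super.setConf` and is then replayed into Hive's SessionState through `runSqlHive`. A stripped-down, self-contained sketch of that double-write pattern (all names except `setConf`/`getConf` are hypothetical):

  trait ConfStore {
    private val settings = new java.util.concurrent.ConcurrentHashMap[String, String]()
    def setConf(key: String, value: String): Unit = settings.put(key, value)
    def getConf(key: String, default: String): String =
      Option(settings.get(key)).getOrElse(default)
  }

  // Mirrors every write into a side channel, as HiveContext does with runSqlHive.
  class MirroredConfStore(mirror: String => Unit) extends ConfStore {
    override def setConf(key: String, value: String): Unit = {
      super.setConf(key, value)  // keep the primary store authoritative
      mirror(s"SET $key=$value") // replay the write into the second store
    }
  }

Routing every write through a single overridable method is what makes the mirroring reliable; anything that mutated the settings map directly would bypass it.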

sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala

Lines changed: 2 additions & 2 deletions

@@ -65,9 +65,9 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {

   /** Sets up the system initially or after a RESET command */
   protected def configure() {
-    set("javax.jdo.option.ConnectionURL",
+    setConf("javax.jdo.option.ConnectionURL",
       s"jdbc:derby:;databaseName=$metastorePath;create=true")
-    set("hive.metastore.warehouse.dir", warehousePath)
+    setConf("hive.metastore.warehouse.dir", warehousePath)
   }

   configure() // Must be called before initializing the catalog below.

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala

Lines changed: 7 additions & 7 deletions

@@ -75,9 +75,9 @@ class HiveQuerySuite extends HiveComparisonTest {
     "SELECT 2 / 1, 1 / 2, 1 / 3, 1 / COUNT(*) FROM src LIMIT 1")

   test("Query expressed in SQL") {
-    set("spark.sql.dialect", "sql")
+    setConf("spark.sql.dialect", "sql")
     assert(sql("SELECT 1").collect() === Array(Seq(1)))
-    set("spark.sql.dialect", "hiveql")
+    setConf("spark.sql.dialect", "hiveql")

   }

@@ -436,18 +436,18 @@ class HiveQuerySuite extends HiveComparisonTest {
     val testVal = "val0,val_1,val2.3,my_table"

     sql(s"set $testKey=$testVal")
-    assert(get(testKey, testVal + "_") == testVal)
+    assert(getConf(testKey, testVal + "_") == testVal)

     sql("set some.property=20")
-    assert(get("some.property", "0") == "20")
+    assert(getConf("some.property", "0") == "20")
     sql("set some.property = 40")
-    assert(get("some.property", "0") == "40")
+    assert(getConf("some.property", "0") == "40")

     sql(s"set $testKey=$testVal")
-    assert(get(testKey, "0") == testVal)
+    assert(getConf(testKey, "0") == testVal)

     sql(s"set $testKey=")
-    assert(get(testKey, "0") == "")
+    assert(getConf(testKey, "0") == "")
   }

   test("SET commands semantics for a HiveContext") {
