@@ -22,6 +22,8 @@ import java.util.{Map => JMap}
2222import scala .collection .mutable .HashMap
2323import scala .util .matching .Regex
2424
25+ import org .apache .commons .lang3 .SerializationUtils
26+
2527private object ConfigReader {
2628
2729 private val REF_RE = " \\ $\\ {(?:(\\ w+?):)?(\\ S+?)\\ }" .r
@@ -56,6 +58,17 @@ private[spark] class ConfigReader(conf: ConfigProvider) {
5658 bindEnv(new EnvProvider ())
5759 bindSystem(new SystemProvider ())
5860
61+ protected [spark] val localProperties =
62+ new InheritableThreadLocal [java.util.HashMap [String , String ]] {
63+ override protected def childValue (parent : java.util.HashMap [String , String ]):
64+ java.util.HashMap [String , String ] = {
65+ // Note: make a clone such that changes in the parent properties aren't reflected in
66+ // the those of the children threads, which has confusing semantics (SPARK-10563).
67+ SerializationUtils .clone(parent)
68+ }
69+ override protected def initialValue (): java.util.HashMap [String , String ] =
70+ new java.util.HashMap [String , String ]()
71+ }
5972 /**
6073 * Binds a prefix to a provider. This method is not thread-safe and should be called
6174 * before the instance is used to expand values.
@@ -76,7 +89,9 @@ private[spark] class ConfigReader(conf: ConfigProvider) {
7689 /**
7790 * Reads a configuration key from the default provider, and apply variable substitution.
7891 */
79- def get (key : String ): Option [String ] = conf.get(key).map(substitute)
92+ def get (key : String ): Option [String ] = {
93+ Option (localProperties.get().get(key)).orElse(conf.get(key)).map(substitute)
94+ }
8095
8196 /**
8297 * Perform variable substitution on the given input string.
0 commit comments