
Conversation

@yaooqinn
Member

@yaooqinn yaooqinn commented Oct 15, 2020

What changes were proposed in this pull request?

The case below covers the behavior of static and dynamic SQL configs in SharedState and SessionState, as well as the specially handled config spark.sql.warehouse.dir.
The case can be found here - https://github.com/yaooqinn/sugar/blob/master/src/main/scala/com/netease/mammut/spark/training/sql/WarehouseSCBeforeSS.scala

import java.lang.reflect.Field

import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}

object WarehouseSCBeforeSS extends App {

  val wh = "spark.sql.warehouse.dir"
  val td = "spark.sql.globalTempDatabase"
  val custom = "spark.sql.custom"

  val conf = new SparkConf()
    .setMaster("local")
    .setAppName("SPARK-32991")
    .set(wh, "./data1")
    .set(td, "bob")

  val sc = new SparkContext(conf)

  val spark = SparkSession.builder()
    .config(wh, "./data2")
    .config(td, "alice")
    .config(custom, "kyao")
    .getOrCreate()

  // Read the private cloned SparkConf held by SharedState via reflection
  val confField: Field = spark.sharedState.getClass.getDeclaredField("conf")
  confField.setAccessible(true)
  val shared: SparkConf = confField.get(spark.sharedState).asInstanceOf[SparkConf]
  println()
  println(s"=====> SharedState: $wh=${shared.get(wh)}")
  println(s"=====> SharedState: $td=${shared.get(td)}")
  println(s"=====> SharedState: $custom=${shared.get(custom, "")}")

  println(s"=====> SessionState: $wh=${spark.conf.get(wh)}")
  println(s"=====> SessionState: $td=${spark.conf.get(td)}")
  println(s"=====> SessionState: $custom=${spark.conf.get(custom, "")}")

  val spark2 = SparkSession.builder().config(td, "fred").getOrCreate()

  println(s"=====> SessionState 2: $wh=${spark2.conf.get(wh)}")
  println(s"=====> SessionState 2: $td=${spark2.conf.get(td)}")
  println(s"=====> SessionState 2: $custom=${spark2.conf.get(custom, "")}")

  SparkSession.setActiveSession(spark)
  spark.sql("RESET")

  println(s"=====> SessionState RESET: $wh=${spark.conf.get(wh)}")
  println(s"=====> SessionState RESET: $td=${spark.conf.get(td)}")
  println(s"=====> SessionState RESET: $custom=${spark.conf.get(custom, "")}")

  val spark3 = SparkSession.builder().getOrCreate()

  println(s"=====> SessionState 3: $wh=${spark2.conf.get(wh)}")
  println(s"=====> SessionState 3: $td=${spark2.conf.get(td)}")
  println(s"=====> SessionState 3: $custom=${spark2.conf.get(custom, "")}")
}

Outputs and analysis

// 1. Make the cloned spark conf in shared state respect the warehouse dir from the 1st SparkSession
//=====> SharedState: spark.sql.warehouse.dir=./data1
// 2. ⏬ (the outputs from here down to 2' are already correct)
//=====> SharedState: spark.sql.globalTempDatabase=alice
//=====> SharedState: spark.sql.custom=kyao
//=====> SessionState: spark.sql.warehouse.dir=./data2
//=====> SessionState: spark.sql.globalTempDatabase=alice
//=====> SessionState: spark.sql.custom=kyao
//=====> SessionState 2: spark.sql.warehouse.dir=./data2
//=====> SessionState 2: spark.sql.globalTempDatabase=alice
//=====> SessionState 2: spark.sql.custom=kyao
// 2'.🔼 OK until here
// 3. Make the 3 outputs below respect the cloned spark conf in shared state, once issue 1 is fixed
//=====> SessionState RESET: spark.sql.warehouse.dir=./data1
//=====> SessionState RESET: spark.sql.globalTempDatabase=bob
//=====> SessionState RESET: spark.sql.custom=
// 4. Then the SparkSessions created after RESET will be corrected.
//=====> SessionState 3: spark.sql.warehouse.dir=./data1
//=====> SessionState 3: spark.sql.globalTempDatabase=bob
//=====> SessionState 3: spark.sql.custom=

In this PR, we gather all valid configs into the cloned conf of sharedState while it is being constructed (in practice, only spark.sql.warehouse.dir was missing). We then use this conf as the defaults for the RESET command.
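
For illustration, the intended RESET behavior can be sketched as below. This is only a rough sketch, not the actual ResetCommand code; sessionConf and sharedConf are illustrative stand-ins for the session's mutable SQL conf and the SparkConf cloned by SharedState.

import scala.collection.mutable

import org.apache.spark.SparkConf

object ResetSketch extends App {

  // Rough sketch: RESET drops all session-level entries and re-applies the
  // defaults captured in the cloned conf when SharedState was constructed,
  // which (with this PR) include spark.sql.warehouse.dir as well.
  def resetToSharedStateDefaults(
      sessionConf: mutable.Map[String, String],
      sharedConf: SparkConf): Unit = {
    sessionConf.clear()
    sharedConf.getAll.foreach { case (k, v) => sessionConf.put(k, v) }
  }

  val sharedConf = new SparkConf(loadDefaults = false)
    .set("spark.sql.warehouse.dir", "/path/to/warehouse")
  val sessionConf = mutable.Map("spark.sql.warehouse.dir" -> "/overridden/at/runtime")

  resetToSharedStateDefaults(sessionConf, sharedConf)
  println(sessionConf("spark.sql.warehouse.dir")) // /path/to/warehouse
}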

SparkSession.clearActiveSession/clearDefaultSession make the shared state invisible and unsharable. They will become internal-only soon (confirmed with Wenchen), so cases where they are called will not be a problem.

Why are the changes needed?

Bugfix for the programmatic API: calling RESET when users create a SparkContext first and configure the SparkSession later.

Does this PR introduce any user-facing change?

Yes. Before this change, when you use the programmatic API and call RESET, all configs are reset to the values in SparkContext.conf; now they are reset to the values in SparkSession.sharedState.conf.

How was this patch tested?

new tests

@SparkQA

SparkQA commented Oct 15, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34400/

@SparkQA

SparkQA commented Oct 15, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34400/

@SparkQA

SparkQA commented Oct 15, 2020

Test build #129792 has finished for PR 30045 at commit f253fad.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@yaooqinn
Member Author

cc @cloud-fan @maropu @gatorsmile thanks~

@SparkQA

SparkQA commented Oct 15, 2020

Test build #129815 has finished for PR 30045 at commit a52c86a.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Oct 15, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34420/

@SparkQA

SparkQA commented Oct 15, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34420/

@SparkQA

SparkQA commented Oct 15, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34431/

@SparkQA

SparkQA commented Oct 15, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34431/

@SparkQA

SparkQA commented Oct 15, 2020

Test build #129825 has finished for PR 30045 at commit ccbf0ae.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@yaooqinn
Member Author

cc @hvanhovell too

@SparkQA

SparkQA commented Oct 16, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34488/

@SparkQA

SparkQA commented Oct 16, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34488/

@SparkQA

SparkQA commented Oct 16, 2020

Test build #129883 has finished for PR 30045 at commit 6848b2f.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • public static class Bitmaps
  • public static class BitmapArrays
  • class BlockPushErrorHandler implements ErrorHandler
  • public class MergedBlockMeta
  • public class OneForOneBlockPusher
  • public class FinalizeShuffleMerge extends BlockTransferMessage
  • public class MergeStatuses extends BlockTransferMessage
  • public class PushBlockStream extends BlockTransferMessage

@SparkQA

SparkQA commented Oct 16, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34506/

@SparkQA

SparkQA commented Oct 16, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34506/

@SparkQA

SparkQA commented Oct 16, 2020

Test build #129900 has finished for PR 30045 at commit 91c2e91.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member

From a cursory look, I think this is correct. But it's probably best to have @hvanhovell take a look.

// When neither spark.sql.warehouse.dir nor hive.metastore.warehouse.dir is set
// we will set hive.metastore.warehouse.dir to the default value of spark.sql.warehouse.dir.
-    val sparkWarehouseDir = sparkConf.get(WAREHOUSE_PATH)
+    val sparkWarehouseDir = sparkWarehouse.getOrElse(sparkConf.get(WAREHOUSE_PATH))
Contributor

sparkWarehouse already has .orElse(sparkConf.getOption(WAREHOUSE_PATH.key)). Do you mean sparkWarehouse.getOrElse(WAREHOUSE_PATH.defaultValueString)?

Member Author

Yes, this will eventually fall back to the default value string if nothing matched before.

Contributor

can we change to sparkWarehouse.getOrElse(WAREHOUSE_PATH.defaultValueString) so that it's more explicit?

Member Author

SGTM
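
For reference, a small self-contained sketch of the fallback ordering discussed in this thread; warehousePathDefault, hiveWarehouseOpt, and sparkConfWarehouseOpt are hypothetical stand-ins, not the actual SharedState fields.

object WarehouseFallbackSketch extends App {
  val warehousePathDefault = "spark-warehouse"                // stands in for WAREHOUSE_PATH.defaultValueString
  val hiveWarehouseOpt: Option[String] = None                 // hive.metastore.warehouse.dir, if set
  val sparkConfWarehouseOpt: Option[String] = Some("./data1") // spark.sql.warehouse.dir in the SparkConf, if set

  // sparkWarehouse already ends with .orElse(sparkConf.getOption(WAREHOUSE_PATH.key)),
  // so consulting the SparkConf again inside getOrElse would be redundant
  val sparkWarehouse: Option[String] = hiveWarehouseOpt.orElse(sparkConfWarehouseOpt)

  // the explicit form agreed on above: the static default is only the last resort
  val sparkWarehouseDir: String = sparkWarehouse.getOrElse(warehousePathDefault)
  println(sparkWarehouseDir) // ./data1
}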

val metastorePath = Utils.createTempDir()
val tmpDb = "tmp_db"

// The initial configs used to generate SharedState, none of these should affect the global
Contributor

This comment is wrong now, as the warehouse conf is an exception.

@SparkQA

SparkQA commented Oct 22, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34757/

@SparkQA

SparkQA commented Oct 22, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34757/

@SparkQA

SparkQA commented Oct 22, 2020

Test build #130150 has finished for PR 30045 at commit 0cbb4bc.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Oct 22, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34773/

@SparkQA

SparkQA commented Oct 22, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34773/

@SparkQA

SparkQA commented Oct 22, 2020

Test build #130166 has finished for PR 30045 at commit d194796.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@yaooqinn
Member Author

retest this please

@SparkQA

SparkQA commented Oct 23, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34786/

@SparkQA

SparkQA commented Oct 23, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34786/

@cloud-fan
Contributor

GA passed, merging to master, thanks!

@cloud-fan cloud-fan closed this in e21bb71 Oct 23, 2020
@SparkQA

SparkQA commented Oct 23, 2020

Test build #130184 has finished for PR 30045 at commit d194796.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@hvanhovell
Contributor

@cloud-fan @yaooqinn I have a question about the semantics. We currently reset to the initial state of the first session. You can have many different sessions (e.g. in the thrift server) with different initial settings; IMO it would be more sane to reset to the initial state of the current session. WDYT?

@HyukjinKwon
Member

^^ makes sense to me.

@cloud-fan
Contributor

makes sense. @yaooqinn what do you think?

@yaooqinn
Member Author

Makes sense to me too; I will raise a follow-up later.

@yaooqinn
Member Author

yaooqinn commented Dec 6, 2020

I have a question about the semantics. We currently reset to the initial state of the first session. You can have many different sessions (e.g. in the thrift server) with different initial settings; IMO it would be more sane to reset to the initial state of the current session. WDYT?

I did some research:

Without calling clearDefaultSession and clearActiveSession (there was a PR to make these two internal, but it was reverted for some reason), the existing APIs for creating a new SparkSession cannot lead users into such a situation.

Let's assume the initial configs we talk about here are those provided while instantiating a SparkSession instance, not those that simply get set for the first time.

  1. With the SparkSession.newSession() API, there are no parameters for setting initial configs.
  2. With SparkSession.Builder.getOrCreate, we just reference the original session without creating a new SparkSession instance.

So do we have a way to actually create a new session with initial configs when there is an existing active one? The answer is NO.
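
A quick runnable sketch of this claim, assuming Spark is on the classpath; the object name and the spark.sql.custom key are illustrative only.

import org.apache.spark.sql.SparkSession

object NoNewInitialConfigsSketch extends App {
  val spark = SparkSession.builder()
    .master("local")
    .appName("SPARK-32991-check")
    .getOrCreate()

  // newSession() takes no parameters, so there is no way to pass initial configs to it
  val fresh = spark.newSession()
  println(s"newSession shares the same SparkContext: ${fresh.sparkContext eq spark.sparkContext}") // true

  // builder.getOrCreate() hands back the existing session instead of a new instance
  val viaBuilder = SparkSession.builder().config("spark.sql.custom", "abc").getOrCreate()
  println(s"getOrCreate returned the original session: ${viaBuilder eq spark}") // true

  spark.stop()
}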

This situation only happens when these two APIs are called, but the current approach actually meets our goal here: we do keep the session configs per session in such a use case.

The actual problem here, revealed in the following case, is that the GLOBAL SharedState is not being shared after those clear-like APIs are called:

bin/spark-shell \
  --conf spark.sql.warehouse.dir=./warehouse \
  --conf spark.sql.globalTempDatabase=mytemp \
  --conf spark.sql.custom=abc
scala> org.apache.spark.sql.SparkSession.clearDefaultSession()

scala> org.apache.spark.sql.SparkSession.clearActiveSession()

scala> val nes = org.apache.spark.sql.SparkSession.builder.config("spark.sql.warehouse.dir", "w2").config("spark.sql.globalTempDatabase", "m2").config("spark.sql.custom", "xyz").getOrCreate
20/12/07 00:59:35 WARN SparkContext: Using an existing SparkContext; some configuration may not take effect.
nes: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@175f1ff5

scala> nes.conf.get("spark.sql.warehouse.dir")
20/12/07 01:00:06 WARN SharedState: Not allowing to set spark.sql.warehouse.dir or hive.metastore.warehouse.dir in SparkSession's options, it should be set statically for cross-session usages
res2: String = w2

scala> nes.conf.get("spark.sql.globalTempDatabase")
res3: String = m2

scala> nes.conf.get("spark.sql.custom")
res4: String = xyz

scala> nes.sql("reset")
res5: org.apache.spark.sql.DataFrame = []

scala> nes.conf.get("spark.sql.globalTempDatabase")
res6: String = m2

scala> nes.conf.get("spark.sql.custom")
res7: String = xyz

cc @gatorsmile @cloud-fan

@cloud-fan
Contributor

@yaooqinn active session is a thread local. What if we create a new session in another thread?

@yaooqinn
Member Author

yaooqinn commented Dec 7, 2020

@yaooqinn active session is a thread local. What if we create a new session in another thread?

Without SparkSession.clearDefaultSession(), we will not create a new one:

[screenshot]
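
A runnable sketch of this point, assuming Spark is on the classpath: the active session is thread-local, but Builder.getOrCreate also consults the global default session, so a fresh thread still gets the existing instance back rather than a new session, as long as clearDefaultSession() has not been called.

import org.apache.spark.sql.SparkSession

object ThreadLocalSessionSketch extends App {
  val spark = SparkSession.builder()
    .master("local")
    .appName("thread-local-session-sketch")
    .getOrCreate()

  val t = new Thread(() => {
    // no active session has been set explicitly in this thread,
    // but getOrCreate still finds the global default session
    val other = SparkSession.builder().getOrCreate()
    println(s"same instance as the first session: ${other eq spark}") // true
  })
  t.start()
  t.join()
  spark.stop()
}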

With SparkSession.clearDefaultSession(), the shared state is also unreachable:

[screenshot]

@cloud-fan
Contributor

I think you are right about the current reality. But it seems safer to handle the per-session initial configs, as they are a property of the SparkSession. It's kind of a potential bug, and we'd better fix it early. @yaooqinn what do you think?

@yaooqinn
Member Author

yaooqinn commented Dec 7, 2020

I have created #30642 to fix this
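
As a rough illustration of the follow-up's semantics (this is not the actual #30642 code; sessionInitialConfigs, sharedConf, and sessionConf are illustrative names), the RESET defaults become the SharedState conf overridden by the session's own initial configs:

import scala.collection.mutable

object PerSessionResetSketch extends App {

  // RESET defaults = SharedState conf, overridden by this session's own initial configs
  def resetPerSession(
      sessionConf: mutable.Map[String, String],
      sessionInitialConfigs: Map[String, String],
      sharedConf: Map[String, String]): Unit = {
    sessionConf.clear()
    (sharedConf ++ sessionInitialConfigs).foreach { case (k, v) => sessionConf.put(k, v) }
  }

  val sharedConf = Map("spark.sql.globalTempDatabase" -> "global_temp")
  val sessionInitialConfigs = Map("spark.sql.custom" -> "xyz")
  val sessionConf = mutable.Map("spark.sql.custom" -> "changed-at-runtime")

  resetPerSession(sessionConf, sessionInitialConfigs, sharedConf)
  println(sessionConf("spark.sql.custom"))             // xyz
  println(sessionConf("spark.sql.globalTempDatabase")) // global_temp
}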

cloud-fan pushed a commit that referenced this pull request Dec 16, 2020
…s first

### What changes were proposed in this pull request?

As a follow-up of #30045, we modify the RESET command here to respect the session initial configs per session first and then fall back to the `SharedState` conf, which lets each session maintain a different copy of initial configs for resetting.

### Why are the changes needed?

To make the RESET command saner.

### Does this PR introduce _any_ user-facing change?

Yes, RESET will respect the session's initial configs first instead of always going back to the system defaults.

### How was this patch tested?

add new tests

Closes #30642 from yaooqinn/SPARK-32991-F.

Authored-by: Kent Yao <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
cloud-fan pushed a commit that referenced this pull request Dec 16, 2020
…s first

Closes #30642 from yaooqinn/SPARK-32991-F.

Authored-by: Kent Yao <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 205d8e4)
Signed-off-by: Wenchen Fan <[email protected]>