Conversation

@weixiuli
Contributor

@weixiuli weixiuli commented Dec 28, 2018

What changes were proposed in this pull request?

Spark on Yarn uses a DB (see #7943) to record RegisteredExecutors information, which can be reloaded and used again when the ExternalShuffleService is restarted.

In standalone mode and Spark on Kubernetes, the RegisteredExecutors information is not recorded, so it is lost whenever the ExternalShuffleService is restarted.

This PR adds the same recording mechanism to those modes to solve that problem.
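To make the mechanism concrete, here is a minimal sketch of the persist-and-reload pattern, assuming the leveldbjni/iq80 LevelDB API the shuffle service already depends on; the class name, key layout, and value format are illustrative only, not the PR's actual code or on-disk format:

import java.io.File
import java.nio.charset.StandardCharsets.UTF_8

import org.fusesource.leveldbjni.JniDBFactory.factory
import org.iq80.leveldb.{DB, Options}

// Illustrative store: key = "app_exec:<appId>:<execId>", value = serialized executor info.
class RegisteredExecutorsStore(dbFile: File) {
  private val db: DB = factory.open(dbFile, new Options().createIfMissing(true))

  // On executor registration: persist enough information to rebuild state later.
  def put(key: String, value: String): Unit =
    db.put(key.getBytes(UTF_8), value.getBytes(UTF_8))

  // On application end: drop the entry so the DB does not grow forever.
  def remove(key: String): Unit = db.delete(key.getBytes(UTF_8))

  // On shuffle-service restart: replay every stored registration.
  def reloadAll(register: (String, String) => Unit): Unit = {
    val it = db.iterator()
    try {
      it.seekToFirst()
      while (it.hasNext) {
        val e = it.next()
        register(new String(e.getKey, UTF_8), new String(e.getValue, UTF_8))
      }
    } finally {
      it.close()
    }
  }

  def close(): Unit = db.close()
}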

How was this patch tested?

New unit tests.

@weixiuli
Contributor Author

cc @dongjoon-hyun @gatorsmile Kindly review

@gatorsmile
Member

Let us cc @sameeragarwal @tejasapatil. Do you have any opinion about this new feature?

@tejasapatil
Contributor

tejasapatil commented Dec 28, 2018 via email

@sameeragarwal
Member

From a pure feature perspective, I think it makes sense to support this for non-Yarn modes as well. While Facebook uses a custom scheduler, we rely on this leveldb state to be robust against external shuffle service failures/restarts (for certain deployment types). Although admittedly, I lack the full context as to why this was only done for Yarn to begin with. cc @dafrista @squito who might've more details there.

@dongjoon-hyun
Member

ok to test

@SparkQA

SparkQA commented Jan 1, 2019

Test build #100608 has finished for PR 23393 at commit 8cc02a7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@squito (Contributor) left a comment

It was originally done only for yarn just because I wasn't very familiar with other modes -- but it certainly should be possible to do it. There are some corner cases to think about -- the one which comes to mind is: what happens if an application is stopped while the external shuffle service is down? In yarn, we rely on being told the application was stopped even after the NM comes back. I don't think the same is true in standalone mode; the master won't tell the worker after it comes back? So then you'll leave an entry in the DB forever. Maybe this is rare enough and low-impact enough that you'd never expect that list to get large, but it's at least worth thinking through and documenting.

protected def initRegisteredExecutorsDB(dbName: String): File = {
  val localDirs = sparkConf.get("spark.local.dir", "").split(",")
  if (localDirs.length >= 1 && !"".equals(localDirs(0))) {
    createDirectory(localDirs(0), dbName)
Contributor

Any reason to only use localDirs(0) instead of checking all localDirs? I worry that you might get another local dir prepended or something during a configuration change + restart.

Also, it seems you're creating a path like "[local-dir]/registeredExecutors/registeredExecutors.ldb"; any reason for the extra level?

Contributor Author

Hi @squito, as you agreed, it certainly should be possible to do it. You asked: "what happens if an application is stopped while the external shuffle service is down? In yarn, we rely on being told the application was stopped even after the NM comes back."

Right, in a case like that an entry can be left in the DB forever. As you said, this may be rare enough and low-impact enough, but it is at least worth thinking through and documenting. So I think we can add some code to remove the entry with WorkDirCleanup when spark.worker.cleanup.enabled=true is set in standalone mode. Do you have a better idea?

This commit uses localDirs(0) instead of checking all localDirs to make sure the same path is always used by the DB and that initRegisteredExecutorsDB works; localDirs(0) is used for the DB directly rather than requiring an additional setting.

Creating a path like "[local-dir]/registeredExecutors/registeredExecutors.ldb" is just to make the layout clearer.
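To sketch that cleanup idea (a hedged sketch, not the PR's code): during WorkDirCleanup, any application whose directory is removed would also have its entry dropped from the shuffle service's DB. Here deleteDir stands in for the existing WorkDirCleanup deletion, and removeDbEntry for whatever hook the shuffle service exposes for dropping a registered-executors entry (e.g. applicationRemoved):

import java.io.File

def cleanupStaleApps(
    workDir: File,
    isAppStillRunning: String => Boolean,
    deleteDir: File => Unit,
    removeDbEntry: String => Unit): Unit = {
  Option(workDir.listFiles()).getOrElse(Array.empty[File])
    .filter(dir => dir.isDirectory && !isAppStillRunning(dir.getName))
    .foreach { dir =>
      deleteDir(dir)              // existing behavior: remove the stale app dir
      removeDbEntry(dir.getName)  // new: also remove the DB entry for that app
    }
}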

Contributor

Yes, I think WorkDirCleanup may be just what we need to ensure things get cleaned up. Good idea.

I understand wanting to use a consistent directory, but like I said, I'm worried about restarts after configuration changes (maybe not a concern in standalone mode? does it always require a total restart?). You could do something like what was done in the original patch for yarn: check all the dirs, but fall back to dir[0] (that code has since changed to take advantage of other yarn features for recovery):

private File findRegisteredExecutorFile(String[] localDirs) {
  for (String dir: localDirs) {
    File f = new File(dir, "registeredExecutors.ldb");
    if (f.exists()) {
      return f;
    }
  }
  return new File(localDirs[0], "registeredExecutors.ldb");
}

@weixiuli
Contributor Author

cc @squito @gatorsmile @dongjoon-hyun PTAL.

@SparkQA

SparkQA commented Jan 24, 2019

Test build #101616 has finished for PR 23393 at commit 76a7564.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@squito
Contributor

squito commented Jan 24, 2019

Jenkins, retest this please

// if an application is stopped while the external shuffle service is down?
// So then it'll leave an entry in the DB and the entry should be removed.
if (conf.get(config.SHUFFLE_SERVICE_DB_ENABLED) &&
  conf.get(config.SHUFFLE_SERVICE_ENABLED) && !isAppStillRunning) {
Contributor

nit: double indent this line, the continuation of the condition

Contributor Author

Hi @squito, I have fixed it. Thanks a lot!

@SparkQA

SparkQA commented Jan 24, 2019

Test build #101645 has finished for PR 23393 at commit 76a7564.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@squito
Contributor

squito commented Jan 25, 2019

The approach looks good to me. I would need to take a closer look at the full flow of everything in Worker before being ready to merge it myself -- if nobody else has a chance, I'll try to find time to come back and review that part more thoroughly.

@weixiuli
Contributor Author

ok

@SparkQA

SparkQA commented Jan 25, 2019

Test build #101659 has finished for PR 23393 at commit 6c4248d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@weixiuli
Contributor Author

weixiuli commented Mar 4, 2019

Hi @squito, how about this patch? Can it be merged to master?

@squito (Contributor) left a comment

Sorry for the delayed review, I was hoping somebody who knew standalone better would chime in. But I took a closer look and I think this is OK. My comments are just style stuff.

// if an application is stopped while the external shuffle service is down?
// So then it'll leave an entry in the DB and the entry should be removed.
if (conf.get(config.SHUFFLE_SERVICE_DB_ENABLED) &&
  conf.get(config.SHUFFLE_SERVICE_ENABLED)) {
Contributor

nit: double indent this line (4 spaces)

Contributor Author

Sorry, I don't understand why this is necessary.

Contributor

It's a continuation of the condition of the if. That helps separate it from the body, which is only indented 2 spaces, e.g. like this:

protected def assertNotAnalysisRule(): Unit = {
  if (Utils.isTesting &&
      AnalysisHelper.inAnalyzer.get > 0 &&
      AnalysisHelper.resolveOperatorDepth.get == 0) {
    throw new RuntimeException("This method should not be called in the analyzer")


private val shuffleServiceSource = new ExternalShuffleServiceSource

protected def initRegisteredExecutorsDB(dbName: String): File = {
Contributor

I think renaming it to findRegisteredExecutorsDBFile would be better; nothing is really getting initialized here.

Contributor Author

OK, done.

}.foreach { dir =>
  logInfo(s"Removing directory: ${dir.getPath}")
  Utils.deleteRecursively(dir)

Contributor

Unrelated to your change -- can you add 2-space indentation on line 466, !Utils.doesDirectoryContainAnyNewFiles(dir, APP_DATA_RETENTION_SECONDS), as it's a continuation?

externalShuffleService = new ExternalShuffleService(sparkConf, securityManager)
// externalShuffleService restart
externalShuffleService.start()
bockHandler = externalShuffleService.getBlockHandler
Contributor

Typo: missing an 'l' in blockHandler.

Contributor Author

done

new ExternalShuffleBlockResolver(conf, registeredExecutorFile));
}

/** ForTesting */
Contributor

Can you use the same style for labelling the "for testing" methods as is already used in these files? I think for all of these java files it's @VisibleForTesting.

Contributor Author

OK, done.

// pass
blockResolver.closeForTest()
// externalShuffleService stop
externalShuffleService.stop()
Contributor

I don't think those comments add anything.

Should the close()/stop() be in the afterAll (or a finally)?

Contributor Author

done

Contributor Author

done

}

// This test getBlockData will be passed when the external shuffle service is restarted.
test("restart External Shuffle Service With InitRegisteredExecutorsDB") {
Contributor

I'd reword the test name & comment here a bit. Maybe for the test name, "Recover shuffle data with spark.shuffle.service.db.enabled=true after shuffle service restart",

and the comment should say something more like "The beforeAll ensures the shuffle data was already written, and then the shuffle service was stopped. Here we restart the shuffle service and make sure we can read the shuffle data."

Contributor Author

Done, thank you.

assert(error.contains("not registered"))
blockResolver.closeForTest()
// externalShuffleService stop
externalShuffleService.stop()
Contributor

same here

Contributor Author

done

}

// This test getBlockData will't be passed when the external shuffle service is restarted.
test("restart External Shuffle Service Without InitRegisteredExecutorsDB") {
Contributor

Typo: will not? But as above, I'd reword the test name and comment.

Contributor Author

done


/**
* Manages some sort-shuffle data, including the creation
* and cleanup of directories that can be read by the
Contributor

The comment appears to be cut off.

I don't love that this is copied from elsewhere -- can you just add network-common as a test dependency to core, like this:

spark/core/pom.xml, lines 364 to 370 at commit 827d371:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-launcher_${scala.binary.version}</artifactId>
  <version>${project.version}</version>
  <classifier>tests</classifier>
  <scope>test</scope>
</dependency>

@SparkQA

SparkQA commented Mar 6, 2019

Test build #103087 has finished for PR 23393 at commit b246642.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@weixiuli
Contributor Author

weixiuli commented Mar 6, 2019

Hi @squito, thank you very much for your detailed review. PTAL.

@SparkQA

SparkQA commented Mar 6, 2019

Test build #103088 has finished for PR 23393 at commit db1b11e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

this block is not indented correctly

@squito
Contributor

squito commented Mar 6, 2019

LGTM, I'll fix the remaining style issues while merging.
EDIT: sorry, noticed some more things.

@squito (Contributor) left a comment

Sorry, actually I realized there are some other things to fix.

In addition to the comment on ShuffleSuite, I also realized we're not testing the WorkDirCleanup part. It doesn't seem like there is a test in WorkerSuite even for the old WorkDirCleanup behavior, but can you add something?

Contributor

Actually, sorry, one important thing -- why does this extend ShuffleSuite? It's not really changing the behavior that ShuffleSuite is designed for.

@weixiuli (Contributor Author) commented Mar 7, 2019

Sorry, I've replaced it with SparkFunSuite.

@attilapiros (Contributor) left a comment

I had some pending review comments, but as I see it is quite close to merging, I decided to add them now and continue the review tomorrow (if it is not merged yet).

Contributor

The if with localDirs(0).nonEmpty can be avoided by using this line:

val localDirs = sparkConf.getOption("spark.local.dir").map(_.split(",")).getOrElse(Array())
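Putting this together with the fallback scan suggested earlier, the helper might look roughly like this (a sketch only; findRegisteredExecutorsDBFile is the rename discussed above):

import java.io.File

import org.apache.spark.SparkConf

// Sketch: getOption removes the need for the empty-string check, and scanning
// all local dirs (falling back to the first) guards against a reordered
// spark.local.dir after a configuration change + restart.
def findRegisteredExecutorsDBFile(sparkConf: SparkConf, dbName: String): Option[File] = {
  val localDirs = sparkConf.getOption("spark.local.dir").map(_.split(",")).getOrElse(Array.empty[String])
  if (localDirs.isEmpty) {
    None
  } else {
    localDirs.map(new File(_, dbName)).find(_.exists())
      .orElse(Some(new File(localDirs(0), dbName)))
  }
}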

Contributor Author

That's a good idea, thank you.

Contributor

Nit: indentation

if () {
  ...
} else {
  ...
}

Contributor Author

Sorry, done.

Contributor

Nit: indentation as above.

Also, there are a few other places with this indentation error, but I do not want to spam your PR.

Contributor

indentation?

Contributor Author

done

@weixiuli
Contributor Author

weixiuli commented Mar 7, 2019

Hi @squito @attilapiros, I have added a WorkDirCleanup test. PTAL and review.

Contributor

This assert is not needed.

Contributor

Is there a way to avoid this sleep? On my machine running the test via IntelliJ this 10 milliseconds was not enough.

Contributor

Maybe overkill but moving in the Worker extracting the cleanup functionality (the cleanup Future body) into a separate method visible for the test and calling that function directly instead of sending WorkDirCleanup would solve this issue. Otherwise I would a bit worried about having a flaky test depending on Jenkins workload. What is your opinion?
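A rough sketch of that extraction (method and helper names are assumptions, not Worker's actual members):

import java.io.File
import scala.concurrent.{ExecutionContext, Future}

// Sketch: the Future body becomes a named method that a test can call synchronously.
def cleanupWorkDir(workDir: File, isStale: File => Boolean): Unit =
  Option(workDir.listFiles()).getOrElse(Array.empty[File])
    .filter(d => d.isDirectory && isStale(d))
    .foreach { d =>
      Option(d.listFiles()).getOrElse(Array.empty[File]).foreach(_.delete())
      d.delete()  // shallow delete, for brevity
    }

// The Worker's message handler would then merely schedule it asynchronously:
def onWorkDirCleanup(workDir: File, isStale: File => Boolean)
    (implicit ec: ExecutionContext): Future[Unit] =
  Future(cleanupWorkDir(workDir, isStale))

// A test calls cleanupWorkDir(...) directly and asserts right after, with no sleep.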

Contributor

I just read @squito's solution for this problem and I like that as it is less intrusive.

@squito (Contributor) left a comment

thanks for adding the test, just a couple minor things.

One more thought -- is there any particular reason we need a config "spark.shuffle.service.db.enabled", rather than just leaving it always on? It's always on for yarn. Is it just in case there is some bug we're introducing here, as a way to turn it off? If so, maybe we should document that it may be removed in the future. In general I'd rather avoid adding confs when unnecessary, but given my lack of expertise with standalone it might be best to leave it in.

Which also reminds me of the other thing that needs to be done -- you should add this to the docs here: https://github.com/apache/spark/blob/master/docs/spark-standalone.md#cluster-launch-scripts
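For reference, declaring such a config follows the usual ConfigBuilder pattern (a sketch of the pattern; the doc text here is ours, not necessarily the PR's exact entry):

val SHUFFLE_SERVICE_DB_ENABLED = ConfigBuilder("spark.shuffle.service.db.enabled")
  .doc("Whether the external shuffle service persists registered-executor state " +
    "to a local DB so it can recover after a restart. Only affects standalone mode.")
  .booleanConf
  .createWithDefault(true)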

Contributor

you can combine these conditions into one if:

if (!executorDir.exists() && !executorDir.mkdirs()) {

Contributor

Using sleep() in tests to wait for things isn't great -- it either leads to flakiness if you don't sleep long enough and the test is occasionally slow, or, if you make it super long, it makes tests slow.

Ideally there would be a condition variable you could wait on, but that's probably not worth it here. Instead, using scalatest's eventually works well, e.g. like this:

eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
  assert(!store.hasLocalBlock("a1-to-remove"))
  master.getLocations("a1-to-remove") should have size 0
}
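For completeness, this pattern typically needs the following scalatest imports (exact packages can vary by scalatest version):

import org.scalatest.concurrent.Eventually._  // eventually, timeout, interval
import org.scalatest.time.SpanSugar._         // enables the `1000 milliseconds` syntax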

Contributor

to test the old behavior of WorkDirCleanup, you also want to assert that !executorDir.exists(), right? (regardless of value)

Contributor

nit: rename value to something more meaningful, eg. dbCleanupEnabled

Contributor

minor -- as I mentioned below, both of these tests should still test dir cleanup, so maybe we can rename a little to something like

WorkDirCleanup cleans app dirs and shuffle metadata when spark.shuffle.service.db.enabled=true

WorkDirCleanup cleans only app dirs when spark.shuffle.service.db.enabled=false

@SparkQA

SparkQA commented Mar 7, 2019

Test build #103137 has finished for PR 23393 at commit c42712c.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@squito
Contributor

squito commented Mar 7, 2019

looks like it might be a real test failure

@weixiuli
Contributor Author

@squito @attilapiros

@weixiuli
Contributor Author

Retest this please.

@weixiuli
Contributor Author

@squito @attilapiros

val WORKER_CLEANUP_ENABLED = ConfigBuilder("spark.worker.cleanup.enabled")
  .booleanConf
-  .createWithDefault(false)
+  .createWithDefault(true)
Contributor

why are you changing this default? Honestly I am much less comfortable merging it with the default changed, as I don't have much experience w/ standalone mode.

Contributor Author

This commit depends on WORKER_CLEANUP_ENABLED. We should keep the default value of spark.worker.cleanup.enabled = false, but make it clear in the docs that spark.worker.cleanup.enabled should be enabled if spark.shuffle.service.db.enabled is "true". All right?

Contributor

I believe attila meant to revert your change to the docs, not to change the default

used again when the external shuffle service is restarted. Note that this only affects standalone
mode, its has always on for yarn. We should Enable `spark.worker.cleanup.enabled` to remove the entry
(It will leave an entry in the DB forever when an application is stopped while the external shuffle
service is down) in the leveldb with WorkDirCleanup. It may be removed in the future.
Contributor

Some minor rewordings:

Store External Shuffle service state on local disk so that when the external shuffle service is restarted, it will
automatically reload info on current executors.  This only affects standalone mode (yarn always has this behavior
enabled).  You should also enable <code>spark.worker.cleanup.enabled</code>, to ensure that the state
eventually gets cleaned up.  This config may be removed in the future.

Contributor Author

Very clear description, thank you.

@weixiuli
Contributor Author

@squito @attilapiros

@attilapiros
Contributor

@weixiuli you have to change WORKER_CLEANUP_ENABLED too (back to createWithDefault(false))

@weixiuli
Contributor Author

@attilapiros OK, done.

@SparkQA

SparkQA commented Mar 15, 2019

Test build #103544 has finished for PR 23393 at commit cb78728.

  • This patch fails from timeout after a configured wait of 400m.
  • This patch merges cleanly.
  • This patch adds no public classes.

@weixiuli
Contributor Author

Jenkins, retest this please.

@SparkQA

SparkQA commented Mar 16, 2019

Test build #103557 has finished for PR 23393 at commit ed9a842.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 16, 2019

Test build #103559 has finished for PR 23393 at commit 1f94b86.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@squito (Contributor) left a comment

LGTM except for one really minor thing.

@VisibleForTesting
public static File getFileForTest(String[] localDirs, int subDirsPerLocalDir, String filename) {
  return getFile(localDirs, subDirsPerLocalDir, filename);
}
Contributor

this is unused now, right? You can undo this change?

@weixiuli (Contributor Author) commented Mar 19, 2019

Yes, you are right, I have fixed it.

@SparkQA

SparkQA commented Mar 19, 2019

Test build #103642 has finished for PR 23393 at commit 475d278.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@weixiuli
Contributor Author

Jenkins, retest this please.

@squito
Contributor

squito commented Mar 19, 2019

The failure was spurious; the earlier run passed everything with only an extra unused function, so merging to master. Thanks @weixiuli

@asfgit asfgit closed this in 8b0aa59 Mar 19, 2019
cfmcgrady pushed a commit to cfmcgrady/spark that referenced this pull request Aug 1, 2019

Closes apache#23393 from weixiuli/SPARK-26288.

Authored-by: weixiuli <[email protected]>
Signed-off-by: Imran Rashid <[email protected]>
@gatorsmile gatorsmile changed the title [SPARK-26288][CORE]add initRegisteredExecutorsDB [SPARK-26288][CORE] Restore RegisteredExecutors information for External shuffle service in Standalone/Kubernetes backend when the service is restarted Jan 2, 2020