8 changes: 2 additions & 6 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -715,12 +715,8 @@ private[spark] object Utils extends Logging {

/** Get the Yarn approved local directories. */
private def getYarnLocalDirs(conf: SparkConf): String = {
// Hadoop 0.23 and 2.x have different Environment variable names for the
// local dirs, so lets check both. We assume one of the 2 is set.
// LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
val localDirs = Option(conf.getenv("YARN_LOCAL_DIRS"))
.getOrElse(Option(conf.getenv("LOCAL_DIRS"))
.getOrElse(""))
// Yarn local dirs must be inside the container directory, since it will be automatically deleted when the container shuts down.
val localDirs = Option(System.getProperty("user.dir")).getOrElse("")
Member
No, I'm pretty certain you can't make this change. You're ignoring the setting for YARN's directories and just using the current working directory? Why?

Author

Hi Sean,

What I have understood from http://hortonworks.com/blog/resource-localization-in-yarn-deep-dive/ is that the container directory, from which the executor gets launched and which is created by the node manager, is inside yarn-local-dirs. So it automatically fulfills that criterion.

Please correct me if I am wrong.

Member

Perhaps, but that's not the directory we're looking for in this code. We want the local dirs. You can see where this is coming from in the comments your patch deletes. I don't see how this fixes the problem you reported, though. You might have a look at the conversation happening now at #4759 (comment); I think shuffle files are kept on purpose in some instances, but I am not clear whether this is one of them.

@vanzin I know I am invoking you a lot today but your thoughts would be good here too.

Contributor

No, please do not make this change, it's not correct. We do want to use those env variables, which are set by Yarn and configurable (so, for example, users can tell apps to use a fast local disk to store shuffle data instead of whatever disk hosts home directories).

And you do not want the executor's files to disappear when it dies, because you may be able to reuse shuffle data written by that executor and save the work of re-computing it.
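The env-var lookup the patch deletes can be sketched in isolation. This is a hypothetical, standalone reconstruction (the injected `getenv` parameter is invented here so the logic can be exercised without a YARN cluster); it shows the fallback the original code performed: prefer `YARN_LOCAL_DIRS` (Hadoop 0.23.x), then `LOCAL_DIRS` (Hadoop 2.x).

```scala
// Hypothetical sketch of the deleted lookup, not Spark's actual Utils code.
// `getenv` is injected so the logic is testable without a real YARN environment.
object YarnLocalDirsSketch {
  def getYarnLocalDirs(getenv: String => String): String = {
    // Hadoop 0.23.x and 2.x use different environment variable names for the
    // local dirs, so check both; assume one of the two is set.
    val localDirs = Option(getenv("YARN_LOCAL_DIRS"))
      .orElse(Option(getenv("LOCAL_DIRS")))
      .getOrElse("")
    if (localDirs.isEmpty) {
      throw new Exception("Yarn Local dirs can't be empty")
    }
    localDirs
  }
}
```

Because these variables are set by YARN and are configurable, replacing the lookup with `user.dir` would silently discard, for example, a fast local disk an operator had pointed the local dirs at.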

Author

So, for example, in Spark run via yarn-client I can see directory structures like:
{yarn.nodemanager.local-dirs}/nm-local-dir/usercache/admin/appcache/application_1424859293845_0003/container_1424859293845_0003_01_000001/ -- this is the current working directory, since the executor was launched from it.
Spark is using {yarn.nodemanager.local-dirs}/nm-local-dir/usercache/admin/appcache/application_1424859293845_0003/ to write shuffle files, which will get deleted when the application shuts down.
Also, the approach in #4759 (comment) will not work if the executor gets killed without its shutdown hook getting triggered.

-pankaj

Contributor

The code is in DiskBlockManager.scala. It's the same code whether you're using the external shuffle service or not. As I said, the external service just tracks location of shuffle files (e.g. "this block id is in file /blah"). That code is in network/shuffle.

Author

So what I understood is:

  1. ExternalShuffleService is a separate process, since it must keep running even when an executor dies.
  2. It is per node and serves all executors on that node.
  3. If one executor on a node dies, the blocks will be served by another executor on that node.
  4. It does not directly read files; it just keeps a mapping of blockId to filename.

If the above is correct, how does it serve blocks if all the executors on a particular node die?

Am I wrong somewhere in my understanding?
--pankaj
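The mapping described in point 4 can be pictured as a small per-node index. This is a hypothetical sketch with invented names, not the actual code in network/shuffle; it only illustrates why the index outlives any single executor.

```scala
import scala.collection.mutable

// Hypothetical sketch of a per-node index from shuffle block ids to file paths.
// The real implementation lives in network/shuffle; these names are invented.
class ShuffleIndexSketch {
  private val blockToFile = mutable.Map[String, String]()

  // An executor records where it wrote a block; the service keeps only the path.
  def register(blockId: String, path: String): Unit =
    blockToFile(blockId) = path

  // Serving a block is resolving the path and reading the file from disk, so the
  // service can keep serving blocks after the writing executor has died.
  def lookup(blockId: String): Option[String] = blockToFile.get(blockId)
}
```

Since the service resolves paths and reads the files from disk itself, it does not need any executor on the node to be alive, only the files to still exist in the local dirs.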

Contributor

The way I understand it, the shuffle service can serve the files. But the executor still writes them directly - the write does not go through the shuffle service, and those files are written to the directories set up by createLocalDirs in DiskBlockManager.scala.

There's even a comment alluding to that in the doStop method:

// Only perform cleanup if an external service is not serving our shuffle files.

Author

Can you suggest the correct way to delete those files when an executor dies, in the case where there is no ExternalShuffleService?

Contributor

If the problem is with shuffle files accumulating, as I suggested before, my understanding is that ContextCleaner would take care of this. Maybe your application is not releasing RDDs for garbage collection, in which case the cleaner wouldn't be able to do much. Or maybe the cleaner has a bug, or wasn't supposed to do that in the first place.

But the point here is that your patch is not correct. It breaks two existing features.
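The ContextCleaner behavior mentioned above can be sketched deterministically. In real Spark the trigger is a weak reference to the RDD being garbage collected; in this hypothetical sketch the release is invoked explicitly so the idea is testable, and all names are invented.

```scala
import scala.collection.mutable

// Hypothetical sketch of the ContextCleaner idea: shuffle output is tracked per
// shuffle id, and its files become deletable once the owning RDD is released.
// Real Spark drives doCleanupShuffle from GC of weakly-referenced RDDs; here it
// is called explicitly to keep the example deterministic.
class ContextCleanerSketch {
  private val shuffleFiles = mutable.Map[Int, Seq[String]]()
  val deleted = mutable.ListBuffer[String]()

  def registerShuffle(shuffleId: Int, files: Seq[String]): Unit =
    shuffleFiles(shuffleId) = files

  // Invoked when the RDD that produced this shuffle is no longer referenced.
  def doCleanupShuffle(shuffleId: Int): Unit =
    shuffleFiles.remove(shuffleId).foreach(deleted ++= _)
}
```

This also explains the symptom in the report: if an application keeps its RDDs reachable, the cleaner never fires and shuffle files accumulate, which is an application-level issue rather than one fixed by redefining the local dirs.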


if (localDirs.isEmpty) {
throw new Exception("Yarn Local dirs can't be empty")