
Conversation

@ncounterspecialist

Spark uses the current application directory to save shuffle files for all executors. But when an executor is killed abruptly, the shutdown hook in DiskBlockManager.scala never runs, and these files remain on disk until the application goes down.

This causes out-of-disk-space errors for long-running or infinitely running applications.
In this fix I used the current working directory, which is inside the executor's container directory, to save shuffle files instead of the application's directory, so that YARN clears those directories when the executor is killed.

-Pankaj
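
For context, a minimal sketch of the kind of shutdown-hook cleanup the description refers to (names are illustrative, not the actual DiskBlockManager internals). A hook like this only runs on a clean JVM exit, which is why an abrupt kill leaves the files behind:

```scala
import java.io.File

// Illustrative sketch: a shutdown hook that deletes local shuffle directories
// on a clean JVM exit. If the executor is killed abruptly (e.g. SIGKILL), the
// hook never runs and the directories stay on disk until the application ends.
object ShuffleDirCleanup {
  def install(localDirs: Seq[File]): Unit = {
    Runtime.getRuntime.addShutdownHook(new Thread("delete-spark-local-dirs") {
      override def run(): Unit = localDirs.foreach(deleteRecursively)
    })
  }

  private def deleteRecursively(f: File): Unit = {
    if (f.isDirectory) {
      Option(f.listFiles()).getOrElse(Array.empty[File]).foreach(deleteRecursively)
    }
    f.delete()
  }
}
```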

pankaj.arora added 3 commits May 31, 2014 16:41
…instead of Application Directory so that spark-local-files gets deleted when executor exits abruptly.
@AmplabJenkins

Can one of the admins verify this patch?

Member

No, I'm pretty certain you can't make this change. You're ignoring the setting for YARN's directories and just using the user home directory? Why?

Author

Hi Sean,

From what I have understood from http://hortonworks.com/blog/resource-localization-in-yarn-deep-dive/, the container directory from which the executor is launched is created by the NodeManager inside yarn-local-dirs. So it automatically fulfills that criterion.

Please correct me if I am wrong.

Member

Perhaps, but that's not the directory we're looking for in this code. We want the local dirs. You can see comments about where this is coming from in the deleted comments. I don't see how this fixes the problem you reported, though. You might have a look at the conversation happening now at #4759 (comment); I think shuffle files are kept on purpose in some instances, but I am not clear if this is one of them.

@vanzin I know I am invoking you a lot today but your thoughts would be good here too.

Contributor

No, please do not make this change; it's not correct. We do want to use those env variables, which are set by YARN and are configurable (so, for example, users can tell apps to use a fast local disk to store shuffle data instead of whatever disk hosts home directories).

And you do not want the executor's files to disappear when it dies, because you may be able to reuse shuffle data written by that executor and save the work of re-computing that data.
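
As a hedged illustration of why those files need to outlive the executor (a sketch, assuming a standard Spark-on-YARN setup with the external shuffle service):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object ShuffleServiceExample {
  def main(args: Array[String]): Unit = {
    // Dynamic allocation relies on the external shuffle service to serve
    // shuffle files written by executors that have since gone away. If those
    // files were removed together with the executor's container, this setup
    // would break.
    val conf = new SparkConf()
      .setAppName("shuffle-service-example")
      .set("spark.shuffle.service.enabled", "true")
      .set("spark.dynamicAllocation.enabled", "true")
    val sc = new SparkContext(conf)
    // ... run jobs ...
    sc.stop()
  }
}
```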

Author

So, for example, in Spark run via yarn-client I can see directory structures like:

{yarn.nodemanager.local-dirs}/nm-local-dir/usercache/admin/appcache/application_1424859293845_0003/container_1424859293845_0003_01_000001/ -- this is the current working directory, since the executor was launched from this directory.

Spark is using {yarn.nodemanager.local-dirs}/nm-local-dir/usercache/admin/appcache/application_1424859293845_0003/ to write the shuffle files, and that directory only gets deleted when the application shuts down.

Also, regarding #4759 (comment): that will not work if the executor gets killed without letting the shutdown hook trigger.

-pankaj
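
A quick way to see the two directories being compared here (a small illustrative snippet, not part of the patch; it just prints the YARN-provided local dirs and the container's working directory):

```scala
import java.io.File

object ShowDirs {
  def main(args: Array[String]): Unit = {
    // LOCAL_DIRS (YARN_LOCAL_DIRS on some Hadoop versions) holds the
    // comma-separated NodeManager local dirs that Spark uses for shuffle files.
    val yarnLocalDirs = sys.env.getOrElse("LOCAL_DIRS",
      sys.env.getOrElse("YARN_LOCAL_DIRS", "<not set>"))
    // The container working directory is a subdirectory of one of those dirs.
    val containerWorkDir = new File(".").getCanonicalPath
    println(s"YARN local dirs:       $yarnLocalDirs")
    println(s"Container working dir: $containerWorkDir")
  }
}
```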

Contributor

The code is in DiskBlockManager.scala. It's the same code whether you're using the external shuffle service or not. As I said, the external service just tracks location of shuffle files (e.g. "this block id is in file /blah"). That code is in network/shuffle.
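
Conceptually (a hypothetical sketch, not the actual network/shuffle code), the external service only needs a mapping like this:

```scala
import java.io.File
import scala.collection.mutable

// Hypothetical illustration of "the external service just tracks the location
// of shuffle files": a block id resolves to a file path that some executor
// wrote, and the service streams that file to remote readers.
class BlockFileIndex {
  private val index = mutable.Map.empty[String, File]

  def register(blockId: String, file: File): Unit = index(blockId) = file

  def resolve(blockId: String): Option[File] = index.get(blockId)
}
```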

Author

So what I understood is:

  1. ExternalShuffleService is a separate process, since it must keep running even when an executor dies.
  2. It is per node and serves all executors on that node.
  3. If one executor on a node dies, then the blocks will be served by another executor on that node.
  4. It does not directly read files and just keeps a mapping of blockId to filename.

If the above is correct, how does it serve blocks if all the executors on a particular node die?

Am I wrong somewhere in my understanding?
--pankaj

Contributor

The way I understand it, the shuffle service can serve the files. But the executor still writes them directly - the write does not go through the shuffle service, and those files are written to the directories set up by createLocalDirs in DiskBlockManager.scala.

There's even a comment alluding to that in the doStop method:

// Only perform cleanup if an external service is not serving our shuffle files.
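
A rough sketch of the shape of that guard (field and method names here are illustrative, not the exact DiskBlockManager internals):

```scala
import java.io.File

// Illustrative only: local dirs are cleaned up on stop only when no external
// shuffle service might still need to serve the files inside them.
class LocalDirCleaner(localDirs: Seq[File],
                      externalShuffleServiceEnabled: Boolean,
                      deleteDir: File => Unit) {
  def doStop(): Unit = {
    // Only perform cleanup if an external service is not serving our shuffle files.
    if (!externalShuffleServiceEnabled) {
      localDirs.foreach(deleteDir)
    }
  }
}
```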

Author

Can you suggest what the correct way would be to delete those files when an executor dies and no ExternalShuffleService is in use?

Contributor

If the problem is with shuffle files accumulating, as I suggested before, my understanding is that ContextCleaner would take care of this. Maybe your application is not releasing RDDs for garbage collection, in which case the cleaner wouldn't be able to do much. Or maybe the cleaner has a bug, or wasn't supposed to do that in the first place.

But the point here is that your patch is not correct. It breaks two existing features.
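
As an illustration of the ContextCleaner point (a sketch with hypothetical input/output paths): shuffle data tied to an RDD can only be cleaned up once the application stops holding a reference to that RDD.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object CleanerFriendlyJob {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("cleaner-example"))

    // Keep each shuffled RDD scoped to one iteration so it becomes unreachable
    // afterwards; ContextCleaner can then remove the associated shuffle files
    // once the RDD is garbage collected on the driver.
    for (batch <- 1 to 1000) {
      val counts = sc.textFile(s"/data/batch-$batch")   // hypothetical path
        .flatMap(_.split("\\s+"))
        .map(word => (word, 1))
        .reduceByKey(_ + _)
      counts.saveAsTextFile(s"/out/batch-$batch")       // hypothetical path
      // No reference to `counts` is retained across iterations.
    }

    sc.stop()
  }
}
```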

@vanzin
Contributor

vanzin commented Feb 25, 2015

@pankajarora12 Data generated by DiskBlockManager cannot be deleted since it may be used by other executors when using the external shuffle service. You may be able to optimize to delete these things when not using the external service, but that sounds like the wrong approach.

If it's the shuffle data that is accumulating, maybe the right fix is for the block manager to properly clean up shuffle files that are not used anymore. The executor doesn't have enough information for that, as far as I can tell, and the driver would need to tell executors when RDDs are gc'ed so that their shuffle data can be cleaned up. Maybe that's already done, even, but I don't know.

Well, long story short, this change seems to break dynamic allocation and anything that uses the external shuffle service, so it really cannot be pushed.

@vanzin
Contributor

vanzin commented Feb 25, 2015

There is also a second thing that is broken by this patch: YARN_LOCAL_DIRS can actually be multiple directories, as the name implies. The BlockManager uses that to distribute shuffle files across multiple disks, to speed things up. After this change, everything will end up on the same disk.
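
A rough sketch of the multi-disk distribution being described (an illustrative hashing scheme, not the exact DiskBlockManager implementation):

```scala
import java.io.File

object LocalDirPicker {
  // Illustrative only: pick one of several configured local dirs per block by
  // hashing its name, so shuffle files spread across the available disks.
  def pick(localDirs: Array[File], blockName: String): File = {
    val nonNegativeHash = blockName.hashCode & Integer.MAX_VALUE
    localDirs(nonNegativeHash % localDirs.length)
  }

  def main(args: Array[String]): Unit = {
    // Paths below are hypothetical examples of two configured local dirs.
    val dirs = "/disk1/yarn/local,/disk2/yarn/local".split(",").map(new File(_))
    println(pick(dirs, "shuffle_0_1_0"))
  }
}
```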

@ncounterspecialist
Author

I thought about that case too. Since we will have many executors on one node, YARN will use a different local dir for launching each executor, and that will use up the other disks too.


@ncounterspecialist
Author

Also, using multiple disks for each executor provides speed, but a failure of any one of those disks will affect all executors on that node.

On the other hand, using one disk per executor, with as many disks available as there are executors, provides fault tolerance against disk failures.

I think this is quite debatable.


@vanzin
Contributor

vanzin commented Feb 25, 2015

So yarn will use different local dir for launching each executor

But that leaves the case of a single executor being tied to a single local disk after your patch. You're removing that feature. Disk failures are not an issue here; if one disk can fail, more than one can also fail, and the block manager will figure things out.

I don't see how any of this is debatable.

@srowen
Member

srowen commented Feb 25, 2015

Yes, do you mind closing this PR? I think the same underlying issue of temp file cleanup is discussed in https://issues.apache.org/jira/browse/SPARK-5836, and the discussion could take place there.

@asfgit asfgit closed this in 9168259 Feb 28, 2015