Conversation

@ericvandenbergfb-zz

@ericvandenbergfb-zz ericvandenbergfb-zz commented Nov 16, 2017

What changes were proposed in this pull request?

** Updated pull request based on some other refactoring that went into FsHistoryProvider **

The Spark history server doesn't clean up certain history files outside the retention window, leading to thousands of such files lingering on our servers. The log checking and cleanup logic skipped 0-byte files and expired in-progress or complete history files that weren't parseable (no app ID could be extracted, or parsing otherwise failed). Note these files most likely appeared due to aborted jobs or earlier Spark/file system driver bugs. To mitigate this, FsHistoryProvider.checkForLogs now internally identifies these untracked files and removes them once they expire outside the cleaner retention window.

This is currently controlled via the configuration spark.history.fs.cleaner.aggressive=true to perform more aggressive cleaning.

Fixed logic:

  • checkForLogs excluded 0-size files, so they stuck around forever.
  • checkForLogs / mergeApplicationListing indefinitely ignored files that were not parseable or from which no app ID could be extracted, so they stuck around forever.
  • The above logic only applies when spark.history.fs.cleaner.aggressive=true; see the sketch below.
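
For illustration, here is a minimal sketch of the aggressive pass, assuming a hypothetical appIdOf helper in place of the provider's real parsing (this is not the actual patch code):

    import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
    import org.apache.spark.SparkConf

    // Hypothetical stand-in for the provider's event log parsing: returns the
    // app ID if one can be extracted from the log file, None otherwise.
    def appIdOf(path: Path): Option[String] = None

    // Sketch: find untracked files and delete the ones outside retention.
    def aggressiveClean(fs: FileSystem, conf: SparkConf,
        logs: Seq[FileStatus], nowMs: Long): Unit = {
      if (conf.getBoolean("spark.history.fs.cleaner.aggressive", false)) {
        val maxAgeMs = conf.getTimeAsMs("spark.history.fs.cleaner.maxAge", "7d")
        logs
          // Untracked: 0-byte logs, or logs with no extractable app ID.
          .filter(s => s.getLen == 0 || appIdOf(s.getPath).isEmpty)
          // Only delete once they age out of the cleaner retention window.
          .filter(s => nowMs - s.getModificationTime > maxAgeMs)
          .foreach(s => fs.delete(s.getPath, true))
      }
    }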

Fixed a race condition in a test (SPARK-3697: ignore files that cannot be
read) where the number of mergeApplicationListing calls could exceed 1,
since FsHistoryProvider would spin up an executor that also calls
checkForLogs in parallel with the test.

Added a unit test to cover all cases with aggressive and non-aggressive
cleanup logic.

How was this patch tested?

Implemented a unit test that exercises the above cases both with and without aggressive cleaning, to ensure correct results in all cases. Note that FsHistoryProvider in one place uses the file system to get the current time and in other places the local system time; this seems inconsistent/buggy, but I did not attempt to fix it in this commit. I had to change the method FsHistoryProvider.getNewLastScanTime() so the test could properly mock the clock.
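
For reference, a sketch of the clock mocking this required, assuming the suite subclasses the provider (the exact wiring in the real test may differ):

    import java.util.concurrent.TimeUnit
    import org.apache.spark.util.ManualClock

    val clock = new ManualClock(1000L)
    val provider = new FsHistoryProvider(conf, clock) {
      // Route scan-time lookups through the manual clock instead of the file
      // system or wall clock, so the test controls expiry deterministically.
      override def getNewLastScanTime(): Long = clock.getTimeMillis()
    }
    clock.setTime(clock.getTimeMillis() + TimeUnit.DAYS.toMillis(1))
    provider.checkForLogs()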

Also ran a history server and touched some files to verify they were properly removed.

ericvandenberg@localhost /tmp/spark-events % ls -la
total 808K
drwxr-xr-x 8 ericvandenberg 272 Jul 31 18:22 .
drwxrwxrwt 127 root
-rw-r--r-- 1 ericvandenberg 0 Jan 1 2016 local-123.inprogress
-rwxr-x--- 1 ericvandenberg 342K Jan 1 2016 local-1501549952084
-rwxrwx--- 1 ericvandenberg 342K Jan 1 2016 local-1501549952084.inprogress
-rwxrwx--- 1 ericvandenberg 59K Jul 31 18:19 local-1501550073208
-rwxrwx--- 1 ericvandenberg 59K Jul 31 18:21 local-1501550473508.inprogress
-rw-r--r-- 1 ericvandenberg 0 Jan 1 2016 local-234

Observed in history server logs:

17/07/31 18:23:52 INFO FsHistoryProvider: Aggressively cleaned up 4 untracked history files.

ericvandenberg@localhost /tmp/spark-events % ls -la
total 120K
drwxr-xr-x 4 ericvandenberg 136 Jul 31 18:24 .
drwxrwxrwt 127 root 4.3K Jul 31 18:07 ..
-rwxrwx--- 1 ericvandenberg 59K Jul 31 18:19 local-1501550073208
-rwxrwx--- 1 ericvandenberg 59K Jul 31 18:22 local-1501550473508

@ericvandenbergfb-zz
Author

This is a continuation of #18791 - the underlying code changed, so I had to reimplement.

@vanzin
Contributor

vanzin commented Nov 16, 2017

ok to test

@SparkQA

SparkQA commented Nov 16, 2017

Test build #83949 has finished for PR 19770 at commit 3431d5a.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jiangxb1987
Contributor

Please fix the Scala style failure, thanks!

@ericvandenbergfb-zz
Author

Fixed scalastyle issues.

@ajbozarth
Member

This looks a bit cleaner this time around. Since this is left off by default, it LGTM.

@SparkQA

SparkQA commented Nov 21, 2017

Test build #84040 has finished for PR 19770 at commit 08a6e92.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

  }
}
toDelete
  .map(attempt => new Path(logDir, attempt.logPath))
Contributor

.map { attempt =>

}
toDelete
  .map(attempt => new Path(logDir, attempt.logPath))
  .foreach(logPath => deleteLogInfo(logPath))
Contributor

Use .foreach { logPath => ... } here (or simply .foreach(deleteLogInfo)).
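
Applied, the chain would read roughly:

    toDelete
      .map(attempt => new Path(logDir, attempt.logPath))
      .foreach(deleteLogInfo)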

if (AGGRESSIVE_CLEANUP) {
  var untracked: Option[KVStoreIterator[LogInfo]] = None
  try {
    untracked = Some(listing.view(classOf[LogInfo])
Contributor

This logic seems to be similar to what I have in the pipeline for the new SHS project at vanzin#40. Except my change takes care of other things (like also cleaning up any loaded UI data).

Could you take a look at that PR and see whether there's something it's not covering? I can incorporate any needed changes there.

Contributor

So I spent some time reading my own patch and it's covering a slightly different case. My patch covers deleting SHS state when files are deleted, this covers deleting files that the SHS decides are broken. I still think that some code / state can be saved by handling both similarly - still playing with my code, though.

// time it encounters an inprogress file to work around certain file system inconsistencies.
// No history files should clean up in first check and clean pass.
clock.setTime(lastModifiedTime + MILLISECONDS.convert(1, TimeUnit.DAYS))
provider.checkForLogs
Contributor

Add () to method calls (also in other places).
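
That is, the usual Scala convention for side-effecting methods:

    provider.checkForLogs()   // preferred: the call has side effects
    provider.checkForLogs     // discouraged: reads like a pure accessor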

var untracked: Option[KVStoreIterator[LogInfo]] = None
try {
  untracked = Some(listing.view(classOf[LogInfo])
    .index("lastModifiedTime")
Contributor

Because this is based on FileStatus.getModificationTime, couldn't this end up deleting event log files for long-running applications (i.e. those that run for over the configured cleanup period)? That will probably cause weird issues if those applications are actually running.

You refer to this issue in your tests, but the code doesn't seem to be testing that explicit case.
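
For illustration, one possible guard (hypothetical, not part of this patch) would skip stale-looking files that are still in progress:

    import org.apache.spark.scheduler.EventLoggingListener

    // Hypothetical guard: even if the last-modified time looks stale, never
    // aggressively delete a log that may still be written to.
    val deletable = untrackedInfos.filter { info =>
      info.lastModifiedTime < expirationTime &&
        !info.logPath.endsWith(EventLoggingListener.IN_PROGRESS)
    }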


// Create history files:
// 1. 0-byte in-progress files and corrupt complete files
// 2. >0-byte in-progress files and corrupt complete files
Contributor

So this is what I referred to in a previous comment. You probably need another test here with a valid in-progress file that is still being appended to, but whose last-modified time exceeds the cleanup period.
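
A sketch of that case, using helpers shaped like the suite's newLogFile/writeFile (treat the exact signatures as assumptions):

    // A valid in-progress log that is still being appended to, but whose
    // last-modified time is older than the cleanup window: it must survive.
    val active = newLogFile("app-active", None, inProgress = true)
    writeFile(active, true, None,
      SparkListenerApplicationStart("app-active", Some("app-active"), 1L, "test", None))
    active.setLastModified(clock.getTimeMillis() - 2 * maxAgeMs)

    provider.checkForLogs()
    provider.cleanLogs()
    assert(active.exists(), "a live in-progress log must not be aggressively cleaned")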

@vanzin
Contributor

vanzin commented Dec 5, 2017

@ericvandenbergfb

I incorporated this feature into vanzin#40, could you take a look? I also simplified your test a lot and removed the config option (doesn't seem necessary to me if you already asked for the SHS to clean up logs).

  }
}
toDelete
  .map(attempt => new Path(logDir, attempt.logPath))
Member

@gengliangwang gengliangwang Dec 7, 2017

Nit: We can change the input parameter of deleteLogInfo to take logPath (as a String) instead of a file, so we don't need to write new Path(logDir, attempt.logPath) every time before calling the function.

toDelete.foreach { attempt => deleteLogInfo(attempt.logPath) }
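
i.e., roughly (the body here is illustrative, following the nit rather than the actual patch):

    // Take the relative log path and build the Path internally, so call
    // sites don't repeat new Path(logDir, ...).
    private def deleteLogInfo(relativePath: String): Unit = {
      val fullPath = new Path(logDir, relativePath)
      listing.delete(classOf[LogInfo], fullPath.toString)
      fs.delete(fullPath, true)
    }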

@SparkQA

SparkQA commented Dec 12, 2017

Test build #84746 has finished for PR 19770 at commit 9194aab.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Dec 12, 2017

Test build #84791 has finished for PR 19770 at commit 5eedb29.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case s => throw new SparkException(s"unrecognized class $s in $json")
  • throw new SubmitRestMissingFieldException("Main class 'mainClass' is missing.")

@ericvandenbergfb-zz
Author

@vanzin I took a look at your PR; it looks good overall and covers the same cases from what I can tell. I don't think there's anything additional needed here unless I missed something.

@SparkQA

SparkQA commented Dec 13, 2017

Test build #84794 has finished for PR 19770 at commit 534d2f3.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class MultiShuffleSorter extends ShuffleSorter
  • final class ShuffleExternalSorter extends ShuffleSorter
  • abstract class ShuffleSorter extends MemoryConsumer
  • public class ShuffleSorterFactory
