[SPARK-20644][core] Initial ground work for kvstore UI backend. #19582
There are two somewhat unrelated things going on in this patch, but both are meant to make integration of individual UI pages later on much easier.

The first part is some tweaking of the code in the listener so that it performs fewer updates of the kvstore for data that changes fast; for example, it avoids writing changes down to the store for every task-related event, since those can arrive very quickly at times. Instead, for these kinds of events, it only flushes if a certain interval has passed. The interval is based on how often the current spark-shell code updates the progress bar for jobs, so that users can get reasonably accurate data.

The code also delays hitting the underlying kvstore as much as possible when replaying apps in the history server. This is to avoid unnecessary writes to disk.

The second set of changes prepares the history server and SparkUI for integrating with the kvstore. A new class, AppStatusStore, is used for translating between the stored data and the types used in the UI / API. The SHS now populates a kvstore with data loaded from event logs when an application UI is requested.

Because this store can hold references to disk-based resources, the code was modified to retrieve data from the store under a read lock. This allows the SHS to detect when the store is still being used, and only update it (e.g. because an updated event log was detected) when there is no other thread using the store.

This change ended up creating a lot of churn in the ApplicationCache code, which was cleaned up a lot in the process. I also removed some metrics which don't make too much sense with the new code.

Tested with existing and added unit tests, and by making sure the SHS still works on a real cluster.
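The read-lock scheme mentioned in the description can be illustrated with a minimal sketch. It assumes a plain `java.util.concurrent.locks.ReentrantReadWriteLock`; the `GuardedStore` class and its method names are hypothetical and are not taken from the patch. Readers hold the read lock while they use the store, and an updater only swaps the store when it can take the write lock without waiting, which is one way to "detect when the store is still being used".

```scala
import java.util.concurrent.locks.ReentrantReadWriteLock

// Hypothetical wrapper; the class and method names are illustrative, not Spark's.
final class GuardedStore[S](initial: S) {
  private val lock = new ReentrantReadWriteLock()
  private var store: S = initial

  /** Run `fn` against the store while holding the read lock. */
  def withStore[T](fn: S => T): T = {
    lock.readLock().lock()
    try fn(store) finally lock.readLock().unlock()
  }

  /** Replace the store only if no reader currently holds the lock. */
  def tryReplace(next: => S): Boolean = {
    if (lock.writeLock().tryLock()) {
      try { store = next; true } finally lock.writeLock().unlock()
    } else {
      false // still in use; the caller can retry later
    }
  }
}
```

With this shape, an update attempted while a request is still reading the store returns `false` instead of blocking, and the replacement is only evaluated once the write lock is held.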
For context:
A special note about this PR: this marks a sort of "point of no return" for the UI. Once this is in, the UI will be in a weird franken-state until vanzin#51 / SPARK-20653 is committed. Until then, there will be duplicate listeners collecting data, which can slow things down a bit in the event bus, and also increase memory usage.
Test build #83096 has finished for PR 19582 at commit
squito left a comment
Sorry, it took me a while to get to this.
/** Update a live entity only if it hasn't been updated in the last configured period. */
private def maybeUpdate(entity: LiveEntity): Unit = {
  if (liveUpdatePeriodNs >= 0) {
    val now = System.nanoTime()
System.nanoTime() can be somewhat expensive, right? There are a few places where you call it repeatedly; you might as well call nanoTime() once outside of this method and pass it in.
I didn't notice that method ever showing up when profiling; my guess is it's just a read from some CPU register (TSC?) and so reasonably cheap.
I think it can vary a lot with the OS (and maybe the hardware?). Unfortunately, when searching now, most of the references are really dated, and I have no idea which info is obsolete. But there is enough evidence that it seems prudent to avoid calling it a lot, in case it's slow in some situations.
def write(store: KVStore): Unit = {
  store.write(doUpdate())
  lastWriteTime = System.nanoTime()
You could also pass the nanoTime value down into this method, to avoid calling it again.
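Both suggestions in this thread (read the clock once per event, then pass the value into `maybeUpdate` and `write`) can be sketched together. This is only an illustration under stated assumptions: `FakeStore`, `Entity`, and `Listener` are hypothetical stand-ins for the PR's `KVStore`, `LiveEntity`, and listener, and the elapsed-time comparison is a guess at the part of `maybeUpdate` that the diff snippet does not show.

```scala
import scala.collection.mutable

// Hypothetical stand-in for KVStore; it just records what was written.
final class FakeStore {
  val writes = mutable.ArrayBuffer.empty[String]
  def write(value: String): Unit = writes += value
}

// Hypothetical stand-in for LiveEntity.
final class Entity(val name: String) {
  var lastWriteTime: Option[Long] = None

  // `now` is supplied by the caller, so no clock read happens here.
  def write(store: FakeStore, now: Long): Unit = {
    store.write(name)
    lastWriteTime = Some(now)
  }
}

final class Listener(store: FakeStore, liveUpdatePeriodNs: Long) {
  /** Write the entity only if more than `liveUpdatePeriodNs` has elapsed since its last write. */
  def maybeUpdate(entity: Entity, now: Long): Unit = {
    // Assumption: a negative period disables these throttled writes entirely.
    if (liveUpdatePeriodNs >= 0 &&
        entity.lastWriteTime.forall(last => now - last > liveUpdatePeriodNs)) {
      entity.write(store, now)
    }
  }

  /** One event touching several entities: read the clock once and reuse the value. */
  def onEvent(entities: Seq[Entity]): Unit = {
    val now = System.nanoTime()
    entities.foreach(e => maybeUpdate(e, now))
  }
}
```

Passing `now` explicitly also makes the throttling logic testable with fixed timestamps, independent of how cheap `System.nanoTime()` turns out to be on a given platform.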
squito left a comment
lgtm
One suggestion on comments, one teeny nit.
import org.apache.spark.util.kvstore.KVIndex

private[spark] case class AppStatusStoreMetadata(
    val version: Long)
nit: val is unnecessary
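For readers less familiar with the nit: constructor parameters of a Scala case class are already public immutable fields, so the explicit `val` changes nothing. A small illustration with hypothetical class names:

```scala
// Hypothetical names; both classes expose `version` as a public, immutable field.
case class WithVal(val version: Long)
case class WithoutVal(version: Long)
```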
// Invalidate the existing UI for the reloaded app attempt, if any. Note that this does
// not remove the UI from the active list; that has to be done in onUIDetached, so that
// cleanup of files can be done in a thread-safe manner. It does mean the UI will remain
// in memory for longer than it should.
It took me some time to figure out how the cache and invalidation worked, mostly because I wasn't looking in the right places. I don't think you've made this any more confusing than it was before (in fact it's probably better), but this seems like a good opportunity to improve the comments a little. I think it might help to have one comment in the code where the entire sequence is described (here on mergeApplicationListing, or on AppCache, or ApplicationCacheCheckFilter; it doesn't really matter, but they could all reference the longer comment). If I understand correctly, it would be something like:
Logs of incomplete apps are regularly polled to see if they have been updated (based on an increase in file size). If they have, the existing data for that app is marked as invalid in LoadedAppUI. However, no memory is freed and no files are cleaned up at this time, nor is a new UI built. On each request for one app's UI, the application cache is checked to see if it has a valid LoadedAppUI. If there is data in the cache and it is valid, then it is served. If there is data in the cache but it is invalid, then the UI is rebuilt from the raw event logs. If there is nothing in the cache, then the UI is built from the raw event logs and added to the cache. This may kick another entry out of the cache; if that entry is for an incomplete app, then any KVStore data written to disk is deleted (as the KVStore for an incomplete app is always regenerated from scratch anyway).
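The sequence described above can be modelled with a toy cache. This is a sketch under assumptions, not the ApplicationCache implementation: `LoadedUI`, `UICache`, and the `onDetach` callback are hypothetical names, building a UI is reduced to incrementing a counter, and the distinction between complete and incomplete apps is left out.

```scala
import scala.collection.mutable

// Hypothetical model of a cached UI; `generation` shows which build produced it.
final class LoadedUI(val appId: String, val generation: Int) {
  @volatile var valid: Boolean = true
}

final class UICache(maxEntries: Int, onDetach: LoadedUI => Unit) {
  // Insertion order doubles as least-recently-used order.
  private val entries = mutable.LinkedHashMap.empty[String, LoadedUI]
  private var builds = 0

  // Stand-in for replaying the raw event log.
  private def build(appId: String): LoadedUI = {
    builds += 1
    new LoadedUI(appId, builds)
  }

  /** Polling noticed a larger log file: mark the entry stale, but free nothing yet. */
  def invalidate(appId: String): Unit = synchronized {
    entries.get(appId).foreach(ui => ui.valid = false)
  }

  /** Serve a valid cached UI; otherwise rebuild (stale entry) or build (no entry). */
  def get(appId: String): LoadedUI = synchronized {
    entries.remove(appId) match {
      case Some(ui) if ui.valid =>
        entries.put(appId, ui) // re-insert to mark as most recently used
        ui
      case staleOrMissing =>
        staleOrMissing.foreach(onDetach) // cleanup happens only now, on a request
        val ui = build(appId)
        entries.put(appId, ui)
        if (entries.size > maxEntries) {
          val (oldestId, oldest) = entries.head
          entries.remove(oldestId)
          onDetach(oldest) // eviction is where on-disk data would be deleted
        }
        ui
    }
  }
}
```

The point the sketch tries to make visible is that `invalidate` never triggers cleanup by itself; detaching happens only on the next request for that app or when the entry is evicted.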
Done. Also updated a bunch of other stale comments.
retest this please
Test build #83424 has finished for PR 19582 at commit
merged to master