
Conversation

@devaraj-kavali

What changes were proposed in this pull request?

`hsync` was added as part of SPARK-19531 to make the latest data visible in the history server UI, but it causes performance overhead and leads to many dropped history log events. `hsync` uses `FileChannel.force` to sync the data to disk, and this happens across the data pipeline; it is a costly operation that makes the application incur overhead and drop events.

Getting the latest data into the history server can be done in a different way, with no impact on the application while it writes events. The `DFSInputStream.getFileLength()` API returns the file length including the `lastBlockBeingWrittenLength` (unlike `FileStatus.getLen()`). When the file status length and the previously cached length are equal, this API can be used to verify whether any new data has been written; if the length has grown, the history server can reload the in-progress history log. I also made this change configurable, with a default value of false, so it can be enabled on the history server when users want to see updated data in the UI.
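
A minimal sketch of that check (not the exact patch): the method name `hasNewData` and the `cachedFileSize` parameter are illustrative, and it assumes the underlying filesystem is HDFS so that the wrapped stream is a `DFSInputStream`.

```scala
import org.apache.hadoop.fs.{FileStatus, FileSystem}
import org.apache.hadoop.hdfs.DFSInputStream

// Sketch: decide whether an in-progress event log has grown since it was last read.
def hasNewData(fs: FileSystem, status: FileStatus, cachedFileSize: Long): Boolean = {
  if (status.getLen > cachedFileSize) {
    // FileStatus already reports more bytes than we have processed.
    true
  } else {
    // FileStatus shows no growth; ask HDFS for the length that includes the
    // last block still being written.
    val in = fs.open(status.getPath)
    try {
      in.getWrappedStream match {
        case dfsIn: DFSInputStream => dfsIn.getFileLength > cachedFileSize
        case _ => false // not an HDFS stream, nothing more to check
      }
    } finally {
      in.close()
    }
  }
}
```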

How was this patch tested?

Added a new test and verified manually. With the added conf `spark.history.fs.inProgressAbsoluteLengthCheck.enabled=true`, the history server reads the logs including the last block that is still being written and updates the Web UI with the latest data.

FsHistoryProvider to read lastBlockBeingWritten data for logs
@vanzin
Contributor

vanzin commented Oct 17, 2018

add to whitelist

@vanzin
Contributor

vanzin commented Oct 17, 2018

ok to test

@SparkQA

SparkQA commented Oct 17, 2018

Test build #97474 has finished for PR 22752 at commit a3f53c4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

}

- if (info.fileSize < entry.getLen()) {
+ if (info.fileSize < entry.getLen() || checkAbsoluteLength(info, entry)) {
Member

@gengliangwang gengliangwang Oct 17, 2018

I think we can create a function to get the length of a given file:

  1. If the new conf is enabled and the input is a DFSInputStream, use getFileLength (or max(getFileLength, entry.getLen())).
  2. Otherwise, use entry.getLen().

The logic can then be simpler.
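
For illustration, a minimal sketch of such a helper (the name `bestKnownLength`, the `absoluteLengthCheck` flag, and the exact `max` behavior are illustrative, not the merged code; it uses Spark's internal `Utils.tryWithResource` to close the stream, so it only compiles inside the Spark code base):

```scala
import org.apache.hadoop.fs.{FileStatus, FileSystem}
import org.apache.hadoop.hdfs.DFSInputStream
import org.apache.spark.util.Utils

// Sketch: return the best-known length of an event log file.
def bestKnownLength(fs: FileSystem, entry: FileStatus, absoluteLengthCheck: Boolean): Long = {
  if (!absoluteLengthCheck) {
    entry.getLen
  } else {
    Utils.tryWithResource(fs.open(entry.getPath)) { in =>
      in.getWrappedStream match {
        // Include the last block still being written, but never report less than FileStatus.
        case dfsIn: DFSInputStream => math.max(dfsIn.getFileLength, entry.getLen)
        case _ => entry.getLen
      }
    }
  }
}
```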

Contributor

Have you looked at this getFileLength() call to see how well it updates?

FWIW, HADOOP-15606 proposes adding a method like this for all streams, though that proposal includes the need for specification and tests. Generally the HDFS team are a bit lax about that spec -> test workflow, which doesn't help downstream code or other implementations.

Author

Thanks @gengliangwang for looking into this. There is no need to call checkAbsoluteLength if FileStatus.getLen() is already greater than the cached fileSize; if we change it to max(getFileLength, entry.getLen()), the absolute length would be checked every time, which may not be necessary.

Author

Thanks @steveloughran for looking into this.

Have you looked at this getFileLength() call to see how well it updates?

I looked at the `DFSInputStream.getFileLength()` API; it returns `locatedBlocks.getFileLength() + lastBlockBeingWrittenLength`, where `locatedBlocks.getFileLength()` is the value obtained from the NameNode for all the completed blocks, and `lastBlockBeingWrittenLength` is the length of the last, not-yet-completed block obtained from the DataNode.
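
As a rough illustration of that composition (a hedged paraphrase in Scala with illustrative parameter names; the real HDFS implementation is Java and carries more bookkeeping):

```scala
// completedBlocksLength: length the NameNode reports for all finalized blocks
// lastBlockBeingWrittenLength: length of the unfinished last block, read from a DataNode
def getFileLength(completedBlocksLength: Long, lastBlockBeingWrittenLength: Long): Long =
  completedBlocksLength + lastBlockBeingWrittenLength
```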

FWIW, HADOOP-15606 proposes adding a method like this for all streams

Thanks for the pointer; once that is available, we can update this to use it.

Contributor

...there's no timetable for that getLength thing, but if HDFS already supports the API, I'm more motivated to implement it. It has benefits in cloud stores in general:

  1. It saves apps an up-front HEAD/getFileStatus() call to find out how long their data is; the GET should return it.
  2. For S3 Select, you get back the filtered data, so you don't know how much you will see until the GET is issued.


val IN_PROGRESS_ABSOLUTE_LENGTH_CHECK =
  ConfigBuilder("spark.history.fs.inProgressAbsoluteLengthCheck.enabled")
    .doc("Enable to check the absolute length of the in-progress" +
Member

Could you explain this in a little more detail, so that general users can know what the benefit is?

Author

Do you have anything in mind to make it better? Thanks.

}
}

private[history] def checkAbsoluteLength(info: LogInfo, entry: FileStatus): Boolean = {
Contributor

The name of the method and the return value are a little cryptic. What does it mean to check?

Might be better to call it something like shouldReloadLog. You could also move the existing check into this function and make the call site simpler.
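
For illustration, a minimal sketch of that suggestion (assuming it sits inside FsHistoryProvider, where `LogInfo` and the existing `checkAbsoluteLength` probe are in scope; this is not necessarily the code that was merged):

```scala
// Fold the cheap FileStatus comparison and the absolute-length probe into one predicate.
private def shouldReloadLog(info: LogInfo, entry: FileStatus): Boolean = {
  // Cheap check first: FileStatus already reports more bytes than we have processed.
  info.fileSize < entry.getLen() ||
    // Otherwise fall back to the HDFS length that includes the block still being written.
    checkAbsoluteLength(info, entry)
}

// The call site then reduces to:
// if (shouldReloadLog(info, entry)) { ... reload the in-progress log ... }
```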

try {
  result = Utils.tryWithResource(fs.open(entry.getPath)) { in =>
    in.getWrappedStream match {
      case dfsIn: DFSInputStream => dfsIn.getFileLength > info.fileSize
Contributor

Ignoring the discussion about the getFileLength API: if this is not a DFSInputStream you'll get a MatchError. Better to handle the default case explicitly.
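
For example, the match in the snippet above could cover the non-HDFS case along these lines (a sketch; returning `false` simply preserves the existing FileStatus-based behavior):

```scala
in.getWrappedStream match {
  case dfsIn: DFSInputStream => dfsIn.getFileLength > info.fileSize
  case _ => false // not an HDFS stream; rely on FileStatus.getLen() alone
}
```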

.createWithDefaultString("1m")

val IN_PROGRESS_ABSOLUTE_LENGTH_CHECK =
  ConfigBuilder("spark.history.fs.inProgressAbsoluteLengthCheck.enabled")
Contributor

Is there any disadvantage in just leaving this always on? Otherwise this doesn't need to be configurable.

Author

This new check adds overhead to the checkForLogs thread, so I made it disabled by default, since most users may not want to see the history UI for running applications; they can enable it explicitly if they want to see in-progress apps in the history UI. I can remove this config if you think it is not very useful.

Contributor

How much overhead are we talking about?

That thread is not really in any performance critical path, and in general people won't have so many running apps that this should become a problem... but that kinda depends on how bad this new call is.

Author

This call makes two invocations: one to get the block info from the NameNode and another to get the last block info from the DataNode. I agree this is not a performance-critical path; I will remove the config in the update.

@SparkQA

SparkQA commented Oct 19, 2018

Test build #97554 has finished for PR 22752 at commit c2f2705.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@devaraj-kavali
Author

@vanzin can you check the updated changes? Thanks.

@vanzin
Copy link
Contributor

vanzin commented Oct 25, 2018

Merging to master / 2.4 (will run a couple of tests on 2.4 before merging there).

@asfgit asfgit closed this in 46d2d2c Oct 25, 2018
asfgit pushed a commit that referenced this pull request Oct 25, 2018
…istoryProvider to read lastBlockBeingWritten data for logs

Closes #22752 from devaraj-kavali/SPARK-24787.

Authored-by: Devaraj K <[email protected]>
Signed-off-by: Marcelo Vanzin <[email protected]>
(cherry picked from commit 46d2d2c)
Signed-off-by: Marcelo Vanzin <[email protected]>
jackylee-ch pushed a commit to jackylee-ch/spark that referenced this pull request Feb 18, 2019
…istoryProvider to read lastBlockBeingWritten data for logs

Closes apache#22752 from devaraj-kavali/SPARK-24787.

Authored-by: Devaraj K <[email protected]>
Signed-off-by: Marcelo Vanzin <[email protected]>