Spark fetcher is now able to fetch event logs via REST API #225

Merged 11 commits into linkedin:master on Apr 12, 2017

Conversation

@superbobry (Contributor) commented Mar 21, 2017

This PR allows Dr. Elephant to fetch Spark logs without universal
read access to eventLog.dir on HDFS. SparkFetcher would use SparkRestClient
instead of SparkLogClient if configured as

<params>
  <use_rest_for_eventlogs>true</use_rest_for_eventlogs>
</params>

The default behaviour is to fetch the logs via SparkLogClient/WebHDFS.
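
For illustration, a minimal sketch of how the fetcher might read this parameter, assuming the fetcher configuration exposes the <params> block through a getParamMap; the field and helper names here are illustrative, not the exact merged code:

    // Inside SparkFetcher: read the new flag from FetcherConf.xml.
    // A missing or non-"true" value keeps the default SparkLogClient/WebHDFS behaviour.
    private val useRestForLogs: Boolean =
      Option(fetcherConfData.getParamMap.get("use_rest_for_eventlogs"))
        .exists(_.toBoolean)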

Closes #199


What is missing?

  • Documentation
  • Tests

@shkhrgpt (Contributor)

Thanks @superbobry for this change. I am reviewing it.
@paulbramsen has also been looking at this issue. I think he should also review this change.

resource.managed { getApplicationLogs(appTarget) }.acquireAndGet { zis =>
  var seen = mutable.Buffer[String]()
  var entry: ZipEntry = null
  do {
Contributor:

Can you please add a comment which explains the logic of this do...while block?

Contributor Author:

Done.
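
For context, an illustrative reconstruction of what the commented loop might look like, assuming java.util.zip.ZipEntry, scala.collection.mutable, and scala-arm's resource package are in scope; getApplicationLogs, logStem, and the error handling are inferred from the snippets in this thread, not copied from the merged code:

    resource.managed { getApplicationLogs(appTarget) }.acquireAndGet { zis =>
      // Walk the archive entry by entry until one matches the expected event-log name.
      // getNextEntry returns null once the archive is exhausted; the names seen so far
      // are remembered so a helpful error can be reported if nothing matches.
      val seen = mutable.Buffer.empty[String]
      var entry: ZipEntry = null
      do {
        entry = zis.getNextEntry
        Option(entry).foreach(e => seen += e.getName)
      } while (entry != null && !entry.getName.startsWith(logStem))

      if (entry == null) {
        throw new IllegalArgumentException(
          s"No event log matching $logStem in archive; entries seen: ${seen.mkString(", ")}")
      }
      // ... parse the event log from zis ...
    }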

val appTarget = apiTarget.path(s"applications/${appId}")
logger.info(s"calling REST API at ${appTarget.getUri}")

val logStem = attemptId.map(id => s"${appId}_$id").getOrElse(appId)
Contributor:

Can you please rename logStem to something that can be understood more easily?

Contributor Author:

I've decided to remove it completely; it's not required, because Spark guarantees the archive will only contain a single entry.

@superbobry (Contributor Author)

@shkhrgpt thank you for the review. I will address your comments tomorrow and probably tackle the tests as well.

Do I need to update user docs on the wiki with a description of the new fetcher param?

@@ -89,6 +93,34 @@ class SparkRestClient(sparkConf: SparkConf) {
}
}

def fetchLogData(appId: String, attemptId: Option[String])(
Contributor:

I think there is a design issue here. First, it is inconsistent that all the other methods that get data via REST (getApplicationInfo, getJobDatas, etc.) are defined as private methods, while fetchLogData is a public method. The other problem is that line 99 does not consider attemptId when generating appTarget, which is incorrect (please see line 82).

I think a better design would be to introduce an optional member, restEventLogData, in class SparkRestDerivedData and populate that member in this class. Define a private method, getEventLogData, in this class which directly returns Option[SparkLogDerivedData]. The method getEventLogData can take attemptTarget (defined on line 82) as one of its arguments, in addition to appId and attemptId. This seems more consistent with the rest of the code, and you also don't need to recreate appTarget.

@superbobry (Contributor Author) commented Mar 22, 2017

This is a very good point. I've considered extending SparkRestDerivedData, but decided against it, because having two places for the same thing is rarely a good idea. However, in this particular case it doesn't look bad, I think.

Summary:

  • SparkRestDerivedData.logDerivedData is now package-private and therefore cannot escape the spark package.
  • restEventLogData was renamed to getLogData (not getEventLogData, because the rest of the code does not mention events) and stripped down: no more name-checking logic and no appId/attemptId in the signature (see the sketch below).
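
A minimal sketch of the resulting shape, for illustration; field names other than logDerivedData are assumptions based on this discussion, and the package-private modifier assumes the class lives under a spark package, as the summary above implies:

    case class SparkRestDerivedData(
      applicationInfo: ApplicationInfo,
      jobDatas: Seq[JobData],
      stageDatas: Seq[StageData],
      executorSummaries: Seq[ExecutorSummary],
      private[spark] val logDerivedData: Option[SparkLogDerivedData] = None
    )

    // In SparkRestClient.fetchData: populate the new field only when log fetching is requested.
    val logDerivedData = if (fetchLogs) getLogData(attemptTarget) else None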

Sergei Lebedev added 2 commits March 22, 2017 18:22
'SparkRestDerivedData' now has a dedicated optional field for storing
'SparkLogDerivedData' fetched via REST.
@shkhrgpt (Contributor)

@superbobry I think you should first start adding tests before updating the user docs; that can be done once this change is in its final stage.

by defaulting 'logDerivedData' to 'None' and 'fetchLogs' to 'false'
val appTarget = apiTarget.path(s"applications/${appId}")
logger.info(s"calling REST API at ${appTarget.getUri}")

val applicationInfo = getApplicationInfo(appTarget)

// Limit scope of async.
val lastAttemptId = applicationInfo.attempts.maxBy {_.startTime}.attemptId
Contributor:

I think you can move lines 80 and 81 back to the async block?

Contributor Author:

My motivation was that they are pure and exception-free and therefore shouldn't be part of async { ... }. Should I move them back anyway?

Contributor:

That's right. Then I think you should keep them outside. Just add a comment there explaining why they are outside of the async block.

Contributor Author:

Done.
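
A sketch of the arrangement agreed above, assuming scala-async's async is in scope with an implicit ExecutionContext; method names other than those visible in the diff are assumptions:

    // lastAttemptId and attemptTarget are pure and exception-free, so they are computed
    // outside the async block; only the REST calls that can block or fail run inside it.
    val lastAttemptId = applicationInfo.attempts.maxBy { _.startTime }.attemptId
    val attemptTarget = lastAttemptId.map(id => appTarget.path(id)).getOrElse(appTarget)

    async {
      SparkRestDerivedData(
        applicationInfo,
        getJobDatas(attemptTarget),
        getStageDatas(attemptTarget),
        getExecutorSummaries(attemptTarget),
        if (fetchLogs) getLogData(attemptTarget) else None
      )
    }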

@superbobry (Contributor Author)

A slightly unrelated question: we have two Spark versions deployed, 1.X and 2.X, and we would like to aggregate data from both. It looks like this should be possible if we parameterize SparkFetcher with the SparkConf location. What do you think?

@shkhrgpt (Contributor)

Yeah, we also thought about it.
I believe you have two different Spark History Servers (SHS) for the two different versions of Spark? As far as I can tell, one instance of Dr. Elephant can only get data from one SHS (the address of the SHS comes from the configured SPARK_HOME or SPARK_CONF_DIR). Therefore, I believe you will have to run two Dr. Elephant instances, and each instance can have a different SPARK_HOME or SPARK_CONF_DIR based on which SHS it's trying to contact. Both instances can write results to the same DB. So I don't think you need to make any change in SparkFetcher to support two different versions of Spark.
Did I answer your question in the right direction, or did you have something else in mind?

@@ -135,6 +157,31 @@ class SparkRestClient(sparkConf: SparkConf) {
}
}
}

private def getLogData(attemptTarget: WebTarget): Option[SparkLogDerivedData] = {
Contributor:

Just a style comment: maybe you should declare this method before getApplicationLogs, because this is the calling method.

Contributor Author:

Thanks, it is indeed better that way.


private def getLogData(attemptTarget: WebTarget): Option[SparkLogDerivedData] = {
val target = attemptTarget.path("logs")
logger.info(s"calling REST API at ${target.getUri}")
Contributor:

Maybe change this to something like the following to make it more specific:

logger.info(s"calling REST API at ${target.getUri} to get eventlogs")

Contributor Author:

Done.

@superbobry (Contributor Author) commented Mar 23, 2017

I'm just curious, why all the client/cluster mode copy-paste in SparkRestClientTest? Why isn't it enough to define the server fixture once?

The current version only tests eventlogs for cluster mode. I can copy/paste the checks into the corresponding test for client mode, if you think that'd be appropriate.

@superbobry changed the title from "[WIP] Spark fetcher is now able to fetch event logs via REST API" to "Spark fetcher is now able to fetch event logs via REST API" on Mar 23, 2017
case assertion: Try[Assertion] => assertion
case _ =>
sparkRestClient.fetchData(FetchClusterModeDataFixtures.APP_ID, fetchLogs = true)
.map { _.logDerivedData should not be(None) }
Contributor:

I think this test should also compare the value of logDerivedData against the config data in the spark_event_logs/event_log_2 file.

Contributor Author:

Wouldn't that be yet another test for SparkLogClient? fetchData doesn't do any log parsing; it simply uncompresses the logs, which is already covered by the current test.

Contributor:

I agree that SparkLogClientTest is already testing the parsing, but in SparkRestClient you are using different logic to get the file. The only way to test that your logic reads the file correctly, without corrupting it, is to validate the content of the file.

Contributor Author:

Agreed, done.
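
A sketch of the stronger assertion discussed above, assuming SparkLogDerivedData exposes the parsed Spark properties as appConfigurationProperties; the key/value pair below is hypothetical and would in practice be taken from the spark_event_logs/event_log_2 fixture:

    sparkRestClient.fetchData(FetchClusterModeDataFixtures.APP_ID, fetchLogs = true)
      .map { restDerivedData =>
        val derived = restDerivedData.logDerivedData
        derived should not be None
        // Validate the decompressed content, not just its presence.
        derived.get.appConfigurationProperties should contain ("spark.eventLog.enabled" -> "true")
      }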

@shkhrgpt (Contributor)

Yeah, I agree that there is a lot of duplicate code in SparkRestClientTest, and it should be fixed. However, I would encourage you to fix that in a separate PR; making a lot more changes in one PR can make the review a little difficult.

I do have an idea for client mode. In client mode, rather than generating a ZipOutputStream, respond directly with a zip file. To do that, you can just create a zipped copy of spark_event_logs/event_log_2 in the spark_event_logs directory, and provide the path of that file in LogDataResource.

What do you think?

@shkhrgpt (Contributor)

@superbobry Btw, the change looks very good now. I just had a few more comments. Thank you very much for addressing my earlier comments.

@superbobry (Contributor Author)

I do have an idea for client mode. [...] What do you think?

I've decided not to do it, because ZIP-file generation is in a method and can be shared between the client/cluster mode tests.

I've also cherry-picked the Guava cleanup commit out of this PR; I'll submit it in a separate one so as not to clutter the diff.

@shkhrgpt (Contributor) left a comment

LGTM. Thanks for making this change, and addressing my comments.

@shankar37 @akshayrai Please take a look.

@superbobry (Contributor Author)

There is one more issue with the current code: if use_rest_for_eventlogs is enabled, then SparkFetcher would always try to fetch the logs, ignoring the value of "spark.eventLog.enabled".

Prior to this commit 'SparkRestClient' would ignore this 'SparkConf'
parameter and attempt to fetch the logs anyway.
.exists(_.toBoolean)
private[fetchers] lazy val sparkRestClient: SparkRestClient = new SparkRestClient(sparkConf)

private[fetchers] lazy val sparkLogClient: SparkLogClient = {
Contributor Author:

I've deliberately removed Option[_] from the type, because otherwise we allow an illegal state: sparkLogClient.isDefined and eventLogSource == None.

Contributor:

Makes sense.
Thank you.
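
A sketch of the shape this discussion suggests; EventLogSource and its cases are illustrative names built around the eventLogSource value mentioned above, and only use_rest_for_eventlogs and spark.eventLog.enabled come from the discussion itself:

    sealed trait EventLogSource
    object EventLogSource {
      case object Rest extends EventLogSource     // fetch event logs via the Spark History Server REST API
      case object WebHdfs extends EventLogSource  // fetch event logs from eventLog.dir over WebHDFS
      case object None extends EventLogSource     // spark.eventLog.enabled=false: nothing to fetch
    }

    private[fetchers] val eventLogSource: EventLogSource =
      if (!sparkConf.getBoolean("spark.eventLog.enabled", false)) EventLogSource.None
      else if (useRestForLogs) EventLogSource.Rest
      else EventLogSource.WebHdfs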

@shkhrgpt (Contributor)

@superbobry Thanks for including spark.eventLog.enabled into the logic.
@shankar37 @akshayrai Can you guys also please review this change. LGTM.

@shkhrgpt (Contributor)

@shankar37 and @akshayrai
Can you please review this change? It is a very important and useful change.
Thank you.

None
} else {
val codec = SparkLogClient.compressionCodecForLogName(sparkConf, entry.getName)
Some(SparkLogClient.findDerivedData(
Contributor Author:

I was thinking about the timeouts, and I came to the conclusion that the time to fetch the log does not depend on its size (unless I'm missing something), because findDerivedData looks up the first occurrence of SparkListenerEnvironmentUpdate, which should be one of the first entries in the log.

This makes me wonder: what could be causing the timeouts if not the size? @shkhrgpt any ideas?

Contributor:

Maybe you should check the Spark history server log and the Dr. Elephant log when the timeout happens. Can you paste the log output here if you see any stack traces or other interesting things? My suspicion is that the Spark history server might be running out of memory because of a cache which keeps the SparkUI instances.

Contributor Author:

Unfortunately, I don't have access to the production history server logs, and I failed to reproduce the issue locally. Your suspicion sounds convincing, though.

@superbobry (Contributor Author)

@shankar37 and @akshayrai is there anything left to do in this PR?

assertFalse(StringUtils.isEmpty(result.getJobDefUrl()));
assertFalse(StringUtils.isEmpty(result.getFlowExecUrl()));
assertFalse(StringUtils.isEmpty(result.getFlowDefUrl()));
// assertFalse(StringUtils.isEmpty(result.getJobExecId()));
Contributor:

@superbobry, any reason for commenting these out?

Contributor Author:

Bah, sorry, I missed that. I think I got compilation errors: Cannot resolve method...

Reverted.

@akshayrai (Contributor)

+1 LGTM except for the one comment

@akshayrai akshayrai merged commit 8e4a094 into linkedin:master Apr 12, 2017
@akshayrai (Contributor)

Merged! Thanks @superbobry

@superbobry (Contributor Author)

The only remaining thing is to update the docs, but I think I cannot do it (not being a member of the organisation), can I?

@akshayrai (Contributor)

Apparently not. Can you share a Google doc with us?

@superbobry (Contributor Author)

What do you think about making the docs part of the repo and publishing them via GitHub Pages? This would make contributions easier.

Also, is there a Wiki page documenting configuration? The only mention of FetcherConf.xml I've found is in "Project Structure".

@akshayrai (Contributor)

@superbobry, GitHub Pages looks interesting and I am totally for it. We should definitely move the documentation away from the GitHub wiki so that everyone can contribute.

The configurations are part of the respective sections, like "Configuring the Heuristics" under Heuristics and "Configuring Schedulers" under Schedulers. I don't think we have a wiki page for configuring the fetchers.

skakker pushed a commit to skakker/dr-elephant that referenced this pull request Dec 14, 2017
Successfully merging this pull request may close these issues: Why doesn't SparkFetcher use REST client to fetch eventlogs?