Spark fetcher is now able to fetch event logs via REST API #225

Merged (11 commits) on Apr 12, 2017
app/com/linkedin/drelephant/spark/fetchers/SparkFetcher.scala (21 additions, 10 deletions)
@@ -19,12 +19,11 @@ package com.linkedin.drelephant.spark.fetchers
import scala.async.Async
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.{Duration, SECONDS}
import scala.util.Try
import scala.util.control.NonFatal

import com.linkedin.drelephant.analysis.{AnalyticJob, ElephantFetcher}
import com.linkedin.drelephant.configurations.fetcher.FetcherConfigurationData
import com.linkedin.drelephant.spark.data.{SparkApplicationData, SparkLogDerivedData, SparkRestDerivedData}
import com.linkedin.drelephant.spark.data.SparkApplicationData
import com.linkedin.drelephant.util.SparkUtils
import org.apache.hadoop.conf.Configuration
import org.apache.log4j.Logger
@@ -61,11 +60,18 @@ class SparkFetcher(fetcherConfigurationData: FetcherConfigurationData)
if (eventLogEnabled) Some(new SparkLogClient(hadoopConfiguration, sparkConf)) else None
}

private[fetchers] lazy val useRestForLogs: Boolean = {
fetcherConfigurationData.getParamMap
.getOrDefault("use_rest_for_eventlogs", "false")
.toBoolean
}

override def fetchData(analyticJob: AnalyticJob): SparkApplicationData = {
val appId = analyticJob.getAppId
logger.info(s"Fetching data for ${appId}")
try {
Await.result(doFetchData(sparkRestClient, sparkLogClient, appId), DEFAULT_TIMEOUT)
Await.result(doFetchData(sparkRestClient, sparkLogClient, appId, useRestForLogs),
DEFAULT_TIMEOUT)
} catch {
case NonFatal(e) =>
logger.error(s"Failed fetching data for ${appId}", e)
@@ -83,17 +89,22 @@ object SparkFetcher {
private def doFetchData(
sparkRestClient: SparkRestClient,
sparkLogClient: Option[SparkLogClient],
appId: String
appId: String,
fetchLogsViaRest: Boolean
)(
implicit ec: ExecutionContext
): Future[SparkApplicationData] = async {
val restDerivedData = await(sparkRestClient.fetchData(appId))
val restDerivedData = await(sparkRestClient.fetchRestData(appId))
val lastAttemptId = restDerivedData.applicationInfo.attempts.maxBy { _.startTime }.attemptId

// Would use .map but await doesn't like that construction.
val logDerivedData = sparkLogClient match {
case Some(sparkLogClient) => Some(await(sparkLogClient.fetchData(appId, lastAttemptId)))
case None => None
val logDerivedData = if (fetchLogsViaRest) {
await(sparkRestClient.fetchLogData(appId, lastAttemptId))
} else {
// Would use .map but await doesn't like that construction.
sparkLogClient match {
case Some(sparkLogClient) =>
Some(await(sparkLogClient.fetchData(appId, lastAttemptId)))
case None => None
}
}

SparkApplicationData(appId, restDerivedData, logDerivedData)
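
For reference, a minimal sketch (hypothetical, not part of this diff) of how the new use_rest_for_eventlogs parameter could be exercised, in the style of the existing SparkFetcherTest mocks. It assumes a caller inside the com.linkedin.drelephant.spark.fetchers package, since useRestForLogs is package-private, and the mock setup is an illustration only.

import java.util.Collections

import com.linkedin.drelephant.configurations.fetcher.FetcherConfigurationData
import org.mockito.Mockito

// Hypothetical mock: a param map that enables the REST log path.
val fetcherConfData = Mockito.mock(classOf[FetcherConfigurationData])
Mockito.when(fetcherConfData.getParamMap)
  .thenReturn(Collections.singletonMap("use_rest_for_eventlogs", "true"))

val fetcher = new SparkFetcher(fetcherConfData)
// useRestForLogs reads the param lazily and defaults to false when the key is absent.
assert(fetcher.useRestForLogs)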
SparkLogClient.scala
@@ -76,7 +76,7 @@ class SparkLogClient(hadoopConfiguration: Configuration, sparkConf: SparkConf) {
val logPath = getLogPath(webhdfsEventLogUri, appId, attemptId, compressionCodecShortName)
logger.info(s"looking for logs at ${logPath}")

val codec = compressionCodecForLogPath(sparkConf, logPath)
val codec = compressionCodecForLogName(sparkConf, logPath.getName)

// Limit scope of async.
async {
@@ -189,10 +189,10 @@ object SparkLogClient {
new BufferedInputStream(fs.open(logPath))
}

private def compressionCodecForLogPath(conf: SparkConf, logPath: Path): Option[CompressionCodec] = {
private[fetchers] def compressionCodecForLogName(conf: SparkConf, logName: String): Option[CompressionCodec] = {
// Compression codec is encoded as an extension, e.g. app_123.lzf
// Since we sanitize the app ID to not include periods, it is safe to split on it
val logBaseName = logPath.getName.stripSuffix(IN_PROGRESS)
val logBaseName = logName.stripSuffix(IN_PROGRESS)
logBaseName.split("\\.").tail.lastOption.map { codecName =>
compressionCodecMap.getOrElseUpdate(codecName, loadCompressionCodec(conf, codecName))
}
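
For illustration, a usage sketch of the renamed helper (the log file names below are hypothetical; since the method is private[fetchers], this assumes a caller inside the com.linkedin.drelephant.spark.fetchers package):

import org.apache.spark.SparkConf

val conf = new SparkConf()
// "app_123_1.lz4" splits on "." into ("app_123_1", "lz4"); the last extension is taken
// as the codec short name, so Spark's LZ4 codec is loaded (and cached for reuse).
val lz4Codec = SparkLogClient.compressionCodecForLogName(conf, "app_123_1.lz4")
// A log with no extension after the sanitized app ID resolves to no codec at all.
val noCodec = SparkLogClient.compressionCodecForLogName(conf, "app_456")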
app/com/linkedin/drelephant/spark/fetchers/SparkRestClient.scala (52 additions, 6 deletions)
@@ -16,24 +16,28 @@

package com.linkedin.drelephant.spark.fetchers

import java.io.BufferedInputStream
import java.net.URI
import java.text.SimpleDateFormat
import java.util.zip.{ZipEntry, ZipInputStream}
import java.util.{Calendar, SimpleTimeZone}

import scala.async.Async
import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NonFatal

import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import com.linkedin.drelephant.spark.data.SparkRestDerivedData
import com.linkedin.drelephant.spark.fetchers.statusapiv1.{ApplicationAttemptInfo, ApplicationInfo, ExecutorSummary, JobData, StageData}
import com.linkedin.drelephant.spark.data.{SparkLogDerivedData, SparkRestDerivedData}
import com.linkedin.drelephant.spark.fetchers.statusapiv1.{ApplicationInfo, ExecutorSummary, JobData, StageData}
import javax.ws.rs.client.{Client, ClientBuilder, WebTarget}
import javax.ws.rs.core.MediaType

import org.apache.hadoop.fs.Path
import org.apache.log4j.Logger
import org.apache.spark.SparkConf

import scala.collection.mutable

/**
* A client for getting data from the Spark monitoring REST API, e.g. <https://spark.apache.org/docs/1.4.1/monitoring.html#rest-api>.
@@ -67,16 +71,16 @@ class SparkRestClient(sparkConf: SparkConf) {

private val apiTarget: WebTarget = client.target(historyServerUri).path(API_V1_MOUNT_PATH)

def fetchData(appId: String)(implicit ec: ExecutionContext): Future[SparkRestDerivedData] = {
def fetchRestData(appId: String)(implicit ec: ExecutionContext): Future[SparkRestDerivedData] = {
val appTarget = apiTarget.path(s"applications/${appId}")
logger.info(s"calling REST API at ${appTarget.getUri}")

val applicationInfo = getApplicationInfo(appTarget)

// Limit scope of async.
val lastAttemptId = applicationInfo.attempts.maxBy {_.startTime}.attemptId
Contributor: I think you can move lines 80 and 81 back to the async block?

Contributor (author): My motivation was that they are pure and exception-free, and therefore shouldn't be part of async { ... }. Should I move them back anyway?

Contributor: That's right. Then I think you should keep them outside. Just add a comment there explaining why they are outside of the async block.

Contributor (author): Done.
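
For context, a minimal sketch of the pattern being discussed (hypothetical names, not code from this PR): pure, exception-free computations stay outside the async block, and only the awaits on remote calls live inside it.

import scala.async.Async.{async, await}
import scala.concurrent.{ExecutionContext, Future}

def fetchTotal(loadParts: () => Future[Seq[Int]])(implicit ec: ExecutionContext): Future[Int] = {
  // Pure and exception-free, so there is no need to run it inside async { ... }.
  val weight = 2
  async {
    // The async macro only needs to wrap the code that awaits futures.
    val parts = await(loadParts())
    parts.sum * weight
  }
}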

val attemptTarget = lastAttemptId.map(appTarget.path).getOrElse(appTarget)
async {
val lastAttemptId = applicationInfo.attempts.maxBy {_.startTime}.attemptId
val attemptTarget = lastAttemptId.map(appTarget.path).getOrElse(appTarget)
val futureJobDatas = async { getJobDatas(attemptTarget) }
val futureStageDatas = async { getStageDatas(attemptTarget) }
val futureExecutorSummaries = async { getExecutorSummaries(attemptTarget) }
@@ -89,6 +93,34 @@
}
}

def fetchLogData(appId: String, attemptId: Option[String])(
Contributor: I think there is a design issue here. First, it is inconsistent that all the other methods getting data via REST (getApplicationInfo, getJobDatas, etc.) are defined as private methods, while fetchLogData is a public method. The other problem is that line 99 does not consider attemptId when generating appTarget, which is incorrect (please see line 82).

I think a better design would be to introduce an optional member, restEventLogData, in class SparkRestDerivedData and populate that member in the class. Define a private method, getEventLogData, in this class which directly returns Optional[SparkLogDerivedData]. The method getEventLogData can take attemptTarget (defined on line 82) as one of the arguments, in addition to appId and attemptId. This seems more consistent with the rest of the code, and you also don't need to recreate appTarget.

Contributor (author, @superbobry, Mar 22, 2017): This is a very good point. I've considered extending SparkRestDerivedData, but decided against it, because having two places for the same thing is rarely a good idea. However, in this particular case it doesn't look bad, I think.

Summary:

  • SparkRestDerivedData.logDerivedData is now package-private and therefore cannot escape spark.
  • restEventLogData was renamed to getLogData (not getEventLogData, because the rest of the code does not mention events) and stripped down: no more name-checking logic, and no appId/attemptId in the signature.

implicit ec: ExecutionContext
): Future[Option[SparkLogDerivedData]] = {
val appTarget = apiTarget.path(s"applications/${appId}")
logger.info(s"calling REST API at ${appTarget.getUri}")

val logPrefix = attemptId.map(id => s"${appId}_$id").getOrElse(appId)
async {
resource.managed { getApplicationLogs(appTarget) }.acquireAndGet { zis =>
var entry: ZipEntry = null
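// Scan the zip archive entry by entry: skip entries until one whose name starts with
// logPrefix is found, or the archive is exhausted (getNextEntry returns null).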
do {
Contributor: Can you please add a comment which explains the logic of this do...while block?

Contributor (author): Done.

zis.closeEntry()
entry = zis.getNextEntry
} while (!(entry == null || entry.getName.startsWith(logPrefix)))

if (entry == null) {
logger.warn(
s"failed to resolve log starting with $logPrefix for ${appTarget.getUri}")
None
} else {
val codec = SparkLogClient.compressionCodecForLogName(sparkConf, entry.getName)
Some(SparkLogClient.findDerivedData(
codec.map { _.compressedInputStream(zis) }.getOrElse(zis)))
}
}
}
}
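
A minimal usage sketch of the new REST log path (the history server address and the application/attempt IDs below are hypothetical, chosen only for illustration):

import scala.concurrent.Await
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.{Duration, MINUTES}

import org.apache.spark.SparkConf

val sparkConf = new SparkConf()
  .set("spark.yarn.historyServer.address", "historyserver.example.com:18080")
val restClient = new SparkRestClient(sparkConf)

// Downloads applications/<appId>/logs as a zip from the history server, picks the entry
// for the given attempt, and derives log-based data from the (possibly compressed) stream.
val logDerivedData = Await.result(
  restClient.fetchLogData("application_1490000000000_0001", Some("1")),
  Duration(5, MINUTES))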

private def getApplicationInfo(appTarget: WebTarget): ApplicationInfo = {
try {
get(appTarget, SparkRestObjectMapper.readValue[ApplicationInfo])
@@ -100,6 +132,20 @@ class SparkRestClient(sparkConf: SparkConf) {
}
}

private def getApplicationLogs(appTarget: WebTarget): ZipInputStream = {
val target = appTarget.path("logs")
try {
val is = target.request(MediaType.APPLICATION_OCTET_STREAM)
.get(classOf[java.io.InputStream])
new ZipInputStream(new BufferedInputStream(is))
} catch {
case NonFatal(e) => {
logger.error(s"error reading logs ${target.getUri}", e)
throw e
}
}
}

private def getJobDatas(attemptTarget: WebTarget): Seq[JobData] = {
val target = attemptTarget.path("jobs")
try {
SparkFetcherTest.scala
@@ -180,7 +180,7 @@ object SparkFetcherTest {
implicit ec: ExecutionContext
): SparkRestClient = {
val sparkRestClient = Mockito.mock(classOf[SparkRestClient])
Mockito.when(sparkRestClient.fetchData(appId)).thenReturn(restDerivedData)
Mockito.when(sparkRestClient.fetchRestData(appId)).thenReturn(restDerivedData)
sparkRestClient
}

SparkRestClientTest.scala
@@ -67,7 +67,7 @@ class SparkRestClientTest extends AsyncFunSpec with Matchers {
val sparkConf = new SparkConf().set("spark.yarn.historyServer.address", s"${historyServerUri.getHost}:${historyServerUri.getPort}")
val sparkRestClient = new SparkRestClient(sparkConf)

sparkRestClient.fetchData(FetchClusterModeDataFixtures.APP_ID) map { restDerivedData =>
sparkRestClient.fetchRestData(FetchClusterModeDataFixtures.APP_ID) map { restDerivedData =>
restDerivedData.applicationInfo.id should be(FetchClusterModeDataFixtures.APP_ID)
restDerivedData.applicationInfo.name should be(FetchClusterModeDataFixtures.APP_NAME)
restDerivedData.jobDatas should not be(None)
@@ -101,7 +101,7 @@ class SparkRestClientTest extends AsyncFunSpec with Matchers {
val sparkConf = new SparkConf().set("spark.yarn.historyServer.address", s"${historyServerUri.getHost}:${historyServerUri.getPort}")
val sparkRestClient = new SparkRestClient(sparkConf)

sparkRestClient.fetchData(FetchClusterModeDataFixtures.APP_ID) map { restDerivedData =>
sparkRestClient.fetchRestData(FetchClusterModeDataFixtures.APP_ID) map { restDerivedData =>
restDerivedData.applicationInfo.id should be(FetchClusterModeDataFixtures.APP_ID)
restDerivedData.applicationInfo.name should be(FetchClusterModeDataFixtures.APP_NAME)
restDerivedData.jobDatas should not be(None)
@@ -135,7 +135,7 @@ class SparkRestClientTest extends AsyncFunSpec with Matchers {
val sparkConf = new SparkConf().set("spark.yarn.historyServer.address", s"http://${historyServerUri.getHost}:${historyServerUri.getPort}")
val sparkRestClient = new SparkRestClient(sparkConf)

sparkRestClient.fetchData(FetchClusterModeDataFixtures.APP_ID) map { restDerivedData =>
sparkRestClient.fetchRestData(FetchClusterModeDataFixtures.APP_ID) map { restDerivedData =>
restDerivedData.applicationInfo.id should be(FetchClusterModeDataFixtures.APP_ID)
restDerivedData.applicationInfo.name should be(FetchClusterModeDataFixtures.APP_NAME)
restDerivedData.jobDatas should not be(None)