diff --git a/pom.xml b/pom.xml
index fb7750602c42..d5c017f3d75b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -214,6 +214,7 @@
64m512m512m
+ scalastyle-config.xml
@@ -2225,7 +2226,7 @@
false${basedir}/src/main/scala${basedir}/src/test/scala
- scalastyle-config.xml
+ ${scalastyle.path}${basedir}/target/scalastyle-output.xml${project.build.sourceEncoding}${project.reporting.outputEncoding}
diff --git a/yarn/pom.xml b/yarn/pom.xml
index a8c122fd40a1..eb5f902d3bb5 100644
--- a/yarn/pom.xml
+++ b/yarn/pom.xml
@@ -189,6 +189,106 @@
+
+
+
+ hadoop-2.6
+
+
+ com.sun.jersey
+ jersey-core
+
+
+ com.sun.jersey
+ jersey-json
+
+
+ com.sun.jersey
+ jersey-server
+
+
+
+
+
+ org.codehaus.mojo
+ build-helper-maven-plugin
+
+
+ add-source
+ generate-sources
+
+ add-source
+
+
+
+ src/history/main/scala
+
+
+
+
+ add-test-source
+ generate-test-sources
+
+ add-test-source
+
+
+
+ src/history/test/scala
+
+
+
+
+ add-test-resource
+ generate-test-resources
+
+ add-test-resource
+
+
+
+
+ src/history/test/resources
+
+
+
+
+
+
+
+ org.scalastyle
+ scalastyle-maven-plugin
+
+ false
+ true
+
+ true
+ false
+
+ ${basedir}/src/main/scala
+ ${basedir}/src/history/main/scala
+
+
+ ${basedir}/src/test/scala
+ ${basedir}/src/history/test/scala
+
+ ${basedir}/src/test/scala
+ ${scalastyle.path}
+ ${basedir}/target/scalastyle-output.xml
+ ${project.build.sourceEncoding}
+ ${project.reporting.outputEncoding}
+
+
+
+
+ check
+
+
+
+
+
+
+
+
+
target/scala-${scala.binary.version}/classestarget/scala-${scala.binary.version}/test-classes
diff --git a/yarn/src/history/main/scala/org/apache/spark/deploy/history/yarn/ExtendedMetricsSource.scala b/yarn/src/history/main/scala/org/apache/spark/deploy/history/yarn/ExtendedMetricsSource.scala
new file mode 100644
index 000000000000..ea4137f2a95b
--- /dev/null
+++ b/yarn/src/history/main/scala/org/apache/spark/deploy/history/yarn/ExtendedMetricsSource.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history.yarn
+
+import com.codahale.metrics.{Metric, Timer}
+
+import org.apache.spark.metrics.source.Source
+
+/**
+ * An extended metrics source with some operations to build up the registry, and
+ * to time a closure.
+ */
+private[history] trait ExtendedMetricsSource extends Source {
+
+ /**
+ * A map of all the metrics to register; it is also used to build the string value.
+ * @return map of metric name to metric
+ */
+ def metricsMap: Map[String, Metric]
+
+ protected def init(): Unit = {
+ metricsMap.foreach(elt => metricRegistry.register(elt._1, elt._2))
+ }
+
+ override def toString: String = {
+ val sb = new StringBuilder()
+ metricsMap.foreach(elt => sb.append(s" ${elt._1} = ${elt._2}\n"))
+ sb.toString()
+ }
+
+ /**
+ * Time a closure, returning its output.
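+ * For example, `time(timer) { post() }` would return the result of a hypothetical
+ * `post()` call while recording its duration against `timer`.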
+ * @param t timer
+ * @param f function
+ * @tparam T type of return value of the function
+ * @return the result of the function.
+ */
+ def time[T](t: Timer)(f: => T): T = {
+ val timeCtx = t.time()
+ try {
+ f
+ } finally {
+ timeCtx.close()
+ }
+ }
+}
diff --git a/yarn/src/history/main/scala/org/apache/spark/deploy/history/yarn/YarnEventListener.scala b/yarn/src/history/main/scala/org/apache/spark/deploy/history/yarn/YarnEventListener.scala
new file mode 100644
index 000000000000..14c02a97a20e
--- /dev/null
+++ b/yarn/src/history/main/scala/org/apache/spark/deploy/history/yarn/YarnEventListener.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history.yarn
+
+import org.apache.spark.{Logging, SparkContext, SparkFirehoseListener}
+import org.apache.spark.scheduler.SparkListenerEvent
+
+/**
+ * Spark listener which queues all received events with the [[YarnHistoryService]] passed
+ * in as a constructor parameter. There is no attempt to filter event types at this point.
+ *
+ * @param sc context
+ * @param service service to forward events to
+ */
+private[spark] class YarnEventListener(sc: SparkContext, service: YarnHistoryService)
+ extends SparkFirehoseListener with Logging {
+
+ /**
+ * Queue the event with the service, timestamped to the current time.
+ *
+ * @param event event to queue
+ */
+ override def onEvent(event: SparkListenerEvent): Unit = {
+ service.enqueue(event)
+ }
+
+}
diff --git a/yarn/src/history/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala b/yarn/src/history/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala
new file mode 100644
index 000000000000..0edc4d41a893
--- /dev/null
+++ b/yarn/src/history/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala
@@ -0,0 +1,1322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history.yarn
+
+import java.io.InterruptedIOException
+import java.net.{ConnectException, URI}
+import java.util.concurrent.{LinkedBlockingDeque, TimeUnit}
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicLong}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import com.codahale.metrics.{Metric, Counter, MetricRegistry, Timer}
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
+import org.apache.hadoop.yarn.api.records.timeline.{TimelineDomain, TimelineEntity, TimelineEvent}
+import org.apache.hadoop.yarn.client.api.TimelineClient
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+
+import org.apache.spark.{Logging, SparkContext}
+import org.apache.spark.deploy.history.yarn.YarnTimelineUtils._
+import org.apache.spark.scheduler.{SparkListenerApplicationEnd, SparkListenerApplicationStart, SparkListenerBlockUpdated, SparkListenerEvent, SparkListenerExecutorMetricsUpdate}
+import org.apache.spark.scheduler.cluster.{SchedulerExtensionService, SchedulerExtensionServiceBinding}
+import org.apache.spark.util.{SystemClock, Utils}
+
+/**
+ * A Yarn Extension Service to post lifecycle events to a registered YARN Timeline Server.
+ *
+ * Posting algorithm
+ *
+ * 1. The service subscribes to all events coming from the Spark Context.
+ * 1. These events are serialized into JSON objects for publishing to the timeline service through
+ * HTTP(S) posts.
+ * 1. Events are buffered into `pendingEvents` until a batch is aggregated into a
+ * [[TimelineEntity]] for posting.
+ * 1. That aggregation happens when a lifecycle event (application start/stop) takes place,
+ * or the number of pending events in a running application exceeds the limit set in
+ * `spark.hadoop.yarn.timeline.batch.size`.
+ * 1. Posting operations take place in a separate thread from the spark event listener.
+ * 1. If an attempt to post to the timeline server fails, the service sleeps and then
+ * the post is re-attempted after the retry period defined by
+ * `spark.hadoop.yarn.timeline.post.retry.interval`.
+ * 1. If the number of events buffered in the history service exceeds the limit set in
+ * `spark.hadoop.yarn.timeline.post.limit`, then further events other than application start/stop
+ * are dropped.
+ * 1. When the service is stopped, it will make a best-effort attempt to post all queued events.
+ * The call of [[stop()]] can block up to the duration of
+ * `spark.hadoop.yarn.timeline.shutdown.waittime` for this to take place.
+ * 1. No events are posted until the service receives a [[SparkListenerApplicationStart]] event.
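+ *
+ * The batching and queueing behaviour can be tuned through the options named above;
+ * for example (the values below are illustrative, not the defaults):
+ * {{{
+ * spark.hadoop.yarn.timeline.batch.size 50
+ * spark.hadoop.yarn.timeline.post.limit 2000
+ * }}}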
+ *
+ * If the spark context has a metrics registry, then the internal counters of queued entities,
+ * post failures and successes, and the performance of the posting operation are all registered
+ * as metrics.
+ *
+ * The shutdown logic is somewhat convoluted, as the posting thread may be blocked on HTTP IO
+ * when the shutdown process begins. In this situation, the thread continues to be blocked, and
+ * will be interrupted once the wait time has expired. All time consumed during the ongoing
+ * operation will be counted as part of the shutdown time period.
+ */
+private[spark] class YarnHistoryService extends SchedulerExtensionService with Logging {
+
+ import org.apache.spark.deploy.history.yarn.YarnHistoryService._
+
+ /** Simple state model implemented in an atomic integer. */
+ private val _serviceState = new AtomicInteger(CreatedState)
+
+ /** Get the current state. */
+ def serviceState: Int = {
+ _serviceState.get()
+ }
+
+ /**
+ * Atomic operation to enter a new state, returning the old one.
+ * There are no checks against the state model.
+ * @param state new state
+ * @return previous state
+ */
+ private def enterState(state: Int): Int = {
+ logDebug(s"Entering state $state from $serviceState")
+ _serviceState.getAndSet(state)
+ }
+
+ /** Spark context; valid once started. */
+ private var sparkContext: SparkContext = _
+
+ /** YARN configuration from the spark context. */
+ private var config: YarnConfiguration = _
+
+ /** Application ID. */
+ private[yarn] var applicationId: ApplicationId = _
+
+ /** Attempt ID; this will be `None` if the service is started in yarn-client mode. */
+ private var attemptId: Option[ApplicationAttemptId] = None
+
+ /** YARN timeline client. */
+ private var _timelineClient: Option[TimelineClient] = None
+
+ /** Registered event listener. */
+ private var listener: Option[YarnEventListener] = None
+
+ /** Application name from the spark start event. */
+ private var applicationName: String = _
+
+ /** Application ID received from a [[SparkListenerApplicationStart]]. */
+ private var sparkApplicationId: Option[String] = None
+
+ /** Optional Attempt ID string from [[SparkListenerApplicationStart]]. */
+ private var sparkApplicationAttemptId: Option[String] = None
+
+ /** User name as derived from `SPARK_USER` env var or [[Utils]]. */
+ private var userName = Utils.getCurrentUserName
+
+ /** Clock for recording time */
+ private val clock = new SystemClock
+
+ /** Start time of the application, as received in the start event. */
+ private var startTime: Long = _
+
+ /** Start time of the application, as received in the end event. */
+ private var endTime: Long = _
+
+ /** Number of events to batch up before posting. */
+ private[yarn] var batchSize = DEFAULT_BATCH_SIZE
+
+ /** Queue of entities to asynchronously post, plus the number of events in each entry. */
+ private val postingQueue = new LinkedBlockingDeque[PostQueueAction]()
+
+ /** Number of events in the post queue. */
+ private val postQueueEventSize = new AtomicLong
+
+ /** Limit on the total number of events permitted. */
+ private var postQueueLimit = DEFAULT_POST_EVENT_LIMIT
+
+ /** List of events which will be pulled into a timeline entity when created. */
+ private var pendingEvents = new mutable.LinkedList[TimelineEvent]()
+
+ /** The received application started event; `None` if no event has been received. */
+ private var applicationStartEvent: Option[SparkListenerApplicationStart] = None
+
+ /** The received application end event; `None` if no event has been received. */
+ private var applicationEndEvent: Option[SparkListenerApplicationEnd] = None
+
+ /** Has a start event been processed? */
+ private val appStartEventProcessed = new AtomicBoolean(false)
+
+ /** Has the application end event been processed? */
+ private val appEndEventProcessed = new AtomicBoolean(false)
+
+ /** Event handler thread. */
+ private var entityPostThread: Option[Thread] = None
+
+ /** Flag to indicate the queue is stopped; events aren't being processed. */
+ private val queueStopped = new AtomicBoolean(true)
+
+ /** Boolean to track when the post thread is active; set and reset in the thread itself. */
+ private val postThreadActive = new AtomicBoolean(false)
+
+ /** How long to wait, in milliseconds, for shutdown before giving up. */
+ private var shutdownWaitTime = 0L
+
+ /** What is the initial and incrementing interval for POST retries? */
+ private var retryInterval = 0L
+
+ /** Domain ID for entities: may be null. */
+ private var domainId: Option[String] = None
+
+ /** URI of the timeline web application; valid after [[start()]]. */
+ private[yarn] var timelineWebappAddress: URI = _
+
+ /** Metric fields. Used in tests as well as metrics infrastructure. */
+ val metrics = new HistoryMetrics
+
+ /**
+ * Create a timeline client and start it. This does not update the
+ * `timelineClient` field, though it does verify that the field
+ * is unset.
+ *
+ * The method is private to the package so that tests can access it, which
+ * some of the mock tests do to override the timeline client creation.
+ * @return the timeline client
+ */
+ private[yarn] def createTimelineClient(): TimelineClient = {
+ require(_timelineClient.isEmpty, "timeline client already set")
+ YarnTimelineUtils.createTimelineClient(sparkContext)
+ }
+
+ /**
+ * Get the timeline client.
+ * @return the client
+ * @throws Exception if the timeline client is not currently running
+ */
+ def timelineClient: TimelineClient = {
+ synchronized { _timelineClient.get }
+ }
+
+ /**
+ * Get the total number of events dropped due to the queue of
+ * outstanding posts being too long.
+ * @return count of events dropped
+ */
+ def eventsDropped: Long = metrics.eventsDropped.getCount
+
+ /**
+ * Get the total number of processed events, those handled in the back-end thread without
+ * being rejected.
+ *
+ * @return counter of events processed
+ */
+ def eventsProcessed: Long = metrics.eventsProcessed.getCount
+
+ /**
+ * Get the total number of events queued.
+ *
+ * @return the total event count
+ */
+ def eventsQueued: Long = metrics.eventsQueued.getCount
+
+ /**
+ * Get the current size of the posting queue.
+ *
+ * @return the current queue length
+ */
+ def postingQueueSize: Int = postingQueue.size()
+
+ /**
+ * Query the counter of attempts to post entities to the timeline service.
+ *
+ * @return the current value
+ */
+ def postAttempts: Long = metrics.entityPostAttempts.getCount
+
+ /**
+ * Get the total number of failed post operations.
+ *
+ * @return counter of timeline post operations which failed
+ */
+ def postFailures: Long = metrics.entityPostFailures.getCount
+
+ /**
+ * Query the counter of successful post operations (this is not the same as the
+ * number of events posted).
+ *
+ * @return the number of successful post operations.
+ */
+ def postSuccesses: Long = metrics.entityPostSuccesses.getCount
+
+ /**
+ * Is the asynchronous posting thread active?
+ *
+ * @return true if the post thread has started; false if it has not yet/ever started, or
+ * if it has finished.
+ */
+ def isPostThreadActive: Boolean = postThreadActive.get
+
+ /**
+ * Reset the timeline client. Idempotent.
+ *
+ * 1. Stop the timeline client service if running.
+ * 2. Set the `timelineClient` field to `None`.
+ */
+ def stopTimelineClient(): Unit = {
+ synchronized {
+ _timelineClient.foreach(_.stop())
+ _timelineClient = None
+ }
+ }
+
+ /**
+ * Create the timeline domain.
+ *
+ * A Timeline Domain is a uniquely identified 'namespace' for accessing parts of the timeline.
+ * Security levels are managed at the domain level, so a domain is created if the
+ * Spark ACLs are enabled. Full access is then granted to the current user and to
+ * all users in the configuration options `"spark.modify.acls"` and `"spark.admin.acls"`;
+ * read access is granted to those users and to those listed in `"spark.ui.view.acls"`.
+ *
+ * @return an optional domain string. If `None`, then no domain was created.
+ */
+ private def createTimelineDomain(): Option[String] = {
+ val sparkConf = sparkContext.getConf
+ val aclsOn = sparkConf.getBoolean("spark.ui.acls.enable",
+ sparkConf.getBoolean("spark.acls.enable", false))
+ if (!aclsOn) {
+ logDebug("ACLs are disabled; not creating the timeline domain")
+ return None
+ }
+ val predefDomain = sparkConf.getOption(TIMELINE_DOMAIN)
+ if (predefDomain.isDefined) {
+ logDebug(s"Using predefined domain $predefDomain")
+ return predefDomain
+ }
+ val current = UserGroupInformation.getCurrentUser.getShortUserName
+ val adminAcls = stringToSet(sparkConf.get("spark.admin.acls", ""))
+ val viewAcls = stringToSet(sparkConf.get("spark.ui.view.acls", ""))
+ val modifyAcls = stringToSet(sparkConf.get("spark.modify.acls", ""))
+
+ val readers = (Seq(current) ++ adminAcls ++ modifyAcls ++ viewAcls).mkString(" ")
+ val writers = (Seq(current) ++ adminAcls ++ modifyAcls).mkString(" ")
+ val domain = DOMAIN_ID_PREFIX + applicationId
+ logInfo(s"Creating domain $domain with readers: $readers and writers: $writers")
+
+ // create the timeline domain with the reader and writer permissions
+ val timelineDomain = new TimelineDomain()
+ timelineDomain.setId(domain)
+ timelineDomain.setReaders(readers)
+ timelineDomain.setWriters(writers)
+ try {
+ timelineClient.putDomain(timelineDomain)
+ Some(domain)
+ } catch {
+ case e: Exception =>
+ logError(s"cannot create the domain $domain", e)
+ // fallback to default
+ None
+ }
+ }
+
+ /**
+ * Start the service.
+ *
+ * @param binding binding to the spark application and YARN
+ */
+ override def start(binding: SchedulerExtensionServiceBinding): Unit = {
+ val oldstate = enterState(StartedState)
+ if (oldstate != CreatedState) {
+ // state model violation
+ _serviceState.set(oldstate)
+ throw new IllegalArgumentException(s"Cannot start the service from state $oldstate")
+ }
+ val context = binding.sparkContext
+ val appId = binding.applicationId
+ val attemptId = binding.attemptId
+ require(context != null, "Null context parameter")
+ bindToYarnApplication(appId, attemptId)
+ this.sparkContext = context
+ this.config = new YarnConfiguration(context.hadoopConfiguration)
+ val sparkConf = sparkContext.conf
+
+ // work out the attempt ID from the YARN attempt ID; if there is no attempt, assume "1".
+ val attempt1 = attemptId match {
+ case Some(attempt) => attempt.getAttemptId.toString
+ case None => CLIENT_BACKEND_ATTEMPT_ID
+ }
+ setContextAppAndAttemptInfo(Some(appId.toString), Some(attempt1))
+ batchSize = sparkConf.getInt(BATCH_SIZE, batchSize)
+ postQueueLimit = sparkConf.getInt(POST_EVENT_LIMIT, postQueueLimit)
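+ // the retry interval and shutdown wait time are read with seconds precision, then
+ // converted to milliseconds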
+ retryInterval = 1000 * sparkConf.getTimeAsSeconds(POST_RETRY_INTERVAL,
+ DEFAULT_POST_RETRY_INTERVAL)
+ shutdownWaitTime = 1000 * sparkConf.getTimeAsSeconds(SHUTDOWN_WAIT_TIME,
+ DEFAULT_SHUTDOWN_WAIT_TIME)
+
+ // the full metrics integration happens if the spark context has a metrics system
+ val metricsSystem = sparkContext.metricsSystem
+ if (metricsSystem != null) {
+ metricsSystem.registerSource(metrics)
+ }
+
+ // set up the timeline service, unless it's been disabled for testing
+ if (timelineServiceEnabled) {
+ timelineWebappAddress = getTimelineEndpoint(config)
+
+ logInfo(s"Starting $this")
+ logInfo(s"Spark events will be published to the Timeline at $timelineWebappAddress")
+ _timelineClient = Some(createTimelineClient())
+ domainId = createTimelineDomain()
+ // declare that the processing is started
+ queueStopped.set(false)
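+ // start a background thread to post entities; it is a daemon so it cannot block JVM exit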
+ val thread = new Thread(new EntityPoster(), "EventPoster")
+ entityPostThread = Some(thread)
+ thread.setDaemon(true)
+ thread.start()
+ } else {
+ logInfo("Timeline service is disabled")
+ }
+ if (registerListener()) {
+ logInfo(s"History Service listening for events: $this")
+ } else {
+ logInfo(s"History Service is not listening for events: $this")
+ }
+ }
+
+ /**
+ * Check the service configuration to see if the timeline service is enabled.
+ *
+ * @return true if `YarnConfiguration.TIMELINE_SERVICE_ENABLED` is set.
+ */
+ def timelineServiceEnabled: Boolean = {
+ YarnTimelineUtils.timelineServiceEnabled(config)
+ }
+
+ /**
+ * Return a summary of the service state to help diagnose problems
+ * during test runs, possibly even production.
+ *
+ * @return a summary of the current service state
+ */
+ override def toString(): String =
+ s"""YarnHistoryService for application $applicationId attempt $attemptId;
+ | state=$serviceState;
+ | endpoint=$timelineWebappAddress;
+ | bonded to ATS=$bondedToATS;
+ | listening=$listening;
+ | batchSize=$batchSize;
+ | flush count=$getFlushCount;
+ | total number queued=$eventsQueued, processed=$eventsProcessed;
+ | attempted entity posts=$postAttempts
+ | successful entity posts=$postSuccesses
+ | failed entity posts=$postFailures;
+ | events dropped=$eventsDropped;
+ | app start event received=$appStartEventProcessed;
+ | app end event received=$appEndEventProcessed;
+ """.stripMargin
+
+ /**
+ * Is the service listening to events from the spark context?
+ *
+ * @return true if it has registered as a listener
+ */
+ def listening: Boolean = {
+ listener.isDefined
+ }
+
+ /**
+ * Is the service hooked up to an ATS server?
+ *
+ * This does not check the validity of the link, only whether or not the service
+ * has been set up to talk to ATS.
+ *
+ * @return true if the service has a timeline client
+ */
+ def bondedToATS: Boolean = {
+ _timelineClient.isDefined
+ }
+
+ /**
+ * Set the YARN binding information.
+ *
+ * This is called during startup. It is private to the package so that tests
+ * may update this data.
+ * @param appId YARN application ID
+ * @param maybeAttemptId optional attempt ID
+ */
+ private[yarn] def bindToYarnApplication(appId: ApplicationId,
+ maybeAttemptId: Option[ApplicationAttemptId]): Unit = {
+ require(appId != null, "Null appId parameter")
+ applicationId = appId
+ attemptId = maybeAttemptId
+ }
+
+ /**
+ * Set the "spark" application and attempt information -the information
+ * provided in the start event. The attempt ID here may be `None`; even
+ * if set it may only be unique amongst the attempts of this application.
+ * That is: not unique enough to be used as the entity ID
+ *
+ * @param appId application ID
+ * @param attemptId attempt ID
+ */
+ private def setContextAppAndAttemptInfo(appId: Option[String],
+ attemptId: Option[String]): Unit = {
+ logDebug(s"Setting application ID to $appId; attempt ID to $attemptId")
+ sparkApplicationId = appId
+ sparkApplicationAttemptId = attemptId
+ }
+
+ /**
+ * Add the listener if it is not disabled.
+ * This is accessible in the same package purely for testing.
+ *
+ * @return true if the listener was registered
+ */
+ private[yarn] def registerListener(): Boolean = {
+ assert(sparkContext != null, "Null context")
+ if (sparkContext.conf.getBoolean(REGISTER_LISTENER, true)) {
+ logDebug("Registering listener to spark context")
+ val l = new YarnEventListener(sparkContext, this)
+ listener = Some(l)
+ sparkContext.listenerBus.addListener(l)
+ true
+ } else {
+ false
+ }
+ }
+
+ /**
+ * Queue an action, or, if the service's `stopped` flag is set, discard it.
+ *
+ * This is the method called by the event listener when forwarding events to the service.
+ * @param event event to process
+ * @return true if the event was queued
+ */
+ def enqueue(event: SparkListenerEvent): Boolean = {
+ if (!queueStopped.get) {
+ metrics.eventsQueued.inc()
+ logDebug(s"Enqueue $event")
+ handleEvent(event)
+ true
+ } else {
+ // the service is stopped, so the event will not be processed.
+ if (timelineServiceEnabled) {
+ // if a timeline service was ever enabled, log the fact the event
+ // is being discarded. Don't do this if it was not, as it will
+ // only make the (test run) logs noisy.
+ logInfo(s"History service stopped; ignoring queued event : $event")
+ }
+ false
+ }
+ }
+
+ /**
+ * Stop the service; this triggers flushing the queue and, if not already processed,
+ * pushing out an application end event.
+ *
+ * This operation will block for up to `shutdownWaitTime` milliseconds
+ * to await the asynchronous action queue completing.
+ */
+ override def stop(): Unit = {
+ val oldState = enterState(StoppedState)
+ if (oldState != StartedState) {
+ // stopping from a different state
+ logDebug(s"Ignoring stop() request from state $oldState")
+ return
+ }
+ try {
+ stopQueue()
+ } finally {
+ if (sparkContext.metricsSystem != null) {
+ // unregister from metrics
+ sparkContext.metricsSystem.removeSource(metrics)
+ }
+ }
+ }
+
+ /**
+ * Stop the queue system.
+ */
+ private def stopQueue(): Unit = {
+ // if the queue is live
+ if (!queueStopped.get) {
+
+ if (appStartEventProcessed.get && !appEndEventProcessed.get) {
+ // push out an application stop event if none has been received
+ logDebug("Generating a SparkListenerApplicationEnd during service stop()")
+ enqueue(SparkListenerApplicationEnd(now()))
+ }
+
+ // flush out the events
+ asyncFlush()
+
+ // push out the queue stop event; this immediately sets the `queueStopped` flag
+ pushQueueStop(now(), shutdownWaitTime)
+
+ // Now await the halt of the posting thread.
+ var shutdownPosted = false
+ if (postThreadActive.get) {
+ postThreadActive.synchronized {
+ // check it hasn't switched state
+ if (postThreadActive.get) {
+ logDebug(s"Stopping posting thread and waiting $shutdownWaitTime mS")
+ shutdownPosted = true
+ postThreadActive.wait(shutdownWaitTime)
+ // then interrupt the thread if it is still running
+ if (postThreadActive.get) {
+ logInfo("Interrupting posting thread after $shutdownWaitTime mS")
+ entityPostThread.foreach(_.interrupt())
+ }
+ }
+ }
+ }
+ if (!shutdownPosted) {
+ // there was no running post thread, just stop the timeline client ourselves.
+ // (if there is a thread running, it must be the one to stop it)
+ stopTimelineClient()
+ logInfo(s"Stopped: $this")
+ }
+ }
+ }
+
+ /**
+ * Can an event be added?
+ *
+ * The policy is: only if the number of queued events is below the limit, or the
+ * event marks the start or end of the application.
+ *
+ * @param isLifecycleEvent is this operation triggered by an application start/end?
+ * @return true if the event can be added to the queue
+ */
+ private def canAddEvent(isLifecycleEvent: Boolean): Boolean = {
+ isLifecycleEvent || metrics.eventsQueued.getCount < postQueueLimit
+ }
+
+ /**
+ * Add another event to the pending event list.
+ *
+ * Returns the size of the event list after the event was added
+ * (thread safe).
+ * @param event event to add
+ * @return the event list size
+ */
+ private def addPendingEvent(event: TimelineEvent): Int = {
+ pendingEvents.synchronized {
+ pendingEvents :+= event
+ pendingEvents.size
+ }
+ }
+
+ /**
+ * Publish next set of pending events if there are events to publish,
+ * and the application has been recorded as started.
+ *
+ * Builds the next entity to push onto [[postingQueue]]; resets
+ * the current [[pendingEvents]] list and then adds a [[PostEntity]]
+ * operation to the queue.
+ *
+ * @return true if another entity was queued
+ */
+ private def publishPendingEvents(): Boolean = {
+ // verify that there are events to publish
+ val size = pendingEvents.synchronized {
+ pendingEvents.size
+ }
+ if (size > 0 && applicationStartEvent.isDefined) {
+ // push if there are events *and* the app is recorded as having started.
+ // This is because the app name is needed for the publishing.
+ metrics.flushCount.inc()
+ val timelineEntity = createTimelineEntity(
+ applicationId,
+ attemptId,
+ sparkApplicationId,
+ sparkApplicationAttemptId,
+ applicationName,
+ userName,
+ startTime,
+ endTime,
+ now())
+
+ // copy in pending events and then reset the list
+ pendingEvents.synchronized {
+ pendingEvents.foreach(timelineEntity.addEvent)
+ pendingEvents = new mutable.LinkedList[TimelineEvent]()
+ }
+ queueForPosting(timelineEntity)
+ true
+ } else {
+ false
+ }
+ }
+
+ /**
+ * Queue an asynchronous flush operation.
+ * @return true if an entity was queued for posting
+ */
+ def asyncFlush(): Boolean = {
+ publishPendingEvents()
+ }
+
+ /**
+ * Push a [[StopQueueAction]] onto the posting queue and set the `queueStopped` flag.
+ * @param currentTime time when action was queued.
+ * @param waitTime time for shutdown to wait
+ */
+ private def pushQueueStop(currentTime: Long, waitTime: Long): Unit = {
+ queueStopped.set(true)
+ postingQueue.add(StopQueueAction(currentTime, waitTime))
+ }
+
+ /**
+ * Queue an entity for posting; also increases
+ * [[postQueueEventSize]] by the size of the entity.
+ * @param timelineEntity entity to push
+ */
+ def queueForPosting(timelineEntity: TimelineEntity): Unit = {
+ // queue the entity for posting
+ preflightCheck(timelineEntity)
+ val e = new PostEntity(timelineEntity)
+ postQueueEventSize.addAndGet(e.size)
+ metrics.postQueueEventSize.inc(e.size)
+ postingQueue.add(e)
+ }
+
+ /**
+ * Push a `PostQueueAction` to the start of the queue; also increments
+ * [[postQueueEventSize]] by the size of the action.
+ * @param action action to push
+ */
+ private def pushToFrontOfQueue(action: PostQueueAction): Unit = {
+ postingQueue.push(action)
+ postQueueEventSize.addAndGet(action.size)
+ metrics.postQueueEventSize.inc(action.size)
+ }
+
+ /**
+ * Take from the posting queue; decrements [[postQueueEventSize]] by the size
+ * of the action.
+ * @return the action
+ */
+ private def takeFromPostingQueue(): PostQueueAction = {
+ val taken = postingQueue.take()
+ postQueueEventSize.addAndGet(-taken.size)
+ metrics.postQueueEventSize.dec(taken.size)
+ taken
+ }
+
+ /**
+ * Poll from the posting queue; decrements [[postQueueEventSize]] by the size
+ * of the action.
+ * @param mills time in milliseconds to wait for an action before giving up
+ * @return the action taken, or `None` if the poll timed out
+ */
+ private def pollFromPostingQueue(mills: Long): Option[PostQueueAction] = {
+ // poll() returns null on timeout; only adjust the counters when something was taken
+ val taken = postingQueue.poll(mills, TimeUnit.MILLISECONDS)
+ if (taken != null) {
+ postQueueEventSize.addAndGet(-taken.size)
+ metrics.postQueueEventSize.dec(taken.size)
+ }
+ Option(taken)
+ }
+
+ /**
+ * Perform any preflight checks.
+ *
+ * This is just a safety check to catch regressions in the code which
+ * publish data that cannot be parsed at the far end.
+ * @param entity timeline entity to review.
+ */
+ private def preflightCheck(entity: TimelineEntity): Unit = {
+ require(entity.getStartTime != null,
+ s"No start time in ${describeEntity(entity)}")
+ }
+
+ /** Actions in the post queue */
+ private sealed trait PostQueueAction {
+ /**
+ * Number of events in this entry
+ * @return a natural number
+ */
+ def size: Int
+ }
+
+ /**
+ * A `StopQueueAction` action has a size of 0
+ * @param currentTime time when action was queued.
+ * @param waitTime time for shutdown to wait
+ */
+ private case class StopQueueAction(currentTime: Long, waitTime: Long) extends PostQueueAction {
+ override def size: Int = 0
+ def timeLimit: Long = currentTime + waitTime
+ }
+
+ /**
+ * A `PostEntity` action has a size of the number of listed events
+ */
+ private case class PostEntity(entity: TimelineEntity) extends PostQueueAction {
+ override def size: Int = entity.getEvents.size()
+ }
+
+ /**
+ * Post a single entity.
+ *
+ * Any network/connectivity errors will be caught and logged, and returned as the
+ * exception field in the returned tuple.
+ *
+ * Any posting which generates a response containing errors is logged and not retried:
+ * such rejections are almost invariably going to re-occur if resubmitted.
+ *
+ * @param entity entity to post
+ * @return Any exception other than an interruption raised during the operation.
+ * @throws InterruptedException if an [[InterruptedException]] or [[InterruptedIOException]] is
+ * received. These exceptions may also get caught and wrapped in the ATS client library.
+ */
+ private def postOneEntity(entity: TimelineEntity): Option[Exception] = {
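+ // attach the timeline domain, if one was created for this application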
+ domainId.foreach(entity.setDomainId)
+ val entityDescription = describeEntity(entity)
+ logInfo(s"About to POST entity ${entity.getEntityId} with ${entity.getEvents.size()} events" +
+ s" to timeline service $timelineWebappAddress")
+ logDebug(s"About to POST $entityDescription")
+ val timeContext = metrics.postOperationTimer.time()
+ metrics.entityPostAttempts.inc()
+ try {
+ val response = timelineClient.putEntities(entity)
+ val errors = response.getErrors
+ if (errors.isEmpty) {
+ logDebug(s"entity successfully posted")
+ metrics.entityPostSuccesses.inc()
+ } else {
+ // The ATS service rejected the request at the API level.
+ // this is something we assume cannot be re-tried
+ metrics.entityPostRejections.inc()
+ logError(s"Failed to post $entityDescription")
+ errors.asScala.foreach { err =>
+ logError(describeError(err))
+ }
+ }
+ // whether accepted or rejected, this request is not re-issued
+ None
+ } catch {
+
+ case e: InterruptedException =>
+ // interrupted; this will break out of IO/Sleep operations and
+ // trigger a rescan of the stopped() event.
+ throw e
+
+ case e: ConnectException =>
+ // connection failure: network, ATS down, config problems, ...
+ metrics.entityPostFailures.inc()
+ logDebug(s"Connection exception submitting $entityDescription", e)
+ Some(e)
+
+ case e: Exception =>
+ val cause = e.getCause
+ if (cause != null && cause.isInstanceOf[InterruptedException]) {
+ // hadoop 2.7 retry logic wraps the interrupt
+ throw cause
+ }
+ // something else has gone wrong.
+ metrics.entityPostFailures.inc()
+ logDebug(s"Could not handle history entity: $entityDescription", e)
+ Some(e)
+
+ } finally {
+ timeContext.stop()
+ }
+ }
+
+ /**
+ * Wait for and then post entities until stopped.
+ *
+ * Algorithm.
+ *
+ * 1. The thread waits for events in the [[postingQueue]] until stopped or interrupted.
+ * 1. Failures result in the entity being queued for resending, after a delay which grows
+ * linearly on every retry.
+ * 1. Successful posts reset the retry delay.
+ * 1. If the thread is interrupted, the interruption propagates out of the loop.
+ *
+ * To stop this process, push a [[StopQueueAction]] onto the queue; if the thread is
+ * blocked on IO, interrupt it as well.
+ *
+ * @param retryInterval delay in milliseconds for the first retry delay; the delay increases
+ * by this value on every future failure. If zero, there is no delay, ever.
+ * @return the [[StopQueueAction]] received to stop the process.
+ */
+ private def postEntities(retryInterval: Long): StopQueueAction = {
+ var lastAttemptFailed = false
+ var currentRetryDelay = retryInterval
+ var result: StopQueueAction = null
+ while (result == null) {
+ takeFromPostingQueue() match {
+ case PostEntity(entity) =>
+ val ex = postOneEntity(entity)
+ if (ex.isDefined && !queueStopped.get()) {
+ // something went wrong
+ if (!lastAttemptFailed) {
+ // avoid filling up logs with repeated failures
+ logWarning(s"Exception submitting entity to $timelineWebappAddress", ex.get)
+ }
+ // log failure and queue for posting again
+ lastAttemptFailed = true
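+ // linear backoff: the delay grows by retryInterval on every consecutive failure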
+ currentRetryDelay += retryInterval
+ if (!queueStopped.get()) {
+ // push back to the head of the queue
+ postingQueue.addFirst(PostEntity(entity))
+ if (currentRetryDelay > 0) {
+ Thread.sleep(currentRetryDelay)
+ }
+ }
+ } else {
+ // success; reset flags and retry delay
+ lastAttemptFailed = false
+ currentRetryDelay = retryInterval
+ }
+ case stop: StopQueueAction =>
+ logDebug("Queue stopped")
+ result = stop
+ }
+ }
+ result
+ }
+
+ /**
+ * Shutdown phase: continually post outstanding entities until the timeout has been exceeded.
+ * The interval between failures is `retryInterval`: there is no escalation, and if it
+ * is longer than the remaining time in the shutdown, the remaining time sets the limit.
+ *
+ * @param shutdown shutdown parameters.
+ * @param retryInterval delay in milliseconds between retries.
+ */
+ private def postEntitiesShutdownPhase(shutdown: StopQueueAction, retryInterval: Long): Unit = {
+ val timeLimit = shutdown.timeLimit
+ val timestamp = YarnTimelineUtils.timeShort(timeLimit, "")
+ logDebug(s"Queue shutdown, time limit= $timestamp")
+ while (now() < timeLimit && !postingQueue.isEmpty) {
+ pollFromPostingQueue(timeLimit - now()) match {
+ case Some(PostEntity(entity)) =>
+ postOneEntity(entity).foreach { e =>
+ if (!e.isInstanceOf[InterruptedException] &&
+ !e.isInstanceOf[InterruptedIOException]) {
+ // failure, push back to try again
+ pushToFrontOfQueue(PostEntity(entity))
+ if (retryInterval > 0) {
+ Thread.sleep(retryInterval)
+ } else {
+ // there's no retry interval, so fail immediately
+ throw e
+ }
+ } else {
+ // this was an interruption. Throw it again
+ throw e
+ }
+ }
+ case Some(StopQueueAction(_, _)) =>
+ // ignore these
+ logDebug("Ignoring StopQueue action")
+
+ case None =>
+ // the poll timed out, so the queue is empty; all is well
+ }
+ }
+ }
+
+ /**
+ * Handle an incoming event: add it to the pending list, then push the pending events
+ * to ATS if the batch size has been reached or the event is a lifecycle event.
+ *
+ * @param event event to process
+ */
+ private def handleEvent(event: SparkListenerEvent): Unit = {
+ // publish events unless stated otherwise
+ var publish = true
+ // don't trigger a push to the ATS
+ var push = false
+ // lifecycle events get special treatment: they are never discarded from the queues,
+ // even if the queues are full.
+ var isLifecycleEvent = false
+ val timestamp = now()
+ metrics.eventsProcessed.inc()
+ if (metrics.eventsProcessed.getCount() % 1000 == 0) {
+ logDebug(s"${metrics.eventsProcessed} events are processed")
+ }
+ event match {
+ case start: SparkListenerApplicationStart =>
+ // record the application details from the start event
+ logDebug(s"Handling application start event: $event")
+ if (!appStartEventProcessed.getAndSet(true)) {
+ applicationStartEvent = Some(start)
+ applicationName = start.appName
+ if (applicationName == null || applicationName.isEmpty) {
+ logWarning("Application does not have a name")
+ applicationName = applicationId.toString
+ }
+ userName = start.sparkUser
+ startTime = start.time
+ if (startTime == 0) {
+ startTime = timestamp
+ }
+ setContextAppAndAttemptInfo(start.appId, start.appAttemptId)
+ logDebug(s"Application started: $event")
+ isLifecycleEvent = true
+ push = true
+ } else {
+ logWarning(s"More than one application start event received -ignoring: $start")
+ publish = false
+ }
+
+ case end: SparkListenerApplicationEnd =>
+ if (!appStartEventProcessed.get()) {
+ // app-end events being received before app-start events can be triggered in
+ // tests, even if not seen in real applications.
+ // react by ignoring the event altogether, as an un-started application
+ // cannot be reported.
+ logError(s"Received application end event without application start $event -ignoring.")
+ } else if (!appEndEventProcessed.getAndSet(true)) {
+ // the application has ended
+ logDebug(s"Application end event: $event")
+ applicationEndEvent = Some(end)
+ // flush old entity
+ endTime = if (end.time > 0) end.time else timestamp
+ push = true
+ isLifecycleEvent = true
+ } else {
+ // another test-time only situation: more than one application end event
+ // received. Discard the later one.
+ logInfo(s"Discarding duplicate application end event $end")
+ publish = false
+ }
+
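+ // high-rate events such as block updates and executor metrics updates are never published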
+ case update: SparkListenerBlockUpdated =>
+ publish = false
+
+ case update: SparkListenerExecutorMetricsUpdate =>
+ publish = false
+
+ case _ =>
+ }
+
+ if (publish) {
+ val tlEvent = toTimelineEvent(event, timestamp)
+ val eventCount = if (tlEvent.isDefined && canAddEvent(isLifecycleEvent)) {
+ addPendingEvent(tlEvent.get)
+ } else {
+ // discarding the event
+ logInfo(s"Discarding event $tlEvent")
+ metrics.eventsDropped.inc()
+ 0
+ }
+
+ // trigger a push if the batch limit is reached
+ // There's no need to check for the application having started, as that is done later.
+ push |= eventCount >= batchSize
+
+ logDebug(s"current event num: $eventCount")
+ if (push) {
+ logDebug("Push triggered")
+ publishPendingEvents()
+ }
+ }
+ }
+
+ /**
+ * Return the current time in milliseconds.
+ * @return system time in milliseconds
+ */
+ private def now(): Long = {
+ clock.getTimeMillis()
+ }
+
+ /**
+ * Get the number of flush events that have taken place.
+ *
+ * This includes flushes triggered by the event list growing bigger than the batch size,
+ * but excludes flush operations triggered when the action processor thread
+ * is stopped, or if the timeline service binding is disabled.
+ *
+ * @return count of processed flush events.
+ */
+ def getFlushCount: Long = {
+ metrics.flushCount.getCount
+ }
+
+ /**
+ * Post events until told to stop.
+ */
+ private class EntityPoster extends Runnable {
+
+ override def run(): Unit = {
+ postThreadActive.set(true)
+ try {
+ val shutdown = postEntities(retryInterval)
+ // getting here means the `stop` flag is true
+ postEntitiesShutdownPhase(shutdown, retryInterval)
+ logInfo(s"Stopping dequeue service, final queue size is ${postingQueue.size};" +
+ s" outstanding events to post count: ${postQueueEventSize.get()}")
+ } catch {
+ // handle exceptions triggering thread exit. Interrupts are good; others less welcome.
+ case ex: InterruptedException =>
+ logInfo("Entity Posting thread interrupted")
+ logDebug("Entity Posting thread interrupted", ex)
+
+ case ex: InterruptedIOException =>
+ logInfo("Entity Posting thread interrupted")
+ logDebug("Entity Posting thread interrupted", ex)
+
+ case ex: Exception =>
+ logError("Entity Posting thread exiting after exception raised", ex)
+ } finally {
+ stopTimelineClient()
+ postThreadActive synchronized {
+ // declare that this thread is no longer active
+ postThreadActive.set(false)
+ // and notify all listeners of this fact
+ postThreadActive.notifyAll()
+ }
+ }
+ }
+ }
+
+}
+
+/**
+ * Metrics integration: the various counters of activity
+ */
+private[yarn] class HistoryMetrics extends ExtendedMetricsSource {
+
+ /** Name for metrics: yarn_history */
+ override val sourceName = YarnHistoryService.METRICS_NAME
+
+ /** Metrics registry */
+ override val metricRegistry = new MetricRegistry()
+
+ /** Number of events in the post queue. */
+ val postQueueEventSize = new Counter()
+
+ /** Counter of events processed, that is, those which have been through handleEvent(). */
+ val eventsProcessed = new Counter()
+
+ /** Counter of events queued. */
+ val eventsQueued = new Counter()
+
+ /** Counter of number of attempts to post entities. */
+ val entityPostAttempts = new Counter()
+
+ /** Counter of number of successful entity post operations. */
+ val entityPostSuccesses = new Counter()
+
+ /** How many entity postings failed? */
+ val entityPostFailures = new Counter()
+
+ /** How many entity postings were rejected? */
+ val entityPostRejections = new Counter()
+
+ /** The number of events which were dropped as the backlog of pending posts was too big. */
+ val eventsDropped = new Counter()
+
+ /** How many flushes have taken place? */
+ val flushCount = new Counter()
+
+ /** Timer to build up statistics on post operation times */
+ val postOperationTimer = new Timer()
+
+ val metricsMap: Map[String, Metric] = Map(
+ "eventsDropped" -> eventsDropped,
+ "eventsProcessed" -> eventsProcessed,
+ "eventsQueued" -> eventsQueued,
+ "entityPostAttempts" -> entityPostAttempts,
+ "entityPostFailures" -> entityPostFailures,
+ "entityPostRejections" -> entityPostRejections,
+ "entityPostSuccesses" -> entityPostSuccesses,
+ "entityPostTimer" -> postOperationTimer,
+ "flushCount" -> flushCount
+ )
+
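+ // register all the metrics in the map with the registry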
+ init()
+
+}
+
+/**
+ * Constants and defaults for the history service.
+ */
+private[spark] object YarnHistoryService {
+
+ /**
+ * Name of the entity type used to declare spark Applications.
+ */
+ val SPARK_EVENT_ENTITY_TYPE = "spark_event_v01"
+
+ /**
+ * Domain ID.
+ */
+ val DOMAIN_ID_PREFIX = "Spark_ATS_"
+
+ /**
+ * Time in millis to wait for shutdown on service stop.
+ */
+ val DEFAULT_SHUTDOWN_WAIT_TIME = "30s"
+
+ /**
+ * The maximum time to wait for event posting to complete when the service stops.
+ */
+ val SHUTDOWN_WAIT_TIME = "spark.hadoop.yarn.timeline.shutdown.waittime"
+
+ /**
+ * Option to declare that the history service should register as a spark context
+ * listener. (default: true; this option is here for testing)
+ *
+ * This is a Spark option, though its name will cause it to propagate down to the Hadoop
+ * Configuration.
+ */
+ val REGISTER_LISTENER = "spark.hadoop.yarn.timeline.listen"
+
+ /**
+ * Option for the size of the batch for timeline uploads. Bigger: less chatty.
+ * Smaller: history more responsive.
+ */
+ val BATCH_SIZE = "spark.hadoop.yarn.timeline.batch.size"
+
+ /**
+ * The default size of a batch
+ */
+ val DEFAULT_BATCH_SIZE = 10
+
+ /**
+ * Name of a domain for the timeline
+ */
+ val TIMELINE_DOMAIN = "spark.hadoop.yarn.timeline.domain"
+
+ /**
+ * Limit on the number of posts in the outbound queue; when exceeded,
+ * new events will be dropped.
+ */
+ val POST_EVENT_LIMIT = "spark.hadoop.yarn.timeline.post.limit"
+
+ /**
+ * The default limit of events in the post queue
+ */
+ val DEFAULT_POST_EVENT_LIMIT = 1000
+
+ /**
+ * Interval in milliseconds between POST retries. Every
+ * failure causes the interval to increase by this value.
+ */
+ val POST_RETRY_INTERVAL = "spark.hadoop.yarn.timeline.post.retry.interval"
+
+ /**
+ * The default retry interval in millis
+ */
+ val DEFAULT_POST_RETRY_INTERVAL = "1000ms"
+
+ /**
+ * Primary key used for events
+ */
+ val PRIMARY_KEY = "spark_application_entity"
+
+ /**
+ * Entity `OTHER_INFO` field: start time
+ */
+ val FIELD_START_TIME = "startTime"
+
+ /**
+ * Entity `OTHER_INFO` field: last updated time.
+ */
+ val FIELD_LAST_UPDATED = "lastUpdated"
+
+ /**
+ * Entity `OTHER_INFO` field: end time. Not present if the app is running.
+ */
+ val FIELD_END_TIME = "endTime"
+
+ /**
+ * Entity `OTHER_INFO` field: application name from context.
+ */
+ val FIELD_APP_NAME = "appName"
+
+ /**
+ * Entity `OTHER_INFO` field: user.
+ */
+ val FIELD_APP_USER = "appUser"
+
+ /**
+ * Entity `OTHER_INFO` field: YARN application ID.
+ */
+ val FIELD_APPLICATION_ID = "applicationId"
+
+ /**
+ * Entity `OTHER_INFO` field: attempt ID from spark start event.
+ */
+ val FIELD_ATTEMPT_ID = "attemptId"
+
+ /**
+ * Entity `OTHER_INFO` field: a counter which is incremented whenever a new timeline entity
+ * is created in this JVM (hence, attempt). It can be used to compare versions of the
+ * current entity with any cached copy -it is less brittle than using timestamps.
+ */
+ val FIELD_ENTITY_VERSION = "entityVersion"
+
+ /**
+ * Entity `OTHER_INFO` field: Spark version.
+ */
+ val FIELD_SPARK_VERSION = "sparkVersion"
+
+ /**
+ * Entity filter field: to search for entities that have started.
+ */
+ val FILTER_APP_START = "startApp"
+
+ /**
+ * Value of the `startApp` filter field.
+ */
+ val FILTER_APP_START_VALUE = "SparkListenerApplicationStart"
+
+ /**
+ * Entity filter field: to search for entities that have ended.
+ */
+ val FILTER_APP_END = "endApp"
+
+ /**
+ * Value of the `endApp`filter field.
+ */
+ val FILTER_APP_END_VALUE = "SparkListenerApplicationEnd"
+
+ /**
+ * ID used in yarn-client attempts only.
+ */
+ val CLIENT_BACKEND_ATTEMPT_ID = "1"
+
+ /**
+ * The classname of the history service to instantiate in the YARN AM.
+ */
+ val CLASSNAME = "org.apache.spark.deploy.history.yarn.YarnHistoryService"
+
+ /**
+ * Name of metrics.
+ */
+ val METRICS_NAME = "yarn_history"
+
+ /**
+ * Enum value of application created state
+ */
+ val CreatedState = 0
+
+ /**
+ * Enum value of started state.
+ */
+ val StartedState = 1
+
+ /**
+ * Enum value of stopped state.
+ */
+ val StoppedState = 2
+}
diff --git a/yarn/src/history/main/scala/org/apache/spark/deploy/history/yarn/YarnTimelineUtils.scala b/yarn/src/history/main/scala/org/apache/spark/deploy/history/yarn/YarnTimelineUtils.scala
new file mode 100644
index 000000000000..da105c52b1aa
--- /dev/null
+++ b/yarn/src/history/main/scala/org/apache/spark/deploy/history/yarn/YarnTimelineUtils.scala
@@ -0,0 +1,772 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history.yarn
+
+import java.{lang, util}
+import java.io.IOException
+import java.net.{InetSocketAddress, NoRouteToHostException, URI, URL}
+import java.text.DateFormat
+import java.util.{ArrayList => JArrayList, Collection => JCollection, Date, HashMap => JHashMap, Map => JMap}
+import java.util.concurrent.atomic.AtomicLong
+
+import scala.collection.mutable
+import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError
+import org.apache.hadoop.yarn.api.records.timeline.{TimelineEntity, TimelineEvent, TimelinePutResponse}
+import org.apache.hadoop.yarn.client.api.TimelineClient
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.json4s.{MappingException, JValue}
+import org.json4s.JsonAST.{JNull, JNothing, JArray, JBool, JDecimal, JDouble, JString, JInt, JObject}
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark
+import org.apache.spark.{Logging, SparkContext}
+import org.apache.spark.deploy.history.yarn.YarnHistoryService._
+import org.apache.spark.scheduler.{SparkListenerApplicationEnd, SparkListenerApplicationStart, SparkListenerEvent, SparkListenerExecutorAdded, SparkListenerExecutorRemoved, SparkListenerJobEnd, SparkListenerJobStart, SparkListenerStageCompleted, SparkListenerStageSubmitted}
+import org.apache.spark.util.{JsonProtocol, Utils}
+
+/**
+ * Utility methods for timeline classes.
+ */
+private[spark] object YarnTimelineUtils extends Logging {
+
+ /**
+ * What attempt ID to use as the attempt ID field (not the entity ID) when
+ * there is no attempt info.
+ */
+ val SINGLE_ATTEMPT = "1"
+
+ /**
+ * Exception text when there is no event info data to unmarshall.
+ */
+ val E_NO_EVENTINFO = "No 'eventinfo' entry"
+
+ /**
+ * Exception text when there is an event info entry in the timeline event, but it is empty.
+ */
+ val E_EMPTY_EVENTINFO = "Empty 'eventinfo' entry"
+
+ /**
+ * Counter incremented on every spark-event-to-timeline-event conversion,
+ * guaranteeing uniqueness of event IDs within a single application attempt
+ * (which is, implicitly, one per JVM).
+ */
+ val eventCreateCounter = new AtomicLong(System.currentTimeMillis())
+
+ /**
+ * A counter incremented every time a new entity is created. This is included as an "other"
+ * field in the entity information, so it can be used as a probe to determine if the entity
+ * has been updated since a previous check.
+ */
+ val entityVersionCounter = new AtomicLong(1)
+
+ /**
+ * Converts a Java object to its equivalent json4s representation.
+ */
+ def toJValue(obj: Object): JValue = {
+ obj match {
+ case str: String => JString(str)
+ case dbl: java.lang.Double => JDouble(dbl)
+ case dec: java.math.BigDecimal => JDecimal(dec)
+ case int: java.lang.Integer => JInt(BigInt(int))
+ case long: java.lang.Long => JInt(BigInt(long))
+ case bool: java.lang.Boolean => JBool(bool)
+ case map: JMap[_, _] =>
+ val jmap = map.asInstanceOf[JMap[String, Object]]
+ JObject(jmap.entrySet().asScala.map { e => e.getKey -> toJValue(e.getValue) }.toList)
+ case array: JCollection[_] =>
+ JArray(array.asInstanceOf[JCollection[Object]].asScala.map(o => toJValue(o)).toList)
+ case null => JNothing
+ }
+ }
+
+ /**
+ * Converts a JValue into its Java equivalent.
+ */
+ def toJavaObject(v: JValue): Object = {
+ v match {
+ case JNothing => null
+ case JNull => null
+ case JString(s) => s
+ case JDouble(num) => java.lang.Double.valueOf(num)
+ case JDecimal(num) => num.bigDecimal
+ case JInt(num) => java.lang.Long.valueOf(num.longValue())
+ case JBool(value) => java.lang.Boolean.valueOf(value)
+ case obj: JObject => toJavaMap(obj)
+ case JArray(vals) =>
+ val list = new JArrayList[Object]()
+ vals.foreach(x => list.add(toJavaObject(x)))
+ list
+ }
+ }
+
+ /**
+ * Converts a json4s list of fields into a Java Map suitable for serialization by Jackson,
+ * which is used by the ATS client library.
+ */
+ def toJavaMap(sourceObj: JObject): JHashMap[String, Object] = {
+ val map = new JHashMap[String, Object]()
+ sourceObj.obj.foreach { case (k, v) => map.put(k, toJavaObject(v)) }
+ map
+ }
+
+ /**
+ * Convert a timeline event to a spark one. Includes some basic checks for validity of
+ * the event payload.
+ * @param event timeline event
+ * @return an unmarshalled event
+ */
+ def toSparkEvent(event: TimelineEvent): SparkListenerEvent = {
+ val info = event.getEventInfo
+ if (info == null) {
+ throw new IOException(E_NO_EVENTINFO)
+ }
+ if (info.size() == 0) {
+ throw new IOException(E_EMPTY_EVENTINFO)
+ }
+ val payload = toJValue(info)
+ def jsonToString: String = {
+ val json = compact(render(payload))
+ val limit = 256
+ if (json.length < limit) {
+ json
+ } else {
+ json.substring(0, limit) + " ... }"
+ }
+ }
+ logDebug(s"toSparkEvent payload is $jsonToString")
+ val eventField = payload \ "Event"
+ if (eventField == JNothing) {
+ throw new IOException(s"No 'Event' entry in $jsonToString")
+ }
+
+ // now the real unmarshalling
+ try {
+ JsonProtocol.sparkEventFromJson(payload)
+ } catch {
+ // failure in the marshalling; include payload in the message
+ case ex: MappingException =>
+ logDebug(s"$ex while rendering $jsonToString", ex)
+ throw ex
+ }
+ }
+
+ /**
+ * Convert a spark event to a timeline event.
+ * @param event handled spark event
+ * @param timestamp timestamp to attach to the timeline event
+ * @return a timeline event, if it could be marshalled
+ */
+ def toTimelineEvent(event: SparkListenerEvent, timestamp: Long): Option[TimelineEvent] = {
+ try {
+ val tlEvent = new TimelineEvent()
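+ // the event type combines the class name with a counter, making each event ID unique within the JVM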
+ tlEvent.setEventType(Utils.getFormattedClassName(event)
+ + "-" + eventCreateCounter.incrementAndGet.toString)
+ tlEvent.setTimestamp(timestamp)
+ val kvMap = new JHashMap[String, Object]()
+ val json = JsonProtocol.sparkEventToJson(event)
+ val jObject = json.asInstanceOf[JObject]
+ // the timeline event wants a map of java objects for Jackson to serialize
+ val hashMap = toJavaMap(jObject)
+ tlEvent.setEventInfo(hashMap)
+ Some(tlEvent)
+ }
+ catch {
+ case e: MatchError =>
+ log.debug(s"Failed to convert $event to JSON: $e", e)
+ None
+ }
+ }
+
+ /**
+ * Describe the event for logging.
+ *
+ * @param event timeline event
+ * @return a description
+ */
+ def describeEvent(event: TimelineEvent): String = {
+ val sparkEventDetails = try {
+ toSparkEvent(event).toString
+ } catch {
+ case _: MappingException =>
+ "(cannot convert event details to spark exception)"
+ }
+ s"${event.getEventType()} @ ${new Date(event.getTimestamp())}" +
+ s"\n $sparkEventDetails"
+ }
+
+ /**
+ * Create details of a timeline entity, by describing every event inside it.
+ *
+ * @param entity entity containing a possibly empty or null list of events
+ * @return a list of event details, with a newline between each one
+ */
+ def eventDetails(entity: TimelineEntity): String = {
+ val events = entity.getEvents
+ if (events != null) {
+ events.asScala.map(describeEvent).mkString("\n")
+ } else {
+ ""
+ }
+ }
+
+ /**
+ * Describe a timeline entity.
+ * @param entity entity
+ * @return a string description.
+ */
+ def describeEntity(entity: TimelineEntity): String = {
+ val events: util.List[TimelineEvent] = entity.getEvents
+ val eventSummary = if (events != null) {
+ s"contains ${events.size()} event(s)"
+ } else {
+ "contains no events"
+ }
+
+ val domain = if (entity.getDomainId != null) s" Domain ${entity.getDomainId}" else ""
+ val header = s"${entity.getEntityType}/${entity.getEntityId} $domain"
+ try {
+ val otherInfo = entity.getOtherInfo.asScala.map {
+ case (k, v) => s" $k ='$v': ${v.getClass};"
+ }.mkString("\n")
+ s"Timeline Entity " + header +
+ " " + otherInfo + "\n" +
+ " started: " + timeFieldToString(entity.getStartTime, "start") + "\n" +
+ " " + eventSummary
+ } catch {
+ case e: MappingException =>
+ // failure to marshall/unmarshall; downgrade
+ s"Timeline Entity $header"
+ }
+ }
+
+ /**
+ * Convert a `java.lang.Long` reference to a string value, or, if the reference is null,
+ * to text declaring that the named field is empty.
+ *
+ * @param time time reference
+ * @param field field name for error message
+ * @return a string to describe the field
+ */
+ def timeFieldToString(time: lang.Long, field: String): String = {
+ if (time != null) {
+ new Date(time).toString
+ } else {
+ s"no $field time"
+ }
+ }
+
+ /**
+ * A verbose description of the entity which contains event details and info about
+ * primary/secondary keys.
+ *
+ * @param entity timeline entity
+ * @return a verbose description of the entity
+ */
+ def describeEntityVerbose(entity: TimelineEntity): String = {
+ val header = describeEntity(entity)
+ val primaryFilters = entity.getPrimaryFilters.asScala.toMap
+ var filterElements = ""
+ for ((k, v) <- primaryFilters) {
+ filterElements = filterElements +
+ " filter " + k + ": [ " + v.asScala.foldLeft("")((s, o) => s + o.toString + " ") + "]\n"
+ }
+ val events = eventDetails(entity)
+ header + "\n" + filterElements + events
+ }
+
+ /**
+ * Split a comma separated String, filter out any empty items, and return a `Set` of strings.
+ */
+ def stringToSet(list: String): Set[String] = {
+ list.split(',').map(_.trim).filter(!_.isEmpty).toSet
+ }
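
For example (illustrative input):

    stringToSet("alice, bob, , carol")   // Set("alice", "bob", "carol")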
+
+ /**
+ * Try to get the event time off an event. Not all events have the required information.
+ *
+ * @param event event to process
+ * @return the event time
+ */
+ def eventTime(event: SparkListenerEvent): Option[Long] = {
+ event match {
+ case evt: SparkListenerApplicationStart =>
+ Some(evt.time)
+ case evt: SparkListenerApplicationEnd =>
+ Some(evt.time)
+ case evt: SparkListenerJobStart =>
+ Some(evt.time)
+ case evt: SparkListenerJobEnd =>
+ Some(evt.time)
+ case evt: SparkListenerExecutorAdded =>
+ Some(evt.time)
+ case evt: SparkListenerExecutorRemoved =>
+ Some(evt.time)
+ case evt: SparkListenerStageSubmitted =>
+ evt.stageInfo.submissionTime
+ case evt: SparkListenerStageCompleted =>
+ evt.stageInfo.completionTime
+ case _ => None
+ }
+ }
+
+ /**
+ * Create and start a timeline client, using the configuration context to
+ * set up the binding.
+ *
+ * @param sparkContext spark context
+ * @return the started instance
+ */
+ def createTimelineClient(sparkContext: SparkContext): TimelineClient = {
+ val client = TimelineClient.createTimelineClient
+ client.init(sparkContext.hadoopConfiguration)
+ client.start()
+ client
+ }
+
+ /**
+ * The path for the V1 ATS REST API.
+ */
+ val TIMELINE_REST_PATH = "/ws/v1/timeline/"
+
+ /**
+ * Build the URI to the base of the timeline web application
+ * from the Hadoop context.
+ *
+ * Raises an exception if the address cannot be determined or is considered invalid from
+ * a networking perspective.
+ *
+ * Does not perform any checks as to whether or not the timeline service is enabled
+ * @param conf configuration
+ * @return the URI to the timeline service.
+ */
+ def getTimelineEndpoint(conf: Configuration): URI = {
+ val isHttps = YarnConfiguration.useHttps(conf)
+ val address = if (isHttps) {
+ conf.get(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS,
+ YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS)
+ } else {
+ conf.get(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
+ YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS)
+ }
+ val protocol = if (isHttps) "https://" else "http://"
+ require(address != null, "No timeline service defined")
+ validateEndpoint(URI.create(s"$protocol$address$TIMELINE_REST_PATH"))
+ }
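
A sketch of the lookup, using an invented host:port value; with HTTPS disabled the result is the HTTP REST root:

    import org.apache.hadoop.yarn.conf.YarnConfiguration

    val conf = new YarnConfiguration()
    conf.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, "ats.example.org:8188")
    // resolves to http://ats.example.org:8188/ws/v1/timeline/
    val endpoint = getTimelineEndpoint(conf)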
+
+ /**
+ * Create a URI to the history service. This uses the entity type of
+ * [[YarnHistoryService#SPARK_EVENT_ENTITY_TYPE]] for Spark application histories.
+ * @param conf hadoop configuration to examine
+ * @return the URI of the Spark event entity type under the timeline web application
+ */
+ def timelineWebappUri(conf: Configuration): URI = {
+ timelineWebappUri(conf, YarnHistoryService.SPARK_EVENT_ENTITY_TYPE)
+ }
+
+ /**
+ * Get the URI of a path under the timeline web UI.
+ *
+ * @param conf configuration
+ * @param subpath path under the root web UI
+ * @return a URI
+ */
+ def timelineWebappUri(conf: Configuration, subpath: String): URI = {
+ val base = getTimelineEndpoint(conf)
+ new URL(base.toURL, subpath).toURI
+ }
+
+ /**
+ * Check the service configuration to see if the timeline service is enabled.
+ *
+ * @param conf configuration to check
+ * @return true if `YarnConfiguration.TIMELINE_SERVICE_ENABLED` is true.
+ */
+ def timelineServiceEnabled(conf: Configuration): Boolean = {
+ conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
+ YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)
+ }
+
+ /**
+ * Get the URI to an application under the timeline
+ * (this requires the applicationID to have been used to
+ * publish entities there)
+ * @param timelineUri timeline URI
+ * @param appId App ID (really, the entityId used to publish)
+ * @return the path
+ */
+ def applicationURI(timelineUri: URI, appId: String): URI = {
+ require(appId != null && !appId.isEmpty, "No application ID")
+ require(!appId.contains("/"), s"Illegal character '/' in $appId")
+ timelineUri.resolve(s"${timelineUri.getPath()}/$appId")
+ }
+
+ /**
+ * Map an error code to a string. For known codes, it returns
+ * a description; for others it just returns the error code.
+ *
+ * @param code error code
+ * @return a string description for error messages
+ */
+ def timelineErrorCodeToString(code: Int): String = {
+ code match {
+ case 0 => "0: no error"
+ case 1 => "No start time"
+ case 2 => "IO Exception"
+ case 3 => "System Filter Conflict"
+ case 4 => "Access Denied"
+ case 5 => "No Domain"
+ case 6 => "Forbidden Relation"
+ case other: Int => s"Error code $other"
+ }
+ }
+
+ /**
+ * Convert a timeline error response to a slightly more meaningful string.
+ * @param error error
+ * @return text for diagnostics
+ */
+ def describeError(error: TimelinePutError): String = {
+ s"Entity ID=${error.getEntityId()}; Entity type=${error.getEntityType}" +
+ s" Error code ${error.getErrorCode}" +
+ s": ${timelineErrorCodeToString(error.getErrorCode)}"
+ }
+
+ /**
+ * Describe a put response by enumerating and describing all errors,
+ * if present. A null `errors` element is handled robustly.
+ *
+ * @param response response to describe
+ * @return text for diagnostics
+ */
+ def describePutResponse(response: TimelinePutResponse) : String = {
+ val responseErrs = response.getErrors
+ if (responseErrs != null) {
+ val errors = mutable.MutableList(s"TimelinePutResponse with ${responseErrs.size()} errors")
+ for (err <- responseErrs.asScala) {
+ errors += describeError(err)
+ }
+ errors.foldLeft("")((buff, elt) => buff + "\n" + elt)
+ } else {
+ s"TimelinePutResponse with null error list"
+ }
+ }
+
+ /**
+ * This is used to highlight an undefined field.
+ */
+ val UNDEFINED_FIELD = "Undefined"
+
+ /**
+ * Look up a field in the `otherInfo` section of a [[TimelineEntity]].
+ *
+ * @param en entity
+ * @param name field name
+ * @return the value, or the string [[UNDEFINED_FIELD]] if the field is not found
+ */
+ def field(en: TimelineEntity, name: String) : Object = {
+ fieldOption(en, name).getOrElse(UNDEFINED_FIELD)
+ }
+
+ /**
+ * Look up a field in the `otherInfo` section of a [[TimelineEntity]].
+ *
+ * @param en entity
+ * @param name field name
+ * @return the value, if the field is present
+ */
+ def fieldOption(en: TimelineEntity, name: String) : Option[Object] = {
+ Option(en.getOtherInfo.get(name))
+ }
+
+ /**
+ * Look up a field in the `otherInfo` section of a [[TimelineEntity]].
+ * @param en entity
+ * @param name field name
+ * @return the value converted to a string, if the field is present
+ */
+ def stringFieldOption(en: TimelineEntity, name: String): Option[String] = {
+ val value = en.getOtherInfo.get(name)
+ if (value != null) {
+ Some(value.toString)
+ } else {
+ None
+ }
+ }
+
+ /**
+ * Lookup a numeric field in the `otherInfo` section of a [[TimelineEntity]],
+ * fall back to `defval` if the field is absent or cannot be parsed.
+ *
+ * @param en entity
+ * @param name field name
+ * @param defval default value; default is 0L
+ * @return the value
+ */
+ def numberField(en: TimelineEntity, name: String, defval: Long = 0L) : Number = {
+ try {
+ fieldOption(en, name) match {
+ case Some(n: Number) => n
+ case _ => defval
+ }
+ } catch {
+ case NonFatal(_) => defval
+ }
+ }
+
+ /**
+ * Take a sequence of timeline events and return an ordered list of spark events.
+ *
+ * Important: this reverses the input in the process.
+ * @param events event sequence
+ * @return spark event sequence
+ */
+ def asSparkEvents(events: Seq[TimelineEvent]): Seq[SparkListenerEvent] = {
+ events.reverse.map { event =>
+ toSparkEvent(event)
+ }
+ }
+
+ /**
+ * Build date for display in status messages.
+ *
+ * @param timestamp time in milliseconds post-Epoch
+ * @param unset string to use if timestamp == 0
+ * @return a string for messages
+ */
+ def humanDateCurrentTZ(timestamp: Long, unset: String) : String = {
+ if (timestamp == 0) {
+ unset
+ } else {
+ val dateFormatter = DateFormat.getDateTimeInstance(DateFormat.DEFAULT, DateFormat.LONG)
+ dateFormatter.format(timestamp)
+ }
+ }
+
+ /**
+ * Short formatted time.
+ *
+ * @param timestamp time in milliseconds post-Epoch
+ * @param unset string to use if timestamp == 0
+ * @return a string for messages
+ */
+ def timeShort(timestamp: Long, unset: String) : String = {
+ if (timestamp == 0) {
+ unset
+ } else {
+ val dateFormatter = DateFormat.getTimeInstance(DateFormat.SHORT)
+ dateFormatter.format(timestamp)
+ }
+ }
+
+ /**
+ * Generate the timeline entity ID from the application and attempt ID.
+ * This is required to be unique across all entities in the timeline server.
+ *
+ * @param yarnAppId yarn application ID as passed in during creation
+ * @param yarnAttemptId YARN attempt ID as passed in during creation
+ */
+ def buildEntityId(yarnAppId: ApplicationId,
+ yarnAttemptId: Option[ApplicationAttemptId]): String = {
+ yarnAttemptId match {
+ case Some(aid) => aid.toString
+ case None => yarnAppId.toString
+ }
+ }
+
+ /**
+ * Generate the application ID for use in entity fields from the application and attempt ID.
+ *
+ * @param yarnAppId yarn application ID as passed in during creation
+ */
+ def buildApplicationIdField(yarnAppId: ApplicationId): String = {
+ yarnAppId.toString
+ }
+
+ /**
+ * Generate an attempt ID for use in the timeline entity "other/app_id" field
+ * from the application and attempt ID.
+ *
+ * This is not guaranteed to be unique across all entities. It is
+ * only required to be unique across all attempts of an application.
+ *
+ * If the application doesn't have an attempt ID, then it is
+ * an application instance which, implicitly, is single-attempt.
+ * The value [[SINGLE_ATTEMPT]] is returned
+ * @param sparkAttemptId attempt ID
+ * @return the attempt ID.
+ */
+ def buildApplicationAttemptIdField(sparkAttemptId: Option[String]): String = {
+ sparkAttemptId.getOrElse(SINGLE_ATTEMPT)
+ }
+
+ /**
+ * Add a filter and field if the value is set.
+ *
+ * @param entity entity to update
+ * @param name filter/field name
+ * @param value optional value
+ */
+ private def addFilterAndField(entity: TimelineEntity,
+ name: String, value: Option[String]): Unit = {
+ value.foreach { v => addFilterAndField(entity, name, v) }
+ }
+
+ /**
+ * Add a filter and field.
+ *
+ * @param entity entity to update
+ * @param name filter/field name
+ * @param value value
+ */
+ private def addFilterAndField(entity: TimelineEntity, name: String, value: String): Unit = {
+ entity.addPrimaryFilter(name, value)
+ entity.addOtherInfo(name, value)
+ }
+
+ /**
+ * Create a timeline entity for an application or attempt.
+ * The entity ID is the attempt ID when present, falling back to the YARN application ID.
+ *
+ * @param appId YARN application ID as passed in during creation
+ * @param attemptId YARN attempt ID, or `None` for a single-attempt application
+ * @param sparkApplicationId application ID as submitted in the application start event
+ * @param sparkApplicationAttemptId attempt ID, or `None`
+ * @param appName application name
+ * @param userName user name
+ * @param startTime time in milliseconds when this entity was started (must be non zero)
+ * @param endTime time in milliseconds when this entity was last updated (0 means not ended)
+ * @param lastUpdated time in milliseconds when this entity was last updated (0 leaves unset)
+ * @return the timeline entity
+ */
+ def createTimelineEntity(
+ appId: ApplicationId,
+ attemptId: Option[ApplicationAttemptId],
+ sparkApplicationId: Option[String],
+ sparkApplicationAttemptId: Option[String],
+ appName: String,
+ userName: String,
+ startTime: Long, endTime: Long,
+ lastUpdated: Long): TimelineEntity = {
+ require(appId != null, "no application Id")
+ require(appName != null, "no application name")
+ require(startTime > 0, "no start time")
+
+ val entity: TimelineEntity = new TimelineEntity()
+ val entityId = buildEntityId(appId, attemptId)
+ val appIdField = buildApplicationIdField(appId)
+ entity.setEntityType(SPARK_EVENT_ENTITY_TYPE)
+ entity.setEntityId(entityId)
+ // add app/attempt ID information
+ addFilterAndField(entity, FIELD_APPLICATION_ID, appIdField)
+
+ entity.addOtherInfo(FIELD_ATTEMPT_ID,
+ buildApplicationAttemptIdField(sparkApplicationAttemptId))
+ entity.addOtherInfo(FIELD_APP_NAME, appName)
+ entity.addOtherInfo(FIELD_APP_USER, userName)
+ entity.addOtherInfo(FIELD_SPARK_VERSION, spark.SPARK_VERSION)
+ entity.addOtherInfo(FIELD_ENTITY_VERSION, entityVersionCounter.getAndIncrement())
+ started(entity, startTime)
+ if (endTime != 0) {
+ entity.addPrimaryFilter(FILTER_APP_END, FILTER_APP_END_VALUE)
+ entity.addOtherInfo(FIELD_END_TIME, endTime)
+ }
+ if (lastUpdated != 0) {
+ entity.addOtherInfo(FIELD_LAST_UPDATED, lastUpdated)
+ }
+ entity
+ }
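
A hedged construction sketch; the IDs, names and times are invented, and `ApplicationId.newInstance` is the standard YARN factory method:

    import org.apache.hadoop.yarn.api.records.ApplicationId

    val appId = ApplicationId.newInstance(System.currentTimeMillis(), 1)
    val entity = createTimelineEntity(
      appId,
      attemptId = None,                  // single-attempt application
      sparkApplicationId = Some("app-20150101120000-0001"),
      sparkApplicationAttemptId = None,  // recorded as SINGLE_ATTEMPT
      appName = "demo",
      userName = "alice",
      startTime = System.currentTimeMillis(),
      endTime = 0L,                      // still running
      lastUpdated = 0L)
    // entity.getEntityId is appId.toString; the type is SPARK_EVENT_ENTITY_TYPE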
+
+ /**
+ * Add the information to declare that an application has finished and that
+ * it has a start time and an end time.
+ *
+ * @param entity entity to update
+ * @param startTime start time
+ * @param endtime end time
+ * @param sparkApplicationId app ID
+ * @param sparkApplicationAttemptId optional attempt ID
+ * @return the updated entity
+ */
+ def completed(
+ entity: TimelineEntity,
+ startTime: Long,
+ endtime: Long,
+ sparkApplicationId: Option[String],
+ sparkApplicationAttemptId: Option[String]): TimelineEntity = {
+ entity.addOtherInfo(FIELD_ATTEMPT_ID,
+ buildApplicationAttemptIdField(sparkApplicationAttemptId))
+ // set the start info
+ started(entity, startTime)
+ // add the end info
+ entity.addPrimaryFilter(FILTER_APP_END, FILTER_APP_END_VALUE)
+ entity.addOtherInfo(FIELD_END_TIME, endtime)
+ // this must be the end time
+ entity.addOtherInfo(FIELD_LAST_UPDATED, endtime)
+ entity
+ }
+
+ /**
+ * Add the information to declare that an application has started and that
+ * it has a start time.
+ *
+ * @param entity entity to update
+ * @param startTime start time.
+ * @return the updated entity
+ */
+ def started(entity: TimelineEntity, startTime: Long): TimelineEntity = {
+ entity.addPrimaryFilter(FILTER_APP_START, FILTER_APP_START_VALUE)
+ entity.setStartTime(startTime)
+ entity.addOtherInfo(FIELD_START_TIME, startTime)
+ entity.addOtherInfo(FIELD_LAST_UPDATED, startTime)
+ entity
+ }
+
+ /**
+ * Simple sanity checks for endpoint address, including hostname lookup.
+ *
+ * This can be used to validate the address at startup, rather than
+ * discovering problems later.
+ *
+ * @param endpoint address of service to talk to
+ * @return the URI passed in
+ */
+ def validateEndpoint(endpoint: URI): URI = {
+ val host = endpoint.getHost
+ if (host == null || host == "0.0.0.0") {
+ throw new NoRouteToHostException(s"Invalid host in $endpoint" +
+ s" - see https://wiki.apache.org/hadoop/UnsetHostnameOrPort")
+ }
+ val port = endpoint.getPort
+ if (port == 0) {
+ throw new NoRouteToHostException(s"Invalid Port in $endpoint" +
+ s" - see https://wiki.apache.org/hadoop/UnsetHostnameOrPort")
+ }
+ // get the address; will trigger a hostname lookup failure if the
+ // host is not resolvable.
+ val addr = new InetSocketAddress(host, port)
+ endpoint
+ }
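
Illustrative behaviour with made-up addresses:

    // a concrete host and port passes straight through
    validateEndpoint(URI.create("http://ats.example.org:8188/ws/v1/timeline/"))
    // a wildcard bind address would throw NoRouteToHostException:
    // validateEndpoint(URI.create("http://0.0.0.0:8188/ws/v1/timeline/"))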
+}
diff --git a/yarn/src/history/main/scala/org/apache/spark/deploy/history/yarn/package.scala b/yarn/src/history/main/scala/org/apache/spark/deploy/history/yarn/package.scala
new file mode 100644
index 000000000000..809dd399245a
--- /dev/null
+++ b/yarn/src/history/main/scala/org/apache/spark/deploy/history/yarn/package.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.history
+
+/**
+ * Contains the classes needed to listen to spark events and publish them to a YARN application
+ * timeline service.
+ *
+ * How it works
+ *
+ * 1. `YarnEventListener` subscribes to events in the current spark context.
+ *
+ * 2. These are forwarded to an instance of `YarnHistoryService`.
+ *
+ * 3. This, if enabled, publishes events to the configured ATS server.
+ *
+ * 4. The Spark history server is configured to use `YarnHistoryProvider`
+ * as its provider of history information.
+ *
+ * 5. It enumerates application instances and attempts, for display in the web UI and access
+ * via the REST API.
+ *
+ * 6. When the details of a specific attempt are requested, they are retrieved from the ATS server.
+ *
+ * See: [[http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/TimelineServer.html]]
+ */
+package object yarn {
+
+}
diff --git a/yarn/src/history/test/scala/org/apache/spark/deploy/history/yarn/testtools/AbstractYarnHistoryTests.scala b/yarn/src/history/test/scala/org/apache/spark/deploy/history/yarn/testtools/AbstractYarnHistoryTests.scala
new file mode 100644
index 000000000000..d0fc8048c07e
--- /dev/null
+++ b/yarn/src/history/test/scala/org/apache/spark/deploy/history/yarn/testtools/AbstractYarnHistoryTests.scala
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history.yarn.testtools
+
+import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
+import org.scalatest.{BeforeAndAfter, Matchers}
+
+import org.apache.spark.{LocalSparkContext, Logging, SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.deploy.history.yarn.testtools.YarnTestUtils._
+import org.apache.spark.deploy.history.yarn.{YarnEventListener, YarnHistoryService}
+import org.apache.spark.scheduler.cluster.SchedulerExtensionServiceBinding
+
+/**
+ * This is a base class for the YARN history test suites, both mock and integration
+ *
+ * Subclasses are expected to use traits and/or overriding of
+ * [[ContextSetup.setupConfiguration()]]
+ * to tune the configuration of the instantiated context.
+ *
+ * To avoid any ambiguity about the ordering/invocation of
+ * any before/after code, the operations are performed in
+ * overridable `setup()` and `teardown()`
+ * methods. Subclasses must relay the method calls to their
+ * superclasses to ensure correct setup and teardown.
+ */
+abstract class AbstractYarnHistoryTests
+ extends SparkFunSuite
+ with TimelineOptionsInContext
+ with TimelineServiceDisabled
+ with HistoryServiceNotListeningToSparkContext
+ with LocalSparkContext
+ with BeforeAndAfter
+ with Logging
+ with ExtraAssertions
+ with Matchers {
+
+ /*
+ * Setup phase creates the spark context, and anything else which tests require
+ */
+ before {
+ setup()
+ }
+
+ /*
+ * Setup creates the spark context
+ */
+ protected def setup(): Unit = {
+ val sparkConf = new SparkConf()
+
+ if (sc != null) {
+ fail("Spark Context is not null -a previous test did not clean up properly")
+ }
+
+ sc = createSparkContext(sparkConf)
+ logDebug(s"Created context $sc")
+ }
+
+ /**
+ * Create the spark context
+ * @param sparkConf configuration to extend
+ */
+ protected def createSparkContext(sparkConf: SparkConf): SparkContext = {
+ sparkConf.setMaster("local").setAppName("AbstractYarnHistoryTests")
+ logInfo("Creating a new spark context")
+ new SparkContext(setupConfiguration(sparkConf))
+ }
+
+ /**
+ * Create and start a history service.
+ *
+ * @param sc context
+ * @param id application ID
+ * @param appAttemptId optional attempt ID
+ * @return the instantiated service
+ */
+ protected def startHistoryService(
+ sc: SparkContext,
+ id: ApplicationId = applicationId,
+ appAttemptId: Option[ApplicationAttemptId] = Some(attemptId)): YarnHistoryService = {
+ assertNotNull(sc, "Spark context")
+ val service = new YarnHistoryService()
+ service.start(SchedulerExtensionServiceBinding(sc, id, appAttemptId))
+ assert(YarnHistoryService.StartedState === service.serviceState, s"wrong state: $service")
+ service
+ }
+
+ /**
+ * Create a history service, post an event sequence, then stop the service.
+ * The number of events posted is returned, so tests do not become brittle
+ * against the number of events posted automatically. This is the code which posts
+ * the events, so it is the code that can reliably report this count.
+ * @param sc context
+ * @return (the (now closed) history service, the count of events posted for use in assertions)
+ */
+ def postEvents(sc: SparkContext): (YarnHistoryService, Int) = {
+ val service: YarnHistoryService = startHistoryService(sc)
+ val listener = new YarnEventListener(sc, service)
+ var eventsPosted = 0
+ try {
+ listener.onApplicationStart(applicationStart)
+ listener.onEnvironmentUpdate(environmentUpdate)
+ listener.onApplicationEnd(applicationEnd)
+ eventsPosted += 3
+ // wait for them all to be processed
+ awaitEventsProcessed(service, eventsPosted, TEST_STARTUP_DELAY)
+ // only 2 post events, as the update does not trigger a post unless batch size == 1
+ awaitAtLeast(2, TEST_STARTUP_DELAY,
+ () => service.postAttempts,
+ () => s"Post count in $service")
+ } finally {
+ // because the events have been processed and waited for,
+ // a duplicate end event will not be posted
+ service.stop()
+ }
+ (service, eventsPosted)
+ }
+
+}
diff --git a/yarn/src/history/test/scala/org/apache/spark/deploy/history/yarn/testtools/ContextSetup.scala b/yarn/src/history/test/scala/org/apache/spark/deploy/history/yarn/testtools/ContextSetup.scala
new file mode 100644
index 000000000000..cbfc208b7e77
--- /dev/null
+++ b/yarn/src/history/test/scala/org/apache/spark/deploy/history/yarn/testtools/ContextSetup.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history.yarn.testtools
+
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.history.yarn.YarnHistoryService
+import org.apache.spark.deploy.history.yarn.testtools.YarnTestUtils._
+
+/**
+ * Trait implemented by everything setting up a context; the model is that
+ * the traits can be chained, with the final state determined by the order
+ *
+ * 1. base implementation does nothing.
+ * 2. subclass traits are expected to call the superclass first, then
+ * apply their own options.
+ */
+trait ContextSetup {
+
+ def setupConfiguration(sparkConf: SparkConf): SparkConf = {
+ sparkConf
+ }
+}
+
+trait TimelineServiceEnabled extends ContextSetup {
+ override def setupConfiguration(sparkConf: SparkConf): SparkConf = {
+ hadoopOpt(super.setupConfiguration(sparkConf),
+ YarnConfiguration.TIMELINE_SERVICE_ENABLED, "true")
+ }
+}
+
+trait TimelineServiceDisabled extends ContextSetup {
+ override def setupConfiguration(sparkConf: SparkConf): SparkConf = {
+ hadoopOpt(super.setupConfiguration(sparkConf),
+ YarnConfiguration.TIMELINE_SERVICE_ENABLED, "false")
+ }
+}
+
+/**
+ * Add the timeline options
+ */
+trait TimelineOptionsInContext extends ContextSetup {
+ override def setupConfiguration(sparkConf: SparkConf): SparkConf = {
+ YarnTestUtils.addBasicTimelineOptions(super.setupConfiguration(sparkConf))
+ }
+}
+
+/**
+ * request that created history services register with the spark context for lifecycle events
+ */
+trait HistoryServiceListeningToSparkContext extends ContextSetup {
+ override def setupConfiguration(sparkConf: SparkConf): SparkConf = {
+ super.setupConfiguration(sparkConf).set(YarnHistoryService.REGISTER_LISTENER, "true")
+ }
+}
+
+/**
+ * request that created history services are not registered with the spark context for
+ * lifecycle events
+ */
+trait HistoryServiceNotListeningToSparkContext extends ContextSetup {
+ override def setupConfiguration(sparkConf: SparkConf): SparkConf = {
+ super.setupConfiguration(sparkConf).set(YarnHistoryService.REGISTER_LISTENER, "false")
+ }
+}
+
+/**
+ * Switch to single entry batch sizes
+ */
+trait TimelineSingleEntryBatchSize extends ContextSetup {
+ override def setupConfiguration(sparkConf: SparkConf): SparkConf = {
+ super.setupConfiguration(sparkConf).set(YarnHistoryService.BATCH_SIZE, "1")
+ }
+
+}
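
A sketch of how these traits are intended to stack (the suite name is hypothetical). Each trait calls `super.setupConfiguration` before applying its own options, so the trait mixed in last wins:

    // hypothetical suite: timeline service enabled, one event per post
    class BatchedPostingSuite extends AbstractYarnHistoryTests
        with TimelineServiceEnabled
        with TimelineSingleEntryBatchSize {
      // setupConfiguration() now yields a SparkConf with the timeline service
      // switched on and the posting batch size set to 1
    }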
diff --git a/yarn/src/history/test/scala/org/apache/spark/deploy/history/yarn/testtools/ExtraAssertions.scala b/yarn/src/history/test/scala/org/apache/spark/deploy/history/yarn/testtools/ExtraAssertions.scala
new file mode 100644
index 000000000000..e012694a0733
--- /dev/null
+++ b/yarn/src/history/test/scala/org/apache/spark/deploy/history/yarn/testtools/ExtraAssertions.scala
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history.yarn.testtools
+
+import java.util.{Collection => JCollection}
+
+import org.apache.hadoop.service.Service
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity
+import org.scalatest.Assertions
+
+import org.apache.spark.Logging
+import org.apache.spark.deploy.history.yarn.YarnHistoryService
+import org.apache.spark.deploy.history.yarn.YarnTimelineUtils._
+
+/**
+ * Miscellaneous extra assertions
+ */
+trait ExtraAssertions extends Logging with Assertions {
+
+ /**
+ * Assert that an exception's toString value contains the supplied text.
+ * If not, an error is logged and the exception is rethrown
+ *
+ * @param ex exception
+ * @param text text
+ */
+ def assertExceptionMessageContains(ex: Exception, text: String): Unit = {
+ if (!ex.toString.contains(text)) {
+ logError(s"Did not find text $text in $ex", ex)
+ throw ex
+ }
+ }
+
+ /**
+ * Check the exception message and, optionally, its `toString` value.
+ *
+ * @param ex exception to examine
+ * @param messageCheck string to check in the `Exception.getMessage()` string
+ * @param stringCheck string to look for in the `Exception.toString()` string;
+ * if empty the check is skipped.
+ */
+ def assertExceptionDetails(ex: Throwable, messageCheck: String, stringCheck: String): Unit = {
+ assertNotNull(ex.getMessage, s"Exception message in $ex")
+ if (!ex.getMessage.contains(messageCheck)) {
+ throw ex
+ }
+ if (!stringCheck.isEmpty && !ex.toString.contains(stringCheck)) {
+ throw ex
+ }
+ }
+
+ /**
+ * Assert that a value is not null
+ *
+ * @param value value
+ * @param text text for assertion
+ */
+ def assertNotNull(value: Any, text: String): Unit = {
+ assert(value !== null, s"Null value; $text")
+ }
+
+ /**
+ * Assert that an optional value is not `None`.
+ *
+ * @param value value
+ * @param text text for assertion
+ */
+ def assertSome(value: Option[Any], text: String): Unit = {
+ assert(value.nonEmpty, s"optional value is None; $text")
+ }
+
+ /**
+ * Assert that an optional value is `None`
+ *
+ * @param value value
+ * @param text text for assertion
+ */
+ def assertNone(value: Option[Any], text: String): Unit = {
+ assert(value.isEmpty, s"Optional value is ${value.get}; $text")
+ }
+
+ /**
+ * Assert that a Spark traversable instance is not empty
+ *
+ * @param traversable the traversable to check
+ * @param text text for assertion
+ * @tparam T traversable type
+ */
+ def assertNotEmpty[T](traversable: Traversable[T], text: String): Unit = {
+ assert(traversable.nonEmpty, s"Empty traversable; $text")
+ }
+
+ /**
+ * Assert that a java collection is not empty
+ *
+ * @param collection the collection to check
+ * @param text text for assertion
+ * @tparam T collection type
+ */
+ def assertNotEmpty[T](collection: JCollection[T], text: String): Unit = {
+ assert(!collection.isEmpty, s"Empty collection; $text")
+ }
+
+ /**
+ * Assert the list is of the given size. If not, all elements are logged at error level
+ * and then the method raises a failure.
+ *
+ * @param list list to examine
+ * @param expectedSize expected size
+ * @param message error message
+ * @tparam T list type
+ */
+ def assertListSize[T](list: Seq[T], expectedSize: Int, message: String): Unit = {
+ assertNotNull(list, message)
+ if (list.size != expectedSize) {
+ // no match
+ val errorText = s"Wrong list size: expected=$expectedSize actual=${list.size}: $message"
+ logError(errorText)
+ list.foreach { e => logError(e.toString) }
+ fail(errorText)
+ }
+ }
+
+ /**
+ * Assert that a list is Nil (and implicitly, not null)
+ *
+ * If not, an assertion is raised that contains the message and the list
+ * @param list list to check
+ * @param message message to raise
+ * @tparam T list type
+ */
+ def assertNil[T](list: Seq[T], message: String): Unit = {
+ assertNotNull(list, message)
+ if (list != Nil) {
+ fail(message + " " + list)
+ }
+ }
+
+ /**
+ * Assert that a service is not listening
+ *
+ * @param historyService history service
+ */
+ def assertNotListening(historyService: YarnHistoryService): Unit = {
+ assert(!historyService.listening, s"history service is listening for events: $historyService")
+ }
+
+ /**
+ * Assert that the number of events processed matches the number expected
+ *
+ * @param historyService history service
+ * @param expected expected number
+ * @param details text to include in error messages
+ */
+ def assertEventsProcessed(historyService: YarnHistoryService,
+ expected: Int, details: String): Unit = {
+ assertResult(expected, "wrong number of events processed " + details) {
+ historyService.eventsProcessed
+ }
+ }
+
+ /**
+ * Assert that two timeline entities are non-null and equal
+ *
+ * @param expected expected entry
+ * @param actual actual
+ */
+ def assertEntitiesEqual(expected: TimelineEntity, actual: TimelineEntity): Unit = {
+ require(expected != null)
+ require(actual != null)
+ assert(expected === actual,
+ s"Expected ${describeEntity(expected)}; got ${describeEntity(actual)}}")
+ }
+
+ /**
+ * Assert that a service is in a specific state
+ *
+ * @param service service
+ * @param state required state
+ */
+ def assertInState(service: Service, state: Service.STATE): Unit = {
+ assertNotNull(service, "null service")
+ assert(service.isInState(state), s"not in state $state: $service")
+ }
+
+ /**
+ * Assert that a source string contains the `contained` substring.
+ * (This is not a check for a proper substring; equality is also acceptable)
+ * @param source source string
+ * @param contained string to look for
+ */
+ def assertContains(source: String, contained: String, text: String = ""): Unit = {
+ assertNotNull(source, s"$text null source")
+ assertNotNull(contained, s"$text null `contained`")
+ if (!source.contains(contained)) {
+ fail(s"$text -Did not find '$contained' in '$source'")
+ }
+ }
+
+ /**
+ * Assert that a source string does not contain the `contained` substring.
+ * @param source source string
+ * @param contained string to look for
+ */
+ def assertDoesNotContain(source: String, contained: String, text: String = ""): Unit = {
+ assertNotNull(source, s"$text null source")
+ assertNotNull(contained, s"$text null `contained`")
+ if (source.contains(contained)) {
+ fail(s"$text -Found '$contained' in '$source'")
+ }
+ }
+
+ /**
+ * Assert that a `[String, String]` map contains a `key:value` mapping,
+ * and that the value contains the specified text.
+ * @param map map to query
+ * @param key key to retrieve
+ * @param text text to look for in the resolved value
+ */
+ protected def assertMapValueContains(map: Map[String, String],
+ key: String, text: String): Unit = {
+ map.get(key) match {
+ case Some(s) =>
+ if (!text.isEmpty && !s.contains(text)) {
+ fail(s"Did not find '$text' in key[$key] = '$s'")
+ }
+
+ case None =>
+ fail(s"No entry for key $key")
+ }
+ }
+}
diff --git a/yarn/src/history/test/scala/org/apache/spark/deploy/history/yarn/testtools/FreePortFinder.scala b/yarn/src/history/test/scala/org/apache/spark/deploy/history/yarn/testtools/FreePortFinder.scala
new file mode 100644
index 000000000000..7174f4bbcbc5
--- /dev/null
+++ b/yarn/src/history/test/scala/org/apache/spark/deploy/history/yarn/testtools/FreePortFinder.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history.yarn.testtools
+
+import java.net.{InetAddress, ServerSocket}
+
+import scala.collection.mutable.ListBuffer
+
+/**
+ * Trait to find free ports on localhost
+ */
+trait FreePortFinder {
+
+ /**
+ * Find a free port by listening on port 0.
+ * @return a port which was free at the time of probing
+ */
+ def findPort(): Int = {
+ tryToListen(0)._2
+ }
+
+ /**
+ * Simple function to see if a port is free; if it is, return the address and the port allocated.
+ *
+ * This function can be passed to `Utils.startServiceOnPort`.
+ * @param port port to try. If 0, the OS chooses the port
+ * @return an (address, port) tuple
+ */
+ def tryToListen(port: Int): (InetAddress, Int) = {
+ val socket = new ServerSocket(port)
+ val address = socket.getInetAddress
+ val localPort = socket.getLocalPort
+ socket.close()
+ (address, localPort)
+ }
+
+ /**
+ * Return the value of the local host address; defaults to 127.0.0.1.
+ * @return the address to use for local/loopback addresses.
+ */
+ def localIPv4Address(): String = {
+ "127.0.0.1"
+ }
+
+ /**
+ * Get a local address as an address:port string alongside the integer port value.
+ * @return an (address:port, port) pair holding a free port to bind to
+ */
+ def findIPv4AddressAsPortPair(): (String, Int) = {
+ val port = findPort()
+ (localhostAndPort(port), port)
+ }
+
+ /**
+ * Given a port, return a localhost:port pair
+ * @param port port
+ * @return an address:port string for the local host, for test runs.
+ */
+ def localhostAndPort(port: Int): String = {
+ localIPv4Address() + ":" + port
+ }
+
+ /**
+ * Find the specified number of unique ports. The sockets are all held open until
+ * every port has been allocated, so the ports are guaranteed to be different.
+ * The ports are all closed afterwards, so other network services started may grab
+ * those same ports.
+ *
+ * @param count number of ports to find.
+ * @return a list of ports to use.
+ */
+ def findUniquePorts(count: Integer): Seq[Integer] = {
+ val sockets = new ListBuffer[ServerSocket]()
+ val ports = new ListBuffer[Integer]()
+ for (i <- 1 to count) {
+ val socket = new ServerSocket(0)
+ sockets += socket
+ ports += socket.getLocalPort
+ }
+ sockets.foreach(_.close())
+ // Uniqueness: for each port, there exists exactly one entry in the list with that value
+ require(ports.map( p => ports.count(_ == p) == 1).count(_ == true) == count,
+ "Duplicate port allocation in " + ports.mkString(","))
+
+ ports
+ }
+}
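
A usage sketch (the class is hypothetical), showing how a test might grab two distinct ports:

    class PortProbe extends FreePortFinder {
      // returns two address:port strings backed by distinct free ports
      def allocatePair(): (String, String) = {
        val ports = findUniquePorts(2)
        (localhostAndPort(ports.head), localhostAndPort(ports(1)))
      }
    }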
diff --git a/yarn/src/history/test/scala/org/apache/spark/deploy/history/yarn/testtools/YarnTestUtils.scala b/yarn/src/history/test/scala/org/apache/spark/deploy/history/yarn/testtools/YarnTestUtils.scala
new file mode 100644
index 000000000000..8ee3c3a0bf5f
--- /dev/null
+++ b/yarn/src/history/test/scala/org/apache/spark/deploy/history/yarn/testtools/YarnTestUtils.scala
@@ -0,0 +1,594 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history.yarn.testtools
+
+import java.io.IOException
+import java.net.URL
+
+import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.server.timeline.MemoryTimelineStore
+
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.deploy.history.yarn.YarnHistoryService._
+import org.apache.spark.deploy.history.yarn.{YarnHistoryService, YarnTimelineUtils}
+import org.apache.spark.scheduler.cluster.{StubApplicationAttemptId, StubApplicationId}
+import org.apache.spark.scheduler.{JobFailed, JobSucceeded, SparkListenerApplicationEnd, SparkListenerApplicationStart, SparkListenerEnvironmentUpdate, SparkListenerEvent, SparkListenerJobEnd, SparkListenerJobStart}
+import org.apache.spark.util.Utils
+
+object YarnTestUtils extends ExtraAssertions with FreePortFinder {
+
+ val environmentUpdate = SparkListenerEnvironmentUpdate(Map[String, Seq[(String, String)]](
+ "JVM Information" -> Seq(("GC speed", "9999 objects/s"), ("Java home", "Land of coffee")),
+ "Spark Properties" -> Seq(("Job throughput", "80000 jobs/s, regardless of job type")),
+ "System Properties" -> Seq(("Username", "guest"), ("Password", "guest")),
+ "Classpath Entries" -> Seq(("Super library", "/tmp/super_library"))))
+
+ /**
+ * Application name used in the app start event and tests
+ */
+ val APP_NAME = "spark-demo"
+
+ /**
+ * User submitting the job
+ */
+ val APP_USER = "data-scientist"
+
+ /**
+ * application ID
+ */
+ val APP_ID = "application_id_0001"
+
+ /**
+ * Spark option to set for the history provider
+ */
+ val SPARK_HISTORY_PROVIDER = "spark.history.provider"
+
+ /**
+ * Constant used to define history port in Spark `HistoryServer` class
+ */
+ val SPARK_HISTORY_UI_PORT = "spark.history.ui.port"
+
+ val completedJobsMarker = "Completed Jobs (1)"
+ val activeJobsMarker = "Active Jobs (1)"
+
+
+ /**
+ * Time to wait for anything to start/state to be reached
+ */
+ val TEST_STARTUP_DELAY = 5000
+
+ /** probes during service shutdown need to handle delayed posting */
+ val SERVICE_SHUTDOWN_DELAY = 10000
+
+ /**
+ * Cancel a test if the network isn't there.
+ *
+ * If called during setup, this will abort the test
+ */
+ def cancelIfOffline(): Unit = {
+
+ try {
+ val hostname = Utils.localHostName()
+ log.debug(s"local hostname is $hostname")
+ } catch {
+ case ex: IOException =>
+ cancel(s"Localhost name not known: $ex", ex)
+ }
+ }
+
+ /**
+ * Return a time value
+ *
+ * @return the current time in milliseconds
+ */
+ def now(): Long = {
+ System.currentTimeMillis()
+ }
+
+ /**
+ * Get a time in the future
+ *
+ * @param millis future time in millis
+ * @return now + the time offset
+ */
+ def future(millis: Long): Long = {
+ now() + millis
+ }
+
+ /**
+ * Log an entry with a blank line on either side. This helps separate tests in noisy logs.
+ *
+ * @param text text to log
+ */
+ def describe(text: String): Unit = {
+ logInfo(s"\nTest:\n $text\n\n")
+ }
+
+ /**
+ * Set a Hadoop option in the Spark configuration.
+ *
+ * This adds the `"spark.hadoop."` prefix to all keys which do not already have it.
+ *
+ * @param sparkConfig target configuration
+ * @param key hadoop option key
+ * @param value value
+ */
+ def hadoopOpt(sparkConfig: SparkConf, key: String, value: String): SparkConf = {
+ if (key.startsWith("spark.hadoop.")) {
+ sparkConfig.set(key, value)
+ } else {
+ sparkConfig.set("spark.hadoop." + key, value)
+ }
+ }
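
For illustration, both of these calls set the same underlying Spark property (the key is the YARN timeline-service enabled flag):

    val conf = new SparkConf()
    hadoopOpt(conf, "yarn.timeline-service.enabled", "true")
    hadoopOpt(conf, "spark.hadoop.yarn.timeline-service.enabled", "true")
    // both resolve to conf.set("spark.hadoop.yarn.timeline-service.enabled", "true")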
+
+ /**
+ * Bulk set of an entire map of Hadoop options
+ *
+ * @param sparkConfig target configuration
+ * @param options option map
+ */
+ def applyHadoopOptions(sparkConfig: SparkConf, options: Map[String, String]): SparkConf = {
+ options.foreach( e => hadoopOpt(sparkConfig, e._1, e._2))
+ sparkConfig
+ }
+
+ /**
+ * Apply the basic timeline options to the hadoop config
+ *
+ * @return the modified config
+ */
+ def addBasicTimelineOptions(sparkConf: SparkConf): SparkConf = {
+ val ports = findUniquePorts(3)
+ applyHadoopOptions(sparkConf,
+ Map(YarnConfiguration.TIMELINE_SERVICE_ENABLED -> "true",
+ YarnConfiguration.TIMELINE_SERVICE_ADDRESS -> localhostAndPort(ports.head),
+ YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS -> localhostAndPort(ports(1)),
+ YarnConfiguration.TIMELINE_SERVICE_STORE -> classOf[MemoryTimelineStore].getName,
+ YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES -> "1",
+ YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS -> "200"))
+
+ // final port in the set
+ sparkConf.set(SPARK_HISTORY_UI_PORT, ports(2).toString)
+
+ // turn off the minimum refresh interval.
+ // uses a string so it can be set from code that doesn't refer to any provider-side
+ // classes
+ sparkConf.set("spark.history.yarn.min-refresh-interval", "0")
+
+ // shorter reset interval and shutdown time
+ sparkConf.set(POST_RETRY_INTERVAL, "10ms")
+ sparkConf.set(SHUTDOWN_WAIT_TIME, "5s")
+ }
+
+ /**
+ * Convert the single timeline event in a timeline entity to a spark event
+ *
+ * @param entity entity to convert, which must contain exactly one event.
+ * @return the converted event
+ */
+ def convertToSparkEvent(entity: TimelineEntity): SparkListenerEvent = {
+ assertResult(1, "-wrong # of events in the timeline entry") {
+ entity.getEvents().size()
+ }
+ YarnTimelineUtils.toSparkEvent(entity.getEvents().get(0))
+ }
+
+ /**
+ * Create an app start event with an explicit attempt ID.
+ *
+ * @param time application start time
+ * @param appId application ID
+ * @param user user name
+ * @param attemptId attempt ID to record in the event
+ * @return the event
+ */
+ def appStartEventWithAttempt(time: Long = 1,
+ appId: String,
+ user: String,
+ attemptId: ApplicationAttemptId): SparkListenerApplicationStart = {
+ appStartEvent(time, appId, user, Some(attemptId.toString))
+ }
+
+ /**
+ * Create an app start event
+ *
+ * @param time application start time
+ * @param appId application ID; default is [[APP_ID]]
+ * @param user the user; default is [[APP_USER]]
+ * @param attempt attempt ID; default is `None`
+ * @param name application name; default is [[APP_NAME]]
+ * @return the event
+ */
+ def appStartEvent(time: Long = 1434920400000L,
+ appId: String = APP_ID,
+ user: String = APP_USER,
+ attempt: Option[String] = None,
+ name: String = APP_NAME): SparkListenerApplicationStart = {
+ require(name != null)
+ require(appId != null)
+ require(user != null)
+ SparkListenerApplicationStart(name, Some(appId), time, user, attempt)
+ }
+
+ def appStartEvent(time: Long, ctx: SparkContext): SparkListenerApplicationStart = {
+ appStartEvent(time, ctx.applicationId, ctx.sparkUser, ctx.applicationAttemptId)
+ }
+
+ def appStopEvent(time: Long = 1): SparkListenerApplicationEnd = {
+ new SparkListenerApplicationEnd(time)
+ }
+
+ def jobStartEvent(time: Long, id: Int) : SparkListenerJobStart = {
+ new SparkListenerJobStart(id, time, Nil, null)
+ }
+
+ def jobSuccessEvent(time: Long, id: Int) : SparkListenerJobEnd = {
+ new SparkListenerJobEnd(id, time, JobSucceeded)
+ }
+
+ def jobFailureEvent(time: Long, id: Int, ex: Exception) : SparkListenerJobEnd = {
+ new SparkListenerJobEnd(id, time, JobFailed(ex))
+ }
+
+ def newEntity(time: Long): TimelineEntity = {
+ val entity = new TimelineEntity
+ entity.setStartTime(time)
+ entity.setEntityId("post")
+ entity.setEntityType(YarnHistoryService.SPARK_EVENT_ENTITY_TYPE)
+ entity
+ }
+
+ val applicationId: ApplicationId = new StubApplicationId(0, 1111L)
+ val attemptId: ApplicationAttemptId = new StubApplicationAttemptId(applicationId, 1)
+ val applicationStart = appStartEventWithAttempt(now(), applicationId.toString, "bob", attemptId)
+ val applicationEnd = SparkListenerApplicationEnd(now() + 60000)
+
+ /**
+ * Outcomes of probes
+ */
+ sealed abstract class Outcome
+ case class Fail() extends Outcome
+ case class Retry() extends Outcome
+ case class Success() extends Outcome
+ case class TimedOut() extends Outcome
+
+ /**
+ * Spin and sleep awaiting an observable state.
+ *
+ * The scalatest `eventually` operator is similar, and even adds exponential backoff.
+ * What this offers is:
+ *
+ * 1. The ability of the probe to offer more than just success/fail, but a "fail fast"
+ * operation which stops spinning early.
+ * 2. A detailed operation to invoke on failure, to provide more diagnostics than
+ * just the assertion.
+ *
+ * @param description text logged when the spin starts
+ * @param interval sleep interval
+ * @param timeout time to wait
+ * @param probe probe to execute
+ * @param failure closure invoked on timeout/probe failure
+ */
+ def spinForState(description: String,
+ interval: Long,
+ timeout: Long,
+ probe: () => Outcome,
+ failure: (Outcome, Int, Boolean) => Unit): Unit = {
+ logInfo(description)
+ val timelimit = now() + timeout
+ var result: Outcome = Retry()
+ var current = 0L
+ var iterations = 0
+ do {
+ iterations += 1
+ result = probe()
+ if (result == Retry()) {
+ // probe says retry
+ current = now()
+ if (current > timelimit) {
+ // timeout, uprate to fail
+ result = TimedOut()
+ } else {
+ Thread.sleep(interval)
+ }
+ }
+ } while (result == Retry())
+ result match {
+ case Fail() => failure(result, iterations, false)
+ case TimedOut() => failure(result, iterations, true)
+ case _ =>
+ }
+ }
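
A hedged usage sketch, assuming it runs inside a suite where `fail` is in scope; the probe and timeout values are invented:

    var ready = false   // flipped elsewhere by the code under test
    spinForState("awaiting readiness",
      interval = 50,
      timeout = 5000,
      probe = () => outcomeFromBool(ready),
      failure = (outcome, iterations, timedOut) =>
        fail(s"not ready after $iterations probes (timed out: $timedOut)"))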
+
+ /**
+ * Convert a boolean into a success or retry outcome, that is:
+ * false is considered "retry", not a failure
+ *
+ * @param value value to probe
+ * @return `Success()` if the value is true; `Retry()` otherwise
+ */
+ def outcomeFromBool(value: Boolean): Outcome = {
+ if (value) Success() else Retry()
+ }
+
+ /**
+ * From an increasing counter, compare the results and decide whether to succeed, fail or try
+ * again. Requires that if actual is greater than expected, it is a failed state.
+ *
+ * @param expected expected outcome
+ * @param actual actual value
+ */
+ def outcomeFromCounter(expected: Long, actual: Long): Outcome = {
+ if (expected == actual) {
+ Success()
+ } else if (actual < expected) {
+ Retry()
+ } else {
+ Fail()
+ }
+ }
+
+ /**
+ * From an increasing counter, compare the results and decide whether to succeed or try
+ * again. Any value equal to or greater than expected is a success. Ideal for waiting for
+ * asynchronous operations to complete
+ * @param expected expected outcome
+ * @param actual actual value
+ */
+ def outcomeAtLeast(expected: Long, actual: Long): Outcome = {
+ if (actual >= expected) Success() else Retry()
+ }
+
+ /**
+ * Curryable function to use for timeouts if something more specific is not needed
+ *
+ * @param text text message on timeouts
+ * @param iterations number of iterations performed
+ * @param timeout true if the event was a timeout (i.e. not a failure)
+ */
+ def timeout(text: String, iterations: Int, timeout: Boolean): Unit = {
+ fail(text)
+ }
+
+ /**
+ * A no-op failure handler.
+ *
+ * @param outcome outcome of the last operation
+ * @param iterations number of iterations performed
+ * @param timeout did the wait result in a timeout
+ */
+ def failure_noop(outcome: Outcome, iterations: Int, timeout: Boolean): Unit = {
+ }
+
+ /**
+ * Spin for the number of processed events to exactly match the supplied value.
+ *
+ * Fails if the timeout is exceeded
+ *
+ * @param historyService history
+ * @param expected exact number to await
+ * @param timeout timeout in millis
+ */
+ def awaitEventsProcessed(historyService: YarnHistoryService,
+ expected: Int, timeout: Long): Unit = {
+
+ def eventsProcessedCheck(): Outcome = {
+ outcomeFromCounter(expected, historyService.eventsProcessed)
+ }
+
+ def eventProcessFailure(outcome: Outcome, iterations: Int, timeout: Boolean): Unit = {
+ val eventsCount = historyService.eventsProcessed
+ val details = s"Expected $expected events" + s" actual=$eventsCount" +
+ s" after $iterations iterations; in $historyService"
+ if (timeout) {
+ val message = s"Timeout: $details"
+ logError(message)
+ fail(message)
+ } else if (expected != eventsCount) {
+ val message = s"event count mismatch; $details;"
+ logError(message)
+ fail(message)
+ }
+ }
+
+ spinForState("awaitEventsProcessed",
+ interval = 50,
+ timeout = timeout,
+ probe = eventsProcessedCheck,
+ failure = eventProcessFailure)
+ }
+
+ /**
+ * Spin awaiting a URL to be accessible. Useful to await a web application
+ * going live before running the tests against it
+ *
+ * @param url URL to probe
+ * @param timeout timeout in millis
+ */
+ def awaitURL(url: URL, timeout: Long): Unit = {
+ def probe(): Outcome = {
+ try {
+ url.openStream().close()
+ Success()
+ } catch {
+ case ioe: IOException => Retry()
+ }
+ }
+
+ /*
+ failure action is simply to attempt the connection without
+ catching the exception raised
+ */
+ def failure(outcome: Outcome, iterations: Int, timeout: Boolean): Unit = {
+ url.openStream().close()
+ }
+
+ spinForState(s"Awaiting a response from URL $url",
+ interval = 50, timeout = timeout, probe = probe, failure = failure)
+ }
+
+
+ /**
+ * Wait for a history service's queue to become empty
+ *
+ * @param historyService service
+ * @param timeout timeout
+ */
+ def awaitEmptyQueue(historyService: YarnHistoryService, timeout: Long): Unit = {
+
+ spinForState("awaiting empty queue",
+ interval = 50,
+ timeout = timeout,
+ probe = () => outcomeFromBool(historyService.postingQueueSize == 0),
+ failure = (_, _, _) => fail(s"queue never cleared after $timeout mS for $historyService"))
+ }
+
+ /**
+ * Wait for the count of flushes in the history service to reach at least the expected count.
+ *
+ * @param historyService service
+ * @param count min number of flushes
+ * @param timeout timeout
+ */
+ def awaitFlushCount(historyService: YarnHistoryService, count: Long, timeout: Long): Unit = {
+ spinForState(s"awaiting flush count of $count",
+ interval = 50,
+ timeout = timeout,
+ probe = () => outcomeFromBool(historyService.getFlushCount >= count),
+ failure = (_, _, _) => fail(s"flush count not $count after $timeout mS in $historyService"))
+ }
+
+ /**
+ * Await the number of post attempts.
+ * @param service service
+ * @param posts attempt count.
+ */
+ def awaitPostAttemptCount(service: YarnHistoryService, posts: Long): Unit = {
+ awaitCount(posts, TEST_STARTUP_DELAY,
+ () => service.postAttempts,
+ () => s"Post count in $service")
+ }
+
+ /**
+ * Await the number of successful posts.
+ *
+ * @param service service
+ * @param posts expected success count.
+ */
+ def awaitPostSuccessCount(service: YarnHistoryService, posts: Long): Unit = {
+ awaitCount(posts, TEST_STARTUP_DELAY,
+ () => service.postSuccesses,
+ () => s"Post count in $service")
+ }
+
+ /**
+ * Wait for the counter function to match the expected value.
+ *
+ * @param expected desired count
+ * @param timeout timeout
+ * @param counter function to return an integer
+ * @param diagnostics diagnostics string evaluated on timeout
+ */
+ def awaitCount(expected: Long, timeout: Long,
+ counter: () => Long, diagnostics: () => String): Unit = {
+ spinForState(s"awaiting probe count of $expected",
+ 50, timeout,
+ () => outcomeFromCounter(expected, counter()),
+ (_, _, _) =>
+ fail(s"Expected $expected equalled ${counter()} after $timeout mS: ${diagnostics()}"))
+ }
+
+ /**
+ * Wait for the counter function to reach at least the expected value.
+ *
+ * @param expected desired count
+ * @param timeout timeout
+ * @param counter function to return an integer
+ * @param diagnostics diagnostics string evaluated on timeout
+ */
+ def awaitAtLeast(expected: Long, timeout: Long,
+ counter: () => Long, diagnostics: () => String): Unit = {
+ spinForState(s"awaiting probe count of at least $expected",
+ 50, timeout,
+ () => outcomeAtLeast(expected, counter()),
+ (_, _, _) =>
+ fail(s"Expected >= $expected got ${counter()} after $timeout mS: ${diagnostics()}"))
+ }
+
+
+ /**
+ * Wait for the history service's posting thread to stop.
+ *
+ * @param historyService history service
+ * @param timeout timeout in milliseconds
+ * @param failOnTimeout whether to fail (rather than just warn) on timeout. Default: true
+ */
+ def awaitServiceThreadStopped(historyService: YarnHistoryService, timeout: Long,
+ failOnTimeout: Boolean = true): Unit = {
+ assertNotNull(historyService, "null historyService")
+ spinForState("awaitServiceThreadStopped",
+ interval = 50,
+ timeout = timeout,
+ probe = () => outcomeFromBool(!historyService.isPostThreadActive),
+ failure = (_, _, _) => if (failOnTimeout) {
+ fail(s"After $timeout mS, history service post thread did not finish:" +
+ s" $historyService")
+ } else {
+ logWarning(s"After $timeout mS, history service post thread did not finish:" +
+ s" $historyService")
+ })
+ }
+
+ /**
+ * Wait for a specified operation to return a list of the desired size
+ *
+ * @param expectedSize expected size of list
+ * @param message message on failure
+ * @param timeout timeout
+ * @param operation operation to create the list
+ * @tparam T list type
+ * @return the list created in the last successful operation
+ */
+ def awaitListSize[T](expectedSize: Int, message: String, timeout: Long,
+ operation: () => List[T]): List[T] = {
+ // last list fetched
+ var list: List[T] = Nil
+ def probeOperation(): Outcome = {
+ list = operation()
+ outcomeFromBool(list.size == expectedSize)
+ }
+ def failOperation(outcome: Outcome, iterations: Int, timedOut: Boolean): Unit = {
+ assertListSize(list, expectedSize, message)
+ }
+ spinForState(message, 50, timeout, probeOperation, failOperation)
+ list
+ }
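+
+ // A minimal usage sketch, with a hypothetical listApplications() operation returning a List:
+ //   val apps = awaitListSize(2, "app listing", TEST_STARTUP_DELAY, () => listApplications())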
+
+ /**
+ * Show a Spark context in a string form
+ *
+ * @param ctx context
+ * @return a string value for assertions and other messages
+ */
+ def asString(ctx: SparkContext): String = {
+ s"Spark Context ${ctx.appName} ID ${ctx.applicationId} attempts ${ctx.applicationAttemptId}"
+ }
+
+}
diff --git a/yarn/src/history/test/scala/org/apache/spark/deploy/history/yarn/unit/AbstractMockHistorySuite.scala b/yarn/src/history/test/scala/org/apache/spark/deploy/history/yarn/unit/AbstractMockHistorySuite.scala
new file mode 100644
index 000000000000..69a90865abc3
--- /dev/null
+++ b/yarn/src/history/test/scala/org/apache/spark/deploy/history/yarn/unit/AbstractMockHistorySuite.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history.yarn.unit
+
+import org.apache.hadoop.service.ServiceOperations
+import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
+import org.apache.hadoop.yarn.api.records.timeline.{TimelineEntity, TimelinePutResponse}
+import org.apache.hadoop.yarn.client.api.TimelineClient
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+import org.scalatest.mock.MockitoSugar
+
+import org.apache.spark.SparkContext
+import org.apache.spark.deploy.history.yarn.YarnHistoryService
+import org.apache.spark.deploy.history.yarn.testtools.{AbstractYarnHistoryTests, TimelineServiceEnabled}
+import org.apache.spark.scheduler.cluster.SchedulerExtensionServiceBinding
+
+/**
+ * Mock history suites have the timeline service enabled by default; the
+ * timeline client itself is a mock, though.
+ */
+class AbstractMockHistorySuite() extends AbstractYarnHistoryTests
+ with TimelineServiceEnabled with MockitoSugar {
+
+ protected var timelineClient: TimelineClient = _
+
+ protected var response: TimelinePutResponse = _
+
+ /**
+ * Set up the mock timeline client and response instances.
+ */
+ override protected def setup(): Unit = {
+ super.setup()
+ timelineClient = mock[TimelineClient]
+ response = mock[TimelinePutResponse]
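+ // by default, any entity put on the mock client succeeds and returns the mock response;
+ // individual suites re-stub this call to simulate failures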
+ when(timelineClient.putEntities(any(classOf[TimelineEntity]))).thenReturn(response)
+ }
+
+ override def afterEach(): Unit = {
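+ // the timeline client is a YARN service; stopQuietly() stops it and swallows any failure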
+ ServiceOperations.stopQuietly(timelineClient)
+ super.afterEach()
+ }
+
+ /**
+ * Create and start a history service.
+ *
+ * @param sc context
+ * @param id application ID
+ * @param attemptId optional application attempt ID
+ * @return the instantiated service
+ */
+ override protected def startHistoryService(
+ sc: SparkContext,
+ id: ApplicationId,
+ attemptId: Option[ApplicationAttemptId] = None): YarnHistoryService = {
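+ // spy on a real service so that createTimelineClient() can be stubbed to return the
+ // mock client, while every other method keeps its real behaviour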
+ val service = spy(new YarnHistoryService())
+ assertNotNull(timelineClient, "timeline client")
+ doReturn(timelineClient).when(service).createTimelineClient()
+ service.start(SchedulerExtensionServiceBinding(sc, id, attemptId))
+ service
+ }
+
+}
diff --git a/yarn/src/history/test/scala/org/apache/spark/deploy/history/yarn/unit/EventMarshallingSuite.scala b/yarn/src/history/test/scala/org/apache/spark/deploy/history/yarn/unit/EventMarshallingSuite.scala
new file mode 100644
index 000000000000..a3dad56edd08
--- /dev/null
+++ b/yarn/src/history/test/scala/org/apache/spark/deploy/history/yarn/unit/EventMarshallingSuite.scala
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history.yarn.unit
+
+import java.io.IOException
+
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.{Logging, SparkFunSuite}
+import org.apache.spark.deploy.history.yarn.YarnHistoryService
+import org.apache.spark.deploy.history.yarn.YarnTimelineUtils._
+import org.apache.spark.deploy.history.yarn.testtools.ExtraAssertions
+import org.apache.spark.deploy.history.yarn.testtools.YarnTestUtils._
+import org.apache.spark.scheduler.{AccumulableInfo, JobSucceeded, SparkListenerBlockUpdated, SparkListenerEvent, SparkListenerJobEnd, SparkListenerStageCompleted, SparkListenerStageSubmitted, SparkListenerTaskGettingResult, SparkListenerTaskStart, StageInfo, TaskInfo, TaskLocality}
+
+/**
+ * Test low-level marshalling, robustness and quality of exception messages.
+ */
+class EventMarshallingSuite extends SparkFunSuite
+ with BeforeAndAfter with Logging with ExtraAssertions {
+
+ val stageInfo = new StageInfo(12, 13, "stageinfo-1", 4, Nil, Nil, "staged info")
+
+ val taskInfo = new TaskInfo(100, 101, 102, 103, "executor", "host", TaskLocality.ANY, true)
+
+ before {
+ stageInfo.submissionTime = Some(100000)
+ stageInfo.completionTime = Some(200000)
+ stageInfo.failureReason = Some("network problems")
+ val ai = new AccumulableInfo(1, "accumulator", Some("update"), "value", false)
+ stageInfo.accumulables.put(1, ai)
+ }
+
+ test("unmarshall empty event") {
+ val event = new TimelineEvent
+ val ex = intercept[IOException] {
+ toSparkEvent(event)
+ }
+ assertExceptionMessageContains(ex, E_EMPTY_EVENTINFO)
+ }
+
+ test("unmarshall entity type") {
+ val event = new TimelineEvent
+ event.setEventType(YarnHistoryService.SPARK_EVENT_ENTITY_TYPE)
+ val ex = intercept[IOException] {
+ toSparkEvent(event)
+ }
+ assertExceptionMessageContains(ex, E_EMPTY_EVENTINFO)
+ }
+
+ test("round trip app start") {
+ val startEvent = appStartEvent(1)
+ assert(APP_USER === startEvent.sparkUser)
+ assert(APP_NAME === startEvent.appName)
+ val dest = validateRoundTrip(startEvent)
+ assert(startEvent.time === dest.time)
+ assert(startEvent.sparkUser === dest.sparkUser)
+ assert(APP_NAME === dest.appName)
+ }
+
+ test("round trip app end") {
+ validateRoundTrip(appStopEvent(1))
+ }
+
+ test("SparkListenerStageSubmitted") {
+ val src = new SparkListenerStageSubmitted(stageInfo)
+ val dest = roundTrip(src)
+ assert(isEqual(stageInfo, dest.stageInfo))
+ }
+
+ test("SparkListenerStageCompleted") {
+ val src = new SparkListenerStageCompleted(stageInfo)
+ val dest = roundTrip(src)
+ assert(isEqual(stageInfo, dest.stageInfo))
+ }
+
+ test("SparkListenerTaskStart") {
+ val src = new SparkListenerTaskStart(1, 2, taskInfo)
+ val dest = roundTrip(src)
+ assert(isEqual(taskInfo, dest.taskInfo))
+ }
+
+ test("SparkListenerTaskGettingResult") {
+ val src = new SparkListenerTaskGettingResult(taskInfo)
+ val dest = roundTrip(src)
+ assert(isEqual(taskInfo, dest.taskInfo))
+ }
+
+ test("SparkListenerJobEnd") {
+ val endTime = 3000L
+ val id = 3
+ val result = JobSucceeded
+ val src = new SparkListenerJobEnd(id, endTime, result)
+ val dest = roundTrip(src)
+ assert(endTime === dest.time)
+ assert(id === dest.jobId)
+ assert(result === dest.jobResult)
+ }
+
+ test("SparkListenerBlockUpdated is ignored") {
+ assert(toTimelineEvent(new SparkListenerBlockUpdated(null), 0).isEmpty)
+ }
+
+ def validateRoundTrip[T <: SparkListenerEvent](sparkEvt: T): T = {
+ val trip = roundTrip(sparkEvt)
+ assertResult(sparkEvt) {
+ trip
+ }
+ trip
+ }
+
+ /**
+ * Marshall then unmarshall a spark event.
+ *
+ * @param src source
+ * @return a new spark event built from the marshalled JSON value.
+ */
+ private def roundTrip[T <: SparkListenerEvent](src: T): T = {
+ val event = toSparkEvent(toTimelineEvent(src, 100).get)
+ event.asInstanceOf[T]
+ }
+
+ /**
+ * Task info equality; does not check accumulables.
+ *
+ * @param l left item
+ * @param r right item
+ * @return true if the values are equal
+ */
+ def isEqual(l: TaskInfo, r: TaskInfo): Boolean = {
+ l.taskId == r.taskId &&
+ l.index == r.index &&
+ l.attemptNumber == r.attemptNumber &&
+ l.executorId == r.executorId &&
+ l.host == r.host &&
+ l.speculative == r.speculative &&
+ l.taskLocality == r.taskLocality &&
+ l.gettingResultTime == r.gettingResultTime &&
+ l.finishTime == r.finishTime &&
+ l.failed == r.failed &&
+ l.accumulables.size == r.accumulables.size
+ }
+
+ def isEqual(l: StageInfo, r: StageInfo): Boolean = {
+ l.stageId == r.stageId &&
+ l.name == r.name &&
+ l.attemptId == r.attemptId &&
+ l.numTasks == r.numTasks &&
+ l.details == r.details &&
+ l.submissionTime == r.submissionTime &&
+ l.completionTime == r.completionTime &&
+ l.failureReason == r.failureReason &&
+ l.accumulables.size == r.accumulables.size
+ }
+}
diff --git a/yarn/src/history/test/scala/org/apache/spark/deploy/history/yarn/unit/HistoryServiceInstantiationSuite.scala b/yarn/src/history/test/scala/org/apache/spark/deploy/history/yarn/unit/HistoryServiceInstantiationSuite.scala
new file mode 100644
index 000000000000..7ef1a2970541
--- /dev/null
+++ b/yarn/src/history/test/scala/org/apache/spark/deploy/history/yarn/unit/HistoryServiceInstantiationSuite.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history.yarn.unit
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.history.yarn.YarnHistoryService
+import org.apache.spark.deploy.history.yarn.testtools.AbstractYarnHistoryTests
+import org.apache.spark.deploy.history.yarn.testtools.YarnTestUtils._
+import org.apache.spark.scheduler.cluster.{SchedulerExtensionServiceBinding, SchedulerExtensionServices}
+
+/**
+ * Test the integration with [[SchedulerExtensionServices]].
+ */
+class HistoryServiceInstantiationSuite extends AbstractYarnHistoryTests {
+
+ override def setupConfiguration(sparkConf: SparkConf): SparkConf = {
+ super.setupConfiguration(sparkConf)
+ sparkConf.set(SchedulerExtensionServices.SPARK_YARN_SERVICES, YarnHistoryService.CLASSNAME)
+ }
+
+ test("Contains History Service") {
+ val services = new SchedulerExtensionServices
+ try {
+ services.start(SchedulerExtensionServiceBinding(sc, applicationId))
+ val serviceList = services.getServices
+ assert(serviceList.nonEmpty, "empty service list")
+ val Seq(history) = serviceList
+ val historyService = history.asInstanceOf[YarnHistoryService]
+ assert(historyService.serviceState === YarnHistoryService.StartedState)
+ services.stop()
+ assert(historyService.serviceState === YarnHistoryService.StoppedState)
+ } finally {
+ services.stop()
+ }
+ }
+
+}
+
diff --git a/yarn/src/history/test/scala/org/apache/spark/deploy/history/yarn/unit/HistoryWithDisabledTimelineSuite.scala b/yarn/src/history/test/scala/org/apache/spark/deploy/history/yarn/unit/HistoryWithDisabledTimelineSuite.scala
new file mode 100644
index 000000000000..b14be6efb6a4
--- /dev/null
+++ b/yarn/src/history/test/scala/org/apache/spark/deploy/history/yarn/unit/HistoryWithDisabledTimelineSuite.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history.yarn.unit
+
+import org.apache.spark.deploy.history.yarn.YarnHistoryService
+import org.apache.spark.deploy.history.yarn.YarnHistoryService._
+import org.apache.spark.deploy.history.yarn.testtools.AbstractYarnHistoryTests
+import org.apache.spark.deploy.history.yarn.testtools.YarnTestUtils._
+import org.apache.spark.scheduler.cluster.SchedulerExtensionServiceBinding
+
+/**
+ * Test that with the timeline service disabled, public operations degrade gracefully.
+ */
+class HistoryWithDisabledTimelineSuite extends AbstractYarnHistoryTests {
+
+ test("BasicLifecycle") {
+ val service = new YarnHistoryService()
+ // verify that the string operator does not fail
+ service.toString()
+
+ service.start(SchedulerExtensionServiceBinding(sc, applicationId, Some(attemptId)))
+ assert(StartedState === service.serviceState, s"not started: $service")
+ assert(!service.bondedToATS, s"service is bonded to ats $service")
+ assert(!service.listening, s"service is listening $service")
+ assertResult(null, s"service address : $service") {
+ service.timelineWebappAddress
+ }
+ intercept[Exception] {
+ service.timelineClient
+ }
+ assert(!service.isPostThreadActive, s"service post thread active: $service")
+
+ // verify that the string operator does not fail
+ service.toString()
+ service.stop()
+ assert(StoppedState === service.serviceState, s"not stopped: $service")
+ // verify that the string operator does not fail
+ service.toString()
+ }
+
+ test("QueueAndFlush") {
+ val service = new YarnHistoryService()
+ try {
+ service.start(SchedulerExtensionServiceBinding(sc, applicationId, Some(attemptId)))
+ service.enqueue(appStartEvent())
+ service.enqueue(appStopEvent())
+
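+ // with the timeline service disabled, the enqueue and flush calls are expected to have
+ // no effect, so both counters should stay at zero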
+ assert(0 === service.eventsQueued, s"events queued with timeline disabled: $service")
+
+ service.asyncFlush()
+ assert(0 === service.getFlushCount, "flush count")
+
+ service.stop()
+ assert(0 === service.getFlushCount, "flush count")
+ } finally {
+ service.stop()
+ }
+ }
+
+}
diff --git a/yarn/src/history/test/scala/org/apache/spark/deploy/history/yarn/unit/MockBatchingTimelinePostSuite.scala b/yarn/src/history/test/scala/org/apache/spark/deploy/history/yarn/unit/MockBatchingTimelinePostSuite.scala
new file mode 100644
index 000000000000..94dd20faa6dd
--- /dev/null
+++ b/yarn/src/history/test/scala/org/apache/spark/deploy/history/yarn/unit/MockBatchingTimelinePostSuite.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history.yarn.unit
+
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+
+import org.apache.spark.deploy.history.yarn.testtools.TimelineSingleEntryBatchSize
+import org.apache.spark.deploy.history.yarn.testtools.YarnTestUtils._
+
+/**
+ * Mock tests with a batch size of 1.
+ */
+class MockBatchingTimelinePostSuite extends AbstractMockHistorySuite
+ with TimelineSingleEntryBatchSize {
+
+ test("retry upload on failure") {
+ describe("mock failures, verify retry count incremented")
+ // timeline client to throw an RTE on the first put
+ when(timelineClient.putEntities(any(classOf[TimelineEntity])))
+ .thenThrow(new RuntimeException("triggered"))
+ .thenReturn(response)
+
+ val (service, eventsPosted) = postEvents(sc)
+ // now await some retries asynchronously
+ awaitAtLeast(2, TEST_STARTUP_DELAY,
+ () => service.postAttempts,
+ () => s"Post count in $service")
+ service.stop()
+ awaitServiceThreadStopped(service, TEST_STARTUP_DELAY)
+ // with a batch size of 1, every posted event should have triggered a flush
+ assert(eventsPosted === service.getFlushCount, s"expected $eventsPosted flushes for $service")
+ verify(timelineClient, times(service.postAttempts.toInt))
+ .putEntities(any(classOf[TimelineEntity]))
+ }
+}
diff --git a/yarn/src/history/test/scala/org/apache/spark/deploy/history/yarn/unit/MockHistoryBulkPostingSuite.scala b/yarn/src/history/test/scala/org/apache/spark/deploy/history/yarn/unit/MockHistoryBulkPostingSuite.scala
new file mode 100644
index 000000000000..28f33fba2373
--- /dev/null
+++ b/yarn/src/history/test/scala/org/apache/spark/deploy/history/yarn/unit/MockHistoryBulkPostingSuite.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history.yarn.unit
+
+import java.io.IOException
+
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.history.yarn.YarnEventListener
+import org.apache.spark.deploy.history.yarn.YarnHistoryService._
+import org.apache.spark.deploy.history.yarn.testtools.YarnTestUtils._
+import org.apache.spark.scheduler.SparkListenerJobStart
+
+/**
+ * Bulk event posting against a mock timeline client that fails every post.
+ */
+class MockHistoryBulkPostingSuite extends AbstractMockHistorySuite {
+
+ val batchSize = 5
+ val queueLimit = 5
+
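+ /**
+ * Use a small batch size and event limit, plus a zero retry interval, so that a failing
+ * timeline client quickly leads to dropped events and failed posts, which is what the
+ * test below waits for.
+ */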
+ override def setupConfiguration(sparkConf: SparkConf): SparkConf = {
+ val conf = super.setupConfiguration(sparkConf)
+ conf.set(BATCH_SIZE, batchSize.toString())
+ conf.set(POST_EVENT_LIMIT, queueLimit.toString)
+ conf.set(POST_RETRY_INTERVAL, "0ms")
+ }
+
+ test("Massive Event Posting") {
+ describe("Post many events to a failing")
+ // timeline client to throw an exception on every POST
+
+ when(timelineClient.putEntities(any(classOf[TimelineEntity])))
+ .thenThrow(new IOException("triggered"))
+
+ val service = startHistoryService(sc)
+ try {
+ val listener = new YarnEventListener(sc, service)
+ // start
+ listener.onApplicationStart(applicationStart)
+ // post many more events
+ 1 to (batchSize * 2 * queueLimit) foreach { t =>
+ listener.onJobStart(new SparkListenerJobStart(1, t, Nil))
+ }
+ // events dropped
+ awaitAtLeast(batchSize, TEST_STARTUP_DELAY,
+ () => service.eventsDropped,
+ () => service.toString())
+
+ // posts failed
+ awaitAtLeast(10, SERVICE_SHUTDOWN_DELAY,
+ () => service.postFailures,
+ () => service.toString())
+
+ // now trigger a service shutdown with the blocking queue
+ describe("Service stop")
+ service.stop()
+ awaitServiceThreadStopped(service, SERVICE_SHUTDOWN_DELAY)
+ logDebug(s"$service")
+ } finally {
+ logDebug("stopping service in finally() clause")
+ service.stop()
+ }
+ }
+
+}
diff --git a/yarn/src/history/test/scala/org/apache/spark/deploy/history/yarn/unit/MockHistoryEventPostingSuite.scala b/yarn/src/history/test/scala/org/apache/spark/deploy/history/yarn/unit/MockHistoryEventPostingSuite.scala
new file mode 100644
index 000000000000..18f35284e449
--- /dev/null
+++ b/yarn/src/history/test/scala/org/apache/spark/deploy/history/yarn/unit/MockHistoryEventPostingSuite.scala
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history.yarn.unit
+
+import java.util.concurrent.atomic.AtomicBoolean
+
+import org.apache.hadoop.yarn.api.records.timeline.{TimelineEntity, TimelinePutResponse}
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
+
+import org.apache.spark.deploy.history.yarn.YarnEventListener
+import org.apache.spark.deploy.history.yarn.testtools.YarnTestUtils._
+
+/**
+ * Mock event posting.
+ */
+class MockHistoryEventPostingSuite extends AbstractMockHistorySuite {
+
+ /*
+ * Make sure the low-level stuff the other tests depend on is there
+ */
+ test("Timeline client") {
+ describe("low-level timeline client test")
+ assert(response === timelineClient.putEntities(new TimelineEntity))
+ verify(timelineClient).putEntities(any(classOf[TimelineEntity]))
+ }
+
+ test("Event Queueing") {
+ describe("event queueing")
+ val (history, eventsPosted) = postEvents(sc)
+ awaitEventsProcessed(history, eventsPosted, TEST_STARTUP_DELAY)
+ }
+
+ test("batch processing of Spark listener events") {
+ val (historyService, _) = postEvents(sc)
+ verify(timelineClient, times(historyService.postAttempts.toInt))
+ .putEntities(any(classOf[TimelineEntity]))
+ }
+
+ test("PostEventsNoServiceStop") {
+ describe("verify that events are pushed on any triggered flush," +
+ " even before a service is stopped")
+ val service = startHistoryService(sc)
+ try {
+ val listener = new YarnEventListener(sc, service)
+ listener.onApplicationStart(applicationStart)
+ service.asyncFlush()
+ awaitEventsProcessed(service, 1, TEST_STARTUP_DELAY)
+ awaitFlushCount(service, 1, TEST_STARTUP_DELAY)
+ awaitPostAttemptCount(service, 1)
+ logDebug(s"$service")
+ verify(timelineClient, times(1)).putEntities(any(classOf[TimelineEntity]))
+ } finally {
+ logDebug("stopping service in finally() clause")
+ service.stop()
+ }
+ }
+
+ /**
+ * This is a convoluted little test designed to verify one thing: stopping
+ * the service while the posting thread is blocked in a timeline operation
+ * still causes that thread to complete successfully.
+ *
+ * That is: even though the YARN code may potentially swallow interrupted exceptions,
+ * once the blocked call returns, the exit flag is picked up and the post thread
+ * switches into fast shutdown mode, attempting to post any remaining events.
+ */
+ test("PostEventsBlockingOperation") {
+ describe("verify that events are pushed on any triggered flush," +
+ " even before a service is stopped")
+ val entered = new AtomicBoolean(false)
+ val exit = new AtomicBoolean(false)
+
+ /**
+ * Set a flag and then notify any threads waiting on it.
+ *
+ * @param b atomic boolean flag to set
+ */
+ def setAndNotify(b: AtomicBoolean): Unit = {
+ b.synchronized {
+ b.set(true)
+ b.notify()
+ }
+ }
+
+ // wait for a boolean to be set; interrupts are discarded.
+ def waitForSet(b: AtomicBoolean, timeout: Long): Unit = {
+ b.synchronized {
+ while (!b.get()) {
+ try {
+ b.wait(timeout)
+ if (!b.get()) {
+ fail("post operation never started")
+ }
+ } catch {
+ case _: InterruptedException => // interrupt discarded; keep waiting
+ case ex: RuntimeException => throw ex
+ }
+ }
+ }
+ }
+
+ // Mockito answer which doesn't return until the `exit` flag is set,
+ // and which sets the `entered` flag on entry.
+ // This is designed to synchronize the posting thread with the test runner thread.
+ class DelayedAnswer extends Answer[TimelinePutResponse]() {
+
+ override def answer(invocation: InvocationOnMock): TimelinePutResponse = {
+ // flag the operation has started
+ if (!exit.get()) {
+ logDebug("setting `entered` flag")
+ setAndNotify(entered)
+ logDebug("waiting for `exit` flag")
+ exit.synchronized {
+ exit.wait(TEST_STARTUP_DELAY)
+ }
+ }
+ new TimelinePutResponse
+ }
+ }
+ when(timelineClient.putEntities(any(classOf[TimelineEntity])))
+ .thenAnswer(new DelayedAnswer())
+
+ val service = startHistoryService(sc)
+ try {
+ val listener = new YarnEventListener(sc, service)
+ listener.onApplicationStart(applicationStart)
+ awaitPostAttemptCount(service, 1)
+ setAndNotify(entered)
+ entered.synchronized {
+ if (!entered.get()) {
+ logDebug("waiting for `entered` flag")
+ entered.wait(TEST_STARTUP_DELAY)
+ if (!entered.get()) {
+ fail("post operation never started")
+ }
+ }
+ }
+ // trigger the stop process. The interrupt will probably be lost
+ logDebug("stopping service")
+ service.stop()
+ logDebug("setting `exit` flag")
+ setAndNotify(exit)
+ awaitServiceThreadStopped(service, SERVICE_SHUTDOWN_DELAY)
+ logDebug(s"$service")
+ } finally {
+ setAndNotify(exit)
+ logDebug("stopping service in finally() clause")
+ service.stop()
+ }
+ }
+
+}
diff --git a/yarn/src/history/test/scala/org/apache/spark/deploy/history/yarn/unit/MockHistoryFlushingSuite.scala b/yarn/src/history/test/scala/org/apache/spark/deploy/history/yarn/unit/MockHistoryFlushingSuite.scala
new file mode 100644
index 000000000000..a893e8d2acc0
--- /dev/null
+++ b/yarn/src/history/test/scala/org/apache/spark/deploy/history/yarn/unit/MockHistoryFlushingSuite.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history.yarn.unit
+
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+import org.scalatest.{BeforeAndAfter, Matchers}
+
+import org.apache.spark.Logging
+import org.apache.spark.deploy.history.yarn.YarnEventListener
+import org.apache.spark.deploy.history.yarn.testtools.YarnTestUtils._
+import org.apache.spark.scheduler.SparkListenerUnpersistRDD
+
+/**
+ * Tests to verify that timeline operations happen even before services are closed.
+ *
+ * There's an async queue involved here, so the tests spin until the expected
+ * state is reached or a timeout occurs.
+ */
+class MockHistoryFlushingSuite extends AbstractMockHistorySuite
+ with BeforeAndAfter with Matchers with Logging {
+
+ test("PostEventsNoServiceStop") {
+ describe("verify that events are pushed on any triggered flush," +
+ " even before a service is stopped")
+ val service = startHistoryService(sc)
+ try {
+ assert(service.timelineServiceEnabled, s"no timeline service in $service")
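+ // these calls should not throw once the timeline service is enabled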
+ service.timelineClient
+ service.createTimelineClient()
+ val listener = new YarnEventListener(sc, service)
+ listener.onApplicationStart(applicationStart)
+ service.asyncFlush()
+ awaitPostAttemptCount(service, 1)
+ verify(timelineClient, times(1)).putEntities(any(classOf[TimelineEntity]))
+ } finally {
+ service.stop()
+ }
+ }
+
+ test("PostEventsWithServiceStop") {
+ describe("verify that events are pushed on service stop")
+ val service = startHistoryService(sc)
+ try {
+ service.timelineClient
+ service.createTimelineClient()
+ val listener = new YarnEventListener(sc, service)
+ listener.onApplicationStart(applicationStart)
+ awaitPostAttemptCount(service, 1)
+ verify(timelineClient, times(1)).putEntities(any(classOf[TimelineEntity]))
+ listener.onUnpersistRDD(SparkListenerUnpersistRDD(1))
+ // the second event is queued; it is not posted until the service stops
+ awaitPostAttemptCount(service, 1)
+
+ // now stop the service and await the final post
+ service.stop()
+ awaitServiceThreadStopped(service, TEST_STARTUP_DELAY)
+ verify(timelineClient, times(2)).putEntities(any(classOf[TimelineEntity]))
+ } finally {
+ logDebug(s"teardown of $service")
+ service.stop()
+ }
+ }
+
+}
diff --git a/yarn/src/history/test/scala/org/apache/spark/deploy/history/yarn/unit/MockHistoryServiceLifecycleSuite.scala b/yarn/src/history/test/scala/org/apache/spark/deploy/history/yarn/unit/MockHistoryServiceLifecycleSuite.scala
new file mode 100644
index 000000000000..885791b1ef7f
--- /dev/null
+++ b/yarn/src/history/test/scala/org/apache/spark/deploy/history/yarn/unit/MockHistoryServiceLifecycleSuite.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history.yarn.unit
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.history.yarn.YarnHistoryService
+import org.apache.spark.deploy.history.yarn.YarnHistoryService._
+import org.apache.spark.deploy.history.yarn.testtools.{ContextSetup, HistoryServiceListeningToSparkContext}
+import org.apache.spark.deploy.history.yarn.testtools.YarnTestUtils._
+import org.apache.spark.scheduler.cluster.SchedulerExtensionServiceBinding
+
+class MockHistoryServiceLifecycleSuite
+ extends AbstractMockHistorySuite
+ with ContextSetup
+ with HistoryServiceListeningToSparkContext {
+
+ /**
+ * Set the batch size to 2, purely so that we can trace its path through
+ * the configuration system.
+ */
+ override def setupConfiguration(sparkConf: SparkConf): SparkConf = {
+ super.setupConfiguration(sparkConf).set(BATCH_SIZE, "2")
+ }
+
+ /*
+ * Test service lifecycle ops and that stop() is re-entrant
+ */
+ test("Service Lifecycle") {
+ describe("service lifecycle operations")
+
+ assertResult("2", s"batch size in context") {
+ sc.conf.get(BATCH_SIZE)
+ }
+
+ assertResult("true", s"listening flag") {
+ sc.conf.get(REGISTER_LISTENER)
+ }
+
+ val service = startHistoryService(sc)
+ assertResult(StartedState, "not started") {
+ service.serviceState
+ }
+ assertResult(2, s"batch size in $service") {
+ service.batchSize
+ }
+ assertResult(true, s"listen flag in $service") {
+ service.listening
+ }
+
+ service.stop()
+ assertResult(StoppedState, s"not stopped: $service") {
+ service.serviceState
+ }
+
+ // and expect an attempt to start again to fail
+ intercept[IllegalArgumentException] {
+ service.start(SchedulerExtensionServiceBinding(sc, applicationId, None))
+ }
+ // repeated stop() is harmless
+ service.stop()
+ }
+
+ test("ServiceStartArguments1") {
+ val service = new YarnHistoryService()
+ intercept[IllegalArgumentException] {
+ service.start(SchedulerExtensionServiceBinding(null, applicationId, None))
+ }
+ }
+
+ test("ServiceStartArguments2") {
+ val service = new YarnHistoryService()
+ intercept[IllegalArgumentException] {
+ service.start(SchedulerExtensionServiceBinding(sc, null, None))
+ }
+ }
+
+}
diff --git a/yarn/src/history/test/scala/org/apache/spark/deploy/history/yarn/unit/YarnTimelineUtilsSuite.scala b/yarn/src/history/test/scala/org/apache/spark/deploy/history/yarn/unit/YarnTimelineUtilsSuite.scala
new file mode 100644
index 000000000000..0f4ab0ff04d2
--- /dev/null
+++ b/yarn/src/history/test/scala/org/apache/spark/deploy/history/yarn/unit/YarnTimelineUtilsSuite.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history.yarn.unit
+
+import java.net.{NoRouteToHostException, URI}
+
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.{Logging, SparkFunSuite}
+import org.apache.spark.deploy.history.yarn.YarnTimelineUtils._
+import org.apache.spark.deploy.history.yarn.testtools.ExtraAssertions
+
+/**
+ * Tests of methods in [[org.apache.spark.deploy.history.yarn.YarnTimelineUtils]].
+ */
+class YarnTimelineUtilsSuite extends SparkFunSuite
+ with BeforeAndAfter with Logging with ExtraAssertions {
+
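+ // a wildcard host (0.0.0.0) or port 0 means the timeline endpoint was never properly
+ // configured, so validateEndpoint is expected to reject it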
+ test("verifyNoHost") {
+ intercept[NoRouteToHostException] {
+ validateEndpoint(new URI("http://0.0.0.0:8080/ws"))
+ }
+ }
+
+ test("verifyNoPort") {
+ intercept[NoRouteToHostException] {
+ validateEndpoint(new URI("http://127.0.1.1:0/ws"))
+ }
+ }
+
+ test("verifyValid") {
+ validateEndpoint(new URI("http://127.0.1.1:8080/ws"))
+ }
+
+}