diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 1a2443f7ee78d..b2a26c51d4de1 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -195,6 +195,7 @@ class SparkContext(config: SparkConf) extends Logging { private var _conf: SparkConf = _ private var _eventLogDir: Option[URI] = None private var _eventLogCodec: Option[String] = None + private var _listenerBus: LiveListenerBus = _ private var _env: SparkEnv = _ private var _jobProgressListener: JobProgressListener = _ private var _statusTracker: SparkStatusTracker = _ @@ -247,7 +248,7 @@ class SparkContext(config: SparkConf) extends Logging { def isStopped: Boolean = stopped.get() // An asynchronous listener bus for Spark events - private[spark] val listenerBus = new LiveListenerBus(this) + private[spark] def listenerBus: LiveListenerBus = _listenerBus // This function allows components created by SparkEnv to be mocked in unit tests: private[spark] def createSparkEnv( @@ -423,6 +424,8 @@ class SparkContext(config: SparkConf) extends Logging { if (master == "yarn" && deployMode == "client") System.setProperty("SPARK_YARN_MODE", "true") + _listenerBus = new LiveListenerBus(_conf) + // "_jobProgressListener" should be set up before creating SparkEnv because when creating // "SparkEnv", some messages will be posted to "listenerBus" and we should not miss them. _jobProgressListener = new JobProgressListener(_conf) @@ -2388,7 +2391,7 @@ class SparkContext(config: SparkConf) extends Logging { } } - listenerBus.start() + listenerBus.start(this, _env.metricsSystem) _listenerBusStarted = true } diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 4ad04b04c312d..7827e6760f355 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -158,6 +158,12 @@ package object config { .checkValue(_ > 0, "The capacity of listener bus event queue must not be negative") .createWithDefault(10000) + private[spark] val LISTENER_BUS_METRICS_MAX_LISTENER_CLASSES_TIMED = + ConfigBuilder("spark.scheduler.listenerbus.metrics.maxListenerClassesTimed") + .internal() + .intConf + .createWithDefault(128) + // This property sets the root namespace for metrics reporting private[spark] val METRICS_NAMESPACE = ConfigBuilder("spark.metrics.namespace") .stringConf diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala index 801dfaa62306a..f0887e090b956 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala @@ -20,10 +20,16 @@ package org.apache.spark.scheduler import java.util.concurrent._ import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong} +import scala.collection.mutable import scala.util.DynamicVariable -import org.apache.spark.SparkContext +import com.codahale.metrics.{Counter, Gauge, MetricRegistry, Timer} + +import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ +import org.apache.spark.metrics.MetricsSystem +import org.apache.spark.metrics.source.Source import org.apache.spark.util.Utils /** @@ -33,15 +39,20 @@ import 
org.apache.spark.util.Utils * has started will events be actually propagated to all attached listeners. This listener bus * is stopped when `stop()` is called, and it will drop further events after stopping. */ -private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends SparkListenerBus { +private[spark] class LiveListenerBus(conf: SparkConf) extends SparkListenerBus { + self => import LiveListenerBus._ + private var sparkContext: SparkContext = _ + // Cap the capacity of the event queue so we get an explicit error (rather than // an OOM exception) if it's perpetually being added to more quickly than it's being drained. - private lazy val eventQueue = new LinkedBlockingQueue[SparkListenerEvent]( - sparkContext.conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY)) + private val eventQueue = + new LinkedBlockingQueue[SparkListenerEvent](conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY)) + + private[spark] val metrics = new LiveListenerBusMetrics(conf, eventQueue) // Indicate if `start()` is called private val started = new AtomicBoolean(false) @@ -67,6 +78,7 @@ private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends Spa setDaemon(true) override def run(): Unit = Utils.tryOrStopSparkContext(sparkContext) { LiveListenerBus.withinListenerThread.withValue(true) { + val timer = metrics.eventProcessingTime while (true) { eventLock.acquire() self.synchronized { @@ -82,7 +94,12 @@ private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends Spa } return } - postToAll(event) + val timerContext = timer.time() + try { + postToAll(event) + } finally { + timerContext.stop() + } } finally { self.synchronized { processingEvent = false @@ -93,6 +110,10 @@ private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends Spa } } + override protected def getTimer(listener: SparkListenerInterface): Option[Timer] = { + metrics.getTimerForListenerClass(listener.getClass.asSubclass(classOf[SparkListenerInterface])) + } + /** * Start sending events to attached listeners. * @@ -100,9 +121,12 @@ private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends Spa * listens for any additional events asynchronously while the listener bus is still running. * This should only be called once. * + * @param sc Used to stop the SparkContext in case the listener thread dies. */ - def start(): Unit = { + def start(sc: SparkContext, metricsSystem: MetricsSystem): Unit = { if (started.compareAndSet(false, true)) { + sparkContext = sc + metricsSystem.registerSource(metrics) listenerThread.start() } else { throw new IllegalStateException(s"$name already started!") @@ -115,12 +139,12 @@ private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends Spa logError(s"$name has already stopped! Dropping event $event") return } + metrics.numEventsPosted.inc() val eventAdded = eventQueue.offer(event) if (eventAdded) { eventLock.release() } else { onDropEvent(event) - droppedEventsCounter.incrementAndGet() } val droppedEvents = droppedEventsCounter.get @@ -200,6 +224,8 @@ private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends Spa * Note: `onDropEvent` can be called in any thread. */ def onDropEvent(event: SparkListenerEvent): Unit = { + metrics.numDroppedEvents.inc() + droppedEventsCounter.incrementAndGet() if (logDroppedEvent.compareAndSet(false, true)) { // Only log the following message once to avoid duplicated annoying logs. logError("Dropping SparkListenerEvent because no remaining room in event queue. 
" + @@ -217,3 +243,64 @@ private[spark] object LiveListenerBus { val name = "SparkListenerBus" } +private[spark] class LiveListenerBusMetrics( + conf: SparkConf, + queue: LinkedBlockingQueue[_]) + extends Source with Logging { + + override val sourceName: String = "LiveListenerBus" + override val metricRegistry: MetricRegistry = new MetricRegistry + + /** + * The total number of events posted to the LiveListenerBus. This is a count of the total number + * of events which have been produced by the application and sent to the listener bus, NOT a + * count of the number of events which have been processed and delivered to listeners (or dropped + * without being delivered). + */ + val numEventsPosted: Counter = metricRegistry.counter(MetricRegistry.name("numEventsPosted")) + + /** + * The total number of events that were dropped without being delivered to listeners. + */ + val numDroppedEvents: Counter = metricRegistry.counter(MetricRegistry.name("numEventsDropped")) + + /** + * The amount of time taken to post a single event to all listeners. + */ + val eventProcessingTime: Timer = metricRegistry.timer(MetricRegistry.name("eventProcessingTime")) + + /** + * The number of messages waiting in the queue. + */ + val queueSize: Gauge[Int] = { + metricRegistry.register(MetricRegistry.name("queueSize"), new Gauge[Int]{ + override def getValue: Int = queue.size() + }) + } + + // Guarded by synchronization. + private val perListenerClassTimers = mutable.Map[String, Timer]() + + /** + * Returns a timer tracking the processing time of the given listener class. + * events processed by that listener. This method is thread-safe. + */ + def getTimerForListenerClass(cls: Class[_ <: SparkListenerInterface]): Option[Timer] = { + synchronized { + val className = cls.getName + val maxTimed = conf.get(LISTENER_BUS_METRICS_MAX_LISTENER_CLASSES_TIMED) + perListenerClassTimers.get(className).orElse { + if (perListenerClassTimers.size == maxTimed) { + logError(s"Not measuring processing time for listener class $className because a " + + s"maximum of $maxTimed listener classes are already timed.") + None + } else { + perListenerClassTimers(className) = + metricRegistry.timer(MetricRegistry.name("listenerProcessingTime", className)) + perListenerClassTimers.get(className) + } + } + } + } +} + diff --git a/core/src/main/scala/org/apache/spark/util/ListenerBus.scala b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala index fa5ad4e8d81e1..76a56298aaebc 100644 --- a/core/src/main/scala/org/apache/spark/util/ListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala @@ -23,6 +23,8 @@ import scala.collection.JavaConverters._ import scala.reflect.ClassTag import scala.util.control.NonFatal +import com.codahale.metrics.Timer + import org.apache.spark.internal.Logging /** @@ -30,14 +32,22 @@ import org.apache.spark.internal.Logging */ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging { + private[this] val listenersPlusTimers = new CopyOnWriteArrayList[(L, Option[Timer])] + // Marked `private[spark]` for access in tests. - private[spark] val listeners = new CopyOnWriteArrayList[L] + private[spark] def listeners = listenersPlusTimers.asScala.map(_._1).asJava + + /** + * Returns a CodaHale metrics Timer for measuring the listener's event processing time. + * This method is intended to be overridden by subclasses. + */ + protected def getTimer(listener: L): Option[Timer] = None /** * Add a listener to listen events. This method is thread-safe and can be called in any thread. 
*/ final def addListener(listener: L): Unit = { - listeners.add(listener) + listenersPlusTimers.add((listener, getTimer(listener))) } /** @@ -45,7 +55,9 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging { * in any thread. */ final def removeListener(listener: L): Unit = { - listeners.remove(listener) + listenersPlusTimers.asScala.find(_._1 eq listener).foreach { listenerAndTimer => + listenersPlusTimers.remove(listenerAndTimer) + } } /** @@ -56,14 +68,25 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging { // JavaConverters can create a JIterableWrapper if we use asScala. // However, this method will be called frequently. To avoid the wrapper cost, here we use // Java Iterator directly. - val iter = listeners.iterator + val iter = listenersPlusTimers.iterator while (iter.hasNext) { - val listener = iter.next() + val listenerAndMaybeTimer = iter.next() + val listener = listenerAndMaybeTimer._1 + val maybeTimer = listenerAndMaybeTimer._2 + val maybeTimerContext = if (maybeTimer.isDefined) { + maybeTimer.get.time() + } else { + null + } try { doPostEvent(listener, event) } catch { case NonFatal(e) => logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e) + } finally { + if (maybeTimerContext != null) { + maybeTimerContext.stop() + } } } }
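For readers new to the Codahale API, the null check around maybeTimerContext above is an allocation-free variant of the standard time()/stop() idiom on the hot dispatch path. A standalone sketch of that idiom (the metric name is illustrative; only com.codahale.metrics is assumed):

    import com.codahale.metrics.{MetricRegistry, Timer}

    val registry = new MetricRegistry
    val timer: Timer = registry.timer("listenerProcessingTime.example")

    val timerContext = timer.time() // returns a Timer.Context
    try {
      // ... dispatch one event to one listener ...
    } finally {
      timerContext.stop() // records the elapsed time in the timer's histogram
    }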
diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala index 31d9dd3de8acc..59d8c14d74e30 100644 --- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala +++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala @@ -633,7 +633,12 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { test("port conflict") { val anotherEnv = createRpcEnv(new SparkConf(), "remote", env.address.port) - assert(anotherEnv.address.port != env.address.port) + try { + assert(anotherEnv.address.port != env.address.port) + } finally { + anotherEnv.shutdown() + anotherEnv.awaitTermination() + } } private def testSend(conf: SparkConf): Unit = { diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala index 4c3d0b102152c..4cae6c61118a8 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala @@ -25,12 +25,14 @@ import scala.io.Source import org.apache.hadoop.fs.Path import org.json4s.jackson.JsonMethods._ +import org.mockito.Mockito import org.scalatest.BeforeAndAfter import org.apache.spark._ import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging import org.apache.spark.io._ +import org.apache.spark.metrics.MetricsSystem import org.apache.spark.util.{JsonProtocol, Utils} /** @@ -155,17 +157,18 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit extraConf.foreach { case (k, v) => conf.set(k, v) } val logName = compressionCodec.map("test-" + _).getOrElse("test") val eventLogger = new EventLoggingListener(logName, None, testDirPath.toUri(), conf) - val listenerBus = new LiveListenerBus(sc) + val listenerBus = new LiveListenerBus(conf) val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", None, 125L, "Mickey", None) val applicationEnd = SparkListenerApplicationEnd(1000L) // A comprehensive test on JSON de/serialization of all events is in JsonProtocolSuite eventLogger.start() - listenerBus.start() + listenerBus.start(Mockito.mock(classOf[SparkContext]), Mockito.mock(classOf[MetricsSystem])) listenerBus.addListener(eventLogger) listenerBus.postToAll(applicationStart) listenerBus.postToAll(applicationEnd) + listenerBus.stop() eventLogger.stop() // Verify file contains exactly the two events logged diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala index 80c7e0bfee6ef..f3d0bc19675fc 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala @@ -22,10 +22,13 @@ import java.util.concurrent.Semaphore import scala.collection.mutable import scala.collection.JavaConverters._ +import org.mockito.Mockito import org.scalatest.Matchers import org.apache.spark._ import org.apache.spark.executor.TaskMetrics +import org.apache.spark.internal.config.LISTENER_BUS_EVENT_QUEUE_CAPACITY +import org.apache.spark.metrics.MetricsSystem import org.apache.spark.util.{ResetSystemProperties, RpcUtils} class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Matchers @@ -36,14 +39,17 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match val jobCompletionTime = 1421191296660L + private val mockSparkContext: SparkContext = Mockito.mock(classOf[SparkContext]) + private val mockMetricsSystem: MetricsSystem = Mockito.mock(classOf[MetricsSystem]) + test("don't call sc.stop in listener") { sc = new SparkContext("local", "SparkListenerSuite", new SparkConf()) val listener = new SparkContextStoppingListener(sc) - val bus = new LiveListenerBus(sc) + val bus = new LiveListenerBus(sc.conf) bus.addListener(listener) // Starting listener bus should flush all buffered events - bus.start() + bus.start(sc, sc.env.metricsSystem) bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) @@ -52,35 +58,54 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match } test("basic creation and shutdown of LiveListenerBus") { - sc = new SparkContext("local", "SparkListenerSuite", new SparkConf()) + val conf = new SparkConf() val counter = new BasicJobCounter - val bus = new LiveListenerBus(sc) + val bus = new LiveListenerBus(conf) bus.addListener(counter) - // Listener bus hasn't started yet, so posting events should not increment counter + // Metrics are initially empty. + assert(bus.metrics.numEventsPosted.getCount === 0) + assert(bus.metrics.numDroppedEvents.getCount === 0) + assert(bus.metrics.queueSize.getValue === 0) + assert(bus.metrics.eventProcessingTime.getCount === 0) + + // Post five events: (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) } + + // Five messages should be marked as received and queued, but no messages should be posted to + // listeners yet because the listener bus hasn't been started. + assert(bus.metrics.numEventsPosted.getCount === 5) + assert(bus.metrics.queueSize.getValue === 5) assert(counter.count === 0) // Starting listener bus should flush all buffered events - bus.start() + bus.start(mockSparkContext, mockMetricsSystem) + Mockito.verify(mockMetricsSystem).registerSource(bus.metrics) bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) assert(counter.count === 5) + assert(bus.metrics.queueSize.getValue === 0) + assert(bus.metrics.eventProcessingTime.getCount === 5) // After listener bus has stopped, posting events should not increment counter bus.stop() (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) } assert(counter.count === 5) + assert(bus.metrics.numEventsPosted.getCount === 5) + + // Make sure per-listener-class timers were created: + assert(bus.metrics.getTimerForListenerClass( + classOf[BasicJobCounter].asSubclass(classOf[SparkListenerInterface])).get.getCount == 5)
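The per-listener timers read back here are registered under the listener's fully qualified class name, joined to the listenerProcessingTime prefix by MetricRegistry.name. A sketch of inspecting one directly (assuming BasicJobCounter lives in org.apache.spark.scheduler, as in this suite):

    // Reported by sinks as, e.g.:
    //   LiveListenerBus.listenerProcessingTime.org.apache.spark.scheduler.BasicJobCounter
    val maybeTimer = bus.metrics.getTimerForListenerClass(
      classOf[BasicJobCounter].asSubclass(classOf[SparkListenerInterface]))
    maybeTimer.foreach(timer => assert(timer.getCount === 5))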
// Listener bus must not be started twice intercept[IllegalStateException] { - val bus = new LiveListenerBus(sc) - bus.start() - bus.start() + val bus = new LiveListenerBus(conf) + bus.start(mockSparkContext, mockMetricsSystem) + bus.start(mockSparkContext, mockMetricsSystem) } // ... or stopped before starting intercept[IllegalStateException] { - val bus = new LiveListenerBus(sc) + val bus = new LiveListenerBus(conf) bus.stop() } } @@ -107,12 +132,11 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match drained = true } } - sc = new SparkContext("local", "SparkListenerSuite", new SparkConf()) - val bus = new LiveListenerBus(sc) + val bus = new LiveListenerBus(new SparkConf()) val blockingListener = new BlockingListener bus.addListener(blockingListener) - bus.start() + bus.start(mockSparkContext, mockMetricsSystem) bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) listenerStarted.acquire() @@ -138,6 +162,44 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match assert(drained) } + test("metrics for dropped listener events") { + val bus = new LiveListenerBus(new SparkConf().set(LISTENER_BUS_EVENT_QUEUE_CAPACITY, 1)) + + val listenerStarted = new Semaphore(0) + val listenerWait = new Semaphore(0) + + bus.addListener(new SparkListener { + override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = { + listenerStarted.release() + listenerWait.acquire() + } + }) + + bus.start(mockSparkContext, mockMetricsSystem) + + // Post a message to the listener bus and wait for processing to begin: + bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) + listenerStarted.acquire() + assert(bus.metrics.queueSize.getValue === 0) + assert(bus.metrics.numDroppedEvents.getCount === 0) + + // If we post an additional message then it should remain in the queue because the listener is + // busy processing the first event: + bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) + assert(bus.metrics.queueSize.getValue === 1) + assert(bus.metrics.numDroppedEvents.getCount === 0) + + // The queue is now full, so any additional events posted to the listener will be dropped: + bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) + assert(bus.metrics.queueSize.getValue === 1) + assert(bus.metrics.numDroppedEvents.getCount === 1) + + + // Allow the remaining events to be processed so we can stop the listener bus: + listenerWait.release(2) + bus.stop() + } + test("basic creation of StageInfo") { sc = new SparkContext("local", "SparkListenerSuite") val listener = new
SaveStageAndTaskInfo @@ -354,14 +416,13 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match val badListener = new BadListener val jobCounter1 = new BasicJobCounter val jobCounter2 = new BasicJobCounter - sc = new SparkContext("local", "SparkListenerSuite", new SparkConf()) - val bus = new LiveListenerBus(sc) + val bus = new LiveListenerBus(new SparkConf()) // Propagate events to bad listener first bus.addListener(badListener) bus.addListener(jobCounter1) bus.addListener(jobCounter2) - bus.start() + bus.start(mockSparkContext, mockMetricsSystem) // Post events to all listeners, and wait until the queue is drained (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) } diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala index c100803279eaf..dd61dcd11bcda 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala @@ -100,7 +100,7 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite sc = new SparkContext("local", "test", conf) master = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager", new BlockManagerMasterEndpoint(rpcEnv, true, conf, - new LiveListenerBus(sc))), conf, true) + new LiveListenerBus(conf))), conf, true) allStores.clear() } diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 0d2912ba8c5fb..9d52b488b223e 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -125,7 +125,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE when(sc.conf).thenReturn(conf) master = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager", new BlockManagerMasterEndpoint(rpcEnv, true, conf, - new LiveListenerBus(sc))), conf, true) + new LiveListenerBus(conf))), conf, true) val initialize = PrivateMethod[Unit]('initialize) SizeEstimator invokePrivate initialize() diff --git a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala index f6c8418ba3ac4..66dda382eb653 100644 --- a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala @@ -26,7 +26,7 @@ import org.apache.spark.storage._ /** * Test various functionality in the StorageListener that supports the StorageTab. 
*/ -class StorageTabSuite extends SparkFunSuite with LocalSparkContext with BeforeAndAfter { +class StorageTabSuite extends SparkFunSuite with BeforeAndAfter { private var bus: LiveListenerBus = _ private var storageStatusListener: StorageStatusListener = _ private var storageListener: StorageListener = _ @@ -43,8 +43,7 @@ class StorageTabSuite extends SparkFunSuite with LocalSparkContext with BeforeAn before { val conf = new SparkConf() - sc = new SparkContext("local", "test", conf) - bus = new LiveListenerBus(sc) + bus = new LiveListenerBus(conf) storageStatusListener = new StorageStatusListener(conf) storageListener = new StorageListener(storageStatusListener) bus.addListener(storageStatusListener) diff --git a/docs/streaming-kafka-0-10-integration.md b/docs/streaming-kafka-0-10-integration.md index 92c296a9e6bd3..386066a85749f 100644 --- a/docs/streaming-kafka-0-10-integration.md +++ b/docs/streaming-kafka-0-10-integration.md @@ -91,7 +91,9 @@ The new Kafka consumer API will pre-fetch messages into buffers. Therefore it i In most cases, you should use `LocationStrategies.PreferConsistent` as shown above. This will distribute partitions evenly across available executors. If your executors are on the same hosts as your Kafka brokers, use `PreferBrokers`, which will prefer to schedule partitions on the Kafka leader for that partition. Finally, if you have a significant skew in load among partitions, use `PreferFixed`. This allows you to specify an explicit mapping of partitions to hosts (any unspecified partitions will use a consistent location). -The cache for consumers has a default maximum size of 64. If you expect to be handling more than (64 * number of executors) Kafka partitions, you can change this setting via `spark.streaming.kafka.consumer.cache.maxCapacity` +The cache for consumers has a default maximum size of 64. If you expect to be handling more than (64 * number of executors) Kafka partitions, you can change this setting via `spark.streaming.kafka.consumer.cache.maxCapacity`. + +If you would like to disable the caching for Kafka consumers, you can set `spark.streaming.kafka.consumer.cache.enabled` to `false`. Disabling the cache may be needed to work around the problem described in SPARK-19185. This property may be removed in later versions of Spark, once SPARK-19185 is resolved. The cache is keyed by topicpartition and group.id, so use a **separate** `group.id` for each call to `createDirectStream`. diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala index 6d6983c4bd419..9a4a1cf32a480 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala @@ -213,8 +213,10 @@ private[spark] class DirectKafkaInputDStream[K, V]( val fo = currentOffsets(tp) OffsetRange(tp.topic, tp.partition, fo, uo) } - val rdd = new KafkaRDD[K, V]( - context.sparkContext, executorKafkaParams, offsetRanges.toArray, getPreferredHosts, true) + val useConsumerCache = context.conf.getBoolean("spark.streaming.kafka.consumer.cache.enabled", + true) + val rdd = new KafkaRDD[K, V](context.sparkContext, executorKafkaParams, offsetRanges.toArray, + getPreferredHosts, useConsumerCache)
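As a usage sketch, opting out of the consumer cache only requires setting the new key on the application's configuration before the streaming context is created (the app name is illustrative; the key and its default of true come from the change above):

    import org.apache.spark.SparkConf

    val sparkConf = new SparkConf()
      .setAppName("kafka-direct-no-cache")
      .set("spark.streaming.kafka.consumer.cache.enabled", "false")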
// Report the record number and metadata of this batch interval to InputInfoTracker. val description = offsetRanges.filter { offsetRange => @@ -316,7 +318,7 @@ private[spark] class DirectKafkaInputDStream[K, V]( b.map(OffsetRange(_)), getPreferredHosts, // during restore, it's possible the same partition will be consumed from multiple - // threads, so dont use cache + // threads, so do not use cache. false ) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala index 4ed6728994193..bd144c9575c72 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala @@ -278,7 +278,7 @@ class JacksonParser( // We cannot parse this token based on the given data type. So, we throw a // RuntimeException and this exception will be caught by `parse` method. throw new RuntimeException( - s"Failed to parse a value for data type $dataType (current token: $token).") + s"Failed to parse a value for data type ${dataType.catalogString} (current token: $token).") } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 8abec85ee102a..f7637e005f317 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -131,7 +131,7 @@ private[sql] object Dataset { * * people.filter("age > 30") * .join(department, people("deptId") === department("id")) - * .groupBy(department("name"), "gender") + * .groupBy(department("name"), people("gender")) * .agg(avg(people("salary")), max(people("age"))) * }}} * @@ -141,9 +141,9 @@ private[sql] object Dataset { * Dataset people = spark.read().parquet("..."); * Dataset department = spark.read().parquet("..."); * - * people.filter("age".gt(30)) - * .join(department, people.col("deptId").equalTo(department("id"))) - * .groupBy(department.col("name"), "gender") + * people.filter(people.col("age").gt(30)) + * .join(department, people.col("deptId").equalTo(department.col("id"))) + * .groupBy(department.col("name"), people.col("gender")) + * .agg(avg(people.col("salary")), max(people.col("age"))); * }}} * diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index 9ccd6792e5da4..b937a8a9f375b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -522,15 +522,15 @@ case class DescribeTableCommand( throw new AnalysisException( s"DESC PARTITION is not allowed on a temporary view: ${table.identifier}") } - describeSchema(catalog.lookupRelation(table).schema, result) + describeSchema(catalog.lookupRelation(table).schema, result, header = false) } else { val metadata = catalog.getTableMetadata(table) if (metadata.schema.isEmpty) { // In older versions (prior to 2.1) of Spark, the table schema can be empty and should be // inferred at runtime. We should still support it.
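The describeSchema call sites in this hunk now pass an explicit header flag, and only describePartitionInfo requests the banner row. In user-visible terms, plain DESC output loses the leading # col_name data_type comment line while the partition section keeps it. A sketch of the resulting output, modeled on the describe.sql fixture updated below (values illustrative):

    a                       string
    b                       int
    c                       string
    d                       string
    # Partition Information
    # col_name              data_type  comment
    c                       string
    d                       string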
- describeSchema(sparkSession.table(metadata.identifier).schema, result) + describeSchema(sparkSession.table(metadata.identifier).schema, result, header = false) } else { - describeSchema(metadata.schema, result) + describeSchema(metadata.schema, result, header = false) } describePartitionInfo(metadata, result) @@ -550,7 +550,7 @@ case class DescribeTableCommand( private def describePartitionInfo(table: CatalogTable, buffer: ArrayBuffer[Row]): Unit = { if (table.partitionColumnNames.nonEmpty) { append(buffer, "# Partition Information", "", "") - describeSchema(table.partitionSchema, buffer) + describeSchema(table.partitionSchema, buffer, header = true) } } @@ -601,8 +601,13 @@ case class DescribeTableCommand( table.storage.toLinkedHashMap.foreach(s => append(buffer, s._1, s._2, "")) } - private def describeSchema(schema: StructType, buffer: ArrayBuffer[Row]): Unit = { - append(buffer, s"# ${output.head.name}", output(1).name, output(2).name) + private def describeSchema( + schema: StructType, + buffer: ArrayBuffer[Row], + header: Boolean): Unit = { + if (header) { + append(buffer, s"# ${output.head.name}", output(1).name, output(2).name) + } schema.foreach { column => append(buffer, column.name, column.dataType.simpleString, column.getComment().orNull) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala index 159aef220be15..43591a9ff524a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.datasources +import org.apache.spark.SparkException import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.GenericInternalRow import org.apache.spark.sql.catalyst.util._ @@ -65,7 +66,8 @@ class FailureSafeParser[IN]( case DropMalformedMode => Iterator.empty case FailFastMode => - throw e.cause + throw new SparkException("Malformed records are detected in record parsing. " + + s"Parse Mode: ${FailFastMode.name}.", e.cause) } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonInferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonInferSchema.scala index fb632cf2bb70e..a270a6451d5dd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonInferSchema.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonInferSchema.scala @@ -21,6 +21,7 @@ import java.util.Comparator import com.fasterxml.jackson.core._ +import org.apache.spark.SparkException import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.analysis.TypeCoercion import org.apache.spark.sql.catalyst.json.JacksonUtils.nextUntil @@ -61,7 +62,8 @@ private[sql] object JsonInferSchema { case DropMalformedMode => None case FailFastMode => - throw e + throw new SparkException("Malformed records are detected in schema inference. " + + s"Parse Mode: ${FailFastMode.name}.", e) } } } @@ -231,8 +233,9 @@ private[sql] object JsonInferSchema { case FailFastMode => // If `other` is not struct type, consider it as malformed one and throws an exception. - throw new RuntimeException("Failed to infer a common schema. 
Struct types are expected" + - s" but ${other.catalogString} was found.") + throw new SparkException("Malformed records are detected in schema inference. " + + s"Parse Mode: ${FailFastMode.name}. Reasons: Failed to infer a common schema. " + + s"Struct types are expected, but `${other.catalogString}` was found.") } /** diff --git a/sql/core/src/test/resources/sql-tests/results/change-column.sql.out b/sql/core/src/test/resources/sql-tests/results/change-column.sql.out index 678a3f0f0a3c6..ba8bc936f0c79 100644 --- a/sql/core/src/test/resources/sql-tests/results/change-column.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/change-column.sql.out @@ -15,7 +15,6 @@ DESC test_change -- !query 1 schema struct -- !query 1 output -# col_name data_type comment a int b string c int @@ -35,7 +34,6 @@ DESC test_change -- !query 3 schema struct -- !query 3 output -# col_name data_type comment a int b string c int @@ -55,7 +53,6 @@ DESC test_change -- !query 5 schema struct -- !query 5 output -# col_name data_type comment a int b string c int @@ -94,7 +91,6 @@ DESC test_change -- !query 8 schema struct -- !query 8 output -# col_name data_type comment a int b string c int @@ -129,7 +125,6 @@ DESC test_change -- !query 12 schema struct -- !query 12 output -# col_name data_type comment a int this is column a b string #*02?` c int @@ -148,7 +143,6 @@ DESC test_change -- !query 14 schema struct -- !query 14 output -# col_name data_type comment a int this is column a b string #*02?` c int @@ -168,7 +162,6 @@ DESC test_change -- !query 16 schema struct -- !query 16 output -# col_name data_type comment a int this is column a b string #*02?` c int @@ -193,7 +186,6 @@ DESC test_change -- !query 18 schema struct -- !query 18 output -# col_name data_type comment a int this is column a b string #*02?` c int @@ -237,7 +229,6 @@ DESC test_change -- !query 23 schema struct -- !query 23 output -# col_name data_type comment a int this is column A b string #*02?` c int diff --git a/sql/core/src/test/resources/sql-tests/results/describe-table-after-alter-table.sql.out b/sql/core/src/test/resources/sql-tests/results/describe-table-after-alter-table.sql.out index 1cc11c475bc40..eece00d603db4 100644 --- a/sql/core/src/test/resources/sql-tests/results/describe-table-after-alter-table.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/describe-table-after-alter-table.sql.out @@ -15,7 +15,6 @@ DESC FORMATTED table_with_comment -- !query 1 schema struct -- !query 1 output -# col_name data_type comment a string b int c string @@ -45,7 +44,6 @@ DESC FORMATTED table_with_comment -- !query 3 schema struct -- !query 3 output -# col_name data_type comment a string b int c string @@ -84,7 +82,6 @@ DESC FORMATTED table_comment -- !query 6 schema struct -- !query 6 output -# col_name data_type comment a string b int @@ -111,7 +108,6 @@ DESC formatted table_comment -- !query 8 schema struct -- !query 8 output -# col_name data_type comment a string b int @@ -139,7 +135,6 @@ DESC FORMATTED table_comment -- !query 10 schema struct -- !query 10 output -# col_name data_type comment a string b int diff --git a/sql/core/src/test/resources/sql-tests/results/describe.sql.out b/sql/core/src/test/resources/sql-tests/results/describe.sql.out index de10b29f3c65b..46d32bbc52247 100644 --- a/sql/core/src/test/resources/sql-tests/results/describe.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/describe.sql.out @@ -54,7 +54,6 @@ DESCRIBE t -- !query 5 schema struct -- !query 5 output -# col_name data_type comment a string b 
int c string @@ -70,7 +69,6 @@ DESC default.t -- !query 6 schema struct -- !query 6 output -# col_name data_type comment a string b int c string @@ -86,7 +84,6 @@ DESC TABLE t -- !query 7 schema struct -- !query 7 output -# col_name data_type comment a string b int c string @@ -102,7 +99,6 @@ DESC FORMATTED t -- !query 8 schema struct -- !query 8 output -# col_name data_type comment a string b int c string @@ -132,7 +128,6 @@ DESC EXTENDED t -- !query 9 schema struct -- !query 9 output -# col_name data_type comment a string b int c string @@ -162,7 +157,6 @@ DESC t PARTITION (c='Us', d=1) -- !query 10 schema struct -- !query 10 output -# col_name data_type comment a string b int c string @@ -178,7 +172,6 @@ DESC EXTENDED t PARTITION (c='Us', d=1) -- !query 11 schema struct -- !query 11 output -# col_name data_type comment a string b int c string @@ -206,7 +199,6 @@ DESC FORMATTED t PARTITION (c='Us', d=1) -- !query 12 schema struct -- !query 12 output -# col_name data_type comment a string b int c string @@ -268,7 +260,6 @@ DESC temp_v -- !query 16 schema struct -- !query 16 output -# col_name data_type comment a string b int c string @@ -280,7 +271,6 @@ DESC TABLE temp_v -- !query 17 schema struct -- !query 17 output -# col_name data_type comment a string b int c string @@ -292,7 +282,6 @@ DESC FORMATTED temp_v -- !query 18 schema struct -- !query 18 output -# col_name data_type comment a string b int c string @@ -304,7 +293,6 @@ DESC EXTENDED temp_v -- !query 19 schema struct -- !query 19 output -# col_name data_type comment a string b int c string @@ -316,7 +304,6 @@ DESC temp_Data_Source_View -- !query 20 schema struct -- !query 20 output -# col_name data_type comment intType int test comment test1 stringType string dateType date @@ -349,7 +336,6 @@ DESC v -- !query 22 schema struct -- !query 22 output -# col_name data_type comment a string b int c string @@ -361,7 +347,6 @@ DESC TABLE v -- !query 23 schema struct -- !query 23 output -# col_name data_type comment a string b int c string @@ -373,7 +358,6 @@ DESC FORMATTED v -- !query 24 schema struct -- !query 24 output -# col_name data_type comment a string b int c string @@ -396,7 +380,6 @@ DESC EXTENDED v -- !query 25 schema struct -- !query 25 output -# col_name data_type comment a string b int c string diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index e66a60d7503f3..65472cda9c1c0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -1036,24 +1036,24 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { } test("Corrupt records: FAILFAST mode") { - val schema = StructType( - StructField("a", StringType, true) :: Nil) // `FAILFAST` mode should throw an exception for corrupt records. val exceptionOne = intercept[SparkException] { spark.read .option("mode", "FAILFAST") .json(corruptRecords) - } - assert(exceptionOne.getMessage.contains("JsonParseException")) + }.getMessage + assert(exceptionOne.contains( + "Malformed records are detected in schema inference. 
Parse Mode: FAILFAST.")) val exceptionTwo = intercept[SparkException] { spark.read .option("mode", "FAILFAST") - .schema(schema) + .schema("a string") .json(corruptRecords) .collect() - } - assert(exceptionTwo.getMessage.contains("JsonParseException")) + }.getMessage + assert(exceptionTwo.contains( + "Malformed records are detected in record parsing. Parse Mode: FAILFAST.")) } test("Corrupt records: DROPMALFORMED mode") { @@ -1944,7 +1944,8 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { .option("mode", "FAILFAST") .json(path) } - assert(exceptionOne.getMessage.contains("Failed to infer a common schema")) + assert(exceptionOne.getMessage.contains("Malformed records are detected in schema " + + "inference. Parse Mode: FAILFAST.")) val exceptionTwo = intercept[SparkException] { spark.read @@ -1954,7 +1955,8 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { .json(path) .collect() } - assert(exceptionTwo.getMessage.contains("Failed to parse a value")) + assert(exceptionTwo.getMessage.contains("Malformed records are detected in record " + + "parsing. Parse Mode: FAILFAST.")) } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index ab931b94987d3..aca964907d4cd 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -806,7 +806,7 @@ class HiveDDLSuite checkAnswer( sql(s"DESC $tabName").select("col_name", "data_type", "comment"), - Row("# col_name", "data_type", "comment") :: Row("a", "int", "test") :: Nil + Row("a", "int", "test") :: Nil ) } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala index 3c4a2716caf90..fe65353b9d502 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala @@ -50,7 +50,6 @@ abstract class BaseReceivedBlockHandlerSuite(enableEncryption: Boolean) extends SparkFunSuite with BeforeAndAfter with Matchers - with LocalSparkContext with Logging { import WriteAheadLogBasedBlockHandler._ @@ -89,10 +88,9 @@ abstract class BaseReceivedBlockHandlerSuite(enableEncryption: Boolean) rpcEnv = RpcEnv.create("test", "localhost", 0, conf, securityMgr) conf.set("spark.driver.port", rpcEnv.address.port.toString) - sc = new SparkContext("local", "test", conf) blockManagerMaster = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager", new BlockManagerMasterEndpoint(rpcEnv, true, conf, - new LiveListenerBus(sc))), conf, true) + new LiveListenerBus(conf))), conf, true) storageLevel = StorageLevel.MEMORY_ONLY_SER blockManager = createBlockManager(blockManagerSize, conf)
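Taken together, the FailureSafeParser and JsonInferSchema changes mean FAILFAST now surfaces a uniform SparkException instead of a raw parser error. A sketch of the end-to-end behavior these suites pin down, from user code (assumes a SparkSession named spark with spark.implicits._ imported and ScalaTest's intercept in scope; the JSON literal is an illustrative malformed record):

    import org.apache.spark.SparkException

    val malformed = Seq("""{"a": 1""").toDS() // truncated record
    val ex = intercept[SparkException] {
      spark.read
        .option("mode", "FAILFAST")
        .schema("a string")
        .json(malformed)
        .collect()
    }
    assert(ex.getMessage.contains(
      "Malformed records are detected in record parsing. Parse Mode: FAILFAST."))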