+ * This method is called early in the initialization of the Spark driver. Explicitly, it is
+ * called before the Spark driver's task scheduler is initialized. This means that a lot
+ * of other Spark subsystems may not yet have been initialized. This call also blocks driver
+ * initialization.
+ *
+ * It's recommended that plugins be careful about what operations are performed in this call,
+ * preferably performing expensive operations in a separate thread, or postponing them until
+ * the application has fully started.
+ *
+ * @param sc The SparkContext loading the plugin.
+ * @param pluginContext Additional plugin-specific information about the Spark application
+ * where the plugin is running.
+ * @return A map that will be provided to the {@link ExecutorPlugin#init(PluginContext,Map)}
+ * method.
+ */
+ default Map<String, String> init(SparkContext sc, PluginContext pluginContext) {
+ return Collections.emptyMap();
+ }
+
+ /**
+ * Register metrics published by the plugin with Spark's metrics system.
+ *
+ * This method is called later in the initialization of the Spark application, after most
+ * subsystems are up and the application ID is known. If there are metrics registered in
+ * the registry ({@link PluginContext#metricRegistry()}), then a metrics source with the
+ * plugin name will be created.
+ *
+ * Note that even though the metric registry is still accessible after this method is called,
+ * registering new metrics at that point may result in the metrics not being
+ * available.
+ *
+ * @param appId The application ID from the cluster manager.
+ * @param pluginContext Additional plugin-specific information about the Spark application
+ * where the plugin is running.
+ */
+ default void registerMetrics(String appId, PluginContext pluginContext) {}
+
+ /**
+ * RPC message handler.
+ *
+ * Plugins can use Spark's RPC system to send messages from executors to the driver (but not
+ * the other way around, currently). Messages sent by the executor component of the plugin will
+ * be delivered to this method, and the returned value will be sent back to the executor as
+ * the reply, if the executor has requested one.
+ *
+ * Any exception thrown will be sent back to the executor as an error if it is expecting
+ * a reply. If a reply is not expected, a message will be written to the driver log instead.
+ *
+ * The implementation of this handler should be thread-safe.
+ *
+ * Note that all plugins share RPC dispatch threads, and this method is called synchronously. So
+ * performing expensive operations in this handler may affect the operation of other active
+ * plugins. Internal Spark endpoints are not directly affected, though, since they use different
+ * threads.
+ *
+ * Spark guarantees that the driver component will be ready to receive messages through this
+ * handler when executors are started.
+ *
+ * @param message The incoming message.
+ * @return Value to be returned to the caller. Ignored if the caller does not expect a reply.
+ */
+ default Object receive(Object message) throws Exception {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Informs the plugin that the Spark application is shutting down.
+ *
+ * This method is called during the driver shutdown phase. It is recommended that plugins
+ * not use any Spark functions (e.g. send RPC messages) during this call.
+ */
+ default void shutdown() {}
+
+}
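To make the driver-side lifecycle above concrete, the following is a minimal Scala sketch of a `DriverPlugin`, not part of the patch; the class name, the `audit.level` key and the message shapes are invented for illustration, while the method signatures and `PluginContext` calls come from the interface above.

```scala
import java.util.{Map => JMap}

import scala.collection.JavaConverters._

import com.codahale.metrics.Gauge

import org.apache.spark.SparkContext
import org.apache.spark.api.plugin.{DriverPlugin, PluginContext}

class AuditDriverPlugin extends DriverPlugin {

  private var startTime: Long = _

  // Runs before the task scheduler exists, so keep it cheap. The returned map is
  // passed to ExecutorPlugin.init() on every executor.
  override def init(sc: SparkContext, ctx: PluginContext): JMap[String, String] = {
    startTime = System.currentTimeMillis()
    Map("audit.level" -> "verbose").asJava
  }

  // Called once the application ID is known; metrics registered here are exposed
  // through a source named after the plugin class.
  override def registerMetrics(appId: String, ctx: PluginContext): Unit = {
    ctx.metricRegistry().register("uptimeMs", new Gauge[Long] {
      override def getValue(): Long = System.currentTimeMillis() - startTime
    })
  }

  // Handles messages sent by the executor component via PluginContext.send()/ask().
  override def receive(message: AnyRef): AnyRef = message match {
    case "ping" => "pong"  // returned to the executor as the reply to ask()
    case _ => null         // one-way messages need no reply
  }
}
```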
diff --git a/core/src/main/java/org/apache/spark/api/plugin/ExecutorPlugin.java b/core/src/main/java/org/apache/spark/api/plugin/ExecutorPlugin.java
new file mode 100644
index 000000000000..496130803516
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/api/plugin/ExecutorPlugin.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.plugin;
+
+import java.util.Map;
+
+import org.apache.spark.annotation.DeveloperApi;
+
+/**
+ * :: DeveloperApi ::
+ * Executor component of a {@link SparkPlugin}.
+ *
+ * @since 3.0.0
+ */
+@DeveloperApi
+public interface ExecutorPlugin {
+
+ /**
+ * Initialize the executor plugin.
+ *
+ * When a Spark plugin provides an executor plugin, this method will be called during the
+ * initialization of the executor process. It will block executor initialization until it
+ * returns.
+ *
+ * Executor plugins that publish metrics should register all metrics with the context's
+ * registry ({@link PluginContext#metricRegistry()}) when this method is called. Metrics
+ * registered afterwards are not guaranteed to show up.
+ *
+ * @param ctx Context information for the executor where the plugin is running.
+ * @param extraConf Extra configuration provided by the driver component during its
+ * initialization.
+ */
+ default void init(PluginContext ctx, Map<String, String> extraConf) {}
+
+ /**
+ * Clean up and terminate this plugin.
+ *
+ * This method is called during the executor shutdown phase, and blocks executor shutdown.
+ */
+ default void shutdown() {}
+
+}
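A matching executor-side sketch, again illustrative only; the `audit.level` key mirrors the value returned by the hypothetical driver component shown earlier.

```scala
import java.util.{Map => JMap}

import com.codahale.metrics.Counter

import org.apache.spark.api.plugin.{ExecutorPlugin, PluginContext}

class AuditExecutorPlugin extends ExecutorPlugin {

  private val tasksSeen = new Counter()
  private var verbose = false

  // Blocks executor startup, so only cheap work belongs here; metrics must be
  // registered now to be guaranteed to show up.
  override def init(ctx: PluginContext, extraConf: JMap[String, String]): Unit = {
    verbose = "verbose" == extraConf.get("audit.level")
    ctx.metricRegistry().register("tasksSeen", tasksSeen)
  }

  // Blocks executor shutdown, so keep this cheap as well.
  override def shutdown(): Unit = {}
}
```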
diff --git a/core/src/main/java/org/apache/spark/api/plugin/PluginContext.java b/core/src/main/java/org/apache/spark/api/plugin/PluginContext.java
new file mode 100644
index 000000000000..b9413cf828aa
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/api/plugin/PluginContext.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.plugin;
+
+import java.io.IOException;
+
+import com.codahale.metrics.MetricRegistry;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.annotation.DeveloperApi;
+
+/**
+ * :: DeveloperApi ::
+ * Context information and operations for plugins loaded by Spark.
+ *
+ * An instance of this class is provided to plugins in their initialization method. It is safe
+ * for plugins to keep a reference to the instance for later use (for example, to send messages
+ * to the plugin's driver component).
+ *
+ * Context instances are plugin-specific, so metrics and messages are tied to each plugin. It is
+ * not possible for a plugin to directly interact with other plugins.
+ *
+ * @since 3.0.0
+ */
+@DeveloperApi
+public interface PluginContext {
+
+ /**
+ * Registry where the plugin associated with this context should register its metrics.
+ */
+ MetricRegistry metricRegistry();
+
+ /** Configuration of the Spark application. */
+ SparkConf conf();
+
+ /** Executor ID of the process. On the driver, this will identify the driver. */
+ String executorID();
+
+ /** The host name which is being used by the Spark process for communication. */
+ String hostname();
+
+ /**
+ * Send a message to the plugin's driver-side component.
+ *
+ * This method sends a message to the driver-side component of the plugin, without expecting
+ * a reply. It returns as soon as the message is enqueued for sending.
+ *
+ * The message must be serializable.
+ *
+ * @param message Message to be sent.
+ */
+ void send(Object message) throws IOException;
+
+ /**
+ * Send an RPC to the plugin's driver-side component.
+ *
+ * This method sends a message to the driver-side component of the plugin, and blocks until a
+ * reply arrives, or the configured RPC ask timeout (<code>spark.rpc.askTimeout</code>) elapses.
+ *
+ * If the driver replies with an error, an exception with the corresponding error will be thrown.
+ *
+ * The message must be serializable.
+ *
+ * @param message Message to be sent.
+ * @return The reply from the driver-side component.
+ */
+ Object ask(Object message) throws Exception;
+
+}
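As an illustration of the two messaging calls (the helper class and message shapes here are invented), an executor component could keep the context it received in `init()` and talk to its driver component like this:

```scala
import org.apache.spark.api.plugin.PluginContext

class DriverClient(ctx: PluginContext) {

  // Fire-and-forget: returns as soon as the message is queued for sending.
  def reportEvent(what: String): Unit = {
    ctx.send((ctx.executorID(), what))
  }

  // Blocking RPC: waits until DriverPlugin.receive() on the driver returns, or the
  // spark.rpc.askTimeout elapses; a driver-side error is rethrown here.
  def ping(): String = {
    ctx.ask("ping").asInstanceOf[String]
  }
}
```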
diff --git a/core/src/main/java/org/apache/spark/api/plugin/SparkPlugin.java b/core/src/main/java/org/apache/spark/api/plugin/SparkPlugin.java
new file mode 100644
index 000000000000..a500f5d2188f
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/api/plugin/SparkPlugin.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.plugin;
+
+import org.apache.spark.annotation.DeveloperApi;
+
+/**
+ * :: DeveloperApi ::
+ * A plugin that can be dynamically loaded into a Spark application.
+ *
+ * Plugins can be loaded by adding the plugin's class name to the appropriate Spark configuration.
+ * Check the Spark configuration documentation for details.
+ *
+ * Plugins have two optional components: a driver-side component, of which a single instance is
+ * created per application, inside the Spark driver. And an executor-side component, of which one
+ * instance is created in each executor that is started by Spark. Details of each component can be
+ * found in the documentation for {@link DriverPlugin} and {@link ExecutorPlugin}.
+ *
+ * @since 3.0.0
+ */
+@DeveloperApi
+public interface SparkPlugin {
+
+ /**
+ * Return the plugin's driver-side component.
+ *
+ * @return The driver-side component, or null if one is not needed.
+ */
+ DriverPlugin driverPlugin();
+
+ /**
+ * Return the plugin's executor-side component.
+ *
+ * @return The executor-side component, or null if one is not needed.
+ */
+ ExecutorPlugin executorPlugin();
+
+}
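Tying the two hypothetical components from the earlier sketches together, a complete plugin is just a factory for the two sides; either method may return null when that side is not needed.

```scala
import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, SparkPlugin}

class AuditPlugin extends SparkPlugin {
  override def driverPlugin(): DriverPlugin = new AuditDriverPlugin()
  override def executorPlugin(): ExecutorPlugin = new AuditExecutorPlugin()
}
```

Such a class would then be enabled with something like `--conf spark.plugins=com.example.AuditPlugin`, with the package name being whatever the plugin jar actually uses.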
diff --git a/core/src/main/resources/org/apache/spark/ui/static/webui.js b/core/src/main/resources/org/apache/spark/ui/static/webui.js
index cf04db28804c..fac464e1353c 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/webui.js
+++ b/core/src/main/resources/org/apache/spark/ui/static/webui.js
@@ -87,6 +87,9 @@ $(function() {
collapseTablePageLoad('collapse-aggregated-runningExecutions','aggregated-runningExecutions');
collapseTablePageLoad('collapse-aggregated-completedExecutions','aggregated-completedExecutions');
collapseTablePageLoad('collapse-aggregated-failedExecutions','aggregated-failedExecutions');
+ collapseTablePageLoad('collapse-aggregated-sessionstat','aggregated-sessionstat');
+ collapseTablePageLoad('collapse-aggregated-sqlstat','aggregated-sqlstat');
+ collapseTablePageLoad('collapse-aggregated-sqlsessionstat','aggregated-sqlsessionstat');
});
$(function() {
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 2db880976c3a..cad88ad8aec6 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -48,6 +48,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.Tests._
import org.apache.spark.internal.config.UI._
+import org.apache.spark.internal.plugin.PluginContainer
import org.apache.spark.io.CompressionCodec
import org.apache.spark.metrics.source.JVMCPUSource
import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
@@ -220,6 +221,7 @@ class SparkContext(config: SparkConf) extends Logging {
private var _heartbeater: Heartbeater = _
private var _resources: scala.collection.immutable.Map[String, ResourceInformation] = _
private var _shuffleDriverComponents: ShuffleDriverComponents = _
+ private var _plugins: Option[PluginContainer] = None
/* ------------------------------------------------------------------------------------- *
| Accessors and public fields. These provide access to the internal state of the |
@@ -539,6 +541,9 @@ class SparkContext(config: SparkConf) extends Logging {
_heartbeatReceiver = env.rpcEnv.setupEndpoint(
HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))
+ // Initialize any plugins before the task scheduler is initialized.
+ _plugins = PluginContainer(this)
+
// Create and start the scheduler
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
@@ -621,6 +626,7 @@ class SparkContext(config: SparkConf) extends Logging {
_env.metricsSystem.registerSource(e.executorAllocationManagerSource)
}
appStatusSource.foreach(_env.metricsSystem.registerSource(_))
+ _plugins.foreach(_.registerMetrics(applicationId))
// Make sure the context is stopped if the user forgets about it. This avoids leaving
// unfinished event logs around after the JVM exits cleanly. It doesn't help if the JVM
// is killed, though.
@@ -1976,6 +1982,9 @@ class SparkContext(config: SparkConf) extends Logging {
_listenerBusStarted = false
}
}
+ Utils.tryLogNonFatalError {
+ _plugins.foreach(_.shutdown())
+ }
Utils.tryLogNonFatalError {
_eventLogger.foreach(_.stop())
}
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index ce6d0322bafd..0f595d095a22 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -37,6 +37,7 @@ import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
+import org.apache.spark.internal.plugin.PluginContainer
import org.apache.spark.memory.{SparkOutOfMemoryError, TaskMemoryManager}
import org.apache.spark.metrics.source.JVMCPUSource
import org.apache.spark.rpc.RpcTimeout
@@ -165,6 +166,11 @@ private[spark] class Executor(
}
}
+ // Plugins need to be loaded using a class loader that includes the executor's user classpath.
+ private val plugins: Option[PluginContainer] = Utils.withContextClassLoader(replClassLoader) {
+ PluginContainer(env)
+ }
+
// Max size of direct result. If task result is bigger than this, we use the block manager
// to send the result back.
private val maxDirectResultSize = Math.min(
@@ -297,6 +303,7 @@ private[spark] class Executor(
logWarning("Plugin " + plugin.getClass().getCanonicalName() + " shutdown failed", e)
}
}
+ plugins.foreach(_.shutdown())
}
if (!isLocal) {
env.stop()
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 444a1544777a..295fe28e8b9a 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1159,6 +1159,17 @@ package object config {
s"The value must be in allowed range [1,048,576, ${MAX_BUFFER_SIZE_BYTES}].")
.createWithDefault(1024 * 1024)
+ private[spark] val DEFAULT_PLUGINS_LIST = "spark.plugins.defaultList"
+
+ private[spark] val PLUGINS =
+ ConfigBuilder("spark.plugins")
+ .withPrepended(DEFAULT_PLUGINS_LIST, separator = ",")
+ .doc("Comma-separated list of class names implementing " +
+ "org.apache.spark.api.plugin.SparkPlugin to load into the application.")
+ .stringConf
+ .toSequence
+ .createWithDefault(Nil)
+
private[spark] val EXECUTOR_PLUGINS =
ConfigBuilder("spark.executor.plugins")
.doc("Comma-separated list of class names for \"plugins\" implementing " +
diff --git a/core/src/main/scala/org/apache/spark/internal/plugin/PluginContainer.scala b/core/src/main/scala/org/apache/spark/internal/plugin/PluginContainer.scala
new file mode 100644
index 000000000000..fc7a9d85957c
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/internal/plugin/PluginContainer.scala
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.plugin
+
+import scala.collection.JavaConverters._
+import scala.util.{Either, Left, Right}
+
+import org.apache.spark.{SparkContext, SparkEnv}
+import org.apache.spark.api.plugin._
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.util.Utils
+
+sealed abstract class PluginContainer {
+
+ def shutdown(): Unit
+ def registerMetrics(appId: String): Unit
+
+}
+
+private class DriverPluginContainer(sc: SparkContext, plugins: Seq[SparkPlugin])
+ extends PluginContainer with Logging {
+
+ private val driverPlugins: Seq[(String, DriverPlugin, PluginContextImpl)] = plugins.flatMap { p =>
+ val driverPlugin = p.driverPlugin()
+ if (driverPlugin != null) {
+ val name = p.getClass().getName()
+ val ctx = new PluginContextImpl(name, sc.env.rpcEnv, sc.env.metricsSystem, sc.conf,
+ sc.env.executorId)
+
+ val extraConf = driverPlugin.init(sc, ctx)
+ if (extraConf != null) {
+ extraConf.asScala.foreach { case (k, v) =>
+ sc.conf.set(s"${PluginContainer.EXTRA_CONF_PREFIX}$name.$k", v)
+ }
+ }
+ logInfo(s"Initialized driver component for plugin $name.")
+ Some((p.getClass().getName(), driverPlugin, ctx))
+ } else {
+ None
+ }
+ }
+
+ if (driverPlugins.nonEmpty) {
+ val pluginsByName = driverPlugins.map { case (name, plugin, _) => (name, plugin) }.toMap
+ sc.env.rpcEnv.setupEndpoint(classOf[PluginEndpoint].getName(),
+ new PluginEndpoint(pluginsByName, sc.env.rpcEnv))
+ }
+
+ override def registerMetrics(appId: String): Unit = {
+ driverPlugins.foreach { case (_, plugin, ctx) =>
+ plugin.registerMetrics(appId, ctx)
+ ctx.registerMetrics()
+ }
+ }
+
+ override def shutdown(): Unit = {
+ driverPlugins.foreach { case (name, plugin, _) =>
+ try {
+ logDebug(s"Stopping plugin $name.")
+ plugin.shutdown()
+ } catch {
+ case t: Throwable =>
+ logInfo(s"Exception while shutting down plugin $name.", t)
+ }
+ }
+ }
+
+}
+
+private class ExecutorPluginContainer(env: SparkEnv, plugins: Seq[SparkPlugin])
+ extends PluginContainer with Logging {
+
+ private val executorPlugins: Seq[(String, ExecutorPlugin)] = {
+ val allExtraConf = env.conf.getAllWithPrefix(PluginContainer.EXTRA_CONF_PREFIX)
+
+ plugins.flatMap { p =>
+ val executorPlugin = p.executorPlugin()
+ if (executorPlugin != null) {
+ val name = p.getClass().getName()
+ val prefix = name + "."
+ val extraConf = allExtraConf
+ .filter { case (k, v) => k.startsWith(prefix) }
+ .map { case (k, v) => k.substring(prefix.length()) -> v }
+ .toMap
+ .asJava
+ val ctx = new PluginContextImpl(name, env.rpcEnv, env.metricsSystem, env.conf,
+ env.executorId)
+ executorPlugin.init(ctx, extraConf)
+ ctx.registerMetrics()
+
+ logInfo(s"Initialized executor component for plugin $name.")
+ Some(p.getClass().getName() -> executorPlugin)
+ } else {
+ None
+ }
+ }
+ }
+
+ override def registerMetrics(appId: String): Unit = {
+ throw new IllegalStateException("Should not be called for the executor container.")
+ }
+
+ override def shutdown(): Unit = {
+ executorPlugins.foreach { case (name, plugin) =>
+ try {
+ logDebug(s"Stopping plugin $name.")
+ plugin.shutdown()
+ } catch {
+ case t: Throwable =>
+ logInfo(s"Exception while shutting down plugin $name.", t)
+ }
+ }
+ }
+}
+
+object PluginContainer {
+
+ val EXTRA_CONF_PREFIX = "spark.plugins.internal.conf."
+
+ def apply(sc: SparkContext): Option[PluginContainer] = PluginContainer(Left(sc))
+
+ def apply(env: SparkEnv): Option[PluginContainer] = PluginContainer(Right(env))
+
+ private def apply(ctx: Either[SparkContext, SparkEnv]): Option[PluginContainer] = {
+ val conf = ctx.fold(_.conf, _.conf)
+ val plugins = Utils.loadExtensions(classOf[SparkPlugin], conf.get(PLUGINS).distinct, conf)
+ if (plugins.nonEmpty) {
+ ctx match {
+ case Left(sc) => Some(new DriverPluginContainer(sc, plugins))
+ case Right(env) => Some(new ExecutorPluginContainer(env, plugins))
+ }
+ } else {
+ None
+ }
+ }
+}
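The `EXTRA_CONF_PREFIX` round trip above is easy to miss: the map returned by `DriverPlugin.init()` is flattened into `SparkConf` entries on the driver, shipped to executors as ordinary configuration, and unflattened by `ExecutorPluginContainer` before reaching `ExecutorPlugin.init()`. A standalone sketch of the key rewriting, with the plugin and key names made up:

```scala
object ExtraConfRoundTrip {
  def main(args: Array[String]): Unit = {
    val prefix = "spark.plugins.internal.conf."  // PluginContainer.EXTRA_CONF_PREFIX
    val pluginName = "com.example.AuditPlugin"

    // Driver side: an entry ("audit.level" -> "verbose") returned by the driver
    // component is stored under a namespaced SparkConf key.
    val confKey = s"$prefix$pluginName.audit.level"

    // Executor side: getAllWithPrefix() strips the global prefix, then the
    // per-plugin prefix is removed before the map is handed to the executor plugin.
    val executorKey = confKey.stripPrefix(prefix).stripPrefix(pluginName + ".")
    println(executorKey)  // prints "audit.level"
  }
}
```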
diff --git a/core/src/main/scala/org/apache/spark/internal/plugin/PluginContextImpl.scala b/core/src/main/scala/org/apache/spark/internal/plugin/PluginContextImpl.scala
new file mode 100644
index 000000000000..279f3d388fb2
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/internal/plugin/PluginContextImpl.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.plugin
+
+import com.codahale.metrics.MetricRegistry
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.api.plugin.PluginContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.metrics.MetricsSystem
+import org.apache.spark.metrics.source.Source
+import org.apache.spark.rpc.RpcEnv
+import org.apache.spark.util.RpcUtils
+
+private class PluginContextImpl(
+ pluginName: String,
+ rpcEnv: RpcEnv,
+ metricsSystem: MetricsSystem,
+ override val conf: SparkConf,
+ override val executorID: String)
+ extends PluginContext with Logging {
+
+ override def hostname(): String = rpcEnv.address.hostPort.split(":")(0)
+
+ private val registry = new MetricRegistry()
+
+ private lazy val driverEndpoint = try {
+ RpcUtils.makeDriverRef(classOf[PluginEndpoint].getName(), conf, rpcEnv)
+ } catch {
+ case e: Exception =>
+ logWarning(s"Failed to create driver plugin endpoint ref.", e)
+ null
+ }
+
+ override def metricRegistry(): MetricRegistry = registry
+
+ override def send(message: AnyRef): Unit = {
+ if (driverEndpoint == null) {
+ throw new IllegalStateException("Driver endpoint is not known.")
+ }
+ driverEndpoint.send(PluginMessage(pluginName, message))
+ }
+
+ override def ask(message: AnyRef): AnyRef = {
+ try {
+ if (driverEndpoint != null) {
+ driverEndpoint.askSync[AnyRef](PluginMessage(pluginName, message))
+ } else {
+ throw new IllegalStateException("Driver endpoint is not known.")
+ }
+ } catch {
+ case e: SparkException if e.getCause() != null =>
+ throw e.getCause()
+ }
+ }
+
+ def registerMetrics(): Unit = {
+ if (!registry.getMetrics().isEmpty()) {
+ val src = new PluginMetricsSource(s"plugin.$pluginName", registry)
+ metricsSystem.registerSource(src)
+ }
+ }
+
+ class PluginMetricsSource(
+ override val sourceName: String,
+ override val metricRegistry: MetricRegistry)
+ extends Source
+
+}
diff --git a/core/src/main/scala/org/apache/spark/internal/plugin/PluginEndpoint.scala b/core/src/main/scala/org/apache/spark/internal/plugin/PluginEndpoint.scala
new file mode 100644
index 000000000000..9a59b6bf678f
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/internal/plugin/PluginEndpoint.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.plugin
+
+import org.apache.spark.api.plugin.DriverPlugin
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.{IsolatedRpcEndpoint, RpcCallContext, RpcEnv}
+
+case class PluginMessage(pluginName: String, message: AnyRef)
+
+private class PluginEndpoint(
+ plugins: Map[String, DriverPlugin],
+ override val rpcEnv: RpcEnv)
+ extends IsolatedRpcEndpoint with Logging {
+
+ override def receive: PartialFunction[Any, Unit] = {
+ case PluginMessage(pluginName, message) =>
+ plugins.get(pluginName) match {
+ case Some(plugin) =>
+ try {
+ val reply = plugin.receive(message)
+ if (reply != null) {
+ logInfo(
+ s"Plugin $pluginName returned reply for one-way message of type " +
+ s"${message.getClass().getName()}.")
+ }
+ } catch {
+ case e: Exception =>
+ logWarning(s"Error in plugin $pluginName when handling message of type " +
+ s"${message.getClass().getName()}.", e)
+ }
+
+ case None =>
+ throw new IllegalArgumentException(s"Received message for unknown plugin $pluginName.")
+ }
+ }
+
+ override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
+ case PluginMessage(pluginName, message) =>
+ plugins.get(pluginName) match {
+ case Some(plugin) =>
+ context.reply(plugin.receive(message))
+
+ case None =>
+ throw new IllegalArgumentException(s"Received message for unknown plugin $pluginName.")
+ }
+ }
+
+}
diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala
index 9e94a868ccc3..a7b7b5573cfe 100644
--- a/core/src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala
@@ -19,7 +19,8 @@ package org.apache.spark.metrics.sink
import java.util.Properties
-import com.codahale.metrics.{JmxReporter, MetricRegistry}
+import com.codahale.metrics.MetricRegistry
+import com.codahale.metrics.jmx.JmxReporter
import org.apache.spark.SecurityManager
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
index 7da0a9d2285b..a5850fc2ac4b 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -355,6 +355,8 @@ private[spark] class AppStatusListener(
val lastStageInfo = event.stageInfos.sortBy(_.stageId).lastOption
val jobName = lastStageInfo.map(_.name).getOrElse("")
+ val description = Option(event.properties)
+ .flatMap { p => Option(p.getProperty(SparkContext.SPARK_JOB_DESCRIPTION)) }
val jobGroup = Option(event.properties)
.flatMap { p => Option(p.getProperty(SparkContext.SPARK_JOB_GROUP_ID)) }
val sqlExecutionId = Option(event.properties)
@@ -363,6 +365,7 @@ private[spark] class AppStatusListener(
val job = new LiveJob(
event.jobId,
jobName,
+ description,
if (event.time > 0) Some(new Date(event.time)) else None,
event.stageIds,
jobGroup,
diff --git a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
index 00c991b49920..a0ef8da0a4b6 100644
--- a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
+++ b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
@@ -62,6 +62,7 @@ private[spark] abstract class LiveEntity {
private class LiveJob(
val jobId: Int,
name: String,
+ description: Option[String],
val submissionTime: Option[Date],
val stageIds: Seq[Int],
jobGroup: Option[String],
@@ -92,7 +93,7 @@ private class LiveJob(
val info = new v1.JobData(
jobId,
name,
- None, // description is always None?
+ description,
submissionTime,
completionTime,
stageIds,
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index f2113947f6bf..ee43b76e1701 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -20,6 +20,8 @@ package org.apache.spark.storage
import java.io.{File, IOException}
import java.util.UUID
+import scala.util.control.NonFatal
+
import org.apache.spark.SparkConf
import org.apache.spark.executor.ExecutorExitCode
import org.apache.spark.internal.{config, Logging}
@@ -117,20 +119,38 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
/** Produces a unique block id and File suitable for storing local intermediate results. */
def createTempLocalBlock(): (TempLocalBlockId, File) = {
- var blockId = new TempLocalBlockId(UUID.randomUUID())
- while (getFile(blockId).exists()) {
- blockId = new TempLocalBlockId(UUID.randomUUID())
+ var blockId = TempLocalBlockId(UUID.randomUUID())
+ var tempLocalFile = getFile(blockId)
+ var count = 0
+ while (!canCreateFile(tempLocalFile) && count < Utils.MAX_DIR_CREATION_ATTEMPTS) {
+ blockId = TempLocalBlockId(UUID.randomUUID())
+ tempLocalFile = getFile(blockId)
+ count += 1
}
- (blockId, getFile(blockId))
+ (blockId, tempLocalFile)
}
/** Produces a unique block id and File suitable for storing shuffled intermediate results. */
def createTempShuffleBlock(): (TempShuffleBlockId, File) = {
- var blockId = new TempShuffleBlockId(UUID.randomUUID())
- while (getFile(blockId).exists()) {
- blockId = new TempShuffleBlockId(UUID.randomUUID())
+ var blockId = TempShuffleBlockId(UUID.randomUUID())
+ var tempShuffleFile = getFile(blockId)
+ var count = 0
+ while (!canCreateFile(tempShuffleFile) && count < Utils.MAX_DIR_CREATION_ATTEMPTS) {
+ blockId = TempShuffleBlockId(UUID.randomUUID())
+ tempShuffleFile = getFile(blockId)
+ count += 1
+ }
+ (blockId, tempShuffleFile)
+ }
+
+ private def canCreateFile(file: File): Boolean = {
+ try {
+ file.createNewFile()
+ } catch {
+ case NonFatal(_) =>
+ logError("Failed to create temporary block file: " + file.getAbsoluteFile)
+ false
}
- (blockId, getFile(blockId))
}
/**
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala b/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala
index 2488197814ff..fb43af357f7b 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala
@@ -25,6 +25,7 @@ import scala.xml.Node
import org.apache.spark.status.{AppStatusStore, StreamBlockData}
import org.apache.spark.status.api.v1
import org.apache.spark.ui._
+import org.apache.spark.ui.storage.ToolTips._
import org.apache.spark.util.Utils
/** Page showing list of RDD's currently stored in the cluster */
@@ -56,7 +57,8 @@ private[ui] class StoragePage(parent: SparkUITab, store: AppStatusStore) extends
rddHeader,
rddRow(request, _: v1.RDDStorageInfo),
rdds,
- id = Some("storage-by-rdd-table"))}
+ id = Some("storage-by-rdd-table"),
+ tooltipHeaders = tooltips)}
}
@@ -72,6 +74,16 @@ private[ui] class StoragePage(parent: SparkUITab, store: AppStatusStore) extends
"Size in Memory",
"Size on Disk")
+ /** Tooltips for header fields of the RDD table */
+ val tooltips = Seq(
+ None,
+ Some(RDD_NAME),
+ Some(STORAGE_LEVEL),
+ Some(CACHED_PARTITIONS),
+ Some(FRACTION_CACHED),
+ Some(SIZE_IN_MEMORY),
+ Some(SIZE_ON_DISK))
+
/** Render an HTML row representing an RDD */
private def rddRow(request: HttpServletRequest, rdd: v1.RDDStorageInfo): Seq[Node] = {
// scalastyle:off
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/ToolTips.scala b/core/src/main/scala/org/apache/spark/ui/storage/ToolTips.scala
new file mode 100644
index 000000000000..4677eba63c83
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/storage/ToolTips.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.storage
+
+private[ui] object ToolTips {
+
+ val RDD_NAME =
+ "Name of the persisted RDD"
+
+ val STORAGE_LEVEL =
+ "StorageLevel displays where the persisted RDD is stored, " +
+ "format of the persisted RDD (serialized or de-serialized) and" +
+ "replication factor of the persisted RDD"
+
+ val CACHED_PARTITIONS =
+ "Number of partitions cached"
+
+ val FRACTION_CACHED =
+ "Fraction of total partitions cached"
+
+ val SIZE_IN_MEMORY =
+ "Total size of partitions in memory"
+
+ val SIZE_ON_DISK =
+ "Total size of partitions on the disk"
+}
+
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index f853ec836836..723fbdf73f8d 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -95,7 +95,7 @@ private[spark] object Utils extends Logging {
*/
val DEFAULT_DRIVER_MEM_MB = JavaUtils.DEFAULT_DRIVER_MEM_MB.toInt
- private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
+ val MAX_DIR_CREATION_ATTEMPTS: Int = 10
@volatile private var localRootDirs: Array[String] = null
/** Scheme used for files that are locally available on worker nodes in the cluster. */
diff --git a/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java b/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
index 773c390175b6..fb8523856da6 100644
--- a/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
+++ b/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
@@ -323,7 +323,7 @@ public static class InProcessTestApp {
public static void main(String[] args) throws Exception {
assertNotEquals(0, args.length);
- assertEquals(args[0], "hello");
+ assertEquals("hello", args[0]);
new SparkContext().stop();
synchronized (LOCK) {
@@ -340,7 +340,7 @@ public static class ErrorInProcessTestApp {
public static void main(String[] args) {
assertNotEquals(0, args.length);
- assertEquals(args[0], "hello");
+ assertEquals("hello", args[0]);
throw DUMMY_EXCEPTION;
}
}
diff --git a/core/src/test/java/org/apache/spark/util/SerializableConfigurationSuite.java b/core/src/test/java/org/apache/spark/util/SerializableConfigurationSuite.java
index 0944d681599a..28d038a524c8 100644
--- a/core/src/test/java/org/apache/spark/util/SerializableConfigurationSuite.java
+++ b/core/src/test/java/org/apache/spark/util/SerializableConfigurationSuite.java
@@ -50,6 +50,6 @@ public void testSerializableConfiguration() {
hadoopConfiguration.set("test.property", "value");
SerializableConfiguration scs = new SerializableConfiguration(hadoopConfiguration);
SerializableConfiguration actual = rdd.map(val -> scs).collect().get(0);
- assertEquals(actual.value().get("test.property"), "value");
+ assertEquals("value", actual.value().get("test.property"));
}
}
diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
index d5b1a1c5f547..43977717f6c9 100644
--- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
+++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
@@ -222,7 +222,7 @@ public void testSortingEmptyArrays() throws Exception {
public void testSortTimeMetric() throws Exception {
final UnsafeExternalSorter sorter = newSorter();
long prevSortTime = sorter.getSortTimeNanos();
- assertEquals(prevSortTime, 0);
+ assertEquals(0, prevSortTime);
sorter.insertRecord(null, 0, 0, 0, false);
sorter.spill();
@@ -230,7 +230,7 @@ public void testSortTimeMetric() throws Exception {
prevSortTime = sorter.getSortTimeNanos();
sorter.spill(); // no sort needed
- assertEquals(sorter.getSortTimeNanos(), prevSortTime);
+ assertEquals(prevSortTime, sorter.getSortTimeNanos());
sorter.insertRecord(null, 0, 0, 0, false);
UnsafeSorterIterator iter = sorter.getSortedIterator();
diff --git a/core/src/test/scala/org/apache/spark/internal/plugin/PluginContainerSuite.scala b/core/src/test/scala/org/apache/spark/internal/plugin/PluginContainerSuite.scala
new file mode 100644
index 000000000000..24fa01736365
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/internal/plugin/PluginContainerSuite.scala
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.plugin
+
+import java.io.File
+import java.nio.charset.StandardCharsets
+import java.util.{Map => JMap}
+
+import scala.collection.JavaConverters._
+import scala.concurrent.duration._
+
+import com.codahale.metrics.Gauge
+import com.google.common.io.Files
+import org.mockito.ArgumentMatchers.{any, eq => meq}
+import org.mockito.Mockito.{mock, spy, verify, when}
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.concurrent.Eventually.{eventually, interval, timeout}
+
+import org.apache.spark.{ExecutorPlugin => _, _}
+import org.apache.spark.api.plugin._
+import org.apache.spark.internal.config._
+import org.apache.spark.launcher.SparkLauncher
+import org.apache.spark.util.Utils
+
+class PluginContainerSuite extends SparkFunSuite with BeforeAndAfterEach with LocalSparkContext {
+
+ override def afterEach(): Unit = {
+ TestSparkPlugin.reset()
+ super.afterEach()
+ }
+
+ test("plugin initialization and communication") {
+ val conf = new SparkConf()
+ .setAppName(getClass().getName())
+ .set(SparkLauncher.SPARK_MASTER, "local[1]")
+ .set(PLUGINS, Seq(classOf[TestSparkPlugin].getName()))
+
+ TestSparkPlugin.extraConf = Map("foo" -> "bar", "bar" -> "baz").asJava
+
+ sc = new SparkContext(conf)
+
+ assert(TestSparkPlugin.driverPlugin != null)
+ verify(TestSparkPlugin.driverPlugin).init(meq(sc), any())
+
+ assert(TestSparkPlugin.executorPlugin != null)
+ verify(TestSparkPlugin.executorPlugin).init(any(), meq(TestSparkPlugin.extraConf))
+
+ assert(TestSparkPlugin.executorContext != null)
+
+    // One-way messages don't block, so we need to loop checking whether the message arrives.
+ TestSparkPlugin.executorContext.send("oneway")
+ eventually(timeout(10.seconds), interval(10.millis)) {
+ verify(TestSparkPlugin.driverPlugin).receive("oneway")
+ }
+
+ assert(TestSparkPlugin.executorContext.ask("ask") === "reply")
+
+ val err = intercept[Exception] {
+ TestSparkPlugin.executorContext.ask("unknown message")
+ }
+ assert(err.getMessage().contains("unknown message"))
+
+ // It should be possible for the driver plugin to send a message to itself, even if that doesn't
+ // make a whole lot of sense. It at least allows the same context class to be used on both
+ // sides.
+ assert(TestSparkPlugin.driverContext != null)
+ assert(TestSparkPlugin.driverContext.ask("ask") === "reply")
+
+ val metricSources = sc.env.metricsSystem
+ .getSourcesByName(s"plugin.${classOf[TestSparkPlugin].getName()}")
+ assert(metricSources.size === 2)
+
+ def findMetric(name: String): Int = {
+ val allFound = metricSources.filter(_.metricRegistry.getGauges().containsKey(name))
+ assert(allFound.size === 1)
+ allFound.head.metricRegistry.getGauges().get(name).asInstanceOf[Gauge[Int]].getValue()
+ }
+
+ assert(findMetric("driverMetric") === 42)
+ assert(findMetric("executorMetric") === 84)
+
+ sc.stop()
+ sc = null
+
+ verify(TestSparkPlugin.driverPlugin).shutdown()
+ verify(TestSparkPlugin.executorPlugin).shutdown()
+ }
+
+ test("do nothing if plugins are not configured") {
+ val conf = new SparkConf()
+ val env = mock(classOf[SparkEnv])
+ when(env.conf).thenReturn(conf)
+ assert(PluginContainer(env) === None)
+ }
+
+ test("merging of config options") {
+ val conf = new SparkConf()
+ .setAppName(getClass().getName())
+ .set(SparkLauncher.SPARK_MASTER, "local[1]")
+ .set(PLUGINS, Seq(classOf[TestSparkPlugin].getName()))
+ .set(DEFAULT_PLUGINS_LIST, classOf[TestSparkPlugin].getName())
+
+ assert(conf.get(PLUGINS).size === 2)
+
+ sc = new SparkContext(conf)
+ // Just check plugin is loaded. The plugin code below checks whether a single copy was loaded.
+ assert(TestSparkPlugin.driverPlugin != null)
+ }
+
+ test("plugin initialization in non-local mode") {
+ val path = Utils.createTempDir()
+
+ val conf = new SparkConf()
+ .setAppName(getClass().getName())
+ .set(SparkLauncher.SPARK_MASTER, "local-cluster[2,1,1024]")
+ .set(PLUGINS, Seq(classOf[NonLocalModeSparkPlugin].getName()))
+ .set(NonLocalModeSparkPlugin.TEST_PATH_CONF, path.getAbsolutePath())
+
+ sc = new SparkContext(conf)
+ TestUtils.waitUntilExecutorsUp(sc, 2, 10000)
+
+ eventually(timeout(10.seconds), interval(100.millis)) {
+ val children = path.listFiles()
+ assert(children != null)
+ assert(children.length >= 3)
+ }
+ }
+}
+
+class NonLocalModeSparkPlugin extends SparkPlugin {
+
+ override def driverPlugin(): DriverPlugin = {
+ new DriverPlugin() {
+ override def init(sc: SparkContext, ctx: PluginContext): JMap[String, String] = {
+ NonLocalModeSparkPlugin.writeFile(ctx.conf(), ctx.executorID())
+ Map.empty.asJava
+ }
+ }
+ }
+
+ override def executorPlugin(): ExecutorPlugin = {
+ new ExecutorPlugin() {
+ override def init(ctx: PluginContext, extraConf: JMap[String, String]): Unit = {
+ NonLocalModeSparkPlugin.writeFile(ctx.conf(), ctx.executorID())
+ }
+ }
+ }
+}
+
+object NonLocalModeSparkPlugin {
+ val TEST_PATH_CONF = "spark.nonLocalPlugin.path"
+
+ def writeFile(conf: SparkConf, id: String): Unit = {
+ val path = conf.get(TEST_PATH_CONF)
+ Files.write(id, new File(path, id), StandardCharsets.UTF_8)
+ }
+}
+
+class TestSparkPlugin extends SparkPlugin {
+
+ override def driverPlugin(): DriverPlugin = {
+ val p = new TestDriverPlugin()
+ require(TestSparkPlugin.driverPlugin == null, "Driver plugin already initialized.")
+ TestSparkPlugin.driverPlugin = spy(p)
+ TestSparkPlugin.driverPlugin
+ }
+
+ override def executorPlugin(): ExecutorPlugin = {
+ val p = new TestExecutorPlugin()
+ require(TestSparkPlugin.executorPlugin == null, "Executor plugin already initialized.")
+ TestSparkPlugin.executorPlugin = spy(p)
+ TestSparkPlugin.executorPlugin
+ }
+
+}
+
+private class TestDriverPlugin extends DriverPlugin {
+
+ override def init(sc: SparkContext, ctx: PluginContext): JMap[String, String] = {
+ TestSparkPlugin.driverContext = ctx
+ TestSparkPlugin.extraConf
+ }
+
+ override def registerMetrics(appId: String, ctx: PluginContext): Unit = {
+ ctx.metricRegistry().register("driverMetric", new Gauge[Int] {
+ override def getValue(): Int = 42
+ })
+ }
+
+ override def receive(msg: AnyRef): AnyRef = msg match {
+ case "oneway" => null
+ case "ask" => "reply"
+ case other => throw new IllegalArgumentException(s"unknown: $other")
+ }
+
+}
+
+private class TestExecutorPlugin extends ExecutorPlugin {
+
+ override def init(ctx: PluginContext, extraConf: JMap[String, String]): Unit = {
+ ctx.metricRegistry().register("executorMetric", new Gauge[Int] {
+ override def getValue(): Int = 84
+ })
+ TestSparkPlugin.executorContext = ctx
+ }
+
+}
+
+private object TestSparkPlugin {
+ var driverPlugin: TestDriverPlugin = _
+ var driverContext: PluginContext = _
+
+ var executorPlugin: TestExecutorPlugin = _
+ var executorContext: PluginContext = _
+
+ var extraConf: JMap[String, String] = _
+
+ def reset(): Unit = {
+ driverPlugin = null
+ driverContext = null
+ executorPlugin = null
+ executorContext = null
+ extraConf = null
+ }
+}
diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
index 6bf163506e0c..a289dddbdc9e 100644
--- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
@@ -155,6 +155,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
new StageInfo(2, 0, "stage2", 4, Nil, Seq(1), "details2"))
val jobProps = new Properties()
+ jobProps.setProperty(SparkContext.SPARK_JOB_DESCRIPTION, "jobDescription")
jobProps.setProperty(SparkContext.SPARK_JOB_GROUP_ID, "jobGroup")
jobProps.setProperty(SparkContext.SPARK_SCHEDULER_POOL, "schedPool")
@@ -163,7 +164,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
check[JobDataWrapper](1) { job =>
assert(job.info.jobId === 1)
assert(job.info.name === stages.last.name)
- assert(job.info.description === None)
+ assert(job.info.description === Some("jobDescription"))
assert(job.info.status === JobExecutionStatus.RUNNING)
assert(job.info.submissionTime === Some(new Date(time)))
assert(job.info.jobGroup === Some("jobGroup"))
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
index c757dee43808..ccc525e85483 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
@@ -51,7 +51,7 @@ class DiskBlockManagerSuite extends SparkFunSuite with BeforeAndAfterEach with B
override def beforeEach(): Unit = {
super.beforeEach()
val conf = testConf.clone
- conf.set("spark.local.dir", rootDirs)
+ conf.set("spark.local.dir", rootDirs).set("spark.diskStore.subDirectories", "1")
diskBlockManager = new DiskBlockManager(conf, deleteFilesOnStop = true)
}
@@ -90,4 +90,45 @@ class DiskBlockManagerSuite extends SparkFunSuite with BeforeAndAfterEach with B
for (i <- 0 until numBytes) writer.write(i)
writer.close()
}
+
+ test("temporary shuffle/local file should be able to handle disk failures") {
+ try {
+ // the following two lines pre-create subdirectories under each root dir of block manager
+ diskBlockManager.getFile("1")
+ diskBlockManager.getFile("2")
+
+ val tempShuffleFile1 = diskBlockManager.createTempShuffleBlock()._2
+ val tempLocalFile1 = diskBlockManager.createTempLocalBlock()._2
+ assert(tempShuffleFile1.exists(), "There are no bad disks, so temp shuffle file exists")
+ assert(tempLocalFile1.exists(), "There are no bad disks, so temp local file exists")
+
+ // partial disks damaged
+ rootDir0.setExecutable(false)
+ val tempShuffleFile2 = diskBlockManager.createTempShuffleBlock()._2
+ val tempLocalFile2 = diskBlockManager.createTempLocalBlock()._2
+      // It's possible that even after 10 retries we are still unable to find a healthy disk, so we
+      // need to guard these two asserts against that flakiness.
+ if (tempShuffleFile2.getParentFile.getParentFile.getParent === rootDir1.getAbsolutePath) {
+ assert(tempShuffleFile2.exists(),
+ "There is only one bad disk, so temp shuffle file should be created")
+ }
+ if (tempLocalFile2.getParentFile.getParentFile.getParent === rootDir1.getAbsolutePath) {
+ assert(tempLocalFile2.exists(),
+ "There is only one bad disk, so temp local file should be created")
+ }
+
+ // all disks damaged
+ rootDir1.setExecutable(false)
+ val tempShuffleFile3 = diskBlockManager.createTempShuffleBlock()._2
+ val tempLocalFile3 = diskBlockManager.createTempLocalBlock()._2
+ assert(!tempShuffleFile3.exists(),
+ "All disks are broken, so there should be no temp shuffle file created")
+ assert(!tempLocalFile3.exists(),
+ "All disks are broken, so there should be no temp local file created")
+ } finally {
+ rootDir0.setExecutable(true)
+ rootDir1.setExecutable(true)
+ }
+
+ }
}
diff --git a/core/src/test/scala/org/apache/spark/ui/storage/StoragePageSuite.scala b/core/src/test/scala/org/apache/spark/ui/storage/StoragePageSuite.scala
index 06f01a60868f..f93ecd3b006b 100644
--- a/core/src/test/scala/org/apache/spark/ui/storage/StoragePageSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/storage/StoragePageSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.ui.storage
import javax.servlet.http.HttpServletRequest
import org.mockito.Mockito._
+import scala.xml.{Node, Text}
import org.apache.spark.SparkFunSuite
import org.apache.spark.status.StreamBlockData
@@ -74,7 +75,21 @@ class StoragePageSuite extends SparkFunSuite {
"Fraction Cached",
"Size in Memory",
"Size on Disk")
- assert((xmlNodes \\ "th").map(_.text) === headers)
+
+ val headerRow: Seq[Node] = {
+ headers.view.zipWithIndex.map { x =>
+ storagePage.tooltips(x._2) match {
+ case Some(tooltip) =>
+            <th>
+              <span data-toggle="tooltip" title={tooltip}>
+                {Text(x._1)}
+              </span>
+            </th>
+          case None => <th>{Text(x._1)}</th>
+        }
+      }.toList
+    }
+    assert((xmlNodes \\ "th").map(_.text) === headerRow.map(_.text))
assert((xmlNodes \\ "tr").size === 3)
assert(((xmlNodes \\ "tr")(0) \\ "td").map(_.text.trim) ===
diff --git a/dev/.rat-excludes b/dev/.rat-excludes
index e12dc994b084..73f461255de4 100644
--- a/dev/.rat-excludes
+++ b/dev/.rat-excludes
@@ -118,3 +118,4 @@ announce.tmpl
vote.tmpl
SessionManager.java
SessionHandler.java
+GangliaReporter.java
diff --git a/dev/checkstyle-suppressions.xml b/dev/checkstyle-suppressions.xml
index 945686de4996..804a178a5fe2 100644
--- a/dev/checkstyle-suppressions.xml
+++ b/dev/checkstyle-suppressions.xml
@@ -30,6 +30,8 @@
spark.plugins
+- spark.plugins.defaultList
+
+Both take a comma-separated list of class names that implement the
+org.apache.spark.api.plugin.SparkPlugin interface. The two names exist so that it's
+possible for one list to be placed in the Spark default config file, allowing users to
+easily add other plugins from the command line without overwriting the config file's list. Duplicate
+plugins are ignored.
+
+Distribution of the jar files containing the plugin code is currently not done by Spark. The user
+or admin should make sure that the jar files are available to Spark applications, for example, by
+including the plugin jar with the Spark distribution. The exception to this rule is the YARN
+backend, where the --jars command line option (or equivalent config entry) can be
+used to make the plugin code available to both executors and cluster-mode drivers.
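As a sketch of how the two lists combine, with placeholder class names, an application configured with both options ends up loading each plugin once:

```scala
import org.apache.spark.SparkConf

object PluginListExample {
  def main(args: Array[String]): Unit = {
    // spark.plugins.defaultList would typically live in spark-defaults.conf, while
    // spark.plugins is set per application (for example via --conf).
    val conf = new SparkConf()
      .set("spark.plugins.defaultList", "com.example.MetricsPlugin")
      .set("spark.plugins", "com.example.MetricsPlugin,com.example.DebugPlugin")

    // When Spark reads the plugin list, the default list is prepended to
    // spark.plugins and duplicates are dropped, so MetricsPlugin and DebugPlugin
    // are each loaded once.
    println(conf.get("spark.plugins.defaultList"))
    println(conf.get("spark.plugins"))
  }
}
```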
diff --git a/docs/pyspark-migration-guide.md b/docs/pyspark-migration-guide.md
index 889941c37bf4..1b8d1fc1c577 100644
--- a/docs/pyspark-migration-guide.md
+++ b/docs/pyspark-migration-guide.md
@@ -84,6 +84,9 @@ Please refer [Migration Guide: SQL, Datasets and DataFrame](sql-migration-guide.
- Since Spark 3.0, `createDataFrame(..., verifySchema=True)` validates `LongType` as well in PySpark. Previously, `LongType` was not verified and resulted in `None` in case the value overflows. To restore this behavior, `verifySchema` can be set to `False` to disable the validation.
+ - Since Spark 3.0, `Column.getItem` is fixed such that it does not call `Column.apply`. Consequently, if `Column` is used as an argument to `getItem`, the indexing operator should be used.
+ For example, `map_col.getItem(col('id'))` should be replaced with `map_col[col('id')]`.
+
## Upgrading from PySpark 2.3 to 2.4
- In PySpark, when Arrow optimization is enabled, previously `toPandas` just failed when Arrow optimization is unable to be used whereas `createDataFrame` from Pandas DataFrame allowed the fallback to non-optimization. Now, both `toPandas` and `createDataFrame` from Pandas DataFrame allow the fallback by default, which can be switched off by `spark.sql.execution.arrow.fallback.enabled`.
diff --git a/docs/sql-data-sources-hive-tables.md b/docs/sql-data-sources-hive-tables.md
index e4ce3e938b75..f99b06494934 100644
--- a/docs/sql-data-sources-hive-tables.md
+++ b/docs/sql-data-sources-hive-tables.md
@@ -88,17 +88,17 @@ creating table, you can create a table using storage handler at Hive side, and u
inputFormat, outputFormat
- These 2 options specify the name of a corresponding `InputFormat` and `OutputFormat` class as a string literal,
- e.g. `org.apache.hadoop.hive.ql.io.orc.OrcInputFormat`. These 2 options must be appeared in a pair, and you can not
- specify them if you already specified the `fileFormat` option.
+      These 2 options specify the name of a corresponding <code>InputFormat</code> and <code>OutputFormat</code> class as a string literal,
+      e.g. <code>org.apache.hadoop.hive.ql.io.orc.OrcInputFormat</code>. These 2 options must appear as a pair, and you cannot
+      specify them if you already specified the <code>fileFormat</code> option.
    serde
-      This option specifies the name of a serde class. When the `fileFormat` option is specified, do not specify this option
-      if the given `fileFormat` already include the information of serde. Currently "sequencefile", "textfile" and "rcfile"
+      This option specifies the name of a serde class. When the <code>fileFormat</code> option is specified, do not specify this option
+      if the given <code>fileFormat</code> already includes the serde information. Currently "sequencefile", "textfile" and "rcfile"
       don't include the serde information and you can use this option with these 3 fileFormats.
diff --git a/docs/sql-data-sources-jdbc.md b/docs/sql-data-sources-jdbc.md
index c3502cbdea8e..b0d37b11c711 100644
--- a/docs/sql-data-sources-jdbc.md
+++ b/docs/sql-data-sources-jdbc.md
@@ -60,7 +60,7 @@ the following case-insensitive options:
     The JDBC table that should be read from or written into. Note that when using it in the read
     path anything that is valid in a <code>FROM</code> clause of a SQL query can be used.
     For example, instead of a full table you could also use a subquery in parentheses. It is not
-    allowed to specify `dbtable` and `query` options at the same time.
+    allowed to specify <code>dbtable</code> and <code>query</code> options at the same time.
@@ -72,10 +72,10 @@ the following case-insensitive options:
SELECT <columns> FROM (<user_specified_query>) spark_gen_alias
Below are a couple of restrictions while using this option.
-
+          <li>It is not allowed to specify <code>dbtable</code> and <code>query</code> options at the same time.</li>
+          <li>It is not allowed to specify <code>query</code> and <code>partitionColumn</code> options at the same time. When specifying
+            the <code>partitionColumn</code> option is required, the subquery can be specified using the <code>dbtable</code> option instead and
+            partition columns can be qualified using the subquery alias provided as part of <code>dbtable</code>.</li>
Example:
spark.read.format("jdbc")
diff --git a/docs/sql-data-sources-parquet.md b/docs/sql-data-sources-parquet.md
index b5309870f485..53a1111cd828 100644
--- a/docs/sql-data-sources-parquet.md
+++ b/docs/sql-data-sources-parquet.md
@@ -280,12 +280,12 @@ Configuration of Parquet can be done using the `setConf` method on `SparkSession
spark.sql.parquet.compression.codec    snappy
- Sets the compression codec used when writing Parquet files. If either `compression` or
- `parquet.compression` is specified in the table-specific options/properties, the precedence would be
- `compression`, `parquet.compression`, `spark.sql.parquet.compression.codec`. Acceptable values include:
+ Sets the compression codec used when writing Parquet files. If either <code>compression</code> or
+ <code>parquet.compression</code> is specified in the table-specific options/properties, the precedence would be
+ <code>compression</code>, <code>parquet.compression</code>, <code>spark.sql.parquet.compression.codec</code>. Acceptable values include:
none, uncompressed, snappy, gzip, lzo, brotli, lz4, zstd.
- Note that `zstd` requires `ZStandardCodec` to be installed before Hadoop 2.9.0, `brotli` requires
- `BrotliCodec` to be installed.
+ Note that <code>zstd</code> requires <code>ZStandardCodec</code> to be installed before Hadoop 2.9.0, <code>brotli</code> requires
+ <code>BrotliCodec</code> to be installed.
diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md
index d03ca663e8e3..a97a4b04ded6 100644
--- a/docs/sql-migration-guide.md
+++ b/docs/sql-migration-guide.md
@@ -9,9 +9,9 @@ license: |
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
-
+
http://www.apache.org/licenses/LICENSE-2.0
-
+
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -218,6 +218,8 @@ license: |
- Since Spark 3.0, the `size` function returns `NULL` for the `NULL` input. In Spark version 2.4 and earlier, this function gives `-1` for the same input. To restore the behavior before Spark 3.0, you can set `spark.sql.legacy.sizeOfNull` to `true`.
+ - Since Spark 3.0, the interval literal syntax does not allow multiple from-to units anymore. For example, `SELECT INTERVAL '1-1' YEAR TO MONTH '2-2' YEAR TO MONTH` throws a parser exception.
+
## Upgrading from Spark SQL 2.4 to 2.4.1
- The value of `spark.executor.heartbeatInterval`, when specified without units like "30" rather than "30s", was
diff --git a/docs/sql-pyspark-pandas-with-arrow.md b/docs/sql-pyspark-pandas-with-arrow.md
index 7f01483d4058..d638278b4235 100644
--- a/docs/sql-pyspark-pandas-with-arrow.md
+++ b/docs/sql-pyspark-pandas-with-arrow.md
@@ -178,6 +178,41 @@ For detailed usage, please see [`pyspark.sql.functions.pandas_udf`](api/python/p
[`pyspark.sql.DataFrame.mapInPandas`](api/python/pyspark.sql.html#pyspark.sql.DataFrame.mapInPandas).
+### Cogrouped Map
+
+Cogrouped map Pandas UDFs allow two DataFrames to be cogrouped by a common key and then a Python function applied to
+each cogroup. They are used with `groupBy().cogroup().apply()`, which consists of the following steps:
+
+* Shuffle the data such that the groups of each DataFrame which share a key are cogrouped together.
+* Apply a function to each cogroup. The input of the function is two `pandas.DataFrame`s (with an optional tuple
+representing the key). The output of the function is a `pandas.DataFrame`.
+* Combine the `pandas.DataFrame`s from all groups into a new `DataFrame`.
+
+To use `groupBy().cogroup().apply()`, the user needs to define the following:
+* A Python function that defines the computation for each cogroup.
+* A `StructType` object or a string that defines the schema of the output `DataFrame`.
+
+The column labels of the returned `pandas.DataFrame` must either match the field names in the
+defined output schema if specified as strings, or match the field data types by position if not
+strings, e.g. integer indices. See [pandas.DataFrame](https://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.html#pandas.DataFrame)
+on how to label columns when constructing a `pandas.DataFrame`.
+
+Note that all data for a cogroup will be loaded into memory before the function is applied. This can lead to out of
+memory exceptions, especially if the group sizes are skewed. The configuration for [maxRecordsPerBatch](#setting-arrow-batch-size)
+is not applied and it is up to the user to ensure that the cogrouped data will fit into the available memory.
+
+The following example shows how to use `groupBy().cogroup().apply()` to perform an asof join between two datasets.
+
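A minimal sketch of that asof join, mirroring the example added to `examples/src/main/python/sql/arrow.py` later in this patch (it assumes an active `spark` session):

{% highlight python %}
import pandas as pd

from pyspark.sql.functions import pandas_udf, PandasUDFType

df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000102, 1, 3.0), (20000101, 2, 2.0)],
    ("time", "id", "v1"))

df2 = spark.createDataFrame(
    [(20000101, 1, "x"), (20000101, 2, "y")],
    ("time", "id", "v2"))

# The output schema is given as a DDL string; the function receives the two
# cogrouped pandas.DataFrames and returns a single pandas.DataFrame.
@pandas_udf("time int, id int, v1 double, v2 string", PandasUDFType.COGROUPED_MAP)
def asof_join(left, right):
    return pd.merge_asof(left, right, on="time", by="id")

df1.groupby("id").cogroup(df2.groupby("id")).apply(asof_join).show()
{% endhighlight %}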
+
+
+### Examples
+{% highlight sql %}
+-- Assumes the `employee` table has been created, partitioned by the `grade` column
+-- +-------+--------+--+
+-- | name | grade |
+-- +-------+--------+--+
+-- | sam | 1 |
+-- | suj | 2 |
+-- +-------+--------+--+
+
+ -- Show the details of the table
+SHOW TABLE EXTENDED LIKE `employee`;
++--------+---------+-----------+---------------------------------------------------------------
+|database|tableName|isTemporary| information
++--------+---------+-----------+---------------------------------------------------------------
+|default |employee |false |Database: default
+ Table: employee
+ Owner: root
+ Created Time: Fri Aug 30 15:10:21 IST 2019
+ Last Access: Thu Jan 01 05:30:00 IST 1970
+ Created By: Spark 3.0.0-SNAPSHOT
+ Type: MANAGED
+ Provider: hive
+ Table Properties: [transient_lastDdlTime=1567158021]
+ Location: file:/opt/spark1/spark/spark-warehouse/employee
+ Serde Library: org.apache.hadoop.hive.serde2.lazy
+ .LazySimpleSerDe
+ InputFormat: org.apache.hadoop.mapred.TextInputFormat
+ OutputFormat: org.apache.hadoop.hive.ql.io
+ .HiveIgnoreKeyTextOutputFormat
+ Storage Properties: [serialization.format=1]
+ Partition Provider: Catalog
+ Partition Columns: [`grade`]
+ Schema: root
+ |-- name: string (nullable = true)
+ |-- grade: integer (nullable = true)
+
++--------+---------+-----------+---------------------------------------------------------------
+
+-- Show details of multiple tables with pattern matching
+SHOW TABLE EXTENDED LIKE `employe*`;
++--------+---------+-----------+---------------------------------------------------------------
+|database|tableName|isTemporary| information
++--------+---------+-----------+---------------------------------------------------------------
+|default |employee |false |Database: default
+ Table: employee
+ Owner: root
+ Created Time: Fri Aug 30 15:10:21 IST 2019
+ Last Access: Thu Jan 01 05:30:00 IST 1970
+ Created By: Spark 3.0.0-SNAPSHOT
+ Type: MANAGED
+ Provider: hive
+ Table Properties: [transient_lastDdlTime=1567158021]
+ Location: file:/opt/spark1/spark/spark-warehouse/employee
+ Serde Library: org.apache.hadoop.hive.serde2.lazy
+ .LazySimpleSerDe
+ InputFormat: org.apache.hadoop.mapred.TextInputFormat
+ OutputFormat: org.apache.hadoop.hive.ql.io
+ .HiveIgnoreKeyTextOutputFormat
+ Storage Properties: [serialization.format=1]
+ Partition Provider: Catalog
+ Partition Columns: [`grade`]
+ Schema: root
+ |-- name: string (nullable = true)
+ |-- grade: integer (nullable = true)
+
+|default |employee1|false |Database: default
+ Table: employee1
+ Owner: root
+ Created Time: Fri Aug 30 15:22:33 IST 2019
+ Last Access: Thu Jan 01 05:30:00 IST 1970
+ Created By: Spark 3.0.0-SNAPSHOT
+ Type: MANAGED
+ Provider: hive
+ Table Properties: [transient_lastDdlTime=1567158753]
+ Location: file:/opt/spark1/spark/spark-warehouse/employee1
+ Serde Library: org.apache.hadoop.hive.serde2.lazy
+ .LazySimpleSerDe
+ InputFormat: org.apache.hadoop.mapred.TextInputFormat
+ OutputFormat: org.apache.hadoop.hive.ql.io
+ .HiveIgnoreKeyTextOutputFormat
+ Storage Properties: [serialization.format=1]
+ Partition Provider: Catalog
+ Schema: root
+ |-- name: string (nullable = true)
+
++--------+---------+----------+----------------------------------------------------------------
+
+-- show partition file system details
+SHOW TABLE EXTENDED IN `default` LIKE `employee` PARTITION (`grade=1`);
++--------+---------+-----------+---------------------------------------------------------------
+|database|tableName|isTemporary| information
++--------+---------+-----------+---------------------------------------------------------------
+|default |employee |false | Partition Values: [grade=1]
+ Location: file:/opt/spark1/spark/spark-warehouse/employee
+ /grade=1
+ Serde Library: org.apache.hadoop.hive.serde2.lazy
+ .LazySimpleSerDe
+ InputFormat: org.apache.hadoop.mapred.TextInputFormat
+ OutputFormat: org.apache.hadoop.hive.ql.io
+ .HiveIgnoreKeyTextOutputFormat
+ Storage Properties: [serialization.format=1]
+ Partition Parameters: {rawDataSize=-1, numFiles=1,
+ transient_lastDdlTime=1567158221, totalSize=4,
+ COLUMN_STATS_ACCURATE=false, numRows=-1}
+ Created Time: Fri Aug 30 15:13:41 IST 2019
+ Last Access: Thu Jan 01 05:30:00 IST 1970
+ Partition Statistics: 4 bytes
+ |
++--------+---------+-----------+---------------------------------------------------------------
+
+-- Showing partition file system details with a regex pattern fails, as shown below
+SHOW TABLE EXTENDED IN `default` LIKE `empl*` PARTITION (`grade=1`);
+Error: Error running query: org.apache.spark.sql.catalyst.analysis.NoSuchTableException:
+ Table or view 'empl*' not found in database 'default'; (state=,code=0)
+
+{% endhighlight %}
+### Related Statements
+- [CREATE TABLE](sql-ref-syntax-ddl-create-table.html)
+- [DESCRIBE TABLE](sql-ref-syntax-aux-describe-table.html)
diff --git a/docs/sql-ref-syntax-ddl-drop-table.md b/docs/sql-ref-syntax-ddl-drop-table.md
index a036e66c3906..f9129d5114fa 100644
--- a/docs/sql-ref-syntax-ddl-drop-table.md
+++ b/docs/sql-ref-syntax-ddl-drop-table.md
@@ -19,4 +19,69 @@ license: |
limitations under the License.
---
-**This page is under construction**
+### Description
+
+`DROP TABLE` deletes the table and removes the directory associated with the table from the file system
+if the table is not an `EXTERNAL` table. An exception is thrown if the table does not exist.
+
+In the case of an external table, only the associated metadata information is removed from the metastore database.
+
+### Syntax
+{% highlight sql %}
+DROP TABLE [IF EXISTS] [database_name.]table_name
+{% endhighlight %}
+
+### Parameter
+- `IF EXISTS`: if specified, no exception is thrown when the table does not exist.
+- `database_name`: the database where the table resides; defaults to the current database.
+- `table_name`: the name of the table to drop.
+
+
+### Example
+{% highlight sql %}
+-- Assumes a table named `employeetable` exists.
+DROP TABLE employeetable;
++---------+--+
+| Result |
++---------+--+
++---------+--+
+
+-- Assumes a table named `employeetable` exists in the `userdb` database
+DROP TABLE userdb.employeetable;
++---------+--+
+| Result |
++---------+--+
++---------+--+
+
+-- Assumes a table named `employeetable` does not exist.
+-- Throws an exception
+DROP TABLE employeetable;
+Error: org.apache.spark.sql.AnalysisException: Table or view not found: employeetable;
+(state=,code=0)
+
+-- Assumes a table named `employeetable` does not exist. Try with IF EXISTS;
+-- this time it will not throw an exception
+DROP TABLE IF EXISTS employeetable;
++---------+--+
+| Result |
++---------+--+
++---------+--+
+
+{% endhighlight %}
+
+### Related Statements
+- [CREATE TABLE](sql-ref-syntax-ddl-create-table.html)
+- [CREATE DATABASE](sql-ref-syntax-ddl-create-database.html)
+- [DROP DATABASE](sql-ref-syntax-ddl-drop-database.html)
+
+
diff --git a/docs/sql-ref-syntax-ddl-drop-view.md b/docs/sql-ref-syntax-ddl-drop-view.md
index 9ad22500fd9e..f095a3456772 100644
--- a/docs/sql-ref-syntax-ddl-drop-view.md
+++ b/docs/sql-ref-syntax-ddl-drop-view.md
@@ -19,4 +19,63 @@ license: |
limitations under the License.
---
-**This page is under construction**
+### Description
+`DROP VIEW` removes the metadata associated with a specified view from the catalog.
+
+### Syntax
+{% highlight sql %}
+DROP VIEW [IF EXISTS] [database_name.]view_name
+{% endhighlight %}
+
+### Parameter
+- `IF EXISTS`: if specified, no exception is thrown when the view does not exist.
+- `database_name`: the database where the view resides; defaults to the current database.
+- `view_name`: the name of the view to drop.
+
+
+### Example
+{% highlight sql %}
+-- Assumes a view named `employeeView` exists.
+DROP VIEW employeeView;
++---------+--+
+| Result |
++---------+--+
++---------+--+
+
+-- Assumes a view named `employeeView` exists in the `userdb` database
+DROP VIEW userdb.employeeView;
++---------+--+
+| Result |
++---------+--+
++---------+--+
+
+-- Assumes a view named `employeeView` does not exist.
+-- Throws an exception
+DROP VIEW employeeView;
+Error: org.apache.spark.sql.AnalysisException: Table or view not found: employeeView;
+(state=,code=0)
+
+-- Assumes a view named `employeeView` does not exist. Try with IF EXISTS;
+-- this time it will not throw an exception
+DROP VIEW IF EXISTS employeeView;
++---------+--+
+| Result |
++---------+--+
++---------+--+
+
+{% endhighlight %}
+
+### Related Statements
+- [CREATE VIEW](sql-ref-syntax-ddl-create-view.html)
+- [CREATE DATABASE](sql-ref-syntax-ddl-create-database.html)
+- [DROP DATABASE](sql-ref-syntax-ddl-drop-database.html)
diff --git a/docs/structured-streaming-kafka-integration.md b/docs/structured-streaming-kafka-integration.md
index badf0429545f..8c17de92f348 100644
--- a/docs/structured-streaming-kafka-integration.md
+++ b/docs/structured-streaming-kafka-integration.md
@@ -473,8 +473,8 @@ The following configurations are optional:
Desired minimum number of partitions to read from Kafka.
By default, Spark has a 1-1 mapping of topicPartitions to Spark partitions consuming from Kafka.
If you set this option to a value greater than your topicPartitions, Spark will divvy up large
- Kafka partitions to smaller pieces. Please note that this configuration is like a `hint`: the
- number of Spark tasks will be **approximately** `minPartitions`. It can be less or more depending on
+ Kafka partitions to smaller pieces. Please note that this configuration is like a <code>hint</code>: the
+ number of Spark tasks will be approximately <code>minPartitions</code>. It can be less or more depending on
rounding errors or Kafka partitions that didn't receive any new data.
@@ -482,7 +482,7 @@ The following configurations are optional:
string
spark-kafka-source
streaming and batch
- Prefix of consumer group identifiers (`group.id`) that are generated by structured streaming
+ Prefix of consumer group identifiers (<code>group.id</code>) that are generated by structured streaming
queries. If "kafka.group.id" is set, this option will be ignored.
diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index 2a405f36fd5f..01679e5defe1 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -546,6 +546,13 @@ Here are the details of all the sources in Spark.
"s3://a/dataset.txt"
"s3n://a/b/dataset.txt"
"s3a://a/b/c/dataset.txt"
+ cleanSource: option to clean up completed files after processing.
+ Available options are "archive", "delete", "off". If the option is not provided, the default value is "off".
+ When "archive" is provided, additional option sourceArchiveDir must be provided as well. The value of "sourceArchiveDir" must have 2 subdirectories (so depth of directory is greater than 2). e.g. /archived/here. This will ensure archived files are never included as new source files.
+ Spark will move source files respecting their own path. For example, if the path of source file is /a/b/dataset.txt and the path of archive directory is /archived/here, file will be moved to /archived/here/a/b/dataset.txt.
+ NOTE: Both archiving (via moving) and deleting completed files will introduce overhead (slow down) in each micro-batch, so you need to understand the cost for each operation in your file system before enabling this option. On the other hand, enabling this option will reduce the cost of listing source files, which can be an expensive operation.
+ NOTE 2: The source path should not be used from multiple sources or queries when enabling this option.
+ NOTE 3: Both delete and move actions are best effort. Failing to delete or move files will not fail the streaming query.
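For illustration, a sketch of enabling the archive behaviour on a text file source (the paths are placeholders; an active `spark` session is assumed):

{% highlight python %}
df = (spark.readStream
      .format("text")
      # Move completed files instead of leaving them in place.
      .option("cleanSource", "archive")
      # Must be a sufficiently deep directory so archived files are never
      # picked up again as new input, as described above.
      .option("sourceArchiveDir", "/data/archived/here")
      .load("/data/incoming"))
{% endhighlight %}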
For file-format-specific options, see the related methods in DataStreamReader
(Scala/Java/Python/R).
Append, Update, Complete
Append mode uses watermark to drop old aggregation state. But the output of a
- windowed aggregation is delayed the late threshold specified in `withWatermark()` as by
+ windowed aggregation is delayed the late threshold specified in <code>withWatermark()</code> as by
the modes semantics, rows can be added to the Result Table only once after they are
finalized (i.e. after watermark is crossed). See the
Late Data section for more details.
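A small sketch of the pattern being described, assuming a streaming DataFrame `events` with an event-time column named `eventTime` (both names are placeholders):

{% highlight python %}
from pyspark.sql.functions import window

windowed_counts = (events
                   .withWatermark("eventTime", "10 minutes")
                   .groupBy(window("eventTime", "5 minutes"))
                   .count())

# In append mode each window is emitted only once, after the watermark moves
# past the end of the window plus the late threshold above.
query = (windowed_counts.writeStream
         .outputMode("append")
         .format("console")
         .start())
{% endhighlight %}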
@@ -2324,7 +2331,7 @@ Here are the different kinds of triggers that are supported.
One-time micro-batch
- The query will execute *only one* micro-batch to process all the available data and then
+ The query will execute <i>only one</i> micro-batch to process all the available data and then
 stop on its own. This is useful in scenarios where you want to periodically spin up a cluster,
 process everything that is available since the last period, and then shut down the
 cluster. In some cases, this may lead to significant cost savings.
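As a sketch, the one-time trigger can be requested like this in PySpark (the sink format, paths, and the source DataFrame `df` are placeholders):

{% highlight python %}
query = (df.writeStream
         .format("parquet")
         .option("path", "/data/output")
         .option("checkpointLocation", "/data/checkpoints/once-demo")
         .trigger(once=True)  # run a single micro-batch, then stop
         .start())
query.awaitTermination()
{% endhighlight %}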
diff --git a/examples/src/main/python/sql/arrow.py b/examples/src/main/python/sql/arrow.py
index de8d4f755de6..d5a3173ff9c0 100644
--- a/examples/src/main/python/sql/arrow.py
+++ b/examples/src/main/python/sql/arrow.py
@@ -258,6 +258,36 @@ def filter_func(batch_iter):
# $example off:map_iter_pandas_udf$
+def cogrouped_map_pandas_udf_example(spark):
+ # $example on:cogrouped_map_pandas_udf$
+ import pandas as pd
+
+ from pyspark.sql.functions import pandas_udf, PandasUDFType
+
+ df1 = spark.createDataFrame(
+ [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
+ ("time", "id", "v1"))
+
+ df2 = spark.createDataFrame(
+ [(20000101, 1, "x"), (20000101, 2, "y")],
+ ("time", "id", "v2"))
+
+ @pandas_udf("time int, id int, v1 double, v2 string", PandasUDFType.COGROUPED_MAP)
+ def asof_join(l, r):
+ return pd.merge_asof(l, r, on="time", by="id")
+
+ df1.groupby("id").cogroup(df2.groupby("id")).apply(asof_join).show()
+ # +--------+---+---+---+
+ # | time| id| v1| v2|
+ # +--------+---+---+---+
+ # |20000101| 1|1.0| x|
+ # |20000102| 1|3.0| x|
+ # |20000101| 2|2.0| y|
+ # |20000102| 2|4.0| y|
+ # +--------+---+---+---+
+ # $example off:cogrouped_map_pandas_udf$
+
+
if __name__ == "__main__":
spark = SparkSession \
.builder \
@@ -276,5 +306,7 @@ def filter_func(batch_iter):
grouped_agg_pandas_udf_example(spark)
print("Running pandas_udf map iterator example")
map_iter_pandas_udf_example(spark)
+ print("Running pandas_udf cogrouped map example")
+ cogrouped_map_pandas_udf_example(spark)
spark.stop()
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala
index 5bdc1b5fe9f3..8b907065af1d 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala
@@ -25,9 +25,9 @@ import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecor
import org.apache.kafka.common.header.Header
import org.apache.kafka.common.header.internals.RecordHeader
-import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal, UnsafeProjection}
-import org.apache.spark.sql.types.{BinaryType, IntegerType, StringType}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, UnsafeProjection}
+import org.apache.spark.sql.types.BinaryType
/**
* Writes out data in a single Spark task, without any concerns about how
@@ -116,66 +116,13 @@ private[kafka010] abstract class KafkaRowWriter(
}
private def createProjection = {
- val topicExpression = topic.map(Literal(_)).orElse {
- inputSchema.find(_.name == KafkaWriter.TOPIC_ATTRIBUTE_NAME)
- }.getOrElse {
- throw new IllegalStateException(s"topic option required when no " +
- s"'${KafkaWriter.TOPIC_ATTRIBUTE_NAME}' attribute is present")
- }
- topicExpression.dataType match {
- case StringType => // good
- case t =>
- throw new IllegalStateException(s"${KafkaWriter.TOPIC_ATTRIBUTE_NAME} " +
- s"attribute unsupported type $t. ${KafkaWriter.TOPIC_ATTRIBUTE_NAME} " +
- s"must be a ${StringType.catalogString}")
- }
- val keyExpression = inputSchema.find(_.name == KafkaWriter.KEY_ATTRIBUTE_NAME)
- .getOrElse(Literal(null, BinaryType))
- keyExpression.dataType match {
- case StringType | BinaryType => // good
- case t =>
- throw new IllegalStateException(s"${KafkaWriter.KEY_ATTRIBUTE_NAME} " +
- s"attribute unsupported type ${t.catalogString}")
- }
- val valueExpression = inputSchema
- .find(_.name == KafkaWriter.VALUE_ATTRIBUTE_NAME).getOrElse(
- throw new IllegalStateException("Required attribute " +
- s"'${KafkaWriter.VALUE_ATTRIBUTE_NAME}' not found")
- )
- valueExpression.dataType match {
- case StringType | BinaryType => // good
- case t =>
- throw new IllegalStateException(s"${KafkaWriter.VALUE_ATTRIBUTE_NAME} " +
- s"attribute unsupported type ${t.catalogString}")
- }
- val headersExpression = inputSchema
- .find(_.name == KafkaWriter.HEADERS_ATTRIBUTE_NAME).getOrElse(
- Literal(CatalystTypeConverters.convertToCatalyst(null),
- KafkaRecordToRowConverter.headersType)
- )
- headersExpression.dataType match {
- case KafkaRecordToRowConverter.headersType => // good
- case t =>
- throw new IllegalStateException(s"${KafkaWriter.HEADERS_ATTRIBUTE_NAME} " +
- s"attribute unsupported type ${t.catalogString}")
- }
- val partitionExpression =
- inputSchema.find(_.name == KafkaWriter.PARTITION_ATTRIBUTE_NAME)
- .getOrElse(Literal(null, IntegerType))
- partitionExpression.dataType match {
- case IntegerType => // good
- case t =>
- throw new IllegalStateException(s"${KafkaWriter.PARTITION_ATTRIBUTE_NAME} " +
- s"attribute unsupported type $t. ${KafkaWriter.PARTITION_ATTRIBUTE_NAME} " +
- s"must be a ${IntegerType.catalogString}")
- }
UnsafeProjection.create(
Seq(
- topicExpression,
- Cast(keyExpression, BinaryType),
- Cast(valueExpression, BinaryType),
- headersExpression,
- partitionExpression
+ KafkaWriter.topicExpression(inputSchema, topic),
+ Cast(KafkaWriter.keyExpression(inputSchema), BinaryType),
+ Cast(KafkaWriter.valueExpression(inputSchema), BinaryType),
+ KafkaWriter.headersExpression(inputSchema),
+ KafkaWriter.partitionExpression(inputSchema)
),
inputSchema
)
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala
index 9b0d11f137ce..5ef4b3a1c19d 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.QueryExecution
-import org.apache.spark.sql.types.{BinaryType, IntegerType, MapType, StringType}
+import org.apache.spark.sql.types.{BinaryType, DataType, IntegerType, StringType}
import org.apache.spark.util.Utils
/**
@@ -49,51 +49,14 @@ private[kafka010] object KafkaWriter extends Logging {
schema: Seq[Attribute],
kafkaParameters: ju.Map[String, Object],
topic: Option[String] = None): Unit = {
- schema.find(_.name == TOPIC_ATTRIBUTE_NAME).getOrElse(
- if (topic.isEmpty) {
- throw new AnalysisException(s"topic option required when no " +
- s"'$TOPIC_ATTRIBUTE_NAME' attribute is present. Use the " +
- s"${KafkaSourceProvider.TOPIC_OPTION_KEY} option for setting a topic.")
- } else {
- Literal.create(topic.get, StringType)
- }
- ).dataType match {
- case StringType => // good
- case _ =>
- throw new AnalysisException(s"Topic type must be a ${StringType.catalogString}")
- }
- schema.find(_.name == KEY_ATTRIBUTE_NAME).getOrElse(
- Literal(null, StringType)
- ).dataType match {
- case StringType | BinaryType => // good
- case _ =>
- throw new AnalysisException(s"$KEY_ATTRIBUTE_NAME attribute type " +
- s"must be a ${StringType.catalogString} or ${BinaryType.catalogString}")
- }
- schema.find(_.name == VALUE_ATTRIBUTE_NAME).getOrElse(
- throw new AnalysisException(s"Required attribute '$VALUE_ATTRIBUTE_NAME' not found")
- ).dataType match {
- case StringType | BinaryType => // good
- case _ =>
- throw new AnalysisException(s"$VALUE_ATTRIBUTE_NAME attribute type " +
- s"must be a ${StringType.catalogString} or ${BinaryType.catalogString}")
- }
- schema.find(_.name == HEADERS_ATTRIBUTE_NAME).getOrElse(
- Literal(CatalystTypeConverters.convertToCatalyst(null),
- KafkaRecordToRowConverter.headersType)
- ).dataType match {
- case KafkaRecordToRowConverter.headersType => // good
- case _ =>
- throw new AnalysisException(s"$HEADERS_ATTRIBUTE_NAME attribute type " +
- s"must be a ${KafkaRecordToRowConverter.headersType.catalogString}")
- }
- schema.find(_.name == PARTITION_ATTRIBUTE_NAME).getOrElse(
- Literal(null, IntegerType)
- ).dataType match {
- case IntegerType => // good
- case _ =>
- throw new AnalysisException(s"$PARTITION_ATTRIBUTE_NAME attribute type " +
- s"must be an ${IntegerType.catalogString}")
+ try {
+ topicExpression(schema, topic)
+ keyExpression(schema)
+ valueExpression(schema)
+ headersExpression(schema)
+ partitionExpression(schema)
+ } catch {
+ case e: IllegalStateException => throw new AnalysisException(e.getMessage)
}
}
@@ -110,4 +73,53 @@ private[kafka010] object KafkaWriter extends Logging {
finallyBlock = writeTask.close())
}
}
+
+ def topicExpression(schema: Seq[Attribute], topic: Option[String] = None): Expression = {
+ topic.map(Literal(_)).getOrElse(
+ expression(schema, TOPIC_ATTRIBUTE_NAME, Seq(StringType)) {
+ throw new IllegalStateException(s"topic option required when no " +
+ s"'${TOPIC_ATTRIBUTE_NAME}' attribute is present. Use the " +
+ s"${KafkaSourceProvider.TOPIC_OPTION_KEY} option for setting a topic.")
+ }
+ )
+ }
+
+ def keyExpression(schema: Seq[Attribute]): Expression = {
+ expression(schema, KEY_ATTRIBUTE_NAME, Seq(StringType, BinaryType)) {
+ Literal(null, BinaryType)
+ }
+ }
+
+ def valueExpression(schema: Seq[Attribute]): Expression = {
+ expression(schema, VALUE_ATTRIBUTE_NAME, Seq(StringType, BinaryType)) {
+ throw new IllegalStateException(s"Required attribute '${VALUE_ATTRIBUTE_NAME}' not found")
+ }
+ }
+
+ def headersExpression(schema: Seq[Attribute]): Expression = {
+ expression(schema, HEADERS_ATTRIBUTE_NAME, Seq(KafkaRecordToRowConverter.headersType)) {
+ Literal(CatalystTypeConverters.convertToCatalyst(null),
+ KafkaRecordToRowConverter.headersType)
+ }
+ }
+
+ def partitionExpression(schema: Seq[Attribute]): Expression = {
+ expression(schema, PARTITION_ATTRIBUTE_NAME, Seq(IntegerType)) {
+ Literal(null, IntegerType)
+ }
+ }
+
+ private def expression(
+ schema: Seq[Attribute],
+ attrName: String,
+ desired: Seq[DataType])(
+ default: => Expression): Expression = {
+ val expr = schema.find(_.name == attrName).getOrElse(default)
+ if (!desired.exists(_.sameType(expr.dataType))) {
+ throw new IllegalStateException(s"$attrName attribute unsupported type " +
+ s"${expr.dataType.catalogString}. $attrName must be a(n) " +
+ s"${desired.map(_.catalogString).mkString(" or ")}")
+ }
+ expr
+ }
}
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala
index cbf4952406c0..031f609cb92b 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala
@@ -19,12 +19,15 @@ package org.apache.spark.sql.kafka010
import java.util.Locale
+import scala.reflect.ClassTag
+
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.ByteArraySerializer
import org.scalatest.time.SpanSugar._
import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, SpecificInternalRow, UnsafeProjection}
+import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.types.{BinaryType, DataType}
import org.apache.spark.util.Utils
@@ -192,24 +195,9 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest {
val topic = newTopic()
testUtils.createTopic(topic)
- /* No topic field or topic option */
- var writer: StreamingQuery = null
- var ex: Exception = null
- try {
- writer = createKafkaWriter(input.toDF())(
- withSelectExpr = "CAST(null as STRING) as topic", "value"
- )
- testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5"))
- eventually(timeout(streamingTimeout)) {
- assert(writer.exception.isDefined)
- ex = writer.exception.get
- }
- } finally {
- writer.stop()
+ runAndVerifyException[StreamingQueryException](inputTopic, "null topic present in the data.") {
+ createKafkaWriter(input.toDF())(withSelectExpr = "CAST(null as STRING) as topic", "value")
}
- assert(ex.getCause.getCause.getMessage
- .toLowerCase(Locale.ROOT)
- .contains("null topic present in the data."))
}
test("streaming - write data with bad schema") {
@@ -226,24 +214,10 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest {
val topic = newTopic()
testUtils.createTopic(topic)
- val ex = intercept[AnalysisException] {
- /* No topic field or topic option */
- createKafkaWriter(input.toDF())(
- withSelectExpr = "value as key", "value"
- )
- }
- assert(ex.getMessage
- .toLowerCase(Locale.ROOT)
- .contains("topic option required when no 'topic' attribute is present"))
-
- val ex2 = intercept[AnalysisException] {
- /* No value field */
- createKafkaWriter(input.toDF())(
- withSelectExpr = s"'$topic' as topic", "value as key"
- )
- }
- assert(ex2.getMessage.toLowerCase(Locale.ROOT).contains(
- "required attribute 'value' not found"))
+ assertWrongSchema(topic, input, Seq("value as key", "value"),
+ "topic option required when no 'topic' attribute is present")
+ assertWrongSchema(topic, input, Seq(s"'$topic' as topic", "value as key"),
+ "required attribute 'value' not found")
}
test("streaming - write data with valid schema but wrong types") {
@@ -258,43 +232,18 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest {
.option("startingOffsets", "earliest")
.load()
.selectExpr("CAST(value as STRING) value")
+ .toDF()
val topic = newTopic()
testUtils.createTopic(topic)
- val ex = intercept[AnalysisException] {
- /* topic field wrong type */
- createKafkaWriter(input.toDF())(
- withSelectExpr = s"CAST('1' as INT) as topic", "value"
- )
- }
- assert(ex.getMessage.toLowerCase(Locale.ROOT).contains("topic type must be a string"))
-
- val ex2 = intercept[AnalysisException] {
- /* value field wrong type */
- createKafkaWriter(input.toDF())(
- withSelectExpr = s"'$topic' as topic", "CAST(value as INT) as value"
- )
- }
- assert(ex2.getMessage.toLowerCase(Locale.ROOT).contains(
- "value attribute type must be a string or binary"))
-
- val ex3 = intercept[AnalysisException] {
- /* key field wrong type */
- createKafkaWriter(input.toDF())(
- withSelectExpr = s"'$topic' as topic", "CAST(value as INT) as key", "value"
- )
- }
- assert(ex3.getMessage.toLowerCase(Locale.ROOT).contains(
- "key attribute type must be a string or binary"))
-
- val ex4 = intercept[AnalysisException] {
- /* partition field wrong type */
- createKafkaWriter(input.toDF())(
- withSelectExpr = s"'$topic' as topic", "value as partition", "value"
- )
- }
- assert(ex4.getMessage.toLowerCase(Locale.ROOT).contains(
- "partition attribute type must be an int"))
+ assertWrongSchema(topic, input, Seq("CAST('1' as INT) as topic", "value"),
+ "topic must be a(n) string")
+ assertWrongSchema(topic, input, Seq(s"'$topic' as topic", "CAST(value as INT) as value"),
+ "value must be a(n) string or binary")
+ assertWrongSchema(topic, input, Seq(s"'$topic' as topic", "CAST(value as INT) as key", "value"),
+ "key must be a(n) string or binary")
+ assertWrongSchema(topic, input, Seq(s"'$topic' as topic", "value as partition", "value"),
+ "partition must be a(n) int")
}
test("streaming - write to non-existing topic") {
@@ -310,21 +259,9 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest {
.load()
val topic = newTopic()
- var writer: StreamingQuery = null
- var ex: Exception = null
- try {
- ex = intercept[StreamingQueryException] {
- writer = createKafkaWriter(input.toDF(), withTopic = Some(topic))()
- testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5"))
- eventually(timeout(streamingTimeout)) {
- assert(writer.exception.isDefined)
- }
- throw writer.exception.get
- }
- } finally {
- writer.stop()
+ runAndVerifyException[StreamingQueryException](inputTopic, "job aborted") {
+ createKafkaWriter(input.toDF(), withTopic = Some(topic))()
}
- assert(ex.getCause.getCause.getMessage.toLowerCase(Locale.ROOT).contains("job aborted"))
}
test("streaming - exception on config serializer") {
@@ -339,21 +276,10 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest {
.option("subscribe", inputTopic)
.load()
- val ex = intercept[IllegalArgumentException] {
- createKafkaWriter(
- input.toDF(),
- withOptions = Map("kafka.key.serializer" -> "foo"))()
- }
- assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
- "kafka option 'key.serializer' is not supported"))
-
- val ex2 = intercept[IllegalArgumentException] {
- createKafkaWriter(
- input.toDF(),
- withOptions = Map("kafka.value.serializer" -> "foo"))()
- }
- assert(ex2.getMessage.toLowerCase(Locale.ROOT).contains(
- "kafka option 'value.serializer' is not supported"))
+ assertWrongOption(inputTopic, input.toDF(), Map("kafka.key.serializer" -> "foo"),
+ "kafka option 'key.serializer' is not supported")
+ assertWrongOption(inputTopic, input.toDF(), Map("kafka.value.serializer" -> "foo"),
+ "kafka option 'value.serializer' is not supported")
}
test("generic - write big data with small producer buffer") {
@@ -422,4 +348,48 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest {
withOptions.foreach(opt => stream.option(opt._1, opt._2))
stream.start()
}
+
+ private def runAndVerifyException[T <: Exception : ClassTag](
+ inputTopic: String,
+ expectErrorMsg: String)(
+ writerFn: => StreamingQuery): Unit = {
+ var writer: StreamingQuery = null
+ val ex: Exception = try {
+ intercept[T] {
+ writer = writerFn
+ testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5"))
+ eventually(timeout(streamingTimeout)) {
+ assert(writer.exception.isDefined)
+ }
+ throw writer.exception.get
+ }
+ } finally {
+ if (writer != null) writer.stop()
+ }
+ val rootException = ex match {
+ case e: StreamingQueryException => e.getCause.getCause
+ case e => e
+ }
+ assert(rootException.getMessage.toLowerCase(Locale.ROOT).contains(expectErrorMsg))
+ }
+
+ private def assertWrongSchema(
+ inputTopic: String,
+ input: DataFrame,
+ selectExpr: Seq[String],
+ expectErrorMsg: String): Unit = {
+ runAndVerifyException[AnalysisException](inputTopic, expectErrorMsg) {
+ createKafkaWriter(input)(withSelectExpr = selectExpr: _*)
+ }
+ }
+
+ private def assertWrongOption(
+ inputTopic: String,
+ input: DataFrame,
+ options: Map[String, String],
+ expectErrorMsg: String): Unit = {
+ runAndVerifyException[IllegalArgumentException](inputTopic, expectErrorMsg) {
+ createKafkaWriter(input, withOptions = options)()
+ }
+ }
}
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
index aacb10f5197b..1705d76de758 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
@@ -211,38 +211,10 @@ class KafkaSinkStreamingSuite extends KafkaSinkSuiteBase with StreamTest {
val topic = newTopic()
testUtils.createTopic(topic)
- /* No topic field or topic option */
- var writer: StreamingQuery = null
- var ex: Exception = null
- try {
- ex = intercept[StreamingQueryException] {
- writer = createKafkaWriter(input.toDF())(
- withSelectExpr = "value as key", "value"
- )
- input.addData("1", "2", "3", "4", "5")
- writer.processAllAvailable()
- }
- } finally {
- writer.stop()
- }
- assert(ex.getMessage
- .toLowerCase(Locale.ROOT)
- .contains("topic option required when no 'topic' attribute is present"))
-
- try {
- /* No value field */
- ex = intercept[StreamingQueryException] {
- writer = createKafkaWriter(input.toDF())(
- withSelectExpr = s"'$topic' as topic", "value as key"
- )
- input.addData("1", "2", "3", "4", "5")
- writer.processAllAvailable()
- }
- } finally {
- writer.stop()
- }
- assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
- "required attribute 'value' not found"))
+ assertWrongSchema(input, Seq("value as key", "value"),
+ "topic option required when no 'topic' attribute is present")
+ assertWrongSchema(input, Seq(s"'$topic' as topic", "value as key"),
+ "required attribute 'value' not found")
}
test("streaming - write data with valid schema but wrong types") {
@@ -250,109 +222,31 @@ class KafkaSinkStreamingSuite extends KafkaSinkSuiteBase with StreamTest {
val topic = newTopic()
testUtils.createTopic(topic)
- var writer: StreamingQuery = null
- var ex: Exception = null
- try {
- /* topic field wrong type */
- ex = intercept[StreamingQueryException] {
- writer = createKafkaWriter(input.toDF())(
- withSelectExpr = s"CAST('1' as INT) as topic", "value"
- )
- input.addData("1", "2", "3", "4", "5")
- writer.processAllAvailable()
- }
- } finally {
- writer.stop()
- }
- assert(ex.getMessage.toLowerCase(Locale.ROOT).contains("topic type must be a string"))
-
- try {
- /* value field wrong type */
- ex = intercept[StreamingQueryException] {
- writer = createKafkaWriter(input.toDF())(
- withSelectExpr = s"'$topic' as topic", "CAST(value as INT) as value"
- )
- input.addData("1", "2", "3", "4", "5")
- writer.processAllAvailable()
- }
- } finally {
- writer.stop()
- }
- assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
- "value attribute type must be a string or binary"))
-
- try {
- ex = intercept[StreamingQueryException] {
- /* key field wrong type */
- writer = createKafkaWriter(input.toDF())(
- withSelectExpr = s"'$topic' as topic", "CAST(value as INT) as key", "value"
- )
- input.addData("1", "2", "3", "4", "5")
- writer.processAllAvailable()
- }
- } finally {
- writer.stop()
- }
- assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
- "key attribute type must be a string or binary"))
-
- try {
- ex = intercept[StreamingQueryException] {
- /* partition field wrong type */
- writer = createKafkaWriter(input.toDF())(
- withSelectExpr = s"'$topic' as topic", "value", "value as partition"
- )
- input.addData("1", "2", "3", "4", "5")
- writer.processAllAvailable()
- }
- } finally {
- writer.stop()
- }
- assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
- "partition attribute type must be an int"))
+ assertWrongSchema(input, Seq("CAST('1' as INT) as topic", "value"),
+ "topic must be a(n) string")
+ assertWrongSchema(input, Seq(s"'$topic' as topic", "CAST(value as INT) as value"),
+ "value must be a(n) string or binary")
+ assertWrongSchema(input, Seq(s"'$topic' as topic", "CAST(value as INT) as key", "value"),
+ "key must be a(n) string or binary")
+ assertWrongSchema(input, Seq(s"'$topic' as topic", "value", "value as partition"),
+ "partition must be a(n) int")
}
test("streaming - write to non-existing topic") {
val input = MemoryStream[String]
- val topic = newTopic()
- var writer: StreamingQuery = null
- var ex: Exception = null
- try {
- ex = intercept[StreamingQueryException] {
- writer = createKafkaWriter(input.toDF(), withTopic = Some(topic))()
- input.addData("1", "2", "3", "4", "5")
- writer.processAllAvailable()
- }
- } finally {
- writer.stop()
+ runAndVerifyStreamingQueryException(input, "job aborted") {
+ createKafkaWriter(input.toDF(), withTopic = Some(newTopic()))()
}
- assert(ex.getCause.getCause.getMessage.toLowerCase(Locale.ROOT).contains("job aborted"))
}
test("streaming - exception on config serializer") {
val input = MemoryStream[String]
- var writer: StreamingQuery = null
- var ex: Exception = null
- ex = intercept[StreamingQueryException] {
- writer = createKafkaWriter(
- input.toDF(),
- withOptions = Map("kafka.key.serializer" -> "foo"))()
- input.addData("1")
- writer.processAllAvailable()
- }
- assert(ex.getCause.getMessage.toLowerCase(Locale.ROOT).contains(
- "kafka option 'key.serializer' is not supported"))
-
- ex = intercept[StreamingQueryException] {
- writer = createKafkaWriter(
- input.toDF(),
- withOptions = Map("kafka.value.serializer" -> "foo"))()
- input.addData("1")
- writer.processAllAvailable()
- }
- assert(ex.getCause.getMessage.toLowerCase(Locale.ROOT).contains(
- "kafka option 'value.serializer' is not supported"))
+
+ assertWrongOption(input, Map("kafka.key.serializer" -> "foo"),
+ "kafka option 'key.serializer' is not supported")
+ assertWrongOption(input, Map("kafka.value.serializer" -> "foo"),
+ "kafka option 'value.serializer' is not supported")
}
private def createKafkaWriter(
@@ -379,6 +273,41 @@ class KafkaSinkStreamingSuite extends KafkaSinkSuiteBase with StreamTest {
}
stream.start()
}
+
+ private def runAndVerifyStreamingQueryException(
+ input: MemoryStream[String],
+ expectErrorMsg: String)(
+ writerFn: => StreamingQuery): Unit = {
+ var writer: StreamingQuery = null
+ val ex: Exception = try {
+ intercept[StreamingQueryException] {
+ writer = writerFn
+ input.addData("1", "2", "3", "4", "5")
+ writer.processAllAvailable()
+ }
+ } finally {
+ if (writer != null) writer.stop()
+ }
+ assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(expectErrorMsg))
+ }
+
+ private def assertWrongSchema(
+ input: MemoryStream[String],
+ selectExpr: Seq[String],
+ expectErrorMsg: String): Unit = {
+ runAndVerifyStreamingQueryException(input, expectErrorMsg) {
+ createKafkaWriter(input.toDF())(withSelectExpr = selectExpr: _*)
+ }
+ }
+
+ private def assertWrongOption(
+ input: MemoryStream[String],
+ options: Map[String, String],
+ expectErrorMsg: String): Unit = {
+ runAndVerifyStreamingQueryException(input, expectErrorMsg) {
+ createKafkaWriter(input.toDF(), withOptions = options)()
+ }
+ }
}
abstract class KafkaSinkBatchSuiteBase extends KafkaSinkSuiteBase {
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
index 88d6d0eea536..a449a8bb7213 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
@@ -237,9 +237,10 @@ private[spark] class DirectKafkaInputDStream[K, V](
val description = offsetRanges.filter { offsetRange =>
// Don't display empty ranges.
offsetRange.fromOffset != offsetRange.untilOffset
- }.map { offsetRange =>
+ }.toSeq.sortBy(-_.count()).map { offsetRange =>
s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" +
- s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"
+ s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}\t" +
+ s"count: ${offsetRange.count()}"
}.mkString("\n")
// Copy offsetRanges to immutable.List to prevent from being modified by the user
val metadata = Map(
diff --git a/external/spark-ganglia-lgpl/pom.xml b/external/spark-ganglia-lgpl/pom.xml
index a23d255f9187..db64b201abc2 100644
--- a/external/spark-ganglia-lgpl/pom.xml
+++ b/external/spark-ganglia-lgpl/pom.xml
@@ -39,10 +39,10 @@