Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions common/tags/src/test/java/org/apache/spark/tags/ChromeUITest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.tags;

import java.lang.annotation.*;

import org.scalatest.TagAnnotation;

@TagAnnotation
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD, ElementType.TYPE})
public @interface ChromeUITest { }
Original file line number Diff line number Diff line change
Expand Up @@ -122,18 +122,20 @@ private class HistoryServerDiskManager(
* being used so that it's not evicted when running out of designated space.
*/
def openStore(appId: String, attemptId: Option[String]): Option[File] = {
var newSize: Long = 0
val storePath = active.synchronized {
val path = appStorePath(appId, attemptId)
if (path.isDirectory()) {
active(appId -> attemptId) = sizeOf(path)
newSize = sizeOf(path)
active(appId -> attemptId) = newSize
Some(path)
} else {
None
}
}

storePath.foreach { path =>
updateAccessTime(appId, attemptId)
updateApplicationStoreInfo(appId, attemptId, newSize)
}

storePath
Expand Down Expand Up @@ -238,10 +240,11 @@ private class HistoryServerDiskManager(
new File(appStoreDir, fileName)
}

private def updateAccessTime(appId: String, attemptId: Option[String]): Unit = {
private def updateApplicationStoreInfo(
appId: String, attemptId: Option[String], newSize: Long): Unit = {
val path = appStorePath(appId, attemptId)
val info = ApplicationStoreInfo(path.getAbsolutePath(), clock.getTimeMillis(), appId, attemptId,
sizeOf(path))
val info = ApplicationStoreInfo(path.getAbsolutePath(), clock.getTimeMillis(), appId,
attemptId, newSize)
listing.write(info)
}

Expand Down Expand Up @@ -297,7 +300,7 @@ private class HistoryServerDiskManager(
s"exceeded ($current > $max)")
}

updateAccessTime(appId, attemptId)
updateApplicationStoreInfo(appId, attemptId, newSize)

active.synchronized {
active(appId -> attemptId) = newSize
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,12 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext with
// Pass partitionId message in
val message: String = context.partitionId().toString
val messages: Array[String] = context.allGather(message)
messages.toList.iterator
Iterator.single(messages.toList)
}
// Take a sorted list of all the partitionId messages
val messages = rdd2.collect().head
// All the task partitionIds are shared
for((x, i) <- messages.view.zipWithIndex) assert(x.toString == i.toString)
val messages = rdd2.collect()
// All the task partitionIds are shared across all tasks
assert(messages.length === 4)
assert(messages.forall(_ == List("0", "1", "2", "3")))
}

test("throw exception if we attempt to synchronize with different blocking calls") {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.ui

import org.openqa.selenium.WebDriver
import org.openqa.selenium.chrome.{ChromeDriver, ChromeOptions}

import org.apache.spark.tags.ChromeUITest

/**
* Selenium tests for the Spark Web UI with Chrome.
*/
@ChromeUITest
class ChromeUISeleniumSuite extends RealBrowserUISeleniumSuite("webdriver.chrome.driver") {

override var webDriver: WebDriver = _

override def beforeAll(): Unit = {
super.beforeAll()
val chromeOptions = new ChromeOptions
chromeOptions.addArguments("--headless", "--disable-gpu")
webDriver = new ChromeDriver(chromeOptions)
}

override def afterAll(): Unit = {
try {
if (webDriver != null) {
webDriver.quit()
}
} finally {
super.afterAll()
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.ui

import org.openqa.selenium.{By, WebDriver}
import org.scalatest._
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._
import org.scalatestplus.selenium.WebBrowser

import org.apache.spark._
import org.apache.spark.LocalSparkContext.withSpark
import org.apache.spark.internal.config.MEMORY_OFFHEAP_SIZE
import org.apache.spark.internal.config.UI.{UI_ENABLED, UI_KILL_ENABLED, UI_PORT}
import org.apache.spark.util.CallSite

/**
* Selenium tests for the Spark Web UI with real web browsers.
*/
abstract class RealBrowserUISeleniumSuite(val driverProp: String)
extends SparkFunSuite with WebBrowser with Matchers with BeforeAndAfterAll {

implicit var webDriver: WebDriver
private val driverPropPrefix = "spark.test."

override def beforeAll() {
super.beforeAll()
assume(
sys.props(driverPropPrefix + driverProp) !== null,
"System property " + driverPropPrefix + driverProp +
" should be set to the corresponding driver path.")
sys.props(driverProp) = sys.props(driverPropPrefix + driverProp)
}

override def afterAll(): Unit = {
sys.props.remove(driverProp)
super.afterAll()
}

test("SPARK-31534: text for tooltip should be escaped") {
withSpark(newSparkContext()) { sc =>
sc.setLocalProperty(CallSite.LONG_FORM, "collect at <console>:25")
sc.setLocalProperty(CallSite.SHORT_FORM, "collect at <console>:25")
sc.parallelize(1 to 10).collect

eventually(timeout(10.seconds), interval(50.milliseconds)) {
goToUi(sc, "/jobs")

val jobDesc =
webDriver.findElement(By.cssSelector("div[class='application-timeline-content']"))
jobDesc.getAttribute("data-title") should include ("collect at &lt;console&gt;:25")

goToUi(sc, "/jobs/job/?id=0")
webDriver.get(sc.ui.get.webUrl.stripSuffix("/") + "/jobs/job/?id=0")
val stageDesc = webDriver.findElement(By.cssSelector("div[class='job-timeline-content']"))
stageDesc.getAttribute("data-title") should include ("collect at &lt;console&gt;:25")

// Open DAG Viz.
webDriver.findElement(By.id("job-dag-viz")).click()
val nodeDesc = webDriver.findElement(By.cssSelector("g[class='node_0 node']"))
nodeDesc.getAttribute("name") should include ("collect at &lt;console&gt;:25")
}
}
}

/**
* Create a test SparkContext with the SparkUI enabled.
* It is safe to `get` the SparkUI directly from the SparkContext returned here.
*/
private def newSparkContext(
killEnabled: Boolean = true,
master: String = "local",
additionalConfs: Map[String, String] = Map.empty): SparkContext = {
val conf = new SparkConf()
.setMaster(master)
.setAppName("test")
.set(UI_ENABLED, true)
.set(UI_PORT, 0)
.set(UI_KILL_ENABLED, killEnabled)
.set(MEMORY_OFFHEAP_SIZE.key, "64m")
additionalConfs.foreach { case (k, v) => conf.set(k, v) }
val sc = new SparkContext(conf)
assert(sc.ui.isDefined)
sc
}

def goToUi(sc: SparkContext, path: String): Unit = {
goToUi(sc.ui.get, path)
}

def goToUi(ui: SparkUI, path: String): Unit = {
go to (ui.webUrl.stripSuffix("/") + path)
}
}
27 changes: 0 additions & 27 deletions core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -773,33 +773,6 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
}
}

test("SPARK-31534: text for tooltip should be escaped") {
withSpark(newSparkContext()) { sc =>
sc.setLocalProperty(CallSite.LONG_FORM, "collect at <console>:25")
sc.setLocalProperty(CallSite.SHORT_FORM, "collect at <console>:25")
sc.parallelize(1 to 10).collect

val driver = webDriver.asInstanceOf[HtmlUnitDriver]
driver.setJavascriptEnabled(true)

eventually(timeout(10.seconds), interval(50.milliseconds)) {
goToUi(sc, "/jobs")
val jobDesc =
driver.findElement(By.cssSelector("div[class='application-timeline-content']"))
jobDesc.getAttribute("data-title") should include ("collect at &lt;console&gt;:25")

goToUi(sc, "/jobs/job/?id=0")
val stageDesc = driver.findElement(By.cssSelector("div[class='job-timeline-content']"))
stageDesc.getAttribute("data-title") should include ("collect at &lt;console&gt;:25")

// Open DAG Viz.
driver.findElement(By.id("job-dag-viz")).click()
val nodeDesc = driver.findElement(By.cssSelector("g[class='node_0 node']"))
nodeDesc.getAttribute("name") should include ("collect at &lt;console&gt;:25")
}
}
}

def goToUi(sc: SparkContext, path: String): Unit = {
goToUi(sc.ui.get, path)
}
Expand Down
5 changes: 5 additions & 0 deletions dev/run-tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@
from sparktestsupport.toposort import toposort_flatten
import sparktestsupport.modules as modules

always_excluded_tags = [
"org.apache.spark.tags.ChromeUITest"
]

# -------------------------------------------------------------------------------------------------
# Functions for traversing module dependency graph
Expand Down Expand Up @@ -606,6 +609,8 @@ def main():
print("[info] Found the following changed modules:",
", ".join(x.name for x in changed_modules))

excluded_tags.extend(always_excluded_tags)

# setup environment variables
# note - the 'root' module doesn't collect environment variables for all modules. Because the
# environment variables should not be set if a module is not changed, even if running the 'root'
Expand Down
1 change: 1 addition & 0 deletions docs/sql-migration-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -961,3 +961,4 @@ Below are the scenarios in which Hive and Spark generate different results:
* `SQRT(n)` If n < 0, Hive returns null, Spark SQL returns NaN.
* `ACOS(n)` If n < -1 or n > 1, Hive returns null, Spark SQL returns NaN.
* `ASIN(n)` If n < -1 or n > 1, Hive returns null, Spark SQL returns NaN.
* `CAST(n AS TIMESTAMP)` If n is integral numbers, Hive treats n as milliseconds, Spark SQL treats n as seconds.
2 changes: 2 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@
things breaking.
-->
<spark.test.home>${session.executionRootDirectory}</spark.test.home>
<spark.test.webdriver.chrome.driver></spark.test.webdriver.chrome.driver>

<CodeCacheSize>1g</CodeCacheSize>
</properties>
Expand Down Expand Up @@ -2512,6 +2513,7 @@
<spark.ui.enabled>false</spark.ui.enabled>
<spark.ui.showConsoleProgress>false</spark.ui.showConsoleProgress>
<spark.unsafe.exceptionOnMemoryLeak>true</spark.unsafe.exceptionOnMemoryLeak>
<spark.test.webdriver.chrome.driver>${spark.test.webdriver.chrome.driver}</spark.test.webdriver.chrome.driver>
<!-- Needed by sql/hive tests. -->
<test.src.tables>__not_used__</test.src.tables>
</systemProperties>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,9 @@ object FunctionRegistry {
expression[MakeInterval]("make_interval"),
expression[DatePart]("date_part"),
expression[Extract]("extract"),
expression[SecondsToTimestamp]("timestamp_seconds"),
expression[MillisToTimestamp]("timestamp_millis"),
expression[MicrosToTimestamp]("timestamp_micros"),

// collection functions
expression[CreateArray]("array"),
Expand Down
Loading