Skip to content
Merged

sync #20

Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
3726aab
[SPARK-32177][WEBUI] Remove the weird line from near the Spark logo o…
sarutak Jul 6, 2020
0e33b5e
[SPARK-32178][TESTS] Disable test-dependencies.sh from Jenkins jobs
dongjoon-hyun Jul 6, 2020
dea7bc4
[SPARK-32100][CORE][TESTS][FOLLOWUP] Reduce the required test resources
dongjoon-hyun Jul 6, 2020
59a7087
[SPARK-32145][SQL][TEST-HIVE1.2][TEST-HADOOP2.7] ThriftCLIService.Get…
yaooqinn Jul 6, 2020
1d18096
[SPARK-32162][PYTHON][TESTS] Improve error message of Pandas grouped …
BryanCutler Jul 6, 2020
3fe3365
[SPARK-32172][CORE] Use createDirectory instead of mkdir
sidedoorleftroad Jul 6, 2020
5d296ed
[SPARK-32167][SQL] Fix GetArrayStructFields to respect inner field's …
cloud-fan Jul 7, 2020
2e23da2
[SPARK-31975][SQL] Show AnalysisException when WindowFunction is used…
ulysses-you Jul 7, 2020
75d3428
[SPARK-32209][SQL] Re-use GetTimestamp in ParseToDate
MaxGekk Jul 7, 2020
8d5c094
[SPARK-32164][ML] GeneralizedLinearRegressionSummary optimization
zhengruifeng Jul 7, 2020
4bbc343
[SPARK-31317][SQL] Add withField method to Column
fqaiser94 Jul 7, 2020
eb8eda7
[SPARK-32211][SQL] Pin mariadb-plugin-gssapi-server version to fix Ma…
gaborgsomogyi Jul 7, 2020
90b9099
[SPARK-32163][SQL] Nested pruning should work even with cosmetic vari…
viirya Jul 7, 2020
1261fac
[SPARK-31710][SQL][FOLLOWUP] Allow cast numeric to timestamp by default
MaxGekk Jul 7, 2020
8b0a54e
[SPARK-32057][SQL][TEST-HIVE1.2][TEST-HADOOP2.7] ExecuteStatement: ca…
alismess-db Jul 8, 2020
3659611
[SPARK-32124][CORE][FOLLOW-UP] Use the invalid value Int.MinValue to …
xuanyuanking Jul 8, 2020
b5297c4
[SPARK-20680][SQL] Spark-sql do not support for creating table with v…
LantaoJin Jul 8, 2020
65286ae
[SPARK-30703][SQL][FOLLOWUP] Update SqlBase.g4 invalid comment
ulysses-you Jul 8, 2020
371b35d
[SPARK-32214][SQL] The type conversion function generated in makeFrom…
sarutak Jul 8, 2020
8e7fc04
[SPARK-32024][WEBUI] Update ApplicationStoreInfo.size during HistoryS…
zhli1142015 Jul 8, 2020
f60b3b7
[MINOR][INFRA][R] Show the installed packages in R in a prettier way
HyukjinKwon Jul 8, 2020
17997a5
[SPARK-32233][TESTS] Disable SBT unidoc generation testing in Jenkins
dongjoon-hyun Jul 8, 2020
3bb1ac5
[SPARK-32168][SQL] Fix hidden partitioning correctness bug in SQL ove…
rdblue Jul 8, 2020
d1d16d1
[SPARK-31723][CORE][TEST] Reenable one test case in HistoryServerSuite
Jul 8, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions appveyor.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ install:
# Install maven and dependencies
- ps: .\dev\appveyor-install-dependencies.ps1
# Required package for R unit tests
- cmd: R -e "install.packages(c('knitr', 'rmarkdown', 'testthat', 'e1071', 'survival', 'arrow'), repos='https://cloud.r-project.org/')"
- cmd: R -e "packageVersion('knitr'); packageVersion('rmarkdown'); packageVersion('testthat'); packageVersion('e1071'); packageVersion('survival'); packageVersion('arrow')"
- cmd: Rscript -e "install.packages(c('knitr', 'rmarkdown', 'testthat', 'e1071', 'survival', 'arrow'), repos='https://cloud.r-project.org/')"
- cmd: Rscript -e "pkg_list <- as.data.frame(installed.packages()[,c(1, 3:4)]); pkg_list[is.na(pkg_list$Priority), 1:2, drop = FALSE]"

build_script:
# '-Djna.nosys=true' is required to avoid kernel32.dll load failure.
Expand Down
4 changes: 4 additions & 0 deletions core/src/main/resources/org/apache/spark/ui/static/webui.css
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ a:not([href]):hover {
padding: 0;
}

.navbar-brand a:hover {
text-decoration: none;
}

.navbar .navbar-nav .nav-link {
height: 50px;
padding: 10px 15px 10px;
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/TaskEndReason.scala
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ case class FetchFailed(
extends TaskFailedReason {
override def toErrorString: String = {
val bmAddressString = if (bmAddress == null) "null" else bmAddress.toString
s"FetchFailed($bmAddressString, shuffleId=$shuffleId, mapIndex=$mapIndex, " +
val mapIndexString = if (mapIndex == Int.MinValue) "Unknown" else mapIndex.toString
s"FetchFailed($bmAddressString, shuffleId=$shuffleId, mapIndex=$mapIndexString, " +
s"mapId=$mapId, reduceId=$reduceId, message=\n$message\n)"
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,29 @@ private class HistoryServerDiskManager(

// Go through the recorded store directories and remove any that may have been removed by
// external code.
val orphans = listing.view(classOf[ApplicationStoreInfo]).asScala.filter { info =>
!new File(info.path).exists()
}.toSeq
val (existences, orphans) = listing
.view(classOf[ApplicationStoreInfo])
.asScala
.toSeq
.partition { info =>
new File(info.path).exists()
}

orphans.foreach { info =>
listing.delete(info.getClass(), info.path)
}

// Reading the LevelDB store may trigger table file compaction, which can change the size of
// the LevelDB directory. When the service restarts, "currentUsage" is calculated from the real
// directory size. Update "ApplicationStoreInfo.size" to ensure "currentUsage" equals the
// sum of the "ApplicationStoreInfo.size" values.
existences.foreach { info =>
val fileSize = sizeOf(new File(info.path))
if (fileSize != info.size) {
listing.write(info.copy(size = fileSize))
}
}

logInfo("Initialized disk manager: " +
s"current usage = ${Utils.bytesToString(currentUsage.get())}, " +
s"max usage = ${Utils.bytesToString(maxUsage)}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.storage

import java.io.{File, IOException}
import java.nio.file.Files
import java.util.UUID

import org.apache.spark.SparkConf
Expand Down Expand Up @@ -69,8 +70,8 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
old
} else {
val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
if (!newDir.exists() && !newDir.mkdir()) {
throw new IOException(s"Failed to create local dir in $newDir.")
if (!newDir.exists()) {
Files.createDirectory(newDir.toPath)
}
subDirs(dirId)(subDirId) = newDir
newDir
Expand Down
8 changes: 6 additions & 2 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1078,8 +1078,12 @@ private[spark] object JsonProtocol {
val blockManagerAddress = blockManagerIdFromJson(json \ "Block Manager Address")
val shuffleId = (json \ "Shuffle ID").extract[Int]
val mapId = (json \ "Map ID").extract[Long]
val mapIndex = (json \ "Map Index") match {
case JNothing => 0
val mapIndex = json \ "Map Index" match {
case JNothing =>
// Note, we use the invalid value Int.MinValue here to fill the map index for backward
// compatibility. Otherwise, the fetch failed event will be dropped when the history
// server loads the event log written by the Spark version before 3.0.
Int.MinValue
case x => x.extract[Int]
}
val reduceId = (json \ "Reduce ID").extract[Int]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,4 +158,50 @@ class HistoryServerDiskManagerSuite extends SparkFunSuite with BeforeAndAfter {
assert(manager.approximateSize(50L, true) > 50L)
}

test("SPARK-32024: update ApplicationStoreInfo.size during initializing") {
val manager = mockManager()
val leaseA = manager.lease(2)
doReturn(3L).when(manager).sizeOf(meq(leaseA.tmpPath))
val dstA = leaseA.commit("app1", None)
assert(manager.free() === 0)
assert(manager.committed() === 3)
// Listing store tracks dstA now.
assert(store.read(classOf[ApplicationStoreInfo], dstA.getAbsolutePath).size === 3)

// Simulate: service restarts, new disk manager (manager1) is initialized.
val manager1 = mockManager()
// Simulate: event KVstore compaction before restart, directory size reduces.
doReturn(2L).when(manager1).sizeOf(meq(dstA))
doReturn(2L).when(manager1).sizeOf(meq(new File(testDir, "apps")))
manager1.initialize()
// "ApplicationStoreInfo.size" is updated for dstA.
assert(store.read(classOf[ApplicationStoreInfo], dstA.getAbsolutePath).size === 2)
assert(manager1.free() === 1)
// If "ApplicationStoreInfo.size" is not correctly updated, "IllegalStateException"
// would be thrown.
val leaseB = manager1.lease(2)
assert(manager1.free() === 1)
doReturn(2L).when(manager1).sizeOf(meq(leaseB.tmpPath))
val dstB = leaseB.commit("app2", None)
assert(manager1.committed() === 2)
// Listing store tracks dstB only, dstA is evicted by "makeRoom()".
assert(store.read(classOf[ApplicationStoreInfo], dstB.getAbsolutePath).size === 2)

val manager2 = mockManager()
// Simulate: cache entities are written after replaying, directory size increases.
doReturn(3L).when(manager2).sizeOf(meq(dstB))
doReturn(3L).when(manager2).sizeOf(meq(new File(testDir, "apps")))
manager2.initialize()
// "ApplicationStoreInfo.size" is updated for dstB.
assert(store.read(classOf[ApplicationStoreInfo], dstB.getAbsolutePath).size === 3)
assert(manager2.free() === 0)
val leaseC = manager2.lease(2)
doReturn(2L).when(manager2).sizeOf(meq(leaseC.tmpPath))
val dstC = leaseC.commit("app3", None)
assert(manager2.free() === 1)
assert(manager2.committed() === 2)
// Listing store tracks dstC only, dstB is evicted by "makeRoom()".
assert(store.read(classOf[ApplicationStoreInfo], dstC.getAbsolutePath).size === 2)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -313,8 +313,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
all (directSiteRelativeLinks) should not startWith (knoxBaseUrl)
}

// TODO (SPARK-31723): re-enable it
ignore("static relative links are prefixed with uiRoot (spark.ui.proxyBase)") {
test("static relative links are prefixed with uiRoot (spark.ui.proxyBase)") {
val uiRoot = Option(System.getenv("APPLICATION_WEB_PROXY_BASE")).getOrElse("/testwebproxybase")
val page = new HistoryPage(server)
val request = mock[HttpServletRequest]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,17 @@ import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend
class WorkerDecommissionExtendedSuite extends SparkFunSuite with LocalSparkContext {
private val conf = new org.apache.spark.SparkConf()
.setAppName(getClass.getName)
.set(SPARK_MASTER, "local-cluster[20,1,512]")
.set(SPARK_MASTER, "local-cluster[5,1,512]")
.set(EXECUTOR_MEMORY, "512m")
.set(DYN_ALLOCATION_ENABLED, true)
.set(DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED, true)
.set(DYN_ALLOCATION_INITIAL_EXECUTORS, 20)
.set(DYN_ALLOCATION_INITIAL_EXECUTORS, 5)
.set(WORKER_DECOMMISSION_ENABLED, true)

test("Worker decommission and executor idle timeout") {
sc = new SparkContext(conf.set(DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT.key, "10s"))
withSpark(sc) { sc =>
TestUtils.waitUntilExecutorsUp(sc, 20, 60000)
TestUtils.waitUntilExecutorsUp(sc, 5, 60000)
val rdd1 = sc.parallelize(1 to 10, 2)
val rdd2 = rdd1.map(x => (1, x))
val rdd3 = rdd2.reduceByKey(_ + _)
Expand All @@ -54,10 +54,10 @@ class WorkerDecommissionExtendedSuite extends SparkFunSuite with LocalSparkConte
}
}

test("Decommission 19 executors from 20 executors in total") {
test("Decommission 4 executors from 5 executors in total") {
sc = new SparkContext(conf)
withSpark(sc) { sc =>
TestUtils.waitUntilExecutorsUp(sc, 20, 60000)
TestUtils.waitUntilExecutorsUp(sc, 5, 60000)
val rdd1 = sc.parallelize(1 to 100000, 200)
val rdd2 = rdd1.map(x => (x % 100, x))
val rdd3 = rdd2.reduceByKey(_ + _)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ class JsonProtocolSuite extends SparkFunSuite {
val oldEvent = JsonProtocol.taskEndReasonToJson(fetchFailed)
.removeField({ _._1 == "Map Index" })
val expectedFetchFailed = FetchFailed(BlockManagerId("With or", "without you", 15), 17, 16L,
0, 19, "ignored")
Int.MinValue, 19, "ignored")
assert(expectedFetchFailed === JsonProtocol.taskEndReasonFromJson(oldEvent))
}

Expand Down
5 changes: 3 additions & 2 deletions dev/run-tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,8 @@ def build_spark_assembly_sbt(extra_profiles, checkstyle=False):
if checkstyle:
run_java_style_checks(build_profiles)

build_spark_unidoc_sbt(extra_profiles)
if not os.environ.get("AMPLAB_JENKINS"):
build_spark_unidoc_sbt(extra_profiles)


def build_apache_spark(build_tool, extra_profiles):
Expand Down Expand Up @@ -648,7 +649,7 @@ def main():
# if "DOCS" in changed_modules and test_env == "amplab_jenkins":
# build_spark_documentation()

if any(m.should_run_build_tests for m in test_modules):
if any(m.should_run_build_tests for m in test_modules) and test_env != "amplab_jenkins":
run_build_tests()

# spark build
Expand Down
2 changes: 0 additions & 2 deletions docs/sql-migration-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ license: |

- In Spark 3.1, `from_unixtime`, `unix_timestamp`,`to_unix_timestamp`, `to_timestamp` and `to_date` will fail if the specified datetime pattern is invalid. In Spark 3.0 or earlier, they result `NULL`.

- In Spark 3.1, casting numeric to timestamp will be forbidden by default. It's strongly recommended to use dedicated functions: TIMESTAMP_SECONDS, TIMESTAMP_MILLIS and TIMESTAMP_MICROS. Or you can set `spark.sql.legacy.allowCastNumericToTimestamp` to true to work around it. See more details in SPARK-31710.

## Upgrading from Spark SQL 3.0 to 3.0.1

- In Spark 3.0, JSON datasource and JSON function `schema_of_json` infer TimestampType from string values if they match to the pattern defined by the JSON option `timestampFormat`. Since version 3.0.1, the timestamp type inference is disabled by default. Set the JSON option `inferTimestamp` to `true` to enable such type inference.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

dpkg-divert --add /bin/systemctl && ln -sT /bin/true /bin/systemctl
apt update
apt install -y mariadb-plugin-gssapi-server
apt install -y mariadb-plugin-gssapi-server=1:10.4.12+maria~bionic
echo "gssapi_keytab_path=/docker-entrypoint-initdb.d/mariadb.keytab" >> /etc/mysql/mariadb.conf.d/auth_gssapi.cnf
echo "gssapi_principal_name=mariadb/__IP_ADDRESS_REPLACE_ME__@EXAMPLE.COM" >> /etc/mysql/mariadb.conf.d/auth_gssapi.cnf
docker-entrypoint.sh mysqld
Original file line number Diff line number Diff line change
Expand Up @@ -1220,10 +1220,41 @@ class GeneralizedLinearRegressionSummary private[regression] (

private[regression] lazy val link: Link = familyLink.link

/**
* summary row containing:
* numInstances, weightSum, deviance, rss, weighted average of label - offset.
*/
private lazy val glrSummary = {
val devUDF = udf { (label: Double, pred: Double, weight: Double) =>
family.deviance(label, pred, weight)
}
val devCol = sum(devUDF(label, prediction, weight))

val rssCol = if (model.getFamily.toLowerCase(Locale.ROOT) != Binomial.name &&
model.getFamily.toLowerCase(Locale.ROOT) != Poisson.name) {
val rssUDF = udf { (label: Double, pred: Double, weight: Double) =>
(label - pred) * (label - pred) * weight / family.variance(pred)
}
sum(rssUDF(label, prediction, weight))
} else {
lit(Double.NaN)
}

val avgCol = if (model.getFitIntercept &&
(!model.hasOffsetCol || (model.hasOffsetCol && family == Gaussian && link == Identity))) {
sum((label - offset) * weight) / sum(weight)
} else {
lit(Double.NaN)
}

predictions
.select(count(label), sum(weight), devCol, rssCol, avgCol)
.head()
}

/** Number of instances in DataFrame predictions. */
@Since("2.2.0")
lazy val numInstances: Long = predictions.count()

lazy val numInstances: Long = glrSummary.getLong(0)

/**
* Name of features. If the name cannot be retrieved from attributes,
Expand Down Expand Up @@ -1335,9 +1366,7 @@ class GeneralizedLinearRegressionSummary private[regression] (
*/
if (!model.hasOffsetCol ||
(model.hasOffsetCol && family == Gaussian && link == Identity)) {
val agg = predictions.agg(sum(weight.multiply(
label.minus(offset))), sum(weight)).first()
link.link(agg.getDouble(0) / agg.getDouble(1))
link.link(glrSummary.getDouble(4))
} else {
// Create empty feature column and fit intercept only model using param setting from model
val featureNull = "feature_" + java.util.UUID.randomUUID.toString
Expand All @@ -1362,12 +1391,7 @@ class GeneralizedLinearRegressionSummary private[regression] (
* The deviance for the fitted model.
*/
@Since("2.0.0")
lazy val deviance: Double = {
predictions.select(label, prediction, weight).rdd.map {
case Row(label: Double, pred: Double, weight: Double) =>
family.deviance(label, pred, weight)
}.sum()
}
lazy val deviance: Double = glrSummary.getDouble(2)

/**
* The dispersion of the fitted model.
Expand All @@ -1381,14 +1405,14 @@ class GeneralizedLinearRegressionSummary private[regression] (
model.getFamily.toLowerCase(Locale.ROOT) == Poisson.name) {
1.0
} else {
val rss = pearsonResiduals.agg(sum(pow(col("pearsonResiduals"), 2.0))).first().getDouble(0)
val rss = glrSummary.getDouble(3)
rss / degreesOfFreedom
}

/** Akaike Information Criterion (AIC) for the fitted model. */
@Since("2.0.0")
lazy val aic: Double = {
val weightSum = predictions.select(weight).agg(sum(weight)).first().getDouble(0)
val weightSum = glrSummary.getDouble(1)
val t = predictions.select(
label, prediction, weight).rdd.map {
case Row(label: Double, pred: Double, weight: Double) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1037,7 +1037,7 @@ class LinearRegressionSummary private[regression] (
}

/** Number of instances in DataFrame predictions */
lazy val numInstances: Long = predictions.count()
lazy val numInstances: Long = metrics.count

/** Degrees of freedom */
@Since("2.2.0")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,4 +131,6 @@ class RegressionMetrics @Since("2.0.0") (
1 - SSerr / SStot
}
}

private[spark] def count: Long = summary.count
}
1 change: 1 addition & 0 deletions project/SparkBuild.scala
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,7 @@ object SparkParallelTestGrouping {
"org.apache.spark.ml.classification.LogisticRegressionSuite",
"org.apache.spark.ml.classification.LinearSVCSuite",
"org.apache.spark.sql.SQLQueryTestSuite",
"org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperationSuite",
"org.apache.spark.sql.hive.thriftserver.ThriftServerQueryTestSuite",
"org.apache.spark.sql.hive.thriftserver.SparkSQLEnvSuite",
"org.apache.spark.sql.hive.thriftserver.ui.ThriftServerPageSuite",
Expand Down
Loading