Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
9 changes: 9 additions & 0 deletions .github/workflows/test_report.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,16 @@ jobs:
github_token: ${{ secrets.GITHUB_TOKEN }}
workflow: ${{ github.event.workflow_run.workflow_id }}
commit: ${{ github.event.workflow_run.head_commit.id }}
- name: Check if JUnit report XML files exist
run: |
if ls **/target/test-reports/*.xml > /dev/null 2>&1; then
echo '::set-output name=FILE_EXISTS::true'
else
echo '::set-output name=FILE_EXISTS::false'
fi
id: check-junit-file
- name: Publish test report
if: steps.check-junit-file.outputs.FILE_EXISTS == 'true'
uses: scacap/action-surefire-report@v1
with:
check_name: Report test results
Expand Down
1 change: 1 addition & 0 deletions R/pkg/NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,7 @@ exportMethods("%<=>%",
"variance",
"var_pop",
"var_samp",
"vector_to_array",
"weekofyear",
"when",
"window",
Expand Down
38 changes: 38 additions & 0 deletions R/pkg/R/functions.R
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,22 @@ NULL
#' head(tmp)}
NULL

#' ML functions for Column operations
#'
#' ML functions defined for \code{Column}.
#'
#' @param x Column to compute on.
#' @param ... additional argument(s).
#' @name column_ml_functions
#' @rdname column_ml_functions
#' @family ml functions
#' @examples
#' \dontrun{
#' df <- read.df("data/mllib/sample_libsvm_data.txt", source = "libsvm")
#' head(select(df, vector_to_array(df$features)))
#' }
NULL

#' @details
#' \code{lit}: A new Column is created to represent the literal value.
#' If the parameter is a Column, it is returned unchanged.
Expand Down Expand Up @@ -4458,3 +4474,25 @@ setMethod("timestamp_seconds",
)
column(jc)
})

#' @details
#' \code{vector_to_array} Converts a column of MLlib sparse/dense vectors into
#' a column of dense arrays.
#'
#' @param dtype The data type of the output array. Valid values: "float64" or "float32".
#'
#' @rdname column_ml_functions
#' @aliases vector_to_array vector_to_array,Column-method
#' @note vector_to_array since 3.1.0
setMethod("vector_to_array",
signature(x = "Column"),
function(x, dtype = c("float64", "float32")) {
dtype <- match.arg(dtype)
jc <- callJStatic(
"org.apache.spark.ml.functions",
"vector_to_array",
x@jc,
dtype
)
column(jc)
})
4 changes: 4 additions & 0 deletions R/pkg/R/generics.R
Original file line number Diff line number Diff line change
Expand Up @@ -1449,6 +1449,10 @@ setGeneric("var_pop", function(x) { standardGeneric("var_pop") })
#' @name NULL
setGeneric("var_samp", function(x) { standardGeneric("var_samp") })

#' @rdname column_ml_functions
#' @name NULL
setGeneric("vector_to_array", function(x, ...) { standardGeneric("vector_to_array") })

#' @rdname column_datetime_functions
#' @name NULL
setGeneric("weekofyear", function(x) { standardGeneric("weekofyear") })
Expand Down
3 changes: 2 additions & 1 deletion R/pkg/tests/fulltests/test_sparkSQL.R
Original file line number Diff line number Diff line change
Expand Up @@ -1424,7 +1424,8 @@ test_that("column functions", {
date_trunc("quarter", c) + current_date() + current_timestamp()
c25 <- overlay(c1, c2, c3, c3) + overlay(c1, c2, c3) + overlay(c1, c2, 1) +
overlay(c1, c2, 3, 4)
c26 <- timestamp_seconds(c1)
c26 <- timestamp_seconds(c1) + vector_to_array(c) +
vector_to_array(c, "float32") + vector_to_array(c, "float64")
c27 <- nth_value("x", 1L) + nth_value("y", 2, TRUE) +
nth_value(column("v"), 3) + nth_value(column("z"), 4L, FALSE)

Expand Down
1 change: 0 additions & 1 deletion dev/run-tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,6 @@ def get_hive_profiles(hive_version):
"""

sbt_maven_hive_profiles = {
"hive1.2": ["-Phive-1.2"],
"hive2.3": ["-Phive-2.3"],
}

Expand Down
6 changes: 1 addition & 5 deletions dev/test-dependencies.sh
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ export LC_ALL=C
HADOOP_MODULE_PROFILES="-Phive-thriftserver -Pmesos -Pkubernetes -Pyarn -Phive"
MVN="build/mvn"
HADOOP_HIVE_PROFILES=(
hadoop-2.7-hive-1.2
hadoop-2.7-hive-2.3
hadoop-3.2-hive-2.3
)
Expand Down Expand Up @@ -71,12 +70,9 @@ for HADOOP_HIVE_PROFILE in "${HADOOP_HIVE_PROFILES[@]}"; do
if [[ $HADOOP_HIVE_PROFILE == **hadoop-3.2-hive-2.3** ]]; then
HADOOP_PROFILE=hadoop-3.2
HIVE_PROFILE=hive-2.3
elif [[ $HADOOP_HIVE_PROFILE == **hadoop-2.7-hive-2.3** ]]; then
HADOOP_PROFILE=hadoop-2.7
HIVE_PROFILE=hive-2.3
else
HADOOP_PROFILE=hadoop-2.7
HIVE_PROFILE=hive-1.2
HIVE_PROFILE=hive-2.3
fi
echo "Performing Maven install for $HADOOP_HIVE_PROFILE"
$MVN $HADOOP_MODULE_PROFILES -P$HADOOP_PROFILE -P$HIVE_PROFILE jar:jar jar:test-jar install:install clean -q
Expand Down
2 changes: 2 additions & 0 deletions docs/sql-migration-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ license: |

- In Spark 3.1, incomplete interval literals, e.g. `INTERVAL '1'`, `INTERVAL '1 DAY 2'` will fail with IllegalArgumentException. In Spark 3.0, they result `NULL`s.

- In Spark 3.1, we remove the built-in Hive 1.2. You need to migrate your custom SerDes to Hive 2.3. See [HIVE-15167](https://issues.apache.org/jira/browse/HIVE-15167) for more details.

## Upgrading from Spark SQL 3.0 to 3.0.1

- In Spark 3.0, JSON datasource and JSON function `schema_of_json` infer TimestampType from string values if they match to the pattern defined by the JSON option `timestampFormat`. Since version 3.0.1, the timestamp type inference is disabled by default. Set the JSON option `inferTimestamp` to `true` to enable such type inference.
Expand Down
25 changes: 0 additions & 25 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2970,13 +2970,9 @@
<sourceDirectories>
<directory>${basedir}/src/main/java</directory>
<directory>${basedir}/src/main/scala</directory>
<directory>${basedir}/v${hive.version.short}/src/main/java</directory>
<directory>${basedir}/v${hive.version.short}/src/main/scala</directory>
</sourceDirectories>
<testSourceDirectories>
<directory>${basedir}/src/test/java</directory>
<directory>${basedir}/v${hive.version.short}/src/test/java</directory>
<directory>${basedir}/v${hive.version.short}/src/test/scala</directory>
</testSourceDirectories>
<configLocation>dev/checkstyle.xml</configLocation>
<outputFile>${basedir}/target/checkstyle-output.xml</outputFile>
Expand Down Expand Up @@ -3148,27 +3144,6 @@
<!-- Default hadoop profile. Uses global properties. -->
</profile>

<profile>
<id>hive-1.2</id>
<properties>
<hive.group>org.spark-project.hive</hive.group>
<hive.classifier></hive.classifier>
<!-- Version used in Maven Hive dependency -->
<hive.version>1.2.1.spark2</hive.version>
<!-- Version used for internal directory structure -->
<hive.version.short>1.2</hive.version.short>
<hive.parquet.scope>${hive.deps.scope}</hive.parquet.scope>
<hive.storage.version>2.6.0</hive.storage.version>
<hive.storage.scope>provided</hive.storage.scope>
<hive.common.scope>provided</hive.common.scope>
<hive.llap.scope>provided</hive.llap.scope>
<hive.serde.scope>provided</hive.serde.scope>
<hive.shims.scope>provided</hive.shims.scope>
<orc.classifier>nohive</orc.classifier>
<datanucleus-core.version>3.2.10</datanucleus-core.version>
</properties>
</profile>

<profile>
<id>hive-2.3</id>
<!-- Default hive profile. Uses global properties. -->
Expand Down
3 changes: 2 additions & 1 deletion python/pyspark/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import types

from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.rdd import RDD, RDDBarrier
from pyspark.files import SparkFiles
from pyspark.status import StatusTracker, SparkJobInfo, SparkStageInfo
Expand Down Expand Up @@ -113,6 +112,8 @@ def wrapper(self, *args, **kwargs):
return func(self, **kwargs)
return wrapper

# To avoid circular dependencies
from pyspark.context import SparkContext

# for back compatibility
from pyspark.sql import SQLContext, HiveContext, Row # noqa: F401
Expand Down
12 changes: 11 additions & 1 deletion python/pyspark/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from py4j.protocol import Py4JError
from py4j.java_gateway import is_instance_of

from pyspark import accumulators
from pyspark import accumulators, since
from pyspark.accumulators import Accumulator
from pyspark.broadcast import Broadcast, BroadcastPickleRegistry
from pyspark.conf import SparkConf
Expand Down Expand Up @@ -956,6 +956,16 @@ def setCheckpointDir(self, dirName):
"""
self._jsc.sc().setCheckpointDir(dirName)

@since(3.1)
def getCheckpointDir(self):
"""
Return the directory where RDDs are checkpointed. Returns None if no
checkpoint directory has been set.
"""
if not self._jsc.sc().getCheckpointDir().isEmpty():
return self._jsc.sc().getCheckpointDir().get()
return None

def _getJavaStorageLevel(self, storageLevel):
"""
Returns a Java StorageLevel based on a pyspark.StorageLevel.
Expand Down
1 change: 1 addition & 0 deletions python/pyspark/context.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ class SparkContext:
def addFile(self, path: str, recursive: bool = ...) -> None: ...
def addPyFile(self, path: str) -> None: ...
def setCheckpointDir(self, dirName: str) -> None: ...
def getCheckpointDir(self) -> Optional[str]: ...
def setJobGroup(
self, groupId: str, description: str, interruptOnCancel: bool = ...
) -> None: ...
Expand Down
3 changes: 3 additions & 0 deletions python/pyspark/tests/test_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ def test_basic_checkpointing(self):

self.assertFalse(flatMappedRDD.isCheckpointed())
self.assertTrue(flatMappedRDD.getCheckpointFile() is None)
self.assertFalse(self.sc.getCheckpointDir() is None)

flatMappedRDD.checkpoint()
result = flatMappedRDD.collect()
Expand All @@ -51,6 +52,8 @@ def test_basic_checkpointing(self):
self.assertEqual(flatMappedRDD.collect(), result)
self.assertEqual("file:" + self.checkpointDir.name,
os.path.dirname(os.path.dirname(flatMappedRDD.getCheckpointFile())))
self.assertEqual(self.sc.getCheckpointDir(),
os.path.dirname(flatMappedRDD.getCheckpointFile()))

def test_checkpoint_and_restore(self):
parCollection = self.sc.parallelize([1, 2, 3, 4])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,15 @@ private[spark] object KubernetesVolumeUtils {
volumeType match {
case KUBERNETES_VOLUMES_HOSTPATH_TYPE =>
val pathKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_PATH_KEY"
verifyOptionKey(options, pathKey, KUBERNETES_VOLUMES_HOSTPATH_TYPE)
KubernetesHostPathVolumeConf(options(pathKey))

case KUBERNETES_VOLUMES_PVC_TYPE =>
val claimNameKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_CLAIM_NAME_KEY"
val storageClassKey =
s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_CLAIM_STORAGE_CLASS_KEY"
val sizeLimitKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_SIZE_LIMIT_KEY"
verifyOptionKey(options, claimNameKey, KUBERNETES_VOLUMES_PVC_TYPE)
KubernetesPVCVolumeConf(
options(claimNameKey),
options.get(storageClassKey),
Expand All @@ -87,6 +89,8 @@ private[spark] object KubernetesVolumeUtils {
case KUBERNETES_VOLUMES_NFS_TYPE =>
val pathKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_PATH_KEY"
val serverKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_SERVER_KEY"
verifyOptionKey(options, pathKey, KUBERNETES_VOLUMES_NFS_TYPE)
verifyOptionKey(options, serverKey, KUBERNETES_VOLUMES_NFS_TYPE)
KubernetesNFSVolumeConf(
options(pathKey),
options(serverKey))
Expand All @@ -95,4 +99,10 @@ private[spark] object KubernetesVolumeUtils {
throw new IllegalArgumentException(s"Kubernetes Volume type `$volumeType` is not supported")
}
}

private def verifyOptionKey(options: Map[String, String], key: String, msg: String): Unit = {
if (!options.isDefinedAt(key)) {
throw new NoSuchElementException(key + s" is required for $msg")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,17 @@ class KubernetesVolumeUtilsSuite extends SparkFunSuite {
assert(e.getMessage.contains("hostPath.volumeName.options.path"))
}

test("SPARK-33063: Fails on missing option key in persistentVolumeClaim") {
val sparkConf = new SparkConf(false)
sparkConf.set("test.persistentVolumeClaim.volumeName.mount.path", "/path")
sparkConf.set("test.persistentVolumeClaim.volumeName.mount.readOnly", "true")

val e = intercept[NoSuchElementException] {
KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.")
}
assert(e.getMessage.contains("persistentVolumeClaim.volumeName.options.claimName"))
}

test("Parses read-only nfs volumes correctly") {
val sparkConf = new SparkConf(false)
sparkConf.set("test.nfs.volumeName.mount.path", "/path")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class LocalityPlacementStrategySuite extends SparkFunSuite {
}
}

val thread = new Thread(new ThreadGroup("test"), runnable, "test-thread", 32 * 1024)
val thread = new Thread(new ThreadGroup("test"), runnable, "test-thread", 256 * 1024)
thread.start()
thread.join()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1060,10 +1060,12 @@ trait ComplexTypeMergingExpression extends Expression {
s" The input types found are\n\t${inputTypesForMerging.mkString("\n\t")}")
}

override def dataType: DataType = {
private lazy val internalDataType: DataType = {
dataTypeCheck
inputTypesForMerging.reduceLeft(TypeCoercion.findCommonTypeDifferentOnlyInNullFlags(_, _).get)
}

override def dataType: DataType = internalDataType
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,10 +187,12 @@ case class ApproximatePercentile(
override def nullable: Boolean = true

// The result type is the same as the input type.
override def dataType: DataType = {
private lazy val internalDataType: DataType = {
if (returnPercentileArray) ArrayType(child.dataType, false) else child.dataType
}

override def dataType: DataType = internalDataType

override def prettyName: String =
getTagValue(FunctionRegistry.FUNC_ALIAS).getOrElse("percentile_approx")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ case class MapEntries(child: Expression)

@transient private lazy val childDataType: MapType = child.dataType.asInstanceOf[MapType]

override def dataType: DataType = {
private lazy val internalDataType: DataType = {
ArrayType(
StructType(
StructField("key", childDataType.keyType, false) ::
Expand All @@ -380,6 +380,8 @@ case class MapEntries(child: Expression)
false)
}

override def dataType: DataType = internalDataType

override protected def nullSafeEval(input: Any): Any = {
val childMap = input.asInstanceOf[MapData]
val keys = childMap.keyArray()
Expand Down Expand Up @@ -3504,13 +3506,16 @@ object ArrayUnion {
since = "2.4.0")
case class ArrayIntersect(left: Expression, right: Expression) extends ArrayBinaryLike
with ComplexTypeMergingExpression {
override def dataType: DataType = {

private lazy val internalDataType: DataType = {
dataTypeCheck
ArrayType(elementType,
left.dataType.asInstanceOf[ArrayType].containsNull &&
right.dataType.asInstanceOf[ArrayType].containsNull)
}

override def dataType: DataType = internalDataType

@transient lazy val evalIntersect: (ArrayData, ArrayData) => ArrayData = {
if (TypeUtils.typeWithProperEquals(elementType)) {
(array1, array2) =>
Expand Down Expand Up @@ -3747,11 +3752,13 @@ case class ArrayIntersect(left: Expression, right: Expression) extends ArrayBina
case class ArrayExcept(left: Expression, right: Expression) extends ArrayBinaryLike
with ComplexTypeMergingExpression {

override def dataType: DataType = {
private lazy val internalDataType: DataType = {
dataTypeCheck
left.dataType
}

override def dataType: DataType = internalDataType

@transient lazy val evalExcept: (ArrayData, ArrayData) => ArrayData = {
if (TypeUtils.typeWithProperEquals(elementType)) {
(array1, array2) =>
Expand Down
Loading