Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion R/pkg/NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,8 @@ exportMethods("arrange",

exportClasses("Column")

exportMethods("%in%",
exportMethods("%<=>%",
"%in%",
"abs",
"acos",
"add_months",
Expand Down Expand Up @@ -291,6 +292,7 @@ exportMethods("%in%",
"nanvl",
"negate",
"next_day",
"not",
"ntile",
"otherwise",
"over",
Expand Down
55 changes: 53 additions & 2 deletions R/pkg/R/column.R
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,7 @@ operators <- list(
"+" = "plus", "-" = "minus", "*" = "multiply", "/" = "divide", "%%" = "mod",
"==" = "equalTo", ">" = "gt", "<" = "lt", "!=" = "notEqual", "<=" = "leq", ">=" = "geq",
# we can not override `&&` and `||`, so use `&` and `|` instead
"&" = "and", "|" = "or", #, "!" = "unary_$bang"
"^" = "pow"
"&" = "and", "|" = "or", "^" = "pow"
)
column_functions1 <- c("asc", "desc", "isNaN", "isNull", "isNotNull")
column_functions2 <- c("like", "rlike", "getField", "getItem", "contains")
Expand Down Expand Up @@ -302,3 +301,55 @@ setMethod("otherwise",
jc <- callJMethod(x@jc, "otherwise", value)
column(jc)
})

#' \%<=>\%
#'
#' Equality test that is safe for null values.
#'
#' Unlike the standard equality operator, it can be used to perform null-safe joins.
#' Equivalent to Scala \code{Column.<=>} and \code{Column.eqNullSafe}.
#'
#' @param x a Column
#' @param value a value to compare
#' @rdname eq_null_safe
#' @name %<=>%
#' @aliases %<=>%,Column-method
#' @export
#' @examples
#' \dontrun{
#' df1 <- createDataFrame(data.frame(
#' x = c(1, NA, 3, NA), y = c(2, 6, 3, NA)
#' ))
#'
#' head(select(df1, df1$x == df1$y, df1$x %<=>% df1$y))
#'
#' df2 <- createDataFrame(data.frame(y = c(3, NA)))
#' count(join(df1, df2, df1$y == df2$y))
#'
#' count(join(df1, df2, df1$y %<=>% df2$y))
#' }
#' @note \%<=>\% since 2.3.0
setMethod("%<=>%",
          signature(x = "Column", value = "ANY"),
          function(x, value) {
            # Null-safe equality: delegates to Scala Column.eqNullSafe (<=>).
            # Use inherits() rather than `class(value) == "Column"`: class()
            # can return a vector of class names, making `==` unreliable for
            # objects with more than one class.
            value <- if (inherits(value, "Column")) value@jc else value
            jc <- callJMethod(x@jc, "eqNullSafe", value)
            column(jc)
          })

#' !
#'
#' Inversion of boolean expression.
#'
#' @rdname not
#' @name not
#' @aliases !,Column-method
#' @export
#' @examples
#' \dontrun{
#' df <- createDataFrame(data.frame(x = c(-1, 0, 1)))
#'
#' head(select(df, !column("x") > 0))
#' }
#' @note ! since 2.3.0
# `!` on a Column delegates to the `not` method (boolean inversion on the JVM side).
setMethod("!",
          signature(x = "Column"),
          function(x) {
            not(x)
          })
31 changes: 31 additions & 0 deletions R/pkg/R/functions.R
Original file line number Diff line number Diff line change
Expand Up @@ -3859,3 +3859,34 @@ setMethod("posexplode_outer",
jc <- callJStatic("org.apache.spark.sql.functions", "posexplode_outer", x@jc)
column(jc)
})

#' not
#'
#' Inversion of boolean expression.
#'
#' \code{not} and \code{!} cannot be applied directly to a numerical column.
#' To achieve R-like truthiness, the column has to be cast to \code{BooleanType}.
#'
#' @param x Column to compute on
#' @rdname not
#' @name not
#' @aliases not,Column-method
#' @export
#' @examples \dontrun{
#' df <- createDataFrame(data.frame(
#' is_true = c(TRUE, FALSE, NA),
#' flag = c(1, 0, 1)
#' ))
#'
#' head(select(df, not(df$is_true)))
#'
#' # Explicit cast is required when working with numeric column
#' head(select(df, not(cast(df$flag, "boolean"))))
#' }
#' @note not since 2.3.0
setMethod("not",
          signature(x = "Column"),
          function(x) {
            # Delegate to org.apache.spark.sql.functions.not on the JVM and
            # wrap the resulting Java column reference back into an R Column.
            column(callJStatic("org.apache.spark.sql.functions", "not", x@jc))
          })
10 changes: 9 additions & 1 deletion R/pkg/R/generics.R
Original file line number Diff line number Diff line change
Expand Up @@ -856,6 +856,10 @@ setGeneric("otherwise", function(x, value) { standardGeneric("otherwise") })
#' @export
setGeneric("over", function(x, window) { standardGeneric("over") })

#' @rdname eq_null_safe
#' @export
setGeneric("%<=>%", function(x, value) { standardGeneric("%<=>%") })

###################### WindowSpec Methods ##########################

#' @rdname partitionBy
Expand Down Expand Up @@ -1154,6 +1158,10 @@ setGeneric("nanvl", function(y, x) { standardGeneric("nanvl") })
#' @export
setGeneric("negate", function(x) { standardGeneric("negate") })

#' @rdname not
#' @export
setGeneric("not", function(x) { standardGeneric("not") })

#' @rdname next_day
#' @export
setGeneric("next_day", function(y, x) { standardGeneric("next_day") })
Expand Down Expand Up @@ -1510,7 +1518,7 @@ setGeneric("write.ml", function(object, path, ...) { standardGeneric("write.ml")

#' @rdname awaitTermination
#' @export
setGeneric("awaitTermination", function(x, timeout) { standardGeneric("awaitTermination") })
setGeneric("awaitTermination", function(x, timeout = NULL) { standardGeneric("awaitTermination") })

#' @rdname isActive
#' @export
Expand Down
14 changes: 10 additions & 4 deletions R/pkg/R/streaming.R
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,10 @@ setMethod("isActive",
#' immediately.
#'
#' @param x a StreamingQuery.
#' @param timeout time to wait in milliseconds
#' @return TRUE if query has terminated within the timeout period.
#' @param timeout time to wait in milliseconds. If omitted, waits indefinitely until
#'                \code{stopQuery} is called or an error has occurred.
#' @return TRUE if query has terminated within the timeout period; nothing if timeout is not
#' specified.
#' @rdname awaitTermination
#' @name awaitTermination
#' @aliases awaitTermination,StreamingQuery-method
Expand All @@ -182,8 +184,12 @@ setMethod("isActive",
#' @note experimental
setMethod("awaitTermination",
          signature(x = "StreamingQuery"),
          function(x, timeout = NULL) {
            # With an explicit timeout, return whether the query terminated in
            # time; with no timeout, block indefinitely and return invisibly.
            if (!is.null(timeout)) {
              handledCallJMethod(x@ssq, "awaitTermination", as.integer(timeout))
            } else {
              invisible(handledCallJMethod(x@ssq, "awaitTermination"))
            }
          })

#' stopQuery
Expand Down
4 changes: 2 additions & 2 deletions R/pkg/inst/tests/testthat/test_context.R
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ test_that("Check masked functions", {
# Check that we are not masking any new function from base, stats, testthat unexpectedly
# NOTE: We should avoid adding entries to *namesOfMaskedCompletely* as masked functions make it
# hard for users to use base R functions. Please check when in doubt.
namesOfMaskedCompletely <- c("cov", "filter", "sample")
namesOfMaskedCompletely <- c("cov", "filter", "sample", "not")
namesOfMasked <- c("describe", "cov", "filter", "lag", "na.omit", "predict", "sd", "var",
"colnames", "colnames<-", "intersect", "rank", "rbind", "sample", "subset",
"summary", "transform", "drop", "window", "as.data.frame", "union")
"summary", "transform", "drop", "window", "as.data.frame", "union", "not")
if (as.numeric(R.version$major) >= 3 && as.numeric(R.version$minor) >= 3) {
namesOfMasked <- c("endsWith", "startsWith", namesOfMasked)
}
Expand Down
20 changes: 20 additions & 0 deletions R/pkg/inst/tests/testthat/test_sparkSQL.R
Original file line number Diff line number Diff line change
Expand Up @@ -1323,6 +1323,8 @@ test_that("column operators", {
c3 <- (c + c2 - c2) * c2 %% c2
c4 <- (c > c2) & (c2 <= c3) | (c == c2) & (c2 != c3)
c5 <- c2 ^ c3 ^ c4
c6 <- c2 %<=>% c3
c7 <- !c6
})

test_that("column functions", {
Expand All @@ -1348,6 +1350,7 @@ test_that("column functions", {
c19 <- spark_partition_id() + coalesce(c) + coalesce(c1, c2, c3)
c20 <- to_timestamp(c) + to_timestamp(c, "yyyy") + to_date(c, "yyyy")
c21 <- posexplode_outer(c) + explode_outer(c)
c22 <- not(c)

# Test if base::is.nan() is exposed
expect_equal(is.nan(c("a", "b")), c(FALSE, FALSE))
Expand Down Expand Up @@ -1488,6 +1491,13 @@ test_that("column functions", {
lapply(
list(list(x = 1, y = -1, z = -2), list(x = 2, y = 3, z = 5)),
as.environment))

df <- as.DataFrame(data.frame(is_true = c(TRUE, FALSE, NA)))
expect_equal(
collect(select(df, alias(not(df$is_true), "is_false"))),
data.frame(is_false = c(FALSE, TRUE, NA))
)

})

test_that("column binary mathfunctions", {
Expand Down Expand Up @@ -1973,6 +1983,16 @@ test_that("filter() on a DataFrame", {
filtered6 <- where(df, df$age %in% c(19, 30))
expect_equal(count(filtered6), 2)

# test suites for %<=>%
dfNa <- read.json(jsonPathNa)
expect_equal(count(filter(dfNa, dfNa$age %<=>% 60)), 1)
expect_equal(count(filter(dfNa, !(dfNa$age %<=>% 60))), 5 - 1)
expect_equal(count(filter(dfNa, dfNa$age %<=>% NULL)), 3)
expect_equal(count(filter(dfNa, !(dfNa$age %<=>% NULL))), 5 - 3)
# match NA from two columns
expect_equal(count(filter(dfNa, dfNa$age %<=>% dfNa$height)), 2)
expect_equal(count(filter(dfNa, !(dfNa$age %<=>% dfNa$height))), 5 - 2)

# Test stats::filter is working
#expect_true(is.ts(filter(1:100, rep(1, 3)))) # nolint
})
Expand Down
1 change: 1 addition & 0 deletions R/pkg/inst/tests/testthat/test_streaming.R
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ test_that("read.stream, write.stream, awaitTermination, stopQuery", {

stopQuery(q)
expect_true(awaitTermination(q, 1))
expect_error(awaitTermination(q), NA)
})

test_that("print from explain, lastProgress, status, isActive", {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@
<td><span title="{{duration}}" class="durationClass">{{duration}}</span></td>
<td>{{sparkUser}}</td>
<td>{{lastUpdated}}</td>
<td><a href="{{uiroot}}/api/v1/applications/{{id}}/{{num}}/logs" class="btn btn-info btn-mini">Download</a></td>
<td><a href="{{log}}" class="btn btn-info btn-mini">Download</a></td>
{{/attempts}}
</tr>
{{/applications}}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@ $(document).ready(function() {
attempt["startTime"] = formatDate(attempt["startTime"]);
attempt["endTime"] = formatDate(attempt["endTime"]);
attempt["lastUpdated"] = formatDate(attempt["lastUpdated"]);
attempt["log"] = uiRoot + "/api/v1/applications/" + id + "/" +
(attempt.hasOwnProperty("attemptId") ? attempt["attemptId"] + "/" : "") + "logs";

var app_clone = {"id" : id, "name" : name, "num" : num, "attempts" : [attempt]};
array.push(app_clone);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ private[spark] class ExecutorAllocationManager(
val delta = addExecutors(maxNeeded)
logDebug(s"Starting timer to add more executors (to " +
s"expire in $sustainedSchedulerBacklogTimeoutS seconds)")
addTime += sustainedSchedulerBacklogTimeoutS * 1000
addTime = now + (sustainedSchedulerBacklogTimeoutS * 1000)
delta
} else {
0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// `CoarseGrainedSchedulerBackend.this`.
private val executorDataMap = new HashMap[String, ExecutorData]

// Number of executors requested by the cluster manager, [[ExecutorAllocationManager]]
@GuardedBy("CoarseGrainedSchedulerBackend.this")
private var requestedTotalExecutors = 0

// Number of executors requested from the cluster manager that have not registered yet
@GuardedBy("CoarseGrainedSchedulerBackend.this")
private var numPendingExecutors = 0
Expand Down Expand Up @@ -413,6 +417,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
* */
protected def reset(): Unit = {
val executors = synchronized {
requestedTotalExecutors = 0
numPendingExecutors = 0
executorsPendingToRemove.clear()
Set() ++ executorDataMap.keys
Expand Down Expand Up @@ -487,12 +492,21 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
logInfo(s"Requesting $numAdditionalExecutors additional executor(s) from the cluster manager")

val response = synchronized {
requestedTotalExecutors += numAdditionalExecutors
numPendingExecutors += numAdditionalExecutors
logDebug(s"Number of pending executors is now $numPendingExecutors")
if (requestedTotalExecutors !=
(numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size)) {
logDebug(
s"""requestExecutors($numAdditionalExecutors): Executor request doesn't match:
|requestedTotalExecutors = $requestedTotalExecutors
|numExistingExecutors = $numExistingExecutors
|numPendingExecutors = $numPendingExecutors
|executorsPendingToRemove = ${executorsPendingToRemove.size}""".stripMargin)
}

// Account for executors pending to be added or removed
doRequestTotalExecutors(
numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size)
doRequestTotalExecutors(requestedTotalExecutors)
}

defaultAskTimeout.awaitResult(response)
Expand Down Expand Up @@ -524,6 +538,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
}

val response = synchronized {
this.requestedTotalExecutors = numExecutors
this.localityAwareTasks = localityAwareTasks
this.hostToLocalTaskCount = hostToLocalTaskCount

Expand Down Expand Up @@ -589,8 +604,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// take into account executors that are pending to be added or removed.
val adjustTotalExecutors =
if (!replace) {
doRequestTotalExecutors(
numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size)
requestedTotalExecutors = math.max(requestedTotalExecutors - executorsToKill.size, 0)
if (requestedTotalExecutors !=
(numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size)) {
logDebug(
s"""killExecutors($executorIds, $replace, $force): Executor counts do not match:
|requestedTotalExecutors = $requestedTotalExecutors
|numExistingExecutors = $numExistingExecutors
|numPendingExecutors = $numPendingExecutors
|executorsPendingToRemove = ${executorsPendingToRemove.size}""".stripMargin)
}
doRequestTotalExecutors(requestedTotalExecutors)
} else {
numPendingExecutors += knownExecutors.size
Future.successful(true)
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/ui/UIUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ private[spark] object UIUtils extends Logging {
val xml = XML.loadString(s"""<span class="description-input">$desc</span>""")

// Verify that this has only anchors and span (we are wrapping in span)
val allowedNodeLabels = Set("a", "span")
val allowedNodeLabels = Set("a", "span", "br")
val illegalNodes = xml \\ "_" filterNot { case node: Node =>
allowedNodeLabels.contains(node.label)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -356,12 +356,13 @@ class StandaloneDynamicAllocationSuite
test("kill the same executor twice (SPARK-9795)") {
sc = new SparkContext(appConf)
val appId = sc.applicationId
sc.requestExecutors(2)
eventually(timeout(10.seconds), interval(10.millis)) {
val apps = getApplications()
assert(apps.size === 1)
assert(apps.head.id === appId)
assert(apps.head.executors.size === 2)
assert(apps.head.getExecutorLimit === Int.MaxValue)
assert(apps.head.getExecutorLimit === 2)
}
// sync executors between the Master and the driver, needed because
// the driver refuses to kill executors it does not know about
Expand All @@ -380,12 +381,13 @@ class StandaloneDynamicAllocationSuite
test("the pending replacement executors should not be lost (SPARK-10515)") {
sc = new SparkContext(appConf)
val appId = sc.applicationId
sc.requestExecutors(2)
eventually(timeout(10.seconds), interval(10.millis)) {
val apps = getApplications()
assert(apps.size === 1)
assert(apps.head.id === appId)
assert(apps.head.executors.size === 2)
assert(apps.head.getExecutorLimit === Int.MaxValue)
assert(apps.head.getExecutorLimit === 2)
}
// sync executors between the Master and the driver, needed because
// the driver refuses to kill executors it does not know about
Expand Down
Loading