Commit 0b9dc3a

merge with master to fix conflict

2 parents: 1145e52 + 2e62560

42 files changed: 669 additions & 354 deletions
R/pkg/R/install.R
Lines changed: 40 additions & 15 deletions

@@ -54,7 +54,7 @@
 #' }
 #' @param overwrite If \code{TRUE}, download and overwrite the existing tar file in localDir
 #'        and force re-install Spark (in case the local directory or file is corrupted)
-#' @return \code{install.spark} returns the local directory where Spark is found or installed
+#' @return the (invisible) local directory where Spark is found or installed
 #' @rdname install.spark
 #' @name install.spark
 #' @aliases install.spark
@@ -115,17 +115,35 @@ install.spark <- function(hadoopVersion = "2.7", mirrorUrl = NULL,
   } else {
     if (releaseUrl != "") {
       message("Downloading from alternate URL:\n- ", releaseUrl)
-      downloadUrl(releaseUrl, packageLocalPath, paste0("Fetch failed from ", releaseUrl))
+      success <- downloadUrl(releaseUrl, packageLocalPath)
+      if (!success) {
+        unlink(packageLocalPath)
+        stop(paste0("Fetch failed from ", releaseUrl))
+      }
     } else {
       robustDownloadTar(mirrorUrl, version, hadoopVersion, packageName, packageLocalPath)
     }
   }

   message(sprintf("Installing to %s", localDir))
-  untar(tarfile = packageLocalPath, exdir = localDir)
-  if (!tarExists || overwrite) {
+  # There are two ways untar can fail - untar could stop() on errors like incomplete block on file
+  # or, tar command can return failure code
+  success <- tryCatch(untar(tarfile = packageLocalPath, exdir = localDir) == 0,
+                      error = function(e) {
+                        message(e)
+                        message()
+                        FALSE
+                      },
+                      warning = function(w) {
+                        # Treat warning as error, add an empty line with message()
+                        message(w)
+                        message()
+                        FALSE
+                      })
+  if (!tarExists || overwrite || !success) {
     unlink(packageLocalPath)
   }
+  if (!success) stop("Extract archive failed.")
   message("DONE.")
   Sys.setenv(SPARK_HOME = packageLocalDir)
   message(paste("SPARK_HOME set to", packageLocalDir))
@@ -135,8 +153,7 @@ install.spark <- function(hadoopVersion = "2.7", mirrorUrl = NULL,
 robustDownloadTar <- function(mirrorUrl, version, hadoopVersion, packageName, packageLocalPath) {
   # step 1: use user-provided url
   if (!is.null(mirrorUrl)) {
-    msg <- sprintf("Use user-provided mirror site: %s.", mirrorUrl)
-    message(msg)
+    message("Use user-provided mirror site: ", mirrorUrl)
     success <- directDownloadTar(mirrorUrl, version, hadoopVersion,
                                  packageName, packageLocalPath)
     if (success) {
@@ -156,7 +173,7 @@ robustDownloadTar <- function(mirrorUrl, version, hadoopVersion, packageName, pa
                                  packageName, packageLocalPath)
     if (success) return()
   } else {
-    message("Unable to find preferred mirror site.")
+    message("Unable to download from preferred mirror site: ", mirrorUrl)
   }

   # step 3: use backup option
@@ -165,8 +182,11 @@ robustDownloadTar <- function(mirrorUrl, version, hadoopVersion, packageName, pa
   success <- directDownloadTar(mirrorUrl, version, hadoopVersion,
                                packageName, packageLocalPath)
   if (success) {
-    return(packageLocalPath)
+    return()
   } else {
+    # remove any partially downloaded file
+    unlink(packageLocalPath)
+    message("Unable to download from default mirror site: ", mirrorUrl)
     msg <- sprintf(paste("Unable to download Spark %s for Hadoop %s.",
                          "Please check network connection, Hadoop version,",
                          "or provide other mirror sites."),
@@ -201,14 +221,20 @@ directDownloadTar <- function(mirrorUrl, version, hadoopVersion, packageName, pa
   msg <- sprintf(fmt, version, ifelse(hadoopVersion == "without", "Free build", hadoopVersion),
                  packageRemotePath)
   message(msg)
-  downloadUrl(packageRemotePath, packageLocalPath, paste0("Fetch failed from ", mirrorUrl))
+  downloadUrl(packageRemotePath, packageLocalPath)
 }

-downloadUrl <- function(remotePath, localPath, errorMessage) {
+downloadUrl <- function(remotePath, localPath) {
   isFail <- tryCatch(download.file(remotePath, localPath),
                      error = function(e) {
-                       message(errorMessage)
-                       print(e)
+                       message(e)
+                       message()
+                       TRUE
+                     },
+                     warning = function(w) {
+                       # Treat warning as error, add an empty line with message()
+                       message(w)
+                       message()
                        TRUE
                      })
   !isFail
@@ -234,10 +260,9 @@ sparkCachePath <- function() {
   if (.Platform$OS.type == "windows") {
     winAppPath <- Sys.getenv("LOCALAPPDATA", unset = NA)
     if (is.na(winAppPath)) {
-      msg <- paste("%LOCALAPPDATA% not found.",
+      stop(paste("%LOCALAPPDATA% not found.",
                  "Please define the environment variable",
-                 "or restart and enter an installation path in localDir.")
-      stop(msg)
+                 "or restart and enter an installation path in localDir."))
     } else {
       path <- file.path(winAppPath, "Apache", "Spark", "Cache")
     }
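
Two patterns in the R diff above are worth spelling out: a failed download now deletes the partial file before the error is raised, and extraction is treated as failed if untar() either throws or the underlying tar command exits nonzero. Below is a minimal Scala sketch of the same two patterns (Scala is used for consistency with the rest of this commit; downloadUrl and untar here are hypothetical standalone helpers that only mirror the R functions, not Spark code):

import java.io.{File, FileOutputStream}
import java.net.URL
import scala.sys.process._
import scala.util.{Failure, Success, Try}

object InstallSketch {
  // Download remotePath to localPath; on any failure, report the cause and
  // delete the partial file so a corrupt archive is never mistaken for a
  // cached one. (Uses InputStream.transferTo, available on Java 9+.)
  def downloadUrl(remotePath: String, localPath: String): Boolean =
    Try {
      val in = new URL(remotePath).openStream()
      val out = new FileOutputStream(localPath)
      try in.transferTo(out) finally { in.close(); out.close() }
    } match {
      case Success(_) => true
      case Failure(e) =>
        System.err.println(e.getMessage)
        new File(localPath).delete() // remove any partially downloaded file
        false
    }

  // Extraction has two failure modes, as the R comment notes: the call can
  // throw (e.g. tar binary missing) or the tar command can return a nonzero
  // exit code. Fold both into a single Boolean.
  def untar(tarball: String, destDir: String): Boolean =
    Try(Seq("tar", "-xzf", tarball, "-C", destDir).!).toOption.contains(0)
}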

core/src/main/scala/org/apache/spark/rdd/InputFileBlockHolder.scala
Lines changed: 4 additions & 3 deletions

@@ -41,9 +41,10 @@ private[spark] object InputFileBlockHolder {
    * The thread variable for the name of the current file being read. This is used by
    * the InputFileName function in Spark SQL.
    */
-  private[this] val inputBlock: ThreadLocal[FileBlock] = new ThreadLocal[FileBlock] {
-    override protected def initialValue(): FileBlock = new FileBlock
-  }
+  private[this] val inputBlock: InheritableThreadLocal[FileBlock] =
+    new InheritableThreadLocal[FileBlock] {
+      override protected def initialValue(): FileBlock = new FileBlock
+    }

   /**
    * Returns the holding file name or empty string if it is unknown.
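
For context on the change above: a plain ThreadLocal starts every new thread from initialValue(), so the current input-file name was lost whenever a task handed work to a thread it spawned, whereas an InheritableThreadLocal copies the parent's value into child threads at creation time. A small standalone sketch of the difference (demo code, not Spark's):

object InheritableThreadLocalDemo {
  // Swap in "new ThreadLocal[String]" and the child prints "unknown" instead.
  private val fileName = new InheritableThreadLocal[String] {
    override protected def initialValue(): String = "unknown"
  }

  def main(args: Array[String]): Unit = {
    fileName.set("part-00000.parquet")
    val child = new Thread(new Runnable {
      // Prints "part-00000.parquet": the value was copied from the parent
      // thread when this thread was constructed.
      override def run(): Unit = println(s"child sees: ${fileName.get()}")
    })
    child.start()
    child.join()
  }
}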

core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
Lines changed: 15 additions & 4 deletions

@@ -22,6 +22,7 @@ import scala.collection.mutable
 import org.apache.spark._
 import org.apache.spark.internal.Logging
 import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEndpointRef, RpcEnv}
+import org.apache.spark.util.{RpcUtils, ThreadUtils}

 private sealed trait OutputCommitCoordinationMessage extends Serializable

@@ -88,7 +89,8 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
     val msg = AskPermissionToCommitOutput(stage, partition, attemptNumber)
     coordinatorRef match {
       case Some(endpointRef) =>
-        endpointRef.askWithRetry[Boolean](msg)
+        ThreadUtils.awaitResult(endpointRef.ask[Boolean](msg),
+          RpcUtils.askRpcTimeout(conf).duration)
       case None =>
         logError(
           "canCommit called after coordinator was stopped (is SparkEnv shutdown in progress)?")
@@ -165,9 +167,18 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
             authorizedCommitters(partition) = attemptNumber
             true
           case existingCommitter =>
-            logDebug(s"Denying attemptNumber=$attemptNumber to commit for stage=$stage, " +
-              s"partition=$partition; existingCommitter = $existingCommitter")
-            false
+            // Coordinator should be idempotent when receiving AskPermissionToCommit.
+            if (existingCommitter == attemptNumber) {
+              logWarning(s"Authorizing duplicate request to commit for " +
+                s"attemptNumber=$attemptNumber to commit for stage=$stage," +
+                s" partition=$partition; existingCommitter = $existingCommitter." +
+                s" This can indicate dropped network traffic.")
+              true
+            } else {
+              logDebug(s"Denying attemptNumber=$attemptNumber to commit for stage=$stage, " +
+                s"partition=$partition; existingCommitter = $existingCommitter")
+              false
+            }
         }
       case None =>
         logDebug(s"Stage $stage has completed, so not allowing attempt number $attemptNumber of" +

core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
Lines changed: 16 additions & 0 deletions

@@ -189,6 +189,12 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
     assert(
       !outputCommitCoordinator.canCommit(stage, partition, nonAuthorizedCommitter + 3))
   }
+
+  test("Duplicate calls to canCommit from the authorized committer gets idempotent responses.") {
+    val rdd = sc.parallelize(Seq(1), 1)
+    sc.runJob(rdd, OutputCommitFunctions(tempDir.getAbsolutePath).callCanCommitMultipleTimes _,
+      0 until rdd.partitions.size)
+  }
 }

 /**
@@ -221,6 +227,16 @@ private case class OutputCommitFunctions(tempDirPath: String) {
       if (ctx.attemptNumber == 0) failingOutputCommitter else successfulOutputCommitter)
   }

+  // Receiver should be idempotent for AskPermissionToCommitOutput
+  def callCanCommitMultipleTimes(iter: Iterator[Int]): Unit = {
+    val ctx = TaskContext.get()
+    val canCommit1 = SparkEnv.get.outputCommitCoordinator
+      .canCommit(ctx.stageId(), ctx.partitionId(), ctx.attemptNumber())
+    val canCommit2 = SparkEnv.get.outputCommitCoordinator
+      .canCommit(ctx.stageId(), ctx.partitionId(), ctx.attemptNumber())
+    assert(canCommit1 && canCommit2)
+  }
+
   private def runCommitWithProvidedCommitter(
       ctx: TaskContext,
       iter: Iterator[Int],
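
The new test pins down the idempotency rule added in the coordinator hunk above: the first attempt to ask for a partition wins, and that same attempt asking again is re-authorized rather than denied, so a duplicated or retried RPC cannot permanently block the authorized task. A stripped-down sketch of the rule (a hypothetical class, not Spark's coordinator):

import scala.collection.mutable

class CommitAuthorizer {
  // partition -> attempt number that has been authorized to commit
  private val authorized = mutable.Map.empty[Int, Int]

  def canCommit(partition: Int, attemptNumber: Int): Boolean = synchronized {
    authorized.get(partition) match {
      case None =>
        authorized(partition) = attemptNumber // first asker wins
        true
      case Some(existing) =>
        existing == attemptNumber // same attempt: authorized again; others: denied
    }
  }
}

With this rule, calling canCommit(0, 1) twice returns true both times, while a later canCommit(0, 2) returns false, which is exactly what callCanCommitMultipleTimes asserts.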

dev/create-release/known_translations
Lines changed: 3 additions & 2 deletions

@@ -177,7 +177,7 @@ anabranch - Bill Chambers
 ashangit - Nicolas Fraison
 avulanov - Alexander Ulanov
 biglobster - Liang Ke
-cenyuhai - Cen Yu Hai
+cenyuhai - Yuhai Cen
 codlife - Jianfei Wang
 david-weiluo-ren - Weiluo (David) Ren
 dding3 - Ding Ding
@@ -198,7 +198,8 @@ petermaxlee - Peter Lee
 phalodi - Sandeep Purohit
 pkch - pkch
 priyankagargnitk - Priyanka Garg
-sharkdtu - Sharkd Tu
+sharkdtu - Xiaogang Tu
 shenh062326 - Shen Hong
 aokolnychyi - Anton Okolnychyi
 linbojin - Linbo Jin
+lw-lin - Liwei Lin
