Skip to content

Commit 8e39c16

Browse files
committed
code review feedback
1 parent 788ed41 commit 8e39c16

File tree

1 file changed

+54
-35
lines changed

1 file changed

+54
-35
lines changed

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 54 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -408,9 +408,12 @@ private[spark] object Utils extends Logging {
408408
}
409409

410410
/**
411-
* Download a file from `in` to `tempFile`, then move it to `destFile`, checking whether
412-
* `destFile` already exists, has the same contents as the downloaded file, and can be
413-
* overwritten.
411+
* Download `in` to `tempFile`, then move it to `destFile`.
412+
*
413+
* If `destFile` already exists:
414+
* - no-op if its contents equal those of the downloaded `tempFile`,
415+
* - throw an exception if `fileOverwrite` is false,
416+
* - attempt to overwrite it otherwise.
414417
*
415418
* @param url URL that `in` was opened from, for logging purposes.
416419
* @param in InputStream to download.
@@ -419,23 +422,33 @@ private[spark] object Utils extends Logging {
419422
* @param fileOverwrite Whether to delete/overwrite an existing `destFile` that does not match
420423
* the downloaded `tempFile`
421424
*/
422-
private def downloadStreamAndMove(
423-
url: String,
424-
in: InputStream,
425-
tempFile: File,
426-
destFile: File,
427-
fileOverwrite: Boolean): Unit = {
428-
429-
val out = new FileOutputStream(tempFile)
430-
Utils.copyStream(in, out, closeStreams = true)
431-
copyFile(url, tempFile, destFile, fileOverwrite, removeSourceFile = true)
425+
private def downloadFile(
426+
url: String,
427+
in: InputStream,
428+
tempFile: File,
429+
destFile: File,
430+
fileOverwrite: Boolean): Unit = {
432431

432+
try {
433+
val out = new FileOutputStream(tempFile)
434+
Utils.copyStream(in, out, closeStreams = true)
435+
copyFile(url, tempFile, destFile, fileOverwrite, removeSourceFile = true)
436+
} finally {
437+
// Clean up `tempFile` whenever it was not moved to `destFile`: e.g. the download or move
438+
// threw, or `destFile` already existed with identical contents and nothing was moved.
439+
if (tempFile.exists()) {
440+
tempFile.delete()
441+
}
442+
}
433443
}
434444

435445
/**
436-
* Copy file from `sourceFile` to `destFile`, checking whether `destFile` already exists, has
437-
* the same contents as the downloaded file, and can be overwritten. Optionally removes
438-
* `sourceFile` by moving instead of copying.
446+
* Copy `sourceFile` to `destFile`.
447+
*
448+
* If `destFile` already exists:
449+
* - no-op if its contents equal those of `sourceFile`,
450+
* - throw an exception if `fileOverwrite` is false,
451+
* - attempt to overwrite it otherwise.
439452
*
440453
* @param url URL that `sourceFile` originated from, for logging purposes.
441454
* @param sourceFile File path to copy/move from.
@@ -446,20 +459,26 @@ private[spark] object Utils extends Logging {
446459
* `destFile`.
447460
*/
448461
private def copyFile(
449-
url: String,
450-
sourceFile: File,
451-
destFile: File,
452-
fileOverwrite: Boolean,
453-
removeSourceFile: Boolean = false): Unit = {
462+
url: String,
463+
sourceFile: File,
464+
destFile: File,
465+
fileOverwrite: Boolean,
466+
removeSourceFile: Boolean = false): Unit = {
454467

455-
var shouldCopy = true
456468
if (destFile.exists) {
457469
if (!Files.equal(sourceFile, destFile)) {
458470
if (fileOverwrite) {
459-
destFile.delete()
460471
logInfo(
461472
s"File $destFile exists and does not match contents of $url, replacing it with $url"
462473
)
474+
if (!destFile.delete()) {
475+
throw new SparkException(
476+
"Failed to delete %s while attempting to overwrite it with %s".format(
477+
destFile.getAbsolutePath,
478+
sourceFile.getAbsolutePath
479+
)
480+
)
481+
}
463482
} else {
464483
throw new SparkException(
465484
s"File $destFile exists and does not match contents of $url")
@@ -468,21 +487,21 @@ private[spark] object Utils extends Logging {
468487
// Do nothing if the file contents are the same, i.e. this file has been copied
469488
// previously.
470489
logInfo(
471-
s"${sourceFile.getAbsolutePath} has been previously copied to " +
490+
"%s has been previously copied to %s".format(
491+
sourceFile.getAbsolutePath,
472492
destFile.getAbsolutePath
493+
)
473494
)
474-
shouldCopy = false
495+
return
475496
}
476497
}
477498

478-
if (shouldCopy) {
479-
// The file does not exist in the target directory. Copy or move it there.
480-
if (removeSourceFile) {
481-
Files.move(sourceFile, destFile)
482-
} else {
483-
logInfo(s"Copying ${sourceFile.getAbsolutePath} to ${destFile.getAbsolutePath}")
484-
Files.copy(sourceFile, destFile)
485-
}
499+
// The file does not exist in the target directory. Copy or move it there.
500+
if (removeSourceFile) {
501+
Files.move(sourceFile, destFile)
502+
} else {
503+
logInfo(s"Copying ${sourceFile.getAbsolutePath} to ${destFile.getAbsolutePath}")
504+
Files.copy(sourceFile, destFile)
486505
}
487506
}
488507

@@ -524,7 +543,7 @@ private[spark] object Utils extends Logging {
524543
uc.setReadTimeout(timeout)
525544
uc.connect()
526545
val in = uc.getInputStream()
527-
downloadStreamAndMove(url, in, tempFile, targetFile, fileOverwrite)
546+
downloadFile(url, in, tempFile, targetFile, fileOverwrite)
528547
case "file" =>
529548
// In the case of a local file, copy the local file to the target directory.
530549
// Note the difference between uri vs url.
@@ -534,7 +553,7 @@ private[spark] object Utils extends Logging {
534553
// Use the Hadoop filesystem library, which supports file://, hdfs://, s3://, and others
535554
val fs = getHadoopFileSystem(uri, hadoopConf)
536555
val in = fs.open(new Path(uri))
537-
downloadStreamAndMove(url, in, tempFile, targetFile, fileOverwrite)
556+
downloadFile(url, in, tempFile, targetFile, fileOverwrite)
538557
}
539558
}
540559

0 commit comments

Comments
 (0)