@@ -385,16 +385,12 @@ private[spark] object Utils extends Logging {
385385 } finally {
386386 lock.release()
387387 }
388- if (targetFile.exists && ! Files .equal(cachedFile, targetFile)) {
389- if (conf.getBoolean(" spark.files.overwrite" , false )) {
390- targetFile.delete()
391- logInfo((s " File $targetFile exists and does not match contents of $url, " +
392- s " replacing it with $url" ))
393- } else {
394- throw new SparkException (s " File $targetFile exists and does not match contents of $url" )
395- }
396- }
397- Files .copy(cachedFile, targetFile)
388+ copyFile(
389+ url,
390+ cachedFile,
391+ targetFile,
392+ conf.getBoolean(" spark.files.overwrite" , false )
393+ )
398394 } else {
399395 doFetchFile(url, targetDir, fileName, conf, securityMgr, hadoopConf)
400396 }
@@ -411,6 +407,104 @@ private[spark] object Utils extends Logging {
411407 FileUtil .chmod(targetFile.getAbsolutePath, " a+x" )
412408 }
413409
/**
 * Download `in` to `tempFile`, then move it to `destFile`.
 *
 * If `destFile` already exists:
 *   - no-op if its contents equal those of the freshly downloaded `tempFile`,
 *   - throw an exception if `fileOverwrite` is false,
 *   - attempt to overwrite it otherwise.
 *
 * `tempFile` is deleted before this method returns whenever it was not moved to `destFile`
 * (for example because `destFile` already had the same contents, or because of a failure).
 *
 * `in` is handed to `Utils.copyStream` with `closeStreams = true`, which is expected to close
 * it. If `tempFile` cannot even be opened for writing, `in` is closed here instead so that the
 * caller's stream is not leaked.
 *
 * @param url URL that `in` was opened from, for logging purposes.
 * @param in InputStream to download.
 * @param tempFile File path to download `in` to.
 * @param destFile File path to move `tempFile` to.
 * @param fileOverwrite Whether to delete/overwrite an existing `destFile` that does not match
 *                      the downloaded contents.
 */
private def downloadFile(
    url: String,
    in: InputStream,
    tempFile: File,
    destFile: File,
    fileOverwrite: Boolean): Unit = {

  try {
    val out =
      try {
        new FileOutputStream(tempFile)
      } catch {
        case e: java.io.IOException =>
          // `copyStream` below is what normally closes `in`; it will never run now, so close
          // the stream here to avoid leaking the underlying connection / file handle.
          try {
            in.close()
          } catch {
            case _: java.io.IOException => // Keep the original failure as the one reported.
          }
          throw e
      }
    Utils.copyStream(in, out, closeStreams = true)
    copyFile(url, tempFile, destFile, fileOverwrite, removeSourceFile = true)
  } finally {
    // Catch-all for the cases where `tempFile` was not moved to `destFile`: identical
    // contents already in place, overwrite refused, or any failure along the way.
    if (tempFile.exists()) {
      tempFile.delete()
    }
  }
}
444+
/**
 * Place the contents of `sourceFile` at `destFile`, either by copying or by moving.
 *
 * Behaviour when `destFile` is already present:
 *   - identical contents: log that fact and do nothing else (`sourceFile` is left in place,
 *     even when `removeSourceFile` is true),
 *   - different contents and `fileOverwrite` is false: throw a `SparkException`,
 *   - different contents and `fileOverwrite` is true: delete `destFile` (throwing a
 *     `SparkException` if the deletion fails) and then copy/move as usual.
 *
 * @param url URL that `sourceFile` originated from, for logging purposes.
 * @param sourceFile File path to copy/move from.
 * @param destFile File path to copy/move to.
 * @param fileOverwrite Whether to delete/overwrite an existing `destFile` that does not match
 *                      `sourceFile`.
 * @param removeSourceFile When true the file is moved rather than copied, so `sourceFile` no
 *                         longer exists afterwards.
 */
private def copyFile(
    url: String,
    sourceFile: File,
    destFile: File,
    fileOverwrite: Boolean,
    removeSourceFile: Boolean = false): Unit = {

  val destPresent = destFile.exists
  // Contents are only compared when there is something to compare against.
  val alreadyInPlace = destPresent && Files.equal(sourceFile, destFile)

  if (alreadyInPlace) {
    // Same contents, i.e. this file has been copied previously: nothing to do.
    logInfo(
      "%s has been previously copied to %s".format(
        sourceFile.getAbsolutePath,
        destFile.getAbsolutePath
      )
    )
  } else {
    if (destPresent) {
      // A different file is in the way; it must be cleared out before proceeding.
      if (!fileOverwrite) {
        throw new SparkException(
          s"File $destFile exists and does not match contents of $url")
      }
      logInfo(
        s"File $destFile exists and does not match contents of $url, replacing it with $url"
      )
      val deleted = destFile.delete()
      if (!deleted) {
        throw new SparkException(
          "Failed to delete %s while attempting to overwrite it with %s".format(
            destFile.getAbsolutePath,
            sourceFile.getAbsolutePath
          )
        )
      }
    }

    // At this point nothing exists at `destFile`. Move or copy the source there.
    if (removeSourceFile) {
      Files.move(sourceFile, destFile)
    } else {
      logInfo(s"Copying ${sourceFile.getAbsolutePath} to ${destFile.getAbsolutePath}")
      Files.copy(sourceFile, destFile)
    }
  }
}
507+
414508 /**
415509 * Download a file to target directory. Supports fetching the file in a variety of ways,
416510 * including HTTP, HDFS and files on a standard filesystem, based on the URL parameter.
@@ -449,67 +543,17 @@ private[spark] object Utils extends Logging {
449543 uc.setReadTimeout(timeout)
450544 uc.connect()
451545 val in = uc.getInputStream()
452- val out = new FileOutputStream (tempFile)
453- Utils .copyStream(in, out, closeStreams = true )
454- if (targetFile.exists && ! Files .equal(tempFile, targetFile)) {
455- if (fileOverwrite) {
456- targetFile.delete()
457- logInfo((" File %s exists and does not match contents of %s, " +
458- " replacing it with %s" ).format(targetFile, url, url))
459- } else {
460- tempFile.delete()
461- throw new SparkException (
462- " File " + targetFile + " exists and does not match contents of" + " " + url)
463- }
464- }
465- Files .move(tempFile, targetFile)
546+ downloadFile(url, in, tempFile, targetFile, fileOverwrite)
466547 case " file" =>
467548 // In the case of a local file, copy the local file to the target directory.
468549 // Note the difference between uri vs url.
469550 val sourceFile = if (uri.isAbsolute) new File (uri) else new File (url)
470- var shouldCopy = true
471- if (targetFile.exists) {
472- if (! Files .equal(sourceFile, targetFile)) {
473- if (fileOverwrite) {
474- targetFile.delete()
475- logInfo((" File %s exists and does not match contents of %s, " +
476- " replacing it with %s" ).format(targetFile, url, url))
477- } else {
478- throw new SparkException (
479- " File " + targetFile + " exists and does not match contents of" + " " + url)
480- }
481- } else {
482- // Do nothing if the file contents are the same, i.e. this file has been copied
483- // previously.
484- logInfo(sourceFile.getAbsolutePath + " has been previously copied to "
485- + targetFile.getAbsolutePath)
486- shouldCopy = false
487- }
488- }
489-
490- if (shouldCopy) {
491- // The file does not exist in the target directory. Copy it there.
492- logInfo(" Copying " + sourceFile.getAbsolutePath + " to " + targetFile.getAbsolutePath)
493- Files .copy(sourceFile, targetFile)
494- }
551+ copyFile(url, sourceFile, targetFile, fileOverwrite)
495552 case _ =>
496553 // Use the Hadoop filesystem library, which supports file://, hdfs://, s3://, and others
497554 val fs = getHadoopFileSystem(uri, hadoopConf)
498555 val in = fs.open(new Path (uri))
499- val out = new FileOutputStream (tempFile)
500- Utils .copyStream(in, out, closeStreams = true )
501- if (targetFile.exists && ! Files .equal(tempFile, targetFile)) {
502- if (fileOverwrite) {
503- targetFile.delete()
504- logInfo((" File %s exists and does not match contents of %s, " +
505- " replacing it with %s" ).format(targetFile, url, url))
506- } else {
507- tempFile.delete()
508- throw new SparkException (
509- " File " + targetFile + " exists and does not match contents of" + " " + url)
510- }
511- }
512- Files .move(tempFile, targetFile)
556+ downloadFile(url, in, tempFile, targetFile, fileOverwrite)
513557 }
514558 }
515559
0 commit comments