@@ -385,16 +385,12 @@ private[spark] object Utils extends Logging {
385385 } finally {
386386 lock.release()
387387 }
388- if (targetFile.exists && ! Files .equal(cachedFile, targetFile)) {
389- if (conf.getBoolean(" spark.files.overwrite" , false )) {
390- targetFile.delete()
391- logInfo((s " File $targetFile exists and does not match contents of $url, " +
392- s " replacing it with $url" ))
393- } else {
394- throw new SparkException (s " File $targetFile exists and does not match contents of $url" )
395- }
396- }
397- Files .copy(cachedFile, targetFile)
388+ copyFile(
389+ url,
390+ cachedFile,
391+ targetFile,
392+ conf.getBoolean(" spark.files.overwrite" , false )
393+ )
398394 } else {
399395 doFetchFile(url, targetDir, fileName, conf, securityMgr, hadoopConf)
400396 }
@@ -411,6 +407,85 @@ private[spark] object Utils extends Logging {
411407 FileUtil .chmod(targetFile.getAbsolutePath, " a+x" )
412408 }
413409
410+ /**
411+ * Download a file from `in` to `tempFile`, then move it to `destFile`, checking whether
412+ * `destFile` already exists, has the same contents as the downloaded file, and can be
413+ * overwritten.
414+ *
415+ * @param url URL that `sourceFile` originated from, for logging purposes.
416+ * @param in InputStream to download.
417+ * @param tempFile File path to download `in` to.
418+ * @param destFile File path to move `tempFile` to.
419+ * @param fileOverwrite Whether to delete/overwrite an existing `destFile` that does not match
420+ * `sourceFile`
421+ */
422+ private def downloadStreamAndMove (
423+ url : String ,
424+ in : InputStream ,
425+ tempFile : File ,
426+ destFile : File ,
427+ fileOverwrite : Boolean ): Unit = {
428+
429+ val out = new FileOutputStream (tempFile)
430+ Utils .copyStream(in, out, closeStreams = true )
431+ copyFile(url, tempFile, destFile, fileOverwrite, removeSourceFile = true )
432+
433+ }
434+
435+ /**
436+ * Copy file from `sourceFile` to `destFile`, checking whether `destFile` already exists, has
437+ * the same contents as the downloaded file, and can be overwritten. Optionally removes
438+ * `sourceFile` by moving instead of copying.
439+ *
440+ * @param url URL that `sourceFile` originated from, for logging purposes.
441+ * @param sourceFile File path to copy/move from.
442+ * @param destFile File path to copy/move to.
443+ * @param fileOverwrite Whether to delete/overwrite an existing `destFile` that does not match
444+ * `sourceFile`
445+ * @param removeSourceFile Whether to remove `sourceFile` after / as part of moving/copying it to
446+ * `destFile`.
447+ */
448+ private def copyFile (
449+ url : String ,
450+ sourceFile : File ,
451+ destFile : File ,
452+ fileOverwrite : Boolean ,
453+ removeSourceFile : Boolean = false ): Unit = {
454+
455+ var shouldCopy = true
456+ if (destFile.exists) {
457+ if (! Files .equal(sourceFile, destFile)) {
458+ if (fileOverwrite) {
459+ destFile.delete()
460+ logInfo(
461+ s " File $destFile exists and does not match contents of $url, replacing it with $url"
462+ )
463+ } else {
464+ throw new SparkException (
465+ s " File $destFile exists and does not match contents of $url" )
466+ }
467+ } else {
468+ // Do nothing if the file contents are the same, i.e. this file has been copied
469+ // previously.
470+ logInfo(
471+ s " ${sourceFile.getAbsolutePath} has been previously copied to " +
472+ destFile.getAbsolutePath
473+ )
474+ shouldCopy = false
475+ }
476+ }
477+
478+ if (shouldCopy) {
479+ // The file does not exist in the target directory. Copy or move it there.
480+ if (removeSourceFile) {
481+ Files .move(sourceFile, destFile)
482+ } else {
483+ logInfo(s " Copying ${sourceFile.getAbsolutePath} to ${destFile.getAbsolutePath}" )
484+ Files .copy(sourceFile, destFile)
485+ }
486+ }
487+ }
488+
414489 /**
415490 * Download a file to target directory. Supports fetching the file in a variety of ways,
416491 * including HTTP, HDFS and files on a standard filesystem, based on the URL parameter.
@@ -449,67 +524,17 @@ private[spark] object Utils extends Logging {
449524 uc.setReadTimeout(timeout)
450525 uc.connect()
451526 val in = uc.getInputStream()
452- val out = new FileOutputStream (tempFile)
453- Utils .copyStream(in, out, closeStreams = true )
454- if (targetFile.exists && ! Files .equal(tempFile, targetFile)) {
455- if (fileOverwrite) {
456- targetFile.delete()
457- logInfo((" File %s exists and does not match contents of %s, " +
458- " replacing it with %s" ).format(targetFile, url, url))
459- } else {
460- tempFile.delete()
461- throw new SparkException (
462- " File " + targetFile + " exists and does not match contents of" + " " + url)
463- }
464- }
465- Files .move(tempFile, targetFile)
527+ downloadStreamAndMove(url, in, tempFile, targetFile, fileOverwrite)
466528 case " file" =>
467529 // In the case of a local file, copy the local file to the target directory.
468530 // Note the difference between uri vs url.
469531 val sourceFile = if (uri.isAbsolute) new File (uri) else new File (url)
470- var shouldCopy = true
471- if (targetFile.exists) {
472- if (! Files .equal(sourceFile, targetFile)) {
473- if (fileOverwrite) {
474- targetFile.delete()
475- logInfo((" File %s exists and does not match contents of %s, " +
476- " replacing it with %s" ).format(targetFile, url, url))
477- } else {
478- throw new SparkException (
479- " File " + targetFile + " exists and does not match contents of" + " " + url)
480- }
481- } else {
482- // Do nothing if the file contents are the same, i.e. this file has been copied
483- // previously.
484- logInfo(sourceFile.getAbsolutePath + " has been previously copied to "
485- + targetFile.getAbsolutePath)
486- shouldCopy = false
487- }
488- }
489-
490- if (shouldCopy) {
491- // The file does not exist in the target directory. Copy it there.
492- logInfo(" Copying " + sourceFile.getAbsolutePath + " to " + targetFile.getAbsolutePath)
493- Files .copy(sourceFile, targetFile)
494- }
532+ copyFile(url, sourceFile, targetFile, fileOverwrite, removeSourceFile = true )
495533 case _ =>
496534 // Use the Hadoop filesystem library, which supports file://, hdfs://, s3://, and others
497535 val fs = getHadoopFileSystem(uri, hadoopConf)
498536 val in = fs.open(new Path (uri))
499- val out = new FileOutputStream (tempFile)
500- Utils .copyStream(in, out, closeStreams = true )
501- if (targetFile.exists && ! Files .equal(tempFile, targetFile)) {
502- if (fileOverwrite) {
503- targetFile.delete()
504- logInfo((" File %s exists and does not match contents of %s, " +
505- " replacing it with %s" ).format(targetFile, url, url))
506- } else {
507- tempFile.delete()
508- throw new SparkException (
509- " File " + targetFile + " exists and does not match contents of" + " " + url)
510- }
511- }
512- Files .move(tempFile, targetFile)
537+ downloadStreamAndMove(url, in, tempFile, targetFile, fileOverwrite)
513538 }
514539 }
515540
0 commit comments