@@ -146,6 +146,11 @@ class HDFSMetadataLog[T: ClassTag](sparkSession: SparkSession, path: String)
// It will fail if there is an existing file (someone has committed the batch)
logDebug(s"Attempting to write log #${batchIdToPath(batchId)}")
fileManager.rename(tempPath, batchIdToPath(batchId))

// SPARK-17475: HDFSMetadataLog should not leak CRC files
// If the underlying filesystem didn't rename the CRC file, delete it.
Review comment (Member):
Is this specific to streaming, or would other parts of Spark benefit if this behavior were changed in the file manager?

frreiss (Contributor Author), Sep 12, 2016:
I believe HDFSMetadataLog is only called from Structured Streaming classes currently. The FileManager trait here is part of HDFSMetadataLog.

jodersky (Member), Sep 13, 2016:
ok, looks good otherwise

val crcPath = new Path(tempPath.getParent(), s".${tempPath.getName()}.crc")
if (fileManager.exists(crcPath)) fileManager.delete(crcPath)
return
} catch {
case e: IOException if isFileAlreadyExistsException(e) =>
@@ -119,6 +119,12 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
assert(metadataLog.get(1).isEmpty)
assert(metadataLog.get(2).isDefined)
assert(metadataLog.getLatest().get._1 == 2)

// There should be exactly one file, called "2", in the metadata directory.
// This check also tests for regressions of SPARK-17475
val allFiles = new File(metadataLog.metadataPath.toString).listFiles().toSeq
assert(allFiles.size == 1)
assert(allFiles(0).getName() == "2")
}
}
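
For readers who want to try the cleanup pattern outside Spark, here is a minimal standalone sketch of the idea in the first hunk: rename the temp file, then delete any `.<name>.crc` sidecar left next to the temp path. It is an illustration only, not the PR's code. It assumes `java.nio.file` as a stand-in for the Hadoop-backed `fileManager`, and `CrcCleanupSketch`, `crcPathFor`, `renameAndCleanCrc`, and the file name `batch-2.tmp` are hypothetical names introduced here.

```scala
import java.nio.file.{Files, Path}

object CrcCleanupSketch {
  // Hypothetical helper: sidecar checksum path for `file`, following the
  // ".<name>.crc" naming used for crcPath in the diff above.
  def crcPathFor(file: Path): Path =
    file.resolveSibling(s".${file.getFileName}.crc")

  // Hypothetical helper: move `temp` to `dest`, then remove any checksum
  // sidecar that was left behind next to `temp`.
  def renameAndCleanCrc(temp: Path, dest: Path): Unit = {
    // Files.move without REPLACE_EXISTING fails if `dest` already exists.
    Files.move(temp, dest)
    Files.deleteIfExists(crcPathFor(temp))
  }

  def main(args: Array[String]): Unit = {
    val dir = Files.createTempDirectory("metadata-log-sketch")
    val temp = dir.resolve("batch-2.tmp")
    Files.write(temp, "batch 2".getBytes("UTF-8"))
    // Simulate a checksum sidecar that the rename does not carry along.
    Files.write(crcPathFor(temp), Array[Byte](0))

    renameAndCleanCrc(temp, dir.resolve("2"))

    // Same check as the test hunk: only the file named "2" should remain.
    val names = dir.toFile.listFiles().map(_.getName).toSeq
    assert(names == Seq("2"))
  }
}
```

The final assertion mirrors the regression check added to `HDFSMetadataLogSuite`: after the write, the directory holds exactly one file, and no checksum file is left over.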
