Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ class DStreamCheckpointData[T: ClassTag](dstream: DStream[T])
// in that batch's checkpoint data
@transient private var timeToOldestCheckpointFileTime = new HashMap[Time, Time]

@transient private var fileSystem: FileSystem = null
protected[streaming] def currentCheckpointFiles = data.asInstanceOf[HashMap[Time, String]]

/**
Expand Down Expand Up @@ -80,6 +79,7 @@ class DStreamCheckpointData[T: ClassTag](dstream: DStream[T])
// even after master fails, as the checkpoint data of `time` does not refer to those files
val filesToDelete = timeToCheckpointFile.filter(_._1 < lastCheckpointFileTime)
logDebug("Files to delete:\n" + filesToDelete.mkString(","))
var fileSystem: FileSystem = null
filesToDelete.foreach {
case (time, file) =>
try {
Expand Down