FileStreamSource.scala
@@ -184,9 +184,11 @@ class FileStreamSource(
       }
     }
 
+    val shouldCache = !sourceOptions.latestFirst && allFilesForTriggerAvailableNow == null
+
     // Obey user's setting to limit the number of files in this batch trigger.
     val (batchFiles, unselectedFiles) = limit match {
-      case files: ReadMaxFiles if !sourceOptions.latestFirst =>
+      case files: ReadMaxFiles if shouldCache =>
         // we can cache and reuse remaining fetched list of files in further batches
         val (bFiles, usFiles) = newFiles.splitAt(files.maxFiles())
         if (usFiles.size < files.maxFiles() * discardCachedInputRatio) {
@@ -200,10 +200,10 @@ class FileStreamSource(
       }
 
       case files: ReadMaxFiles =>
-        // implies "sourceOptions.latestFirst = true" which we want to refresh the list per batch
+        // don't use the cache, just take files for the next batch
         (newFiles.take(files.maxFiles()), null)
 
-      case files: ReadMaxBytes if !sourceOptions.latestFirst =>
+      case files: ReadMaxBytes if shouldCache =>
         // we can cache and reuse remaining fetched list of files in further batches
         val (FilesSplit(bFiles, _), FilesSplit(usFiles, rSize)) =
           takeFilesUntilMax(newFiles, files.maxBytes())
@@ -218,8 +220,8 @@ class FileStreamSource(
       }
 
       case files: ReadMaxBytes =>
+        // don't use the cache, just take files for the next batch
         val (FilesSplit(bFiles, _), _) = takeFilesUntilMax(newFiles, files.maxBytes())
-        // implies "sourceOptions.latestFirst = true" which we want to refresh the list per batch
         (bFiles, null)
 
       case _: ReadAllAvailable => (newFiles, null)
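
The guard introduced above makes the caching condition explicit: leftover files from a directory listing are cached for later batches only when the source reads oldest-first (latestFirst = false) and is not running under Trigger.AvailableNow. In the AvailableNow case, prepareForTriggerAvailableNow() has already snapshotted the complete file list into allFilesForTriggerAvailableNow, so caching unselectedFiles would only duplicate state that is never consulted. A minimal sketch of the user-facing setup that exercises this path follows; the input path, sink, and app name are placeholders, not part of this PR.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object AvailableNowFileStreamDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("available-now-file-stream-demo") // hypothetical app name
      .master("local[2]")
      .getOrCreate()

    // latestFirst=false plus Trigger.AvailableNow is the combination where the
    // new shouldCache guard skips the unselected-files cache: the full listing
    // was captured once, up front, by prepareForTriggerAvailableNow().
    val lines = spark.readStream
      .format("text")
      .option("latestFirst", "false")
      .option("maxFilesPerTrigger", "5") // ReadMaxFiles: at most 5 files per micro-batch
      .load("/tmp/stream-input")         // placeholder input directory

    val query = lines.writeStream
      .format("console")
      .trigger(Trigger.AvailableNow())   // drain all currently available files, then stop
      .start()

    query.awaitTermination()
  }
}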
FileStreamSourceSuite.scala
@@ -2448,6 +2448,51 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     }
   }
 
+  test("SPARK-48314: Don't cache unread files when using Trigger.AvailableNow") {
+    withCountListingLocalFileSystemAsLocalFileSystem {
+      withThreeTempDirs { case (src, meta, tmp) =>
+        val options = Map("latestFirst" -> "false", "maxFilesPerTrigger" -> "5",
+          "maxCachedFiles" -> "2")
+        val scheme = CountListingLocalFileSystem.scheme
+        val source = new FileStreamSource(spark, s"$scheme:///${src.getCanonicalPath}/*/*", "text",
+          StructType(Nil), Seq.empty, meta.getCanonicalPath, options)
+        val _metadataLog = PrivateMethod[FileStreamSourceLog](Symbol("metadataLog"))
+        val metadataLog = source invokePrivate _metadataLog()
+
+        // provide 20 files in src, with sequential "last modified" to guarantee ordering
+        (0 to 19).map { idx =>
+          val f = createFile(idx.toString, new File(src, idx.toString), tmp)
+          f.setLastModified(idx * 10000)
+          f
+        }
+
+        source.prepareForTriggerAvailableNow()
+        CountListingLocalFileSystem.resetCount()
+
+        var offset = source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(5))
+          .asInstanceOf[FileStreamSourceOffset]
+        var files = metadataLog.get(offset.logOffset).getOrElse(Array.empty[FileEntry])
+
+        // All files are already tracked in allFilesForTriggerAvailableNow
+        assert(0 === CountListingLocalFileSystem.pathToNumListStatusCalled
+          .get(src.getCanonicalPath).map(_.get()).getOrElse(0))
+        // Should be 5 files in the batch based on maxFiles limit
+        assert(files.length == 5)
+
+        // Reading again leverages the files already tracked in allFilesForTriggerAvailableNow,
+        // so no more listings need to happen
+        offset = source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(5))
+          .asInstanceOf[FileStreamSourceOffset]
+        files = metadataLog.get(offset.logOffset).getOrElse(Array.empty[FileEntry])
+
+        assert(0 === CountListingLocalFileSystem.pathToNumListStatusCalled
+          .get(src.getCanonicalPath).map(_.get()).getOrElse(0))
+        // Should be 5 files in the batch since cached files are ignored
+        assert(files.length == 5)
+      }
+    }
+  }
+
   test("SPARK-31962: file stream source shouldn't allow modifiedBefore/modifiedAfter") {
     def formatTime(time: LocalDateTime): String = {
       time.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss"))
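
The assertions above pin the listing count at zero for both batches: under Trigger.AvailableNow every batch is served from the pre-computed allFilesForTriggerAvailableNow list, so no re-listing occurs and the cache is bypassed entirely. Note the second assertion in particular: if the cache were still in play, the second batch would be capped at the maxCachedFiles = 2 leftover files rather than a fresh 5. For contrast, here is a sketch of the long-running case where the cache does apply; it assumes an active SparkSession in scope as spark, and the path and interval are placeholders.

import scala.concurrent.duration._
import org.apache.spark.sql.streaming.Trigger

// Under a recurring trigger there is no AvailableNow snapshot, so with
// latestFirst=false the shouldCache guard stays true: files listed beyond
// maxFilesPerTrigger are retained (bounded by maxCachedFiles) and served
// to later batches without re-listing the source directory.
val query = spark.readStream
  .format("text")
  .option("latestFirst", "false")
  .option("maxFilesPerTrigger", "5")
  .option("maxCachedFiles", "2")   // cap on leftover files kept between batches
  .load("/tmp/stream-input")       // placeholder directory
  .writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime(10.seconds))
  .start()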