@@ -168,7 +168,7 @@ class FileStreamSource(
    * there is no race here, so the cost of `synchronized` should be rare.
    */
   private def fetchMaxOffset(limit: ReadLimit): FileStreamSourceOffset = synchronized {
-    val newFiles = if (unreadFiles != null) {
+    val newFiles = if (unreadFiles != null && unreadFiles.nonEmpty) {
       logDebug(s"Reading from unread files - ${unreadFiles.size} files are available.")
       unreadFiles
     } else {
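
The hunk above closes a subtle hole: with only the null check, an exhausted cache (a non-null but empty `unreadFiles`) would be served as an empty batch instead of triggering a fresh listing. A standalone sketch of the corrected predicate (illustration only; `serveFromCache` is a hypothetical name, not Spark code):

object ServeFromCacheSketch extends App {
  // Mirrors the corrected condition: serve from cache only when entries exist.
  def serveFromCache(unreadFiles: Seq[String]): Boolean =
    unreadFiles != null && unreadFiles.nonEmpty

  assert(!serveFromCache(null))      // nothing cached yet: fall through to listing
  assert(!serveFromCache(Seq.empty)) // cache exhausted or disabled: list again
  assert(serveFromCache(Seq("f1")))  // cached entries available: skip the listing
}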
@@ -227,13 +227,16 @@ class FileStreamSource(
       case _: ReadAllAvailable => (newFiles, null)
     }
 
-    if (unselectedFiles != null && unselectedFiles.nonEmpty) {
+    // Need to ensure that if maxCachedFiles is set to 0, the next batch is forced to
+    // list files again.
+    if (unselectedFiles != null && unselectedFiles.nonEmpty && maxCachedFiles > 0) {
       logTrace(s"Taking first $maxCachedFiles unread files.")
       unreadFiles = unselectedFiles.take(maxCachedFiles)
       logTrace(s"${unreadFiles.size} unread files are available for further batches.")
     } else {
       unreadFiles = null
-      logTrace("No unread file is available for further batches.")
+      logTrace("No unread file is available for further batches, or maxCachedFiles has been " +
+        "set to 0 to disable caching.")
     }
 
     batchFiles.foreach { case NewFileEntry(p, _, timestamp) =>
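To summarize the change in one place: leftover files from a listing are cached for later batches only when `maxCachedFiles` is positive, so setting it to 0 leaves the cache empty and every batch lists again. A minimal self-contained model of that decision (the helper `cacheLeftovers` is hypothetical, not code from this PR):

object CacheLeftoversSketch extends App {
  // Returns the files carried over to the next batch as the unread-file cache.
  def cacheLeftovers(unselectedFiles: Seq[String], maxCachedFiles: Int): Seq[String] =
    if (unselectedFiles != null && unselectedFiles.nonEmpty && maxCachedFiles > 0) {
      unselectedFiles.take(maxCachedFiles) // keep at most maxCachedFiles entries
    } else {
      Seq.empty // maxCachedFiles == 0 or nothing left: next batch must list again
    }

  assert(cacheLeftovers(Seq("a", "b", "c"), maxCachedFiles = 2) == Seq("a", "b"))
  assert(cacheLeftovers(Seq("a", "b", "c"), maxCachedFiles = 0).isEmpty)
}
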
@@ -2448,6 +2448,71 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     }
   }
 
+  test("SPARK-48802: Ensure maxCachedFiles set to 0 forces each batch to list files") {
+    withCountListingLocalFileSystemAsLocalFileSystem {
+      withThreeTempDirs { case (src, meta, tmp) =>
+        val options = Map("latestFirst" -> "false", "maxFilesPerTrigger" -> "10",
+          "maxCachedFiles" -> "0")
+        val scheme = CountListingLocalFileSystem.scheme
+        val source = new FileStreamSource(spark, s"$scheme:///${src.getCanonicalPath}/*/*", "text",
+          StructType(Nil), Seq.empty, meta.getCanonicalPath, options)
+        val _metadataLog = PrivateMethod[FileStreamSourceLog](Symbol("metadataLog"))
+        val metadataLog = source invokePrivate _metadataLog()
+
+        def verifyBatch(
+            offset: FileStreamSourceOffset,
+            expectedBatchId: Long,
+            inputFiles: Seq[File],
+            expectedFileOffset: Int,
+            expectedFilesInBatch: Int,
+            expectedListingCount: Int): Unit = {
+          val batchId = offset.logOffset
+          assert(batchId === expectedBatchId)
+
+          val files = metadataLog.get(batchId).getOrElse(Array.empty[FileEntry])
+          assert(files.forall(_.batchId == batchId))
+
+          val actualInputFiles = files.map { p => p.sparkPath.toUri.getPath }
+          val expectedInputFiles = inputFiles.slice(
+            expectedFileOffset,
+            expectedFileOffset + expectedFilesInBatch
+          ).map(_.getCanonicalPath)
+          assert(actualInputFiles === expectedInputFiles)
+
+          assert(expectedListingCount === CountListingLocalFileSystem.pathToNumListStatusCalled
+            .get(src.getCanonicalPath).map(_.get()).getOrElse(0))
+        }
+
+        CountListingLocalFileSystem.resetCount()
+
+        // provide 20 files in src, with sequential "last modified" to guarantee ordering
+        val inputFiles = (0 to 19).map { idx =>
+          val f = createFile(idx.toString, new File(src, idx.toString), tmp)
+          f.setLastModified(idx * 10000)
+          f
+        }
+
+        // each batch should perform a listing since maxCachedFiles is set to 0
+        val offsetBatch0 = source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(10))
+          .asInstanceOf[FileStreamSourceOffset]
+        verifyBatch(offsetBatch0, expectedBatchId = 0, inputFiles,
+          expectedFileOffset = 0, expectedFilesInBatch = 10, expectedListingCount = 1)
+        val offsetBatch1 = source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(10))
+          .asInstanceOf[FileStreamSourceOffset]
+        verifyBatch(offsetBatch1, expectedBatchId = 1, inputFiles,
+          expectedFileOffset = 10, expectedFilesInBatch = 10, expectedListingCount = 2)
+
+        // validate no remaining files and another listing is performed
+        val offsetBatch = source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(10))
+          .asInstanceOf[FileStreamSourceOffset]
+        assert(1 === offsetBatch.logOffset)
+        assert(3 === CountListingLocalFileSystem.pathToNumListStatusCalled
+          .get(src.getCanonicalPath).map(_.get()).getOrElse(0))
+      }
+    }
+  }
+
   test("SPARK-48314: Don't cache unread files when using Trigger.AvailableNow") {
     withCountListingLocalFileSystemAsLocalFileSystem {
       withThreeTempDirs { case (src, meta, tmp) =>
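
The test drives the source directly through `latestOffset`, but in a normal query the same behavior is configured through reader options, exactly as in the test's options map. A minimal usage sketch, assuming an active SparkSession named `spark` and a placeholder input path:

// maxCachedFiles = 0 disables the unread-file cache, so every micro-batch
// re-lists the input directory; maxFilesPerTrigger still caps the batch size.
val df = spark.readStream
  .format("text")
  .option("maxFilesPerTrigger", "10")
  .option("maxCachedFiles", "0") // force a fresh listing for each batch
  .load("/path/to/input")        // placeholder path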