4 changes: 4 additions & 0 deletions docs/structured-streaming-programming-guide.md
@@ -574,6 +574,10 @@ Here are the details of all the sources in Spark.
<br/>
<code>maxFileAge</code>: Maximum age of a file that can be found in this directory, before it is ignored. For the first batch all files will be considered valid. If <code>latestFirst</code> is set to <code>true</code> and <code>maxFilesPerTrigger</code> or <code>maxBytesPerTrigger</code> is set, then this parameter will be ignored, because old files that are valid, and should be processed, may be ignored. The max age is specified with respect to the timestamp of the latest file, and not the timestamp of the current system. (default: 1 week)
<br/>
<code>maxCachedFiles</code>: maximum number of files to cache for processing in subsequent batches (default: 10000). When files are available in the cache, they are read first, before a new listing is performed on the input source.
<br/>
<code>discardCachedInputRatio</code>: ratio of cached files/bytes to the maximum files/bytes per trigger below which the cached input is discarded and a new listing from the input source is performed instead, so that a batch is not under-filled while more input is available (default: 0.2). For example, if only 10 cached files remain for a batch but <code>maxFilesPerTrigger</code> is set to 100, the 10 cached files are discarded and a new listing is performed instead. Similarly, if the remaining cached files total only 10 MB but <code>maxBytesPerTrigger</code> is set to 100 MB, the cached files are discarded.
<br/>
<code>cleanSource</code>: option to clean up completed files after processing.<br/>
Available options are "archive", "delete", "off". If the option is not provided, the default value is "off".<br/>
When "archive" is provided, additional option <code>sourceArchiveDir</code> must be provided as well. The value of "sourceArchiveDir" must not match with source pattern in depth (the number of directories from the root directory), where the depth is minimum of depth on both paths. This will ensure archived files are never included as new source files.<br/>
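A quick illustration (not part of this diff; the path and values are placeholders): the new options sit alongside the existing per-trigger limits when a file stream is created.

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("max-cached-files-example").getOrCreate()

    // Hypothetical input directory; the option values shown are the documented defaults.
    val lines = spark.readStream
      .format("text")
      .option("maxFilesPerTrigger", "100")       // cap on files per micro-batch
      .option("maxCachedFiles", "10000")         // listed-but-unread files kept for later batches
      .option("discardCachedInputRatio", "0.2")  // below 20% of the cap, drop the cache and list again
      .load("/path/to/input")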
@@ -125,6 +125,30 @@ class FileStreamOptions(parameters: CaseInsensitiveMap[String]) extends Logging
matchedMode
}

/**
* Maximum number of files to cache for processing in subsequent batches.
*/
val maxCachedFiles: Int = parameters.get("maxCachedFiles").map { str =>
Try(str.toInt).filter(_ >= 0).getOrElse {
throw new IllegalArgumentException(
s"Invalid value '$str' for option 'maxCachedFiles', must be an integer greater than or " +
"equal to 0")
}
}.getOrElse(10000)

/**
* Ratio of cached files/bytes to the per-trigger maximum below which the cached input is
* discarded and a new listing from the input source is performed instead.
*/
val discardCachedInputRatio: Float = parameters.get("discardCachedInputRatio").map { str =>
Try(str.toFloat).filter(x => 0 <= x && x <= 1).getOrElse {
throw new IllegalArgumentException(
s"Invalid value '$str' for option 'discardCachedInputRatio', must be a positive float " +
"between 0 and 1"
)
}
}.getOrElse(0.2f)
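For context only (not part of the change): the same parse-and-validate pattern, sketched as a standalone helper. The name parseRatio and the plain Map are illustrative.

    import scala.util.Try

    // Illustrative sketch of the pattern above: parse a bounded float option,
    // fall back to a default when the key is absent, and fail fast on bad input.
    def parseRatio(params: Map[String, String], key: String, default: Float): Float =
      params.get(key).map { str =>
        Try(str.toFloat).filter(x => 0 <= x && x <= 1).getOrElse {
          throw new IllegalArgumentException(
            s"Invalid value '$str' for option '$key', must be a float between 0 and 1")
        }
      }.getOrElse(default)

    // parseRatio(Map("discardCachedInputRatio" -> "0.1"), "discardCachedInputRatio", 0.2f)  // 0.1f
    // parseRatio(Map.empty, "discardCachedInputRatio", 0.2f)                                // 0.2f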

private def withBooleanParameter(name: String, default: Boolean) = {
parameters.get(name).map { str =>
try {
@@ -114,6 +114,11 @@ class FileStreamSource(
"the same and causes data lost.")
}


private val maxCachedFiles = sourceOptions.maxCachedFiles

private val discardCachedInputRatio = sourceOptions.discardCachedInputRatio

/** A mapping from a file that we have processed to some timestamp it was last modified. */
// Visible for testing and debugging in production.
val seenFiles = new SeenFilesMap(maxFileAgeMs, fileNameOnly)
@@ -184,7 +189,7 @@ class FileStreamSource(
case files: ReadMaxFiles if !sourceOptions.latestFirst =>
// we can cache and reuse remaining fetched list of files in further batches
val (bFiles, usFiles) = newFiles.splitAt(files.maxFiles())
if (usFiles.size < files.maxFiles() * DISCARD_UNSEEN_INPUT_RATIO) {
if (usFiles.size < files.maxFiles() * discardCachedInputRatio) {
// Discard unselected files if the number of files is smaller than the threshold.
// This is to avoid the case where the next batch would have too few files to read
// even though new files are available.
@@ ... @@
// we can cache and reuse remaining fetched list of files in further batches
val (FilesSplit(bFiles, _), FilesSplit(usFiles, rSize)) =
takeFilesUntilMax(newFiles, files.maxBytes())
if (rSize.toDouble < (files.maxBytes() * DISCARD_UNSEEN_INPUT_RATIO)) {
if (rSize.toDouble < (files.maxBytes() * discardCachedInputRatio)) {
// Discard unselected files if the total size of the files is smaller than the threshold.
// This is to avoid the case where the next batch would have too little data to read
// even though new files are available.
@@ ... @@
}

if (unselectedFiles != null && unselectedFiles.nonEmpty) {
logTrace(s"Taking first $MAX_CACHED_UNSEEN_FILES unread files.")
unreadFiles = unselectedFiles.take(MAX_CACHED_UNSEEN_FILES)
logTrace(s"Taking first $maxCachedFiles unread files.")
unreadFiles = unselectedFiles.take(maxCachedFiles)
logTrace(s"${unreadFiles.size} unread files are available for further batches.")
} else {
unreadFiles = null
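A minimal arithmetic sketch of the discard decision above, using the numbers from the documentation example (all values hypothetical):

    // With maxFilesPerTrigger = 100 and discardCachedInputRatio = 0.2, a cache of 10
    // remaining files falls below 100 * 0.2 = 20, so the cached files are discarded
    // and the next batch performs a fresh listing of the input source.
    val maxFilesPerTrigger = 100
    val discardCachedInputRatio = 0.2
    val remainingCachedFiles = 10
    val discardCache = remainingCachedFiles < maxFilesPerTrigger * discardCachedInputRatio  // true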
@@ -426,9 +431,6 @@ object FileStreamSource {
/** Timestamp for file modification time, in ms since January 1, 1970 UTC. */
type Timestamp = Long

val DISCARD_UNSEEN_INPUT_RATIO = 0.2
val MAX_CACHED_UNSEEN_FILES = 10000

case class FileEntry(
path: String, // uri-encoded path string
timestamp: Timestamp,
@@ -2305,7 +2305,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
}

// batch 5 will trigger a list operation even though batch 4 should have 1 unseen file:
// 1 is smaller than the threshold (refer FileStreamSource.DISCARD_UNSEEN_FILES_RATIO),
// 1 is smaller than the threshold (refer FileStreamOptions.discardCachedInputRatio),
// hence unseen files for batch 4 will be discarded.
val offsetBatch = source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(10))
.asInstanceOf[FileStreamSourceOffset]
@@ -2357,6 +2357,97 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
}
}

test("Options for caching unread files") {
withCountListingLocalFileSystemAsLocalFileSystem {
withThreeTempDirs { case (src, meta, tmp) =>
val options = Map("latestFirst" -> "false", "maxFilesPerTrigger" -> "10",
"maxCachedFiles" -> "12", "discardCachedInputRatio" -> "0.1")
val scheme = CountListingLocalFileSystem.scheme
val source = new FileStreamSource(spark, s"$scheme:///${src.getCanonicalPath}/*/*", "text",
StructType(Nil), Seq.empty, meta.getCanonicalPath, options)
val _metadataLog = PrivateMethod[FileStreamSourceLog](Symbol("metadataLog"))
val metadataLog = source invokePrivate _metadataLog()

def verifyBatch(
offset: FileStreamSourceOffset,
expectedBatchId: Long,
inputFiles: Seq[File],
expectedFileOffset: Int,
expectedFilesInBatch: Int,
expectedListingCount: Int): Unit = {
val batchId = offset.logOffset
assert(batchId === expectedBatchId)

val files = metadataLog.get(batchId).getOrElse(Array.empty[FileEntry])
assert(files.forall(_.batchId == batchId))

val actualInputFiles = files.map { p => p.sparkPath.toUri.getPath }
val expectedInputFiles = inputFiles.slice(
expectedFileOffset,
expectedFileOffset + expectedFilesInBatch
)
.map(_.getCanonicalPath)
assert(actualInputFiles === expectedInputFiles)

assert(expectedListingCount === CountListingLocalFileSystem.pathToNumListStatusCalled
.get(src.getCanonicalPath).map(_.get()).getOrElse(0))
}

CountListingLocalFileSystem.resetCount()

// provide 44 files in src, with sequential "last modified" to guarantee ordering
val inputFiles = (0 to 43).map { idx =>
val f = createFile(idx.toString, new File(src, idx.toString), tmp)
f.setLastModified(idx * 10000)
f
}

// first 3 batches only perform 1 listing
// batch 0 processes 10 files from the listing (12 cached for later batches)
// batch 1 processes 10 files from the cache (2 still cached)
// batch 2 processes the remaining 2 cached files (0 cached), since 2 files is above
// the discard threshold (maxFilesPerTrigger * discardCachedInputRatio = 1)
val offsetBatch0 = source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(10))
.asInstanceOf[FileStreamSourceOffset]
verifyBatch(offsetBatch0, expectedBatchId = 0, inputFiles,
expectedFileOffset = 0, expectedFilesInBatch = 10, expectedListingCount = 1)
val offsetBatch1 = source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(10))
.asInstanceOf[FileStreamSourceOffset]
verifyBatch(offsetBatch1, expectedBatchId = 1, inputFiles,
expectedFileOffset = 10, expectedFilesInBatch = 10, expectedListingCount = 1)
val offsetBatch2 = source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(10))
.asInstanceOf[FileStreamSourceOffset]
verifyBatch(offsetBatch2, expectedBatchId = 2, inputFiles,
expectedFileOffset = 20, expectedFilesInBatch = 2, expectedListingCount = 1)

// next 3 batches perform another listing
// batch 3 processes 10 files from the new listing (12 cached for later batches)
// batch 4 processes 10 files from the cache (2 still cached)
// batch 5 processes the remaining 2 cached files (0 cached), again because 2 files is
// above the discard threshold
val offsetBatch3 = source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(10))
.asInstanceOf[FileStreamSourceOffset]
verifyBatch(offsetBatch3, expectedBatchId = 3, inputFiles,
expectedFileOffset = 22, expectedFilesInBatch = 10, expectedListingCount = 2)
val offsetBatch4 = source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(10))
.asInstanceOf[FileStreamSourceOffset]
verifyBatch(offsetBatch4, expectedBatchId = 4, inputFiles,
expectedFileOffset = 32, expectedFilesInBatch = 10, expectedListingCount = 2)
val offsetBatch5 = source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(10))
.asInstanceOf[FileStreamSourceOffset]
verifyBatch(offsetBatch5, expectedBatchId = 5, inputFiles,
expectedFileOffset = 42, expectedFilesInBatch = 2, expectedListingCount = 2)

// validate that no unread files remain: another listing is performed and the offset stays at batch 5
val offsetBatch = source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(10))
.asInstanceOf[FileStreamSourceOffset]
assert(5 === offsetBatch.logOffset)
assert(3 === CountListingLocalFileSystem.pathToNumListStatusCalled
.get(src.getCanonicalPath).map(_.get()).getOrElse(0))
}
}
}

test("SPARK-31962: file stream source shouldn't allow modifiedBefore/modifiedAfter") {
def formatTime(time: LocalDateTime): String = {
time.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss"))