diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index dc25adbdfd33..762b7398b4b0 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -572,6 +572,10 @@ Here are the details of all the sources in Spark.
maxFileAge: Maximum age of a file that can be found in this directory, before it is ignored. For the first batch all files will be considered valid. If latestFirst is set to `true` and maxFilesPerTrigger is set, then this parameter will be ignored, because old files that are valid, and should be processed, may be ignored. The max age is specified with respect to the timestamp of the latest file, and not the timestamp of the current system.(default: 1 week)
+ maxCachedFiles: maximum number of files to cache for processing in subsequent batches (default: 10000). If files are available in the cache, they will be read first before a new listing of the input source is performed.
+
+ discardCachedFilesRatio: minimum ratio of remaining cached files to maxFilesPerTrigger required to keep serving batches from the cache (default: 0.2). If the remaining cached files fall below this threshold, they are discarded and a new listing of the input source is performed. For example, if only 10 cached files remain for a batch but maxFilesPerTrigger is set to 100, the 10 cached files would be discarded and a new listing would be performed instead.
+
cleanSource: option to clean up completed files after processing.
Available options are "archive", "delete", "off". If the option is not provided, the default value is "off".
When "archive" is provided, additional option sourceArchiveDir must be provided as well. The value of "sourceArchiveDir" must not match with source pattern in depth (the number of directories from the root directory), where the depth is minimum of depth on both paths. This will ensure archived files are never included as new source files.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
index ae0909559086..1915b2e91225 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
@@ -111,6 +111,30 @@ class FileStreamOptions(parameters: CaseInsensitiveMap[String]) extends Logging
     matchedMode
   }
 
+  /**
+   * Maximum number of files to cache to be processed in subsequent batches.
+   */
+  val maxCachedFiles: Int = parameters.get("maxCachedFiles").map { str =>
+    Try(str.toInt).filter(_ >= 0).getOrElse {
+      throw new IllegalArgumentException(
+        s"Invalid value '$str' for option 'maxCachedFiles', must be an integer greater than or " +
+          "equal to 0")
+    }
+  }.getOrElse(10000)
+
+  /**
+   * Threshold ratio of remaining cached files to max files per trigger; when the remaining
+   * cached files fall below this ratio, they are discarded and a new listing is performed.
+   */
+  val discardCachedFilesRatio: Float = parameters.get("discardCachedFilesRatio").map { str =>
+    Try(str.toFloat).filter(x => 0 <= x && x <= 1).getOrElse {
+      throw new IllegalArgumentException(
+        s"Invalid value '$str' for option 'discardCachedFilesRatio', must be a float " +
+          "between 0 and 1"
+      )
+    }
+  }.getOrElse(0.2f)
+
   private def withBooleanParameter(name: String, default: Boolean) = {
     parameters.get(name).map { str =>
       try {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 51f310dcc04e..c98a85f062b2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -108,6 +108,11 @@ class FileStreamSource(
       "the same and causes data lost.")
   }
 
+  private val maxCachedFiles = sourceOptions.maxCachedFiles
+
+  private val discardCachedFilesRatio = sourceOptions.discardCachedFilesRatio
+
   /** A mapping from a file that we have processed to some timestamp it was last modified. */
   // Visible for testing and debugging in production.
   val seenFiles = new SeenFilesMap(maxFileAgeMs, fileNameOnly)
@@ -151,7 +156,7 @@ class FileStreamSource(
      case files: ReadMaxFiles if !sourceOptions.latestFirst =>
        // we can cache and reuse remaining fetched list of files in further batches
        val (bFiles, usFiles) = newFiles.splitAt(files.maxFiles())
-        if (usFiles.size < files.maxFiles() * DISCARD_UNSEEN_FILES_RATIO) {
+        if (usFiles.size < files.maxFiles() * discardCachedFilesRatio) {
          // Discard unselected files if the number of files are smaller than threshold.
          // This is to avoid the case when the next batch would have too few files to read
          // whereas there're new files available.
@@ -169,8 +174,8 @@ class FileStreamSource(
    }
 
    if (unselectedFiles != null && unselectedFiles.nonEmpty) {
-      logTrace(s"Taking first $MAX_CACHED_UNSEEN_FILES unread files.")
-      unreadFiles = unselectedFiles.take(MAX_CACHED_UNSEEN_FILES)
+      logTrace(s"Taking first $maxCachedFiles unread files.")
+      unreadFiles = unselectedFiles.take(maxCachedFiles)
      logTrace(s"${unreadFiles.size} unread files are available for further batches.")
    } else {
      unreadFiles = null
@@ -368,9 +373,6 @@ object FileStreamSource {
   /** Timestamp for file modification time, in ms since January 1, 1970 UTC. */
   type Timestamp = Long
 
-  val DISCARD_UNSEEN_FILES_RATIO = 0.2
-  val MAX_CACHED_UNSEEN_FILES = 10000
-
   case class FileEntry(
       path: String, // uri-encoded path string
       timestamp: Timestamp,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 84cf20ede259..0b714d487b2a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -2241,7 +2241,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
      }
 
      // batch 5 will trigger list operation though the batch 4 should have 1 unseen file:
-      // 1 is smaller than the threshold (refer FileStreamOptions.discardCachedFilesRatio),
+      // 1 is smaller than the threshold (refer FileStreamOptions.discardCachedFilesRatio),
      // hence unseen files for batch 4 will be discarded.
      val offsetBatch = source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(10))
        .asInstanceOf[FileStreamSourceOffset]
@@ -2293,6 +2293,97 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
    }
  }
 
+  test("Options for caching unread files") {
+    withCountListingLocalFileSystemAsLocalFileSystem {
+      withThreeTempDirs { case (src, meta, tmp) =>
+        val options = Map("latestFirst" -> "false", "maxFilesPerTrigger" -> "10",
+          "maxCachedFiles" -> "12", "discardCachedFilesRatio" -> "0.1")
+        val scheme = CountListingLocalFileSystem.scheme
+        val source = new FileStreamSource(spark, s"$scheme:///${src.getCanonicalPath}/*/*", "text",
+          StructType(Nil), Seq.empty, meta.getCanonicalPath, options)
+        val _metadataLog = PrivateMethod[FileStreamSourceLog](Symbol("metadataLog"))
+        val metadataLog = source invokePrivate _metadataLog()
+
+        def verifyBatch(
+            offset: FileStreamSourceOffset,
+            expectedBatchId: Long,
+            inputFiles: Seq[File],
+            expectedFileOffset: Int,
+            expectedFilesInBatch: Int,
+            expectedListingCount: Int): Unit = {
+          val batchId = offset.logOffset
+          assert(batchId === expectedBatchId)
+
+          val files = metadataLog.get(batchId).getOrElse(Array.empty[FileEntry])
+          assert(files.forall(_.batchId == batchId))
+
+          val actualInputFiles = files.map { p => p.sparkPath.toUri.getPath }
+          val expectedInputFiles = inputFiles.slice(
+              expectedFileOffset,
+              expectedFileOffset + expectedFilesInBatch
+            )
+            .map(_.getCanonicalPath)
+          assert(actualInputFiles === expectedInputFiles)
+
+          assert(expectedListingCount === CountListingLocalFileSystem.pathToNumListStatusCalled
+            .get(src.getCanonicalPath).map(_.get()).getOrElse(0))
+        }
+
+        CountListingLocalFileSystem.resetCount()
+
+        // provide 44 files in src, with sequential "last modified" to guarantee ordering
+        val inputFiles = (0 to 43).map { idx =>
+          val f = createFile(idx.toString, new File(src, idx.toString), tmp)
+          f.setLastModified(idx * 10000)
+          f
+        }
+
+        // first 3 batches only perform 1 listing
+        // batch 0 processes 10 (12 cached)
+        // batch 1 processes 10 from cache (2 cached)
+        // batch 2 processes 2 from cache (0 cached) since the leftover files were
+        // not below the discard threshold (maxFilesPerTrigger * discardCachedFilesRatio)
+        val offsetBatch0 = source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(10))
+          .asInstanceOf[FileStreamSourceOffset]
+        verifyBatch(offsetBatch0, expectedBatchId = 0, inputFiles,
+          expectedFileOffset = 0, expectedFilesInBatch = 10, expectedListingCount = 1)
+        val offsetBatch1 = source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(10))
+          .asInstanceOf[FileStreamSourceOffset]
+        verifyBatch(offsetBatch1, expectedBatchId = 1, inputFiles,
+          expectedFileOffset = 10, expectedFilesInBatch = 10, expectedListingCount = 1)
+        val offsetBatch2 = source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(10))
+          .asInstanceOf[FileStreamSourceOffset]
+        verifyBatch(offsetBatch2, expectedBatchId = 2, inputFiles,
+          expectedFileOffset = 20, expectedFilesInBatch = 2, expectedListingCount = 1)
+
+        // next 3 batches perform another listing
+        // batch 3 processes 10 (12 cached)
+        // batch 4 processes 10 from cache (2 cached)
+        // batch 5 processes 2 from cache (0 cached) since the leftover files were
+        // not below the discard threshold (maxFilesPerTrigger * discardCachedFilesRatio)
+        val offsetBatch3 = source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(10))
+          .asInstanceOf[FileStreamSourceOffset]
+        verifyBatch(offsetBatch3, expectedBatchId = 3, inputFiles,
+          expectedFileOffset = 22, expectedFilesInBatch = 10, expectedListingCount = 2)
+        val offsetBatch4 = source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(10))
+          .asInstanceOf[FileStreamSourceOffset]
+        verifyBatch(offsetBatch4, expectedBatchId = 4, inputFiles,
+          expectedFileOffset = 32, expectedFilesInBatch = 10, expectedListingCount = 2)
+        val offsetBatch5 = source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(10))
+          .asInstanceOf[FileStreamSourceOffset]
+        verifyBatch(offsetBatch5, expectedBatchId = 5, inputFiles,
+          expectedFileOffset = 42, expectedFilesInBatch = 2, expectedListingCount = 2)
+
+        // validate no remaining files and another listing is performed
+        val offsetBatch = source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(10))
+          .asInstanceOf[FileStreamSourceOffset]
+        assert(5 === offsetBatch.logOffset)
+        assert(3 === CountListingLocalFileSystem.pathToNumListStatusCalled
+          .get(src.getCanonicalPath).map(_.get()).getOrElse(0))
+      }
+    }
+  }
+
   test("SPARK-31962: file stream source shouldn't allow modifiedBefore/modifiedAfter") {
     def formatTime(time: LocalDateTime): String = {
       time.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss"))
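As an aside (not part of the patch), the discard decision exercised by the test above reduces to a single comparison against maxFilesPerTrigger * discardCachedFilesRatio. A small standalone sketch, with `shouldDiscardCache` as a hypothetical helper name:

```scala
// Standalone illustration of the cache-discard heuristic; `shouldDiscardCache`
// is a hypothetical helper, not an API introduced by this patch.
object DiscardRatioSketch {
  def shouldDiscardCache(
      remainingCached: Int,
      maxFilesPerTrigger: Int,
      discardCachedFilesRatio: Float): Boolean =
    remainingCached < maxFilesPerTrigger * discardCachedFilesRatio

  def main(args: Array[String]): Unit = {
    // Docs example: 10 leftover files, maxFilesPerTrigger = 100, ratio = 0.2
    // => threshold 20, so the cache is discarded and the next batch re-lists.
    println(shouldDiscardCache(10, 100, 0.2f))  // true
    // Test scenario: 2 leftover files, maxFilesPerTrigger = 10, ratio = 0.1
    // => threshold 1, so the 2 cached files are kept for the next batch.
    println(shouldDiscardCache(2, 10, 0.1f))    // false
  }
}
```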