diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index a371f4f50f9f0..6d267f2dbeca0 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -547,6 +547,12 @@ Here are the details of all the sources in Spark.
maxFileAge: Maximum age of a file that can be found in this directory, before it is ignored. For the first batch all files will be considered valid. If latestFirst is set to `true` and maxFilesPerTrigger is set, then this parameter will be ignored, because old files that are valid, and should be processed, may be ignored. The max age is specified with respect to the timestamp of the latest file, and not the timestamp of the current system. (default: 1 week)
+ inputRetention: Maximum age of a file that can be found in this directory, before it is ignored. (e.g. 14d, default: None)
+ This is the "hard" limit on input data retention: input files older than the max age are ignored regardless of other source options (whereas `maxFileAge` can be bypassed, e.g. when `latestFirst` is `true` and `maxFilesPerTrigger` is set), and entries in the checkpoint metadata are purged based on it as well.
+ Unlike `maxFileAge`, the max age is specified with respect to the current system time rather than the timestamp of the latest file, so the behavior stays consistent regardless of existing metadata entries (see the usage sketch after this option list).
+ NOTE 1: Set this value carefully if the query needs to replay old input files.
+ NOTE 2: Make sure the system clocks are in sync across the nodes that run the query.
+
cleanSource: option to clean up completed files after processing.
Available options are "archive", "delete", "off". If the option is not provided, the default value is "off".
When "archive" is provided, additional option sourceArchiveDir must be provided as well. The value of "sourceArchiveDir" must not match with source pattern in depth (the number of directories from the root directory), where the depth is minimum of depth on both paths. This will ensure archived files are never included as new source files.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
index 712ed1585bc8a..55d36a86ccd86 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
@@ -76,6 +76,21 @@ class FileStreamOptions(parameters: CaseInsensitiveMap[String]) extends Logging
    */
   val fileNameOnly: Boolean = withBooleanParameter("fileNameOnly", false)
 
+  /**
+   * Maximum age of a file that can be found in this directory, before it is ignored.
+   *
+   * This is the "hard" limit on input data retention: input files older than the max age are
+   * ignored regardless of other source options (whereas `maxFileAgeMs` only applies
+   * conditionally), and entries in the checkpoint metadata are purged based on it as well.
+   *
+   * Unlike `maxFileAgeMs`, the max age is specified with respect to the current system time,
+   * to provide consistent behavior regardless of existing metadata entries.
+   *
+   * NOTE 1: Set this value carefully if the query needs to replay old input files.
+   * NOTE 2: Make sure the system clocks are in sync across the nodes that run the query.
+   */
+  val inputRetentionMs: Option[Long] = parameters.get("inputRetention").map(Utils.timeStringAsMs)
+
   /**
    * The archive directory to move completed files. The option will be only effective when
    * "cleanSource" is set to "archive".
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 03d86e42e4db7..5e3f72ff5ea9a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.connector.read.streaming.{ReadAllAvailable, ReadLimi
 import org.apache.spark.sql.execution.datasources.{DataSource, InMemoryFileIndex, LogicalRelation}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.util.ThreadUtils
+import org.apache.spark.util.{Clock, SystemClock, ThreadUtils}
 
 /**
  * A very simple source that reads files from the given directory as they appear.
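A note on the `Utils.timeStringAsMs` call introduced above: the option value is parsed with Spark's usual time-string format (e.g. "14d", "2h", "30s"). The sketch below is illustrative only; it uses the public `JavaUtils` helper, which provides the same parsing as the spark-internal `Utils` method, and a plain `Map` as a stand-in for the real source options.

```scala
// Illustrative sketch of how an "inputRetention" value such as "14d" becomes milliseconds.
// Utils.timeStringAsMs (used in the patch) is spark-internal; the public JavaUtils helper
// used here applies the same time-string rules.
import org.apache.spark.network.util.JavaUtils

object InputRetentionParsing {
  def main(args: Array[String]): Unit = {
    val options = Map("inputRetention" -> "14d")

    // None when the option is absent, Some(milliseconds) when present.
    val inputRetentionMs: Option[Long] =
      options.get("inputRetention").map(s => JavaUtils.timeStringAsMs(s))

    println(inputRetentionMs) // Some(1209600000), i.e. 14 days in milliseconds
  }
}
```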
@@ -71,8 +71,13 @@ class FileStreamSource(
       Map()
     }}
 
+  /** exposed for testing */
+  var clockForRetention: Clock = new SystemClock
+  private val inputRetentionMs = sourceOptions.inputRetentionMs.getOrElse(Long.MaxValue)
+
   private val metadataLog =
-    new FileStreamSourceLog(FileStreamSourceLog.VERSION, sparkSession, metadataPath)
+    new FileStreamSourceLog(FileStreamSourceLog.VERSION, sparkSession, metadataPath,
+      clockForRetention, inputRetentionMs)
   private var metadataLogCurrentOffset = metadataLog.getLatest().map(_._1).getOrElse(-1L)
 
   /** Maximum number of new files to be considered in each batch */
@@ -295,9 +300,15 @@ class FileStreamSource(
       case Some(false) =>
         allFiles = allFilesUsingInMemoryFileIndex()
     }
-    val files = allFiles.sortBy(_.getModificationTime)(fileSortOrder).map { status =>
-      (status.getPath.toUri.toString, status.getModificationTime)
-    }
+    val curTime = clockForRetention.getTimeMillis()
+    val files = allFiles
+      .filter { file =>
+        curTime - file.getModificationTime <= inputRetentionMs
+      }
+      .sortBy(_.getModificationTime)(fileSortOrder)
+      .map { status =>
+        (status.getPath.toUri.toString, status.getModificationTime)
+      }
     val endTime = System.nanoTime
     val listingTimeMs = NANOSECONDS.toMillis(endTime - startTime)
     if (listingTimeMs > 2000) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
index 88a2326c9a02c..69035d88d3493 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
@@ -28,11 +28,14 @@ import org.json4s.jackson.Serialization
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution.streaming.FileStreamSource.FileEntry
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.Clock
 
 class FileStreamSourceLog(
     metadataLogVersion: Int,
     sparkSession: SparkSession,
-    path: String)
+    path: String,
+    clock: Clock,
+    inputRetentionMs: Long)
   extends CompactibleFileStreamLog[FileEntry](metadataLogVersion, sparkSession, path) {
 
   import CompactibleFileStreamLog._
@@ -61,6 +64,11 @@ class FileStreamSourceLog(
     }
   }
 
+  override def shouldRetain(log: FileEntry): Boolean = {
+    val curTime = clock.getTimeMillis()
+    curTime - log.timestamp <= inputRetentionMs
+  }
+
   override def add(batchId: Long, logs: Array[FileEntry]): Boolean = {
     if (super.add(batchId, logs)) {
       if (isCompactionBatch(batchId, compactInterval)) {
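The same age check now appears in two places: `FileStreamSource` filters newly listed files, and `FileStreamSourceLog.shouldRetain` drops old entries when the metadata log is compacted. Both compare the current clock time against a stored timestamp. The standalone sketch below restates that predicate outside Spark so the semantics are easy to check; the `Entry` case class and the sample values are invented, though the numbers mirror the compaction test added further down.

```scala
// Standalone illustration of the retention predicate used above:
// an entry is retained iff (now - entryTimestamp) <= inputRetentionMs.
// Entry, the timestamps and the retention value are made up for this sketch.
object RetentionPredicateSketch {
  final case class Entry(path: String, timestamp: Long)

  def shouldRetain(entry: Entry, nowMs: Long, inputRetentionMs: Long): Boolean =
    nowMs - entry.timestamp <= inputRetentionMs

  def main(args: Array[String]): Unit = {
    val retentionMs = 10000L
    val entries = Seq(Entry("old.txt", 1000L), Entry("fresh.txt", 12000L))

    val nowMs = 14000L
    // old.txt:   14000 - 1000  = 13000 > 10000  -> dropped
    // fresh.txt: 14000 - 12000 = 2000  <= 10000 -> kept
    val kept = entries.filter(shouldRetain(_, nowMs, retentionMs))
    println(kept.map(_.path)) // List(fresh.txt)
  }
}
```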
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index cf9664a9764be..39a7493937bac 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -42,7 +42,7 @@ import org.apache.spark.sql.streaming.util.StreamManualClock
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.types.{StructType, _}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ManualClock, SystemClock, Utils}
 
 abstract class FileStreamSourceTest
   extends StreamTest with SharedSparkSession with PrivateMethodTester {
@@ -1461,7 +1461,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
 
   private def readLogFromResource(dir: String): Seq[FileEntry] = {
     val input = getClass.getResource(s"/structured-streaming/$dir")
-    val log = new FileStreamSourceLog(FileStreamSourceLog.VERSION, spark, input.toString)
+    val log = new FileStreamSourceLog(FileStreamSourceLog.VERSION, spark, input.toString,
+      new SystemClock, Long.MaxValue)
     log.allFiles()
   }
 
@@ -1614,7 +1615,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     // add the metadata entries as a pre-req
     val dir = new File(temp, "dir") // use non-existent directory to test whether log make the dir
     val metadataLog =
-      new FileStreamSourceLog(FileStreamSourceLog.VERSION, spark, dir.getAbsolutePath)
+      new FileStreamSourceLog(FileStreamSourceLog.VERSION, spark, dir.getAbsolutePath,
+        new SystemClock, Long.MaxValue)
     assert(metadataLog.add(0, Array(FileEntry(s"$scheme:///file1", 100L, 0))))
     assert(metadataLog.add(1, Array(FileEntry(s"$scheme:///file2", 200L, 0))))
 
@@ -1625,6 +1627,87 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     }
   }
 
+  test("old files should not be included as input files if input retention is specified") {
+    withTempDir { dir =>
+      /** Create a text file with a single data item */
+      def createFile(data: Int, timestamp: Long): File = {
+        val file = stringToFile(new File(dir, s"$data.txt"), data.toString)
+        file.setLastModified(timestamp)
+        file
+      }
+
+      def validate(
+          files: Seq[File],
+          inputRetention: Long,
+          timeToAdvance: Long,
+          limit: ReadLimit,
+          expectedFiles: Seq[File]): Unit = {
+        val df = createFileStream("text", dir.getAbsolutePath,
+          options = Map("inputRetention" -> inputRetention.toString, "maxFileAge" -> "7d"))
+        val fileSource = getSourceFromFileStream(df)
+        val _metadataLog = PrivateMethod[FileStreamSourceLog](Symbol("metadataLog"))
+        val metadataLog = fileSource invokePrivate _metadataLog()
+
+        val clock = new ManualClock
+        fileSource.clockForRetention = clock
+        clock.advance(timeToAdvance)
+
+        val offset = fileSource.latestOffset(FileStreamSourceOffset(-1L), limit)
+          .asInstanceOf[FileStreamSourceOffset]
+
+        val inputFiles = metadataLog.get(offset.logOffset)
+        assert(inputFiles.nonEmpty)
+
+        val inputFileNames = inputFiles.get.map { f => new File(f.path).getName }.toSet
+        val expectedFileNames = expectedFiles.map(_.getName).toSet
+        assert(inputFileNames === expectedFileNames)
+      }
+
+      // files will have various modified times from 1000 to 10000
+      val files = (1 to 10).map { idx =>
+        createFile(idx, 1000 * idx)
+      }
+
+      // we initialize FileStreamSource per case as the test will be affected by SeenFilesMap
+      validate(files, 5000, 10000, ReadLimit.allAvailable(),
+        files.filter(_.lastModified() >= 5000))
+      validate(files, 5000, 10000, ReadLimit.maxFiles(2),
+        files.filter(_.lastModified() >= 5000).take(2))
+    }
+  }
+
+  test("filter out outdated entries in file stream source log when compacting") {
+    withTempDir { dir =>
+      val clock = new ManualClock
+      val metadataLog =
+        new FileStreamSourceLog(FileStreamSourceLog.VERSION, spark, dir.getAbsolutePath,
+          clock, 10000)
+
+      val files1 = (1 to 3).map { idx =>
+        FileEntry(s"1_$idx", idx * 1000, 1)
+      }
+
+      clock.advance(4000) // clock time = 4000
+
+      files1.foreach { f => assert(metadataLog.shouldRetain(f)) }
+
+      val files2 = (1 to 3).map { idx =>
+        FileEntry(s"2_$idx", 10000 + idx * 1000, 2)
+      }
+
+      val allFiles = files1 ++ files2
+
+      allFiles.foreach { f => assert(metadataLog.shouldRetain(f)) }
+
+      clock.advance(10000) // clock time = 14000
+
+      // only files in batch 1 will be evicted
+
+      files1.foreach { f => assert(!metadataLog.shouldRetain(f)) }
+      files2.foreach { f => assert(metadataLog.shouldRetain(f)) }
+    }
+  }
+
   test("SPARK-26629: multiple file sources work with restarts when a source does not have data") {
     withTempDirs { case (dir, tmp) =>
       val sourceDir1 = new File(dir, "source1")