6 changes: 6 additions & 0 deletions docs/structured-streaming-programming-guide.md
@@ -547,6 +547,12 @@ Here are the details of all the sources in Spark.
<br/>
<code>maxFileAge</code>: Maximum age of a file that can be found in this directory, before it is ignored. For the first batch all files will be considered valid. If <code>latestFirst</code> is set to <code>true</code> and <code>maxFilesPerTrigger</code> is set, then this parameter will be ignored, because old files that are valid and should be processed would otherwise be skipped. The max age is specified with respect to the timestamp of the latest file, not the timestamp of the current system. (default: 1 week)
<br/>
<code>inputRetention</code>: Maximum age of a file that can be found in this directory, before it is ignored (e.g. "14d", default: none).<br/>
This is the "hard" limit on input data retention: input files older than the retention period are ignored regardless of other source options (whereas <code>maxFileAge</code> applies only under certain conditions), and entries in the checkpoint metadata are purged based on it as well.<br/>
Unlike <code>maxFileAge</code>, the age is measured against the current system time rather than the timestamp of the latest file, to provide consistent behavior regardless of the metadata entries.<br/>
NOTE 1: Set this value carefully if the query replays old input files.<br/>
NOTE 2: Make sure the system clocks are in sync across the nodes that run the query.<br/>
<br/>
<code>cleanSource</code>: option to clean up completed files after processing.<br/>
Available options are "archive", "delete", "off". If the option is not provided, the default value is "off".<br/>
When "archive" is provided, additional option <code>sourceArchiveDir</code> must be provided as well. The value of "sourceArchiveDir" must not match with source pattern in depth (the number of directories from the root directory), where the depth is minimum of depth on both paths. This will ensure archived files are never included as new source files.<br/>
FileStreamOptions.scala
@@ -76,6 +76,21 @@ class FileStreamOptions(parameters: CaseInsensitiveMap[String]) extends Logging
*/
val fileNameOnly: Boolean = withBooleanParameter("fileNameOnly", false)

/**
 * Maximum age of a file that can be found in this directory, before it is ignored.
 *
 * This is the "hard" limit on input data retention: input files older than the retention
 * period are ignored regardless of other source options (whereas `maxFileAgeMs` applies only
 * under certain conditions), and entries in the checkpoint metadata are purged based on it
 * as well.
 *
 * Unlike `maxFileAgeMs`, the age is measured against the current system time rather than the
 * timestamp of the latest file, to provide consistent behavior regardless of the metadata
 * entries.
 *
 * NOTE 1: Set this value carefully if the query replays old input files.
 * NOTE 2: Make sure the system clocks are in sync across the nodes that run the query.
 */
val inputRetentionMs: Option[Long] = parameters.get("inputRetention").map(Utils.timeStringAsMs)

/**
* The archive directory to move completed files. The option will be only effective when
* "cleanSource" is set to "archive".
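For reference, the option string is parsed with Spark's internal `Utils.timeStringAsMs`, which accepts suffixes such as "ms", "s", "m", "h", and "d", and treats a bare number as milliseconds. A small illustration (note that `Utils` is `private[spark]`, so this only compiles inside Spark's own sources):

```scala
import org.apache.spark.util.Utils

// "14d" parses to 14 * 24 * 60 * 60 * 1000 ms; a bare number such as "5000" is milliseconds.
val parsed: Option[Long] = Some("14d").map(Utils.timeStringAsMs)   // Some(1209600000)

// FileStreamSource falls back to Long.MaxValue when the option is absent,
// i.e. retention is effectively disabled.
val effectiveRetentionMs: Long = parsed.getOrElse(Long.MaxValue)
```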
FileStreamSource.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.connector.read.streaming.{ReadAllAvailable, ReadLimi
import org.apache.spark.sql.execution.datasources.{DataSource, InMemoryFileIndex, LogicalRelation}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.ThreadUtils
import org.apache.spark.util.{Clock, SystemClock, ThreadUtils}

/**
* A very simple source that reads files from the given directory as they appear.
@@ -71,8 +71,13 @@ class FileStreamSource(
Map()
}}

/** Clock used for the input retention check; a var so tests can inject a ManualClock. */
var clockForRetention: Clock = new SystemClock
private val inputRetentionMs: Long = sourceOptions.inputRetentionMs.getOrElse(Long.MaxValue)

private val metadataLog =
new FileStreamSourceLog(FileStreamSourceLog.VERSION, sparkSession, metadataPath)
new FileStreamSourceLog(FileStreamSourceLog.VERSION, sparkSession, metadataPath,
clockForRetention, inputRetentionMs)
private var metadataLogCurrentOffset = metadataLog.getLatest().map(_._1).getOrElse(-1L)

/** Maximum number of new files to be considered in each batch */
@@ -295,9 +300,15 @@ class FileStreamSource(
case Some(false) => allFiles = allFilesUsingInMemoryFileIndex()
}

val files = allFiles.sortBy(_.getModificationTime)(fileSortOrder).map { status =>
(status.getPath.toUri.toString, status.getModificationTime)
}
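// Drop files whose age, measured against the retention clock's current time, exceeds
// inputRetentionMs, before sorting and offset creation.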
val curTime = clockForRetention.getTimeMillis()
val files = allFiles
.filter { file =>
curTime - file.getModificationTime <= inputRetentionMs
}
.sortBy(_.getModificationTime)(fileSortOrder)
.map { status =>
(status.getPath.toUri.toString, status.getModificationTime)
}
val endTime = System.nanoTime
val listingTimeMs = NANOSECONDS.toMillis(endTime - startTime)
if (listingTimeMs > 2000) {
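Because the cutoff above is computed from an injectable `Clock` rather than directly from `System.currentTimeMillis()`, retention behavior can be pinned down in tests. A minimal standalone sketch of that pattern (the `RetentionCheck` helper is hypothetical, and Spark's `Clock`/`ManualClock` are `private[spark]`, so this would only compile inside Spark's own source tree):

```scala
import org.apache.spark.util.{Clock, ManualClock}

// Hypothetical helper mirroring the production check: a file (or log entry) is retained
// while its age, relative to the injected clock, does not exceed the retention period.
class RetentionCheck(clock: Clock, retentionMs: Long) {
  def retains(modificationTime: Long): Boolean =
    clock.getTimeMillis() - modificationTime <= retentionMs
}

// Production code would pass a SystemClock; tests can advance a ManualClock deterministically.
val check = new RetentionCheck(new ManualClock(10000L), retentionMs = 5000L)
assert(check.retains(6000L))    // age = 4000 ms <= 5000 ms
assert(!check.retains(4000L))   // age = 6000 ms > 5000 ms
```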
FileStreamSourceLog.scala
@@ -28,11 +28,14 @@ import org.json4s.jackson.Serialization
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.FileStreamSource.FileEntry
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.util.Clock

class FileStreamSourceLog(
metadataLogVersion: Int,
sparkSession: SparkSession,
path: String)
path: String,
clock: Clock,
inputRetentionMs: Long)
extends CompactibleFileStreamLog[FileEntry](metadataLogVersion, sparkSession, path) {

import CompactibleFileStreamLog._
@@ -61,6 +64,11 @@ class FileStreamSourceLog(
}
}

/**
 * Keep a metadata entry only while its age, measured against the injected clock's current
 * time, is within inputRetentionMs; older entries are purged from the log (e.g. when it is
 * compacted).
 */
override def shouldRetain(log: FileEntry): Boolean = {
val curTime = clock.getTimeMillis()
curTime - log.timestamp <= inputRetentionMs
}

override def add(batchId: Long, logs: Array[FileEntry]): Boolean = {
if (super.add(batchId, logs)) {
if (isCompactionBatch(batchId, compactInterval)) {
FileStreamSourceSuite.scala
@@ -42,7 +42,7 @@ import org.apache.spark.sql.streaming.util.StreamManualClock
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types._
import org.apache.spark.sql.types.{StructType, _}
import org.apache.spark.util.Utils
import org.apache.spark.util.{ManualClock, SystemClock, Utils}

abstract class FileStreamSourceTest
extends StreamTest with SharedSparkSession with PrivateMethodTester {
@@ -1461,7 +1461,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest {

private def readLogFromResource(dir: String): Seq[FileEntry] = {
val input = getClass.getResource(s"/structured-streaming/$dir")
val log = new FileStreamSourceLog(FileStreamSourceLog.VERSION, spark, input.toString)
val log = new FileStreamSourceLog(FileStreamSourceLog.VERSION, spark, input.toString,
new SystemClock, Long.MaxValue)
log.allFiles()
}

@@ -1614,7 +1615,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
// add the metadata entries as a pre-req
val dir = new File(temp, "dir") // use non-existent directory to test whether log make the dir
val metadataLog =
new FileStreamSourceLog(FileStreamSourceLog.VERSION, spark, dir.getAbsolutePath)
new FileStreamSourceLog(FileStreamSourceLog.VERSION, spark, dir.getAbsolutePath,
new SystemClock, Long.MaxValue)
assert(metadataLog.add(0, Array(FileEntry(s"$scheme:///file1", 100L, 0))))
assert(metadataLog.add(1, Array(FileEntry(s"$scheme:///file2", 200L, 0))))

@@ -1625,6 +1627,87 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
}
}

test("old files should not be included as input files if input retention is specified") {
withTempDir { dir =>
/** Create a text file with a single data item */
def createFile(data: Int, timestamp: Long): File = {
val file = stringToFile(new File(dir, s"$data.txt"), data.toString)
file.setLastModified(timestamp)
file
}

def validate(
files: Seq[File],
inputRetention: Long,
timeToAdvance: Long,
limit: ReadLimit,
expectedFiles: Seq[File]): Unit = {
val df = createFileStream("text", dir.getAbsolutePath,
options = Map("inputRetention" -> inputRetention.toString, "maxFileAge" -> "7d"))
val fileSource = getSourceFromFileStream(df)
val _metadataLog = PrivateMethod[FileStreamSourceLog](Symbol("metadataLog"))
val metadataLog = fileSource invokePrivate _metadataLog()

val clock = new ManualClock
fileSource.clockForRetention = clock
clock.advance(timeToAdvance)

val offset = fileSource.latestOffset(FileStreamSourceOffset(-1L), limit)
.asInstanceOf[FileStreamSourceOffset]

val inputFiles = metadataLog.get(offset.logOffset)
assert(inputFiles.nonEmpty)

val inputFileNames = inputFiles.get.map { f => new File(f.path).getName }.toSet
val expectedFileNames = expectedFiles.map(_.getName).toSet
assert(inputFileNames === expectedFileNames)
}

// files will have modification times ranging from 1000 to 10000
val files = (1 to 10).map { idx =>
createFile(idx, 1000 * idx)
}

// we initialize FileStreamSource per case, as the test would otherwise be affected by SeenFilesMap
validate(files, 5000, 10000, ReadLimit.allAvailable(),
files.filter(_.lastModified() >= 5000))
validate(files, 5000, 10000, ReadLimit.maxFiles(2),
files.filter(_.lastModified() >= 5000).take(2))
}
}

test("filter out outdated entries in file stream source log when compacting") {
withTempDir { dir =>
val clock = new ManualClock
val metadataLog =
new FileStreamSourceLog(FileStreamSourceLog.VERSION, spark, dir.getAbsolutePath,
clock, 10000)

val files1 = (1 to 3).map { idx =>
FileEntry(s"1_$idx", idx * 1000, 1)
}

clock.advance(4000) // clock time = 4000

files1.foreach { f => assert(metadataLog.shouldRetain(f)) }

val files2 = (1 to 3).map { idx =>
FileEntry(s"2_$idx", 10000 + idx * 1000, 2)
}

val allFiles = files1 ++ files2

allFiles.foreach { f => assert(metadataLog.shouldRetain(f)) }

clock.advance(10000) // clock time = 14000

// only files in batch 1 will be evicted
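// (at clock time 14000, files1 timestamps 1000..3000 are more than 10000 ms old,
// while files2 timestamps 11000..13000 are not)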

files1.foreach { f => assert(!metadataLog.shouldRetain(f)) }
files2.foreach { f => assert(metadataLog.shouldRetain(f)) }
}
}

test("SPARK-26629: multiple file sources work with restarts when a source does not have data") {
withTempDirs { case (dir, tmp) =>
val sourceDir1 = new File(dir, "source1")