Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/structured-streaming-programming-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,10 @@ Here are the details of all the sources in Spark.
<br/>
<code>latestFirst</code>: whether to process the latest new files first, useful when there is a large backlog of files (default: false)
<br/>
<code>includeModifiedFiles</code>: whether to include modified files for processing, useful when the source producer eventually overwrites files with new content (default: false)
<br/>
Note that the whole file content will be processed in case of modification. If the modification is an append rather than an overwrite, the previously processed content will be processed again, changing the semantics to "at-least-once".
<br/>
<code>fileNameOnly</code>: whether to check new files based on only the filename instead of on the full path (default: false). With this set to `true`, the following files would be considered as the same file, because their filenames, "dataset.txt", are the same:
<br/>
<code>maxFileAge</code>: Maximum age of a file that can be found in this directory, before it is ignored. For the first batch all files will be considered valid. If <code>latestFirst</code> is set to `true` and <code>maxFilesPerTrigger</code> is set, then this parameter will be ignored, because old files that are valid, and should be processed, may be ignored. The max age is specified with respect to the timestamp of the latest file, and not the timestamp of the current system.(default: 1 week)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,20 @@ class FileStreamOptions(parameters: CaseInsensitiveMap[String]) extends Logging
*/
val fileNameOnly: Boolean = withBooleanParameter("fileNameOnly", false)

/**
 * Whether to include modified files to be processed.
 *
 * The default is `false`: a file that has already been seen is never processed again,
 * regardless of its timestamp. When this is set to `true`, a seen file is reprocessed if its
 * modification timestamp is strictly greater than the one recorded when it was last processed,
 * taking that as an indication that the file was modified and has different content. This is
 * useful when the source producer eventually overwrites files with new content.
 *
 * It is important to note that the whole file content will be processed in case of a
 * modification. If the modification is an append rather than an overwrite, the previously
 * processed content will be processed again, changing the semantics to "at-least-once".
 */
val includeModifiedFiles: Boolean = withBooleanParameter("includeModifiedFiles", false)

/**
* The archive directory to move completed files. The option will be only effective when
* "cleanSource" is set to "archive".
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ class FileStreamSource(

/** A mapping from a file that we have processed to some timestamp it was last modified. */
// Visible for testing and debugging in production.
val seenFiles = new SeenFilesMap(maxFileAgeMs, fileNameOnly)
val seenFiles = new SeenFilesMap(maxFileAgeMs, fileNameOnly, sourceOptions.includeModifiedFiles)

metadataLog.allFiles().foreach { entry =>
seenFiles.add(entry.path, entry.timestamp)
Expand Down Expand Up @@ -290,7 +290,7 @@ object FileStreamSource {
* To prevent the hash map from growing indefinitely, a purge function is available to
* remove files "maxAgeMs" older than the latest file.
*/
class SeenFilesMap(maxAgeMs: Long, fileNameOnly: Boolean) {
class SeenFilesMap(maxAgeMs: Long, fileNameOnly: Boolean, includeModifiedFiles: Boolean = false) {
require(maxAgeMs >= 0)

/** Mapping from file to its timestamp. */
Expand Down Expand Up @@ -319,9 +319,14 @@ object FileStreamSource {
* if it is new enough that we are still tracking, and we have not seen it before.
*/
def isNewFile(path: String, timestamp: Timestamp): Boolean = {
  val strippedPath = stripPathIfNecessary(path)
  // Note that we are testing against lastPurgeTimestamp here so we'd never miss a file that
  // is older than (latestTimestamp - maxAgeMs) but has not been purged yet.
  if (includeModifiedFiles) {
    // A file is considered new if it was never seen, or if its modification timestamp is
    // strictly greater than the one recorded when it was last processed. Use 0L so the
    // default matches the map's Long value type instead of relying on Int-to-Long widening;
    // 0L is safe because valid modification timestamps are positive.
    timestamp >= lastPurgeTimestamp && timestamp > map.getOrDefault(strippedPath, 0L)
  } else {
    timestamp >= lastPurgeTimestamp && !map.containsKey(strippedPath)
  }
}

/** Removes aged entries and returns the number of files removed. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,15 @@ abstract class FileStreamSourceTest
}
}

/**
 * Test action that writes `content` to `file`, overwriting any previous content, so that the
 * file keeps its path but gets a new modification timestamp and new data.
 */
case class OverrideTextFileData(content: String, file: File, src: File = null)
  extends AddFileData {

  override def addData(source: FileStreamSource): Unit = {
    stringToFile(file, content)
    // stringToFile overwrites the file; log accordingly (the previous message incorrectly
    // said the text was appended).
    logInfo(s"Wrote text '$content' to file $file, overwriting any previous content")
  }
}

case class AddOrcFileData(data: DataFrame, src: File, tmp: File) extends AddFileData {
override def addData(source: FileStreamSource): Unit = {
AddOrcFileData.writeToFile(data, src, tmp)
Expand Down Expand Up @@ -569,6 +578,30 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
}
}

// Verifies that with "includeModifiedFiles" enabled, overwriting an already-processed file
// causes its new content to be picked up as a new batch.
test("SPARK-26875: Include modified files for processing") {
withTempDirs { case (src, tmp) =>
val textStream: DataFrame =
createFileStream("text", src.getCanonicalPath,
options = Map("includeModifiedFiles" -> "true"))

val modifiedFile = new File(src, "modified.txt")

testStream(textStream)(
// Write initial data into the file; it should be processed as usual.
OverrideTextFileData("a\nb", modifiedFile),
CheckAnswer("a", "b"),

// Unfortunately, since many file systems do not have modification time granularity finer
// than 1 second, we need to sleep for 1 second here so the overwrite below gets a strictly
// greater timestamp than the first write.
AssertOnQuery { _ => Thread.sleep(1000); true },

// Overwrite the file; the new content should be processed as well.
OverrideTextFileData("c\nd", modifiedFile),
CheckAnswer("a", "b", "c", "d")
)
}
}

// =============== JSON file stream tests ================

test("read from json files") {
Expand Down