From 69de12ae77ca66ac5ce95aebdbc4f518f50b4189 Mon Sep 17 00:00:00 2001 From: Mike Dias Date: Thu, 14 Feb 2019 12:31:39 +1100 Subject: [PATCH 1/2] Add an option on FileStreamSource to include modified files --- .../structured-streaming-programming-guide.md | 2 ++ .../streaming/FileStreamOptions.scala | 10 ++++++ .../streaming/FileStreamSource.scala | 11 +++++-- .../sql/streaming/FileStreamSourceSuite.scala | 33 +++++++++++++++++++ 4 files changed, 53 insertions(+), 3 deletions(-) diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index e76b53dbb4dc..f490d4fb53a5 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -524,6 +524,8 @@ Here are the details of all the sources in Spark.
latestFirst: whether to process the latest new files first, useful when there is a large backlog of files (default: false)
+ includeModifiedFiles: whether to include modified files to be processed, useful when the source producer eventually overrides files with new content (default:false) +
fileNameOnly: whether to check new files based on only the filename instead of on the full path (default: false). With this set to `true`, the following files would be considered as the same file, because their filenames, "dataset.txt", are the same:
"file:///dataset.txt"
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala index 1d57cb084df9..4164745fb62c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala @@ -74,6 +74,16 @@ class FileStreamOptions(parameters: CaseInsensitiveMap[String]) extends Logging */ val fileNameOnly: Boolean = withBooleanParameter("fileNameOnly", false) + /** + * Whether to include modified files to be processed. + * + * The default behavior is `false` where only the filename is considered to determine if a file + * should be processed. When this is set to `true` the file timestamp is also tested if is greater + * than last time it was processed, as an indication that it's modified and have different + * content. It is useful when the source producer eventually overrides files with new content. + */ + val includeModifiedFiles: Boolean = withBooleanParameter("includeModifiedFiles", false) + private def withBooleanParameter(name: String, default: Boolean) = { parameters.get(name).map { str => try { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index 103fa7ce9066..b63c8dbdab93 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -91,7 +91,7 @@ class FileStreamSource( /** A mapping from a file that we have processed to some timestamp it was last modified. */ // Visible for testing and debugging in production. 
- val seenFiles = new SeenFilesMap(maxFileAgeMs, fileNameOnly) + val seenFiles = new SeenFilesMap(maxFileAgeMs, fileNameOnly, sourceOptions.includeModifiedFiles) metadataLog.allFiles().foreach { entry => seenFiles.add(entry.path, entry.timestamp) @@ -278,7 +278,7 @@ object FileStreamSource { * To prevent the hash map from growing indefinitely, a purge function is available to * remove files "maxAgeMs" older than the latest file. */ - class SeenFilesMap(maxAgeMs: Long, fileNameOnly: Boolean) { + class SeenFilesMap(maxAgeMs: Long, fileNameOnly: Boolean, includeModifiedFiles: Boolean = false) { require(maxAgeMs >= 0) /** Mapping from file to its timestamp. */ @@ -307,9 +307,14 @@ object FileStreamSource { * if it is new enough that we are still tracking, and we have not seen it before. */ def isNewFile(path: String, timestamp: Timestamp): Boolean = { + val stripedPath = stripPathIfNecessary(path) // Note that we are testing against lastPurgeTimestamp here so we'd never miss a file that // is older than (latestTimestamp - maxAgeMs) but has not been purged yet. - timestamp >= lastPurgeTimestamp && !map.containsKey(stripPathIfNecessary(path)) + if (includeModifiedFiles) { + timestamp >= lastPurgeTimestamp && timestamp > map.getOrDefault(stripedPath, 0) + } else { + timestamp >= lastPurgeTimestamp && !map.containsKey(stripedPath) + } } /** Removes aged entries and returns the number of files removed. 
*/ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index 9235c6d7c896..2721f04d7ba8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -101,6 +101,15 @@ abstract class FileStreamSourceTest } } + case class AppendTextFileData(content: String, file: File, src: File = null) + extends AddFileData { + + override def addData(source: FileStreamSource): Unit = { + stringToFile(file, content) + logInfo(s"Appended text '$content' to file $file") + } + } + case class AddOrcFileData(data: DataFrame, src: File, tmp: File) extends AddFileData { override def addData(source: FileStreamSource): Unit = { AddOrcFileData.writeToFile(data, src, tmp) @@ -534,6 +543,30 @@ class FileStreamSourceSuite extends FileStreamSourceTest { } } + test("SPARK-26875: Include modified files for processing") { + withTempDirs { case (src, tmp) => + val textStream: DataFrame = + createFileStream("text", src.getCanonicalPath, + options = Map("includeModifiedFiles" -> "true")) + + val modifiedFile = new File(src, "modified.txt") + + testStream(textStream)( + // add data into the file, should process as usual + AppendTextFileData("a\nb", modifiedFile), + CheckAnswer("a", "b"), + + // Unfortunately since a lot of file system does not have modification time granularity + // finer grained than 1 sec, we need to use 1 sec here. 
+ AssertOnQuery { _ => Thread.sleep(1000); true }, + + // modify the file, should consider the new content as well + AppendTextFileData("c\nd", modifiedFile), + CheckAnswer("a", "b", "c", "d") + ) + } + } + // =============== JSON file stream tests ================ test("read from json files") { From 9eff64ab7c5a7a94d9f65617f3f84c616120ee00 Mon Sep 17 00:00:00 2001 From: Mike Dias Date: Thu, 14 Feb 2019 13:46:58 +1100 Subject: [PATCH 2/2] Clarifying the effects of append and override a file --- docs/structured-streaming-programming-guide.md | 2 ++ .../spark/sql/execution/streaming/FileStreamOptions.scala | 4 ++++ .../spark/sql/streaming/FileStreamSourceSuite.scala | 8 ++++---- 3 files changed, 10 insertions(+), 4 deletions(-) diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index f490d4fb53a5..6c5dc0cdc780 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -526,6 +526,8 @@ Here are the details of all the sources in Spark.
includeModifiedFiles: whether to include modified files to be processed, useful when the source producer eventually overrides files with new content (default:false)
+ Note that the whole file content will be processed in case of modification. If the modification is an append and not an overwrite, the previously processed content will be processed again, changing the semantics to "at-least-once". +
fileNameOnly: whether to check new files based on only the filename instead of on the full path (default: false). With this set to `true`, the following files would be considered as the same file, because their filenames, "dataset.txt", are the same:
"file:///dataset.txt"
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala index 4164745fb62c..ad53ce4f01e1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala @@ -81,6 +81,10 @@ class FileStreamOptions(parameters: CaseInsensitiveMap[String]) extends Logging * should be processed. When this is set to `true` the file timestamp is also tested if is greater * than last time it was processed, as an indication that it's modified and have different * content. It is useful when the source producer eventually overrides files with new content. + * + * Note that the whole file content will be processed in case of modification. If + * the modification is an append and not an overwrite, the previously processed content will be + * processed again, changing the semantics to "at-least-once". 
*/ val includeModifiedFiles: Boolean = withBooleanParameter("includeModifiedFiles", false) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index 2721f04d7ba8..ed8eeaa59534 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -101,7 +101,7 @@ abstract class FileStreamSourceTest } } - case class AppendTextFileData(content: String, file: File, src: File = null) + case class OverrideTextFileData(content: String, file: File, src: File = null) extends AddFileData { override def addData(source: FileStreamSource): Unit = { @@ -553,15 +553,15 @@ class FileStreamSourceSuite extends FileStreamSourceTest { testStream(textStream)( // add data into the file, should process as usual - AppendTextFileData("a\nb", modifiedFile), + OverrideTextFileData("a\nb", modifiedFile), CheckAnswer("a", "b"), // Unfortunately since a lot of file system does not have modification time granularity // finer grained than 1 sec, we need to use 1 sec here. AssertOnQuery { _ => Thread.sleep(1000); true }, - // modify the file, should consider the new content as well - AppendTextFileData("c\nd", modifiedFile), + // override the file, should consider the new content as well + OverrideTextFileData("c\nd", modifiedFile), CheckAnswer("a", "b", "c", "d") ) }