docs/structured-streaming-programming-guide.md (2 changes: 1 addition & 1 deletion)

@@ -551,7 +551,7 @@ Here are the details of all the sources in Spark.
When "archive" is provided, the additional option <code>sourceArchiveDir</code> must be provided as well. The value of "sourceArchiveDir" must have 2 subdirectories (so that the depth of the directory is greater than 2), e.g. <code>/archived/here</code>. This ensures archived files are never included as new source files.<br/>
Spark will move source files while preserving their own path. For example, if the path of a source file is <code>/a/b/dataset.txt</code> and the path of the archive directory is <code>/archived/here</code>, the file will be moved to <code>/archived/here/a/b/dataset.txt</code>.<br/>
NOTE: Both archiving (via moving) and deleting completed files introduce overhead (slowdown) in each micro-batch, so make sure you understand the cost of each operation in your file system before enabling this option. On the other hand, enabling this option reduces the cost of listing source files, which can be an expensive operation.<br/>
NOTE 2: The source path should not be used from multiple sources or queries when enabling this option.<br/>
NOTE 2: The source path should not be used from multiple sources or queries when enabling this option. Similarly, you must ensure the source path doesn't match any files in the output directory of the file stream sink.<br/>
NOTE 3: Both delete and move actions are best effort. Failing to delete or move files will not fail the streaming query.
<br/><br/>
For file-format-specific options, see the related methods in <code>DataStreamReader</code>
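
A minimal sketch of how a user would enable this option through the DataStreamReader API (the format and paths below are placeholders, not values taken from this change):

    val cleaned = spark.readStream
      .format("text")
      .option("cleanSource", "archive")             // or "delete"
      .option("sourceArchiveDir", "/archived/here") // required when cleanSource is "archive"
      .load("/data/incoming")                       // must not be the output dir of a FileStreamSink
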
FileStreamSource.scala

@@ -259,6 +259,8 @@ class FileStreamSource(

override def toString: String = s"FileStreamSource[$qualifiedBasePath]"

private var warnedIgnoringCleanSourceOption: Boolean = false

/**
* Informs the source that Spark has completed processing all data for offsets less than or
* equal to `end` and will only request offsets greater than `end` in the future.
@@ -267,10 +269,22 @@ class FileStreamSource(
val logOffset = FileStreamSourceOffset(end).logOffset

sourceCleaner.foreach { cleaner =>
val files = metadataLog.get(Some(logOffset), Some(logOffset)).flatMap(_._2)
val validFileEntities = files.filter(_.batchId == logOffset)
logDebug(s"completed file entries: ${validFileEntities.mkString(",")}")
validFileEntities.foreach(cleaner.clean)
sourceHasMetadata match {
case Some(true) if !warnedIgnoringCleanSourceOption =>
Contributor:
Is it possible that it's called more than once? In that case, case _ => will win.

Contributor (author):
Ah yes missed that. Nice finding.

logWarning("Ignoring 'cleanSource' option since source path refers to the output" +
" directory of FileStreamSink.")
warnedIgnoringCleanSourceOption = true

case Some(false) =>
val files = metadataLog.get(Some(logOffset), Some(logOffset)).flatMap(_._2)
val validFileEntities = files.filter(_.batchId == logOffset)
logDebug(s"completed file entries: ${validFileEntities.mkString(",")}")
validFileEntities.foreach(cleaner.clean)

case _ =>
logWarning("Ignoring 'cleanSource' option since Spark hasn't figured out whether " +
Contributor (author):
I just put logWarning here - I was about to throw IllegalStateException, since it doesn't sound feasible to have files from commit() while FileStreamSource still cannot decide, but there might be some edge case, so I avoided being aggressive here.

Member:
How about throwing an UnsupportedOperationException here:

new MetadataLogFileIndex(sparkSession, qualifiedBasePath,

Contributor (author) @HeartSaVioR, Nov 26, 2019:
The only "odd" case I can imagine reaching here is:

  1. the query ran, wrote the commit log of the last batch, and stopped before writing the offset for the next batch.
  2. the query is restarted, and constructNextBatch is called.
  3. somehow the source files are all deleted between 1) and 2), hence FileStreamSource doesn't see any file and cannot decide when fetchAllFiles is called.
  4. constructNextBatch will call commit for the previous batch the query executed before.

It's obviously a very odd case, as the contents of the source directory were modified (maybe manually), which we don't support (so throwing an exception would be OK), but I'm not fully sure there are no other edge cases.

Btw, where do you recommend adding the exception? L287, or L205? If you're suggesting adding the exception at L205, I'm not sure I follow. If I'm understanding correctly, the case where the logic reaches case _ won't reach L205.

Contributor:
I also don't see yet which place the suggestion refers to.

> L205, I'm not sure I follow

+1

L287: As I see it, this is more or less a should-never-happen case. The question is whether there are edge cases which may hit this. If we miss a valid case and throw an exception here, we may block a query from starting.

Member:
> 3. somehow the source files are all deleted between 1) and 2)

This should be a user error.

My general point is that we should make sure the data files and the metadata in _spark_metadata stay consistent, and we should prevent cleaning up data files that are still tracked. Logging a warning without really deleting files is a solution; however, most users won't be able to notice this warning in their logs. Hence we should detect this earlier. There is already a variable, sourceHasMetadata, tracking whether the source is reading from a file stream sink or not. We can check the options and throw an exception when flipping it. What do you think?
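
A rough sketch of the kind of fail-fast check being suggested (the method shape is illustrative; it borrows the sourceHasMetadata and sourceCleaner names from the surrounding code and is not necessarily the exact change that was merged):

    // Illustrative only: reject the 'cleanSource' option as soon as the source is
    // discovered to be reading the output directory of a FileStreamSink.
    private def setSourceHasMetadata(newValue: Option[Boolean]): Unit = newValue match {
      case Some(true) if sourceCleaner.isDefined =>
        throw new UnsupportedOperationException("Clean up source files is not supported when" +
          " reading from the output directory of FileStreamSink.")
      case _ =>
        sourceHasMetadata = newValue
    }
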

Contributor (author):
Ah OK I guess I got your point now. I'm also in favor of being "fail-fast" and the suggestion fits it. Thanks! Just updated.

"source path refers to the output directory of FileStreamSink or not.")
}
}
}

FileStreamSourceSuite.scala

@@ -29,6 +29,7 @@ import org.apache.hadoop.util.Progressable
import org.scalatest.PrivateMethodTester
import org.scalatest.time.SpanSugar._

import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.execution.streaming._
@@ -149,6 +150,20 @@ abstract class FileStreamSourceTest
}
}

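/**
 * Test helper: lists the files under `srcDir` that match `pathFilter` and records them in
 * the given FileStreamSinkLog as batch `batchId`, i.e. it makes the source directory look
 * like the output directory of a FileStreamSink.
 */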
case class AddFilesToFileStreamSinkLog(
fs: FileSystem,
srcDir: Path,
sinkLog: FileStreamSinkLog,
batchId: Int)(
pathFilter: Path => Boolean) extends ExternalAction {
override def runAction(): Unit = {
val statuses = fs.listStatus(srcDir, new PathFilter {
override def accept(path: Path): Boolean = pathFilter(path)
})
sinkLog.add(batchId, statuses.map { s => SinkFileStatus(s) })
}
}

/** Use `format` and `path` to create FileStreamSource via DataFrameReader */
def createFileStream(
format: String,
@@ -1617,14 +1632,6 @@ class FileStreamSourceSuite extends FileStreamSourceTest
}

test("remove completed files when remove option is enabled") {
def assertFileIsRemoved(files: Array[String], fileName: String): Unit = {
assert(!files.exists(_.startsWith(fileName)))
}

def assertFileIsNotRemoved(files: Array[String], fileName: String): Unit = {
assert(files.exists(_.startsWith(fileName)))
}

withTempDirs { case (src, tmp) =>
withSQLConf(
SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key -> "2",
@@ -1642,28 +1649,24 @@ class FileStreamSourceSuite extends FileStreamSourceTest
CheckAnswer("keep1"),
AssertOnQuery("input file removed") { _: StreamExecution =>
// it doesn't rename any file yet
assertFileIsNotRemoved(src.list(), "keep1")
assertFileIsNotRemoved(src, "keep1")
true
},
AddTextFileData("keep2", src, tmp, tmpFilePrefix = "ke ep2 %"),
CheckAnswer("keep1", "keep2"),
AssertOnQuery("input file removed") { _: StreamExecution =>
val files = src.list()

// it renames input file for first batch, but not for second batch yet
assertFileIsRemoved(files, "keep1")
assertFileIsNotRemoved(files, "ke ep2 %")
assertFileIsRemoved(src, "keep1")
assertFileIsNotRemoved(src, "ke ep2 %")

true
},
AddTextFileData("keep3", src, tmp, tmpFilePrefix = "keep3"),
CheckAnswer("keep1", "keep2", "keep3"),
AssertOnQuery("input file renamed") { _: StreamExecution =>
val files = src.list()

// it renames input file for second batch, but not third batch yet
assertFileIsRemoved(files, "ke ep2 %")
assertFileIsNotRemoved(files, "keep3")
assertFileIsRemoved(src, "ke ep2 %")
assertFileIsNotRemoved(src, "keep3")

true
}
@@ -1739,6 +1742,58 @@ class FileStreamSourceSuite extends FileStreamSourceTest
}
}

Seq("delete", "archive").foreach { cleanOption =>
test(s"skip $cleanOption when source path refers to the output dir of FileStreamSink") {
withThreeTempDirs { case (src, tmp, archiveDir) =>
withSQLConf(
SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key -> "2",
// Force deleting the old logs
SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY.key -> "1"
) {
val option = Map("latestFirst" -> "false", "maxFilesPerTrigger" -> "1",
"cleanSource" -> cleanOption, "sourceArchiveDir" -> archiveDir.getAbsolutePath)

val fileStream = createFileStream("text", src.getCanonicalPath, options = option)
val filtered = fileStream.filter($"value" contains "keep")

// create FileStreamSinkLog under source directory
val sinkLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark,
new File(src, FileStreamSink.metadataDir).getCanonicalPath)
val hadoopConf = SparkHadoopUtil.newConfiguration(sparkConf)
val srcPath = new Path(src.getCanonicalPath)
val fileSystem = srcPath.getFileSystem(hadoopConf)

// Here we will just check whether the source file is removed or not, as the
// functionality of "archive" is covered in another UT.
testStream(filtered)(
AddTextFileData("keep1", src, tmp, tmpFilePrefix = "keep1"),
AddFilesToFileStreamSinkLog(fileSystem, srcPath, sinkLog, 0) { path =>
path.getName.startsWith("keep1")
},
CheckAnswer("keep1"),
AssertOnQuery("input file removed") { _: StreamExecution =>
// it doesn't remove any files for recent batch yet
assertFileIsNotRemoved(src, "keep1")
true
},
AddTextFileData("keep2", src, tmp, tmpFilePrefix = "ke ep2 %"),
AddFilesToFileStreamSinkLog(fileSystem, srcPath, sinkLog, 1) { path =>
path.getName.startsWith("ke ep2 %")
},
CheckAnswer("keep1", "keep2"),
AssertOnQuery("input file removed") { _: StreamExecution =>
// it doesn't remove any file in src since it's the output dir of FileStreamSink
assertFileIsNotRemoved(src, "keep1")
// it doesn't remove any files for recent batch yet
assertFileIsNotRemoved(src, "ke ep2 %")
true
}
)
}
}
}
}

class FakeFileSystem(scheme: String) extends FileSystem {
override def exists(f: Path): Boolean = true

@@ -1797,6 +1852,14 @@ class FileStreamSourceSuite extends FileStreamSourceTest
}
}

private def assertFileIsRemoved(sourceDir: File, fileName: String): Unit = {
assert(!sourceDir.list().exists(_.startsWith(fileName)))
}

private def assertFileIsNotRemoved(sourceDir: File, fileName: String): Unit = {
assert(sourceDir.list().exists(_.startsWith(fileName)))
}

private def assertFileIsNotMoved(sourceDir: File, expectedDir: File, filePrefix: String): Unit = {
assert(sourceDir.exists())
assert(sourceDir.list().exists(_.startsWith(filePrefix)))