3 changes: 2 additions & 1 deletion docs/structured-streaming-programming-guide.md
@@ -548,7 +548,8 @@ Here are the details of all the sources in Spark.
"s3a://a/b/c/dataset.txt"<br/>
<code>cleanSource</code>: option to clean up completed files after processing.<br/>
Available options are "archive", "delete", "off". If the option is not provided, the default value is "off".<br/>
When "archive" is provided, additional option <code>sourceArchiveDir</code> must be provided as well. The value of "sourceArchiveDir" must have 2 subdirectories (so depth of directory is greater than 2). e.g. <code>/archived/here</code>. This will ensure archived files are never included as new source files.<br/>
When "archive" is provided, additional option <code>sourceArchiveDir</code> must be provided as well. The value of "sourceArchiveDir" must not match with source pattern in depth (the number of directories from the root directory), where the depth is minimum of depth on both paths. This will ensure archived files are never included as new source files.<br/>
For example, suppose you provide '/hello?/spark/*' as source pattern, '/hello1/spark/archive/dir' cannot be used as the value of "sourceArchiveDir", as '/hello?/spark/*' and '/hello1/spark/archive' will be matched. '/hello1/spark' cannot be also used as the value of "sourceArchiveDir", as '/hello?/spark' and '/hello1/spark' will be matched. '/archived/here' would be OK as it doesn't match.<br/>
Spark will move source files respecting their own path. For example, if the path of source file is <code>/a/b/dataset.txt</code> and the path of archive directory is <code>/archived/here</code>, file will be moved to <code>/archived/here/a/b/dataset.txt</code>.<br/>
NOTE: Both archiving (via moving) and deleting completed files introduce overhead (slowdown) in each micro-batch, so make sure you understand the cost of each operation in your file system before enabling this option. On the other hand, enabling this option reduces the cost of listing source files, which can be an expensive operation.<br/>
NOTE 2: The source path should not be used from multiple sources or queries when enabling this option. Similarly, you must ensure the source path doesn't match any files in the output directory of the file stream sink.<br/>
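As an illustration of the options above, here is a minimal sketch of a file-source query that archives completed files. It assumes an active SparkSession named spark; the source pattern and archive directory are hypothetical and follow the depth rule described above:

    val archived = spark.readStream
      .format("text")
      .option("cleanSource", "archive")
      // "/archived/here" does not match "/hello1/spark/*" at any shared depth.
      .option("sourceArchiveDir", "/archived/here")
      .load("/hello1/spark/*")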
@@ -23,7 +23,7 @@ import java.util.concurrent.TimeUnit._
import scala.util.control.NonFatal

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.hadoop.fs.{FileStatus, FileSystem, GlobFilter, Path}

import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
@@ -389,20 +389,63 @@ object FileStreamSource {
s"on a different file system than the source files. source path: $sourcePath" +
s" / base archive path: $baseArchivePath")

/**
* FileStreamSource reads files for which one of the conditions below is met:
* 1) the file itself matches the source path
* 2) the parent directory matches the source path
*
* Checking with a glob pattern is costly, so this requirement is set to eliminate the cases
* where the archive path could match the source path. For example, when a file is moved to
* the archive directory, the destination path retains the input file's path as a suffix, so
* the destination path can't match the source path if the archive directory's depth is
* greater than 2, as neither the file nor the parent directory of the destination path can
* match the source path.
*/
require(baseArchivePath.depth() > 2, "Base archive path must have at least 2 " +
"subdirectories from root directory. e.g. '/data/archive'")
require(!isBaseArchivePathMatchedAgainstSourcePattern, "Base archive path cannot be set to" +
" a path where the archived path could match the source pattern. Ensure the base archive" +
" path doesn't match the source pattern in depth, where the depth is the minimum of the" +
" depths of both paths.")
}

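// Walks up the parent chain until the path has at most `depth` components. For example,
// assuming Hadoop Path semantics where new Path("/a/b/c").depth() == 3, trimming
// "/a/b/c/d" to depth 2 yields "/a/b".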
private def getAncestorEnsuringDepth(path: Path, depth: Int): Path = {
var newPath = path
while (newPath.depth() > depth) {
newPath = newPath.getParent
}
newPath
}

private def isBaseArchivePathMatchedAgainstSourcePattern: Boolean = {
// We should disallow setting a base archive path that the source pattern can match, so that
// we don't have to check each source file against it. FileStreamSource may read files at any
// depth of subdirectory under the source pattern, so we should consider all three cases:
// 1) both paths have the same depth 2) the base archive path is deeper than the source
// pattern 3) the source pattern is deeper than the base archive path. To handle all cases,
// we take the minimum of the two depths and check the match at that depth.
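// Worked example (from the documentation): with source pattern '/hello?/spark/*' and base
// archive path '/hello1/spark/archive/dir', minDepth is 3, so '/hello?/spark/*' is compared
// name-by-name against '/hello1/spark/archive' and matches, which makes the require above
// fail.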

val minDepth = math.min(sourcePath.depth(), baseArchivePath.depth())

val sourcePathMinDepth = getAncestorEnsuringDepth(sourcePath, minDepth)
val baseArchivePathMinDepth = getAncestorEnsuringDepth(baseArchivePath, minDepth)

val sourceGlobFilters: Seq[GlobFilter] = buildSourceGlobFilters(sourcePathMinDepth)

var matched = true

// pathToCompare should have same depth as sourceGlobFilters.length
var pathToCompare = baseArchivePathMinDepth
var index = 0
do {
// GlobFilter only matches against the path's name (its last component), not the full path,
// so it's safe to compare component by component
if (!sourceGlobFilters(index).accept(pathToCompare)) {
matched = false
} else {
pathToCompare = pathToCompare.getParent
index += 1
}
} while (matched && !pathToCompare.isRoot)

matched
}

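// Builds one GlobFilter per path component of the source pattern, ordered from the leaf
// component up to (but not including) the root.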
private def buildSourceGlobFilters(sourcePath: Path): Seq[GlobFilter] = {
val filters = new scala.collection.mutable.MutableList[GlobFilter]()

var currentPath = sourcePath
while (!currentPath.isRoot) {
filters += new GlobFilter(currentPath.getName)
currentPath = currentPath.getParent
}

filters.toList
}

override def clean(entry: FileEntry): Unit = {
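To see the validation end-to-end outside of Spark, here is a self-contained sketch (not part of this PR; the object and method names are made up, and it only assumes hadoop-common on the classpath) that mirrors the min-depth, name-by-name GlobFilter comparison on the example paths from the documentation:

    import org.apache.hadoop.fs.{GlobFilter, Path}

    object ArchivePathCheckSketch {
      // Walk up the parent chain until the path has at most `depth` components.
      private def ancestorAtDepth(path: Path, depth: Int): Path = {
        var p = path
        while (p.depth() > depth) p = p.getParent
        p
      }

      // True when the base archive path would be matched by the source pattern,
      // comparing name-by-name at the minimum depth of the two paths.
      def archivePathMatchesPattern(sourcePattern: Path, baseArchivePath: Path): Boolean = {
        val minDepth = math.min(sourcePattern.depth(), baseArchivePath.depth())
        var pattern = ancestorAtDepth(sourcePattern, minDepth)
        var archive = ancestorAtDepth(baseArchivePath, minDepth)
        var matched = true
        while (matched && !pattern.isRoot) {
          // GlobFilter matches only the last path component (the name).
          matched = new GlobFilter(pattern.getName).accept(archive)
          pattern = pattern.getParent
          archive = archive.getParent
        }
        matched
      }

      def main(args: Array[String]): Unit = {
        val pattern = new Path("/hello?/spark/*")
        // '/hello?/spark/*' matches '/hello1/spark/archive' at depth 3, so this prints true.
        println(archivePathMatchesPattern(pattern, new Path("/hello1/spark/archive/dir")))
        // '/archived/here' matches no prefix of the pattern, so this prints false.
        println(archivePathMatchesPattern(pattern, new Path("/archived/here")))
      }
    }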
@@ -1814,15 +1814,29 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
override def getFileStatus(f: Path): FileStatus = throw new NotImplementedError
}

test("SourceFileArchiver - base archive path depth <= 2") {
test("SourceFileArchiver - fail when base archive path matches source pattern") {
val fakeFileSystem = new FakeFileSystem("fake")

val sourcePatternPath = new Path("/hello*/h{e,f}ll?")
val baseArchiveDirPath = new Path("/hello")

intercept[IllegalArgumentException] {
new SourceFileArchiver(fakeFileSystem, sourcePatternPath, fakeFileSystem, baseArchiveDirPath)
def assertThrowIllegalArgumentException(sourcePattern: Path, baseArchivePath: Path): Unit = {
intercept[IllegalArgumentException] {
new SourceFileArchiver(fakeFileSystem, sourcePattern, fakeFileSystem, baseArchivePath)
}
}

// 1) prefix of base archive path matches source pattern (baseArchiveDirPath is deeper)
val sourcePatternPath = new Path("/hello*/spar?")
val baseArchiveDirPath = new Path("/hello/spark/structured/streaming")
assertThrowIllegalArgumentException(sourcePatternPath, baseArchiveDirPath)

// 2) prefix of source pattern matches base archive path (source pattern is deeper)
val sourcePatternPath2 = new Path("/hello*/spar?/structured/streaming")
val baseArchiveDirPath2 = new Path("/hello/spark/structured")
assertThrowIllegalArgumentException(sourcePatternPath2, baseArchiveDirPath2)

// 3) source pattern matches base archive path (both have the same depth)
val sourcePatternPath3 = new Path("/hello*/spar?/structured/*")
val baseArchiveDirPath3 = new Path("/hello/spark/structured/streaming")
assertThrowIllegalArgumentException(sourcePatternPath3, baseArchiveDirPath3)
}

test("SourceFileArchiver - different filesystems between source and archive") {