diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index 2a405f36fd5f..d399b37ab210 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -546,6 +546,13 @@ Here are the details of all the sources in Spark.
"s3://a/dataset.txt"
"s3n://a/b/dataset.txt"
"s3a://a/b/c/dataset.txt"
+ cleanSource: option to clean up completed files after processing.
+ Available options are "archive", "delete", "off". If the option is not provided, the default value is "off".
+ When "archive" is provided, additional option sourceArchiveDir must be provided as well. The value of "sourceArchiveDir" must have 2 subdirectories (so depth of directory is greater than 2). e.g. /archived/here. This will ensure archived files are never included as new source files.
+ Spark will move source files respecting their own path. For example, if the path of source file is /a/b/dataset.txt and the path of archive directory is /archived/here, file will be moved to /archived/here/a/b/dataset.txt.
+ NOTE: Both archiving (via moving) or deleting completed files will introduce overhead (slow down) in each micro-batch, so you need to understand the cost for each operation in your file system before enabling this option. On the other hand, enabling this option will reduce the cost to list source files which can be an expensive operation.
+ NOTE 2: The source path should not be used from multiple sources or queries when enabling this option.
+ NOTE 3: Both delete and move actions are best effort. Failing to delete or move files will not fail the streaming query.
For file-format-specific options, see the related methods in DataStreamReader
(Scala/Java/Python/
try {
@@ -86,3 +112,14 @@ class FileStreamOptions(parameters: CaseInsensitiveMap[String]) extends Logging
}.getOrElse(default)
}
}
+
+object CleanSourceMode extends Enumeration {
+ val ARCHIVE, DELETE, OFF = Value
+
+ def fromString(value: Option[String]): CleanSourceMode.Value = value.map { v =>
+ CleanSourceMode.values.find(_.toString == v.toUpperCase(Locale.ROOT))
+ .getOrElse(throw new IllegalArgumentException(
+ s"Invalid mode for clean source option $value." +
+ s" Must be one of ${CleanSourceMode.values.mkString(",")}"))
+ }.getOrElse(OFF)
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 92eef6af2238..35d486c7c743 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -20,7 +20,10 @@ package org.apache.spark.sql.execution.streaming
import java.net.URI
import java.util.concurrent.TimeUnit._
-import org.apache.hadoop.fs.{FileStatus, Path}
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
@@ -53,6 +56,9 @@ class FileStreamSource(
fs.makeQualified(new Path(path)) // can contain glob patterns
}
+ private val sourceCleaner: Option[FileStreamSourceCleaner] = FileStreamSourceCleaner(
+ fs, qualifiedBasePath, sourceOptions, hadoopConf)
+
private val optionsWithPartitionBasePath = sourceOptions.optionMapWithoutPath ++ {
if (!SparkHadoopUtil.get.isGlobPath(new Path(path)) && options.contains("path")) {
Map("basePath" -> path)
@@ -258,8 +264,14 @@ class FileStreamSource(
* equal to `end` and will only request offsets greater than `end` in the future.
*/
override def commit(end: Offset): Unit = {
- // No-op for now; FileStreamSource currently garbage-collects files based on timestamp
- // and the value of the maxFileAge parameter.
+ val logOffset = FileStreamSourceOffset(end).logOffset
+
+ sourceCleaner.foreach { cleaner =>
+ val files = metadataLog.get(Some(logOffset), Some(logOffset)).flatMap(_._2)
+ val validFileEntities = files.filter(_.batchId == logOffset)
+ logDebug(s"completed file entries: ${validFileEntities.mkString(",")}")
+ validFileEntities.foreach(cleaner.clean)
+ }
}
override def stop(): Unit = {}
@@ -267,7 +279,6 @@ class FileStreamSource(
object FileStreamSource {
-
/** Timestamp for file modification time, in ms since January 1, 1970 UTC. */
type Timestamp = Long
@@ -330,4 +341,96 @@ object FileStreamSource {
def size: Int = map.size()
}
+
+ private[sql] trait FileStreamSourceCleaner {
+ def clean(entry: FileEntry): Unit
+ }
+
+ private[sql] object FileStreamSourceCleaner {
+ def apply(
+ fileSystem: FileSystem,
+ sourcePath: Path,
+ option: FileStreamOptions,
+ hadoopConf: Configuration): Option[FileStreamSourceCleaner] = option.cleanSource match {
+ case CleanSourceMode.ARCHIVE =>
+ require(option.sourceArchiveDir.isDefined)
+ val path = new Path(option.sourceArchiveDir.get)
+ val archiveFs = path.getFileSystem(hadoopConf)
+ val qualifiedArchivePath = archiveFs.makeQualified(path)
+ Some(new SourceFileArchiver(fileSystem, sourcePath, archiveFs, qualifiedArchivePath))
+
+ case CleanSourceMode.DELETE =>
+ Some(new SourceFileRemover(fileSystem))
+
+ case _ => None
+ }
+ }
+
+ private[sql] class SourceFileArchiver(
+ fileSystem: FileSystem,
+ sourcePath: Path,
+ baseArchiveFileSystem: FileSystem,
+ baseArchivePath: Path) extends FileStreamSourceCleaner with Logging {
+ assertParameters()
+
+ private def assertParameters(): Unit = {
+ require(fileSystem.getUri == baseArchiveFileSystem.getUri, "Base archive path is located " +
+ s"on a different file system than the source files. source path: $sourcePath" +
+ s" / base archive path: $baseArchivePath")
+
+ /**
+ * FileStreamSource reads the files which one of below conditions is met:
+ * 1) file itself is matched with source path
+ * 2) parent directory is matched with source path
+ *
+ * Checking with glob pattern is costly, so set this requirement to eliminate the cases
+ * where the archive path can be matched with source path. For example, when file is moved
+ * to archive directory, destination path will retain input file's path as suffix, so
+ * destination path can't be matched with source path if archive directory's depth is longer
+ * than 2, as neither file nor parent directory of destination path can be matched with
+ * source path.
+ */
+ require(baseArchivePath.depth() > 2, "Base archive path must have at least 2 " +
+ "subdirectories from root directory. e.g. '/data/archive'")
+ }
+
+ override def clean(entry: FileEntry): Unit = {
+ val curPath = new Path(new URI(entry.path))
+ val newPath = new Path(baseArchivePath.toString.stripSuffix("/") + curPath.toUri.getPath)
+
+ try {
+ logDebug(s"Creating directory if it doesn't exist ${newPath.getParent}")
+ if (!fileSystem.exists(newPath.getParent)) {
+ fileSystem.mkdirs(newPath.getParent)
+ }
+
+ logDebug(s"Archiving completed file $curPath to $newPath")
+ if (!fileSystem.rename(curPath, newPath)) {
+ logWarning(s"Fail to move $curPath to $newPath / skip moving file.")
+ }
+ } catch {
+ case NonFatal(e) =>
+ logWarning(s"Fail to move $curPath to $newPath / skip moving file.", e)
+ }
+ }
+ }
+
+ private[sql] class SourceFileRemover(fileSystem: FileSystem)
+ extends FileStreamSourceCleaner with Logging {
+
+ override def clean(entry: FileEntry): Unit = {
+ val curPath = new Path(new URI(entry.path))
+ try {
+ logDebug(s"Removing completed file $curPath")
+
+ if (!fileSystem.delete(curPath, false)) {
+ logWarning(s"Failed to remove $curPath / skip removing file.")
+ }
+ } catch {
+ case NonFatal(e) =>
+ // Log to error but swallow exception to avoid process being stopped
+ logWarning(s"Fail to remove $curPath / skip removing file.", e)
+ }
+ }
+ }
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 75a0ae7cfe06..7291fa6dfe02 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -20,23 +20,26 @@ package org.apache.spark.sql.streaming
import java.io.File
import java.net.URI
+import scala.collection.mutable
import scala.util.Random
-import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}
+import org.apache.hadoop.fs._
+import org.apache.hadoop.fs.permission.FsPermission
+import org.apache.hadoop.util.Progressable
import org.scalatest.PrivateMethodTester
-import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.execution.streaming._
-import org.apache.spark.sql.execution.streaming.FileStreamSource.{FileEntry, SeenFilesMap}
+import org.apache.spark.sql.execution.streaming.FileStreamSource.{FileEntry, SeenFilesMap, SourceFileArchiver}
import org.apache.spark.sql.execution.streaming.sources.MemorySink
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.ExistsThrowsExceptionFileSystem._
import org.apache.spark.sql.streaming.util.StreamManualClock
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types._
+import org.apache.spark.sql.types.{StructType, _}
import org.apache.spark.util.Utils
abstract class FileStreamSourceTest
@@ -177,7 +180,6 @@ abstract class FileStreamSourceTest
}
}
-
protected def withTempDirs(body: (File, File) => Unit): Unit = {
val src = Utils.createTempDir(namePrefix = "streaming.src")
val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")
@@ -189,6 +191,19 @@ abstract class FileStreamSourceTest
}
}
+ protected def withThreeTempDirs(body: (File, File, File) => Unit): Unit = {
+ val src = Utils.createTempDir(namePrefix = "streaming.src")
+ val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")
+ val archive = Utils.createTempDir(namePrefix = "streaming.archive")
+ try {
+ body(src, tmp, archive)
+ } finally {
+ Utils.deleteRecursively(src)
+ Utils.deleteRecursively(tmp)
+ Utils.deleteRecursively(archive)
+ }
+ }
+
val valueSchema = new StructType().add("value", StringType)
}
@@ -1386,9 +1401,13 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
latestFirst: Boolean,
firstBatch: String,
secondBatch: String,
- maxFileAge: Option[String] = None): Unit = {
+ maxFileAge: Option[String] = None,
+ cleanSource: CleanSourceMode.Value = CleanSourceMode.OFF,
+ archiveDir: Option[String] = None): Unit = {
val srcOptions = Map("latestFirst" -> latestFirst.toString, "maxFilesPerTrigger" -> "1") ++
- maxFileAge.map("maxFileAge" -> _)
+ maxFileAge.map("maxFileAge" -> _) ++
+ Seq("cleanSource" -> cleanSource.toString) ++
+ archiveDir.map("sourceArchiveDir" -> _)
val fileStream = createFileStream(
"text",
src.getCanonicalPath,
@@ -1547,7 +1566,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
val actions = Seq(
AddTextFileData(source1Content, sourceDir1, tmp),
AddTextFileData(source2Content, sourceDir2, tmp)
- ).filter(_.content != null) // don't write to a source dir if no content specified
+ ).filter(_.content != null) // don't write to a source dir if no content specified
StreamProgressLockedActions(actions, desc = actions.mkString("[ ", " | ", " ]"))
}
@@ -1596,6 +1615,204 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
}
}
}
+
+ test("remove completed files when remove option is enabled") {
+ def assertFileIsRemoved(files: Array[String], fileName: String): Unit = {
+ assert(!files.exists(_.startsWith(fileName)))
+ }
+
+ def assertFileIsNotRemoved(files: Array[String], fileName: String): Unit = {
+ assert(files.exists(_.startsWith(fileName)))
+ }
+
+ withTempDirs { case (src, tmp) =>
+ withSQLConf(
+ SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key -> "2",
+ // Force deleting the old logs
+ SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY.key -> "1"
+ ) {
+ val option = Map("latestFirst" -> "false", "maxFilesPerTrigger" -> "1",
+ "cleanSource" -> "delete")
+
+ val fileStream = createFileStream("text", src.getCanonicalPath, options = option)
+ val filtered = fileStream.filter($"value" contains "keep")
+
+ testStream(filtered)(
+ AddTextFileData("keep1", src, tmp, tmpFilePrefix = "keep1"),
+ CheckAnswer("keep1"),
+ AssertOnQuery("input file removed") { _: StreamExecution =>
+ // it doesn't rename any file yet
+ assertFileIsNotRemoved(src.list(), "keep1")
+ true
+ },
+ AddTextFileData("keep2", src, tmp, tmpFilePrefix = "ke ep2 %"),
+ CheckAnswer("keep1", "keep2"),
+ AssertOnQuery("input file removed") { _: StreamExecution =>
+ val files = src.list()
+
+ // it renames input file for first batch, but not for second batch yet
+ assertFileIsRemoved(files, "keep1")
+ assertFileIsNotRemoved(files, "ke ep2 %")
+
+ true
+ },
+ AddTextFileData("keep3", src, tmp, tmpFilePrefix = "keep3"),
+ CheckAnswer("keep1", "keep2", "keep3"),
+ AssertOnQuery("input file renamed") { _: StreamExecution =>
+ val files = src.list()
+
+ // it renames input file for second batch, but not third batch yet
+ assertFileIsRemoved(files, "ke ep2 %")
+ assertFileIsNotRemoved(files, "keep3")
+
+ true
+ }
+ )
+ }
+ }
+ }
+
+ test("move completed files to archive directory when archive option is enabled") {
+ withThreeTempDirs { case (src, tmp, archiveDir) =>
+ withSQLConf(
+ SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key -> "2",
+ // Force deleting the old logs
+ SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY.key -> "1"
+ ) {
+ val option = Map("latestFirst" -> "false", "maxFilesPerTrigger" -> "1",
+ "cleanSource" -> "archive", "sourceArchiveDir" -> archiveDir.getAbsolutePath)
+
+ val fileStream = createFileStream("text", s"${src.getCanonicalPath}/*/*",
+ options = option)
+ val filtered = fileStream.filter($"value" contains "keep")
+
+ // src/k %1
+ // file: src/k %1/keep1
+ val dirForKeep1 = new File(src, "k %1")
+ // src/k %1/k 2
+ // file: src/k %1/k 2/keep2
+ val dirForKeep2 = new File(dirForKeep1, "k 2")
+ // src/k3
+ // file: src/k3/keep3
+ val dirForKeep3 = new File(src, "k3")
+
+ val expectedMovedDir1 = new File(archiveDir.getAbsolutePath + dirForKeep1.toURI.getPath)
+ val expectedMovedDir2 = new File(archiveDir.getAbsolutePath + dirForKeep2.toURI.getPath)
+ val expectedMovedDir3 = new File(archiveDir.getAbsolutePath + dirForKeep3.toURI.getPath)
+
+ testStream(filtered)(
+ AddTextFileData("keep1", dirForKeep1, tmp, tmpFilePrefix = "keep1"),
+ CheckAnswer("keep1"),
+ AssertOnQuery("input file archived") { _: StreamExecution =>
+ // it doesn't rename any file yet
+ assertFileIsNotMoved(dirForKeep1, expectedMovedDir1, "keep1")
+ true
+ },
+ AddTextFileData("keep2", dirForKeep2, tmp, tmpFilePrefix = "keep2 %"),
+ CheckAnswer("keep1", "keep2"),
+ AssertOnQuery("input file archived") { _: StreamExecution =>
+ // it renames input file for first batch, but not for second batch yet
+ assertFileIsMoved(dirForKeep1, expectedMovedDir1, "keep1")
+ assertFileIsNotMoved(dirForKeep2, expectedMovedDir2, "keep2 %")
+ true
+ },
+ AddTextFileData("keep3", dirForKeep3, tmp, tmpFilePrefix = "keep3"),
+ CheckAnswer("keep1", "keep2", "keep3"),
+ AssertOnQuery("input file archived") { _: StreamExecution =>
+ // it renames input file for second batch, but not third batch yet
+ assertFileIsMoved(dirForKeep2, expectedMovedDir2, "keep2 %")
+ assertFileIsNotMoved(dirForKeep3, expectedMovedDir3, "keep3")
+
+ true
+ },
+ AddTextFileData("keep4", dirForKeep3, tmp, tmpFilePrefix = "keep4"),
+ CheckAnswer("keep1", "keep2", "keep3", "keep4"),
+ AssertOnQuery("input file archived") { _: StreamExecution =>
+ // it renames input file for third batch, but not fourth batch yet
+ assertFileIsMoved(dirForKeep3, expectedMovedDir3, "keep3")
+ assertFileIsNotMoved(dirForKeep3, expectedMovedDir3, "keep4")
+
+ true
+ }
+ )
+ }
+ }
+ }
+
+ class FakeFileSystem(scheme: String) extends FileSystem {
+ override def exists(f: Path): Boolean = true
+
+ override def mkdirs(f: Path, permission: FsPermission): Boolean = true
+
+ override def rename(src: Path, dst: Path): Boolean = true
+
+ override def getUri: URI = URI.create(s"${scheme}:///")
+
+ override def open(f: Path, bufferSize: Int): FSDataInputStream = throw new NotImplementedError
+
+ override def create(
+ f: Path,
+ permission: FsPermission,
+ overwrite: Boolean,
+ bufferSize: Int,
+ replication: Short,
+ blockSize: Long,
+ progress: Progressable): FSDataOutputStream = throw new NotImplementedError
+
+ override def append(f: Path, bufferSize: Int, progress: Progressable): FSDataOutputStream =
+ throw new NotImplementedError
+
+ override def delete(f: Path, recursive: Boolean): Boolean = throw new NotImplementedError
+
+ override def listStatus(f: Path): Array[FileStatus] = throw new NotImplementedError
+
+ override def setWorkingDirectory(new_dir: Path): Unit = throw new NotImplementedError
+
+ override def getWorkingDirectory: Path = new Path("/somewhere")
+
+ override def getFileStatus(f: Path): FileStatus = throw new NotImplementedError
+ }
+
+ test("SourceFileArchiver - base archive path depth <= 2") {
+ val fakeFileSystem = new FakeFileSystem("fake")
+
+ val sourcePatternPath = new Path("/hello*/h{e,f}ll?")
+ val baseArchiveDirPath = new Path("/hello")
+
+ intercept[IllegalArgumentException] {
+ new SourceFileArchiver(fakeFileSystem, sourcePatternPath, fakeFileSystem, baseArchiveDirPath)
+ }
+ }
+
+ test("SourceFileArchiver - different filesystems between source and archive") {
+ val fakeFileSystem = new FakeFileSystem("fake")
+ val fakeFileSystem2 = new FakeFileSystem("fake2")
+
+ val sourcePatternPath = new Path("/hello*/h{e,f}ll?")
+ val baseArchiveDirPath = new Path("/hello")
+
+ intercept[IllegalArgumentException] {
+ new SourceFileArchiver(fakeFileSystem, sourcePatternPath, fakeFileSystem2,
+ baseArchiveDirPath)
+ }
+ }
+
+ private def assertFileIsNotMoved(sourceDir: File, expectedDir: File, filePrefix: String): Unit = {
+ assert(sourceDir.exists())
+ assert(sourceDir.list().exists(_.startsWith(filePrefix)))
+ if (!expectedDir.exists()) {
+ // OK
+ } else {
+ assert(!expectedDir.list().exists(_.startsWith(filePrefix)))
+ }
+ }
+
+ private def assertFileIsMoved(sourceDir: File, expectedDir: File, filePrefix: String): Unit = {
+ assert(sourceDir.exists())
+ assert(!sourceDir.list().exists(_.startsWith(filePrefix)))
+ assert(expectedDir.exists())
+ assert(expectedDir.list().exists(_.startsWith(filePrefix)))
+ }
}
class FileStreamSourceStressTestSuite extends FileStreamSourceTest {