docs/structured-streaming-programming-guide.md (7 additions, 0 deletions)
@@ -546,6 +546,13 @@ Here are the details of all the sources in Spark.
"s3://a/dataset.txt"<br/>
"s3n://a/b/dataset.txt"<br/>
Review comment (Contributor): I'd drop the s3n & s3 refs, as they have gone from deprecated to deceased.

Reply (Contributor Author): This looks beyond the scope of this PR; we can address it in a separate PR. Could you raise another one?

"s3a://a/b/c/dataset.txt"<br/>
<code>cleanSource</code>: option to clean up completed files after processing.<br/>
Available options are "archive", "delete", "off". If the option is not provided, the default value is "off".<br/>
When "archive" is provided, additional option <code>sourceArchiveDir</code> must be provided as well. The value of "sourceArchiveDir" must have 2 subdirectories (so depth of directory is greater than 2). e.g. <code>/archived/here</code>. This will ensure archived files are never included as new source files.<br/>
Spark will move source files respecting their own path. For example, if the path of source file is <code>/a/b/dataset.txt</code> and the path of archive directory is <code>/archived/here</code>, file will be moved to <code>/archived/here/a/b/dataset.txt</code>.<br/>
NOTE: Both archiving (via moving) or deleting completed files will introduce overhead (slow down) in each micro-batch, so you need to understand the cost for each operation in your file system before enabling this option. On the other hand, enabling this option will reduce the cost to list source files which can be an expensive operation.<br/>
NOTE 2: The source path should not be used from multiple sources or queries when enabling this option.<br/>
NOTE 3: Both delete and move actions are best effort. Failing to delete or move files will not fail the streaming query.
<br/><br/>
For file-format-specific options, see the related methods in <code>DataStreamReader</code>
(<a href="api/scala/index.html#org.apache.spark.sql.streaming.DataStreamReader">Scala</a>/<a href="api/java/org/apache/spark/sql/streaming/DataStreamReader.html">Java</a>/<a href="api/python/pyspark.sql.html#pyspark.sql.streaming.DataStreamReader">Python</a>/<a
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
@@ -17,6 +17,8 @@

package org.apache.spark.sql.execution.streaming

import java.util.Locale

import scala.util.Try

import org.apache.spark.internal.Logging
@@ -74,6 +76,30 @@ class FileStreamOptions(parameters: CaseInsensitiveMap[String]) extends Logging
*/
val fileNameOnly: Boolean = withBooleanParameter("fileNameOnly", false)

/**
* The archive directory to which completed files are moved. This option is effective only
* when "cleanSource" is set to "archive".
*
* Note that a completed file is moved to the archive directory while preserving its own path.
*
* For example, if the path of a source file is "/a/b/dataset.txt" and the path of the archive
* directory is "/archived/here", the file will be moved to "/archived/here/a/b/dataset.txt".
*/
val sourceArchiveDir: Option[String] = parameters.get("sourceArchiveDir")

/**
* Defines how to clean up completed files. Available options are "archive", "delete", "off".
*/
val cleanSource: CleanSourceMode.Value = {
val matchedMode = CleanSourceMode.fromString(parameters.get("cleanSource"))
if (matchedMode == CleanSourceMode.ARCHIVE && sourceArchiveDir.isEmpty) {
throw new IllegalArgumentException("Archive mode must be used with 'sourceArchiveDir' " +
"option.")
}
matchedMode
}

private def withBooleanParameter(name: String, default: Boolean) = {
parameters.get(name).map { str =>
try {
@@ -86,3 +112,14 @@ class FileStreamOptions(parameters: CaseInsensitiveMap[String]) extends Logging
}.getOrElse(default)
}
}

object CleanSourceMode extends Enumeration {
val ARCHIVE, DELETE, OFF = Value

def fromString(value: Option[String]): CleanSourceMode.Value = value.map { v =>
CleanSourceMode.values.find(_.toString == v.toUpperCase(Locale.ROOT))
.getOrElse(throw new IllegalArgumentException(
s"Invalid mode for clean source option $value." +
s" Must be one of ${CleanSourceMode.values.mkString(",")}"))
}.getOrElse(OFF)
}
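As a quick behavioral sketch of the parsing above (these assertions mirror the code in this diff; they are illustrative, not an existing test):

```scala
import org.apache.spark.sql.execution.streaming.CleanSourceMode

// Matching is case-insensitive thanks to toUpperCase(Locale.ROOT),
// and a missing option falls back to OFF.
assert(CleanSourceMode.fromString(Some("Archive")) == CleanSourceMode.ARCHIVE)
assert(CleanSourceMode.fromString(Some("delete")) == CleanSourceMode.DELETE)
assert(CleanSourceMode.fromString(None) == CleanSourceMode.OFF)
// An unknown value, e.g. Some("purge"), throws IllegalArgumentException.
```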
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -20,7 +20,10 @@ package org.apache.spark.sql.execution.streaming
import java.net.URI
import java.util.concurrent.TimeUnit._

import org.apache.hadoop.fs.{FileStatus, Path}
import scala.util.control.NonFatal

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
@@ -53,6 +56,9 @@ class FileStreamSource(
fs.makeQualified(new Path(path)) // can contain glob patterns
}

private val sourceCleaner: Option[FileStreamSourceCleaner] = FileStreamSourceCleaner(
fs, qualifiedBasePath, sourceOptions, hadoopConf)

private val optionsWithPartitionBasePath = sourceOptions.optionMapWithoutPath ++ {
if (!SparkHadoopUtil.get.isGlobPath(new Path(path)) && options.contains("path")) {
Map("basePath" -> path)
@@ -258,16 +264,21 @@ class FileStreamSource(
* equal to `end` and will only request offsets greater than `end` in the future.
*/
override def commit(end: Offset): Unit = {
// No-op for now; FileStreamSource currently garbage-collects files based on timestamp
// and the value of the maxFileAge parameter.
val logOffset = FileStreamSourceOffset(end).logOffset

sourceCleaner.foreach { cleaner =>
val files = metadataLog.get(Some(logOffset), Some(logOffset)).flatMap(_._2)
val validFileEntities = files.filter(_.batchId == logOffset)
logDebug(s"completed file entries: ${validFileEntities.mkString(",")}")
validFileEntities.foreach(cleaner.clean)
}
}

override def stop(): Unit = {}
}


object FileStreamSource {

/** Timestamp for file modification time, in ms since January 1, 1970 UTC. */
type Timestamp = Long

@@ -330,4 +341,96 @@ object FileStreamSource {

def size: Int = map.size()
}

private[sql] trait FileStreamSourceCleaner {
def clean(entry: FileEntry): Unit
}

private[sql] object FileStreamSourceCleaner {
def apply(
fileSystem: FileSystem,
sourcePath: Path,
option: FileStreamOptions,
hadoopConf: Configuration): Option[FileStreamSourceCleaner] = option.cleanSource match {
case CleanSourceMode.ARCHIVE =>
require(option.sourceArchiveDir.isDefined)
val path = new Path(option.sourceArchiveDir.get)
val archiveFs = path.getFileSystem(hadoopConf)
val qualifiedArchivePath = archiveFs.makeQualified(path)
Some(new SourceFileArchiver(fileSystem, sourcePath, archiveFs, qualifiedArchivePath))

case CleanSourceMode.DELETE =>
Some(new SourceFileRemover(fileSystem))

case _ => None
}
}
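A test-style sketch of how the factory above resolves a cleaner from the options; `fs`, `qualifiedBasePath`, and `hadoopConf` are assumed to be in scope (as they are inside `FileStreamSource`), and this is not an existing test in the PR:

```scala
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.execution.streaming.FileStreamOptions
import org.apache.spark.sql.execution.streaming.FileStreamSource._

// "delete" maps to SourceFileRemover; "archive" additionally requires sourceArchiveDir;
// "off" (or no option at all) yields None, i.e. no cleanup at commit time.
val deleteOpts = new FileStreamOptions(CaseInsensitiveMap(Map("cleanSource" -> "delete")))
val cleaner = FileStreamSourceCleaner(fs, qualifiedBasePath, deleteOpts, hadoopConf)
assert(cleaner.exists(_.isInstanceOf[SourceFileRemover]))
```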

private[sql] class SourceFileArchiver(
fileSystem: FileSystem,
sourcePath: Path,
baseArchiveFileSystem: FileSystem,
baseArchivePath: Path) extends FileStreamSourceCleaner with Logging {
assertParameters()

private def assertParameters(): Unit = {
require(fileSystem.getUri == baseArchiveFileSystem.getUri, "Base archive path is located " +
s"on a different file system than the source files. source path: $sourcePath" +
s" / base archive path: $baseArchivePath")

/**
* FileStreamSource reads files for which one of the conditions below is met:
* 1) the file itself matches the source path
* 2) the file's parent directory matches the source path
Review comment (Member): @HeartSaVioR Could you clarify this? I think there are some cases where we still read files but they don't meet these conditions:

Reply (Contributor Author, @HeartSaVioR, Nov 19, 2019): @zsxwing
Thanks for taking the time to revisit this! The condition is based on the test suite in FileStreamSource, but yes, partitioned paths are missed. Nice find. I need to update the documented condition, or just remove it altogether.

As for recursiveFileLookup, it arrived later than this patch and I missed it. The condition was written early this year, and recursiveFileLookup seems to have landed in the middle of the year.

With those two cases added, FileStreamSource can read any file under the source path, which invalidates the depth check. There are three options to deal with this:

  1. Do no pattern check and just try to rename; log it if the rename fails. (Caution: this does not prevent an archived file from being picked up again as a source file in a different directory.)
  2. Disallow any path from being used as the base archive path if it matches the source path (glob) - here "disallow" means failing the query. After that we don't need to check the pattern at all. If end users provide a complicated glob as the source path, they may be puzzled about how to avoid a match, but I'm not sure they would really want such a complicated path in production.
  3. Do a pattern check before renaming, though that requires checking the pattern per file. We could optimize this a bit by grouping files per directory and checking the pattern against the directory instead of individual files. This does not fail the query, so end users need to check whether files were left uncleaned because of the pattern check.

Which one (or which combination) would be the preferred approach?

Reply (Contributor Author, @HeartSaVioR, Dec 17, 2019): FYI, I just filed https://issues.apache.org/jira/browse/SPARK-30281 and raised a patch picking option 2: #26920

*
* Checking against the glob pattern is costly, so we set this requirement to eliminate the
* cases where the archive path could match the source path. For example, when a file is
* moved to the archive directory, the destination path retains the input file's path as a
* suffix, so the destination path cannot match the source path if the archive directory's
* depth is greater than 2, as neither the destination file nor its parent directory can
* match the source path.
*/
require(baseArchivePath.depth() > 2, "Base archive path must have at least 2 " +
"subdirectories from root directory. e.g. '/data/archive'")
}

override def clean(entry: FileEntry): Unit = {
val curPath = new Path(new URI(entry.path))
val newPath = new Path(baseArchivePath.toString.stripSuffix("/") + curPath.toUri.getPath)

try {
logDebug(s"Creating directory if it doesn't exist ${newPath.getParent}")
if (!fileSystem.exists(newPath.getParent)) {
fileSystem.mkdirs(newPath.getParent)
}

logDebug(s"Archiving completed file $curPath to $newPath")
if (!fileSystem.rename(curPath, newPath)) {
logWarning(s"Fail to move $curPath to $newPath / skip moving file.")
}
} catch {
case NonFatal(e) =>
logWarning(s"Fail to move $curPath to $newPath / skip moving file.", e)
}
}
}
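To make the renaming concrete, here is a small standalone sketch of the destination-path computation used in `clean()` above; the paths mirror the example in the programming guide:

```scala
import org.apache.hadoop.fs.Path

val baseArchivePath = new Path("/archived/here")
val curPath = new Path("/a/b/dataset.txt")
// The destination keeps the source file's own path as a suffix under the archive directory.
val newPath = new Path(baseArchivePath.toString.stripSuffix("/") + curPath.toUri.getPath)
assert(newPath.toString == "/archived/here/a/b/dataset.txt")
```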

private[sql] class SourceFileRemover(fileSystem: FileSystem)
extends FileStreamSourceCleaner with Logging {

override def clean(entry: FileEntry): Unit = {
val curPath = new Path(new URI(entry.path))
try {
logDebug(s"Removing completed file $curPath")

if (!fileSystem.delete(curPath, false)) {
logWarning(s"Failed to remove $curPath / skip removing file.")
}
} catch {
case NonFatal(e) =>
// Log a warning but swallow the exception so the streaming query is not stopped
logWarning(s"Failed to remove $curPath / skip removing file.", e)
}
}
}
}