[SPARK-20568][SS] Provide option to clean up completed files in streaming query #22952
Changes from 7 commits
@@ -546,6 +546,13 @@ Here are the details of all the sources in Spark.
     "s3://a/dataset.txt"<br/>
     "s3n://a/b/dataset.txt"<br/>
     "s3a://a/b/c/dataset.txt"<br/>
+    <code>cleanSource</code>: option to clean up completed files after processing.<br/>
+    Available options are "archive", "delete", "off". If the option is not provided, the default value is "off".<br/>
+    When "archive" is provided, the additional option <code>sourceArchiveDir</code> must be provided as well. The value of <code>sourceArchiveDir</code> must have at least 2 subdirectories (so the directory depth is greater than 2), e.g. "/archived/here". This ensures archived files are never included as new source files.<br/>
+    Spark will move source files while preserving their own paths. For example, if the path of a source file is "/a/b/dataset.txt" and the path of the archive directory is "/archived/here", the file will be moved to "/archived/here/a/b/dataset.txt".<br/>
+    NOTE: Both archiving (via moving) and deleting completed files introduce overhead (slowing down each micro-batch), so make sure you understand the cost of each operation in your file system before enabling this option. On the other hand, enabling this option reduces the cost of listing source files, which can be an expensive operation.<br/>
+    NOTE 2: The source path should not be used by multiple sources or queries when enabling this option.<br/>
+    NOTE 3: Both delete and move actions are best effort. Failing to delete or move files will not fail the streaming query.
     <br/><br/>
     For file-format-specific options, see the related methods in <code>DataStreamReader</code>
     (<a href="api/scala/index.html#org.apache.spark.sql.streaming.DataStreamReader">Scala</a>/<a href="api/java/org/apache/spark/sql/streaming/DataStreamReader.html">Java</a>/<a href="api/python/pyspark.sql.html#pyspark.sql.streaming.DataStreamReader">Python</a>/<a
@@ -20,7 +20,9 @@ package org.apache.spark.sql.execution.streaming
 import java.net.URI
 import java.util.concurrent.TimeUnit._

-import org.apache.hadoop.fs.{FileStatus, Path}
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.fs.{FileStatus, FileSystem, GlobFilter, Path}

 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
@@ -53,6 +55,18 @@ class FileStreamSource(
     fs.makeQualified(new Path(path)) // can contain glob patterns
   }

+  private val sourceCleaner: FileStreamSourceCleaner = {
+    val (archiveFs, qualifiedArchivePath) = sourceOptions.sourceArchiveDir match {
+      case Some(dir) =>
+        val path = new Path(dir)
+        val fs = path.getFileSystem(hadoopConf)
+        (Some(fs), Some(fs.makeQualified(path)))
+
+      case None => (None, None)
+    }
+    new FileStreamSourceCleaner(fs, qualifiedBasePath, archiveFs, qualifiedArchivePath)
+  }
+
   private val optionsWithPartitionBasePath = sourceOptions.optionMapWithoutPath ++ {
     if (!SparkHadoopUtil.get.isGlobPath(new Path(path)) && options.contains("path")) {
       Map("basePath" -> path)
@@ -237,6 +251,7 @@ class FileStreamSource(
     val files = allFiles.sortBy(_.getModificationTime)(fileSortOrder).map { status =>
       (status.getPath.toUri.toString, status.getModificationTime)
     }
+
     val endTime = System.nanoTime
     val listingTimeMs = NANOSECONDS.toMillis(endTime - startTime)
     if (listingTimeMs > 2000) {
@@ -258,16 +273,33 @@
    * equal to `end` and will only request offsets greater than `end` in the future.
    */
   override def commit(end: Offset): Unit = {
-    // No-op for now; FileStreamSource currently garbage-collects files based on timestamp
-    // and the value of the maxFileAge parameter.
+    val logOffset = FileStreamSourceOffset(end).logOffset
+
+    if (sourceOptions.cleanSource != CleanSourceMode.OFF) {
+      val files = metadataLog.get(Some(logOffset), Some(logOffset)).flatMap(_._2)
+      val validFileEntities = files.filter(_.batchId == logOffset)
+      logDebug(s"completed file entries: ${validFileEntities.mkString(",")}")
+      sourceOptions.cleanSource match {
+        case CleanSourceMode.ARCHIVE =>
+          validFileEntities.foreach(sourceCleaner.archive)
+
+        case CleanSourceMode.DELETE =>
+          validFileEntities.foreach(sourceCleaner.delete)
+
+        case _ =>
+      }
+    } else {
+      // No-op for now; FileStreamSource currently garbage-collects files based on timestamp
+      // and the value of the maxFileAge parameter.
+    }
   }

   override def stop(): Unit = {}
 }


 object FileStreamSource {

   /** Timestamp for file modification time, in ms since January 1, 1970 UTC. */
   type Timestamp = Long

@@ -330,4 +362,77 @@ object FileStreamSource {

     def size: Int = map.size()
   }
+
+  private[sql] class FileStreamSourceCleaner(
+      fileSystem: FileSystem,
+      sourcePath: Path,
+      baseArchiveFileSystem: Option[FileSystem],
+      baseArchivePath: Option[Path]) extends Logging {
+    assertParameters()
+
+    private def assertParameters(): Unit = {
+      require(baseArchiveFileSystem.isDefined == baseArchivePath.isDefined)
+
+      baseArchiveFileSystem.foreach { fs =>
+        require(fileSystem.getUri == fs.getUri, "Base archive path is located on a different" +
+          s" filesystem than the source, which is not supported. source path: $sourcePath" +
+          s" / base archive path: ${baseArchivePath.get}")
+      }
+
+      baseArchivePath.foreach { path =>
+        /**
+         * FileStreamSource reads files for which one of the below conditions is met:
+         * 1) the file itself matches the source path
+         * 2) the parent directory matches the source path
+         *
+         * Checking with a glob pattern is costly, so set this requirement to eliminate the
+         * cases where the archive path could match the source path. For example, when a file
+         * is moved to the archive directory, the destination path retains the input file's
+         * path as a suffix, so the destination path can't match the source path if the archive
+         * directory's depth is greater than 2, as neither the file nor the parent directory of
+         * the destination path can match the source path.
+         */
+        require(path.depth() > 2, "Base archive path must have a depth of at least 2 " +
+          "subdirectories. e.g. '/data/archive'")
+      }
+    }
+
+    def archive(entry: FileEntry): Unit = {
+      require(baseArchivePath.isDefined)
+
+      val curPath = new Path(new URI(entry.path))
+      val newPath = new Path(baseArchivePath.get, curPath.toUri.getPath.stripPrefix("/"))
+
+      try {
+        logDebug(s"Creating directory if it doesn't exist ${newPath.getParent}")
+        if (!fileSystem.exists(newPath.getParent)) {
+          fileSystem.mkdirs(newPath.getParent)
+        }
+
+        logDebug(s"Archiving completed file $curPath to $newPath")
+        if (!fileSystem.rename(curPath, newPath)) {
+          logWarning(s"Failed to move $curPath to $newPath / skipping moving file.")
+        }
+      } catch {
+        case NonFatal(e) =>
+          logWarning(s"Failed to move $curPath to $newPath / skipping moving file.", e)
+      }
+    }
+
+    def delete(entry: FileEntry): Unit = {
+      val curPath = new Path(new URI(entry.path))
+      try {
+        logDebug(s"Removing completed file $curPath")
+
+        if (!fileSystem.delete(curPath, false)) {
+          logWarning(s"Failed to remove $curPath / skipping removing file.")
+        }
+      } catch {
+        case NonFatal(e) =>
+          // Log a warning but swallow the exception to avoid stopping the query
+          logWarning(s"Failed to remove $curPath / skipping removing file.", e)
+      }
+    }
+  }
 }
I'd drop s3n & s3 refs as they have gone from deprecated to deceased
This looks beyond the scope of this PR; we can address it in a separate PR. Could you raise another one?