From 92f9b7b0f20b12a80281c70d7b85d72d9fa657d8 Mon Sep 17 00:00:00 2001
From: "Jungtaek Lim (HeartSaVioR)"
Date: Tue, 19 Mar 2019 09:07:06 +0900
Subject: [PATCH 1/2] [SPARK-27188][SS] Introduce retention of output entities
 for FileStreamSink

---
 .../structured-streaming-programming-guide.md |   5 +-
 .../execution/streaming/FileStreamSink.scala  |   5 +-
 .../streaming/FileStreamSinkLog.scala         |  29 +++-
 .../ManifestFileCommitProtocol.scala          |   5 +-
 .../streaming/FileStreamSinkLogSuite.scala    | 125 +++++++++++-------
 5 files changed, 112 insertions(+), 57 deletions(-)
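The core of both patches is a new commitTime field on the entries of the file sink metadata log. As it stands after patch 2, the entry schema can be sketched as follows (a simplified rendering; the real class in FileStreamSinkLog.scala also defines toFileStatus and a companion apply, not shown here):

    case class SinkFileStatus(
        path: String,
        size: Long,
        isDir: Boolean,
        modificationTime: Long,
        blockReplication: Int,
        blockSize: Long,
        action: String,
        commitTime: Long) {

      // Secondary constructor used for logs written before this change:
      // Long.MaxValue marks "commit time unknown", so legacy entries are
      // never considered expired by the TTL check.
      def this(path: String, size: Long, isDir: Boolean, modificationTime: Long,
          blockReplication: Int, blockSize: Long, action: String) = {
        this(path, size, isDir, modificationTime, blockReplication, blockSize,
          action, Long.MaxValue)
      }
    }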
diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index deaf262c5f57..bdcbe8e14d50 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -1830,7 +1830,10 @@ Here are the details of all the sinks in Spark.
     File Sink
     Append
-        path: path to the output directory, must be specified.
+        path: path to the output directory, must be specified.<br/>
+        outputRetentionMs: time to live (TTL) for output files. Output files whose batches were
+        committed longer ago than the TTL will eventually be excluded from the metadata log, so reader
+        queries that read the sink's output directory may not process them.<br/>
         <br/>
         For file-format-specific options, see the related methods in DataFrameWriter (Scala/Java/Python/

+      log.action == FileStreamSinkLog.DELETE_ACTION || (curTime - log.commitTime) > ttlMs
+    }.map(_.path).toSet
     if (deletedFiles.isEmpty) {
       logs
     } else {

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
index 916bd2ddbc81..e07baa8e7a1b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
@@ -59,7 +59,10 @@ class ManifestFileCommitProtocol(jobId: String, path: String)
 
   override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = {
     require(fileLog != null, "setupManifestOptions must be called before this function")
-    val fileStatuses = taskCommits.flatMap(_.obj.asInstanceOf[Seq[SinkFileStatus]]).toArray
+    val commitTimestamp = System.currentTimeMillis()
+    val fileStatuses = taskCommits.flatMap { taskCommit =>
+      taskCommit.obj.asInstanceOf[Seq[SinkFileStatus]].map(_.copy(commitTime = commitTimestamp))
+    }.toArray
 
     if (fileLog.add(batchId, fileStatuses)) {
       logInfo(s"Committed batch $batchId")
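The two hunks above carry the mechanism: commitJob stamps every file entry of a batch with one commit timestamp, and compaction later drops entries that were explicitly deleted or whose commit timestamp has aged past the TTL. The FileStreamSinkLog.scala hunk is truncated in this excerpt, so here is a self-contained sketch of the patched compaction logic under stated assumptions: curTime and ttlMs come from the enclosing class in the real code, and the final filter reflects the method's pre-existing else branch, which is not visible above.

    // Sketch, not the verbatim method: drop entries that are deleted or expired.
    def compactLogs(logs: Seq[SinkFileStatus], curTime: Long, ttlMs: Long): Seq[SinkFileStatus] = {
      val deletedFiles = logs.filter { log =>
        log.action == FileStreamSinkLog.DELETE_ACTION || (curTime - log.commitTime) > ttlMs
      }.map(_.path).toSet
      if (deletedFiles.isEmpty) {
        logs
      } else {
        logs.filterNot(log => deletedFiles.contains(log.path))
      }
    }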
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
index f95daafdfe19..ba5164b4da44 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
@@ -20,6 +20,8 @@ package org.apache.spark.sql.execution.streaming
 import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
 import java.nio.charset.StandardCharsets.UTF_8
 
+import org.apache.hadoop.fs.FileSystem
+
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
@@ -55,7 +57,8 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession {
         modificationTime = 1000L,
         blockReplication = 1,
         blockSize = 10000L,
-        action = FileStreamSinkLog.ADD_ACTION),
+        action = FileStreamSinkLog.ADD_ACTION,
+        commitTime = 1000L),
       SinkFileStatus(
         path = "/a/b/y",
         size = 200L,
@@ -63,7 +66,8 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession {
         modificationTime = 2000L,
         blockReplication = 2,
         blockSize = 20000L,
-        action = FileStreamSinkLog.DELETE_ACTION),
+        action = FileStreamSinkLog.DELETE_ACTION,
+        commitTime = 2000L),
       SinkFileStatus(
         path = "/a/b/z",
         size = 300L,
@@ -71,13 +75,14 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession {
         modificationTime = 3000L,
         blockReplication = 3,
         blockSize = 30000L,
-        action = FileStreamSinkLog.ADD_ACTION))
+        action = FileStreamSinkLog.ADD_ACTION,
+        commitTime = 3000L))
 
     // scalastyle:off
     val expected = s"""v$VERSION
-      |{"path":"/a/b/x","size":100,"isDir":false,"modificationTime":1000,"blockReplication":1,"blockSize":10000,"action":"add"}
-      |{"path":"/a/b/y","size":200,"isDir":false,"modificationTime":2000,"blockReplication":2,"blockSize":20000,"action":"delete"}
-      |{"path":"/a/b/z","size":300,"isDir":false,"modificationTime":3000,"blockReplication":3,"blockSize":30000,"action":"add"}""".stripMargin
+      |{"path":"/a/b/x","size":100,"isDir":false,"modificationTime":1000,"blockReplication":1,"blockSize":10000,"action":"add","commitTime":1000}
+      |{"path":"/a/b/y","size":200,"isDir":false,"modificationTime":2000,"blockReplication":2,"blockSize":20000,"action":"delete","commitTime":2000}
+      |{"path":"/a/b/z","size":300,"isDir":false,"modificationTime":3000,"blockReplication":3,"blockSize":30000,"action":"add","commitTime":3000}""".stripMargin
     // scalastyle:on
     val baos = new ByteArrayOutputStream()
     sinkLog.serialize(logs, baos)
@@ -92,9 +97,9 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession {
     withFileStreamSinkLog { sinkLog =>
       // scalastyle:off
       val logs = s"""v$VERSION
-      |{"path":"/a/b/x","size":100,"isDir":false,"modificationTime":1000,"blockReplication":1,"blockSize":10000,"action":"add"}
-      |{"path":"/a/b/y","size":200,"isDir":false,"modificationTime":2000,"blockReplication":2,"blockSize":20000,"action":"delete"}
-      |{"path":"/a/b/z","size":300,"isDir":false,"modificationTime":3000,"blockReplication":3,"blockSize":30000,"action":"add"}""".stripMargin
+      |{"path":"/a/b/x","size":100,"isDir":false,"modificationTime":1000,"blockReplication":1,"blockSize":10000,"action":"add","commitTime":1000}
+      |{"path":"/a/b/y","size":200,"isDir":false,"modificationTime":2000,"blockReplication":2,"blockSize":20000,"action":"delete","commitTime":2000}
+      |{"path":"/a/b/z","size":300,"isDir":false,"modificationTime":3000,"blockReplication":3,"blockSize":30000,"action":"add","commitTime":3000}""".stripMargin
       // scalastyle:on
 
       val expected = Seq(
@@ -105,7 +110,8 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession {
         modificationTime = 1000L,
         blockReplication = 1,
         blockSize = 10000L,
-        action = FileStreamSinkLog.ADD_ACTION),
+        action = FileStreamSinkLog.ADD_ACTION,
+        commitTime = 1000L),
       SinkFileStatus(
         path = "/a/b/y",
         size = 200L,
@@ -113,7 +119,8 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession {
         modificationTime = 2000L,
         blockReplication = 2,
         blockSize = 20000L,
-        action = FileStreamSinkLog.DELETE_ACTION),
+        action = FileStreamSinkLog.DELETE_ACTION,
+        commitTime = 2000L),
       SinkFileStatus(
         path = "/a/b/z",
         size = 300L,
@@ -121,7 +128,8 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession {
         modificationTime = 3000L,
         blockReplication = 3,
         blockSize = 30000L,
-        action = FileStreamSinkLog.ADD_ACTION))
+        action = FileStreamSinkLog.ADD_ACTION,
+        commitTime = 3000L))
 
       assert(expected === sinkLog.deserialize(new ByteArrayInputStream(logs.getBytes(UTF_8))))
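For reference, the on-disk batch file that these serialize/deserialize tests round-trip is a version header followed by one JSON object per tracked file; assuming FileStreamSinkLog.VERSION is 1, as the s"v$VERSION" interpolation suggests, a two-entry log now looks like:

    v1
    {"path":"/a/b/x","size":100,"isDir":false,"modificationTime":1000,"blockReplication":1,"blockSize":10000,"action":"add","commitTime":1000}
    {"path":"/a/b/y","size":200,"isDir":false,"modificationTime":2000,"blockReplication":2,"blockSize":20000,"action":"delete","commitTime":2000}

Logs written by older Spark versions simply lack the commitTime key; patch 2 below makes deserialization fall back to Long.MaxValue for them.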
@@ -149,6 +157,17 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession {
     }
   }
 
+  private def listBatchFiles(fs: FileSystem, sinkLog: FileStreamSinkLog): Set[String] = {
+    fs.listStatus(sinkLog.metadataPath).map(_.getPath.getName).filter { fileName =>
+      try {
+        getBatchIdFromFileName(fileName)
+        true
+      } catch {
+        case _: NumberFormatException => false
+      }
+    }.toSet
+  }
+
   test("delete expired file") {
     // Set FILE_SINK_LOG_CLEANUP_DELAY to 0 so that we can detect the deleting behaviour
     // deterministically, and retain a minimum of one batch
     withSQLConf(
       SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3",
       SQLConf.FILE_SINK_LOG_CLEANUP_DELAY.key -> "0",
       SQLConf.MIN_BATCHES_TO_RETAIN.key -> "1") {
       withFileStreamSinkLog { sinkLog =>
         val fs = sinkLog.metadataPath.getFileSystem(spark.sessionState.newHadoopConf())
-
-        def listBatchFiles(): Set[String] = {
-          fs.listStatus(sinkLog.metadataPath).map(_.getPath.getName).filter { fileName =>
-            try {
-              getBatchIdFromFileName(fileName)
-              true
-            } catch {
-              case _: NumberFormatException => false
-            }
-          }.toSet
-        }
-
+        def listBatchFiles(): Set[String] = this.listBatchFiles(fs, sinkLog)
         sinkLog.add(0, Array(newFakeSinkFileStatus("/a/b/0", FileStreamSinkLog.ADD_ACTION)))
         assert(Set("0") === listBatchFiles())
         sinkLog.add(1, Array(newFakeSinkFileStatus("/a/b/1", FileStreamSinkLog.ADD_ACTION)))
@@ -193,18 +201,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession {
       SQLConf.MIN_BATCHES_TO_RETAIN.key -> "2") {
       withFileStreamSinkLog { sinkLog =>
         val fs = sinkLog.metadataPath.getFileSystem(spark.sessionState.newHadoopConf())
-
-        def listBatchFiles(): Set[String] = {
-          fs.listStatus(sinkLog.metadataPath).map(_.getPath.getName).filter { fileName =>
-            try {
-              getBatchIdFromFileName(fileName)
-              true
-            } catch {
-              case _: NumberFormatException => false
-            }
-          }.toSet
-        }
-
+        def listBatchFiles(): Set[String] = this.listBatchFiles(fs, sinkLog)
         sinkLog.add(0, Array(newFakeSinkFileStatus("/a/b/0", FileStreamSinkLog.ADD_ACTION)))
         assert(Set("0") === listBatchFiles())
         sinkLog.add(1, Array(newFakeSinkFileStatus("/a/b/1", FileStreamSinkLog.ADD_ACTION)))
@@ -225,18 +222,34 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession {
     }
   }
 
+  test("filter out outdated entries when compacting") {
+    val curTime = System.currentTimeMillis()
+    withFileStreamSinkLog(Some(60000), sinkLog => {
+      val logs = Seq(
+        newFakeSinkFileStatus("/a/b/x", FileStreamSinkLog.ADD_ACTION, curTime),
+        newFakeSinkFileStatus("/a/b/y", FileStreamSinkLog.ADD_ACTION, curTime),
+        newFakeSinkFileStatus("/a/b/z", FileStreamSinkLog.ADD_ACTION, curTime))
+      assert(logs === sinkLog.compactLogs(logs))
+
+      val logs2 = Seq(
+        newFakeSinkFileStatus("/a/b/m", FileStreamSinkLog.ADD_ACTION, curTime - 80000),
+        newFakeSinkFileStatus("/a/b/n", FileStreamSinkLog.ADD_ACTION, curTime - 120000))
+      assert(logs === sinkLog.compactLogs(logs ++ logs2))
+    })
+  }
+
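The arithmetic in the new test, spelled out against the predicate (curTime - commitTime) > ttlMs from the compactLogs excerpt: with a TTL of 60000 ms, the entries aged 80000 ms and 120000 ms are dropped, fresh entries are kept, and because the comparison is strict, an entry aged exactly 60000 ms would also be kept.

    val ttlMs = 60000L
    val curTime = System.currentTimeMillis()
    assert(!(curTime - curTime > ttlMs))             // fresh entry: kept
    assert(!(curTime - (curTime - 60000L) > ttlMs))  // exactly at the TTL: kept (strict >)
    assert(curTime - (curTime - 80000L) > ttlMs)     // "/a/b/m": dropped
    assert(curTime - (curTime - 120000L) > ttlMs)    // "/a/b/n": dropped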
   test("read Spark 2.1.0 log format") {
     assert(readFromResource("file-sink-log-version-2.1.0") === Seq(
       // SinkFileStatus("/a/b/0", 100, false, 100, 1, 100, FileStreamSinkLog.ADD_ACTION), -> deleted
-      SinkFileStatus("/a/b/1", 100, false, 100, 1, 100, FileStreamSinkLog.ADD_ACTION),
-      SinkFileStatus("/a/b/2", 200, false, 200, 1, 100, FileStreamSinkLog.ADD_ACTION),
-      SinkFileStatus("/a/b/3", 300, false, 300, 1, 100, FileStreamSinkLog.ADD_ACTION),
-      SinkFileStatus("/a/b/4", 400, false, 400, 1, 100, FileStreamSinkLog.ADD_ACTION),
-      SinkFileStatus("/a/b/5", 500, false, 500, 1, 100, FileStreamSinkLog.ADD_ACTION),
-      SinkFileStatus("/a/b/6", 600, false, 600, 1, 100, FileStreamSinkLog.ADD_ACTION),
-      SinkFileStatus("/a/b/7", 700, false, 700, 1, 100, FileStreamSinkLog.ADD_ACTION),
-      SinkFileStatus("/a/b/8", 800, false, 800, 1, 100, FileStreamSinkLog.ADD_ACTION),
-      SinkFileStatus("/a/b/9", 900, false, 900, 3, 200, FileStreamSinkLog.ADD_ACTION)
+      SinkFileStatus("/a/b/1", 100, false, 100, 1, 100, FileStreamSinkLog.ADD_ACTION, 100),
+      SinkFileStatus("/a/b/2", 200, false, 200, 1, 100, FileStreamSinkLog.ADD_ACTION, 200),
+      SinkFileStatus("/a/b/3", 300, false, 300, 1, 100, FileStreamSinkLog.ADD_ACTION, 300),
+      SinkFileStatus("/a/b/4", 400, false, 400, 1, 100, FileStreamSinkLog.ADD_ACTION, 400),
+      SinkFileStatus("/a/b/5", 500, false, 500, 1, 100, FileStreamSinkLog.ADD_ACTION, 500),
+      SinkFileStatus("/a/b/6", 600, false, 600, 1, 100, FileStreamSinkLog.ADD_ACTION, 600),
+      SinkFileStatus("/a/b/7", 700, false, 700, 1, 100, FileStreamSinkLog.ADD_ACTION, 700),
+      SinkFileStatus("/a/b/8", 800, false, 800, 1, 100, FileStreamSinkLog.ADD_ACTION, 800),
+      SinkFileStatus("/a/b/9", 900, false, 900, 3, 200, FileStreamSinkLog.ADD_ACTION, 900)
     ))
   }
 
@@ -244,7 +257,16 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession {
    * Create a fake SinkFileStatus using path and action. Most of tests don't care about other fields
    * in SinkFileStatus.
    */
-  private def newFakeSinkFileStatus(path: String, action: String): SinkFileStatus = {
+  private def newFakeSinkFileStatus(path: String, action: String): SinkFileStatus =
+    newFakeSinkFileStatus(path, action, 100L)
+
+  /**
+   * Create a fake SinkFileStatus using path, action, and commit time.
+   */
+  private def newFakeSinkFileStatus(
+      path: String,
+      action: String,
+      commitTime: Long): SinkFileStatus = {
     SinkFileStatus(
       path = path,
       size = 100L,
@@ -252,12 +274,17 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession {
       modificationTime = 100L,
       blockReplication = 1,
       blockSize = 100L,
-      action = action)
+      action = action,
+      commitTime = commitTime)
   }
 
-  private def withFileStreamSinkLog(f: FileStreamSinkLog => Unit): Unit = {
+  private def withFileStreamSinkLog(f: FileStreamSinkLog => Unit): Unit =
+    withFileStreamSinkLog(None, f)
+
+  private def withFileStreamSinkLog(ttl: Option[Long], f: FileStreamSinkLog => Unit): Unit = {
     withTempDir { file =>
-      val sinkLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark, file.getCanonicalPath)
+      val sinkLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark, file.getCanonicalPath,
+        ttl)
       f(sinkLog)
     }
   }

From 2a3bad5819d49bb2e54b6be1334c117a4bb6f962 Mon Sep 17 00:00:00 2001
From: "Jungtaek Lim (HeartSaVioR)"
Date: Tue, 19 Mar 2019 14:16:42 +0900
Subject: [PATCH 2/2] Let 'commitTime' be independent of modificationTime

---
 .../streaming/FileStreamSinkLog.scala         | 21 +++++++++----------
 .../streaming/FileStreamSinkLogSuite.scala    | 21 ++++++++++---------
 2 files changed, 21 insertions(+), 21 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
index d5d2e202b315..ab381e38b15f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
@@ -57,8 +57,8 @@ case class SinkFileStatus(
     blockReplication: Int,
     blockSize: Long,
     action: String) {
-    // use modification time if we don't know about exact commit time
-    this(path, size, isDir, modificationTime, blockReplication, blockSize, action, modificationTime)
+    // use Long.MaxValue if we don't know the exact commit time, which means such entries will not be evicted
+    this(path, size, isDir, modificationTime, blockReplication, blockSize, action, Long.MaxValue)
   }
 
   def toFileStatus: FileStatus = {
@@ -69,15 +69,14 @@ object SinkFileStatus {
   def apply(f: FileStatus): SinkFileStatus = {
-    SinkFileStatus(
-      path = f.getPath.toUri.toString,
-      size = f.getLen,
-      isDir = f.isDirectory,
-      modificationTime = f.getModificationTime,
-      blockReplication = f.getReplication,
-      blockSize = f.getBlockSize,
-      action = FileStreamSinkLog.ADD_ACTION,
-      commitTime = f.getModificationTime)
+    new SinkFileStatus(
+      f.getPath.toUri.toString,
+      f.getLen,
+      f.isDirectory,
+      f.getModificationTime,
+      f.getReplication,
+      f.getBlockSize,
+      FileStreamSinkLog.ADD_ACTION)
   }
 }
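Patch 2 swaps the backward-compatibility default from modificationTime to Long.MaxValue, and the comment explains why: an unknown commit time must not make an entry eligible for eviction. A small sketch of how the sentinel interacts with the TTL predicate (assuming the compactLogs excerpt from patch 1):

    // A legacy entry deserialized without commitTime gets the sentinel.
    val legacy = new SinkFileStatus("/a/b/legacy", 100L, false, 1000L, 1, 100L,
      FileStreamSinkLog.ADD_ACTION)

    // curTime - Long.MaxValue is a large negative number, never > ttlMs,
    // so legacy entries survive every compaction regardless of their age.
    val curTime = System.currentTimeMillis()
    assert(legacy.commitTime == Long.MaxValue)
    assert((curTime - legacy.commitTime) < 0L)

Had the default stayed at modificationTime, as in patch 1, pre-existing files older than the TTL would have been dropped from the log at the first compaction after an upgrade.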
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
index ba5164b4da44..a2200b54a2f9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
@@ -239,17 +239,18 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession {
   }
 
   test("read Spark 2.1.0 log format") {
+    val maxLong = Long.MaxValue
     assert(readFromResource("file-sink-log-version-2.1.0") === Seq(
       // SinkFileStatus("/a/b/0", 100, false, 100, 1, 100, FileStreamSinkLog.ADD_ACTION), -> deleted
-      SinkFileStatus("/a/b/1", 100, false, 100, 1, 100, FileStreamSinkLog.ADD_ACTION, 100),
-      SinkFileStatus("/a/b/2", 200, false, 200, 1, 100, FileStreamSinkLog.ADD_ACTION, 200),
-      SinkFileStatus("/a/b/3", 300, false, 300, 1, 100, FileStreamSinkLog.ADD_ACTION, 300),
-      SinkFileStatus("/a/b/4", 400, false, 400, 1, 100, FileStreamSinkLog.ADD_ACTION, 400),
-      SinkFileStatus("/a/b/5", 500, false, 500, 1, 100, FileStreamSinkLog.ADD_ACTION, 500),
-      SinkFileStatus("/a/b/6", 600, false, 600, 1, 100, FileStreamSinkLog.ADD_ACTION, 600),
-      SinkFileStatus("/a/b/7", 700, false, 700, 1, 100, FileStreamSinkLog.ADD_ACTION, 700),
-      SinkFileStatus("/a/b/8", 800, false, 800, 1, 100, FileStreamSinkLog.ADD_ACTION, 800),
-      SinkFileStatus("/a/b/9", 900, false, 900, 3, 200, FileStreamSinkLog.ADD_ACTION, 900)
+      SinkFileStatus("/a/b/1", 100, false, 100, 1, 100, FileStreamSinkLog.ADD_ACTION, maxLong),
+      SinkFileStatus("/a/b/2", 200, false, 200, 1, 100, FileStreamSinkLog.ADD_ACTION, maxLong),
+      SinkFileStatus("/a/b/3", 300, false, 300, 1, 100, FileStreamSinkLog.ADD_ACTION, maxLong),
+      SinkFileStatus("/a/b/4", 400, false, 400, 1, 100, FileStreamSinkLog.ADD_ACTION, maxLong),
+      SinkFileStatus("/a/b/5", 500, false, 500, 1, 100, FileStreamSinkLog.ADD_ACTION, maxLong),
+      SinkFileStatus("/a/b/6", 600, false, 600, 1, 100, FileStreamSinkLog.ADD_ACTION, maxLong),
+      SinkFileStatus("/a/b/7", 700, false, 700, 1, 100, FileStreamSinkLog.ADD_ACTION, maxLong),
+      SinkFileStatus("/a/b/8", 800, false, 800, 1, 100, FileStreamSinkLog.ADD_ACTION, maxLong),
+      SinkFileStatus("/a/b/9", 900, false, 900, 3, 200, FileStreamSinkLog.ADD_ACTION, maxLong)
     ))
   }
 
@@ -258,7 +259,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession {
    * in SinkFileStatus.
    */
   private def newFakeSinkFileStatus(path: String, action: String): SinkFileStatus =
-    newFakeSinkFileStatus(path, action, 100L)
+    newFakeSinkFileStatus(path, action, Long.MaxValue)
 
   /**
    * Create a fake SinkFileStatus using path, action, and commit time.
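Taken end to end, a user-facing sketch of what the two patches enable, assuming the outputRetentionMs option is wired from the writer options into FileStreamSink as the documentation change in patch 1 implies (paths and trigger below are illustrative):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.streaming.Trigger

    val spark = SparkSession.builder().appName("file-sink-ttl").getOrCreate()

    val query = spark.readStream
      .format("rate")
      .load()
      .writeStream
      .format("parquet")
      .option("path", "/tmp/rate-output")
      .option("checkpointLocation", "/tmp/rate-checkpoint")
      // Entries committed more than 7 days before a compaction are dropped
      // from the sink metadata log at that compaction.
      .option("outputRetentionMs", (7L * 24 * 60 * 60 * 1000).toString)
      .trigger(Trigger.ProcessingTime("1 minute"))
      .start()

Note that the retention only trims the metadata log; the aged data files themselves are not deleted, they merely stop being visible to queries that read the sink's output through the log.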