From 644b8ab375a71bdddccc2833cefbd3136c965bc8 Mon Sep 17 00:00:00 2001 From: "Jungtaek Lim (HeartSaVioR)" Date: Wed, 20 Feb 2019 11:37:08 +0900 Subject: [PATCH 1/2] [SPARK-24295][SS] Add option to retain only last batch in file stream sink metadata --- .../structured-streaming-programming-guide.md | 11 +++ .../execution/datasources/DataSource.scala | 5 +- .../streaming/FileStreamOptions.scala | 6 ++ .../execution/streaming/FileStreamSink.scala | 4 +- .../streaming/FileStreamSource.scala | 5 +- .../ManifestFileCommitProtocol.scala | 11 ++- .../sql/streaming/FileStreamSinkSuite.scala | 69 +++++++++++++++++++ .../sql/streaming/FileStreamSourceSuite.scala | 50 ++++++++++++++ 8 files changed, 156 insertions(+), 5 deletions(-) diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index d425bcc1256f..0940b7eccfbb 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -536,6 +536,11 @@ Here are the details of all the sources in Spark. href="api/R/read.stream.html">R). E.g. for "parquet" format options see DataStreamReader.parquet().

+ ignoreFileStreamSinkMetadata: whether to ignore the metadata left behind by a file stream sink, which causes the source to always use the in-memory file index. (default: false) +
+ This option is useful when the metadata grows so large that reading it becomes slower than listing the files directly from the filesystem.
+ NOTE: This option must be set to "true" if the file source reads output files written by a file stream sink whose "retainOnlyLastBatchInMetadata" option is set to "true". +
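+ A minimal sketch of the pairing described in the NOTE above (the paths, the outputSchema value and the input DataFrame df are illustrative placeholders, not part of this change):
+   // sink side: keep only the last batch in the sink metadata log
+   val query = df.writeStream
+     .format("parquet")
+     .option("checkpointLocation", "/path/to/checkpoint")
+     .option("retainOnlyLastBatchInMetadata", "true")
+     .start("/path/to/output")
+   // source side: bypass the (now partial) sink metadata and list files directly
+   val stream = spark.readStream
+     .format("parquet")
+     .schema(outputSchema)  // file sources need an explicit schema unless schema inference is enabled
+     .option("ignoreFileStreamSinkMetadata", "true")
+     .load("/path/to/output")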

In addition, there are session configurations that affect certain file-formats. See the SQL Programming Guide for more details. E.g., for "parquet", see Parquet configuration section. Yes @@ -1812,6 +1817,12 @@ Here are the details of all the sinks in Spark. (Scala/Java/Python/R). E.g. for "parquet" format options see DataFrameWriter.parquet() +
+ retainOnlyLastBatchInMetadata: whether to retain metadata information only for the last successful batch. +

+ This option greatly reduces the overhead of compacting metadata files, which can be non-trivial when the query processes a large number of files in each batch.
+ NOTE: As it retains only the last batch in the metadata, the metadata is not readable by the file source: you must set the "ignoreFileStreamSinkMetadata" option + to "true" when reading the sink's output files from another query, whether that query uses a batch or a streaming source. Yes (exactly-once) Supports writes to partitioned tables. Partitioning by time may be useful. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 10dae8a55b47..239a265bef36 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -344,8 +344,9 @@ case class DataSource( // We are reading from the results of a streaming query. Load files from the metadata log // instead of listing them using HDFS APIs. case (format: FileFormat, _) - if FileStreamSink.hasMetadata( - caseInsensitiveOptions.get("path").toSeq ++ paths, + if !caseInsensitiveOptions.getOrElse( + "ignoreFileStreamSinkMetadata", "false").toBoolean && + FileStreamSink.hasMetadata(caseInsensitiveOptions.get("path").toSeq ++ paths, sparkSession.sessionState.newHadoopConf()) => val basePath = new Path((caseInsensitiveOptions.get("path").toSeq ++ paths).head) val fileCatalog = new MetadataLogFileIndex(sparkSession, basePath, userSpecifiedSchema) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala index 1d57cb084df9..95ae7caae6b0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala @@ -74,6 +74,12 @@ class FileStreamOptions(parameters: CaseInsensitiveMap[String]) extends Logging */ val fileNameOnly: Boolean = withBooleanParameter("fileNameOnly", false) + /** + * Whether to ignore FileStreamSink metadata in the source, which leads to using the in-memory file index.
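+   * This must be set to true when reading output produced by a sink configured with
+   * "retainOnlyLastBatchInMetadata", since that sink's metadata log only covers the last batch.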
+ */ + val ignoreFileStreamSinkMetadata: Boolean = withBooleanParameter("ignoreFileStreamSinkMetadata", + default = false) + private def withBooleanParameter(name: String, default: Boolean) = { parameters.get(name).map { str => try { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala index b3d12f67b5d6..daba013fcda8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala @@ -97,6 +97,8 @@ class FileStreamSink( private val fileLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, logPath.toUri.toString) private val hadoopConf = sparkSession.sessionState.newHadoopConf() + private val retainOnlyLastBatchInMetadata: Boolean = + options.getOrElse("retainOnlyLastBatchInMetadata", "false").toBoolean private def basicWriteJobStatsTracker: BasicWriteJobStatsTracker = { val serializableHadoopConf = new SerializableConfiguration(hadoopConf) @@ -114,7 +116,7 @@ class FileStreamSink( committer match { case manifestCommitter: ManifestFileCommitProtocol => - manifestCommitter.setupManifestOptions(fileLog, batchId) + manifestCommitter.setupManifestOptions(fileLog, batchId, retainOnlyLastBatchInMetadata) case _ => // Do nothing } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index 103fa7ce9066..d5ea6942b38e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -208,7 +208,10 @@ class FileStreamSource( var allFiles: Seq[FileStatus] = null sourceHasMetadata match { case None => - if (FileStreamSink.hasMetadata(Seq(path), hadoopConf)) { + if (sourceOptions.ignoreFileStreamSinkMetadata) { + sourceHasMetadata = Some(false) + allFiles = allFilesUsingMetadataLogFileIndex() + } else if (FileStreamSink.hasMetadata(Seq(path), hadoopConf)) { sourceHasMetadata = Some(true) allFiles = allFilesUsingMetadataLogFileIndex() } else { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala index 92191c8b64b7..2132dd368774 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala @@ -42,14 +42,19 @@ class ManifestFileCommitProtocol(jobId: String, path: String) @transient private var fileLog: FileStreamSinkLog = _ private var batchId: Long = _ + private var retainOnlyLastBatch: Boolean = _ /** * Sets up the manifest log output and the batch id for this job. * Must be called before any other function. 
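+   * @param fileLog the file stream sink metadata log that records committed files per batch
+   * @param batchId the id of the batch being committed
+   * @param retainOnlyLastBatch if true, entries older than batchId are purged from the log right
+   *                            after a successful commit, leaving only the last batch's metadata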
*/ - def setupManifestOptions(fileLog: FileStreamSinkLog, batchId: Long): Unit = { + def setupManifestOptions( + fileLog: FileStreamSinkLog, + batchId: Long, + retainOnlyLastBatch: Boolean): Unit = { this.fileLog = fileLog this.batchId = batchId + this.retainOnlyLastBatch = retainOnlyLastBatch } override def setupJob(jobContext: JobContext): Unit = { @@ -63,6 +68,10 @@ class ManifestFileCommitProtocol(jobId: String, path: String) if (fileLog.add(batchId, fileStatuses)) { logInfo(s"Committed batch $batchId") + if (retainOnlyLastBatch) { + // purge older than batchId, which always keep only one batch in file log + fileLog.purge(batchId) + } } else { throw new IllegalStateException(s"Race while writing batch $batchId") } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala index ed53def556cb..18f63fd3acda 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala @@ -190,6 +190,75 @@ class FileStreamSinkSuite extends StreamTest { } } + test("SPARK-24295 retain only last batch for file log metadata") { + val inputData = MemoryStream[Long] + val inputDF = inputData.toDF.toDF("time") + val outputDf = inputDF + .selectExpr("CAST(time AS timestamp) AS timestamp") + + val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath + val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath + + var query: StreamingQuery = null + + try { + query = + outputDf.writeStream + .option("checkpointLocation", checkpointDir) + .option("retainOnlyLastBatchInMetadata", true) + .format("parquet") + .start(outputDir) + + def addTimestamp(timestampInSecs: Int*): Unit = { + inputData.addData(timestampInSecs.map(_ * 1L): _*) + failAfter(streamingTimeout) { + query.processAllAvailable() + } + } + + def check(expectedResult: Long*): Unit = { + val outputDf = spark.read + // This option must be provided when we enable 'retainOnlyLastBatchInFileLog' + // to purge metadata from FileStreamSink, otherwise query will fail while loading + // due to incomplete of metadata. 
+ .option("ignoreFileStreamSinkMetadata", "true") + .parquet(outputDir) + .selectExpr("timestamp") + .sort("timestamp") + checkDataset(outputDf.as[Long], expectedResult: _*) + } + + val logPath = new Path(outputDir, FileStreamSink.metadataDir) + val fileLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark, logPath.toUri.toString) + + addTimestamp(100) + check(100) + + // only new batch is retained, hence length should be 1 + assert(fileLog.get(None, None).length == 1) + assert(fileLog.get(None, None).head._1 === 0) + + addTimestamp(104, 123) + check(100, 104, 123) + + // only new batch is retained, hence length should be 1 + assert(fileLog.get(None, None).length === 1) + assert(fileLog.get(None, None).head._1 === 1) + + addTimestamp(140) + check(100, 104, 123, 140) + + // only new batch is retained, hence length should be 1 + assert(fileLog.get(None, None).length === 1) + assert(fileLog.get(None, None).head._1 === 2) + + } finally { + if (query != null) { + query.stop() + } + } + } + test("partitioned writing and batch reading with 'basePath'") { withTempDir { outputDir => withTempDir { checkpointDir => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index 9235c6d7c896..a6de2a7aade1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -908,6 +908,56 @@ class FileStreamSourceSuite extends FileStreamSourceTest { } } + test("ignore metadata when reading data from outputs of another streaming query") { + withTempDirs { case (outputDir, checkpointDir) => + // q1 is a streaming query that reads from memory and writes to text files + val q1Source = MemoryStream[String] + val q1 = + q1Source + .toDF() + .writeStream + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .format("text") + .start(outputDir.getCanonicalPath) + + // q2 is a streaming query that reads q1's text outputs + // even q1 is supposed to store metadata in output location, we intend to ignore it + val q2 = + createFileStream("text", outputDir.getCanonicalPath, + options = Map("ignoreFileStreamSinkMetadata" -> "true")) + .filter($"value" contains "keep") + + def q1AddData(data: String*): StreamAction = + Execute { _ => + q1Source.addData(data) + q1.processAllAvailable() + } + def q2ProcessAllAvailable(): StreamAction = Execute { q2 => q2.processAllAvailable() } + + testStream(q2)( + // batch 0 + q1AddData("drop1", "keep2"), + q2ProcessAllAvailable(), + CheckAnswer("keep2"), + + // batch 1 + Assert { + // create a text file that won't be on q1's sink log + // given we are ignoring sink metadata, the content should appear in q2's answer + val shouldNotKeep = new File(outputDir, "keep.txt") + stringToFile(shouldNotKeep, "keep") + shouldNotKeep.exists() + }, + q1AddData("keep3"), + q2ProcessAllAvailable(), + // here we should see "keep", whereas with metadata index, it should not appear + CheckAnswer("keep", "keep2", "keep3"), + + Execute { _ => q1.stop() } + ) + } + } + test("start before another streaming query, and read its output") { withTempDirs { case (outputDir, checkpointDir) => // q1 is a streaming query that reads from memory and writes to text files From fa88d9edb6c5a4393eaacd18a4efba975d16fcb4 Mon Sep 17 00:00:00 2001 From: "Jungtaek Lim (HeartSaVioR)" Date: Wed, 20 Feb 2019 13:20:34 +0900 Subject: [PATCH 2/2] Fix silly bug --- 
.../execution/streaming/FileStreamSource.scala | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index d5ea6942b38e..c43c09b6602e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -184,8 +184,14 @@ class FileStreamSource( * Some(true) means we know for sure the source DOES have metadata * Some(false) means we know for sure the source DOSE NOT have metadata */ - @volatile private[sql] var sourceHasMetadata: Option[Boolean] = - if (SparkHadoopUtil.get.isGlobPath(new Path(path))) Some(false) else None + @volatile private[sql] var sourceHasMetadata: Option[Boolean] = { + if (sourceOptions.ignoreFileStreamSinkMetadata || + SparkHadoopUtil.get.isGlobPath(new Path(path))) { + Some(false) + } else { + None + } + } private def allFilesUsingInMemoryFileIndex() = { val globbedPaths = SparkHadoopUtil.get.globPathIfNecessary(fs, qualifiedBasePath) @@ -208,10 +214,7 @@ class FileStreamSource( var allFiles: Seq[FileStatus] = null sourceHasMetadata match { case None => - if (sourceOptions.ignoreFileStreamSinkMetadata) { - sourceHasMetadata = Some(false) - allFiles = allFilesUsingMetadataLogFileIndex() - } else if (FileStreamSink.hasMetadata(Seq(path), hadoopConf)) { + if (FileStreamSink.hasMetadata(Seq(path), hadoopConf)) { sourceHasMetadata = Some(true) allFiles = allFilesUsingMetadataLogFileIndex() } else {