diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 3dfe4dc8bee5..f3abc4347324 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1568,6 +1568,16 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)
 
+  val FILESTREAM_SINK_METADATA_IGNORED =
+    buildConf("spark.sql.streaming.fileStreamSink.ignoreMetadata")
+      .internal()
+      .doc("If this is enabled, when Spark reads from the results of a streaming query written " +
+        "by `FileStreamSink`, Spark will ignore the metadata log and treat it as a normal " +
+        "path to read, e.g. listing files using HDFS APIs.")
+      .version("3.2.0")
+      .booleanConf
+      .createWithDefault(false)
+
   val VARIABLE_SUBSTITUTE_ENABLED =
     buildConf("spark.sql.variable.substitute")
       .doc("This enables substitution using syntax like `${var}`, `${system:var}`, " +
@@ -3390,6 +3400,8 @@ class SQLConf extends Serializable with Logging {
   def statefulOperatorCorrectnessCheckEnabled: Boolean =
     getConf(STATEFUL_OPERATOR_CHECK_CORRECTNESS_ENABLED)
 
+  def fileStreamSinkMetadataIgnored: Boolean = getConf(FILESTREAM_SINK_METADATA_IGNORED)
+
   def streamingFileCommitProtocolClass: String = getConf(STREAMING_FILE_COMMIT_PROTOCOL_CLASS)
 
   def fileSinkLogDeletion: Boolean = getConf(FILE_SINK_LOG_DELETION)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 3891c13a7530..ad850cf06dfd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -360,7 +360,9 @@ case class DataSource(
         baseRelation
 
       // We are reading from the results of a streaming query. Load files from the metadata log
-      // instead of listing them using HDFS APIs.
+      // instead of listing them using HDFS APIs. Note that the config
+      // `spark.sql.streaming.fileStreamSink.ignoreMetadata` can be enabled to ignore the
+      // metadata log.
       case (format: FileFormat, _)
           if FileStreamSink.hasMetadata(
             caseInsensitiveOptions.get("path").toSeq ++ paths,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
index 876671c61d81..efc64c448cf2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
@@ -40,6 +40,11 @@ object FileStreamSink extends Logging {
    * be read.
    */
  def hasMetadata(path: Seq[String], hadoopConf: Configuration, sqlConf: SQLConf): Boolean = {
+    // The user explicitly configured Spark to ignore the sink metadata.
+    if (sqlConf.fileStreamSinkMetadataIgnored) {
+      return false
+    }
+
     path match {
       case Seq(singlePath) =>
         val hdfsPath = new Path(singlePath)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index 53ef83206323..407d7835a6a2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -32,6 +32,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
 import org.apache.spark.sql.{AnalysisException, DataFrame}
+import org.apache.spark.sql.catalyst.util.stringToFile
 import org.apache.spark.sql.execution.DataSourceScanExec
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, DataSourceV2Relation, FileScan, FileTable}
@@ -600,6 +601,51 @@ abstract class FileStreamSinkSuite extends StreamTest {
       }
     }
   }
+
+  test("SPARK-35565: Ignore metadata directory when reading sink output") {
+    Seq(true, false).foreach { ignoreMetadata =>
+      withSQLConf(SQLConf.FILESTREAM_SINK_METADATA_IGNORED.key -> ignoreMetadata.toString) {
+        val inputData = MemoryStream[String]
+        val df = inputData.toDF()
+
+        withTempDir { outputDir =>
+          withTempDir { checkpointDir =>
+            var query: StreamingQuery = null
+            try {
+              query =
+                df.writeStream
+                  .option("checkpointLocation", checkpointDir.getCanonicalPath)
+                  .format("text")
+                  .start(outputDir.getCanonicalPath)
+
+              inputData.addData("1", "2", "3")
+              inputData.addData("4", "5")
+
+              failAfter(streamingTimeout) {
+                query.processAllAvailable()
+              }
+            } finally {
+              if (query != null) {
+                query.stop()
+              }
+            }
+
+            val additionalFile = new File(outputDir, "additional.txt")
+            stringToFile(additionalFile, "6")
+            assert(additionalFile.exists())
+
+            val outputDf = spark.read.format("text").load(outputDir.getCanonicalPath)
+              .selectExpr("CAST(value AS INT)").as[Int]
+            if (ignoreMetadata) {
+              checkDatasetUnorderly(outputDf, 1, 2, 3, 4, 5, 6)
+            } else {
+              checkDatasetUnorderly(outputDf, 1, 2, 3, 4, 5)
+            }
+          }
+        }
+      }
+    }
+  }
 }
 
 object PendingCommitFilesTrackingManifestFileCommitProtocol {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index ff00c474e2ef..a91856a35fcb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -972,6 +972,75 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     }
   }
 
+  test("SPARK-35565: read data from outputs of another streaming query but ignore its metadata") {
+    withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3",
+      SQLConf.FILESTREAM_SINK_METADATA_IGNORED.key -> "true") {
+      withTempDirs { case (outputDir, checkpointDir1) =>
+        // q0 is a streaming query that reads from memory and writes to text files
+        val q0Source = MemoryStream[String]
+        val q0 =
+          q0Source
+            .toDF()
+            .writeStream
+            .option("checkpointLocation", checkpointDir1.getCanonicalPath)
+            .format("text")
+            .start(outputDir.getCanonicalPath)
+
+        q0Source.addData("keep0")
+        q0.processAllAvailable()
+        q0.stop()
+        Utils.deleteRecursively(new File(outputDir.getCanonicalPath + "/" +
+          FileStreamSink.metadataDir))
+
+        withTempDir { checkpointDir2 =>
+          // q1 is a streaming query that reads from memory and writes to text files too
+          val q1Source = MemoryStream[String]
+          val q1 =
+            q1Source
+              .toDF()
+              .writeStream
+              .option("checkpointLocation", checkpointDir2.getCanonicalPath)
+              .format("text")
+              .start(outputDir.getCanonicalPath)
+
+          // q2 is a streaming query that reads both q0 and q1's text outputs
+          val q2 =
+            createFileStream("text", outputDir.getCanonicalPath).filter($"value" contains "keep")
+
+          def q1AddData(data: String*): StreamAction =
+            Execute { _ =>
+              q1Source.addData(data)
+              q1.processAllAvailable()
+            }
+
+          def q2ProcessAllAvailable(): StreamAction = Execute { q2 => q2.processAllAvailable() }
+
+          testStream(q2)(
+            // batch 0
+            q1AddData("drop1", "keep2"),
+            q2ProcessAllAvailable(),
+            CheckAnswer("keep0", "keep2"),
+
+            q1AddData("keep3"),
+            q2ProcessAllAvailable(),
+            CheckAnswer("keep0", "keep2", "keep3"),
+
+            // batch 2: check that things work well when the sink log gets compacted
+            q1AddData("keep4"),
+            Assert {
+              // compact interval is 3, so file "2.compact" should exist
+              new File(outputDir, s"${FileStreamSink.metadataDir}/2.compact").exists()
+            },
+            q2ProcessAllAvailable(),
+            CheckAnswer("keep0", "keep2", "keep3", "keep4"),
+
+            Execute { _ => q1.stop() }
+          )
+        }
+      }
+    }
+  }
+
   test("start before another streaming query, and read its output") {
     withTempDirs { case (outputDir, checkpointDir) =>
       // q1 is a streaming query that reads from memory and writes to text files
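For reference only, not part of the patch: a minimal spark-shell sketch of how the new flag is expected to be used, assuming a directory /tmp/sink-output that was previously written by a FileStreamSink query (the path is a placeholder).

  // Ignore the _spark_metadata log of the sink output and read the directory as a normal path,
  // so files written outside the streaming query (if any) also become visible to the batch read.
  spark.conf.set("spark.sql.streaming.fileStreamSink.ignoreMetadata", "true")
  val sinkOutput = spark.read.format("text").load("/tmp/sink-output")
  sinkOutput.show()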