@@ -1568,6 +1568,16 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

val FILESTREAM_SINK_METADATA_IGNORED =
buildConf("spark.sql.streaming.fileStreamSink.ignoreMetadata")
.internal()
.doc("If this is enabled, when Spark reads from the results of a streaming query written " +
"by `FileStreamSink`, Spark will ignore the metadata log and treat it as normal path to " +
"read, e.g. listing files using HDFS APIs.")
.version("3.2.0")
.booleanConf
.createWithDefault(false)

val VARIABLE_SUBSTITUTE_ENABLED =
buildConf("spark.sql.variable.substitute")
.doc("This enables substitution using syntax like `${var}`, `${system:var}`, " +
@@ -3390,6 +3400,8 @@ class SQLConf extends Serializable with Logging {
def statefulOperatorCorrectnessCheckEnabled: Boolean =
getConf(STATEFUL_OPERATOR_CHECK_CORRECTNESS_ENABLED)

def fileStreamSinkMetadataIgnored: Boolean = getConf(FILESTREAM_SINK_METADATA_IGNORED)

def streamingFileCommitProtocolClass: String = getConf(STREAMING_FILE_COMMIT_PROTOCOL_CLASS)

def fileSinkLogDeletion: Boolean = getConf(FILE_SINK_LOG_DELETION)
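
For context, a minimal sketch of how the new flag changes a batch read over a `FileStreamSink` output directory. The session settings and the path below are illustrative, not part of this change:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[2]")
  .appName("ignore-sink-metadata-sketch")
  .getOrCreate()

// Default behavior: a batch read over a FileStreamSink output directory only returns
// files recorded in its _spark_metadata log.
val tracked = spark.read.format("text").load("/tmp/sink-output")

// With the flag enabled, the metadata log is ignored and the directory is read as a
// normal path, i.e. every file found by listing is included.
spark.conf.set("spark.sql.streaming.fileStreamSink.ignoreMetadata", "true")
val all = spark.read.format("text").load("/tmp/sink-output")
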
@@ -360,7 +360,9 @@ case class DataSource(
baseRelation

// We are reading from the results of a streaming query. Load files from the metadata log
// instead of listing them using HDFS APIs.
// instead of listing them using HDFS APIs. Note that the config
// `spark.sql.streaming.fileStreamSink.ignoreMetadata` can be enabled to ignore the
// metadata log.
case (format: FileFormat, _)
if FileStreamSink.hasMetadata(
caseInsensitiveOptions.get("path").toSeq ++ paths,
@@ -40,6 +40,11 @@ object FileStreamSink extends Logging {
* be read.
*/
def hasMetadata(path: Seq[String], hadoopConf: Configuration, sqlConf: SQLConf): Boolean = {
// The user has explicitly configured Spark to ignore the sink metadata.
if (sqlConf.fileStreamSinkMetadataIgnored) {
return false
}

path match {
case Seq(singlePath) =>
val hdfsPath = new Path(singlePath)
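
A rough sketch of what the new guard amounts to for callers, assuming an ad-hoc `SQLConf` and an illustrative path (neither is taken from this change): with the flag set, `hasMetadata` answers false without looking for a `_spark_metadata` directory, so `DataSource` falls back to plain file listing.

import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.execution.streaming.FileStreamSink
import org.apache.spark.sql.internal.SQLConf

val sqlConf = new SQLConf()
sqlConf.setConf(SQLConf.FILESTREAM_SINK_METADATA_IGNORED, true)

// Even if /tmp/sink-output/_spark_metadata exists, the guard returns false.
val treatedAsSink = FileStreamSink.hasMetadata(Seq("/tmp/sink-output"), new Configuration(), sqlConf)
assert(!treatedAsSink)
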
@@ -32,6 +32,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.apache.spark.sql.{AnalysisException, DataFrame}
import org.apache.spark.sql.catalyst.util.stringToFile
import org.apache.spark.sql.execution.DataSourceScanExec
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, DataSourceV2Relation, FileScan, FileTable}
@@ -600,6 +601,51 @@ abstract class FileStreamSinkSuite extends StreamTest {
}
}
}

test("SPARK-35565: Ignore metadata directory when reading sink output") {
Seq(true, false).foreach { ignoreMetadata =>
withSQLConf(SQLConf.FILESTREAM_SINK_METADATA_IGNORED.key -> ignoreMetadata.toString) {
val inputData = MemoryStream[String]
val df = inputData.toDF()

withTempDir { outputDir =>
withTempDir { checkpointDir =>
var query: StreamingQuery = null
try {
query =
df.writeStream
.option("checkpointLocation", checkpointDir.getCanonicalPath)
.format("text")
.start(outputDir.getCanonicalPath)

inputData.addData("1", "2", "3")
inputData.addData("4", "5")

failAfter(streamingTimeout) {
query.processAllAvailable()
}
} finally {
if (query != null) {
query.stop()
}
}

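// Write an extra file into the output directory that the sink's metadata log does not track.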
val additionalFile = new File(outputDir, "additional.txt")
stringToFile(additionalFile, "6")
assert(additionalFile.exists())

val outputDf = spark.read.format("text").load(outputDir.getCanonicalPath)
.selectExpr("CAST(value AS INT)").as[Int]
if (ignoreMetadata) {
checkDatasetUnorderly(outputDf, 1, 2, 3, 4, 5, 6)
} else {
checkDatasetUnorderly(outputDf, 1, 2, 3, 4, 5)
}
}
}
}
}
}
}

object PendingCommitFilesTrackingManifestFileCommitProtocol {
@@ -972,6 +972,75 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
}
}

test("SPARK-35565: read data from outputs of another streaming query but ignore its metadata") {
withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3",
SQLConf.FILESTREAM_SINK_METADATA_IGNORED.key -> "true") {
withTempDirs { case (outputDir, checkpointDir1) =>
// q0 is a streaming query that reads from memory and writes to text files
val q0Source = MemoryStream[String]
val q0 =
q0Source
.toDF()
.writeStream
.option("checkpointLocation", checkpointDir1.getCanonicalPath)
.format("text")
.start(outputDir.getCanonicalPath)

q0Source.addData("keep0")
q0.processAllAvailable()
q0.stop()
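// Delete q0's sink metadata log so that its output file is no longer tracked by any log
// and can only be discovered by listing the directory.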
Utils.deleteRecursively(new File(outputDir.getCanonicalPath + "/" +
FileStreamSink.metadataDir))

withTempDir { checkpointDir2 =>
// q1 is a streaming query that reads from memory and writes to text files too
val q1Source = MemoryStream[String]
val q1 =
q1Source
.toDF()
.writeStream
.option("checkpointLocation", checkpointDir2.getCanonicalPath)
.format("text")
.start(outputDir.getCanonicalPath)

// q2 is a streaming query that reads both q0 and q1's text outputs
val q2 =
createFileStream("text", outputDir.getCanonicalPath).filter($"value" contains "keep")

def q1AddData(data: String*): StreamAction =
Execute { _ =>
q1Source.addData(data)
q1.processAllAvailable()
}

def q2ProcessAllAvailable(): StreamAction = Execute { q2 => q2.processAllAvailable() }

testStream(q2)(
// batch 0
q1AddData("drop1", "keep2"),
q2ProcessAllAvailable(),
CheckAnswer("keep0", "keep2"),

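// batch 1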
q1AddData("keep3"),
q2ProcessAllAvailable(),
CheckAnswer("keep0", "keep2", "keep3"),

// batch 2: check that things work well when the sink log gets compacted
q1AddData("keep4"),
Assert {
// compact interval is 3, so file "2.compact" should exist
new File(outputDir, s"${FileStreamSink.metadataDir}/2.compact").exists()
},
q2ProcessAllAvailable(),
CheckAnswer("keep0", "keep2", "keep3", "keep4"),

Execute { _ => q1.stop() }
)
}
}
}
}

test("start before another streaming query, and read its output") {
withTempDirs { case (outputDir, checkpointDir) =>
// q1 is a streaming query that reads from memory and writes to text files