@@ -1568,6 +1568,16 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

val FILESTREAM_SINK_METADATA_IGNORED =
buildConf("spark.sql.streaming.fileStreamSink.metadata.ignored")
Member: Following the guideline for naming configurations, maybe the config can be named like spark.sql.streaming.fileStreamSink.ignoreMetadata or spark.sql.streaming.fileStreamSink.formatCheck.enabled, or any other good names :)

Contributor: Personally spark.sql.streaming.fileStreamSink.ignoreMetadata sounds better. I couldn't get what formatCheck means intuitively.

Member Author: spark.sql.streaming.fileStreamSink.ignoreMetadata sounds good.

.internal()
.doc("If this is enabled, when Spark reads from the results of a streaming query written " +
"by `FileStreamSink`, Spark will ignore the metadata log and treat it as a normal path to " +
"read, e.g. listing files using HDFS APIs.")
.version("3.2.0")
.booleanConf
.createWithDefault(false)

val VARIABLE_SUBSTITUTE_ENABLED =
buildConf("spark.sql.variable.substitute")
.doc("This enables substitution using syntax like `${var}`, `${system:var}`, " +
@@ -3390,6 +3400,8 @@ class SQLConf extends Serializable with Logging {
def statefulOperatorCorrectnessCheckEnabled: Boolean =
getConf(STATEFUL_OPERATOR_CHECK_CORRECTNESS_ENABLED)

def fileStreamSinkMetadataIgnored: Boolean = getConf(FILESTREAM_SINK_METADATA_IGNORED)

def streamingFileCommitProtocolClass: String = getConf(STREAMING_FILE_COMMIT_PROTOCOL_CLASS)

def fileSinkLogDeletion: Boolean = getConf(FILE_SINK_LOG_DELETION)
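For context, a minimal usage sketch of the flag as it is named in this commit; the session and output path are hypothetical. With the flag on, a batch read of a FileStreamSink output directory falls back to plain file listing instead of consulting the _spark_metadata log:

    // Illustration only: the path below is hypothetical.
    spark.conf.set("spark.sql.streaming.fileStreamSink.metadata.ignored", "true")

    // With the flag enabled, Spark lists files under the directory with normal
    // HDFS APIs instead of reading the sink's _spark_metadata log, so files that
    // were never committed by the sink (e.g. written by another job) become visible.
    val df = spark.read.format("text").load("/tmp/file-sink-output")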
@@ -360,12 +360,15 @@ case class DataSource(
baseRelation

// We are reading from the results of a streaming query. Load files from the metadata log
// instead of listing them using HDFS APIs.
// instead of listing them using HDFS APIs. Note that the config
// `spark.sql.streaming.fileStreamSink.metadata.ignored` can be enabled to ignore the
// metadata log.
case (format: FileFormat, _)
if FileStreamSink.hasMetadata(
caseInsensitiveOptions.get("path").toSeq ++ paths,
newHadoopConfiguration(),
sparkSession.sessionState.conf) =>
if !sparkSession.sessionState.conf.fileStreamSinkMetadataIgnored &&
Member: Instead of checking the config on the caller side in three places, maybe we can directly check the config in FileStreamSink.hasMetadata? These two approaches should be equivalent, while the latter only changes a single code segment.

Contributor: Either is fine for me.

FileStreamSink.hasMetadata(
caseInsensitiveOptions.get("path").toSeq ++ paths,
newHadoopConfiguration(),
sparkSession.sessionState.conf) =>
val basePath = new Path((caseInsensitiveOptions.get("path").toSeq ++ paths).head)
val fileCatalog = new MetadataLogFileIndex(sparkSession, basePath,
caseInsensitiveOptions, userSpecifiedSchema)
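A minimal sketch of the alternative raised in the review thread above: move the short-circuit into FileStreamSink.hasMetadata itself so its three callers stay unchanged. The signature matches the calls shown in the hunks; the body only approximates the existing single-path check and is not the actual implementation:

    // Sketch of the reviewer's alternative (inside object FileStreamSink), not the change in this commit.
    def hasMetadata(path: Seq[String], hadoopConf: Configuration, sqlConf: SQLConf): Boolean = {
      if (sqlConf.fileStreamSinkMetadataIgnored) {
        // Report "no metadata log" so every caller treats the sink output as a plain directory.
        false
      } else {
        path match {
          case Seq(singlePath) =>
            // Rough stand-in for the existing check: does <path>/_spark_metadata exist?
            val metadataPath = new Path(singlePath, FileStreamSink.metadataDir)
            metadataPath.getFileSystem(hadoopConf).isDirectory(metadataPath)
          case _ => false
        }
      }
    }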
@@ -45,7 +45,9 @@ abstract class FileTable(
val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap
// Hadoop Configurations are case sensitive.
val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap)
if (FileStreamSink.hasMetadata(paths, hadoopConf, sparkSession.sessionState.conf)) {
val ignoreMetadata = sparkSession.sessionState.conf.fileStreamSinkMetadataIgnored
if (!ignoreMetadata &&
FileStreamSink.hasMetadata(paths, hadoopConf, sparkSession.sessionState.conf)) {
// We are reading from the results of a streaming query. We will load files from
// the metadata log instead of listing them using HDFS APIs.
new MetadataLogFileIndex(sparkSession, new Path(paths.head),
@@ -243,6 +243,9 @@ class FileStreamSource(
@volatile private[sql] var sourceHasMetadata: Option[Boolean] =
if (SparkHadoopUtil.get.isGlobPath(new Path(path))) Some(false) else None

// Whether to ignore the metadata directory.
private val ignoreMetadata = sparkSession.sessionState.conf.fileStreamSinkMetadataIgnored

private def allFilesUsingInMemoryFileIndex() = {
val globbedPaths = SparkHadoopUtil.get.globPathIfNecessary(fs, qualifiedBasePath)
val fileIndex = new InMemoryFileIndex(sparkSession, globbedPaths, options, Some(new StructType))
@@ -275,6 +278,10 @@ class FileStreamSource(

var allFiles: Seq[FileStatus] = null
sourceHasMetadata match {
case _ if ignoreMetadata =>
logWarning(s"`spark.sql.streaming.fileStreamSink.metadata.ignored` is enabled, ignoring " +
"metadata directory and using `InMemoryFileIndex`.")
allFiles = allFilesUsingInMemoryFileIndex()
case None =>
if (FileStreamSink.hasMetadata(Seq(path), hadoopConf, sparkSession.sessionState.conf)) {
setSourceHasMetadata(Some(true))
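Condensed, the source-side decision introduced above amounts to the sketch below; it deliberately ignores the sourceHasMetadata caching in the real match, and the metadata-log helper is an assumed counterpart of allFilesUsingInMemoryFileIndex:

    // Simplified view of how the source now picks a file index (illustration only).
    private def chooseAllFiles(): Seq[FileStatus] = {
      if (ignoreMetadata) {
        // Flag on: never consult _spark_metadata, always list the directory.
        allFilesUsingInMemoryFileIndex()
      } else if (FileStreamSink.hasMetadata(Seq(path), hadoopConf, sparkSession.sessionState.conf)) {
        allFilesUsingMetadataLogFileIndex() // assumed: reads file statuses from the sink's metadata log
      } else {
        allFilesUsingInMemoryFileIndex()
      }
    }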
@@ -32,6 +32,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.apache.spark.sql.{AnalysisException, DataFrame}
import org.apache.spark.sql.catalyst.util.stringToFile
import org.apache.spark.sql.execution.DataSourceScanExec
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, DataSourceV2Relation, FileScan, FileTable}
@@ -600,6 +601,51 @@ abstract class FileStreamSinkSuite extends StreamTest {
}
}
}

test("SPARK-35565: Ignore metadata directory when reading sink output") {
Seq(true, false).foreach { ignoreMetadata =>
withSQLConf(SQLConf.FILESTREAM_SINK_METADATA_IGNORED.key -> ignoreMetadata.toString) {
val inputData = MemoryStream[String]
val df = inputData.toDF()

withTempDir { outputDir =>
withTempDir { checkpointDir =>
var query: StreamingQuery = null
try {
query =
df.writeStream
.option("checkpointLocation", checkpointDir.getCanonicalPath)
.format("text")
.start(outputDir.getCanonicalPath)

inputData.addData("1", "2", "3")
inputData.addData("4", "5")

failAfter(streamingTimeout) {
query.processAllAvailable()
}
} finally {
if (query != null) {
query.stop()
}
}

val additionalFile = new File(outputDir, "additional.txt")
stringToFile(additionalFile, "6")
assert(additionalFile.exists())

val outputDf = spark.read.format("text").load(outputDir.getCanonicalPath)
.selectExpr("CAST(value AS INT)").as[Int]
if (ignoreMetadata) {
checkDatasetUnorderly(outputDf, 1, 2, 3, 4, 5, 6)
} else {
checkDatasetUnorderly(outputDf, 1, 2, 3, 4, 5)
}
}
}
}
}
}
}

object PendingCommitFilesTrackingManifestFileCommitProtocol {
@@ -972,6 +972,75 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
}
}

test("SPARK-35565: read data from outputs of another streaming query but ignore its metadata") {
withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3",
SQLConf.FILESTREAM_SINK_METADATA_IGNORED.key -> "true") {
withTempDirs { case (outputDir, checkpointDir1) =>
// q0 is a streaming query that reads from memory and writes to text files
val q0Source = MemoryStream[String]
val q0 =
q0Source
.toDF()
.writeStream
.option("checkpointLocation", checkpointDir1.getCanonicalPath)
.format("text")
.start(outputDir.getCanonicalPath)

q0Source.addData("keep0")
q0.processAllAvailable()
q0.stop()
Utils.deleteRecursively(new File(outputDir.getCanonicalPath + "/" +
FileStreamSink.metadataDir))

withTempDir { checkpointDir2 =>
// q1 is a streaming query that reads from memory and writes to text files too
val q1Source = MemoryStream[String]
val q1 =
q1Source
.toDF()
.writeStream
.option("checkpointLocation", checkpointDir2.getCanonicalPath)
.format("text")
.start(outputDir.getCanonicalPath)

// q2 is a streaming query that reads both q0 and q1's text outputs
val q2 =
createFileStream("text", outputDir.getCanonicalPath).filter($"value" contains "keep")

def q1AddData(data: String*): StreamAction =
Execute { _ =>
q1Source.addData(data)
q1.processAllAvailable()
}

def q2ProcessAllAvailable(): StreamAction = Execute { q2 => q2.processAllAvailable() }

testStream(q2)(
// batch 0
q1AddData("drop1", "keep2"),
q2ProcessAllAvailable(),
CheckAnswer("keep0", "keep2"),

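// batch 1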
q1AddData("keep3"),
q2ProcessAllAvailable(),
CheckAnswer("keep0", "keep2", "keep3"),

// batch 2: check that things work well when the sink log gets compacted
q1AddData("keep4"),
Assert {
// compact interval is 3, so file "2.compact" should exist
new File(outputDir, s"${FileStreamSink.metadataDir}/2.compact").exists()
},
q2ProcessAllAvailable(),
CheckAnswer("keep0", "keep2", "keep3", "keep4"),

Execute { _ => q1.stop() }
)
}
}
}
}

test("start before another streaming query, and read its output") {
withTempDirs { case (outputDir, checkpointDir) =>
// q1 is a streaming query that reads from memory and writes to text files