**StreamExecution.scala**

```diff
@@ -96,7 +96,8 @@ abstract class StreamExecution(
 
   val resolvedCheckpointRoot = {
     val checkpointPath = new Path(checkpointRoot)
-    val fs = checkpointPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
+    val checkpointFileManager = CheckpointFileManager.create(checkpointPath,
+      sparkSession.sessionState.newHadoopConf())
     if (sparkSession.conf.get(SQLConf.STREAMING_CHECKPOINT_ESCAPED_PATH_CHECK_ENABLED)
         && StreamExecution.containsSpecialCharsInPath(checkpointPath)) {
       // In Spark 2.4 and earlier, the checkpoint path is escaped 3 times (3 `Path.toUri.toString`
@@ -106,7 +107,7 @@ abstract class StreamExecution(
         new Path(new Path(checkpointPath.toUri.toString).toUri.toString).toUri.toString
       val legacyCheckpointDirExists =
         try {
-          fs.exists(new Path(legacyCheckpointDir))
```
> **Contributor:** Here, would it make sense to keep the fs check as well, since we want to catch the previously created checkpoint directories and suggest folks migrate?

```diff
+          checkpointFileManager.exists(new Path(legacyCheckpointDir))
         } catch {
           case NonFatal(e) =>
             // We may not have access to this directory. Don't fail the query if that happens.
```
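The code comment above explains that Spark 2.4 escaped the checkpoint path three times. A minimal sketch of that escalation, with an illustrative local path containing a space; each `Path -> URI -> String` round trip re-encodes the previous percent-escapes:

```scala
import org.apache.hadoop.fs.Path

// Hadoop's Path does not decode "%" when parsing a string, so every
// toUri.toString round trip adds one more layer of percent-encoding.
val original = new Path("/tmp/a b")
val once = new Path(original.toUri.toString)  // /tmp/a%20b
val twice = new Path(once.toUri.toString)     // /tmp/a%2520b
val legacy = twice.toUri.toString             // /tmp/a%252520b, the legacy layout
println(legacy)
```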
```diff
@@ -133,9 +134,8 @@ abstract class StreamExecution(
           .stripMargin)
       }
     }
-    val checkpointDir = checkpointPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
```
> **Contributor:** `makeQualified` disappeared from here, right?

> **Contributor:** I think so. We're using the `checkpointFileManager` for this behavior now in the PR, so we are not directly making FS operations. Looking inside the default FileSystem impl, we don't explicitly translate this to a fully-qualified path. Do you think that would be an issue?

> **Contributor:** Since this ends up being a behavior change, it may break existing queries. I think there must be a clear reason for making such a change.

```diff
-    fs.mkdirs(checkpointDir)
-    checkpointDir.toString
+    checkpointFileManager.mkdirs(checkpointPath)
+    checkpointPath.toString
   }
   logInfo(s"Checkpoint root $checkpointRoot resolved to $resolvedCheckpointRoot.")
```

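The thread above flags that `makeQualified` was dropped from the resolution path. A minimal sketch of the difference, assuming a local filesystem and an illustrative path; this is the behavior change the updated `StreamSuite` test below encodes:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

val checkpointPath = new Path("/tmp/my-checkpoint")  // illustrative path
val fs = checkpointPath.getFileSystem(new Configuration())

// Before this PR: the resolved root was qualified against the filesystem's
// URI and working directory, so a bare local path gained an explicit scheme.
val qualified = checkpointPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
println(qualified)       // file:/tmp/my-checkpoint

// After this PR: the path is used as given, with no qualification step.
println(checkpointPath)  // /tmp/my-checkpoint
```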
```diff
@@ -388,8 +388,9 @@ abstract class StreamExecution(
     val checkpointPath = new Path(resolvedCheckpointRoot)
     try {
       logInfo(s"Deleting checkpoint $checkpointPath.")
-      val fs = checkpointPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
-      fs.delete(checkpointPath, true)
+      val manager = CheckpointFileManager.create(checkpointPath,
+        sparkSession.sessionState.newHadoopConf())
+      manager.delete(checkpointPath)
     } catch {
       case NonFatal(e) =>
         // Deleting temp checkpoint folder is best effort, don't throw non fatal exceptions
```
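Stepping back, the pattern this file now follows: create a `CheckpointFileManager` for the checkpoint path and route directory operations through it instead of a raw Hadoop `FileSystem`. A minimal standalone sketch using only the calls visible in this diff (`create`, `mkdirs`, `exists`, `delete`); the session setup and path are assumptions for illustration, and the Hadoop configuration is taken from the SparkContext here, where the PR itself uses `sessionState.newHadoopConf()`:

```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.CheckpointFileManager

val spark = SparkSession.builder().master("local[1]").getOrCreate()
val root = new Path("/tmp/checkpoint-demo")  // illustrative path

// create() picks an implementation for the path's scheme, so callers never
// touch FileSystem directly.
val manager = CheckpointFileManager.create(root, spark.sparkContext.hadoopConfiguration)

manager.mkdirs(root)          // set up the checkpoint directory
assert(manager.exists(root))  // checked through the same abstraction
manager.delete(root)          // recursive delete; quiet if already gone
```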
**StreamingQueryManager.scala**

```diff
@@ -242,8 +242,9 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
     // If offsets have already been created, we trying to resume a query.
     if (!recoverFromCheckpointLocation) {
       val checkpointPath = new Path(checkpointLocation, "offsets")
-      val fs = checkpointPath.getFileSystem(df.sparkSession.sessionState.newHadoopConf())
-      if (fs.exists(checkpointPath)) {
+      val checkpointFileManager = CheckpointFileManager.create(checkpointPath,
+        df.sparkSession.sessionState.newHadoopConf())
+      if (checkpointFileManager.exists(checkpointPath)) {
         throw new AnalysisException(
           s"This query does not support recovering from checkpoint location. " +
           s"Delete $checkpointPath to start over.")
```
**StreamSuite.scala**

```diff
@@ -444,7 +444,9 @@ class StreamSuite extends StreamTest {
     val inputData = MemoryStream[String]
     val df = inputData.toDS().map(_ + "foo").groupBy("value").agg(count("*"))
     // Test StreamingQuery.display
+    val dir1 = Utils.createTempDir().getCanonicalFile
     val q = df.writeStream.queryName("memory_explain").outputMode("complete").format("memory")
```
> **Contributor:** Why do we need this (and similar changes)? The original test runs fine with a temp checkpoint.

.option("checkpointLocation", "file://" + dir1.getCanonicalPath)
.start()
.asInstanceOf[StreamingQueryWrapper]
.streamingQuery
Expand Down Expand Up @@ -853,7 +855,6 @@ class StreamSuite extends StreamTest {
test("should resolve the checkpoint path") {
withTempDir { dir =>
val checkpointLocation = dir.getCanonicalPath
assert(!checkpointLocation.startsWith("file:/"))
val query = MemoryStream[Int].toDF
.writeStream
.option("checkpointLocation", checkpointLocation)
Expand All @@ -862,7 +863,7 @@ class StreamSuite extends StreamTest {
try {
val resolvedCheckpointDir =
query.asInstanceOf[StreamingQueryWrapper].streamingQuery.resolvedCheckpointRoot
assert(resolvedCheckpointDir.startsWith("file:/"))
assert(resolvedCheckpointDir.equals(checkpointLocation))
} finally {
query.stop()
}
Expand Down
**StreamingQuerySuite.scala**

```diff
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.streaming
 
 import java.io.File
+import java.net.URI
 import java.nio.charset.StandardCharsets.UTF_8
 import java.util.concurrent.CountDownLatch
 
@@ -649,7 +650,8 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
       CheckAnswer(6, 3, 6, 3, 1, 1),
 
       AssertOnQuery("metadata log should contain only two files") { q =>
-        val metadataLogDir = new java.io.File(q.offsetLog.metadataPath.toUri)
+        val metadataLogDir = new java.io.File(
+          new URI("file://" + q.offsetLog.metadataPath.toUri))
         val logFileNames = metadataLogDir.listFiles().toSeq.map(_.getName())
         val toTest = logFileNames.filter(!_.endsWith(".crc")).sorted // Workaround for SPARK-17475
         assert(toTest.size == 2 && toTest.head == "1")
@@ -675,7 +677,8 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
       CheckAnswer(1, 2, 1, 2, 3, 4, 5, 6, 7, 8),
 
       AssertOnQuery("metadata log should contain three files") { q =>
-        val metadataLogDir = new java.io.File(q.offsetLog.metadataPath.toUri)
+        val metadataLogDir = new java.io.File(
+          new URI("file://" + q.offsetLog.metadataPath.toUri))
         val logFileNames = metadataLogDir.listFiles().toSeq.map(_.getName())
         val toTest = logFileNames.filter(!_.endsWith(".crc")).sorted // Workaround for SPARK-17475
         assert(toTest.size == 3 && toTest.head == "2")
```
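The test change above prepends "file://" before constructing a `java.io.File`. The likely reason: `java.io.File(URI)` requires an absolute `file:` URI, and after this PR the metadata path may come back without a scheme. A minimal sketch with an illustrative path:

```scala
import java.io.File
import java.net.URI

val schemeless = new URI("/tmp/checkpoint/offsets")  // no scheme
// new File(schemeless) throws IllegalArgumentException: URI is not absolute

val qualified = new URI("file://" + schemeless)      // file:///tmp/checkpoint/offsets
val dir = new File(qualified)                        // fine: absolute file: URI
println(dir.getPath)                                 // /tmp/checkpoint/offsets
```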
**DataStreamReaderWriterSuite.scala**

```diff
@@ -377,14 +377,14 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
 
     verify(LastOptions.mockStreamSourceProvider).createSource(
       any(),
-      meq(s"${new Path(makeQualifiedPath(checkpointLocation.toString)).toString}/sources/0"),
+      meq(s"${checkpointLocation.toString}/sources/0"),
       meq(None),
       meq("org.apache.spark.sql.streaming.test"),
       meq(Map.empty))
 
     verify(LastOptions.mockStreamSourceProvider).createSource(
       any(),
-      meq(s"${new Path(makeQualifiedPath(checkpointLocation.toString)).toString}/sources/1"),
+      meq(s"${checkpointLocation.toString}/sources/1"),
       meq(None),
       meq("org.apache.spark.sql.streaming.test"),
       meq(Map.empty))
@@ -633,10 +633,11 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
     query.processAllAvailable()
     val checkpointDir = new Path(
       query.asInstanceOf[StreamingQueryWrapper].streamingQuery.resolvedCheckpointRoot)
-    val fs = checkpointDir.getFileSystem(spark.sessionState.newHadoopConf())
-    assert(fs.exists(checkpointDir))
+    val checkpointFileManager = CheckpointFileManager.create(checkpointDir,
+      spark.sessionState.newHadoopConf())
+    assert(checkpointFileManager.exists(checkpointDir))
     query.stop()
-    assert(!fs.exists(checkpointDir))
+    assert(!checkpointFileManager.exists(checkpointDir))
   }
 
   testQuietly("temp checkpoint dir should not be deleted if a query is stopped with an error") {
@@ -656,16 +657,17 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
     val query = input.toDS.map(_ / 0).writeStream.format("console").start()
     val checkpointDir = new Path(
       query.asInstanceOf[StreamingQueryWrapper].streamingQuery.resolvedCheckpointRoot)
-    val fs = checkpointDir.getFileSystem(spark.sessionState.newHadoopConf())
-    assert(fs.exists(checkpointDir))
+    val checkpointFileManager = CheckpointFileManager.create(checkpointDir,
+      spark.sessionState.newHadoopConf())
+    assert(checkpointFileManager.exists(checkpointDir))
     input.addData(1)
     intercept[StreamingQueryException] {
       query.awaitTermination()
     }
     if (!checkpointMustBeDeleted) {
-      assert(fs.exists(checkpointDir))
+      assert(checkpointFileManager.exists(checkpointDir))
     } else {
-      assert(!fs.exists(checkpointDir))
+      assert(!checkpointFileManager.exists(checkpointDir))
     }
   }
 
```