diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 7c1f6ca42c1f..194b5419af21 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -96,7 +96,8 @@ abstract class StreamExecution(
 
   val resolvedCheckpointRoot = {
     val checkpointPath = new Path(checkpointRoot)
-    val fs = checkpointPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
+    val checkpointFileManager = CheckpointFileManager.create(checkpointPath,
+      sparkSession.sessionState.newHadoopConf())
     if (sparkSession.conf.get(SQLConf.STREAMING_CHECKPOINT_ESCAPED_PATH_CHECK_ENABLED)
         && StreamExecution.containsSpecialCharsInPath(checkpointPath)) {
       // In Spark 2.4 and earlier, the checkpoint path is escaped 3 times (3 `Path.toUri.toString`
@@ -106,7 +107,7 @@ abstract class StreamExecution(
         new Path(new Path(checkpointPath.toUri.toString).toUri.toString).toUri.toString
       val legacyCheckpointDirExists =
         try {
-          fs.exists(new Path(legacyCheckpointDir))
+          checkpointFileManager.exists(new Path(legacyCheckpointDir))
         } catch {
           case NonFatal(e) =>
             // We may not have access to this directory. Don't fail the query if that happens.
@@ -133,9 +134,8 @@ abstract class StreamExecution(
             .stripMargin)
       }
     }
-    val checkpointDir = checkpointPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
-    fs.mkdirs(checkpointDir)
-    checkpointDir.toString
+    checkpointFileManager.mkdirs(checkpointPath)
+    checkpointPath.toString
   }
   logInfo(s"Checkpoint root $checkpointRoot resolved to $resolvedCheckpointRoot.")
 
@@ -388,8 +388,9 @@ abstract class StreamExecution(
       val checkpointPath = new Path(resolvedCheckpointRoot)
       try {
         logInfo(s"Deleting checkpoint $checkpointPath.")
-        val fs = checkpointPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
-        fs.delete(checkpointPath, true)
+        val manager = CheckpointFileManager.create(checkpointPath,
+          sparkSession.sessionState.newHadoopConf())
+        manager.delete(checkpointPath)
       } catch {
         case NonFatal(e) =>
           // Deleting temp checkpoint folder is best effort, don't throw non fatal exceptions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index abee5f6017df..64f05eb8d716 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -242,8 +242,9 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
     // If offsets have already been created, we trying to resume a query.
     if (!recoverFromCheckpointLocation) {
       val checkpointPath = new Path(checkpointLocation, "offsets")
-      val fs = checkpointPath.getFileSystem(df.sparkSession.sessionState.newHadoopConf())
-      if (fs.exists(checkpointPath)) {
+      val checkpointFileManager = CheckpointFileManager.create(checkpointPath,
+        df.sparkSession.sessionState.newHadoopConf())
+      if (checkpointFileManager.exists(checkpointPath)) {
         throw new AnalysisException(
           s"This query does not support recovering from checkpoint location. " +
             s"Delete $checkpointPath to start over.")
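
Both files above converge on the same pattern: resolve a CheckpointFileManager once for the checkpoint root and route exists/mkdirs/delete through it instead of through a raw Hadoop FileSystem. A minimal sketch of that API, with an illustrative path and a bare Configuration standing in for sparkSession.sessionState.newHadoopConf():

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.spark.sql.execution.streaming.CheckpointFileManager

    // create() consults spark.sql.streaming.checkpointFileManagerClass in the
    // Hadoop conf and otherwise falls back to a built-in implementation.
    val checkpointPath = new Path("/tmp/illustrative-checkpoint")  // illustrative only
    val manager = CheckpointFileManager.create(checkpointPath, new Configuration())

    if (!manager.exists(checkpointPath)) {
      manager.mkdirs(checkpointPath)  // what resolvedCheckpointRoot now does
    }
    manager.delete(checkpointPath)    // what the best-effort cleanup now does

Note that CheckpointFileManager.delete() deletes recursively and does not throw when the path is missing, which is why the explicit recursive flag from fs.delete(checkpointPath, true) can simply be dropped.
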
" + s"Delete $checkpointPath to start over.") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala index 1ed2599444c5..b89f89627c76 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala @@ -444,7 +444,9 @@ class StreamSuite extends StreamTest { val inputData = MemoryStream[String] val df = inputData.toDS().map(_ + "foo").groupBy("value").agg(count("*")) // Test StreamingQuery.display + val dir1 = Utils.createTempDir().getCanonicalFile val q = df.writeStream.queryName("memory_explain").outputMode("complete").format("memory") + .option("checkpointLocation", "file://" + dir1.getCanonicalPath) .start() .asInstanceOf[StreamingQueryWrapper] .streamingQuery @@ -853,7 +855,6 @@ class StreamSuite extends StreamTest { test("should resolve the checkpoint path") { withTempDir { dir => val checkpointLocation = dir.getCanonicalPath - assert(!checkpointLocation.startsWith("file:/")) val query = MemoryStream[Int].toDF .writeStream .option("checkpointLocation", checkpointLocation) @@ -862,7 +863,7 @@ class StreamSuite extends StreamTest { try { val resolvedCheckpointDir = query.asInstanceOf[StreamingQueryWrapper].streamingQuery.resolvedCheckpointRoot - assert(resolvedCheckpointDir.startsWith("file:/")) + assert(resolvedCheckpointDir.equals(checkpointLocation)) } finally { query.stop() } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index 3ad893f871c9..c9734fa533ec 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.streaming import java.io.File +import java.net.URI import java.nio.charset.StandardCharsets.UTF_8 import java.util.concurrent.CountDownLatch @@ -649,7 +650,8 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi CheckAnswer(6, 3, 6, 3, 1, 1), AssertOnQuery("metadata log should contain only two files") { q => - val metadataLogDir = new java.io.File(q.offsetLog.metadataPath.toUri) + val metadataLogDir = new java.io.File( + new URI("file://" + q.offsetLog.metadataPath.toUri)) val logFileNames = metadataLogDir.listFiles().toSeq.map(_.getName()) val toTest = logFileNames.filter(!_.endsWith(".crc")).sorted // Workaround for SPARK-17475 assert(toTest.size == 2 && toTest.head == "1") @@ -675,7 +677,8 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi CheckAnswer(1, 2, 1, 2, 3, 4, 5, 6, 7, 8), AssertOnQuery("metadata log should contain three files") { q => - val metadataLogDir = new java.io.File(q.offsetLog.metadataPath.toUri) + val metadataLogDir = new java.io.File( + new URI("file://" + q.offsetLog.metadataPath.toUri)) val logFileNames = metadataLogDir.listFiles().toSeq.map(_.getName()) val toTest = logFileNames.filter(!_.endsWith(".crc")).sorted // Workaround for SPARK-17475 assert(toTest.size == 3 && toTest.head == "2") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala index c630f1497a17..f776f165e46f 100644 --- 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
index c630f1497a17..f776f165e46f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
@@ -377,14 +377,14 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
 
     verify(LastOptions.mockStreamSourceProvider).createSource(
       any(),
-      meq(s"${new Path(makeQualifiedPath(checkpointLocation.toString)).toString}/sources/0"),
+      meq(s"${checkpointLocation.toString}/sources/0"),
       meq(None),
       meq("org.apache.spark.sql.streaming.test"),
       meq(Map.empty))
 
     verify(LastOptions.mockStreamSourceProvider).createSource(
       any(),
-      meq(s"${new Path(makeQualifiedPath(checkpointLocation.toString)).toString}/sources/1"),
+      meq(s"${checkpointLocation.toString}/sources/1"),
       meq(None),
       meq("org.apache.spark.sql.streaming.test"),
       meq(Map.empty))
@@ -633,10 +633,11 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
     query.processAllAvailable()
     val checkpointDir = new Path(
       query.asInstanceOf[StreamingQueryWrapper].streamingQuery.resolvedCheckpointRoot)
-    val fs = checkpointDir.getFileSystem(spark.sessionState.newHadoopConf())
-    assert(fs.exists(checkpointDir))
+    val checkpointFileManager = CheckpointFileManager.create(checkpointDir,
+      spark.sessionState.newHadoopConf())
+    assert(checkpointFileManager.exists(checkpointDir))
     query.stop()
-    assert(!fs.exists(checkpointDir))
+    assert(!checkpointFileManager.exists(checkpointDir))
   }
 
   testQuietly("temp checkpoint dir should not be deleted if a query is stopped with an error") {
@@ -656,16 +657,17 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
       val query = input.toDS.map(_ / 0).writeStream.format("console").start()
       val checkpointDir = new Path(
         query.asInstanceOf[StreamingQueryWrapper].streamingQuery.resolvedCheckpointRoot)
-      val fs = checkpointDir.getFileSystem(spark.sessionState.newHadoopConf())
-      assert(fs.exists(checkpointDir))
+      val checkpointFileManager = CheckpointFileManager.create(checkpointDir,
+        spark.sessionState.newHadoopConf())
+      assert(checkpointFileManager.exists(checkpointDir))
       input.addData(1)
       intercept[StreamingQueryException] {
         query.awaitTermination()
       }
       if (!checkpointMustBeDeleted) {
-        assert(fs.exists(checkpointDir))
+        assert(checkpointFileManager.exists(checkpointDir))
       } else {
-        assert(!fs.exists(checkpointDir))
+        assert(!checkpointFileManager.exists(checkpointDir))
       }
     }
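
The net effect across all five files is that every checkpoint-root operation now honors the manager selected by spark.sql.streaming.checkpointFileManagerClass, where previously the metadata logs went through CheckpointFileManager but the root itself was still created and deleted via checkpointPath.getFileSystem(...). A hedged sketch of opting in, using a hypothetical com.example.MyCheckpointFileManager:

    // Hypothetical class name. Any implementation of
    // org.apache.spark.sql.execution.streaming.CheckpointFileManager with a
    // (Path, Configuration) constructor should work, since create()
    // instantiates the configured class reflectively.
    spark.conf.set(
      "spark.sql.streaming.checkpointFileManagerClass",
      "com.example.MyCheckpointFileManager")

With that set, the exists/mkdirs/delete calls introduced above go through the custom manager for queries started afterwards, not just the offset and commit log writes.
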