diff --git a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala index e01ff56752c3..0009c2d7caca 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala @@ -22,7 +22,6 @@ import java.net.{URI, URL} import java.sql.{Connection, Driver, DriverManager, PreparedStatement, ResultSet, ResultSetMetaData} import java.util.{Locale, Properties, ServiceConfigurationError} -import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{LocalFileSystem, Path} import org.apache.hadoop.fs.permission.FsPermission import org.mockito.Mockito.{mock, spy, when} @@ -36,7 +35,6 @@ import org.apache.spark.sql.execution.datasources.orc.OrcTest import org.apache.spark.sql.execution.datasources.parquet.ParquetTest import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog import org.apache.spark.sql.execution.streaming.FileSystemBasedCheckpointFileManager -import org.apache.spark.sql.execution.streaming.state.RenameReturnsFalseFileSystem import org.apache.spark.sql.functions.{lit, lower, struct, sum, udf} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy.EXCEPTION @@ -670,28 +668,34 @@ class QueryExecutionErrorsSuite } test("RENAME_SRC_PATH_NOT_FOUND: rename the file which source path does not exist") { - var srcPath: Path = null - val e = intercept[SparkFileNotFoundException]( - withTempPath { p => - val conf = new Configuration() - conf.set("fs.test.impl", classOf[RenameReturnsFalseFileSystem].getName) - conf.set("fs.defaultFS", "test:///") - val basePath = new Path(p.getAbsolutePath) - val fm = new FileSystemBasedCheckpointFileManager(basePath, conf) - srcPath = new Path(s"$basePath/file") - assert(!fm.exists(srcPath)) - fm.createAtomic(srcPath, overwriteIfPossible = true).cancel() - assert(!fm.exists(srcPath)) - val dstPath = new Path(s"$basePath/new_file") - fm.renameTempFile(srcPath, dstPath, true) + withTempPath { p => + withSQLConf( + "spark.sql.streaming.checkpointFileManagerClass" -> + classOf[FileSystemBasedCheckpointFileManager].getName, + "fs.file.impl" -> classOf[FakeFileSystemNeverExists].getName, + // FileSystem caching could cause a different implementation of fs.file to be used + "fs.file.impl.disable.cache" -> "true" + ) { + val checkpointLocation = p.getAbsolutePath + + val ds = spark.readStream.format("rate").load() + val e = intercept[SparkFileNotFoundException] { + ds.writeStream + .option("checkpointLocation", checkpointLocation) + .queryName("_") + .format("memory") + .start() + } + + val expectedPath = p.toURI + checkError( + exception = e, + errorClass = "RENAME_SRC_PATH_NOT_FOUND", + matchPVals = true, + parameters = Map("sourcePath" -> s"$expectedPath.+") + ) } - ) - checkError( - exception = e, - errorClass = "RENAME_SRC_PATH_NOT_FOUND", - parameters = Map( - "sourcePath" -> s"$srcPath" - )) + } } test("UNSUPPORTED_FEATURE.JDBC_TRANSACTION: the target JDBC server does not support " + @@ -781,3 +785,7 @@ class FakeFileSystemSetPermission extends LocalFileSystem { class FakeFileSystemAlwaysExists extends DebugFilesystem { override def exists(f: Path): Boolean = true } + +class FakeFileSystemNeverExists extends DebugFilesystem { + override def exists(f: Path): Boolean = false +}