@@ -22,7 +22,6 @@ import java.net.{URI, URL}
 import java.sql.{Connection, Driver, DriverManager, PreparedStatement, ResultSet, ResultSetMetaData}
 import java.util.{Locale, Properties, ServiceConfigurationError}
 
-import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{LocalFileSystem, Path}
 import org.apache.hadoop.fs.permission.FsPermission
 import org.mockito.Mockito.{mock, spy, when}
@@ -36,7 +35,6 @@ import org.apache.spark.sql.execution.datasources.orc.OrcTest
 import org.apache.spark.sql.execution.datasources.parquet.ParquetTest
 import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog
 import org.apache.spark.sql.execution.streaming.FileSystemBasedCheckpointFileManager
-import org.apache.spark.sql.execution.streaming.state.RenameReturnsFalseFileSystem
 import org.apache.spark.sql.functions.{lit, lower, struct, sum, udf}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy.EXCEPTION
@@ -670,28 +668,34 @@ class QueryExecutionErrorsSuite
   }
 
   test("RENAME_SRC_PATH_NOT_FOUND: rename the file which source path does not exist") {
-    var srcPath: Path = null
-    val e = intercept[SparkFileNotFoundException](
-      withTempPath { p =>
-        val conf = new Configuration()
-        conf.set("fs.test.impl", classOf[RenameReturnsFalseFileSystem].getName)
-        conf.set("fs.defaultFS", "test:///")
-        val basePath = new Path(p.getAbsolutePath)
-        val fm = new FileSystemBasedCheckpointFileManager(basePath, conf)
-        srcPath = new Path(s"$basePath/file")
-        assert(!fm.exists(srcPath))
-        fm.createAtomic(srcPath, overwriteIfPossible = true).cancel()
-        assert(!fm.exists(srcPath))
-        val dstPath = new Path(s"$basePath/new_file")
-        fm.renameTempFile(srcPath, dstPath, true)
+    withTempPath { p =>
+      withSQLConf(
+        "spark.sql.streaming.checkpointFileManagerClass" ->
+          classOf[FileSystemBasedCheckpointFileManager].getName,
+        "fs.file.impl" -> classOf[FakeFileSystemNeverExists].getName,
+        // FileSystem caching could cause a different implementation of fs.file to be used
+        "fs.file.impl.disable.cache" -> "true"
+      ) {
+        val checkpointLocation = p.getAbsolutePath
+
+        val ds = spark.readStream.format("rate").load()
+        val e = intercept[SparkFileNotFoundException] {
+          ds.writeStream
+            .option("checkpointLocation", checkpointLocation)
+            .queryName("_")
+            .format("memory")
+            .start()
+        }
+
+        val expectedPath = p.toURI
+        checkError(
+          exception = e,
+          errorClass = "RENAME_SRC_PATH_NOT_FOUND",
+          matchPVals = true,
+          parameters = Map("sourcePath" -> s"$expectedPath.+")
+        )
       }
-    )
-    checkError(
-      exception = e,
-      errorClass = "RENAME_SRC_PATH_NOT_FOUND",
-      parameters = Map(
-        "sourcePath" -> s"$srcPath"
-      ))
+    }
   }
 
   test("UNSUPPORTED_FEATURE.JDBC_TRANSACTION: the target JDBC server does not support " +
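Note: the rewritten test reproduces RENAME_SRC_PATH_NOT_FOUND through a real streaming query instead of calling renameTempFile by hand; with exists() hardwired to false, the checkpoint manager's rename of the temp metadata file concludes its source is gone. A minimal standalone sketch of the same scenario, assuming an active SparkSession named `spark`, the FakeFileSystemNeverExists class defined at the bottom of this file, and a hypothetical checkpoint path /tmp/ckpt:

import org.apache.spark.SparkFileNotFoundException
import org.apache.spark.sql.execution.streaming.FileSystemBasedCheckpointFileManager

// Route the file:// scheme through a FileSystem whose exists() always returns
// false, and force the FileSystem-based checkpoint manager. The streaming query
// picks these up because SQL conf entries are merged into the session's Hadoop
// configuration.
spark.conf.set("spark.sql.streaming.checkpointFileManagerClass",
  classOf[FileSystemBasedCheckpointFileManager].getName)
spark.conf.set("fs.file.impl", classOf[FakeFileSystemNeverExists].getName)
spark.conf.set("fs.file.impl.disable.cache", "true") // bypass the FileSystem cache

try {
  spark.readStream.format("rate").load()
    .writeStream
    .option("checkpointLocation", "/tmp/ckpt") // hypothetical location
    .queryName("repro")
    .format("memory")
    .start()
} catch {
  case e: SparkFileNotFoundException =>
    // Expected: error class RENAME_SRC_PATH_NOT_FOUND, sourcePath under file:/tmp/ckpt
    println(e.getMessage)
}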
@@ -781,3 +785,7 @@ class FakeFileSystemSetPermission extends LocalFileSystem {
 class FakeFileSystemAlwaysExists extends DebugFilesystem {
   override def exists(f: Path): Boolean = true
 }
+
+class FakeFileSystemNeverExists extends DebugFilesystem {
+  override def exists(f: Path): Boolean = false
+}
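FakeFileSystemNeverExists mirrors FakeFileSystemAlwaysExists above; DebugFilesystem is Spark's test wrapper around the local filesystem, so overriding exists() is the only change needed. A quick illustrative sanity check (not part of the change) that registers the fake for the file:// scheme directly through Hadoop:

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val conf = new Configuration()
conf.set("fs.file.impl", classOf[FakeFileSystemNeverExists].getName)
conf.set("fs.file.impl.disable.cache", "true") // don't reuse a cached LocalFileSystem
val fs = FileSystem.get(new URI("file:///"), conf)
assert(!fs.exists(new Path("/tmp"))) // every path "does not exist" through this fake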