diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
index e1c9b82ec2ac..876671c61d81 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
@@ -43,12 +43,20 @@ object FileStreamSink extends Logging {
     path match {
       case Seq(singlePath) =>
         val hdfsPath = new Path(singlePath)
-        val fs = hdfsPath.getFileSystem(hadoopConf)
-        if (fs.isDirectory(hdfsPath)) {
-          val metadataPath = getMetadataLogPath(fs, hdfsPath, sqlConf)
-          fs.exists(metadataPath)
-        } else {
-          false
+        try {
+          val fs = hdfsPath.getFileSystem(hadoopConf)
+          if (fs.isDirectory(hdfsPath)) {
+            val metadataPath = getMetadataLogPath(fs, hdfsPath, sqlConf)
+            fs.exists(metadataPath)
+          } else {
+            false
+          }
+        } catch {
+          case e: SparkException => throw e
+          case NonFatal(e) =>
+            logWarning(s"Assume no metadata directory. Error while looking for " +
+              s"metadata directory in the path: $singlePath.", e)
+            false
         }
       case _ => false
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index 4ccab58d24fe..53ef83206323 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -17,17 +17,18 @@
 
 package org.apache.spark.sql.streaming
 
-import java.io.File
+import java.io.{File, IOException}
 import java.nio.file.Files
 import java.util.Locale
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}
 import org.apache.hadoop.mapreduce.JobContext
 
 import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
 import org.apache.spark.sql.{AnalysisException, DataFrame}
@@ -575,6 +576,30 @@ abstract class FileStreamSinkSuite extends StreamTest {
       }
     }
   }
+
+  test("formatCheck fail should not fail the query") {
+    withSQLConf(
+      "fs.file.impl" -> classOf[FailFormatCheckFileSystem].getName,
+      "fs.file.impl.disable.cache" -> "true") {
+      withTempDir { tempDir =>
+        val path = new File(tempDir, "text").getCanonicalPath
+        Seq("foo").toDF.write.format("text").save(path)
+        spark.read.format("text").load(path)
+      }
+    }
+  }
+
+  test("fail to check glob path should not fail the query") {
+    withSQLConf(
+      "fs.file.impl" -> classOf[FailFormatCheckFileSystem].getName,
+      "fs.file.impl.disable.cache" -> "true") {
+      withTempDir { tempDir =>
+        val path = new File(tempDir, "text").getCanonicalPath
+        Seq("foo").toDF.write.format("text").save(path)
+        spark.read.format("text").load(path + "/*")
+      }
+    }
+  }
 }
 
 object PendingCommitFilesTrackingManifestFileCommitProtocol {
@@ -685,3 +710,19 @@ class FileStreamSinkV2Suite extends FileStreamSinkSuite {
     // TODO: test partition pruning when file source V2 supports it.
   }
 }
+
+/**
+ * A special file system that fails when accessing metadata log directory or using a glob path to
+ * access.
+ */
+class FailFormatCheckFileSystem extends RawLocalFileSystem {
+  override def getFileStatus(f: Path): FileStatus = {
+    if (f.getName == FileStreamSink.metadataDir) {
+      throw new IOException("cannot access metadata log")
+    }
+    if (SparkHadoopUtil.get.isGlobPath(f)) {
+      throw new IOException("fail to access a glob path")
+    }
+    super.getFileStatus(f)
+  }
+}