sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
@@ -43,12 +43,20 @@ object FileStreamSink extends Logging {
     path match {
       case Seq(singlePath) =>
         val hdfsPath = new Path(singlePath)
-        val fs = hdfsPath.getFileSystem(hadoopConf)
-        if (fs.isDirectory(hdfsPath)) {
-          val metadataPath = getMetadataLogPath(fs, hdfsPath, sqlConf)
-          fs.exists(metadataPath)
-        } else {
-          false
+        try {
+          val fs = hdfsPath.getFileSystem(hadoopConf)
+          if (fs.isDirectory(hdfsPath)) {
+            val metadataPath = getMetadataLogPath(fs, hdfsPath, sqlConf)
+            fs.exists(metadataPath)
+          } else {
+            false
+          }
+        } catch {
+          case e: SparkException => throw e

Contributor:

This looks to be different from 2.4:

https://github.com/apache/spark/blob/branch-2.4/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala

Do you have specific exceptions to catch here? I can't imagine any SparkException from above logic.

Member Author:

This is added to fix the UT org.apache.spark.sql.streaming.StreamingQuerySuite "detect escaped path and report the migration guide", which expects a SparkException to be thrown from checkEscapedMetadataPath.

Contributor:

Ah OK I've missed that checkEscapedMetadataPath throws SparkException. My bad.
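
For context, a minimal sketch (not part of this patch) of why the explicit rethrow matters: scala.util.control.NonFatal matches SparkException, so without the dedicated `case e: SparkException => throw e` branch the catch-all below would log and swallow the exception, and hasMetadata would silently return false. The object name is made up for illustration.

```scala
import scala.util.control.NonFatal

import org.apache.spark.SparkException

// Illustration only: NonFatal matches SparkException (it only excludes fatal
// throwables such as VirtualMachineError, InterruptedException and
// ControlThrowable), so a catch-all `case NonFatal(e)` swallows it unless a
// dedicated `case e: SparkException => throw e` branch runs first.
object NonFatalSwallowsSparkException {
  def main(args: Array[String]): Unit = {
    try {
      throw new SparkException("escaped metadata path detected")
    } catch {
      case NonFatal(e) => println(s"swallowed: ${e.getMessage}")
    }
  }
}
```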

+          case NonFatal(e) =>
+            logWarning(s"Assume no metadata directory. Error while looking for " +
+              s"metadata directory in the path: $singlePath.", e)
+            false
         }
       case _ => false
     }

sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -17,17 +17,18 @@

 package org.apache.spark.sql.streaming

-import java.io.File
+import java.io.{File, IOException}
 import java.nio.file.Files
 import java.util.Locale

 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer

-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}
 import org.apache.hadoop.mapreduce.JobContext

 import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
 import org.apache.spark.sql.{AnalysisException, DataFrame}

@@ -575,6 +576,30 @@ abstract class FileStreamSinkSuite extends StreamTest {
       }
     }
   }
+
+  test("formatCheck fail should not fail the query") {
+    withSQLConf(
+      "fs.file.impl" -> classOf[FailFormatCheckFileSystem].getName,

Contributor:

Not needed here, but it's worth knowing that the hadoop-common test jar has a class org.apache.hadoop.fs.FileSystemTestHelper that lets you inject an instance of an FS into the cache. Very useful from time to time. Spark could either use that or add its own (test-module) class to o.a.h.fs and call FileSystem.addFileSystemForTesting() when needed.

Member Author:

Thanks for the guidance, let me check the usage of FileSystemTestHelper.
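
For reference, a rough sketch of the alternative described above, assuming the hadoop-common test jar exposes a static FileSystemTestHelper.addFileSystemForTesting(URI, Configuration, FileSystem); the signature is not verified against the Hadoop version Spark builds with, and the object and method names below are illustrative only. The tests in this patch instead override fs.file.impl and disable the FileSystem cache via withSQLConf.

```scala
import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileSystemTestHelper}

// Illustrative sketch (helper signature assumed): register a failing FileSystem
// instance directly in FileSystem's cache for the file:/// scheme, so that
// FileSystem.get(...) returns it without overriding fs.file.impl or setting
// fs.file.impl.disable.cache.
object InjectFailingFsExample {
  def inject(): FileSystem = {
    val conf = new Configuration()
    val failingFs = new FailFormatCheckFileSystem()
    failingFs.initialize(new URI("file:///"), conf)
    FileSystemTestHelper.addFileSystemForTesting(new URI("file:///"), conf, failingFs)
    failingFs
  }
}
```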

"fs.file.impl.disable.cache" -> "true") {
withTempDir { tempDir =>
val path = new File(tempDir, "text").getCanonicalPath
Seq("foo").toDF.write.format("text").save(path)
spark.read.format("text").load(path)
}
}
}

test("fail to check glob path should not fail the query") {
withSQLConf(
"fs.file.impl" -> classOf[FailFormatCheckFileSystem].getName,
"fs.file.impl.disable.cache" -> "true") {
withTempDir { tempDir =>
val path = new File(tempDir, "text").getCanonicalPath
Seq("foo").toDF.write.format("text").save(path)
spark.read.format("text").load(path + "/*")
}
}
}
}

object PendingCommitFilesTrackingManifestFileCommitProtocol {

@@ -685,3 +710,19 @@ class FileStreamSinkV2Suite extends FileStreamSinkSuite {
     // TODO: test partition pruning when file source V2 supports it.
   }
 }
+
+/**
+ * A special file system that fails when the metadata log directory is accessed or when a glob
+ * path is used.
+ */
+class FailFormatCheckFileSystem extends RawLocalFileSystem {
+  override def getFileStatus(f: Path): FileStatus = {
+    if (f.getName == FileStreamSink.metadataDir) {
+      throw new IOException("cannot access metadata log")
+    }
+    if (SparkHadoopUtil.get.isGlobPath(f)) {
+      throw new IOException("fail to access a glob path")
+    }
+    super.getFileStatus(f)
+  }
+}