Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ class FileStreamSource(
val newDataSource =
DataSource(
sparkSession,
paths = files.map(_.path),
paths = files.map(f => new Path(new URI(f.path)).toString),
userSpecifiedSchema = Some(schema),
partitionColumns = partitionColumns,
className = fileFormatClassName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,11 @@ abstract class FileStreamSourceTest
protected def addData(source: FileStreamSource): Unit
}

case class AddTextFileData(content: String, src: File, tmp: File)
case class AddTextFileData(content: String, src: File, tmp: File, tmpFilePrefix: String = "text")
extends AddFileData {

override def addData(source: FileStreamSource): Unit = {
val tempFile = Utils.tempFileWith(new File(tmp, "text"))
val tempFile = Utils.tempFileWith(new File(tmp, tmpFilePrefix))
val finalFile = new File(src, tempFile.getName)
src.mkdirs()
require(stringToFile(tempFile, content).renameTo(finalFile))
Expand Down Expand Up @@ -408,6 +408,52 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
}
}

// NOTE(review): discussion preserved from the pull-request thread:
//  - Reviewer (Contributor): can we run the same test also for the other input
//    formats, i.e. parquet, orc, ...?
//  - Author (Member): this test should be enough — the issue is in the file
//    stream source, not in any particular file format.
//  - Reviewer (Contributor): yes, for this PR it is, but it would be great to
//    ensure that all the data sources have the same behavior; maybe we can do
//    this in another PR if you think that is better.
//  - Author (Member): no — I meant it's not an issue of the file formats; there
//    is no special code for this in the file stream source. If any tests are
//    needed for such an issue, they should live inside the file-format tests.
test("SPARK-21996 read from text files -- file name has space") {
  // Regression test for SPARK-21996: FileStreamSource must cope with input
  // file names that contain a space. The fourth argument ("text text") is the
  // temp-file prefix, so the generated file name is guaranteed to contain one.
  withTempDirs { case (src, tmp) =>
    val textStream = createFileStream("text", src.getCanonicalPath)
    val filtered = textStream.filter($"value" contains "keep")

    testStream(filtered)(
      AddTextFileData("drop1\nkeep2\nkeep3", src, tmp, "text text"),
      // Only the "keep" lines survive the filter; seeing them proves the
      // space-named file was discovered and read correctly.
      CheckAnswer("keep2", "keep3")
    )
  }
}

test("SPARK-21996 read from text files generated by file sink -- file name has space") {
  // Regression test for SPARK-21996: a file stream SOURCE must be able to read
  // output produced by the file SINK when the directory path contains a space.
  val testTableName = "FileStreamSourceTest"
  withTable(testTableName) {
    withTempDirs { case (src, checkpoint) =>
      // Output directory whose name contains a space ("text text") — the
      // condition that previously broke the source's path handling.
      val output = new File(src, "text text")
      val inputData = MemoryStream[String]
      val ds = inputData.toDS()

      // First query: write one record through the file sink into `output`.
      val query = ds.writeStream
        .option("checkpointLocation", checkpoint.getCanonicalPath)
        .format("text")
        .start(output.getCanonicalPath)

      try {
        inputData.addData("foo")
        // Block until the sink has committed the batch (bounded by the
        // suite's streaming timeout), so the file exists before we read it.
        failAfter(streamingTimeout) {
          query.processAllAvailable()
        }
      } finally {
        // Always stop the query, even if the wait above fails.
        query.stop()
      }

      // Second query: stream the sink's output back in and collect it into an
      // in-memory table so the contents can be checked.
      val df2 = spark.readStream.format("text").load(output.getCanonicalPath)
      val query2 = df2.writeStream.format("memory").queryName(testTableName).start()
      try {
        query2.processAllAvailable()
        // The round-tripped record must come back unchanged.
        checkDatasetUnorderly(spark.table(testTableName).as[String], "foo")
      } finally {
        query2.stop()
      }
    }
  }
}

test("read from textfile") {
withTempDirs { case (src, tmp) =>
val textStream = spark.readStream.textFile(src.getCanonicalPath)
Expand Down