Skip to content
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ class FileStreamSource(
val newDataSource =
DataSource(
sparkSession,
paths = files.map(_.path),
paths = files.map(f => new Path(new URI(f.path)).toString),
userSpecifiedSchema = Some(schema),
partitionColumns = partitionColumns,
className = fileFormatClassName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,18 @@ abstract class FileStreamSourceTest
}
}

case class AddTextFileDataWithSpaceInFileName(content: String, src: File, tmp: File)
Copy link
Member

@zsxwing zsxwing Jan 16, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest that adding a new parameter to AddTextFileData rather than introducing a new class, such as

case class AddTextFileData(content: String, src: File, tmp: File, tempFilePrefix: String = "text")

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure will update and add the test for file sink. Thanks for the review.

extends AddFileData {

override def addData(source: FileStreamSource): Unit = {
val tempFile = Utils.tempFileWith(new File(tmp, "text text"))
val finalFile = new File(src, tempFile.getName)
src.mkdirs()
require(stringToFile(tempFile, content).renameTo(finalFile))
logInfo(s"Written text '$content' to file $finalFile")
}
}

case class AddOrcFileData(data: DataFrame, src: File, tmp: File) extends AddFileData {
override def addData(source: FileStreamSource): Unit = {
AddOrcFileData.writeToFile(data, src, tmp)
Expand Down Expand Up @@ -408,6 +420,18 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
}
}

test("SPARK-21996 read from text files -- file name has space") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we run the same test also for the other input format, ie. parquet, orc, ... ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this test should be enough. The issue is in file stream source.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, for this PR it is, but it would be great if we can ensure that all the data sources have the same behavior... Maybe we can do this is another PR if you think it is better

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not. I meant it's not an issue of file formats. There are not some special codes in file stream source. If there should be any tests for such issue, they should be inside file format tests.

withTempDirs { case (src, tmp) =>
val textStream = createFileStream("text", src.getCanonicalPath)
val filtered = textStream.filter($"value" contains "keep")

testStream(filtered)(
AddTextFileDataWithSpaceInFileName("drop1\nkeep2\nkeep3", src, tmp),
CheckAnswer("keep2", "keep3")
)
}
}

test("read from textfile") {
withTempDirs { case (src, tmp) =>
val textStream = spark.readStream.textFile(src.getCanonicalPath)
Expand Down