@@ -259,7 +259,7 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext

test("output metrics on records written") {
val file = new File(tmpDir, getClass.getSimpleName)
val filePath = "file://" + file.getAbsolutePath
val filePath = file.toURI.toURL.toString

val records = runAndReturnRecordsWritten {
sc.parallelize(1 to numRecords).saveAsTextFile(filePath)
@@ -269,7 +269,7 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext

test("output metrics on records written - new Hadoop API") {
val file = new File(tmpDir, getClass.getSimpleName)
val filePath = "file://" + file.getAbsolutePath
val filePath = file.toURI.toURL.toString

val records = runAndReturnRecordsWritten {
sc.parallelize(1 to numRecords).map(key => (key.toString, key.toString))
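The two hunks above replace manual "file://" + getAbsolutePath concatenation with java.io.File.toURI. A minimal standalone sketch, not part of the PR and using a hypothetical Windows path, of why the concatenation yields an ill-formed URI on Windows while the toURI form stays portable:

import java.io.File

object FileUriSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical Windows path, for illustration only.
    val file = new File("""C:\Users\jenkins\tmp\metrics""")

    // On Windows, getAbsolutePath keeps the drive letter and backslashes, so the
    // concatenation produces something like file://C:\Users\jenkins\tmp\metrics,
    // which Hadoop's Path parsing rejects.
    val concatenated = "file://" + file.getAbsolutePath

    // File.toURI normalizes separators and escaping, yielding
    // file:/C:/Users/jenkins/tmp/metrics on Windows and a valid URI everywhere.
    val fromUri = file.toURI.toURL.toString

    println(concatenated)
    println(fromUri)
  }
}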
@@ -24,6 +24,7 @@ import org.scalatest.BeforeAndAfter

import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.test.SharedSQLContext
+ import org.apache.spark.util.Utils

class KafkaRelationSuite extends QueryTest with BeforeAndAfter with SharedSQLContext {

@@ -147,6 +148,9 @@ class KafkaRelationSuite extends QueryTest with BeforeAndAfter with SharedSQLContext {
}

test("test late binding start offsets") {
+ // Kafka fails to remove the logs on Windows. See KAFKA-1194.
+ assume(!Utils.isWindows)
+
Review comment (Member, Author): It seems we can't just ignore the deletion failure because it causes the test failure.

var kafkaUtils: KafkaTestUtils = null
try {
/**
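As the review comment above explains, the Kafka log deletion failure cannot simply be swallowed, so the test is skipped on Windows with assume. A minimal sketch, using plain ScalaTest and a hypothetical isWindows check in place of Spark's Utils.isWindows, of how assume cancels a test instead of failing it:

import org.scalatest.funsuite.AnyFunSuite

class WindowsSkipSketch extends AnyFunSuite {
  // Hypothetical stand-in for org.apache.spark.util.Utils.isWindows.
  private val isWindows: Boolean =
    System.getProperty("os.name").toLowerCase.contains("windows")

  test("kafka log cleanup") {
    // assume() throws TestCanceledException when the condition is false, so on
    // Windows this test is reported as canceled rather than failed.
    assume(!isWindows)
    // ... the Kafka-dependent body would run only on non-Windows hosts
  }
}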
@@ -39,6 +39,7 @@ import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.functions.{count, window}
import org.apache.spark.sql.streaming.{ProcessingTime, StreamTest}
import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession}
+ import org.apache.spark.util.Utils

abstract class KafkaSourceTest extends StreamTest with SharedSQLContext {

@@ -161,11 +162,12 @@ class KafkaSourceSuite extends KafkaSourceTest {
// Make sure Spark 2.1.0 will throw an exception when reading the new log
intercept[java.lang.IllegalArgumentException] {
// Simulate how Spark 2.1.0 reads the log
- val in = new FileInputStream(metadataPath.getAbsolutePath + "/0")
- val length = in.read()
- val bytes = new Array[Byte](length)
- in.read(bytes)
- KafkaSourceOffset(SerializedOffset(new String(bytes, UTF_8)))
+ Utils.tryWithResource(new FileInputStream(metadataPath.getAbsolutePath + "/0")) { in =>
+   val length = in.read()
+   val bytes = new Array[Byte](length)
+   in.read(bytes)
+   KafkaSourceOffset(SerializedOffset(new String(bytes, UTF_8)))
+ }
}
}
}
@@ -181,13 +183,13 @@ class KafkaSourceSuite extends KafkaSourceTest {
"subscribe" -> topic
)

- val from = Paths.get(
-   getClass.getResource("/kafka-source-initial-offset-version-2.1.0.bin").getPath)
+ val from = new File(
+   getClass.getResource("/kafka-source-initial-offset-version-2.1.0.bin").toURI).toPath
val to = Paths.get(s"${metadataPath.getAbsolutePath}/0")
Files.copy(from, to)

- val source = provider.createSource(spark.sqlContext, metadataPath.getAbsolutePath, None,
-   "", parameters)
+ val source = provider.createSource(
+   spark.sqlContext, metadataPath.toURI.toString, None, "", parameters)
val deserializedOffset = source.getOffset.get
val referenceOffset = KafkaSourceOffset((topic, 0, 0L), (topic, 1, 0L), (topic, 2, 0L))
assert(referenceOffset == deserializedOffset)
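The hunk above that simulates Spark 2.1.0's read wraps the FileInputStream in Spark's Utils.tryWithResource, so the handle is closed even when the intercepted exception fires; a handle left open would keep the offset file locked on Windows. A minimal sketch of the same loan pattern, a simplified stand-in rather than Spark's actual implementation:

import java.io.{Closeable, File, FileInputStream, FileOutputStream}

object TryWithResourceSketch {
  // Simplified loan pattern: close the resource whether or not `f` throws.
  def tryWithResource[R <: Closeable, T](createResource: => R)(f: R => T): T = {
    val resource = createResource
    try f(resource) finally resource.close()
  }

  def main(args: Array[String]): Unit = {
    val tmp = File.createTempFile("offset-log", ".bin")
    tryWithResource(new FileOutputStream(tmp))(_.write(Array[Byte](42)))

    // The input stream is closed in the finally block, so no handle is left
    // open and the later delete succeeds, which matters on Windows.
    val firstByte = tryWithResource(new FileInputStream(tmp))(_.read())
    println(firstByte)    // 42
    println(tmp.delete()) // true: nothing is holding the file open
  }
}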
@@ -1520,7 +1520,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
val e = intercept[AnalysisException] { sql("CREATE TABLE tab1 USING json") }.getMessage
assert(e.contains("Unable to infer schema for JSON. It must be specified manually"))

sql(s"CREATE TABLE tab2 using json location '${tempDir.getCanonicalPath}'")
sql(s"CREATE TABLE tab2 using json location '${tempDir.toURI}'")
checkAnswer(spark.table("tab2"), Row("a", "b"))
}
}
@@ -1814,7 +1814,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
val defaultTablePath = spark.sessionState.catalog
.getTableMetadata(TableIdentifier("tbl")).storage.locationUri.get

sql(s"ALTER TABLE tbl SET LOCATION '${dir.getCanonicalPath}'")
sql(s"ALTER TABLE tbl SET LOCATION '${dir.toURI}'")
spark.catalog.refreshTable("tbl")
// SET LOCATION won't move data from previous table path to new table path.
assert(spark.table("tbl").count() == 0)
@@ -1836,15 +1836,15 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
test("insert data to a data source table which has a not existed location should succeed") {
withTable("t") {
withTempDir { dir =>
+ val path = dir.toURI.toString.stripSuffix("/")
spark.sql(
s"""
|CREATE TABLE t(a string, b int)
|USING parquet
|OPTIONS(path "$dir")
|OPTIONS(path "$path")
""".stripMargin)
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
- val expectedPath = dir.getAbsolutePath.stripSuffix("/")
- assert(table.location.stripSuffix("/") == expectedPath)
+ assert(table.location == path)

dir.delete
val tableLocFile = new File(table.location.stripPrefix("file:"))
@@ -1859,8 +1859,8 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
assert(tableLocFile.exists)
checkAnswer(spark.table("t"), Row("c", 1) :: Nil)

- val newDir = dir.getAbsolutePath.stripSuffix("/") + "/x"
- val newDirFile = new File(newDir)
+ val newDirFile = new File(dir, "x")
+ val newDir = newDirFile.toURI.toString
spark.sql(s"ALTER TABLE t SET LOCATION '$newDir'")
spark.sessionState.catalog.refreshTable(TableIdentifier("t"))

@@ -1878,16 +1878,16 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
test("insert into a data source table with no existed partition location should succeed") {
withTable("t") {
withTempDir { dir =>
+ val path = dir.toURI.toString.stripSuffix("/")
spark.sql(
s"""
|CREATE TABLE t(a int, b int, c int, d int)
|USING parquet
|PARTITIONED BY(a, b)
|LOCATION "$dir"
|LOCATION "$path"
""".stripMargin)
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
- val expectedPath = dir.getAbsolutePath.stripSuffix("/")
- assert(table.location.stripSuffix("/") == expectedPath)
+ assert(table.location == path)

spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 3, 4")
checkAnswer(spark.table("t"), Row(3, 4, 1, 2) :: Nil)
@@ -1906,25 +1906,26 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
test("read data from a data source table which has a not existed location should succeed") {
withTable("t") {
withTempDir { dir =>
+ val path = dir.toURI.toString.stripSuffix("/")
spark.sql(
s"""
|CREATE TABLE t(a string, b int)
|USING parquet
|OPTIONS(path "$dir")
|OPTIONS(path "$path")
""".stripMargin)
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
- val expectedPath = dir.getAbsolutePath.stripSuffix("/")
- assert(table.location.stripSuffix("/") == expectedPath)
+ assert(table.location == path)

dir.delete()
checkAnswer(spark.table("t"), Nil)

- val newDir = dir.getAbsolutePath.stripSuffix("/") + "/x"
+ val newDirFile = new File(dir, "x")
+ val newDir = newDirFile.toURI.toString
spark.sql(s"ALTER TABLE t SET LOCATION '$newDir'")

val table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
assert(table1.location == newDir)
- assert(!new File(newDir).exists())
+ assert(!newDirFile.exists())
checkAnswer(spark.table("t"), Nil)
}
}
@@ -1938,7 +1939,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
|CREATE TABLE t(a int, b int, c int, d int)
|USING parquet
|PARTITIONED BY(a, b)
|LOCATION "$dir"
|LOCATION "${dir.toURI}"
""".stripMargin)
spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 3, 4")
checkAnswer(spark.table("t"), Row(3, 4, 1, 2) :: Nil)
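The DDLSuite hunks above all follow one pattern: the path embedded in SQL (OPTIONS(path ...), LOCATION ...) and the value compared with table.location now come from File.toURI rather than getAbsolutePath or getCanonicalPath, so the tests build and compare URI strings on every platform. A minimal sketch of just that conversion, with a hypothetical table name and a temporary directory:

import java.io.File
import java.nio.file.Files

object LocationUriSketch {
  def main(args: Array[String]): Unit = {
    val dir: File = Files.createTempDirectory("ddl-suite").toFile

    // Platform specific: backslashes and a drive letter on Windows.
    val rawPath = dir.getAbsolutePath
    // Always a file: URI with forward slashes, the string the tests compare
    // against table.location.
    val path = dir.toURI.toString.stripSuffix("/")

    // The DDL the tests issue embeds the URI form, e.g.:
    val createTable =
      s"""CREATE TABLE t(a STRING, b INT)
         |USING parquet
         |OPTIONS(path "$path")""".stripMargin

    println(rawPath)
    println(path)
    println(createTable)
  }
}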
@@ -1570,7 +1570,7 @@ class HiveDDLSuite
val dataPath = new File(new File(path, "d=1"), "b=1").getCanonicalPath
Seq(1 -> 1).toDF("a", "c").write.save(dataPath)

sql(s"CREATE TABLE t3 USING parquet LOCATION '${path.getCanonicalPath}'")
sql(s"CREATE TABLE t3 USING parquet LOCATION '${path.toURI}'")
assert(getTableColumns("t3") == Seq("a", "c", "d", "b"))
}
