
Commit 0dd78c1

Fix Windows test failures

1 parent: 776b8f1

File tree: 5 files changed, 35 additions & 28 deletions


core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala

Lines changed: 2 additions & 2 deletions
@@ -259,7 +259,7 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext

   test("output metrics on records written") {
     val file = new File(tmpDir, getClass.getSimpleName)
-    val filePath = "file://" + file.getAbsolutePath
+    val filePath = file.toURI.toURL.toString

     val records = runAndReturnRecordsWritten {
       sc.parallelize(1 to numRecords).saveAsTextFile(filePath)
@@ -269,7 +269,7 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext

   test("output metrics on records written - new Hadoop API") {
     val file = new File(tmpDir, getClass.getSimpleName)
-    val filePath = "file://" + file.getAbsolutePath
+    val filePath = file.toURI.toURL.toString

     val records = runAndReturnRecordsWritten {
       sc.parallelize(1 to numRecords).map(key => (key.toString, key.toString))
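
For context on the change above: getAbsolutePath returns a platform-native path, so on Windows the old concatenation produced something like "file://C:\Users\me\tmp", which is not a well-formed URI, while File.toURI normalizes the separators and drive letter. A minimal sketch of the difference (the Windows path is hypothetical and the values in the comments assume a Windows JVM):

    import java.io.File

    object FileUriSketch {
      def main(args: Array[String]): Unit = {
        val file = new File("C:\\Users\\me\\tmp")  // hypothetical Windows path
        // Naive concatenation keeps backslashes verbatim:
        //   file://C:\Users\me\tmp   (not a valid URI)
        println("file://" + file.getAbsolutePath)
        // File.toURI encodes the path into a well-formed URI:
        //   file:/C:/Users/me/tmp
        println(file.toURI.toURL.toString)
      }
    }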

external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala

Lines changed: 4 additions & 0 deletions
@@ -24,6 +24,7 @@ import org.scalatest.BeforeAndAfter

 import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.util.Utils

 class KafkaRelationSuite extends QueryTest with BeforeAndAfter with SharedSQLContext {

@@ -147,6 +148,9 @@ class KafkaRelationSuite extends QueryTest with BeforeAndAfter with SharedSQLCon
   }

   test("test late binding start offsets") {
+    // Kafka fails to remove the logs on Windows. See KAFKA-1194.
+    assume(!Utils.isWindows)
+
     var kafkaUtils: KafkaTestUtils = null
     try {
       /**
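
The guard added above uses ScalaTest's assume, which throws TestCanceledException when its predicate is false, so on Windows the test is reported as canceled rather than failed. A self-contained sketch of the same pattern (the isWindows check mirrors what Spark's Utils.isWindows determines from the os.name system property; the suite and test names here are illustrative):

    import org.scalatest.funsuite.AnyFunSuite

    class PlatformGuardSketch extends AnyFunSuite {
      // Approximation of Utils.isWindows: inspect the os.name property.
      private def isWindows: Boolean =
        System.getProperty("os.name", "").toLowerCase.startsWith("windows")

      test("behavior Kafka cannot clean up on Windows") {
        assume(!isWindows)  // canceled on Windows, runs everywhere else
        assert(1 + 1 == 2)
      }
    }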

external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala

Lines changed: 11 additions & 9 deletions
@@ -39,6 +39,7 @@ import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.functions.{count, window}
 import org.apache.spark.sql.streaming.{ProcessingTime, StreamTest}
 import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession}
+import org.apache.spark.util.Utils

 abstract class KafkaSourceTest extends StreamTest with SharedSQLContext {

@@ -161,11 +162,12 @@ class KafkaSourceSuite extends KafkaSourceTest {
       // Make sure Spark 2.1.0 will throw an exception when reading the new log
       intercept[java.lang.IllegalArgumentException] {
         // Simulate how Spark 2.1.0 reads the log
-        val in = new FileInputStream(metadataPath.getAbsolutePath + "/0")
-        val length = in.read()
-        val bytes = new Array[Byte](length)
-        in.read(bytes)
-        KafkaSourceOffset(SerializedOffset(new String(bytes, UTF_8)))
+        Utils.tryWithResource(new FileInputStream(metadataPath.getAbsolutePath + "/0")) { in =>
+          val length = in.read()
+          val bytes = new Array[Byte](length)
+          in.read(bytes)
+          KafkaSourceOffset(SerializedOffset(new String(bytes, UTF_8)))
+        }
       }
     }
   }
@@ -181,13 +183,13 @@ class KafkaSourceSuite extends KafkaSourceTest {
       "subscribe" -> topic
     )

-    val from = Paths.get(
-      getClass.getResource("/kafka-source-initial-offset-version-2.1.0.bin").getPath)
+    val from = new File(
+      getClass.getResource("/kafka-source-initial-offset-version-2.1.0.bin").toURI).toPath
     val to = Paths.get(s"${metadataPath.getAbsolutePath}/0")
     Files.copy(from, to)

-    val source = provider.createSource(spark.sqlContext, metadataPath.getAbsolutePath, None,
-      "", parameters)
+    val source = provider.createSource(
+      spark.sqlContext, metadataPath.toURI.toString, None, "", parameters)
     val deserializedOffset = source.getOffset.get
     val referenceOffset = KafkaSourceOffset((topic, 0, 0L), (topic, 1, 0L), (topic, 2, 0L))
     assert(referenceOffset == deserializedOffset)
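
Two of the changes above matter specifically on Windows: the stream is now closed via Utils.tryWithResource (an open FileInputStream blocks deletion of the temp directory on Windows), and the test resource is resolved through getResource(...).toURI instead of getPath (getPath returns a percent-encoded string such as /C:/..., which Paths.get misreads). The loan pattern behind tryWithResource is roughly this sketch (Spark's actual helper lives in org.apache.spark.util.Utils; Scala 2.13's scala.util.Using offers the same idea in the standard library):

    import java.io.Closeable

    object Loan {
      // Acquire the resource, run the body, and close the resource in a
      // finally block even if the body throws.
      def tryWithResource[R <: Closeable, T](createResource: => R)(f: R => T): T = {
        val resource = createResource
        try f(resource) finally resource.close()
      }
    }

    // Usage, matching the test above:
    //   Loan.tryWithResource(new FileInputStream(path)) { in => in.read() }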

sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala

Lines changed: 17 additions & 16 deletions
@@ -1520,7 +1520,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       val e = intercept[AnalysisException] { sql("CREATE TABLE tab1 USING json") }.getMessage
       assert(e.contains("Unable to infer schema for JSON. It must be specified manually"))

-      sql(s"CREATE TABLE tab2 using json location '${tempDir.getCanonicalPath}'")
+      sql(s"CREATE TABLE tab2 using json location '${tempDir.toURI}'")
       checkAnswer(spark.table("tab2"), Row("a", "b"))
     }
   }
@@ -1814,7 +1814,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       val defaultTablePath = spark.sessionState.catalog
         .getTableMetadata(TableIdentifier("tbl")).storage.locationUri.get

-      sql(s"ALTER TABLE tbl SET LOCATION '${dir.getCanonicalPath}'")
+      sql(s"ALTER TABLE tbl SET LOCATION '${dir.toURI}'")
       spark.catalog.refreshTable("tbl")
       // SET LOCATION won't move data from previous table path to new table path.
       assert(spark.table("tbl").count() == 0)
@@ -1836,15 +1836,15 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
   test("insert data to a data source table which has a not existed location should succeed") {
     withTable("t") {
       withTempDir { dir =>
+        val path = dir.toURI.toString.stripSuffix("/")
         spark.sql(
           s"""
              |CREATE TABLE t(a string, b int)
              |USING parquet
-             |OPTIONS(path "$dir")
+             |OPTIONS(path "$path")
            """.stripMargin)
         val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
-        val expectedPath = dir.getAbsolutePath.stripSuffix("/")
-        assert(table.location.stripSuffix("/") == expectedPath)
+        assert(table.location == path)

         dir.delete
         val tableLocFile = new File(table.location.stripPrefix("file:"))
@@ -1859,8 +1859,8 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
         assert(tableLocFile.exists)
         checkAnswer(spark.table("t"), Row("c", 1) :: Nil)

-        val newDir = dir.getAbsolutePath.stripSuffix("/") + "/x"
-        val newDirFile = new File(newDir)
+        val newDirFile = new File(dir, "x")
+        val newDir = newDirFile.toURI.toString
         spark.sql(s"ALTER TABLE t SET LOCATION '$newDir'")
         spark.sessionState.catalog.refreshTable(TableIdentifier("t"))

@@ -1878,16 +1878,16 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
   test("insert into a data source table with no existed partition location should succeed") {
     withTable("t") {
       withTempDir { dir =>
+        val path = dir.toURI.toString.stripSuffix("/")
         spark.sql(
           s"""
              |CREATE TABLE t(a int, b int, c int, d int)
              |USING parquet
              |PARTITIONED BY(a, b)
-             |LOCATION "$dir"
+             |LOCATION "$path"
            """.stripMargin)
         val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
-        val expectedPath = dir.getAbsolutePath.stripSuffix("/")
-        assert(table.location.stripSuffix("/") == expectedPath)
+        assert(table.location == path)

         spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 3, 4")
         checkAnswer(spark.table("t"), Row(3, 4, 1, 2) :: Nil)
@@ -1906,25 +1906,26 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
   test("read data from a data source table which has a not existed location should succeed") {
     withTable("t") {
       withTempDir { dir =>
+        val path = dir.toURI.toString.stripSuffix("/")
         spark.sql(
           s"""
              |CREATE TABLE t(a string, b int)
              |USING parquet
-             |OPTIONS(path "$dir")
+             |OPTIONS(path "$path")
            """.stripMargin)
         val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
-        val expectedPath = dir.getAbsolutePath.stripSuffix("/")
-        assert(table.location.stripSuffix("/") == expectedPath)
+        assert(table.location == path)

         dir.delete()
         checkAnswer(spark.table("t"), Nil)

-        val newDir = dir.getAbsolutePath.stripSuffix("/") + "/x"
+        val newDirFile = new File(dir, "x")
+        val newDir = newDirFile.toURI.toString
         spark.sql(s"ALTER TABLE t SET LOCATION '$newDir'")

         val table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
         assert(table1.location == newDir)
-        assert(!new File(newDir).exists())
+        assert(!newDirFile.exists())
         checkAnswer(spark.table("t"), Nil)
       }
     }
@@ -1938,7 +1939,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
              |CREATE TABLE t(a int, b int, c int, d int)
              |USING parquet
              |PARTITIONED BY(a, b)
-             |LOCATION "$dir"
+             |LOCATION "${dir.toURI}"
            """.stripMargin)
         spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 3, 4")
         checkAnswer(spark.table("t"), Row(3, 4, 1, 2) :: Nil)

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -1570,7 +1570,7 @@ class HiveDDLSuite
         val dataPath = new File(new File(path, "d=1"), "b=1").getCanonicalPath
         Seq(1 -> 1).toDF("a", "c").write.save(dataPath)

-        sql(s"CREATE TABLE t3 USING parquet LOCATION '${path.getCanonicalPath}'")
+        sql(s"CREATE TABLE t3 USING parquet LOCATION '${path.toURI}'")
         assert(getTableColumns("t3") == Seq("a", "c", "d", "b"))
       }
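
Same idea as in DDLSuite: on Windows, getCanonicalPath yields a backslash path such as C:\Temp\t3, which Hadoop's Path parsing and the quoted SQL literal can both mis-handle, whereas toURI yields a forward-slash file: URI that round-trips cleanly. A hedged sketch of the two interpolations (the Windows path and the values in the comments are illustrative):

    import java.io.File

    object LocationSketch {
      val path = new File("C:\\Temp\\t3")  // hypothetical table location
      // getCanonicalPath -> LOCATION 'C:\Temp\t3' (backslashes, drive colon)
      val fragile = s"CREATE TABLE t3 USING parquet LOCATION '${path.getCanonicalPath}'"
      // toURI -> LOCATION 'file:/C:/Temp/t3' (no backslashes, explicit scheme)
      val robust = s"CREATE TABLE t3 USING parquet LOCATION '${path.toURI}'"
    }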
