Skip to content

Commit 8d16724

Browse files
windpiger authored and cmonkey committed
[SPARK-19329][SQL] Reading from or writing to a datasource table with a non pre-existing location should succeed
## What changes were proposed in this pull request? When we insert data into a datasource table using `sqlText`, and the table has a non-existing location, this will throw an Exception. example: ``` spark.sql("create table t(a string, b int) using parquet") spark.sql("alter table t set location '/xx'") spark.sql("insert into table t select 'c', 1") ``` Exception: ``` com.google.common.util.concurrent.UncheckedExecutionException: org.apache.spark.sql.AnalysisException: Path does not exist: /xx; at com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4814) at com.google.common.cache.LocalCache$LocalLoadingCache.apply(LocalCache.java:4830) at org.apache.spark.sql.hive.HiveMetastoreCatalog.lookupRelation(HiveMetastoreCatalog.scala:122) at org.apache.spark.sql.hive.HiveSessionCatalog.lookupRelation(HiveSessionCatalog.scala:69) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveRelations$$lookupTableFromCatalog(Analyzer.scala:456) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$8.applyOrElse(Analyzer.scala:465) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$8.applyOrElse(Analyzer.scala:463) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:61) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:61) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:60) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:463) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:453) ``` As discussed in the following comments, we should unify the behavior when reading from or writing to a datasource table with a
non pre-existing location: 1. reading from a datasource table: return 0 rows 2. writing to a datasource table: write data successfully ## How was this patch tested? unit test added Author: windpiger <[email protected]> Closes apache#16672 from windpiger/insertNotExistLocation.
1 parent b9d963c commit 8d16724

File tree

2 files changed

+121
-1
lines changed

2 files changed

+121
-1
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,8 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
233233
// TODO: improve `InMemoryCatalog` and remove this limitation.
234234
catalogTable = if (withHiveSupport) Some(table) else None)
235235

236-
LogicalRelation(dataSource.resolveRelation(), catalogTable = Some(table))
236+
LogicalRelation(dataSource.resolveRelation(checkFilesExist = false),
237+
catalogTable = Some(table))
237238
}
238239
})
239240
}

sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1832,4 +1832,123 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
18321832
}
18331833
}
18341834
}
1835+
1836+
test("insert data to a data source table which has a not existed location should succeed") {
1837+
withTable("t") {
1838+
withTempDir { dir =>
1839+
spark.sql(
1840+
s"""
1841+
|CREATE TABLE t(a string, b int)
1842+
|USING parquet
1843+
|OPTIONS(path "$dir")
1844+
""".stripMargin)
1845+
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
1846+
val expectedPath = dir.getAbsolutePath.stripSuffix("/")
1847+
assert(table.location.stripSuffix("/") == expectedPath)
1848+
1849+
dir.delete
1850+
val tableLocFile = new File(table.location.stripPrefix("file:"))
1851+
assert(!tableLocFile.exists)
1852+
spark.sql("INSERT INTO TABLE t SELECT 'c', 1")
1853+
assert(tableLocFile.exists)
1854+
checkAnswer(spark.table("t"), Row("c", 1) :: Nil)
1855+
1856+
Utils.deleteRecursively(dir)
1857+
assert(!tableLocFile.exists)
1858+
spark.sql("INSERT OVERWRITE TABLE t SELECT 'c', 1")
1859+
assert(tableLocFile.exists)
1860+
checkAnswer(spark.table("t"), Row("c", 1) :: Nil)
1861+
1862+
val newDir = dir.getAbsolutePath.stripSuffix("/") + "/x"
1863+
val newDirFile = new File(newDir)
1864+
spark.sql(s"ALTER TABLE t SET LOCATION '$newDir'")
1865+
spark.sessionState.catalog.refreshTable(TableIdentifier("t"))
1866+
1867+
val table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
1868+
assert(table1.location == newDir)
1869+
assert(!newDirFile.exists)
1870+
1871+
spark.sql("INSERT INTO TABLE t SELECT 'c', 1")
1872+
assert(newDirFile.exists)
1873+
checkAnswer(spark.table("t"), Row("c", 1) :: Nil)
1874+
}
1875+
}
1876+
}
1877+
1878+
test("insert into a data source table with no existed partition location should succeed") {
1879+
withTable("t") {
1880+
withTempDir { dir =>
1881+
spark.sql(
1882+
s"""
1883+
|CREATE TABLE t(a int, b int, c int, d int)
1884+
|USING parquet
1885+
|PARTITIONED BY(a, b)
1886+
|LOCATION "$dir"
1887+
""".stripMargin)
1888+
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
1889+
val expectedPath = dir.getAbsolutePath.stripSuffix("/")
1890+
assert(table.location.stripSuffix("/") == expectedPath)
1891+
1892+
spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 3, 4")
1893+
checkAnswer(spark.table("t"), Row(3, 4, 1, 2) :: Nil)
1894+
1895+
val partLoc = new File(s"${dir.getAbsolutePath}/a=1")
1896+
Utils.deleteRecursively(partLoc)
1897+
assert(!partLoc.exists())
1898+
// insert overwrite into a partition which location has been deleted.
1899+
spark.sql("INSERT OVERWRITE TABLE t PARTITION(a=1, b=2) SELECT 7, 8")
1900+
assert(partLoc.exists())
1901+
checkAnswer(spark.table("t"), Row(7, 8, 1, 2) :: Nil)
1902+
}
1903+
}
1904+
}
1905+
1906+
test("read data from a data source table which has a not existed location should succeed") {
1907+
withTable("t") {
1908+
withTempDir { dir =>
1909+
spark.sql(
1910+
s"""
1911+
|CREATE TABLE t(a string, b int)
1912+
|USING parquet
1913+
|OPTIONS(path "$dir")
1914+
""".stripMargin)
1915+
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
1916+
val expectedPath = dir.getAbsolutePath.stripSuffix("/")
1917+
assert(table.location.stripSuffix("/") == expectedPath)
1918+
1919+
dir.delete()
1920+
checkAnswer(spark.table("t"), Nil)
1921+
1922+
val newDir = dir.getAbsolutePath.stripSuffix("/") + "/x"
1923+
spark.sql(s"ALTER TABLE t SET LOCATION '$newDir'")
1924+
1925+
val table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
1926+
assert(table1.location == newDir)
1927+
assert(!new File(newDir).exists())
1928+
checkAnswer(spark.table("t"), Nil)
1929+
}
1930+
}
1931+
}
1932+
1933+
test("read data from a data source table with no existed partition location should succeed") {
1934+
withTable("t") {
1935+
withTempDir { dir =>
1936+
spark.sql(
1937+
s"""
1938+
|CREATE TABLE t(a int, b int, c int, d int)
1939+
|USING parquet
1940+
|PARTITIONED BY(a, b)
1941+
|LOCATION "$dir"
1942+
""".stripMargin)
1943+
spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 3, 4")
1944+
checkAnswer(spark.table("t"), Row(3, 4, 1, 2) :: Nil)
1945+
1946+
// select from a partition which location has been deleted.
1947+
Utils.deleteRecursively(dir)
1948+
assert(!dir.exists())
1949+
spark.sql("REFRESH TABLE t")
1950+
checkAnswer(spark.sql("select * from t where a=1 and b=2"), Nil)
1951+
}
1952+
}
1953+
}
18351954
}

0 commit comments

Comments
 (0)