@@ -240,7 +240,8 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
         // TODO: improve `InMemoryCatalog` and remove this limitation.
         catalogTable = if (withHiveSupport) Some(table) else None)

-      LogicalRelation(dataSource.resolveRelation(), catalogTable = Some(table))
+      LogicalRelation(dataSource.resolveRelation(checkFilesExist = false),
+        catalogTable = Some(table))
     }
   })
 }
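
For context, a minimal sketch of what a flag like `checkFilesExist` typically gates (an illustration only, not Spark's actual `DataSource.resolveRelation` implementation): when the check is skipped, a missing table location no longer aborts relation resolution, which is the behavior the tests below depend on.

    import java.io.File

    // Illustrative sketch only -- not Spark's real implementation.
    // When checkFilesExist is false, a missing location is tolerated
    // instead of failing resolution with an error.
    def resolvePaths(paths: Seq[String], checkFilesExist: Boolean): Seq[File] = {
      val files = paths.map(new File(_))
      if (checkFilesExist) {
        files.foreach { f =>
          require(f.exists(), s"Path does not exist: ${f.getPath}")
        }
      }
      files
    }

    resolvePaths(Seq("/tmp/no/such/dir"), checkFilesExist = false)  // succeeds
    // resolvePaths(Seq("/tmp/no/such/dir"), checkFilesExist = true) would throw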
@@ -1816,4 +1816,127 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
      }
    }
  }

test("insert data to a data source table which has a not existed location should succeed") {
withTable("t") {
withTempDir { dir =>
spark.sql(
s"""
|CREATE TABLE t(a string, b int)
|USING parquet
|OPTIONS(path "file:${dir.getCanonicalPath}")

gatorsmile (Member) commented on Feb 13, 2017:

  • First, you do not need to add the `file:` prefix.
  • Second, you still need to adjust the indent:

        spark.sql(
          s"""
            |CREATE TABLE t(a int, b int, c int, d int)
            |USING parquet
            |PARTITIONED BY(a, b)
            |LOCATION '$dir'
           """.stripMargin)
        val expectedPath = dir.getAbsolutePath.stripSuffix("/")

""".stripMargin)
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
val expectedPath = s"file:${dir.getAbsolutePath.stripSuffix("/")}"
assert(table.location.stripSuffix("/") == expectedPath)

dir.delete
val tableLocFile = new File(table.location.stripPrefix("file:"))
assert(!tableLocFile.exists)
spark.sql("INSERT INTO TABLE t SELECT 'c', 1")
assert(tableLocFile.exists)
checkAnswer(spark.table("t"), Row("c", 1) :: Nil)

Utils.deleteRecursively(dir)
assert(!tableLocFile.exists)
spark.sql("INSERT OVERWRITE TABLE t SELECT 'c', 1")
assert(tableLocFile.exists)
checkAnswer(spark.table("t"), Row("c", 1) :: Nil)

val newDir = dir.getAbsolutePath.stripSuffix("/") + "/x"

Contributor commented:

  nit: new File(dir, "x")

Contributor (Author) replied:

  OK, I will fix this in another PR, thanks!
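
To illustrate the nit (a sketch with a stand-in directory, not code from the PR): the `java.io.File(parent, child)` constructor joins the two paths itself, so it replaces the manual `stripSuffix("/") + "/x"` concatenation used in the test above.

    import java.io.File

    val parent = new File("/tmp/spark-test")  // stand-in for the test's temp dir
    val viaConcat = new File(parent.getAbsolutePath.stripSuffix("/") + "/x")
    val viaCtor = new File(parent, "x")       // the suggested form
    // On Unix-like systems both resolve to /tmp/spark-test/x.
    assert(viaConcat.getAbsolutePath == viaCtor.getAbsolutePath)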

        val newDirFile = new File(newDir)
        spark.sql(s"ALTER TABLE t SET LOCATION '$newDir'")
        spark.sessionState.catalog.refreshTable(TableIdentifier("t"))

        val table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
        assert(table1.location == newDir)
        assert(!newDirFile.exists)

        spark.sql("INSERT INTO TABLE t SELECT 'c', 1")
        assert(newDirFile.exists)
        checkAnswer(spark.table("t"), Row("c", 1) :: Nil)
      }
    }
  }

test("insert into a data source table with no existed partition location should succeed") {
withTable("t") {
withTempDir { dir =>
spark.sql(
s"""
|CREATE TABLE t(a int, b int, c int, d int)
|USING parquet
|PARTITIONED BY(a, b)
|LOCATION "file:${dir.getCanonicalPath}"
""".stripMargin)
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
val expectedPath = s"file:${dir.getAbsolutePath.stripSuffix("/")}"
assert(table.location.stripSuffix("/") == expectedPath)

spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 3, 4")
checkAnswer(spark.table("t"), Row(3, 4, 1, 2) :: Nil)

val partLoc = new File(s"${dir.getAbsolutePath}/a=1")
Utils.deleteRecursively(partLoc)
assert(!partLoc.exists())
// insert overwrite into a partition which location has been deleted.
spark.sql("INSERT OVERWRITE TABLE t PARTITION(a=1, b=2) SELECT 7, 8")
assert(partLoc.exists())
checkAnswer(spark.table("t"), Row(7, 8, 1, 2) :: Nil)

// TODO:insert into a partition after alter the partition location by alter command

Member commented:

  To other reviewers: ALTER TABLE SET LOCATION for a partition is not allowed for tables defined using the datasource API.

Contributor (Author) replied:

  I found a bug in this situation and created a JIRA:
  https://issues.apache.org/jira/browse/SPARK-19577

  Shall we just forbid this situation, or fix it?
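
For reference, a hypothetical snippet of the statement this thread and the TODO above refer to (reusing the test's `spark` session and table `t`; the path is made up). Per the reviewer, Spark rejects it for tables defined through the datasource API, and SPARK-19577 tracks the related bug.

    // Hypothetical illustration, not part of the PR: altering a single
    // partition's location. For datasource tables this is not allowed,
    // per the review discussion above.
    spark.sql("ALTER TABLE t PARTITION (a=1, b=2) SET LOCATION '/some/new/path'")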

      }
    }
  }

test("read data from a data source table which has a not existed location should succeed") {
withTable("t") {
withTempDir { dir =>
spark.sql(
s"""
|CREATE TABLE t(a string, b int)
|USING parquet
|OPTIONS(path "file:${dir.getAbsolutePath}")
""".stripMargin)
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
val expectedPath = s"file:${dir.getAbsolutePath.stripSuffix("/")}"
assert(table.location.stripSuffix("/") == expectedPath)

dir.delete()
checkAnswer(spark.table("t"), Nil)

val newDir = dir.getAbsolutePath.stripSuffix("/") + "/x"
spark.sql(s"ALTER TABLE t SET LOCATION '$newDir'")

val table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
assert(table1.location == newDir)
assert(!new File(newDir).exists())
checkAnswer(spark.table("t"), Nil)
}
}
}

test("read data from a data source table with no existed partition location should succeed") {
withTable("t") {
withTempDir { dir =>
spark.sql(
s"""
|CREATE TABLE t(a int, b int, c int, d int)
|USING parquet
|PARTITIONED BY(a, b)
|LOCATION "file:${dir.getCanonicalPath}"
""".stripMargin)
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))

Member commented:

  This is not being used.

spark.sql("INSERT INTO TABLE t PARTITION(a=1, b=2) SELECT 3, 4")
checkAnswer(spark.table("t"), Row(3, 4, 1, 2) :: Nil)

// select from a partition which location has been deleted.
Utils.deleteRecursively(dir)
assert(!dir.exists())
spark.sql("REFRESH TABLE t")
checkAnswer(spark.sql("select * from t where a=1 and b=2"), Nil)
}
}
}
}