@@ -153,6 +153,7 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T])
*/
@throws(classOf[NoSuchTableException])
def append(): Unit = {
assertNoTempView("append")
val append = loadTable(catalog, identifier) match {
Contributor:

I'm thinking of a more aggressive refactor, which creates an unresolved logical plan here and leaves the table/temp view lookup to the analyzer.

For example, here we can just create AppendData.byName(UnresolvedRelation(...)).

By doing this, we can make the framework clearer: the API layer should just create logical plans, and the remaining work should be done by the analyzer and query planner.
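
A rough sketch of that direction (illustrative only — this assumes the writer's existing tableName, logicalPlan, and options fields, and that UnresolvedRelation can serve as the NamedRelation argument; the actual refactor is not part of this patch):

import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.plans.logical.AppendData

def append(): Unit = {
  // No eager table lookup here: the analyzer resolves the relation (and
  // rejects temp views), so the API layer only builds the logical plan.
  val append = AppendData.byName(
    UnresolvedRelation(tableName),
    logicalPlan,
    options.toMap)
  runCommand("append")(append)
}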

Contributor Author:

I agree the suggestion is promising, but I'm not sure I understand all the spots I'd need to modify. I could try to find the places where InsertIntoStatement(UnresolvedRelation(...)) is resolved, but AppendData and InsertIntoStatement are not 100% the same. In addition, it looks like this would need to be done for each operation.

I'll take a look at this soon. If you can fix it easily (and you'd like to), please go ahead and I can learn from your PR.

Contributor:

Done in #29970.

case Some(t) =>
AppendData.byName(
@@ -177,6 +178,7 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T])
*/
@throws(classOf[NoSuchTableException])
def overwrite(condition: Column): Unit = {
assertNoTempView("overwrite")
val overwrite = loadTable(catalog, identifier) match {
case Some(t) =>
OverwriteByExpression.byName(
@@ -204,6 +206,7 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T])
*/
Member:

Should we document that this throws AnalysisException, and update the docs above? Maybe at least for the sake of consistency.
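
For illustration, the doc update being suggested might look roughly like this (a hypothetical sketch, not actual patch wording):

/**
 * (existing method description unchanged)
 *
 * @throws org.apache.spark.sql.AnalysisException if the target is a temporary view or a view
 */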

Member:

+1 for @HyukjinKwon's suggestion.

Contributor Author (@HeartSaVioR, Sep 22, 2020):

I thought about this and decided to skip it, because AnalysisException is thrown in various cases even without this change, and we couldn't explain all of them. I'll add it if we prefer to explain at least this explicit case.

@throws(classOf[NoSuchTableException])
def overwritePartitions(): Unit = {
assertNoTempView("overwritePartitions")
val dynamicOverwrite = loadTable(catalog, identifier) match {
case Some(t) =>
OverwritePartitionsDynamic.byName(
@@ -216,6 +219,12 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T])
runCommand("overwritePartitions")(dynamicOverwrite)
}

private def assertNoTempView(name: String): Unit = {
if (sparkSession.sessionState.catalog.isTempView(tableName)) {
throw new AnalysisException(s"Temporary view $table doesn't support $name")
}
}

/**
* Wrap an action to track the QueryExecution and time cost, then report to the user-registered
* callback functions.
@@ -144,6 +144,26 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with BeforeAndAfter
assert(exc.getMessage.contains("table_name"))
}

test("Append: fail if it writes to the temp view") {
spark.sql("CREATE TABLE testcat.table_name (id bigint, data string) USING foo")
spark.table("testcat.table_name").createOrReplaceTempView("temp_view")

val exc = intercept[AnalysisException] {
spark.table("source").writeTo("temp_view").append()
}
assert(exc.getMessage.contains("temp_view"))
}

test("Append: fail if it writes to the view") {
spark.sql("CREATE TABLE testcat.table_name (id bigint, data string) USING foo")
spark.sql("CREATE OR REPLACE VIEW table_view AS SELECT id, data FROM testcat.table_name")

val exc = intercept[AnalysisException] {
spark.table("source").writeTo("table_view").append()
}
assert(exc.getMessage.contains("table_view"))
}

test("Overwrite: overwrite by expression: true") {
spark.sql(
"CREATE TABLE testcat.table_name (id bigint, data string) USING foo PARTITIONED BY (id)")
Expand Down Expand Up @@ -208,6 +228,26 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
assert(exc.getMessage.contains("table_name"))
}

test("Overwrite: fail if it writes to the temp view") {
spark.sql("CREATE TABLE testcat.table_name (id bigint, data string) USING foo")
spark.table("testcat.table_name").createOrReplaceTempView("temp_view")

val exc = intercept[AnalysisException] {
spark.table("source").writeTo("temp_view").overwrite(lit(true))
}
assert(exc.getMessage.contains("temp_view"))
}

test("Overwrite: fail if it writes to the view") {
spark.sql("CREATE TABLE testcat.table_name (id bigint, data string) USING foo")
spark.sql("CREATE OR REPLACE VIEW table_view AS SELECT id, data FROM testcat.table_name")

val exc = intercept[AnalysisException] {
spark.table("source").writeTo("table_view").overwrite(lit(true))
}
assert(exc.getMessage.contains("table_view"))
}

test("OverwritePartitions: overwrite conflicting partitions") {
spark.sql(
"CREATE TABLE testcat.table_name (id bigint, data string) USING foo PARTITIONED BY (id)")
@@ -272,6 +312,26 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with BeforeAndAfter
assert(exc.getMessage.contains("table_name"))
}

test("OverwritePartitions: fail if it writes to the temp view") {
spark.sql("CREATE TABLE testcat.table_name (id bigint, data string) USING foo")
spark.table("testcat.table_name").createOrReplaceTempView("temp_view")

val exc = intercept[AnalysisException] {
spark.table("source").writeTo("temp_view").overwritePartitions()
}
assert(exc.getMessage.contains("temp_view"))
}

test("OverwritePartitions: fail if it writes to the view") {
spark.sql("CREATE TABLE testcat.table_name (id bigint, data string) USING foo")
spark.sql("CREATE OR REPLACE VIEW table_view AS SELECT id, data FROM testcat.table_name")

val exc = intercept[AnalysisException] {
spark.table("source").writeTo("table_view").overwritePartitions()
}
assert(exc.getMessage.contains("table_view"))
}

test("Create: basic behavior") {
spark.table("source").writeTo("testcat.table_name").create()
