Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig wi
var mode = SaveMode.Append
var isOverWriteTable = false
var isOverWritePartition = false
if (overwrite && catalogTable.partitionFields.isEmpty) {
// insert overwrite non-partition table
if (overwrite && partitionSpec.isEmpty) {
// insert overwrite table
mode = SaveMode.Overwrite
isOverWriteTable = true
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,8 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
| partitioned by (dt)
| location '${tmp.getCanonicalPath}/$tableName'
""".stripMargin)
// Insert overwrite dynamic partition

// Insert overwrite table
spark.sql(
s"""
| insert overwrite table $tableName
Expand All @@ -379,14 +380,13 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
Seq(1, "a1", 10.0, 1000, "2021-01-05")
)

// Insert overwrite dynamic partition
// Insert overwrite table
spark.sql(
s"""
| insert overwrite table $tableName
| select 2 as id, 'a2' as name, 10 as price, 1000 as ts, '2021-01-06' as dt
""".stripMargin)
checkAnswer(s"select id, name, price, ts, dt from $tableName order by id")(
Seq(1, "a1", 10.0, 1000, "2021-01-05"),
Seq(2, "a2", 10.0, 1000, "2021-01-06")
)

Expand Down Expand Up @@ -433,122 +433,22 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
""".stripMargin)
checkAnswer(s"select id, name, price, ts, dt from $tableName " +
s"where dt >='2021-01-04' and dt <= '2021-01-06' order by id,dt")(
Seq(2, "a2", 12.0, 1000, "2021-01-05"),
Seq(2, "a2", 10.0, 1000, "2021-01-06"),
Seq(3, "a1", 10.0, 1000, "2021-01-04")
)

// test insert overwrite non-partitioned table
// Test insert overwrite non-partitioned table
spark.sql(s"insert overwrite table $tblNonPartition select 2, 'a2', 10, 1000")
checkAnswer(s"select id, name, price, ts from $tblNonPartition")(
Seq(2, "a2", 10.0, 1000)
)
})
}

test("Test Insert Overwrite Table for V2 Table") {
withSQLConf("hoodie.schema.on.read.enable" -> "true") {
Copy link
Copy Markdown
Contributor

@stream2000 stream2000 Feb 1, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

V2 Table is only enabled when hoodie.schema.on.read.enable is true; otherwise v2Table.v1TableWrapper will be used (see org.apache.spark.sql.hudi.catalog.HoodieCatalog#loadTable). In a V2 table we can distinguish between insert overwrite partition and insert overwrite table, while we can't do this in a v1 table, so I added a v2 table test here to cover the different behaviors between v1 and v2 tables.

Copy link
Copy Markdown
Contributor Author

@Zouxxyy Zouxxyy Feb 1, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I noticed that you added this config to force the use of the V2 table, but I think that in the future hudi spark3 may use v2 by default instead of being controlled by this config.
Besides, a v1 table can also distinguish between insert overwrite partition and insert overwrite table by checking whether partitionSpec is empty, so I think the tests should be uniform.

withRecordType()(withTempDir { tmp =>
if (HoodieSparkUtils.gteqSpark3_2) {
val tableName = generateTableName
// Create a partitioned table
spark.sql(
s"""
|create table $tableName (
| id int,
| name string,
| price double,
| ts long,
| dt string
|) using hudi
| tblproperties (primaryKey = 'id', preCombineField='dt')
| partitioned by (dt)
| location '${tmp.getCanonicalPath}/$tableName'
""".stripMargin)

// Test insert overwrite table
spark.sql(
s"""
| insert overwrite table $tableName
| values(1, 'a1', 10.0, 1000, '2021-01-05')
""".stripMargin)
checkAnswer(s"select id, name, price, ts, dt from $tableName")(
Seq(1, "a1", 10.0, 1000, "2021-01-05")
)

// Insert overwrite table
spark.sql(
s"""
| insert overwrite table $tableName
| values (2, 'a2', 10.0, 1000, '2021-01-06')
""".stripMargin)
checkAnswer(s"select id, name, price, ts, dt from $tableName order by id")(
Seq(2, "a2", 10.0, 1000, "2021-01-06")
)

// Insert overwrite static partition
spark.sql(
s"""
| insert overwrite table $tableName partition(dt = '2021-01-05')
| select * from (select 2 , 'a2', 12.0, 1000) limit 10
""".stripMargin)
checkAnswer(s"select id, name, price, ts, dt from $tableName order by dt")(
Seq(2, "a2", 12.0, 1000, "2021-01-05"),
Seq(2, "a2", 10.0, 1000, "2021-01-06")
)

// Insert data from another table
val tblNonPartition = generateTableName
spark.sql(
s"""
| create table $tblNonPartition (
| id int,
| name string,
| price double,
| ts long
| ) using hudi
| tblproperties (primaryKey = 'id')
| location '${tmp.getCanonicalPath}/$tblNonPartition'
""".stripMargin)
spark.sql(s"insert into $tblNonPartition select 1, 'a1', 10.0, 1000")
spark.sql(
s"""
| insert overwrite table $tableName partition(dt ='2021-01-04')
| select * from $tblNonPartition limit 10
""".stripMargin)
checkAnswer(s"select id, name, price, ts, dt from $tableName order by id,dt")(
Seq(1, "a1", 10.0, 1000, "2021-01-04"),
Seq(2, "a2", 12.0, 1000, "2021-01-05"),
Seq(2, "a2", 10.0, 1000, "2021-01-06")
)

// Insert overwrite partitioned table, all partitions will be truncated
spark.sql(
s"""
| insert overwrite table $tableName
| select id + 2, name, price, ts , '2021-01-04' from $tblNonPartition limit 10
""".stripMargin)
checkAnswer(s"select id, name, price, ts, dt from $tableName " +
s"where dt >='2021-01-04' and dt <= '2021-01-06' order by id,dt")(
Seq(3, "a1", 10.0, 1000, "2021-01-04")
)

// Test insert overwrite non-partitioned table
spark.sql(s"insert overwrite table $tblNonPartition select 2, 'a2', 10.0, 1000")
checkAnswer(s"select id, name, price, ts from $tblNonPartition")(
Seq(2, "a2", 10.0, 1000)
)

spark.sql(s"insert overwrite table $tblNonPartition select 2, 'a2', 10.0, 2000")
checkAnswer(s"select id, name, price, ts from $tblNonPartition")(
Seq(2, "a2", 10.0, 2000)
)
}
})
}
spark.sql(s"insert overwrite table $tblNonPartition select 3, 'a3', 10, 1000")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there test case for insert overwrite partitioned table?

Copy link
Copy Markdown
Contributor Author

@Zouxxyy Zouxxyy Jan 31, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@leesf Yes, they exist — see, for example, lines 383 and 393.

Copy link
Copy Markdown
Contributor Author

@Zouxxyy Zouxxyy Jan 31, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@leesf The test "Test Insert Overwrite Table for V2 Table" is unnecessary: it is completely consistent with "Test Insert Overwrite", and their results must be the same in spark2 and spark3.
So I just removed it, and fixed spark2 by replacing if (overwrite && catalogTable.partitionFields.isEmpty) with if (overwrite && partitionSpec.isEmpty)

checkAnswer(s"select id, name, price, ts from $tblNonPartition")(
Seq(3, "a3", 10.0, 1000)
)
})
}


test("Test Different Type of Partition Column") {
withRecordType()(withTempDir { tmp =>
val typeAndValue = Seq(
Expand Down Expand Up @@ -666,7 +566,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
| tblproperties (primaryKey = 'id')
| partitioned by (dt)
""".stripMargin)
checkException(s"insert overwrite table $tableName3 values(1, 'a1', 10, '2021-07-18')")(
checkException(s"insert overwrite table $tableName3 partition(dt = '2021-07-18') values(1, 'a1', 10, '2021-07-18')")(
"Insert Overwrite Partition can not use bulk insert."
)
}
Expand Down