[SPARK-34203][SQL] Convert null partition values to __HIVE_DEFAULT_PARTITION__ in v1 In-Memory catalog
#31322

Conversation
```diff
 def convertNullPartitionValues(spec: TablePartitionSpec): TablePartitionSpec = {
-  spec.mapValues(v => if (v == null) DEFAULT_PARTITION_NAME else v).toMap
+  spec.mapValues(v => if (v == null) DEFAULT_PARTITION_NAME else v).map(identity).toMap
 }
```
Otherwise I got a "Task is not serializable" exception.

```
spark-sql> drop table ppp;
spark-sql> create table ppp(i string, j string) using parquet partitioned by (j);
spark-sql> INSERT OVERWRITE ppp VALUES ('1', null);
21/01/25 21:55:03 WARN log: Updating partition stats fast for: ppp
21/01/25 21:55:03 WARN log: Updated size to 396
spark-sql> alter table ppp drop partition(j=null);
Error in query: No partition is dropped. One partition spec 'Map(j -> null)' does not exist in table 'ppp' database 'default';
spark-sql> select version();
3.0.1 2b147c4cd50da32fe2b4167f97c8142102a0510d
```
I also tested the 3.0.1 release; might this not be just the in-memory catalog?
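The reason the patch appends `.map(identity)` can be sketched outside Spark: in Scala 2.12, `Map#mapValues` returns a lazy view that captures the mapping function, and calling `.toMap` on it is effectively a no-op because the view already is an immutable `Map`; `.map(identity)` forces materialization into a plain, serializable map. The snippet below is a standalone illustration, where `convertNullPartitionValues` is a simplified stand-in for the patched method using `Map[String, String]` instead of `TablePartitionSpec`:

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

val DEFAULT_PARTITION_NAME = "__HIVE_DEFAULT_PARTITION__"

// Simplified stand-in for the patched catalog helper: normalize null
// partition values, then force the lazy mapValues view into a real Map.
def convertNullPartitionValues(spec: Map[String, String]): Map[String, String] =
  spec.mapValues(v => if (v == null) DEFAULT_PARTITION_NAME else v).map(identity).toMap

val converted = convertNullPartitionValues(Map("j" -> null, "i" -> "1"))
assert(converted("j") == DEFAULT_PARTITION_NAME)

// The materialized map round-trips through Java serialization, which is
// what the lazy view produced by mapValues alone would fail to do on 2.12.
val out = new ObjectOutputStream(new ByteArrayOutputStream())
out.writeObject(converted)
out.close()
println(converted)
```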
ah yes, I verified the case locally with

GA passed, merging to master/3.1! (I didn't backport further because the in-memory catalog is for testing purposes.)
[SPARK-34203][SQL] Convert `null` partition values to `__HIVE_DEFAULT_PARTITION__` in v1 `In-Memory` catalog

### What changes were proposed in this pull request?
In the PR, I propose to convert `null` partition values to `"__HIVE_DEFAULT_PARTITION__"` before storing them in the `In-Memory` catalog internally. Currently, the `In-Memory` catalog maintains null partitions as `"__HIVE_DEFAULT_PARTITION__"` in the file system but as `null` values in memory, which can cause issues like the one in SPARK-34203.
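The proposed normalization can be sketched with a toy partition store (not Spark's actual `InMemoryCatalog`; `normalize` and `partitions` below are illustrative names). Once specs are normalized on the way in, a lookup that also normalizes its argument matches the stored entry:

```scala
val DEFAULT_PARTITION_NAME = "__HIVE_DEFAULT_PARTITION__"

// Replace null partition values with the Hive default partition name so the
// in-memory state matches what is written to the file system.
def normalize(spec: Map[String, String]): Map[String, String] =
  spec.map { case (k, v) => k -> (if (v == null) DEFAULT_PARTITION_NAME else v) }

// Toy partition store keyed by the normalized spec.
var partitions = Set.empty[Map[String, String]]
partitions += normalize(Map("p1" -> null)) // INSERT with a null partition value

// DROP PARTITION (p1 = null) now finds the partition instead of missing it.
val dropped = partitions.contains(normalize(Map("p1" -> null)))
assert(dropped)
println(partitions)
```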
### Why are the changes needed?
`InMemoryCatalog` stores partitions in the file system in the Hive-compatible form; for instance, it converts the `null` partition value to `"__HIVE_DEFAULT_PARTITION__"`, but at the same time it keeps `null` as is internally. That causes the issue demonstrated by the example below:
```
$ ./bin/spark-shell -c spark.sql.catalogImplementation=in-memory
```
```scala
scala> spark.conf.get("spark.sql.catalogImplementation")
res0: String = in-memory
scala> sql("CREATE TABLE tbl (col1 INT, p1 STRING) USING parquet PARTITIONED BY (p1)")
res1: org.apache.spark.sql.DataFrame = []
scala> sql("INSERT OVERWRITE TABLE tbl VALUES (0, null)")
res2: org.apache.spark.sql.DataFrame = []
scala> sql("ALTER TABLE tbl DROP PARTITION (p1 = null)")
org.apache.spark.sql.catalyst.analysis.NoSuchPartitionsException: The following partitions not found in table 'tbl' database 'default':
Map(p1 -> null)
at org.apache.spark.sql.catalyst.catalog.InMemoryCatalog.dropPartitions(InMemoryCatalog.scala:440)
```
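The file-system side of the mismatch can be sketched as follows: the partition directory name renders a `null` value as `__HIVE_DEFAULT_PARTITION__`, while the in-memory spec kept the raw `null`, so the two representations of the same partition disagreed. `partitionPath` here is an illustrative helper, not a Spark API:

```scala
// Build a Hive-style partition directory path such as "p1=a/p2=b",
// rendering null values the way they appear on disk.
def partitionPath(spec: Seq[(String, String)]): String =
  spec.map { case (k, v) =>
    val value = if (v == null) "__HIVE_DEFAULT_PARTITION__" else v
    s"$k=$value"
  }.mkString("/")

println(partitionPath(Seq("p1" -> null))) // p1=__HIVE_DEFAULT_PARTITION__
```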
### Does this PR introduce _any_ user-facing change?
Yes. After the changes, `ALTER TABLE .. DROP PARTITION` can drop the `null` partition in the `In-Memory` catalog:
```scala
scala> spark.table("tbl").show(false)
+----+----+
|col1|p1 |
+----+----+
|0 |null|
+----+----+
scala> sql("ALTER TABLE tbl DROP PARTITION (p1 = null)")
res4: org.apache.spark.sql.DataFrame = []
scala> spark.table("tbl").show(false)
+----+---+
|col1|p1 |
+----+---+
+----+---+
```
### How was this patch tested?
Added a new test to `AlterTableDropPartitionSuiteBase`:
```
$ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *AlterTableDropPartitionSuite"
```
Closes apache#31322 from MaxGekk/insert-overwrite-null-part.
Authored-by: Max Gekk <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit bfc0235)
Signed-off-by: Wenchen Fan <[email protected]>
@cloud-fan Doesn't it mean that it is less risky to backport it to 3.0 than #31095 for the Hive catalog, which was already merged to 3.0?

It's less risky, but also less useful (as it doesn't fix a real bug). If you have time to open a backport PR for 3.0, I'm OK too.

Test build #134441 has finished for PR 31322 at commit
[SPARK-34203][SQL] Convert `null` partition values to `__HIVE_DEFAULT_PARTITION__` in v1 `In-Memory` catalog
(cherry picked from commit bfc0235)
Signed-off-by: Max Gekk <[email protected]>
Here is the backport to 3.0: #31326

Hi, this causes a

cc @HyukjinKwon since he is the release manager of Apache Spark 3.1.0.

Thank you, @MaxGekk !