Skip to content

Commit a9afcb8

Browse files
committed
[HUDI-4303] Use Hive sentinel value as partition default to avoid type caste issues
1 parent 348519f commit a9afcb8

File tree

7 files changed

+37
-31
lines changed

7 files changed

+37
-31
lines changed

hudi-common/src/main/java/org/apache/hudi/common/util/PartitionPathEncodeUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
*/
2626
public class PartitionPathEncodeUtils {
2727

28-
public static final String DEFAULT_PARTITION_PATH = "default";
28+
public static final String DEFAULT_PARTITION_PATH = "__HIVE_DEFAULT_PARTITION__";
2929

3030
static BitSet charToEscape = new BitSet(128);
3131
static {

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import java.util.Map;
4646
import java.util.Set;
4747

48+
import static org.apache.hudi.common.util.PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH;
4849
import static org.apache.hudi.config.HoodieClusteringConfig.DAYBASED_LOOKBACK_PARTITIONS;
4950
import static org.apache.hudi.config.HoodieClusteringConfig.PARTITION_FILTER_BEGIN_PARTITION;
5051
import static org.apache.hudi.config.HoodieClusteringConfig.PARTITION_FILTER_END_PARTITION;
@@ -81,7 +82,7 @@ private FlinkOptions() {
8182
public static final ConfigOption<String> PARTITION_DEFAULT_NAME = ConfigOptions
8283
.key("partition.default_name")
8384
.stringType()
84-
.defaultValue("default") // keep sync with hoodie style
85+
.defaultValue(DEFAULT_PARTITION_PATH) // keep sync with hoodie style
8586
.withDescription("The default partition name in case the dynamic partition"
8687
+ " column value is null/empty string");
8788

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGen.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
import org.apache.hudi.common.model.HoodieKey;
2222
import org.apache.hudi.common.util.Option;
23-
import org.apache.hudi.common.util.PartitionPathEncodeUtils;
2423
import org.apache.hudi.common.util.StringUtils;
2524
import org.apache.hudi.configuration.FlinkOptions;
2625
import org.apache.hudi.exception.HoodieKeyException;
@@ -39,6 +38,9 @@
3938
import java.util.Arrays;
4039
import java.util.List;
4140

41+
import static org.apache.hudi.common.util.PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH;
42+
import static org.apache.hudi.common.util.PartitionPathEncodeUtils.escapePathName;
43+
4244
/**
4345
* Key generator for {@link RowData}.
4446
*/
@@ -52,7 +54,6 @@ public class RowDataKeyGen implements Serializable {
5254
private static final String NULL_RECORDKEY_PLACEHOLDER = "__null__";
5355
private static final String EMPTY_RECORDKEY_PLACEHOLDER = "__empty__";
5456

55-
private static final String DEFAULT_PARTITION_PATH = "default";
5657
private static final String DEFAULT_PARTITION_PATH_SEPARATOR = "/";
5758

5859
private final String[] recordKeyFields;
@@ -192,7 +193,7 @@ private static String getRecordPartitionPath(
192193
: DEFAULT_PARTITION_PATH);
193194
} else {
194195
if (encodePartitionPath) {
195-
partValue = PartitionPathEncodeUtils.escapePathName(partValue);
196+
partValue = escapePathName(partValue);
196197
}
197198
partitionPath.append(hiveStylePartitioning ? partField + "=" + partValue : partValue);
198199
}
@@ -227,7 +228,7 @@ public static String getPartitionPath(
227228
partitionPath = DEFAULT_PARTITION_PATH;
228229
}
229230
if (encodePartitionPath) {
230-
partitionPath = PartitionPathEncodeUtils.escapePathName(partitionPath);
231+
partitionPath = escapePathName(partitionPath);
231232
}
232233
if (hiveStylePartitioning) {
233234
partitionPath = partField + "=" + partitionPath;

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import java.util.List;
4040
import java.util.Map;
4141

42+
import static org.apache.hudi.common.util.PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH;
4243
import static org.apache.hudi.configuration.FlinkOptions.PARTITION_FORMAT_DAY;
4344

4445
/**
@@ -178,7 +179,7 @@ public class FlinkStreamerConfig extends Configuration {
178179

179180
@Parameter(names = {"--partition-default-name"},
180181
description = "The default partition name in case the dynamic partition column value is null/empty string")
181-
public String partitionDefaultName = "default";
182+
public String partitionDefaultName = DEFAULT_PARTITION_PATH;
182183

183184
@Parameter(names = {"--index-bootstrap-enabled"},
184185
description = "Whether to bootstrap the index state from existing hoodie table, default false")

hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGen.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.junit.jupiter.params.ParameterizedTest;
3333
import org.junit.jupiter.params.provider.ValueSource;
3434

35+
import static org.apache.hudi.common.util.PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH;
3536
import static org.apache.hudi.utils.TestData.insertRow;
3637
import static org.hamcrest.CoreMatchers.is;
3738
import static org.hamcrest.MatcherAssert.assertThat;
@@ -54,19 +55,19 @@ void testSimpleKeyAndPartition() {
5455
final RowData rowData2 = insertRow(TestConfigurations.ROW_TYPE, null, StringData.fromString("Danny"), 23,
5556
TimestampData.fromEpochMillis(1), null);
5657
assertThrows(HoodieKeyException.class, () -> keyGen1.getRecordKey(rowData2));
57-
assertThat(keyGen1.getPartitionPath(rowData2), is("default"));
58+
assertThat(keyGen1.getPartitionPath(rowData2), is(DEFAULT_PARTITION_PATH));
5859
// empty record key and partition path
5960
final RowData rowData3 = insertRow(StringData.fromString(""), StringData.fromString("Danny"), 23,
6061
TimestampData.fromEpochMillis(1), StringData.fromString(""));
6162
assertThrows(HoodieKeyException.class, () -> keyGen1.getRecordKey(rowData3));
62-
assertThat(keyGen1.getPartitionPath(rowData3), is("default"));
63+
assertThat(keyGen1.getPartitionPath(rowData3), is(DEFAULT_PARTITION_PATH));
6364

6465
// hive style partitioning
6566
conf.set(FlinkOptions.HIVE_STYLE_PARTITIONING, true);
6667
final RowDataKeyGen keyGen2 = RowDataKeyGen.instance(conf, TestConfigurations.ROW_TYPE);
67-
assertThat(keyGen2.getPartitionPath(rowData1), is("partition=par1"));
68-
assertThat(keyGen2.getPartitionPath(rowData2), is("partition=default"));
69-
assertThat(keyGen2.getPartitionPath(rowData3), is("partition=default"));
68+
assertThat(keyGen2.getPartitionPath(rowData1), is(String.format("partition=%s", "par1")));
69+
assertThat(keyGen2.getPartitionPath(rowData2), is(String.format("partition=%s", DEFAULT_PARTITION_PATH)));
70+
assertThat(keyGen2.getPartitionPath(rowData3), is(String.format("partition=%s", DEFAULT_PARTITION_PATH)));
7071
}
7172

7273
@Test
@@ -83,19 +84,19 @@ void testComplexKeyAndPartition() {
8384
// null record key and partition path
8485
final RowData rowData2 = insertRow(TestConfigurations.ROW_TYPE, null, null, 23, null, null);
8586
assertThrows(HoodieKeyException.class, () -> keyGen1.getRecordKey(rowData2));
86-
assertThat(keyGen1.getPartitionPath(rowData2), is("default/default"));
87+
assertThat(keyGen1.getPartitionPath(rowData2), is(String.format("%s/%s", DEFAULT_PARTITION_PATH, DEFAULT_PARTITION_PATH)));
8788
// empty record key and partition path
8889
final RowData rowData3 = insertRow(StringData.fromString(""), StringData.fromString(""), 23,
8990
TimestampData.fromEpochMillis(1), StringData.fromString(""));
9091
assertThrows(HoodieKeyException.class, () -> keyGen1.getRecordKey(rowData3));
91-
assertThat(keyGen1.getPartitionPath(rowData3), is("default/1970-01-01T00:00:00.001"));
92+
assertThat(keyGen1.getPartitionPath(rowData3), is(String.format("%s/1970-01-01T00:00:00.001", DEFAULT_PARTITION_PATH)));
9293

9394
// hive style partitioning
9495
conf.set(FlinkOptions.HIVE_STYLE_PARTITIONING, true);
9596
final RowDataKeyGen keyGen2 = RowDataKeyGen.instance(conf, TestConfigurations.ROW_TYPE);
96-
assertThat(keyGen2.getPartitionPath(rowData1), is("partition=par1/ts=1970-01-01T00:00:00.001"));
97-
assertThat(keyGen2.getPartitionPath(rowData2), is("partition=default/ts=default"));
98-
assertThat(keyGen2.getPartitionPath(rowData3), is("partition=default/ts=1970-01-01T00:00:00.001"));
97+
assertThat(keyGen2.getPartitionPath(rowData1), is(String.format("partition=%s/ts=%s", "par1", "1970-01-01T00:00:00.001")));
98+
assertThat(keyGen2.getPartitionPath(rowData2), is(String.format("partition=%s/ts=%s", DEFAULT_PARTITION_PATH, DEFAULT_PARTITION_PATH)));
99+
assertThat(keyGen2.getPartitionPath(rowData3), is(String.format("partition=%s/ts=%s", DEFAULT_PARTITION_PATH, "1970-01-01T00:00:00.001")));
99100
}
100101

101102
@Test

hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, T
2626
import org.apache.hudi.common.testutils.HoodieTestDataGenerator
2727
import org.apache.hudi.common.testutils.RawTripTestPayload.{deleteRecordsToStrings, recordsToStrings}
2828
import org.apache.hudi.common.util
29+
import org.apache.hudi.common.util.PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH
2930
import org.apache.hudi.config.HoodieWriteConfig
3031
import org.apache.hudi.exception.{HoodieException, HoodieUpsertException}
3132
import org.apache.hudi.keygen._
@@ -41,7 +42,7 @@ import org.joda.time.DateTime
4142
import org.joda.time.format.DateTimeFormat
4243
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue, fail}
4344
import org.junit.jupiter.api.function.Executable
44-
import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Test}
45+
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
4546
import org.junit.jupiter.params.ParameterizedTest
4647
import org.junit.jupiter.params.provider.{CsvSource, ValueSource}
4748

@@ -614,13 +615,14 @@ class TestCOWDataSource extends HoodieClientTestBase {
614615
.load(basePath)
615616
assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= col("driver")).count() == 0)
616617

617-
// Use the `driver,rider` field as the partition key, If no such field exists, the default value `default` is used
618+
// Use the `driver,rider` field as the partition key, If no such field exists,
619+
// the default value [[PartitionPathEncodeUtils#DEFAULT_PARTITION_PATH]] is used
618620
writer = getDataFrameWriter(classOf[SimpleKeyGenerator].getName)
619621
writer.partitionBy("driver", "rider")
620622
.save(basePath)
621623
recordsReadDF = spark.read.format("org.apache.hudi")
622624
.load(basePath)
623-
assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= lit("default")).count() == 0)
625+
assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= lit(DEFAULT_PARTITION_PATH)).count() == 0)
624626
}
625627

626628
@Test def testSparkPartitionByWithComplexKeyGenerator() {

hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestShowPartitions.scala

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
package org.apache.spark.sql.hudi
1919

20-
import org.apache.spark.sql.Row
20+
import org.apache.hudi.common.util.PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH
2121

2222
class TestShowPartitions extends HoodieSparkSqlTestBase {
2323

@@ -90,7 +90,7 @@ class TestShowPartitions extends HoodieSparkSqlTestBase {
9090
| select 3 as id, 'a3' as name, 10 as price, 1000 as ts, null as dt
9191
""".stripMargin)
9292
checkAnswer(s"show partitions $tableName")(
93-
Seq("dt=2021-01-01"), Seq("dt=2021-01-02"), Seq("dt=default")
93+
Seq("dt=2021-01-01"), Seq("dt=2021-01-02"), Seq("dt=%s".format(DEFAULT_PARTITION_PATH))
9494
)
9595
}
9696

@@ -138,27 +138,27 @@ class TestShowPartitions extends HoodieSparkSqlTestBase {
138138
Seq("year=2021/month=01/day=01"),
139139
Seq("year=2021/month=01/day=02"),
140140
Seq("year=2021/month=02/day=01"),
141-
Seq("year=2021/month=02/day=default"),
142-
Seq("year=2021/month=default/day=01"),
143-
Seq("year=default/month=01/day=default"),
144-
Seq("year=default/month=01/day=02"),
145-
Seq("year=default/month=default/day=01"),
146-
Seq("year=2022/month=default/day=default")
141+
Seq("year=2021/month=02/day=%s".format(DEFAULT_PARTITION_PATH)),
142+
Seq("year=2021/month=%s/day=01".format(DEFAULT_PARTITION_PATH)),
143+
Seq("year=%s/month=01/day=%s".format(DEFAULT_PARTITION_PATH, DEFAULT_PARTITION_PATH)),
144+
Seq("year=%s/month=01/day=02".format(DEFAULT_PARTITION_PATH)),
145+
Seq("year=%s/month=%s/day=01".format(DEFAULT_PARTITION_PATH, DEFAULT_PARTITION_PATH)),
146+
Seq("year=2022/month=%s/day=%s".format(DEFAULT_PARTITION_PATH, DEFAULT_PARTITION_PATH))
147147
)
148148

149149
// check partial partitions
150150
checkAnswer(s"show partitions $tableName partition(year='2021', month='01', day='01')")(
151151
Seq("year=2021/month=01/day=01")
152152
)
153153
checkAnswer(s"show partitions $tableName partition(year='2021', month='02')")(
154-
Seq("year=2021/month=02/day=default"),
154+
Seq("year=2021/month=02/day=%s".format(DEFAULT_PARTITION_PATH)),
155155
Seq("year=2021/month=02/day=01")
156156
)
157157
checkAnswer(s"show partitions $tableName partition(day='01')")(
158158
Seq("year=2021/month=02/day=01"),
159-
Seq("year=2021/month=default/day=01"),
159+
Seq("year=2021/month=%s/day=01".format(DEFAULT_PARTITION_PATH)),
160160
Seq("year=2021/month=01/day=01"),
161-
Seq("year=default/month=default/day=01")
161+
Seq("year=%s/month=%s/day=01".format(DEFAULT_PARTITION_PATH, DEFAULT_PARTITION_PATH))
162162
)
163163
}
164164
}

0 commit comments

Comments
 (0)