diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/TimestampBasedAvroKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/TimestampBasedAvroKeyGenerator.java
index bce7e24c57a5f..c543fd26041a1 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/TimestampBasedAvroKeyGenerator.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/TimestampBasedAvroKeyGenerator.java
@@ -37,6 +37,7 @@
 import java.io.Serializable;
 import java.math.BigDecimal;
 import java.sql.Timestamp;
+import java.time.LocalDate;
 import java.util.TimeZone;
 import java.util.concurrent.TimeUnit;
 
@@ -175,6 +176,9 @@ public String getPartitionPath(Object partitionVal) {
       timeMs = convertLongTimeToMillis(((Integer) partitionVal).longValue());
     } else if (partitionVal instanceof BigDecimal) {
       timeMs = convertLongTimeToMillis(((BigDecimal) partitionVal).longValue());
+    } else if (partitionVal instanceof LocalDate) {
+      // Avro uses LocalDate to represent the Date value internally.
+      timeMs = convertLongTimeToMillis(((LocalDate) partitionVal).toEpochDay());
     } else if (partitionVal instanceof CharSequence) {
       if (!inputFormatter.isPresent()) {
         throw new HoodieException("Missing inputformatter. Ensure " + KeyGeneratorOptions.Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP + " config is set when timestampType is DATE_STRING or MIXED!");
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index e2be7d364b77f..8e202c692383d 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -367,13 +367,14 @@ private FlinkOptions() {
   public static final String PARTITION_FORMAT_HOUR = "yyyyMMddHH";
   public static final String PARTITION_FORMAT_DAY = "yyyyMMdd";
+  public static final String PARTITION_FORMAT_DASHED_DAY = "yyyy-MM-dd";
 
   public static final ConfigOption<String> PARTITION_FORMAT = ConfigOptions
       .key("write.partition.format")
       .stringType()
       .noDefaultValue()
       .withDescription("Partition path format, only valid when 'write.datetime.partitioning' is true, default is:\n"
           + "1) 'yyyyMMddHH' for timestamp(3) WITHOUT TIME ZONE, LONG, FLOAT, DOUBLE, DECIMAL;\n"
-          + "2) 'yyyyMMdd' for DAY and INT.");
+          + "2) 'yyyyMMdd' for DATE and INT.");
 
   public static final ConfigOption<Integer> INDEX_BOOTSTRAP_TASKS = ConfigOptions
       .key("write.index_bootstrap.tasks")
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
index 5464ea3f203fc..987ae10fe75ce 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -53,6 +53,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
 
@@ -243,6 +244,11 @@ private static void setupHoodieKeyOptions(Configuration conf, CatalogTable table
    * <p>The UTC timezone is used as default.
    */
  public static void setupTimestampKeygenOptions(Configuration conf, DataType fieldType) {
+    if (conf.contains(FlinkOptions.KEYGEN_CLASS_NAME)) {
+      // the keygen class has been set up explicitly, skip the setup
+      return;
+    }
+
     conf.setString(FlinkOptions.KEYGEN_CLASS_NAME, TimestampBasedAvroKeyGenerator.class.getName());
     LOG.info("Table option [{}] is reset to {} because datetime partitioning turns on",
         FlinkOptions.KEYGEN_CLASS_NAME.key(), TimestampBasedAvroKeyGenerator.class.getName());
@@ -257,13 +263,17 @@ public static void setupTimestampKeygenOptions(Configuration conf, DataType fiel
       conf.setString(KeyGeneratorOptions.Config.TIMESTAMP_TYPE_FIELD_PROP,
           TimestampBasedAvroKeyGenerator.TimestampType.EPOCHMILLISECONDS.name());
       }
-      String partitionFormat = conf.getOptional(FlinkOptions.PARTITION_FORMAT).orElse(FlinkOptions.PARTITION_FORMAT_HOUR);
-      conf.setString(KeyGeneratorOptions.Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, partitionFormat);
+      String outputPartitionFormat = conf.getOptional(FlinkOptions.PARTITION_FORMAT).orElse(FlinkOptions.PARTITION_FORMAT_HOUR);
+      conf.setString(KeyGeneratorOptions.Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, outputPartitionFormat);
     } else {
       conf.setString(KeyGeneratorOptions.Config.TIMESTAMP_TYPE_FIELD_PROP,
-          TimestampBasedAvroKeyGenerator.TimestampType.DATE_STRING.name());
-      String partitionFormat = conf.getOptional(FlinkOptions.PARTITION_FORMAT).orElse(FlinkOptions.PARTITION_FORMAT_DAY);
-      conf.setString(KeyGeneratorOptions.Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, partitionFormat);
+          TimestampBasedAvroKeyGenerator.TimestampType.SCALAR.name());
+      conf.setString(KeyGeneratorOptions.Config.INPUT_TIME_UNIT, TimeUnit.DAYS.toString());
+
+      String outputPartitionFormat = conf.getOptional(FlinkOptions.PARTITION_FORMAT).orElse(FlinkOptions.PARTITION_FORMAT_DAY);
+      conf.setString(KeyGeneratorOptions.Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, outputPartitionFormat);
+      // the input date format is not used for parsing with the SCALAR type; it is only set to pass validation
+      conf.setString(KeyGeneratorOptions.Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP, FlinkOptions.PARTITION_FORMAT_DAY);
     }
     conf.setString(KeyGeneratorOptions.Config.TIMESTAMP_OUTPUT_TIMEZONE_FORMAT_PROP, "UTC");
   }
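Note on the key-generator change above: both Flink's internal DATE representation and Avro's date logical type encode a date as the number of days since the Unix epoch, so the SCALAR timestamp type with a DAYS input unit only needs to scale that count to milliseconds before formatting. A minimal standalone sketch of the conversion (illustration only, not Hudi code; the class name is made up, the format patterns are the constants added above):

    import java.time.Instant;
    import java.time.LocalDate;
    import java.time.ZoneOffset;
    import java.time.format.DateTimeFormatter;
    import java.util.concurrent.TimeUnit;

    public class DatePartitionSketch {
      public static void main(String[] args) {
        long epochDays = LocalDate.parse("1970-01-02").toEpochDay(); // -> 1
        long timeMs = TimeUnit.DAYS.toMillis(epochDays);             // -> 86400000
        Instant instant = Instant.ofEpochMilli(timeMs);
        // PARTITION_FORMAT_DAY ("yyyyMMdd") and PARTITION_FORMAT_DASHED_DAY ("yyyy-MM-dd"), both in UTC
        System.out.println(DateTimeFormatter.ofPattern("yyyyMMdd").withZone(ZoneOffset.UTC).format(instant));   // 19700102
        System.out.println(DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC).format(instant)); // 1970-01-02
      }
    }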
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGen.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGen.java
index 822df063b5ab2..5643ca8d04744 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGen.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGen.java
@@ -29,6 +29,8 @@
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.TimestampData;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import static org.apache.hudi.utils.TestData.insertRow;
 import static org.hamcrest.CoreMatchers.is;
@@ -126,4 +128,40 @@ void testTimestampBasedKeyGenerator() {
     assertThat(keyGen2.getPartitionPath(rowData2), is("ts=1970010100"));
     assertThat(keyGen2.getPartitionPath(rowData3), is("ts=1970010100"));
   }
+
+  @ParameterizedTest
+  @ValueSource(strings = {FlinkOptions.PARTITION_FORMAT_DASHED_DAY, FlinkOptions.PARTITION_FORMAT_DAY})
+  void testDateBasedKeyGenerator(String partitionFormat) {
+    boolean dashed = partitionFormat.equals(FlinkOptions.PARTITION_FORMAT_DASHED_DAY);
+    Configuration conf = TestConfigurations.getDefaultConf("path1", TestConfigurations.ROW_DATA_TYPE_DATE);
+    conf.setString(FlinkOptions.PARTITION_PATH_FIELD, "dt");
+    conf.setString(FlinkOptions.PARTITION_FORMAT, partitionFormat);
+    HoodieTableFactory.setupTimestampKeygenOptions(conf, DataTypes.DATE());
+    final RowData rowData1 = insertRow(TestConfigurations.ROW_TYPE_DATE,
+        StringData.fromString("id1"), StringData.fromString("Danny"), 23, 1);
+    final RowDataKeyGen keyGen1 = RowDataKeyGen.instance(conf, TestConfigurations.ROW_TYPE_DATE);
+
+    assertThat(keyGen1.getRecordKey(rowData1), is("id1"));
+    String expectedPartition1 = dashed ? "1970-01-02" : "19700102";
+    assertThat(keyGen1.getPartitionPath(rowData1), is(expectedPartition1));
+
+    // null record key and partition path
+    final RowData rowData2 = insertRow(TestConfigurations.ROW_TYPE_DATE, null, StringData.fromString("Danny"), 23, null);
+    assertThrows(HoodieKeyException.class, () -> keyGen1.getRecordKey(rowData2));
+    String expectedPartition2 = dashed ? "1970-01-02" : "19700102";
+    assertThat(keyGen1.getPartitionPath(rowData2), is(expectedPartition2));
+
+    // empty record key
+    String expectedPartition3 = dashed ? "1970-01-03" : "19700103";
+    final RowData rowData3 = insertRow(TestConfigurations.ROW_TYPE_DATE, StringData.fromString(""), StringData.fromString("Danny"), 23, 2);
+    assertThrows(HoodieKeyException.class, () -> keyGen1.getRecordKey(rowData3));
+    assertThat(keyGen1.getPartitionPath(rowData3), is(expectedPartition3));
+
+    // hive style partitioning
+    conf.set(FlinkOptions.HIVE_STYLE_PARTITIONING, true);
+    final RowDataKeyGen keyGen2 = RowDataKeyGen.instance(conf, TestConfigurations.ROW_TYPE_DATE);
+    assertThat(keyGen2.getPartitionPath(rowData1), is("dt=" + expectedPartition1));
+    assertThat(keyGen2.getPartitionPath(rowData2), is("dt=" + expectedPartition2));
+    assertThat(keyGen2.getPartitionPath(rowData3), is("dt=" + expectedPartition3));
+  }
 }
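Read as a usage example, the test above exercises the whole path: the table factory derives the SCALAR keygen options from the DATE field, and RowDataKeyGen then renders the partition path. A condensed sketch (the path and field names come from the test, they are not requirements):

    Configuration conf = TestConfigurations.getDefaultConf("path1", TestConfigurations.ROW_DATA_TYPE_DATE);
    conf.setString(FlinkOptions.PARTITION_PATH_FIELD, "dt");
    conf.setString(FlinkOptions.PARTITION_FORMAT, FlinkOptions.PARTITION_FORMAT_DASHED_DAY);
    // sets SCALAR timestamp type, DAYS input unit and UTC output timezone
    HoodieTableFactory.setupTimestampKeygenOptions(conf, DataTypes.DATE());

    RowDataKeyGen keyGen = RowDataKeyGen.instance(conf, TestConfigurations.ROW_TYPE_DATE);
    // a DATE cell holding epoch day 1 renders as "1970-01-02"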
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 786a45cac7ac9..088ddb260dd5f 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -1028,6 +1028,37 @@ void testWriteAndReadWithTimestampPartitioning(ExecMode execMode) {
         + "+I[id8, Han, 56, 1970-01-01T00:00:08, par4]]");
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = {FlinkOptions.PARTITION_FORMAT_DAY, FlinkOptions.PARTITION_FORMAT_DASHED_DAY})
+  void testWriteAndReadWithDatePartitioning(String partitionFormat) {
+    TableEnvironment tableEnv = batchTableEnv;
+    String hoodieTableDDL = sql("t1")
+        .field("uuid varchar(20)")
+        .field("name varchar(10)")
+        .field("age int")
+        .field("ts date")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.PARTITION_FORMAT, partitionFormat)
+        .partitionField("ts") // use date as partition path field
+        .end();
+    tableEnv.executeSql(hoodieTableDDL);
+
+    execInsertSql(tableEnv, TestSQL.INSERT_DATE_PARTITION_T1);
+
+    List<Row> result = CollectionUtil.iterableToList(
+        () -> tableEnv.sqlQuery("select * from t1").execute().collect());
+    String expected = "["
+        + "+I[id1, Danny, 23, 1970-01-01], "
+        + "+I[id2, Stephen, 33, 1970-01-01], "
+        + "+I[id3, Julian, 53, 1970-01-01], "
+        + "+I[id4, Fabian, 31, 1970-01-01], "
+        + "+I[id5, Sophia, 18, 1970-01-01], "
+        + "+I[id6, Emma, 20, 1970-01-01], "
+        + "+I[id7, Bob, 44, 1970-01-01], "
+        + "+I[id8, Han, 56, 1970-01-01]]";
+    assertRowsEquals(result, expected);
+  }
+
   @ParameterizedTest
   @ValueSource(strings = {"bulk_insert", "upsert"})
   void testWriteReadDecimals(String operation) {
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
index c6a1b0068aa50..efd365064454d 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
@@ -419,7 +419,6 @@ void testSetupCleaningOptionsForSink() {
   @Test
   void testSetupTimestampBasedKeyGenForSink() {
     this.conf.setString(FlinkOptions.RECORD_KEY_FIELD, "dummyField");
-    this.conf.setString(FlinkOptions.KEYGEN_CLASS_NAME, "dummyKeyGenClass");
     // definition with simple primary key and partition path
     ResolvedSchema schema1 = SchemaBuilder.instance()
         .field("f0", DataTypes.INT().notNull())
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
index e1106671799b7..f2e8f1ab67a7c 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
@@ -20,6 +20,7 @@
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.streamer.FlinkStreamerConfig;
+import org.apache.hudi.util.AvroSchemaConverter;
 import org.apache.hudi.utils.factory.CollectSinkTableFactory;
 import org.apache.hudi.utils.factory.ContinuousFileSourceFactory;
 
@@ -74,6 +75,15 @@ private TestConfigurations() {
   public static final RowType ROW_TYPE_WIDER = (RowType) ROW_DATA_TYPE_WIDER.getLogicalType();
 
+  public static final DataType ROW_DATA_TYPE_DATE = DataTypes.ROW(
+      DataTypes.FIELD("uuid", DataTypes.VARCHAR(20)), // record key
+      DataTypes.FIELD("name", DataTypes.VARCHAR(10)),
+      DataTypes.FIELD("age", DataTypes.INT()),
+      DataTypes.FIELD("dt", DataTypes.DATE()))
+      .notNull();
+
+  public static final RowType ROW_TYPE_DATE = (RowType) ROW_DATA_TYPE_DATE.getLogicalType();
+
   public static String getCreateHoodieTableDDL(String tableName, Map<String, String> options) {
     return getCreateHoodieTableDDL(tableName, options, true, "partition");
   }
@@ -212,6 +222,15 @@ public static Configuration getDefaultConf(String tablePath) {
     return conf;
   }
 
+  public static Configuration getDefaultConf(String tablePath, DataType dataType) {
+    Configuration conf = new Configuration();
+    conf.setString(FlinkOptions.PATH, tablePath);
+    conf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA, AvroSchemaConverter.convertToSchema(dataType.getLogicalType()).toString());
+    conf.setString(FlinkOptions.TABLE_NAME, "TestHoodieTable");
+    conf.setString(FlinkOptions.PARTITION_PATH_FIELD, "partition");
+    return conf;
+  }
+
   public static FlinkStreamerConfig getDefaultStreamerConf(String tablePath) {
     FlinkStreamerConfig streamerConf = new FlinkStreamerConfig();
     streamerConf.targetBasePath = tablePath;
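The new getDefaultConf overload above keeps the configured Avro schema in sync with the row type under test. A short sketch of the derivation it relies on (the logical-type mapping is standard Avro behavior, stated here as an assumption rather than verified output):

    // DataTypes.DATE() maps to the Avro field {"type": "int", "logicalType": "date"},
    // which Avro's logical-type conversions read back as a java.time.LocalDate --
    // the exact branch added to TimestampBasedAvroKeyGenerator.getPartitionPath above.
    String avroSchema = AvroSchemaConverter
        .convertToSchema(TestConfigurations.ROW_DATA_TYPE_DATE.getLogicalType())
        .toString();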
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java
index 1695e4e7149a9..b109fee0fff2a 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestSQL.java
@@ -61,4 +61,14 @@ private TestSQL() {
       + "(1, array['abc1', 'def1'], array[1, 1], map['abc1', 1, 'def1', 3], row(array['abc1', 'def1'], row(1, 'abc1'))),\n"
       + "(2, array['abc2', 'def2'], array[2, 2], map['abc2', 1, 'def2', 3], row(array['abc2', 'def2'], row(2, 'abc2'))),\n"
       + "(3, array['abc3', 'def3'], array[3, 3], map['abc3', 1, 'def3', 3], row(array['abc3', 'def3'], row(3, 'abc3')))";
+
+  public static final String INSERT_DATE_PARTITION_T1 = "insert into t1 values\n"
+      + "('id1','Danny',23,DATE '1970-01-01'),\n"
+      + "('id2','Stephen',33,DATE '1970-01-01'),\n"
+      + "('id3','Julian',53,DATE '1970-01-01'),\n"
+      + "('id4','Fabian',31,DATE '1970-01-01'),\n"
+      + "('id5','Sophia',18,DATE '1970-01-01'),\n"
+      + "('id6','Emma',20,DATE '1970-01-01'),\n"
+      + "('id7','Bob',44,DATE '1970-01-01'),\n"
+      + "('id8','Han',56,DATE '1970-01-01')";
 }
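For reference, a condensed sketch of how the feature reads from a user's Flink SQL session (the table name and DDL mirror the IT case above; the path is illustrative). The factory detects the DATE partition column and configures the timestamp-based key generator automatically, per setupTimestampKeygenOptions:

    TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
    tableEnv.executeSql("create table t1 (\n"
        + "  uuid varchar(20),\n"
        + "  name varchar(10),\n"
        + "  age int,\n"
        + "  ts date\n"
        + ") partitioned by (ts) with (\n"
        + "  'connector' = 'hudi',\n"
        + "  'path' = 'file:///tmp/t1',\n"
        + "  'write.partition.format' = 'yyyy-MM-dd'\n"
        + ")");
    tableEnv.executeSql(TestSQL.INSERT_DATE_PARTITION_T1);
    // rows land under directories like 1970-01-01/ (or ts=1970-01-01/ with hive style partitioning)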