@@ -37,6 +37,7 @@
import java.io.Serializable;
import java.math.BigDecimal;
import java.sql.Timestamp;
+ import java.time.LocalDate;
import java.util.TimeZone;
import java.util.concurrent.TimeUnit;

@@ -175,6 +176,9 @@ public String getPartitionPath(Object partitionVal) {
timeMs = convertLongTimeToMillis(((Integer) partitionVal).longValue());
} else if (partitionVal instanceof BigDecimal) {
timeMs = convertLongTimeToMillis(((BigDecimal) partitionVal).longValue());
+ } else if (partitionVal instanceof LocalDate) {
+   // Avro uses LocalDate to represent the DATE value internally.
+   timeMs = convertLongTimeToMillis(((LocalDate) partitionVal).toEpochDay());
} else if (partitionVal instanceof CharSequence) {
if (!inputFormatter.isPresent()) {
throw new HoodieException("Missing inputformatter. Ensure " + KeyGeneratorOptions.Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP + " config is set when timestampType is DATE_STRING or MIXED!");
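The new branch covers Flink's DATE type: the Avro conversion layer hands the partition value over as a java.time.LocalDate, and toEpochDay() yields the day count since 1970-01-01, which convertLongTimeToMillis then scales according to the configured input unit (DAYS, see the factory change further down). A minimal standalone sketch of the same conversion, assuming UTC and the default 'yyyyMMdd' output pattern (class and variable names here are illustrative, not Hudi internals):

import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.TimeUnit;

public class DatePartitionPathSketch {
  public static void main(String[] args) {
    LocalDate date = LocalDate.of(1970, 1, 2);
    long epochDay = date.toEpochDay();              // 1 day since the epoch
    long timeMs = TimeUnit.DAYS.toMillis(epochDay); // 86400000
    String partitionPath = Instant.ofEpochMilli(timeMs)
        .atZone(ZoneOffset.UTC)
        .format(DateTimeFormatter.ofPattern("yyyyMMdd"));
    System.out.println(partitionPath);              // 19700102
  }
}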
@@ -367,13 +367,14 @@ private FlinkOptions() {

public static final String PARTITION_FORMAT_HOUR = "yyyyMMddHH";
public static final String PARTITION_FORMAT_DAY = "yyyyMMdd";
+ public static final String PARTITION_FORMAT_DASHED_DAY = "yyyy-MM-dd";
public static final ConfigOption<String> PARTITION_FORMAT = ConfigOptions
.key("write.partition.format")
.stringType()
.noDefaultValue()
.withDescription("Partition path format, only valid when 'write.datetime.partitioning' is true, default is:\n"
+ "1) 'yyyyMMddHH' for timestamp(3) WITHOUT TIME ZONE, LONG, FLOAT, DOUBLE, DECIMAL;\n"
+ "2) 'yyyyMMdd' for DAY and INT.");
+ "2) 'yyyyMMdd' for DATE and INT.");

public static final ConfigOption<Integer> INDEX_BOOTSTRAP_TASKS = ConfigOptions
.key("write.index_bootstrap.tasks")
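PARTITION_FORMAT_DASHED_DAY gives DATE-partitioned tables a Hive-friendly 'yyyy-MM-dd' path layout as an alternative to the compact 'yyyyMMdd' default. A minimal sketch of opting into it programmatically; the same option can be set in the table's WITH clause as 'write.partition.format' (the getOptional fallback mirrors the factory code below):

import org.apache.flink.configuration.Configuration;
import org.apache.hudi.configuration.FlinkOptions;

public class PartitionFormatSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // request dashed daily partitions, e.g. '1970-01-02' instead of '19700102'
    conf.setString(FlinkOptions.PARTITION_FORMAT, FlinkOptions.PARTITION_FORMAT_DASHED_DAY);
    System.out.println(
        conf.getOptional(FlinkOptions.PARTITION_FORMAT).orElse(FlinkOptions.PARTITION_FORMAT_DAY));
  }
}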
@@ -53,6 +53,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Set;
+ import java.util.concurrent.TimeUnit;

import static org.apache.hudi.common.util.ValidationUtils.checkArgument;

@@ -243,6 +244,11 @@ private static void setupHoodieKeyOptions(Configuration conf, CatalogTable table
* <p>The UTC timezone is used as default.
*/
public static void setupTimestampKeygenOptions(Configuration conf, DataType fieldType) {
+ if (conf.contains(FlinkOptions.KEYGEN_CLASS_NAME)) {
+   // the keygen class has been set up explicitly, skip the default setup
+   return;
+ }
+
conf.setString(FlinkOptions.KEYGEN_CLASS_NAME, TimestampBasedAvroKeyGenerator.class.getName());
LOG.info("Table option [{}] is reset to {} because datetime partitioning turns on",
FlinkOptions.KEYGEN_CLASS_NAME.key(), TimestampBasedAvroKeyGenerator.class.getName());
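The early return above makes an explicitly configured key generator take precedence: the factory only installs TimestampBasedAvroKeyGenerator when KEYGEN_CLASS_NAME is absent, which is also why the TestHoodieTableFactory change below removes the 'dummyKeyGenClass' line. A hedged sketch of the precedence rule, assuming the public setupTimestampKeygenOptions entry point from this diff and package locations as in the Hudi Flink module; the generator class name is hypothetical:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.DataTypes;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.table.HoodieTableFactory;

public class KeygenPrecedenceSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // hypothetical user-supplied generator: any explicit value must win
    conf.setString(FlinkOptions.KEYGEN_CLASS_NAME, "com.example.MyKeyGen");
    HoodieTableFactory.setupTimestampKeygenOptions(conf, DataTypes.DATE());
    // with the guard in place the explicit class survives untouched
    System.out.println(
        conf.getOptional(FlinkOptions.KEYGEN_CLASS_NAME).orElse("unset")); // com.example.MyKeyGen
  }
}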
@@ -257,13 +263,17 @@ public static void setupTimestampKeygenOptions(Configuration conf, DataType fieldType) {
conf.setString(KeyGeneratorOptions.Config.TIMESTAMP_TYPE_FIELD_PROP,
TimestampBasedAvroKeyGenerator.TimestampType.EPOCHMILLISECONDS.name());
}
- String partitionFormat = conf.getOptional(FlinkOptions.PARTITION_FORMAT).orElse(FlinkOptions.PARTITION_FORMAT_HOUR);
- conf.setString(KeyGeneratorOptions.Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, partitionFormat);
+ String outputPartitionFormat = conf.getOptional(FlinkOptions.PARTITION_FORMAT).orElse(FlinkOptions.PARTITION_FORMAT_HOUR);
+ conf.setString(KeyGeneratorOptions.Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, outputPartitionFormat);
} else {
conf.setString(KeyGeneratorOptions.Config.TIMESTAMP_TYPE_FIELD_PROP,
-     TimestampBasedAvroKeyGenerator.TimestampType.DATE_STRING.name());
- String partitionFormat = conf.getOptional(FlinkOptions.PARTITION_FORMAT).orElse(FlinkOptions.PARTITION_FORMAT_DAY);
- conf.setString(KeyGeneratorOptions.Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, partitionFormat);
+     TimestampBasedAvroKeyGenerator.TimestampType.SCALAR.name());
+ conf.setString(KeyGeneratorOptions.Config.INPUT_TIME_UNIT, TimeUnit.DAYS.toString());
+
+ String outputPartitionFormat = conf.getOptional(FlinkOptions.PARTITION_FORMAT).orElse(FlinkOptions.PARTITION_FORMAT_DAY);
+ conf.setString(KeyGeneratorOptions.Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, outputPartitionFormat);
+ // this option is not used for parsing; it is only set to pass validation
+ conf.setString(KeyGeneratorOptions.Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP, FlinkOptions.PARTITION_FORMAT_DAY);
}
conf.setString(KeyGeneratorOptions.Config.TIMESTAMP_OUTPUT_TIMEZONE_FORMAT_PROP, "UTC");
}
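Net effect of the else branch for a DATE partition field: the key generator now runs in SCALAR mode with a DAYS input unit, so raw epoch-day integers and the LocalDate values from the Avro path are interpreted identically; the output pattern falls back to 'yyyyMMdd' unless write.partition.format overrides it, and the input date format is set only to satisfy the key generator's validation since SCALAR input is numeric. A hedged sketch of what a caller would observe (package paths assumed per the Hudi modules; the expected values follow from this diff, not from an independent run):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.DataTypes;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.table.HoodieTableFactory;

public class DateKeygenSetupSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setString(FlinkOptions.PARTITION_FORMAT, FlinkOptions.PARTITION_FORMAT_DASHED_DAY);
    HoodieTableFactory.setupTimestampKeygenOptions(conf, DataTypes.DATE());
    // expected per this diff: SCALAR timestamps counted in DAYS,
    // rendered as 'yyyy-MM-dd' in the UTC timezone
    System.out.println(conf.getString(KeyGeneratorOptions.Config.TIMESTAMP_TYPE_FIELD_PROP, ""));        // SCALAR
    System.out.println(conf.getString(KeyGeneratorOptions.Config.INPUT_TIME_UNIT, ""));                  // DAYS
    System.out.println(conf.getString(KeyGeneratorOptions.Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, "")); // yyyy-MM-dd
  }
}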
@@ -29,6 +29,8 @@
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import static org.apache.hudi.utils.TestData.insertRow;
import static org.hamcrest.CoreMatchers.is;
@@ -126,4 +128,40 @@ void testTimestampBasedKeyGenerator() {
assertThat(keyGen2.getPartitionPath(rowData2), is("ts=1970010100"));
assertThat(keyGen2.getPartitionPath(rowData3), is("ts=1970010100"));
}

@ParameterizedTest
@ValueSource(strings = {FlinkOptions.PARTITION_FORMAT_DASHED_DAY, FlinkOptions.PARTITION_FORMAT_DAY})
void testDateBasedKeyGenerator(String partitionFormat) {
boolean dashed = partitionFormat.equals(FlinkOptions.PARTITION_FORMAT_DASHED_DAY);
Configuration conf = TestConfigurations.getDefaultConf("path1", TestConfigurations.ROW_DATA_TYPE_DATE);
conf.setString(FlinkOptions.PARTITION_PATH_FIELD, "dt");
conf.setString(FlinkOptions.PARTITION_FORMAT, partitionFormat);
HoodieTableFactory.setupTimestampKeygenOptions(conf, DataTypes.DATE());
final RowData rowData1 = insertRow(TestConfigurations.ROW_TYPE_DATE,
StringData.fromString("id1"), StringData.fromString("Danny"), 23, 1);
final RowDataKeyGen keyGen1 = RowDataKeyGen.instance(conf, TestConfigurations.ROW_TYPE_DATE);

assertThat(keyGen1.getRecordKey(rowData1), is("id1"));
String expectedPartition1 = dashed ? "1970-01-02" : "19700102";
assertThat(keyGen1.getPartitionPath(rowData1), is(expectedPartition1));

// null record key and partition path
final RowData rowData2 = insertRow(TestConfigurations.ROW_TYPE_DATE, null, StringData.fromString("Danny"), 23, null);
assertThrows(HoodieKeyException.class, () -> keyGen1.getRecordKey(rowData2));
String expectedPartition2 = dashed ? "1970-01-02" : "19700102";
assertThat(keyGen1.getPartitionPath(rowData2), is(expectedPartition2));

// empty record key
String expectedPartition3 = dashed ? "1970-01-03" : "19700103";
final RowData rowData3 = insertRow(TestConfigurations.ROW_TYPE_DATE, StringData.fromString(""), StringData.fromString("Danny"), 23, 2);
assertThrows(HoodieKeyException.class, () -> keyGen1.getRecordKey(rowData3));
assertThat(keyGen1.getPartitionPath(rowData3), is(expectedPartition3));

// hive style partitioning
conf.set(FlinkOptions.HIVE_STYLE_PARTITIONING, true);
final RowDataKeyGen keyGen2 = RowDataKeyGen.instance(conf, TestConfigurations.ROW_TYPE_DATE);
assertThat(keyGen2.getPartitionPath(rowData1), is("dt=" + expectedPartition1));
assertThat(keyGen2.getPartitionPath(rowData2), is("dt=" + expectedPartition2));
assertThat(keyGen2.getPartitionPath(rowData3), is("dt=" + expectedPartition3));
}
}
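The expected paths in the new test follow from plain epoch-day arithmetic: the dt column carries an int of days since 1970-01-01, so value 1 maps to 1970-01-02 and value 2 to 1970-01-03; the null-partition row resolves to the same path as day 1, which per the test's expectations is the key generator's fallback for a missing value. A one-liner to sanity-check the dates:

import java.time.LocalDate;

public class EpochDayCheck {
  public static void main(String[] args) {
    System.out.println(LocalDate.ofEpochDay(1)); // 1970-01-02
    System.out.println(LocalDate.ofEpochDay(2)); // 1970-01-03
  }
}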
@@ -1028,6 +1028,37 @@ void testWriteAndReadWithTimestampPartitioning(ExecMode execMode) {
+ "+I[id8, Han, 56, 1970-01-01T00:00:08, par4]]");
}

@ParameterizedTest
@ValueSource(strings = {FlinkOptions.PARTITION_FORMAT_DAY, FlinkOptions.PARTITION_FORMAT_DASHED_DAY})
void testWriteAndReadWithDatePartitioning(String partitionFormat) {
TableEnvironment tableEnv = batchTableEnv;
String hoodieTableDDL = sql("t1")
.field("uuid varchar(20)")
.field("name varchar(10)")
.field("age int")
.field("ts date")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
.option(FlinkOptions.PARTITION_FORMAT, partitionFormat)
.partitionField("ts") // use date as partition path field
.end();
tableEnv.executeSql(hoodieTableDDL);

execInsertSql(tableEnv, TestSQL.INSERT_DATE_PARTITION_T1);

List<Row> result = CollectionUtil.iterableToList(
() -> tableEnv.sqlQuery("select * from t1").execute().collect());
String expected = "["
+ "+I[id1, Danny, 23, 1970-01-01], "
+ "+I[id2, Stephen, 33, 1970-01-01], "
+ "+I[id3, Julian, 53, 1970-01-01], "
+ "+I[id4, Fabian, 31, 1970-01-01], "
+ "+I[id5, Sophia, 18, 1970-01-01], "
+ "+I[id6, Emma, 20, 1970-01-01], "
+ "+I[id7, Bob, 44, 1970-01-01], "
+ "+I[id8, Han, 56, 1970-01-01]]";
assertRowsEquals(result, expected);
}

@ParameterizedTest
@ValueSource(strings = {"bulk_insert", "upsert"})
void testWriteReadDecimals(String operation) {
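End to end, the DDL exercised by testWriteAndReadWithDatePartitioning corresponds to roughly the following (a hedged reconstruction of the sql("t1") builder output; the path value is a placeholder, while 'connector' = 'hudi' and 'write.partition.format' are the real option keys). With all eight rows on the same date, the table ends up with a single daily partition, '19700101' or '1970-01-01' depending on the format under test:

public class DateTableDdlSketch {
  public static void main(String[] args) {
    // illustrative only: approximate DDL assembled by the test's sql("t1") builder
    String ddl = "create table t1 (\n"
        + "  uuid varchar(20),\n"
        + "  name varchar(10),\n"
        + "  age int,\n"
        + "  ts date\n"
        + ") partitioned by (ts) with (\n"
        + "  'connector' = 'hudi',\n"
        + "  'path' = '/tmp/hudi/t1',\n"                // placeholder path
        + "  'write.partition.format' = 'yyyy-MM-dd'\n" // or 'yyyyMMdd'
        + ")";
    System.out.println(ddl);
  }
}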
@@ -419,7 +419,6 @@ void testSetupCleaningOptionsForSink() {
@Test
void testSetupTimestampBasedKeyGenForSink() {
this.conf.setString(FlinkOptions.RECORD_KEY_FIELD, "dummyField");
- this.conf.setString(FlinkOptions.KEYGEN_CLASS_NAME, "dummyKeyGenClass");
// definition with simple primary key and partition path
ResolvedSchema schema1 = SchemaBuilder.instance()
.field("f0", DataTypes.INT().notNull())
@@ -20,6 +20,7 @@

import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.streamer.FlinkStreamerConfig;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.utils.factory.CollectSinkTableFactory;
import org.apache.hudi.utils.factory.ContinuousFileSourceFactory;

@@ -74,6 +75,15 @@ private TestConfigurations() {

public static final RowType ROW_TYPE_WIDER = (RowType) ROW_DATA_TYPE_WIDER.getLogicalType();

public static final DataType ROW_DATA_TYPE_DATE = DataTypes.ROW(
DataTypes.FIELD("uuid", DataTypes.VARCHAR(20)),// record key
DataTypes.FIELD("name", DataTypes.VARCHAR(10)),
DataTypes.FIELD("age", DataTypes.INT()),
DataTypes.FIELD("dt", DataTypes.DATE()))
.notNull();

public static final RowType ROW_TYPE_DATE = (RowType) ROW_DATA_TYPE_DATE.getLogicalType();

public static String getCreateHoodieTableDDL(String tableName, Map<String, String> options) {
return getCreateHoodieTableDDL(tableName, options, true, "partition");
}
@@ -212,6 +222,15 @@ public static Configuration getDefaultConf(String tablePath) {
return conf;
}

public static Configuration getDefaultConf(String tablePath, DataType dataType) {
Configuration conf = new Configuration();
conf.setString(FlinkOptions.PATH, tablePath);
conf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA, AvroSchemaConverter.convertToSchema(dataType.getLogicalType()).toString());
conf.setString(FlinkOptions.TABLE_NAME, "TestHoodieTable");
conf.setString(FlinkOptions.PARTITION_PATH_FIELD, "partition");
return conf;
}

public static FlinkStreamerConfig getDefaultStreamerConf(String tablePath) {
FlinkStreamerConfig streamerConf = new FlinkStreamerConfig();
streamerConf.targetBasePath = tablePath;
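The new getDefaultConf overload derives the source Avro schema from the given row type instead of relying on the canned default schema, which is what lets the key generator tests run against the DATE row type. A minimal sketch of the conversion it performs, assuming this test utility lives in org.apache.hudi.utils as its sibling imports suggest:

import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.utils.TestConfigurations;

public class DateSchemaSketch {
  public static void main(String[] args) {
    // mirrors the call in getDefaultConf: Flink row type -> Avro schema string
    String avroSchema = AvroSchemaConverter
        .convertToSchema(TestConfigurations.ROW_DATA_TYPE_DATE.getLogicalType())
        .toString();
    System.out.println(avroSchema);
  }
}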
@@ -61,4 +61,14 @@ private TestSQL() {
+ "(1, array['abc1', 'def1'], array[1, 1], map['abc1', 1, 'def1', 3], row(array['abc1', 'def1'], row(1, 'abc1'))),\n"
+ "(2, array['abc2', 'def2'], array[2, 2], map['abc2', 1, 'def2', 3], row(array['abc2', 'def2'], row(2, 'abc2'))),\n"
+ "(3, array['abc3', 'def3'], array[3, 3], map['abc3', 1, 'def3', 3], row(array['abc3', 'def3'], row(3, 'abc3')))";

public static final String INSERT_DATE_PARTITION_T1 = "insert into t1 values\n"
+ "('id1','Danny',23,DATE '1970-01-01'),\n"
+ "('id2','Stephen',33,DATE '1970-01-01'),\n"
+ "('id3','Julian',53,DATE '1970-01-01'),\n"
+ "('id4','Fabian',31,DATE '1970-01-01'),\n"
+ "('id5','Sophia',18,DATE '1970-01-01'),\n"
+ "('id6','Emma',20,DATE '1970-01-01'),\n"
+ "('id7','Bob',44,DATE '1970-01-01'),\n"
+ "('id8','Han',56,DATE '1970-01-01')";
}