diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index feef0f3cc9a4e..bf1fc0b145648 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -142,12 +142,6 @@ private FlinkOptions() {
           + "2) payload_combine: read the base file records first, for each record in base file, checks whether the key is in the\n"
           + "    log file records(combines the two records with same key for base and log file records), then read the left log file records");
 
-  public static final ConfigOption<Boolean> HIVE_STYLE_PARTITION = ConfigOptions
-      .key("hoodie.datasource.hive_style_partition")
-      .booleanType()
-      .defaultValue(false)
-      .withDescription("Whether the partition path is with Hive style, e.g. '{partition key}={partition value}', default false");
-
   public static final ConfigOption<Boolean> UTC_TIMEZONE = ConfigOptions
       .key("read.utc-timezone")
       .booleanType()
@@ -260,12 +254,20 @@ private FlinkOptions() {
       .withDescription("Partition path field. Value to be used at the `partitionPath` component of `HoodieKey`.\n"
           + "Actual value obtained by invoking .toString(), default ''");
 
-  public static final ConfigOption<Boolean> PARTITION_PATH_URL_ENCODE = ConfigOptions
-      .key("write.partition.url_encode")
+  public static final ConfigOption<Boolean> URL_ENCODE_PARTITIONING = ConfigOptions
+      .key(KeyGeneratorOptions.URL_ENCODE_PARTITIONING_OPT_KEY)
       .booleanType()
       .defaultValue(false)
       .withDescription("Whether to encode the partition path url, default false");
 
+  public static final ConfigOption<Boolean> HIVE_STYLE_PARTITIONING = ConfigOptions
+      .key(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_OPT_KEY)
+      .booleanType()
+      .defaultValue(false)
+      .withDescription("Whether to use Hive style partitioning.\n"
+          + "If set true, the names of partition folders follow <partition_column_name>=<partition_value> format.\n"
+          + "By default false (the names of partition folders are only partition values)");
+
   public static final ConfigOption<String> KEYGEN_CLASS = ConfigOptions
       .key(HoodieWriteConfig.KEYGENERATOR_CLASS_PROP)
       .stringType()
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
index 8edf02cf04e78..bffd7d2a251ab 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
@@ -554,7 +554,7 @@ private boolean hasData() {
         && this.buckets.values().stream().anyMatch(bucket -> bucket.records.size() > 0);
   }
 
-  private String instantToWrite() {
+  private String instantToWrite(boolean hasData) {
     String instant = this.writeClient.getLastPendingInstant(this.actionType);
     // if exactly-once semantics turns on,
     // waits for the checkpoint notification until the checkpoint timeout threshold hits.
@@ -565,7 +565,7 @@ private String instantToWrite() {
     // wait condition:
     // 1. there is no inflight instant
     // 2. the inflight instant does not change and the checkpoint has buffering data
-    while (instant == null || (instant.equals(this.currentInstant) && hasData())) {
+    while (instant == null || (instant.equals(this.currentInstant) && hasData)) {
       // sleep for a while
       try {
         if (waitingTime > ckpTimeout) {
@@ -588,7 +588,7 @@ private String instantToWrite() {
 
   @SuppressWarnings("unchecked, rawtypes")
   private boolean flushBucket(DataBucket bucket) {
-    String instant = instantToWrite();
+    String instant = instantToWrite(true);
 
     if (instant == null) {
       // in case there are empty checkpoints that has no input data
@@ -619,7 +619,7 @@ private boolean flushBucket(DataBucket bucket) {
 
   @SuppressWarnings("unchecked, rawtypes")
   private void flushRemaining(boolean isEndInput) {
-    this.currentInstant = instantToWrite();
+    this.currentInstant = instantToWrite(hasData());
     if (this.currentInstant == null) {
       // in case there are empty checkpoints that has no input data
       throw new HoodieException("No inflight instant when flushing data!");
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java
index 62a67980d6373..db1d577c026c6 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java
@@ -83,7 +83,7 @@ private static HiveSyncConfig buildSyncConfig(Configuration conf) {
     hiveSyncConfig.ignoreExceptions = conf.getBoolean(FlinkOptions.HIVE_SYNC_IGNORE_EXCEPTIONS);
     hiveSyncConfig.supportTimestamp = conf.getBoolean(FlinkOptions.HIVE_SYNC_SUPPORT_TIMESTAMP);
     hiveSyncConfig.autoCreateDatabase = conf.getBoolean(FlinkOptions.HIVE_SYNC_AUTO_CREATE_DB);
-    hiveSyncConfig.decodePartition = conf.getBoolean(FlinkOptions.PARTITION_PATH_URL_ENCODE);
+    hiveSyncConfig.decodePartition = conf.getBoolean(FlinkOptions.URL_ENCODE_PARTITIONING);
     hiveSyncConfig.skipROSuffix = conf.getBoolean(FlinkOptions.HIVE_SYNC_SKIP_RO_SUFFIX);
     hiveSyncConfig.assumeDatePartitioning = conf.getBoolean(FlinkOptions.HIVE_SYNC_ASSUME_DATE_PARTITION);
     return hiveSyncConfig;
diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
index ab450cca1955a..4c8074558e1f1 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
@@ -164,6 +164,11 @@ public class FlinkStreamerConfig extends Configuration {
   @Parameter(names = {"--write-partition-url-encode"}, description = "Whether to encode the partition path url, default false")
   public Boolean writePartitionUrlEncode;
 
+  @Parameter(names = {"--hive-style-partitioning"}, description = "Whether to use Hive style partitioning.\n"
+      + "If set true, the names of partition folders follow <partition_column_name>=<partition_value> format.\n"
+      + "By default false (the names of partition folders are only partition values)")
+  public Boolean hiveStylePartitioning = false;
+
   @Parameter(names = {"--write-task-max-size"}, description = "Maximum memory in MB for a write task, when the threshold hits,\n"
       + "it flushes the max size data bucket to avoid OOM, default 1GB")
   public Double writeTaskMaxSize = 1024D;
@@ -314,7 +319,8 @@ public static org.apache.flink.configuration.Configuration toFlinkConfig(FlinkSt
     conf.setString(FlinkOptions.READ_AVRO_SCHEMA_PATH, config.avroSchemaPath);
     conf.setString(FlinkOptions.READ_AVRO_SCHEMA, config.avroSchema);
     conf.setBoolean(FlinkOptions.UTC_TIMEZONE, config.utcTimezone);
-    conf.setBoolean(FlinkOptions.PARTITION_PATH_URL_ENCODE, config.writePartitionUrlEncode);
+    conf.setBoolean(FlinkOptions.URL_ENCODE_PARTITIONING, config.writePartitionUrlEncode);
+    conf.setBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING, config.hiveStylePartitioning);
     conf.setDouble(FlinkOptions.WRITE_TASK_MAX_SIZE, config.writeTaskMaxSize);
     conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, config.writeBatchSize);
     conf.setInteger(FlinkOptions.WRITE_LOG_BLOCK_SIZE, config.writeLogBlockSize);
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index 55ec46a2ebcb2..786023efa335f 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -218,7 +218,7 @@ public Result applyFilters(List<ResolvedExpression> filters) {
   @Override
   public Optional<List<Map<String, String>>> listPartitions() {
     List<Map<String, String>> partitions = FilePathUtils.getPartitions(path, hadoopConf,
-        partitionKeys, defaultPartName, conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITION));
+        partitionKeys, defaultPartName, conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING));
     return Optional.of(partitions);
   }
 
@@ -446,7 +446,7 @@ public Path[] getReadPaths() {
     return partitionKeys.isEmpty() ? new Path[] {path}
         : FilePathUtils.partitionPath2ReadPath(path, partitionKeys, getOrFetchPartitions(),
-            conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITION));
+            conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING));
   }
 
   private static class LatestFileFilter extends FilePathFilter {
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/FilePathUtils.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/FilePathUtils.java
index e8ee5a244f15e..83607cd9c714d 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/format/FilePathUtils.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/FilePathUtils.java
@@ -347,7 +347,7 @@ public static Path[] getReadPaths(
       return new Path[] {path};
     } else {
       final String defaultParName = conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME);
-      final boolean hivePartition = conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITION);
+      final boolean hivePartition = conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING);
       List<Map<String, String>> partitionPaths = getPartitions(path, hadoopConf, partitionKeys, defaultParName, hivePartition);
       return partitionPath2ReadPath(path, partitionKeys, partitionPaths, hivePartition);
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index d00c7fadc091b..aa7453cd9d4d4 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -272,7 +272,7 @@ private ParquetColumnarRowSplitReader getReader(String path, int[] requiredPos)
     // generate partition specs.
     LinkedHashMap<String, String> partSpec = FilePathUtils.extractPartitionKeyValues(
         new org.apache.hadoop.fs.Path(path).getParent(),
-        this.conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITION),
+        this.conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING),
         FilePathUtils.extractPartitionKeys(this.conf));
     LinkedHashMap<String, Object> partObjects = new LinkedHashMap<>();
     partSpec.forEach((k, v) -> partObjects.put(k, restorePartValueFromType(
diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
index c34fd59094b32..f9ff471431255 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
@@ -45,7 +45,9 @@
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.io.File;
 import java.util.Collection;
@@ -56,6 +58,7 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.apache.hudi.utils.TestData.assertRowsEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -254,11 +257,14 @@ void testStreamReadWithDeletes() throws Exception {
   }
 
   @ParameterizedTest
-  @EnumSource(value = ExecMode.class)
-  void testWriteAndRead(ExecMode execMode) {
+  @MethodSource("configParams")
+  void testWriteAndRead(ExecMode execMode, boolean hiveStylePartitioning) {
     TableEnvironment tableEnv = execMode == ExecMode.BATCH ? batchTableEnv : streamTableEnv;
     Map<String, String> options = new HashMap<>();
     options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+    if (hiveStylePartitioning) {
+      options.put(FlinkOptions.HIVE_STYLE_PARTITIONING.key(), "true");
+    }
     String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
     tableEnv.executeSql(hoodieTableDDL);
     String insertInto = "insert into t1 values\n"
@@ -576,6 +582,19 @@ private enum ExecMode {
     BATCH, STREAM
   }
 
+  /**
+   * Return test params => (execution mode, hive style partitioning).
+   */
+  private static Stream<Arguments> configParams() {
+    Object[][] data =
+        new Object[][] {
+            {ExecMode.BATCH, false},
+            {ExecMode.BATCH, true},
+            {ExecMode.STREAM, false},
+            {ExecMode.STREAM, true}};
+    return Stream.of(data).map(Arguments::of);
+  }
+
   private void execInsertSql(TableEnvironment tEnv, String insert) {
     TableResult tableResult = tEnv.executeSql(insert);
     // wait to finish
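
For reference, a minimal sketch (not part of the patch) of how the renamed options could be exercised from a Flink SQL job. The table name, schema, and path are placeholders, and the option keys are resolved through FlinkOptions so the KeyGeneratorOptions-backed key strings never need to be spelled out by hand.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

import org.apache.hudi.configuration.FlinkOptions;

public class HiveStylePartitioningExample {
  public static void main(String[] args) {
    TableEnvironment tableEnv = TableEnvironment.create(
        EnvironmentSettings.newInstance().inBatchMode().build());
    // Table name 't1', schema and path are illustrative placeholders only.
    String ddl = "create table t1 (\n"
        + "  uuid varchar(20),\n"
        + "  name varchar(10),\n"
        + "  `partition` varchar(20)\n"
        + ") partitioned by (`partition`) with (\n"
        + "  'connector' = 'hudi',\n"
        + "  'path' = '/tmp/hudi/t1',\n"
        // New option from this patch: partition folders are written as
        // <partition key>=<partition value> instead of the bare value.
        + "  '" + FlinkOptions.HIVE_STYLE_PARTITIONING.key() + "' = 'true',\n"
        // Renamed from write.partition.url_encode; behavior is unchanged.
        + "  '" + FlinkOptions.URL_ENCODE_PARTITIONING.key() + "' = 'false'\n"
        + ")";
    tableEnv.executeSql(ddl);
  }
}

Backing both options by the KeyGeneratorOptions constants, rather than Flink-only key strings, appears to be the point of the rename: the Flink writer now accepts the same hoodie.datasource.* option names as the other Hudi writers.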