diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java index 376b36e3dcd8..4499df87e579 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java +++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java @@ -20,6 +20,7 @@ import org.apache.hudi.common.model.DefaultHoodieRecordPayload; import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.table.format.FilePathUtils; import org.apache.flink.configuration.Configuration; @@ -92,4 +93,11 @@ public static boolean isDeltaTimeCompaction(Configuration conf) { final String strategy = conf.getString(FlinkOptions.COMPACTION_TRIGGER_STRATEGY).toLowerCase(Locale.ROOT); return FlinkOptions.TIME_ELAPSED.equals(strategy) || FlinkOptions.NUM_OR_TIME.equals(strategy); } + + /** + * Returns whether the table is partitioned. + */ + public static boolean isPartitionedTable(Configuration conf) { + return FilePathUtils.extractPartitionKeys(conf).length > 0; + } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java index 855a2ca1b4d0..5f156e839f1e 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java @@ -20,6 +20,7 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.configuration.OptionsResolver; import org.apache.hudi.sink.CleanFunction; import org.apache.hudi.sink.StreamWriteOperator; import org.apache.hudi.sink.append.AppendWriteOperator; @@ -129,10 +130,10 @@ public static DataStream bootstrap( final boolean globalIndex = conf.getBoolean(FlinkOptions.INDEX_GLOBAL_ENABLED); if (overwrite) { return rowDataToHoodieRecord(conf, rowType, dataStream); - } else if (bounded && !globalIndex) { + } else if (bounded && !globalIndex && OptionsResolver.isPartitionedTable(conf)) { return boundedBootstrap(conf, rowType, defaultParallelism, dataStream); } else { - return streamBootstrap(conf, rowType, defaultParallelism, dataStream); + return streamBootstrap(conf, rowType, defaultParallelism, dataStream, bounded); } } @@ -140,10 +141,11 @@ private static DataStream streamBootstrap( Configuration conf, RowType rowType, int defaultParallelism, - DataStream dataStream) { + DataStream dataStream, + boolean bounded) { DataStream dataStream1 = rowDataToHoodieRecord(conf, rowType, dataStream); - if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) { + if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED) || bounded) { dataStream1 = dataStream1 .transform( "index_bootstrap", @@ -161,13 +163,10 @@ private static DataStream boundedBootstrap( RowType rowType, int defaultParallelism, DataStream dataStream) { - final String[] partitionFields = FilePathUtils.extractPartitionKeys(conf); - if (partitionFields.length > 0) { - RowDataKeyGen rowDataKeyGen = RowDataKeyGen.instance(conf, rowType); - // shuffle by partition keys - dataStream = dataStream - .keyBy(rowDataKeyGen::getPartitionPath); - } + final RowDataKeyGen rowDataKeyGen = RowDataKeyGen.instance(conf, rowType); + // shuffle by partition keys + dataStream = dataStream + .keyBy(rowDataKeyGen::getPartitionPath); return rowDataToHoodieRecord(conf, rowType, dataStream) .transform( diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java index 285df4931e1d..7d00a658b7f6 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java +++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java @@ -424,6 +424,8 @@ void testBatchModeUpsertWithoutPartition(HoodieTableType tableType) { String hoodieTableDDL = sql("t1") .option(FlinkOptions.PATH, tempFile.getAbsolutePath()) .option(FlinkOptions.TABLE_NAME, tableType.name()) + .option("hoodie.parquet.small.file.limit", "0") // invalidate the small file strategy + .option("hoodie.parquet.max.file.size", "0") .noPartition() .end(); tableEnv.executeSql(hoodieTableDDL); diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java index 0eafb1281ff4..46cad3e826d3 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java +++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java @@ -247,6 +247,11 @@ public Sql option(ConfigOption option, Object val) { return this; } + public Sql option(String key, Object val) { + this.options.put(key, val.toString()); + return this; + } + public Sql options(Map options) { this.options.putAll(options); return this;