diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index b2359f4b36839..fe02370652d96 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -115,15 +115,15 @@ private FlinkOptions() {
   public static final ConfigOption<Double> INDEX_STATE_TTL = ConfigOptions
       .key("index.state.ttl")
       .doubleType()
-      .defaultValue(1.5D)
-      .withDescription("Index state ttl in days, default 1.5 day");
+      .defaultValue(0D)
+      .withDescription("Index state ttl in days, default stores the index permanently");
 
   public static final ConfigOption<Boolean> INDEX_GLOBAL_ENABLED = ConfigOptions
       .key("index.global.enabled")
       .booleanType()
-      .defaultValue(false)
+      .defaultValue(true)
       .withDescription("Whether to update index for the old partition path\n"
-          + "if same key record with different partition path came in, default false");
+          + "if same key record with different partition path came in, default true");
 
   public static final ConfigOption<String> INDEX_PARTITION_REGEX = ConfigOptions
       .key("index.partition.regex")
@@ -255,15 +255,17 @@ private FlinkOptions() {
           + "This will render any value set for the option in-effective");
 
   /**
-   * Flag to indicate whether to drop duplicates upon insert.
-   * By default insert will accept duplicates, to gain extra performance.
+   * Flag to indicate whether to drop duplicates before insert/upsert.
+   * By default false to gain extra performance.
    */
-  public static final ConfigOption<Boolean> INSERT_DROP_DUPS = ConfigOptions
-      .key("write.insert.drop.duplicates")
+  public static final ConfigOption<Boolean> PRE_COMBINE = ConfigOptions
+      .key("write.precombine")
       .booleanType()
       .defaultValue(false)
-      .withDescription("Flag to indicate whether to drop duplicates upon insert.\n"
-          + "By default insert will accept duplicates, to gain extra performance");
+      .withDescription("Flag to indicate whether to drop duplicates before insert/upsert.\n"
+          + "By default these cases will accept duplicates, to gain extra performance:\n"
+          + "1) insert operation;\n"
+          + "2) upsert for MOR table, the MOR table deduplicate on reading");
 
   public static final ConfigOption<Integer> RETRY_TIMES = ConfigOptions
       .key("write.retry.times")
@@ -496,8 +498,8 @@ private FlinkOptions() {
   public static final ConfigOption<Long> COMPACTION_TARGET_IO = ConfigOptions
       .key("compaction.target_io")
       .longType()
-      .defaultValue(5120L) // default 5 GB
-      .withDescription("Target IO per compaction (both read and write), default 5 GB");
+      .defaultValue(500 * 1024L) // default 500 GB
+      .withDescription("Target IO per compaction (both read and write), default 500 GB");
 
   public static final ConfigOption<Boolean> CLEAN_ASYNC_ENABLED = ConfigOptions
       .key("clean.async.enabled")
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
index f8eea2e89b657..d510de2b105f5 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
@@ -419,7 +419,7 @@ private boolean flushBucket(DataBucket bucket) {
     List<HoodieRecord> records = bucket.writeBuffer();
     ValidationUtils.checkState(records.size() > 0, "Data bucket to flush has no buffering records");
-    if (config.getBoolean(FlinkOptions.INSERT_DROP_DUPS)) {
+    if (config.getBoolean(FlinkOptions.PRE_COMBINE)) {
       records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1);
     }
     bucket.preWrite(records);
@@ -454,7 +454,7 @@ private void flushRemaining(boolean endInput) {
         .forEach(bucket -> {
           List<HoodieRecord> records = bucket.writeBuffer();
           if (records.size() > 0) {
-            if (config.getBoolean(FlinkOptions.INSERT_DROP_DUPS)) {
+            if (config.getBoolean(FlinkOptions.PRE_COMBINE)) {
              records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1);
            }
            bucket.preWrite(records);
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index 51280c3da8683..a7faeca5c205f 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -211,13 +211,11 @@ public void notifyCheckpointComplete(long checkpointId) {
     // the stream write task snapshot and flush the data buffer synchronously in sequence,
     // so a successful checkpoint subsumes the old one(follows the checkpoint subsuming contract)
     final boolean committed = commitInstant(this.instant);
-    if (tableState.scheduleCompaction) {
-      // if async compaction is on, schedule the compaction
-      if (committed || tableState.timeCompactionTriggerStrategy) {
+    if (committed) {
+      if (tableState.scheduleCompaction) {
+        // if async compaction is on, schedule the compaction
        writeClient.scheduleCompaction(Option.empty());
       }
-    }
-    if (committed) {
       // start new instant.
       startInstant();
       // sync Hive if is enabled
@@ -532,7 +530,6 @@ private static class TableState implements Serializable {
     final String commitAction;
     final boolean isOverwrite;
     final boolean scheduleCompaction;
-    final boolean timeCompactionTriggerStrategy;
     final boolean syncHive;
     final boolean syncMetadata;
 
@@ -542,7 +539,6 @@ private TableState(Configuration conf) {
           HoodieTableType.valueOf(conf.getString(FlinkOptions.TABLE_TYPE).toUpperCase(Locale.ROOT)));
       this.isOverwrite = WriteOperationType.isOverwrite(this.operationType);
       this.scheduleCompaction = StreamerUtil.needsScheduleCompaction(conf);
-      this.timeCompactionTriggerStrategy = StreamerUtil.isTimeCompactionTriggerStrategy(conf);
       this.syncHive = conf.getBoolean(FlinkOptions.HIVE_SYNC_ENABLED);
       this.syncMetadata = conf.getBoolean(FlinkOptions.METADATA_ENABLED);
     }
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/PayloadCreation.java b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/PayloadCreation.java
index d10447f816d8e..f2cb60d51c11b 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/PayloadCreation.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/PayloadCreation.java
@@ -55,7 +55,7 @@ private PayloadCreation(
   }
 
   public static PayloadCreation instance(Configuration conf) throws Exception {
-    boolean shouldCombine = conf.getBoolean(FlinkOptions.INSERT_DROP_DUPS)
+    boolean shouldCombine = conf.getBoolean(FlinkOptions.PRE_COMBINE)
         || WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION)) == WriteOperationType.UPSERT;
     String preCombineField = null;
     final Class<?>[] argTypes;
diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
index c552bed222956..7ca91f7e24e02 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
@@ -112,7 +112,7 @@ public class FlinkStreamerConfig extends Configuration {
   @Parameter(names = {"--filter-dupes"}, description = "Should duplicate records from source be dropped/filtered out before insert/bulk-insert.")
-  public Boolean filterDupes = false;
+  public Boolean preCombine = false;
 
   @Parameter(names = {"--commit-on-errors"}, description = "Commit even when some records failed to be written.")
   public Boolean commitOnErrors = false;
@@ -220,8 +220,8 @@ public class FlinkStreamerConfig extends Configuration {
   @Parameter(names = {"--compaction-max-memory"}, description = "Max memory in MB for compaction spillable map, default 100MB")
   public Integer compactionMaxMemory = 100;
 
-  @Parameter(names = {"--compaction-target-io"}, description = "Target IO per compaction (both read and write), default 5 GB")
-  public Long compactionTargetIo = 5120L;
+  @Parameter(names = {"--compaction-target-io"}, description = "Target IO per compaction (both read and write), default 500 GB")
+  public Long compactionTargetIo = 512000L;
 
   @Parameter(names = {"--clean-async-enabled"}, description = "Whether to cleanup the old commits immediately on new commits, enabled by default")
   public Boolean cleanAsyncEnabled = true;
@@ -312,7 +312,7 @@ public static org.apache.flink.configuration.Configuration toFlinkConfig(FlinkStreamerConfig config) {
     conf.setString(FlinkOptions.OPERATION, config.operation.value());
     conf.setString(FlinkOptions.PRECOMBINE_FIELD, config.sourceOrderingField);
     conf.setString(FlinkOptions.PAYLOAD_CLASS_NAME, config.payloadClassName);
-    conf.setBoolean(FlinkOptions.INSERT_DROP_DUPS, config.filterDupes);
+    conf.setBoolean(FlinkOptions.PRE_COMBINE, config.preCombine);
     conf.setInteger(FlinkOptions.RETRY_TIMES, Integer.parseInt(config.instantRetryTimes));
     conf.setLong(FlinkOptions.RETRY_INTERVAL_MS, Long.parseLong(config.instantRetryInterval));
     conf.setBoolean(FlinkOptions.IGNORE_FAILED, config.commitOnErrors);
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
index 627bc2c29acf6..c19c831042def 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.table;
 
+import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieValidationException;
 import org.apache.hudi.hive.MultiPartKeysValueExtractor;
@@ -157,6 +158,8 @@ private static void setupConfOptions(
     setupHiveOptions(conf);
     // read options
     setupReadOptions(conf);
+    // write options
+    setupWriteOptions(conf);
     // infer avro schema from physical DDL schema
     inferAvroSchema(conf, schema.toPhysicalRowDataType().notNull().getLogicalType());
   }
@@ -249,12 +252,6 @@ private static void setupCompactionOptions(Configuration conf) {
       conf.setInteger(FlinkOptions.ARCHIVE_MIN_COMMITS, commitsToRetain + 10);
       conf.setInteger(FlinkOptions.ARCHIVE_MAX_COMMITS, commitsToRetain + 20);
     }
-    if (conf.getBoolean(FlinkOptions.COMPACTION_SCHEDULE_ENABLED)
-        && !conf.getBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED)
-        && FlinkOptions.isDefaultValueDefined(conf, FlinkOptions.COMPACTION_TARGET_IO)) {
-      // if compaction schedule is on, tweak the target io to 500GB
-      conf.setLong(FlinkOptions.COMPACTION_TARGET_IO, 500 * 1024L);
-    }
     if (StreamerUtil.allowDuplicateInserts(conf)) {
       // no need for compaction if insert duplicates is allowed
       conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
@@ -282,6 +279,16 @@ private static void setupReadOptions(Configuration conf) {
     }
   }
 
+  /**
+   * Sets up the write options from the table definition.
+   */
+  private static void setupWriteOptions(Configuration conf) {
+    if (FlinkOptions.isDefaultValueDefined(conf, FlinkOptions.OPERATION)
+        && HoodieTableType.valueOf(conf.getString(FlinkOptions.TABLE_TYPE)) == HoodieTableType.COPY_ON_WRITE) {
+      conf.setBoolean(FlinkOptions.PRE_COMBINE, true);
+    }
+  }
+
   /**
    * Inferences the deserialization Avro schema from the table schema (e.g. the DDL)
    * if both options {@link FlinkOptions#SOURCE_AVRO_SCHEMA_PATH} and
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 7fb550d472a53..7e7bfaa3da174 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -154,7 +154,7 @@ public static HoodieWriteConfig getHoodieClientConfig(Configuration conf) {
         HoodieWriteConfig.newBuilder()
             .withEngineType(EngineType.FLINK)
             .withPath(conf.getString(FlinkOptions.PATH))
-            .combineInput(conf.getBoolean(FlinkOptions.INSERT_DROP_DUPS), true)
+            .combineInput(conf.getBoolean(FlinkOptions.PRE_COMBINE), true)
             .withMergeAllowDuplicateOnInserts(allowDuplicateInserts(conf))
             .withCompactionConfig(
                 HoodieCompactionConfig.newBuilder()
@@ -302,16 +302,6 @@ public static boolean needsScheduleCompaction(Configuration conf) {
         && conf.getBoolean(FlinkOptions.COMPACTION_SCHEDULE_ENABLED);
   }
 
-  /**
-   * Returns whether the compaction trigger strategy is time based.
-   *
-   * @param conf The flink configuration.
-   */
-  public static boolean isTimeCompactionTriggerStrategy(Configuration conf) {
-    final String strategy = conf.getString(FlinkOptions.COMPACTION_TRIGGER_STRATEGY);
-    return FlinkOptions.TIME_ELAPSED.equalsIgnoreCase(strategy) || FlinkOptions.NUM_OR_TIME.equalsIgnoreCase(strategy);
-  }
-
   /**
    * Creates the meta client for reader.
    *
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
index 624a8e8c45e36..da418f965d863 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
@@ -275,7 +275,7 @@ public void testInsert() throws Exception {
   @Test
   public void testInsertDuplicates() throws Exception {
     // reset the config option
-    conf.setBoolean(FlinkOptions.INSERT_DROP_DUPS, true);
+    conf.setBoolean(FlinkOptions.PRE_COMBINE, true);
     funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
 
     // open the function and ingest data
@@ -470,7 +470,7 @@ public void testInsertWithMiniBatches() throws Exception {
   public void testInsertWithDeduplication() throws Exception {
     // reset the config option
     conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.0008); // 839 bytes batch size
-    conf.setBoolean(FlinkOptions.INSERT_DROP_DUPS, true);
+    conf.setBoolean(FlinkOptions.PRE_COMBINE, true);
     funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
 
     // open the function and ingest data
diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
index 621cd1c434ed7..9b4ea008441a0 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
@@ -652,7 +652,7 @@ void testWriteGlobalIndex() {
     String hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
         .option(FlinkOptions.INDEX_GLOBAL_ENABLED, true)
-        .option(FlinkOptions.INSERT_DROP_DUPS, true)
+        .option(FlinkOptions.PRE_COMBINE, true)
         .end();
     streamTableEnv.executeSql(hoodieTableDDL);
 
@@ -674,7 +674,7 @@ void testWriteLocalIndex() {
     String hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
         .option(FlinkOptions.INDEX_GLOBAL_ENABLED, false)
-        .option(FlinkOptions.INSERT_DROP_DUPS, true)
+        .option(FlinkOptions.PRE_COMBINE, true)
         .end();
     streamTableEnv.executeSql(hoodieTableDDL);
 
diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
index bbbb49d4277c6..ad55dbf2380d7 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
@@ -394,6 +394,21 @@ void testSetupTimestampBasedKeyGenForSink() {
         is("UTC"));
   }
 
+  @Test
+  void testSetupWriteOptionsForSink() {
+    final HoodieTableSink tableSink1 =
+        (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(MockContext.getInstance(this.conf));
+    final Configuration conf1 = tableSink1.getConf();
+    assertThat(conf1.get(FlinkOptions.PRE_COMBINE), is(true));
+
+    // set up operation as 'insert'
+    this.conf.setString(FlinkOptions.OPERATION, "insert");
+    HoodieTableSink tableSink2 =
+        (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(MockContext.getInstance(this.conf));
+    Configuration conf2 = tableSink2.getConf();
+    assertThat(conf2.get(FlinkOptions.PRE_COMBINE), is(false));
+  }
+
   // -------------------------------------------------------------------------
   //  Inner Class
   // -------------------------------------------------------------------------
diff --git a/packaging/hudi-flink-bundle/pom.xml b/packaging/hudi-flink-bundle/pom.xml
index 120cba3a20b9c..c9b14587c1d86 100644
--- a/packaging/hudi-flink-bundle/pom.xml
+++ b/packaging/hudi-flink-bundle/pom.xml
@@ -152,6 +152,10 @@
+                  <relocation>
+                    <pattern>javax.servlet.</pattern>
+                    <shadedPattern>${flink.bundle.shade.prefix}javax.servlet.</shadedPattern>
+                  </relocation>
                   <relocation>
                     <pattern>org.apache.avro.</pattern>
                     <shadedPattern>${flink.bundle.shade.prefix}org.apache.avro.</shadedPattern>
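
Note on the renamed option: `write.insert.drop.duplicates` is superseded by `write.precombine`, which now also covers the upsert path, and `HoodieTableFactory#setupWriteOptions` enables it by default for COPY_ON_WRITE tables when the write operation is left at its default. A minimal usage sketch follows; the table name, schema, and path are illustrative, not taken from this patch.

    // Hedged sketch: demonstrates only the renamed 'write.precombine' option;
    // the table name, schema, and path below are hypothetical.
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    public class PreCombineExample {
      public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        tableEnv.executeSql(
            "CREATE TABLE t1 ("
                + "  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,"
                + "  name VARCHAR(10),"
                + "  ts TIMESTAMP(3)"
                + ") WITH ("
                + "  'connector' = 'hudi',"
                + "  'path' = 'file:///tmp/hudi_t1',"   // hypothetical path
                + "  'table.type' = 'MERGE_ON_READ',"
                + "  'write.precombine' = 'true'"       // was 'write.insert.drop.duplicates'
                + ")");
      }
    }

For a MERGE_ON_READ table the option stays false unless set explicitly, since per the updated option description the MOR table already deduplicates on read.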
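A unit note on the compaction change, derived from the values in the patch: `compaction.target_io` is expressed in MB, so the new default of 500 * 1024L = 512000 MB is the advertised 500 GB, matching the streamer default `compactionTargetIo = 512000L`; the previous default was 5120 MB, i.e. 5 GB. The block removed from `setupCompactionOptions` used to bump the target to 500 * 1024L only when compaction scheduling was on without async compaction; with the raised default, that special case is no longer needed.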