@@ -115,15 +115,15 @@ private FlinkOptions() {
public static final ConfigOption<Double> INDEX_STATE_TTL = ConfigOptions
.key("index.state.ttl")
.doubleType()
- .defaultValue(1.5D)
- .withDescription("Index state ttl in days, default 1.5 day");
+ .defaultValue(0D)
+ .withDescription("Index state ttl in days, default stores the index permanently");

public static final ConfigOption<Boolean> INDEX_GLOBAL_ENABLED = ConfigOptions
.key("index.global.enabled")
.booleanType()
- .defaultValue(false)
+ .defaultValue(true)
.withDescription("Whether to update index for the old partition path\n"
- + "if same key record with different partition path came in, default false");
+ + "if same key record with different partition path came in, default true");

public static final ConfigOption<String> INDEX_PARTITION_REGEX = ConfigOptions
.key("index.partition.regex")
@@ -255,15 +255,17 @@ private FlinkOptions() {
+ "This will render any value set for the option in-effective");

/**
- * Flag to indicate whether to drop duplicates upon insert.
- * By default insert will accept duplicates, to gain extra performance.
+ * Flag to indicate whether to drop duplicates before insert/upsert.
+ * By default false to gain extra performance.
*/
- public static final ConfigOption<Boolean> INSERT_DROP_DUPS = ConfigOptions
- .key("write.insert.drop.duplicates")
+ public static final ConfigOption<Boolean> PRE_COMBINE = ConfigOptions
+ .key("write.precombine")
.booleanType()
.defaultValue(false)
.withDescription("Flag to indicate whether to drop duplicates upon insert.\n"
+ "By default insert will accept duplicates, to gain extra performance");
.withDescription("Flag to indicate whether to drop duplicates before insert/upsert.\n"
+ "By default these cases will accept duplicates, to gain extra performance:\n"
+ "1) insert operation;\n"
+ "2) upsert for MOR table, the MOR table deduplicate on reading");

public static final ConfigOption<Integer> RETRY_TIMES = ConfigOptions
.key("write.retry.times")
@@ -496,8 +498,8 @@ private FlinkOptions() {
public static final ConfigOption<Long> COMPACTION_TARGET_IO = ConfigOptions
.key("compaction.target_io")
.longType()
- .defaultValue(5120L) // default 5 GB
- .withDescription("Target IO per compaction (both read and write), default 5 GB");
+ .defaultValue(500 * 1024L) // default 500 GB
+ .withDescription("Target IO per compaction (both read and write), default 500 GB");

public static final ConfigOption<Boolean> CLEAN_ASYNC_ENABLED = ConfigOptions
.key("clean.async.enabled")
@@ -419,7 +419,7 @@ private boolean flushBucket(DataBucket bucket) {

List<HoodieRecord> records = bucket.writeBuffer();
ValidationUtils.checkState(records.size() > 0, "Data bucket to flush has no buffering records");
- if (config.getBoolean(FlinkOptions.INSERT_DROP_DUPS)) {
+ if (config.getBoolean(FlinkOptions.PRE_COMBINE)) {
records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1);
}
bucket.preWrite(records);
@@ -454,7 +454,7 @@ private void flushRemaining(boolean endInput) {
.forEach(bucket -> {
List<HoodieRecord> records = bucket.writeBuffer();
if (records.size() > 0) {
- if (config.getBoolean(FlinkOptions.INSERT_DROP_DUPS)) {
+ if (config.getBoolean(FlinkOptions.PRE_COMBINE)) {
records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1);
}
bucket.preWrite(records);
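
Conceptually, the PRE_COMBINE guard above deduplicates the buffered records per record key before they are written, keeping the record that wins on the ordering (precombine) field. A stand-alone illustration of that idea; the record type and the key/ordering extractors are assumptions, not the actual FlinkWriteHelper.deduplicateRecords implementation:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.function.Function;

    public class DedupSketch {
      // Keep one record per key: the one with the larger ordering (precombine) value.
      public static <T> List<T> deduplicate(List<T> records,
                                            Function<T, String> keyOf,
                                            Function<T, Long> orderingOf) {
        Map<String, T> latest = new HashMap<>();
        for (T record : records) {
          latest.merge(keyOf.apply(record), record,
              (older, newer) -> orderingOf.apply(older) >= orderingOf.apply(newer) ? older : newer);
        }
        return new ArrayList<>(latest.values());
      }
    }
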
@@ -211,13 +211,11 @@ public void notifyCheckpointComplete(long checkpointId) {
// the stream write task snapshot and flush the data buffer synchronously in sequence,
// so a successful checkpoint subsumes the old one(follows the checkpoint subsuming contract)
final boolean committed = commitInstant(this.instant);
- if (tableState.scheduleCompaction) {
- // if async compaction is on, schedule the compaction
- if (committed || tableState.timeCompactionTriggerStrategy) {
+ if (committed) {
+ if (tableState.scheduleCompaction) {
+ // if async compaction is on, schedule the compaction
writeClient.scheduleCompaction(Option.empty());
}
- }
- if (committed) {
// start new instant.
startInstant();
// sync Hive if is enabled
@@ -532,7 +530,6 @@ private static class TableState implements Serializable {
final String commitAction;
final boolean isOverwrite;
final boolean scheduleCompaction;
- final boolean timeCompactionTriggerStrategy;
final boolean syncHive;
final boolean syncMetadata;

@@ -542,7 +539,6 @@ private TableState(Configuration conf) {
HoodieTableType.valueOf(conf.getString(FlinkOptions.TABLE_TYPE).toUpperCase(Locale.ROOT)));
this.isOverwrite = WriteOperationType.isOverwrite(this.operationType);
this.scheduleCompaction = StreamerUtil.needsScheduleCompaction(conf);
- this.timeCompactionTriggerStrategy = StreamerUtil.isTimeCompactionTriggerStrategy(conf);
this.syncHive = conf.getBoolean(FlinkOptions.HIVE_SYNC_ENABLED);
this.syncMetadata = conf.getBoolean(FlinkOptions.METADATA_ENABLED);
}
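
Taken together, the two hunks mean compaction is scheduled only once the instant has actually committed, and the time-elapsed trigger shortcut (with its TableState field) is removed. A compilable stand-alone sketch of the reordered flow; every member here is a stub for illustration, not the coordinator's real API:

    public class CommitFlowSketch {
      private final boolean scheduleCompaction = true;                // stands in for tableState.scheduleCompaction

      private boolean commitInstant(String instant) { return true; }  // stub for the real commit
      private void scheduleCompactionPlan() { }                       // stands in for writeClient.scheduleCompaction(Option.empty())
      private void startInstant() { }                                 // stub
      private void syncHiveIfEnabled() { }                            // stub

      void onCheckpointComplete(String instant) {
        final boolean committed = commitInstant(instant);
        if (committed) {
          if (scheduleCompaction) {
            // compaction is scheduled only after a successful commit
            scheduleCompactionPlan();
          }
          // then start the new instant and run the remaining post-commit actions
          startInstant();
          syncHiveIfEnabled();
        }
      }
    }
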
@@ -55,7 +55,7 @@ private PayloadCreation(
}

public static PayloadCreation instance(Configuration conf) throws Exception {
- boolean shouldCombine = conf.getBoolean(FlinkOptions.INSERT_DROP_DUPS)
+ boolean shouldCombine = conf.getBoolean(FlinkOptions.PRE_COMBINE)
|| WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION)) == WriteOperationType.UPSERT;
String preCombineField = null;
final Class<?>[] argTypes;
@@ -112,7 +112,7 @@ public class FlinkStreamerConfig extends Configuration {

@Parameter(names = {"--filter-dupes"},
description = "Should duplicate records from source be dropped/filtered out before insert/bulk-insert.")
- public Boolean filterDupes = false;
+ public Boolean preCombine = false;

@Parameter(names = {"--commit-on-errors"}, description = "Commit even when some records failed to be written.")
public Boolean commitOnErrors = false;
@@ -220,8 +220,8 @@ public class FlinkStreamerConfig extends Configuration {
@Parameter(names = {"--compaction-max-memory"}, description = "Max memory in MB for compaction spillable map, default 100MB")
public Integer compactionMaxMemory = 100;

@Parameter(names = {"--compaction-target-io"}, description = "Target IO per compaction (both read and write), default 5 GB")
public Long compactionTargetIo = 5120L;
@Parameter(names = {"--compaction-target-io"}, description = "Target IO per compaction (both read and write), default 500 GB")
public Long compactionTargetIo = 512000L;

@Parameter(names = {"--clean-async-enabled"}, description = "Whether to cleanup the old commits immediately on new commits, enabled by default")
public Boolean cleanAsyncEnabled = true;
@@ -312,7 +312,7 @@ public static org.apache.flink.configuration.Configuration toFlinkConfig(FlinkSt
conf.setString(FlinkOptions.OPERATION, config.operation.value());
conf.setString(FlinkOptions.PRECOMBINE_FIELD, config.sourceOrderingField);
conf.setString(FlinkOptions.PAYLOAD_CLASS_NAME, config.payloadClassName);
- conf.setBoolean(FlinkOptions.INSERT_DROP_DUPS, config.filterDupes);
+ conf.setBoolean(FlinkOptions.PRE_COMBINE, config.preCombine);
conf.setInteger(FlinkOptions.RETRY_TIMES, Integer.parseInt(config.instantRetryTimes));
conf.setLong(FlinkOptions.RETRY_INTERVAL_MS, Long.parseLong(config.instantRetryInterval));
conf.setBoolean(FlinkOptions.IGNORE_FAILED, config.commitOnErrors);
@@ -18,6 +18,7 @@

package org.apache.hudi.table;

+ import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.hive.MultiPartKeysValueExtractor;
@@ -157,6 +158,8 @@ private static void setupConfOptions(
setupHiveOptions(conf);
// read options
setupReadOptions(conf);
+ // write options
+ setupWriteOptions(conf);
// infer avro schema from physical DDL schema
inferAvroSchema(conf, schema.toPhysicalRowDataType().notNull().getLogicalType());
}
@@ -249,12 +252,6 @@ private static void setupCompactionOptions(Configuration conf) {
conf.setInteger(FlinkOptions.ARCHIVE_MIN_COMMITS, commitsToRetain + 10);
conf.setInteger(FlinkOptions.ARCHIVE_MAX_COMMITS, commitsToRetain + 20);
}
- if (conf.getBoolean(FlinkOptions.COMPACTION_SCHEDULE_ENABLED)
- && !conf.getBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED)
- && FlinkOptions.isDefaultValueDefined(conf, FlinkOptions.COMPACTION_TARGET_IO)) {
- // if compaction schedule is on, tweak the target io to 500GB
- conf.setLong(FlinkOptions.COMPACTION_TARGET_IO, 500 * 1024L);
- }
if (StreamerUtil.allowDuplicateInserts(conf)) {
// no need for compaction if insert duplicates is allowed
conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
@@ -282,6 +279,16 @@ private static void setupReadOptions(Configuration conf) {
}
}

+ /**
+ * Sets up the write options from the table definition.
+ */
+ private static void setupWriteOptions(Configuration conf) {
+ if (FlinkOptions.isDefaultValueDefined(conf, FlinkOptions.OPERATION)
+ && HoodieTableType.valueOf(conf.getString(FlinkOptions.TABLE_TYPE)) == HoodieTableType.COPY_ON_WRITE) {
+ conf.setBoolean(FlinkOptions.PRE_COMBINE, true);
+ }
+ }

/**
* Inferences the deserialization Avro schema from the table schema (e.g. the DDL)
* if both options {@link FlinkOptions#SOURCE_AVRO_SCHEMA_PATH} and
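
The user-facing effect of setupWriteOptions: a COPY_ON_WRITE table declared without an explicit 'write.operation' (so the default upsert applies) now gets 'write.precombine' enabled automatically. A sketch of such a table definition through the Flink SQL API; the connector identifier and option keys follow the usual Hudi Flink connector conventions, and the path and columns are made up:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class CowTableExample {
      public static void main(String[] args) {
        TableEnvironment tEnv =
            TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
        // No 'write.operation' is set, so the default (upsert) applies and the factory
        // is expected to turn on 'write.precombine' for this COPY_ON_WRITE table.
        tEnv.executeSql(
            "CREATE TABLE t1 ("
                + " uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,"
                + " name VARCHAR(10),"
                + " ts TIMESTAMP(3)"
                + ") WITH ("
                + " 'connector' = 'hudi',"
                + " 'path' = '/tmp/hudi/t1',"
                + " 'table.type' = 'COPY_ON_WRITE'"
                + ")");
      }
    }
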
12 changes: 1 addition & 11 deletions hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -154,7 +154,7 @@ public static HoodieWriteConfig getHoodieClientConfig(Configuration conf) {
HoodieWriteConfig.newBuilder()
.withEngineType(EngineType.FLINK)
.withPath(conf.getString(FlinkOptions.PATH))
- .combineInput(conf.getBoolean(FlinkOptions.INSERT_DROP_DUPS), true)
+ .combineInput(conf.getBoolean(FlinkOptions.PRE_COMBINE), true)
.withMergeAllowDuplicateOnInserts(allowDuplicateInserts(conf))
.withCompactionConfig(
HoodieCompactionConfig.newBuilder()
@@ -302,16 +302,6 @@ public static boolean needsScheduleCompaction(Configuration conf) {
&& conf.getBoolean(FlinkOptions.COMPACTION_SCHEDULE_ENABLED);
}

- /**
- * Returns whether the compaction trigger strategy is time based.
- *
- * @param conf The flink configuration.
- */
- public static boolean isTimeCompactionTriggerStrategy(Configuration conf) {
- final String strategy = conf.getString(FlinkOptions.COMPACTION_TRIGGER_STRATEGY);
- return FlinkOptions.TIME_ELAPSED.equalsIgnoreCase(strategy) || FlinkOptions.NUM_OR_TIME.equalsIgnoreCase(strategy);
- }

/**
* Creates the meta client for reader.
*
@@ -275,7 +275,7 @@ public void testInsert() throws Exception {
@Test
public void testInsertDuplicates() throws Exception {
// reset the config option
- conf.setBoolean(FlinkOptions.INSERT_DROP_DUPS, true);
+ conf.setBoolean(FlinkOptions.PRE_COMBINE, true);
funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);

// open the function and ingest data
@@ -470,7 +470,7 @@ public void testInsertWithMiniBatches() throws Exception {
public void testInsertWithDeduplication() throws Exception {
// reset the config option
conf.setDouble(FlinkOptions.WRITE_BATCH_SIZE, 0.0008); // 839 bytes batch size
- conf.setBoolean(FlinkOptions.INSERT_DROP_DUPS, true);
+ conf.setBoolean(FlinkOptions.PRE_COMBINE, true);
funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);

// open the function and ingest data
@@ -652,7 +652,7 @@ void testWriteGlobalIndex() {
String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
.option(FlinkOptions.INDEX_GLOBAL_ENABLED, true)
- .option(FlinkOptions.INSERT_DROP_DUPS, true)
+ .option(FlinkOptions.PRE_COMBINE, true)
.end();
streamTableEnv.executeSql(hoodieTableDDL);

@@ -674,7 +674,7 @@ void testWriteLocalIndex() {
String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
.option(FlinkOptions.INDEX_GLOBAL_ENABLED, false)
- .option(FlinkOptions.INSERT_DROP_DUPS, true)
+ .option(FlinkOptions.PRE_COMBINE, true)
.end();
streamTableEnv.executeSql(hoodieTableDDL);

@@ -394,6 +394,21 @@ void testSetupTimestampBasedKeyGenForSink() {
is("UTC"));
}

+ @Test
+ void testSetupWriteOptionsForSink() {
+ final HoodieTableSink tableSink1 =
+ (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(MockContext.getInstance(this.conf));
+ final Configuration conf1 = tableSink1.getConf();
+ assertThat(conf1.get(FlinkOptions.PRE_COMBINE), is(true));
+
+ // set up operation as 'insert'
+ this.conf.setString(FlinkOptions.OPERATION, "insert");
+ HoodieTableSink tableSink2 =
+ (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(MockContext.getInstance(this.conf));
+ Configuration conf2 = tableSink2.getConf();
+ assertThat(conf2.get(FlinkOptions.PRE_COMBINE), is(false));
+ }

// -------------------------------------------------------------------------
// Inner Class
// -------------------------------------------------------------------------
4 changes: 4 additions & 0 deletions packaging/hudi-flink-bundle/pom.xml
@@ -152,6 +152,10 @@
</includes>
</artifactSet>
<relocations>
+ <relocation>
+ <pattern>javax.servlet.</pattern>
+ <shadedPattern>${flink.bundle.shade.prefix}javax.servlet.</shadedPattern>
+ </relocation>
<relocation>
<pattern>org.apache.avro.</pattern>
<shadedPattern>${flink.bundle.shade.prefix}org.apache.avro.</shadedPattern>