diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index c140d40af88fc..556d0b2ef2bf2 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -39,6 +39,7 @@ import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.keygen.constant.KeyGeneratorOptions; import org.apache.hudi.keygen.constant.KeyGeneratorType; +import org.apache.hudi.sink.overwrite.PartitionOverwriteMode; import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilterMode; import org.apache.hudi.util.ClientIds; @@ -613,6 +614,16 @@ private FlinkOptions() { .defaultValue(128) .withDescription("Sort memory in MB, default 128MB"); + @AdvancedConfig + public static final ConfigOption WRITE_PARTITION_OVERWRITE_MODE = ConfigOptions + .key("write.partition.overwrite.mode") + .stringType() + .defaultValue(PartitionOverwriteMode.STATIC.name()) + .withDescription("When INSERT OVERWRITE a partitioned data source table, we currently support 2 modes: static and dynamic. " + + "Static mode deletes all the partitions that match the partition specification(e.g. PARTITION(a=1,b)) in the INSERT statement, before overwriting. " + + "Dynamic mode doesn't delete partitions ahead, and only overwrite those partitions that have data written into it at runtime. 
" + + "By default we use static mode to keep the same behavior of previous version."); + // this is only for internal use @AdvancedConfig public static final ConfigOption WRITE_CLIENT_ID = ConfigOptions diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java index 944e795dc2fea..bfde0b0e2b0e0 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java @@ -35,6 +35,7 @@ import org.apache.hudi.exception.HoodieException; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.keygen.constant.KeyGeneratorOptions; +import org.apache.hudi.sink.overwrite.PartitionOverwriteMode; import org.apache.hudi.table.format.FilePathUtils; import org.apache.flink.configuration.ConfigOption; @@ -241,6 +242,14 @@ public static boolean isInsertOverwrite(Configuration conf) { || conf.getString(FlinkOptions.OPERATION).equalsIgnoreCase(WriteOperationType.INSERT_OVERWRITE.value()); } + /** + * Returns whether the operation is INSERT OVERWRITE dynamic partition. + */ + public static boolean overwriteDynamicPartition(Configuration conf) { + return conf.getString(FlinkOptions.OPERATION).equalsIgnoreCase(WriteOperationType.INSERT_OVERWRITE.value()) + || conf.getString(FlinkOptions.WRITE_PARTITION_OVERWRITE_MODE).equalsIgnoreCase(PartitionOverwriteMode.DYNAMIC.name()); + } + /** * Returns whether the read start commit is specific commit timestamp. 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hudi.sink.overwrite;

import org.apache.hudi.common.config.EnumDescription;
import org.apache.hudi.common.config.EnumFieldDescription;

/**
 * Mode of INSERT OVERWRITE partitioned table.
 *
 * <p>The enum constant names are the legal (case-insensitive) values of the
 * {@code write.partition.overwrite.mode} config option; {@code STATIC} is the default,
 * preserving the behavior of previous releases. The annotation strings are consumed by
 * the configuration doc generator and must mirror the option's description.
 */
@EnumDescription("Mode of INSERT OVERWRITE a partitioned data source table.")
public enum PartitionOverwriteMode {

  // Partitions named in the PARTITION(...) spec are dropped up front, then rewritten.
  @EnumFieldDescription("Deletes all the partitions that match the partition specification(e.g. PARTITION(a=1,b)) in the INSERT statement, before overwriting.")
  STATIC,

  // No up-front deletion; only partitions actually receiving rows get replaced.
  @EnumFieldDescription("Doesn't delete partitions ahead, and only overwrite those partitions that have data written into it at runtime.")
  DYNAMIC
}
+ this.conf.setString(FlinkOptions.OPERATION, WriteOperationType.INSERT_OVERWRITE_TABLE.value()); + } } @Override diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java index 5fbf44062a062..4ea92fbb84586 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java @@ -777,8 +777,8 @@ void testInsertOverwrite(String indexType, HoodieTableType tableType) { () -> tableEnv.sqlQuery("select * from t1").execute().collect()); assertRowsEquals(result2, TestData.DATA_SET_SOURCE_INSERT_OVERWRITE); - // overwrite the whole table - final String insertInto3 = "insert overwrite t1 values\n" + // overwrite the dynamic partition + final String insertInto3 = "insert overwrite t1 /*+ OPTIONS('write.partition.overwrite.mode'='dynamic') */ values\n" + "('id1','Danny',24,TIMESTAMP '1970-01-01 00:00:01', 'par1'),\n" + "('id2','Stephen',34,TIMESTAMP '1970-01-01 00:00:02', 'par2')\n"; @@ -786,16 +786,31 @@ void testInsertOverwrite(String indexType, HoodieTableType tableType) { List result3 = CollectionUtil.iterableToList( () -> tableEnv.sqlQuery("select * from t1").execute().collect()); - final String expected = "[" - + "+I[id1, Danny, 24, 1970-01-01T00:00:01, par1], " - + "+I[id2, Stephen, 34, 1970-01-01T00:00:02, par2]]"; - assertRowsEquals(result3, expected); + assertRowsEquals(result3, TestData.DATA_SET_SOURCE_INSERT_OVERWRITE_DYNAMIC_PARTITION); // execute the same statement again and check the result execInsertSql(tableEnv, insertInto3); + assertRowsEquals(result3, TestData.DATA_SET_SOURCE_INSERT_OVERWRITE_DYNAMIC_PARTITION); + + // overwrite the whole table + final String insertInto4 = "insert overwrite t1 values\n" + + "('id1','Danny',24,TIMESTAMP '1970-01-01 00:00:01', 
'par1'),\n" + + "('id2','Stephen',34,TIMESTAMP '1970-01-01 00:00:02', 'par2')\n"; + + execInsertSql(tableEnv, insertInto4); + List result4 = CollectionUtil.iterableToList( () -> tableEnv.sqlQuery("select * from t1").execute().collect()); + final String expected = "[" + + "+I[id1, Danny, 24, 1970-01-01T00:00:01, par1], " + + "+I[id2, Stephen, 34, 1970-01-01T00:00:02, par2]]"; assertRowsEquals(result4, expected); + + // execute the same statement again and check the result + execInsertSql(tableEnv, insertInto4); + List result5 = CollectionUtil.iterableToList( + () -> tableEnv.sqlQuery("select * from t1").execute().collect()); + assertRowsEquals(result5, expected); } @ParameterizedTest diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java index db9cd65b9f152..65c8e82ada166 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java @@ -287,6 +287,22 @@ public class TestData { TimestampData.fromEpochMillis(8000), StringData.fromString("par4")) ); + // data set of test_source.data with partition 'par1' and 'par2' overwrite + public static List DATA_SET_SOURCE_INSERT_OVERWRITE_DYNAMIC_PARTITION = Arrays.asList( + insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 24, + TimestampData.fromEpochMillis(1000), StringData.fromString("par1")), + insertRow(StringData.fromString("id2"), StringData.fromString("Stephen"), 34, + TimestampData.fromEpochMillis(2000), StringData.fromString("par2")), + insertRow(StringData.fromString("id5"), StringData.fromString("Sophia"), 18, + TimestampData.fromEpochMillis(5000), StringData.fromString("par3")), + insertRow(StringData.fromString("id6"), StringData.fromString("Emma"), 20, + TimestampData.fromEpochMillis(6000), StringData.fromString("par3")), + 
insertRow(StringData.fromString("id7"), StringData.fromString("Bob"), 44, + TimestampData.fromEpochMillis(7000), StringData.fromString("par4")), + insertRow(StringData.fromString("id8"), StringData.fromString("Han"), 56, + TimestampData.fromEpochMillis(8000), StringData.fromString("par4")) + ); + public static List DATA_SET_UPDATE_DELETE = Arrays.asList( // this is update insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 24,