diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java index 1fd46d31e5ba7..453fe45a36f72 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java @@ -73,9 +73,13 @@ public static String getPartitionPathFromGenericRecord(GenericRecord genericReco */ public static String[] extractRecordKeys(String recordKey) { String[] fieldKV = recordKey.split(","); + return Arrays.stream(fieldKV).map(kv -> { final String[] kvArray = kv.split(":"); - if (kvArray[1].equals(NULL_RECORDKEY_PLACEHOLDER)) { + // a simple key + if (kvArray.length == 1) { + return kvArray[0]; + } else if (kvArray[1].equals(NULL_RECORDKEY_PLACEHOLDER)) { return null; } else if (kvArray[1].equals(EMPTY_RECORDKEY_PLACEHOLDER)) { return ""; diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestKeyGenUtils.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestKeyGenUtils.java index 06a6fcd7d76d3..abe85e0a1debe 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestKeyGenUtils.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/TestKeyGenUtils.java @@ -25,6 +25,9 @@ public class TestKeyGenUtils { @Test public void testExtractRecordKeys() { + String[] s0 = KeyGenUtils.extractRecordKeys("1"); + Assertions.assertArrayEquals(new String[]{"1"}, s0); + String[] s1 = KeyGenUtils.extractRecordKeys("id:1"); Assertions.assertArrayEquals(new String[]{"1"}, s1); diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java index 9f821619089fd..632da8dac8075 100644 --- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java @@ -26,6 +26,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.configuration.HadoopConfigurations; +import org.apache.hudi.keygen.constant.KeyGeneratorType; import org.apache.hudi.table.HoodieTableSource; import org.apache.hudi.table.format.cow.CopyOnWriteInputFormat; import org.apache.hudi.table.format.mor.MergeOnReadInputFormat; @@ -48,6 +49,7 @@ import java.io.File; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -290,6 +292,60 @@ void testReadWithDeletesMOR() throws Exception { assertThat(actual, is(expected)); } + @Test + void testReadWithDeletesMORWithSimpleKeyGenWithChangeLogDisabled() throws Exception { + Map<String, String> options = new HashMap<>(); + options.put(FlinkOptions.CHANGELOG_ENABLED.key(), "false"); + options.put(FlinkOptions.PARTITION_PATH_FIELD.key(), "partition"); + options.put(FlinkOptions.KEYGEN_TYPE.key(), KeyGeneratorType.SIMPLE.name()); + beforeEach(HoodieTableType.MERGE_ON_READ, options); + + // write another commit to read again + TestData.writeData(TestData.DATA_SET_UPDATE_DELETE, conf); + + InputFormat<RowData, ?> inputFormat = this.tableSource.getInputFormat(); + assertThat(inputFormat, instanceOf(MergeOnReadInputFormat.class)); + ((MergeOnReadInputFormat) inputFormat).isEmitDelete(true); + + List<RowData> result = readData(inputFormat); + + final String actual = TestData.rowDataToString(result); + final String expected = "[" + + "+I[id1, Danny, 24, 1970-01-01T00:00:00.001, par1], " + + "+I[id2, Stephen, 34, 1970-01-01T00:00:00.002, par1], " + + "-D[id3, null, null, null, null], " + + "-D[id5, null, null, null, null], " + + "-D[id9, null, null, null, null]]"; + 
assertThat(actual, is(expected)); + } + + @Test + void testReadWithDeletesMORWithComplexKeyGenWithChangeLogDisabled() throws Exception { + Map options = new HashMap<>(); + options.put(FlinkOptions.CHANGELOG_ENABLED.key(), "false"); + options.put(FlinkOptions.PARTITION_PATH_FIELD.key(), "partition,name"); + options.put(FlinkOptions.KEYGEN_TYPE.key(), KeyGeneratorType.COMPLEX.name()); + beforeEach(HoodieTableType.MERGE_ON_READ, options); + + // write another commit to read again + TestData.writeData(TestData.DATA_SET_UPDATE_DELETE, conf); + + InputFormat inputFormat = this.tableSource.getInputFormat(); + assertThat(inputFormat, instanceOf(MergeOnReadInputFormat.class)); + ((MergeOnReadInputFormat) inputFormat).isEmitDelete(true); + + List result = readData(inputFormat); + + final String actual = TestData.rowDataToString(result); + final String expected = "[" + + "+I[id1, Danny, 24, 1970-01-01T00:00:00.001, par1], " + + "+I[id2, Stephen, 34, 1970-01-01T00:00:00.002, par1], " + + "-D[id3, null, null, null, null], " + + "-D[id5, null, null, null, null], " + + "-D[id9, null, null, null, null]]"; + assertThat(actual, is(expected)); + } + @Test void testReadWithDeletesCOW() throws Exception { beforeEach(HoodieTableType.COPY_ON_WRITE); @@ -626,7 +682,7 @@ private HoodieTableSource getTableSource(Configuration conf) { return new HoodieTableSource( TestConfigurations.TABLE_SCHEMA, new Path(tempFile.getAbsolutePath()), - Collections.singletonList("partition"), + Arrays.asList(conf.getString(FlinkOptions.PARTITION_PATH_FIELD).split(",")), "default", conf); }