diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
index f9c9898f20192..4e507436b942e 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
@@ -103,10 +103,19 @@ public static HoodieTableMetaClient init(Configuration hadoopConf, String basePa
   }
 
   public static HoodieTableMetaClient init(Configuration hadoopConf, String basePath, HoodieTableType tableType,
-                                           HoodieFileFormat baseFileFormat)
+                                           HoodieFileFormat baseFileFormat) throws IOException {
+    return init(hadoopConf, basePath, tableType, baseFileFormat, false, null, true);
+  }
+
+  public static HoodieTableMetaClient init(Configuration hadoopConf, String basePath, HoodieTableType tableType,
+                                           HoodieFileFormat baseFileFormat, boolean setKeyGen, String keyGenerator, boolean populateMetaFields)
       throws IOException {
     Properties properties = new Properties();
     properties.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), baseFileFormat.toString());
+    if (setKeyGen) {
+      properties.setProperty("hoodie.datasource.write.keygenerator.class", keyGenerator);
+    }
+    properties.setProperty("hoodie.populate.meta.fields", Boolean.toString(populateMetaFields));
     return init(hadoopConf, basePath, tableType, properties);
   }
 
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java
index 5ae91dc46d96f..f11ca88d29c0f 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieCopyOnWriteTableInputFormat.java
@@ -43,6 +43,7 @@
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.hadoop.realtime.HoodieVirtualKeyInfo;
@@ -275,16 +276,16 @@ protected static Option<HoodieVirtualKeyInfo> getHoodieVirtualKeyInfo(HoodieTabl
     if (tableConfig.populateMetaFields()) {
       return Option.empty();
     }
-
     TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient);
     try {
       Schema schema = tableSchemaResolver.getTableAvroSchema();
+      boolean isNonPartitionedKeyGen = StringUtils.isNullOrEmpty(tableConfig.getPartitionFieldProp());
       return Option.of(
           new HoodieVirtualKeyInfo(
               tableConfig.getRecordKeyFieldProp(),
-              tableConfig.getPartitionFieldProp(),
+              isNonPartitionedKeyGen ? Option.empty() : Option.of(tableConfig.getPartitionFieldProp()),
               schema.getField(tableConfig.getRecordKeyFieldProp()).pos(),
-              schema.getField(tableConfig.getPartitionFieldProp()).pos()));
+              isNonPartitionedKeyGen ? Option.empty() : Option.of(schema.getField(tableConfig.getPartitionFieldProp()).pos())));
     } catch (Exception exception) {
       throw new HoodieException("Fetching table schema failed with exception ", exception);
     }
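The input format only builds a HoodieVirtualKeyInfo when meta fields are disabled, and it now treats an empty partition-field property as a non-partitioned key generator. As an illustration (not part of the patch; the file format is a placeholder and the config keys mirror the string literals used in the HoodieTestUtils change above), the properties the new init overload writes for such a table look like this:

    Properties properties = new Properties();
    properties.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), HoodieFileFormat.PARQUET.toString());
    properties.setProperty("hoodie.datasource.write.keygenerator.class",
        "org.apache.hudi.keygen.NonpartitionedKeyGenerator");
    properties.setProperty("hoodie.populate.meta.fields", "false");
    // With no partition field configured, tableConfig.getPartitionFieldProp()
    // is null or empty, so isNonPartitionedKeyGen above evaluates to true and
    // both partition-path members of the resulting HoodieVirtualKeyInfo are empty.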
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieVirtualKeyInfo.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieVirtualKeyInfo.java
index 763b80d4a1f8c..41031e613d112 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieVirtualKeyInfo.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieVirtualKeyInfo.java
@@ -18,6 +18,8 @@
 
 package org.apache.hudi.hadoop.realtime;
 
+import org.apache.hudi.common.util.Option;
+
 import java.io.Serializable;
 
 /**
@@ -26,11 +28,11 @@
 public class HoodieVirtualKeyInfo implements Serializable {
 
   private final String recordKeyField;
-  private final String partitionPathField;
+  private final Option<String> partitionPathField;
   private final int recordKeyFieldIndex;
-  private final int partitionPathFieldIndex;
+  private final Option<Integer> partitionPathFieldIndex;
 
-  public HoodieVirtualKeyInfo(String recordKeyField, String partitionPathField, int recordKeyFieldIndex, int partitionPathFieldIndex) {
+  public HoodieVirtualKeyInfo(String recordKeyField, Option<String> partitionPathField, int recordKeyFieldIndex, Option<Integer> partitionPathFieldIndex) {
     this.recordKeyField = recordKeyField;
     this.partitionPathField = partitionPathField;
     this.recordKeyFieldIndex = recordKeyFieldIndex;
@@ -41,7 +43,7 @@ public String getRecordKeyField() {
     return recordKeyField;
   }
 
-  public String getPartitionPathField() {
+  public Option<String> getPartitionPathField() {
     return partitionPathField;
   }
 
@@ -49,7 +51,7 @@ public int getRecordKeyFieldIndex() {
     return recordKeyFieldIndex;
  }
 
-  public int getPartitionPathFieldIndex() {
+  public Option<Integer> getPartitionPathFieldIndex() {
     return partitionPathFieldIndex;
   }
 
@@ -57,9 +59,9 @@ public int getPartitionPathFieldIndex() {
   public String toString() {
     return "HoodieVirtualKeyInfo{"
         + "recordKeyField='" + recordKeyField + '\''
-        + ", partitionPathField='" + partitionPathField + '\''
+        + ", partitionPathField='" + (partitionPathField.isPresent() ? partitionPathField.get() : "null") + '\''
         + ", recordKeyFieldIndex=" + recordKeyFieldIndex
-        + ", partitionPathFieldIndex=" + partitionPathFieldIndex
+        + ", partitionPathFieldIndex=" + (partitionPathFieldIndex.isPresent() ? partitionPathFieldIndex.get() : "-1")
         + '}';
   }
 }
\ No newline at end of file
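With the partition path wrapped in Option, HoodieVirtualKeyInfo now has two valid shapes. A minimal sketch (not part of the patch; field names and schema positions are hypothetical):

    // Partitioned, virtual-keyed table: partition field and index are present.
    HoodieVirtualKeyInfo partitioned =
        new HoodieVirtualKeyInfo("uuid", Option.of("datestr"), 0, Option.of(3));
    // Non-partitioned table (e.g. NonpartitionedKeyGenerator): both are empty.
    HoodieVirtualKeyInfo nonPartitioned =
        new HoodieVirtualKeyInfo("uuid", Option.empty(), 0, Option.empty());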
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeSplit.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeSplit.java
index d9b1923c60f80..4b0b2d6ea79e2 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeSplit.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeSplit.java
@@ -107,9 +107,12 @@ default void writeToOutput(DataOutput out) throws IOException {
     } else {
       InputSplitUtils.writeBoolean(true, out);
       InputSplitUtils.writeString(virtualKeyInfoOpt.get().getRecordKeyField(), out);
-      InputSplitUtils.writeString(virtualKeyInfoOpt.get().getPartitionPathField(), out);
       InputSplitUtils.writeString(String.valueOf(virtualKeyInfoOpt.get().getRecordKeyFieldIndex()), out);
-      InputSplitUtils.writeString(String.valueOf(virtualKeyInfoOpt.get().getPartitionPathFieldIndex()), out);
+      InputSplitUtils.writeBoolean(virtualKeyInfoOpt.get().getPartitionPathField().isPresent(), out);
+      if (virtualKeyInfoOpt.get().getPartitionPathField().isPresent()) {
+        InputSplitUtils.writeString(virtualKeyInfoOpt.get().getPartitionPathField().get(), out);
+        InputSplitUtils.writeString(String.valueOf(virtualKeyInfoOpt.get().getPartitionPathFieldIndex().get()), out);
+      }
     }
   }
 
@@ -130,9 +133,10 @@ default void readFromInput(DataInput in) throws IOException {
     boolean hoodieVirtualKeyPresent = InputSplitUtils.readBoolean(in);
     if (hoodieVirtualKeyPresent) {
       String recordKeyField = InputSplitUtils.readString(in);
-      String partitionPathField = InputSplitUtils.readString(in);
       int recordFieldIndex = Integer.parseInt(InputSplitUtils.readString(in));
-      int partitionPathIndex = Integer.parseInt(InputSplitUtils.readString(in));
+      boolean isPartitionPathFieldPresent = InputSplitUtils.readBoolean(in);
+      Option<String> partitionPathField = isPartitionPathFieldPresent ? Option.of(InputSplitUtils.readString(in)) : Option.empty();
+      Option<Integer> partitionPathIndex = isPartitionPathFieldPresent ? Option.of(Integer.parseInt(InputSplitUtils.readString(in))) : Option.empty();
       setVirtualKeyInfo(Option.of(new HoodieVirtualKeyInfo(recordKeyField, partitionPathField, recordFieldIndex, partitionPathIndex)));
     }
   }
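writeToOutput and readFromInput must consume fields in the same order, so the split serializes a presence flag before the optional partition-path fields. A sketch of the write side for a non-partitioned split, using only the helpers referenced above (not part of the patch; assumes a DataOutput named out that is later re-read as in):

    InputSplitUtils.writeBoolean(true, out);              // virtual key present
    InputSplitUtils.writeString("uuid", out);             // record key field
    InputSplitUtils.writeString(String.valueOf(0), out);  // record key index
    InputSplitUtils.writeBoolean(false, out);             // no partition path; nothing else follows
    // readFromInput consumes exactly these four values and builds
    // new HoodieVirtualKeyInfo("uuid", Option.empty(), 0, Option.empty()).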
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
index eb44769d9ff32..42038e61f66a3 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
@@ -87,7 +87,9 @@ public static void addRequiredProjectionFields(Configuration configuration, Opti
     } else {
       HoodieVirtualKeyInfo hoodieVirtualKey = hoodieVirtualKeyInfo.get();
       addProjectionField(configuration, hoodieVirtualKey.getRecordKeyField(), hoodieVirtualKey.getRecordKeyFieldIndex());
-      addProjectionField(configuration, hoodieVirtualKey.getPartitionPathField(), hoodieVirtualKey.getPartitionPathFieldIndex());
+      if (hoodieVirtualKey.getPartitionPathField().isPresent()) {
+        addProjectionField(configuration, hoodieVirtualKey.getPartitionPathField().get(), hoodieVirtualKey.getPartitionPathFieldIndex().get());
+      }
     }
   }
 
@@ -99,7 +101,8 @@ public static boolean requiredProjectionFieldsExistInConf(Configuration configur
           && readColNames.contains(HoodieRecord.PARTITION_PATH_METADATA_FIELD);
     } else {
       return readColNames.contains(hoodieVirtualKeyInfo.get().getRecordKeyField())
-          && readColNames.contains(hoodieVirtualKeyInfo.get().getPartitionPathField());
+          && (!hoodieVirtualKeyInfo.get().getPartitionPathField().isPresent()
+          || readColNames.contains(hoodieVirtualKeyInfo.get().getPartitionPathField().get()));
     }
   }
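For a non-partitioned virtual-keyed table, only the record key column has to be projected; an absent partition path imposes no extra requirement. A small sketch of that check (not part of the patch; column names are hypothetical, and java.util plus the Hudi classes above are assumed imported):

    Set<String> readColNames = new HashSet<>(Arrays.asList("uuid", "name"));
    HoodieVirtualKeyInfo keyInfo =
        new HoodieVirtualKeyInfo("uuid", Option.empty(), 0, Option.empty());
    // Mirrors requiredProjectionFieldsExistInConf above: evaluates to true,
    // since the record key is projected and no partition path field exists.
    boolean exists = readColNames.contains(keyInfo.getRecordKeyField())
        && (!keyInfo.getPartitionPathField().isPresent()
            || readColNames.contains(keyInfo.getPartitionPathField().get()));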
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java
index 10ea84f5dca11..ccc57d0f6185c 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java
@@ -27,12 +27,15 @@
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapreduce.Job;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -40,6 +43,9 @@
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
 import org.apache.hudi.common.testutils.FileCreateUtils;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.testutils.SchemaTestUtil;
+import org.apache.hudi.common.util.CommitUtils;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.hadoop.testutils.InputFormatTestUtil;
 import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
@@ -55,6 +61,7 @@
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Paths;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource;
@@ -167,6 +174,26 @@ public void testInputFormatLoad() throws IOException {
     assertEquals(10, files.length);
   }
 
+  @Test
+  public void testInputFormatLoadForNonPartitionedAndVirtualKeyedTable() throws IOException {
+    // initial commit
+    Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
+    File partitionDir = InputFormatTestUtil.prepareCustomizedTable(basePath, baseFileFormat, 10, "100", true, false,
+        true, schema);
+    HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(Collections.emptyList(), Collections.emptyMap(), Option.empty(), WriteOperationType.UPSERT,
+        schema.toString(), HoodieTimeline.COMMIT_ACTION);
+    FileCreateUtils.createCommit(basePath.toString(), "100", Option.of(commitMetadata));
+
+    // Add the paths
+    FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
+
+    InputSplit[] inputSplits = inputFormat.getSplits(jobConf, 10);
+    assertEquals(10, inputSplits.length);
+
+    FileStatus[] files = inputFormat.listStatus(jobConf);
+    assertEquals(10, files.length);
+  }
+
   @Test
   public void testInputFormatLoadWithEmptyTable() throws IOException {
     // initial hoodie table
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
index 1081e43175630..db8002cd2d462 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
@@ -71,16 +71,35 @@ public class InputFormatTestUtil {
   private static String TEST_WRITE_TOKEN = "1-0-1";
 
   public static File prepareTable(java.nio.file.Path basePath, HoodieFileFormat baseFileFormat, int numberOfFiles,
-                                  String commitNumber)
+                                  String commitNumber) throws IOException {
+    return prepareCustomizedTable(basePath, baseFileFormat, numberOfFiles, commitNumber, false, true, false, null);
+  }
+
+  public static File prepareCustomizedTable(java.nio.file.Path basePath, HoodieFileFormat baseFileFormat, int numberOfFiles,
+                                            String commitNumber, boolean useNonPartitionedKeyGen, boolean populateMetaFields, boolean injectData, Schema schema)
       throws IOException {
-    HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString(), HoodieTableType.COPY_ON_WRITE,
-        baseFileFormat);
+    if (useNonPartitionedKeyGen) {
+      HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString(), HoodieTableType.COPY_ON_WRITE,
+          baseFileFormat, true, "org.apache.hudi.keygen.NonpartitionedKeyGenerator", populateMetaFields);
+    } else {
+      HoodieTestUtils.init(HoodieTestUtils.getDefaultHadoopConf(), basePath.toString(), HoodieTableType.COPY_ON_WRITE,
+          baseFileFormat);
+    }
 
     java.nio.file.Path partitionPath = basePath.resolve(Paths.get("2016", "05", "01"));
     setupPartition(basePath, partitionPath);
 
-    return simulateInserts(partitionPath.toFile(), baseFileFormat.getFileExtension(), "fileId1", numberOfFiles,
-        commitNumber);
+    if (injectData) {
+      try {
+        createSimpleData(schema, partitionPath, numberOfFiles, 100, commitNumber);
+        return partitionPath.toFile();
+      } catch (Exception e) {
+        throw new IOException("Exception thrown while writing data ", e);
+      }
+    } else {
+      return simulateInserts(partitionPath.toFile(), baseFileFormat.getFileExtension(), "fileId1", numberOfFiles,
+          commitNumber);
+    }
   }
 
   public static File prepareMultiPartitionTable(java.nio.file.Path basePath, HoodieFileFormat baseFileFormat, int numberOfFiles,