diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java index 0339c4737e31..da4a7830db53 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java @@ -149,7 +149,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase { private static final Logger LOG = LogManager.getLogger(TestHoodieBackedMetadata.class); - public static List bootstrapAndTableOperationTestArgs() { + public static List tableTypeAndEnableOperationArgs() { return asList( Arguments.of(COPY_ON_WRITE, true), Arguments.of(COPY_ON_WRITE, false), @@ -162,7 +162,7 @@ public static List bootstrapAndTableOperationTestArgs() { * Metadata Table bootstrap scenarios. */ @ParameterizedTest - @MethodSource("bootstrapAndTableOperationTestArgs") + @MethodSource("tableTypeAndEnableOperationArgs") public void testMetadataTableBootstrap(HoodieTableType tableType, boolean addRollback) throws Exception { init(tableType, false); // bootstrap with few commits @@ -243,7 +243,7 @@ public void testOnlyValidPartitionsAdded(HoodieTableType tableType) throws Excep * Test various table operations sync to Metadata Table correctly. */ @ParameterizedTest - @MethodSource("bootstrapAndTableOperationTestArgs") + @MethodSource("tableTypeAndEnableOperationArgs") public void testTableOperations(HoodieTableType tableType, boolean enableFullScan) throws Exception { init(tableType, true, enableFullScan, false, false); doWriteInsertAndUpsert(testTable); @@ -319,6 +319,16 @@ public void testMetadataInsertUpsertClean(HoodieTableType tableType) throws Exce validateMetadata(testTable, emptyList(), true); } + @Test + public void testMetadataInsertUpsertCleanNonPartitioned() throws Exception { + HoodieTableType tableType = COPY_ON_WRITE; + init(tableType); + doWriteOperationNonPartitioned(testTable, "0000001", INSERT); + doWriteOperationNonPartitioned(testTable, "0000002", UPSERT); + testTable.doCleanBasedOnCommits("0000003", Arrays.asList("0000001")); + validateMetadata(testTable, emptyList(), true); + } + @ParameterizedTest @EnumSource(HoodieTableType.class) public void testInsertUpsertCluster(HoodieTableType tableType) throws Exception { @@ -509,7 +519,7 @@ public void testRollbackOperations(HoodieTableType tableType) throws Exception { doWriteInsertAndUpsert(testTable); // trigger an upsert - doWriteOperationAndValidate(testTable, "0000003"); + doWriteOperation(testTable, "0000003", UPSERT); // trigger a commit and rollback doWriteOperation(testTable, "0000004"); @@ -549,6 +559,27 @@ public void testRollbackOperations(HoodieTableType tableType) throws Exception { validateMetadata(testTable, true); } + @Test + public void testRollbackOperationsNonPartitioned() throws Exception { + HoodieTableType tableType = COPY_ON_WRITE; + init(tableType); + doWriteInsertAndUpsertNonPartitioned(testTable); + + // trigger an upsert + doWriteOperationNonPartitioned(testTable, "0000003", UPSERT); + + // trigger a commit and rollback + doWriteOperationNonPartitioned(testTable, "0000004", UPSERT); + doRollback(testTable, "0000004", "0000005"); + validateMetadata(testTable); + + // trigger few upserts and validate + for (int i = 6; i < 10; i++) { + doWriteOperationNonPartitioned(testTable, "000000" + i, UPSERT); + } + validateMetadata(testTable); + } + /** * Test that manual rollbacks work correctly and enough timeline history is maintained on the metadata table * timeline. @@ -573,7 +604,7 @@ public void testManualRollbacks(final boolean populateMateFields) throws Excepti .build(); initWriteConfigAndMetatableWriter(writeConfig, true); - doWriteInsertAndUpsert(testTable, "000001", "000002"); + doWriteInsertAndUpsert(testTable, "000001", "000002", false); for (int i = 3; i < 10; i++) { doWriteOperation(testTable, "00000" + i); @@ -674,8 +705,8 @@ private Long getNextCommitTime(long curCommitTime) { } @ParameterizedTest - @EnumSource(HoodieTableType.class) - public void testMetadataBootstrapLargeCommitList(HoodieTableType tableType) throws Exception { + @MethodSource("tableTypeAndEnableOperationArgs") + public void testMetadataBootstrapLargeCommitList(HoodieTableType tableType, boolean nonPartitionedDataset) throws Exception { init(tableType, true, true, true, false); long baseCommitTime = Long.parseLong(HoodieActiveTimeline.createNewInstantTime()); for (int i = 1; i < 25; i += 7) { @@ -687,17 +718,17 @@ public void testMetadataBootstrapLargeCommitList(HoodieTableType tableType) thro long commitTime6 = getNextCommitTime(commitTime5); long commitTime7 = getNextCommitTime(commitTime6); baseCommitTime = commitTime7; - doWriteOperation(testTable, Long.toString(commitTime1), INSERT); - doWriteOperation(testTable, Long.toString(commitTime2)); + doWriteOperation(testTable, Long.toString(commitTime1), INSERT, nonPartitionedDataset); + doWriteOperation(testTable, Long.toString(commitTime2), UPSERT, nonPartitionedDataset); doClean(testTable, Long.toString(commitTime3), Arrays.asList(Long.toString(commitTime1))); - doWriteOperation(testTable, Long.toString(commitTime4)); + doWriteOperation(testTable, Long.toString(commitTime4), UPSERT, nonPartitionedDataset); if (tableType == MERGE_ON_READ) { - doCompaction(testTable, Long.toString(commitTime5)); + doCompaction(testTable, Long.toString(commitTime5), nonPartitionedDataset); } - doWriteOperation(testTable, Long.toString(commitTime6)); + doWriteOperation(testTable, Long.toString(commitTime6), UPSERT, nonPartitionedDataset); doRollback(testTable, Long.toString(commitTime6), Long.toString(commitTime7)); } - validateMetadata(testTable, emptyList(), true); + validateMetadata(testTable, emptyList(), nonPartitionedDataset); } // Some operations are not feasible with test table infra. hence using write client to test those cases. @@ -1563,8 +1594,12 @@ private void doPreBootstrapOperations(HoodieTestTable testTable, String commit1, validateMetadata(testTable); } + private void doWriteInsertAndUpsertNonPartitioned(HoodieTestTable testTable) throws Exception { + doWriteInsertAndUpsert(testTable, "0000001", "0000002", true); + } + private void doWriteInsertAndUpsert(HoodieTestTable testTable) throws Exception { - doWriteInsertAndUpsert(testTable, "0000001", "0000002"); + doWriteInsertAndUpsert(testTable, "0000001", "0000002", false); } private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, String schemaStr, long smallFileSize, boolean mergeAllowDuplicateInserts) { diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java index 51860ac9eeff..d6f151e34255 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java @@ -61,7 +61,7 @@ public void testTableOperations() throws Exception { } private void doWriteInsertAndUpsert(HoodieTestTable testTable) throws Exception { - doWriteInsertAndUpsert(testTable, "0000001", "0000002"); + doWriteInsertAndUpsert(testTable, "0000001", "0000002", false); } private void verifyBaseMetadataTable() throws IOException { diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java index 25dfd292d26e..56c9f016bcc6 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java @@ -109,10 +109,10 @@ public void clean() throws Exception { cleanupResources(); } - protected void doWriteInsertAndUpsert(HoodieTestTable testTable, String commit1, String commit2) throws Exception { - testTable.doWriteOperation(commit1, INSERT, asList("p1", "p2"), asList("p1", "p2"), + protected void doWriteInsertAndUpsert(HoodieTestTable testTable, String commit1, String commit2, boolean nonPartitioned) throws Exception { + testTable.doWriteOperation(commit1, INSERT, nonPartitioned ? asList("") : asList("p1", "p2"), nonPartitioned ? asList("") : asList("p1", "p2"), 4, false); - testTable.doWriteOperation(commit2, UPSERT, asList("p1", "p2"), + testTable.doWriteOperation(commit2, UPSERT, nonPartitioned ? asList("") : asList("p1", "p2"), 4, false); validateMetadata(testTable); } @@ -135,6 +135,18 @@ protected void doWriteOperationAndValidate(HoodieTestTable testTable, String com validateMetadata(testTable); } + protected void doWriteOperationNonPartitioned(HoodieTestTable testTable, String commitTime, WriteOperationType operationType) throws Exception { + testTable.doWriteOperation(commitTime, operationType, emptyList(), asList(""), 3); + } + + protected void doWriteOperation(HoodieTestTable testTable, String commitTime, WriteOperationType operationType, boolean nonPartitioned) throws Exception { + if (nonPartitioned) { + doWriteOperationNonPartitioned(testTable, commitTime, operationType); + } else { + doWriteOperation(testTable, commitTime, operationType); + } + } + protected void doWriteOperation(HoodieTestTable testTable, String commitTime, WriteOperationType operationType) throws Exception { testTable.doWriteOperation(commitTime, operationType, emptyList(), asList("p1", "p2"), 3); } @@ -154,16 +166,28 @@ private void doCleanInternal(HoodieTestTable testTable, String commitTime, List< } } + protected void doCompactionNonPartitioned(HoodieTestTable testTable, String commitTime) throws Exception { + doCompactionInternal(testTable, commitTime, false, true); + } + + protected void doCompaction(HoodieTestTable testTable, String commitTime, boolean nonPartitioned) throws Exception { + doCompactionInternal(testTable, commitTime, false, nonPartitioned); + } + protected void doCompaction(HoodieTestTable testTable, String commitTime) throws Exception { - doCompactionInternal(testTable, commitTime, false); + doCompactionInternal(testTable, commitTime, false, false); + } + + protected void doCompactionNonPartitionedAndValidate(HoodieTestTable testTable, String commitTime) throws Exception { + doCompactionInternal(testTable, commitTime, true, true); } protected void doCompactionAndValidate(HoodieTestTable testTable, String commitTime) throws Exception { - doCompactionInternal(testTable, commitTime, true); + doCompactionInternal(testTable, commitTime, true, false); } - private void doCompactionInternal(HoodieTestTable testTable, String commitTime, boolean validate) throws Exception { - testTable.doCompaction(commitTime, asList("p1", "p2")); + private void doCompactionInternal(HoodieTestTable testTable, String commitTime, boolean validate, boolean nonPartitioned) throws Exception { + testTable.doCompaction(commitTime, nonPartitioned ? asList("") : asList("p1", "p2")); if (validate) { validateMetadata(testTable); } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBootstrap.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBootstrap.java index 39a24f05a4fa..baf5e7437dc3 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBootstrap.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBootstrap.java @@ -235,7 +235,7 @@ private void bootstrapAndVerifyFailure() throws Exception { } private void doWriteInsertAndUpsert(HoodieTestTable testTable) throws Exception { - doWriteInsertAndUpsert(testTable, "0000100", "0000101"); + doWriteInsertAndUpsert(testTable, "0000100", "0000101", false); } private HoodieWriteConfig getWriteConfig(int minArchivalCommits, int maxArchivalCommits) throws Exception { diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java index bdea909e093d..1e07485d433f 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java @@ -532,6 +532,9 @@ public void validateMetadata(HoodieTestTable testTable, List inflightCom List fsPartitionPaths = testTable.getAllPartitionPaths(); List fsPartitions = new ArrayList<>(); fsPartitionPaths.forEach(entry -> fsPartitions.add(entry.getFileName().toString())); + if (fsPartitions.isEmpty()) { + fsPartitions.add(""); + } List metadataPartitions = tableMetadata.getAllPartitionPaths(); Collections.sort(fsPartitions); @@ -618,7 +621,7 @@ protected void validateFilesPerPartition(HoodieTestTable testTable, HoodieTableM } } } - assertEquals(fsStatuses.length, partitionToFilesMap.get(basePath + "/" + partition).length); + assertEquals(fsStatuses.length, partitionToFilesMap.get(partitionPath.toString()).length); // Block sizes should be valid Arrays.stream(metaStatuses).forEach(s -> assertTrue(s.getBlockSize() > 0)); diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java index f5c176261039..ff9dbae64630 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java @@ -48,6 +48,7 @@ public interface HoodieTableMetadata extends Serializable, AutoCloseable { String RECORDKEY_PARTITION_LIST = "__all_partitions__"; // The partition name used for non-partitioned tables String NON_PARTITIONED_NAME = "."; + String EMPTY_PARTITION_NAME = ""; // Base path of the Metadata Table relative to the dataset (.hoodie/metadata) static final String METADATA_TABLE_REL_PATH = HoodieTableMetaClient.METAFOLDER_NAME + Path.SEPARATOR + "metadata"; diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java index b4dfbbd631f9..7817d14e0036 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java @@ -52,6 +52,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.apache.hudi.metadata.HoodieTableMetadata.EMPTY_PARTITION_NAME; import static org.apache.hudi.metadata.HoodieTableMetadata.NON_PARTITIONED_NAME; /** @@ -89,7 +90,7 @@ public static List convertMetadataToRecords(HoodieCommitMetadata c List records = new LinkedList<>(); List allPartitions = new LinkedList<>(); commitMetadata.getPartitionToWriteStats().forEach((partitionStatName, writeStats) -> { - final String partition = partitionStatName.equals("") ? NON_PARTITIONED_NAME : partitionStatName; + final String partition = partitionStatName.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionStatName; allPartitions.add(partition); Map newFiles = new HashMap<>(writeStats.size()); @@ -133,7 +134,8 @@ public static List convertMetadataToRecords(HoodieCommitMetadata c public static List convertMetadataToRecords(HoodieCleanMetadata cleanMetadata, String instantTime) { List records = new LinkedList<>(); int[] fileDeleteCount = {0}; - cleanMetadata.getPartitionMetadata().forEach((partition, partitionMetadata) -> { + cleanMetadata.getPartitionMetadata().forEach((partitionName, partitionMetadata) -> { + final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionName; // Files deleted from a partition List deletedFiles = partitionMetadata.getDeletePathPatterns(); HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(partition, Option.empty(), @@ -282,12 +284,13 @@ private static List convertFilesToRecords(Map List records = new LinkedList<>(); int[] fileChangeCount = {0, 0}; // deletes, appends - partitionToDeletedFiles.forEach((partition, deletedFiles) -> { + partitionToDeletedFiles.forEach((partitionName, deletedFiles) -> { fileChangeCount[0] += deletedFiles.size(); + final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionName; Option> filesAdded = Option.empty(); - if (partitionToAppendedFiles.containsKey(partition)) { - filesAdded = Option.of(partitionToAppendedFiles.remove(partition)); + if (partitionToAppendedFiles.containsKey(partitionName)) { + filesAdded = Option.of(partitionToAppendedFiles.remove(partitionName)); } HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(partition, filesAdded, @@ -295,7 +298,8 @@ private static List convertFilesToRecords(Map records.add(record); }); - partitionToAppendedFiles.forEach((partition, appendedFileMap) -> { + partitionToAppendedFiles.forEach((partitionName, appendedFileMap) -> { + final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionName; fileChangeCount[1] += appendedFileMap.size(); // Validate that no appended file has been deleted diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java index 486c4733a099..d04f71662c18 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java @@ -367,7 +367,9 @@ public static List getPartitionPaths(Path basePath) throws IOException { if (Files.notExists(basePath)) { return Collections.emptyList(); } - return Files.list(basePath).filter(entry -> !entry.getFileName().toString().equals(HoodieTableMetaClient.METAFOLDER_NAME)).collect(Collectors.toList()); + return Files.list(basePath).filter(entry -> (!entry.getFileName().toString().equals(HoodieTableMetaClient.METAFOLDER_NAME) + && !entry.getFileName().toString().contains("parquet") && !entry.getFileName().toString().contains("log")) + && !entry.getFileName().toString().endsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)).collect(Collectors.toList()); } /** diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java index 50c5858edce6..1a8ce69355e5 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java @@ -602,7 +602,7 @@ public Path getPartitionPath(String partition) { } public List getAllPartitionPaths() throws IOException { - java.nio.file.Path basePathPath = Paths.get(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME).getParent().getParent(); + java.nio.file.Path basePathPath = Paths.get(basePath); return FileCreateUtils.getPartitionPaths(basePathPath); } @@ -660,8 +660,10 @@ public FileStatus[] listAllFilesInPartition(String partitionPath) throws IOExcep return FileSystemTestUtils.listRecursive(fs, new Path(Paths.get(basePath, partitionPath).toString())).stream() .filter(entry -> { boolean toReturn = true; + String filePath = entry.getPath().toString(); String fileName = entry.getPath().getName(); - if (fileName.equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)) { + if (fileName.equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE) || (!fileName.contains("log") && !fileName.contains("parquet")) + || filePath.contains("metadata")) { toReturn = false; } else { for (String inflight : inflightCommits) {