Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {

private static final Logger LOG = LogManager.getLogger(TestHoodieBackedMetadata.class);

public static List<Arguments> bootstrapAndTableOperationTestArgs() {
public static List<Arguments> tableTypeAndEnableOperationArgs() {
return asList(
Arguments.of(COPY_ON_WRITE, true),
Arguments.of(COPY_ON_WRITE, false),
Expand All @@ -162,7 +162,7 @@ public static List<Arguments> bootstrapAndTableOperationTestArgs() {
* Metadata Table bootstrap scenarios.
*/
@ParameterizedTest
@MethodSource("bootstrapAndTableOperationTestArgs")
@MethodSource("tableTypeAndEnableOperationArgs")
public void testMetadataTableBootstrap(HoodieTableType tableType, boolean addRollback) throws Exception {
init(tableType, false);
// bootstrap with few commits
Expand Down Expand Up @@ -243,7 +243,7 @@ public void testOnlyValidPartitionsAdded(HoodieTableType tableType) throws Excep
* Test various table operations sync to Metadata Table correctly.
*/
@ParameterizedTest
@MethodSource("bootstrapAndTableOperationTestArgs")
@MethodSource("tableTypeAndEnableOperationArgs")
public void testTableOperations(HoodieTableType tableType, boolean enableFullScan) throws Exception {
init(tableType, true, enableFullScan, false, false);
doWriteInsertAndUpsert(testTable);
Expand Down Expand Up @@ -319,6 +319,16 @@ public void testMetadataInsertUpsertClean(HoodieTableType tableType) throws Exce
validateMetadata(testTable, emptyList(), true);
}

/**
 * Exercises insert, upsert and clean on a COPY_ON_WRITE table written through the
 * non-partitioned write helper, then validates the metadata table.
 */
@Test
public void testMetadataInsertUpsertCleanNonPartitioned() throws Exception {
  init(COPY_ON_WRITE);

  final String insertInstant = "0000001";
  final String upsertInstant = "0000002";
  final String cleanInstant = "0000003";

  doWriteOperationNonPartitioned(testTable, insertInstant, INSERT);
  doWriteOperationNonPartitioned(testTable, upsertInstant, UPSERT);

  // The clean is driven by the first (insert) commit.
  testTable.doCleanBasedOnCommits(cleanInstant, Arrays.asList(insertInstant));

  // NOTE(review): the trailing `true` is presumably the "non-partitioned" flag — confirm against validateMetadata.
  validateMetadata(testTable, emptyList(), true);
}

@ParameterizedTest
@EnumSource(HoodieTableType.class)
public void testInsertUpsertCluster(HoodieTableType tableType) throws Exception {
Expand Down Expand Up @@ -509,7 +519,7 @@ public void testRollbackOperations(HoodieTableType tableType) throws Exception {
doWriteInsertAndUpsert(testTable);

// trigger an upsert
doWriteOperationAndValidate(testTable, "0000003");
doWriteOperation(testTable, "0000003", UPSERT);

// trigger a commit and rollback
doWriteOperation(testTable, "0000004");
Expand Down Expand Up @@ -549,6 +559,27 @@ public void testRollbackOperations(HoodieTableType tableType) throws Exception {
validateMetadata(testTable, true);
}

/**
 * Exercises upserts and a rollback on a COPY_ON_WRITE table written through the
 * non-partitioned write helpers, validating the metadata table after the rollback
 * and again after a further batch of upserts.
 */
@Test
public void testRollbackOperationsNonPartitioned() throws Exception {
  init(COPY_ON_WRITE);
  doWriteInsertAndUpsertNonPartitioned(testTable);

  // One more upsert on top of the initial insert + upsert.
  doWriteOperationNonPartitioned(testTable, "0000003", UPSERT);

  // Write a commit and immediately roll it back.
  final String rolledBackInstant = "0000004";
  doWriteOperationNonPartitioned(testTable, rolledBackInstant, UPSERT);
  doRollback(testTable, rolledBackInstant, "0000005");
  validateMetadata(testTable);

  // Follow up with upserts at instants 0000006 through 0000009, then validate again.
  int instantSuffix = 6;
  while (instantSuffix < 10) {
    doWriteOperationNonPartitioned(testTable, "000000" + instantSuffix, UPSERT);
    instantSuffix++;
  }
  validateMetadata(testTable);
}

/**
* Test that manual rollbacks work correctly and enough timeline history is maintained on the metadata table
* timeline.
Expand All @@ -573,7 +604,7 @@ public void testManualRollbacks(final boolean populateMateFields) throws Excepti
.build();

initWriteConfigAndMetatableWriter(writeConfig, true);
doWriteInsertAndUpsert(testTable, "000001", "000002");
doWriteInsertAndUpsert(testTable, "000001", "000002", false);

for (int i = 3; i < 10; i++) {
doWriteOperation(testTable, "00000" + i);
Expand Down Expand Up @@ -674,8 +705,8 @@ private Long getNextCommitTime(long curCommitTime) {
}

@ParameterizedTest
@EnumSource(HoodieTableType.class)
public void testMetadataBootstrapLargeCommitList(HoodieTableType tableType) throws Exception {
@MethodSource("tableTypeAndEnableOperationArgs")
public void testMetadataBootstrapLargeCommitList(HoodieTableType tableType, boolean nonPartitionedDataset) throws Exception {
init(tableType, true, true, true, false);
long baseCommitTime = Long.parseLong(HoodieActiveTimeline.createNewInstantTime());
for (int i = 1; i < 25; i += 7) {
Expand All @@ -687,17 +718,17 @@ public void testMetadataBootstrapLargeCommitList(HoodieTableType tableType) thro
long commitTime6 = getNextCommitTime(commitTime5);
long commitTime7 = getNextCommitTime(commitTime6);
baseCommitTime = commitTime7;
doWriteOperation(testTable, Long.toString(commitTime1), INSERT);
doWriteOperation(testTable, Long.toString(commitTime2));
doWriteOperation(testTable, Long.toString(commitTime1), INSERT, nonPartitionedDataset);
doWriteOperation(testTable, Long.toString(commitTime2), UPSERT, nonPartitionedDataset);
doClean(testTable, Long.toString(commitTime3), Arrays.asList(Long.toString(commitTime1)));
doWriteOperation(testTable, Long.toString(commitTime4));
doWriteOperation(testTable, Long.toString(commitTime4), UPSERT, nonPartitionedDataset);
if (tableType == MERGE_ON_READ) {
doCompaction(testTable, Long.toString(commitTime5));
doCompaction(testTable, Long.toString(commitTime5), nonPartitionedDataset);
}
doWriteOperation(testTable, Long.toString(commitTime6));
doWriteOperation(testTable, Long.toString(commitTime6), UPSERT, nonPartitionedDataset);
doRollback(testTable, Long.toString(commitTime6), Long.toString(commitTime7));
}
validateMetadata(testTable, emptyList(), true);
validateMetadata(testTable, emptyList(), nonPartitionedDataset);
}

// Some operations are not feasible with test table infra. hence using write client to test those cases.
Expand Down Expand Up @@ -1563,8 +1594,12 @@ private void doPreBootstrapOperations(HoodieTestTable testTable, String commit1,
validateMetadata(testTable);
}

/**
 * Performs the standard insert ("0000001") followed by upsert ("0000002") with the
 * non-partitioned flag of the shared helper switched on.
 */
private void doWriteInsertAndUpsertNonPartitioned(HoodieTestTable testTable) throws Exception {
  final String insertInstant = "0000001";
  final String upsertInstant = "0000002";
  final boolean nonPartitioned = true;
  doWriteInsertAndUpsert(testTable, insertInstant, upsertInstant, nonPartitioned);
}

private void doWriteInsertAndUpsert(HoodieTestTable testTable) throws Exception {
doWriteInsertAndUpsert(testTable, "0000001", "0000002");
doWriteInsertAndUpsert(testTable, "0000001", "0000002", false);
}

private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, String schemaStr, long smallFileSize, boolean mergeAllowDuplicateInserts) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public void testTableOperations() throws Exception {
}

private void doWriteInsertAndUpsert(HoodieTestTable testTable) throws Exception {
doWriteInsertAndUpsert(testTable, "0000001", "0000002");
doWriteInsertAndUpsert(testTable, "0000001", "0000002", false);
}

private void verifyBaseMetadataTable() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,10 @@ public void clean() throws Exception {
cleanupResources();
}

protected void doWriteInsertAndUpsert(HoodieTestTable testTable, String commit1, String commit2) throws Exception {
testTable.doWriteOperation(commit1, INSERT, asList("p1", "p2"), asList("p1", "p2"),
protected void doWriteInsertAndUpsert(HoodieTestTable testTable, String commit1, String commit2, boolean nonPartitioned) throws Exception {
testTable.doWriteOperation(commit1, INSERT, nonPartitioned ? asList("") : asList("p1", "p2"), nonPartitioned ? asList("") : asList("p1", "p2"),
4, false);
testTable.doWriteOperation(commit2, UPSERT, asList("p1", "p2"),
testTable.doWriteOperation(commit2, UPSERT, nonPartitioned ? asList("") : asList("p1", "p2"),
4, false);
validateMetadata(testTable);
}
Expand All @@ -135,6 +135,18 @@ protected void doWriteOperationAndValidate(HoodieTestTable testTable, String com
validateMetadata(testTable);
}

/**
 * Issues a single write operation against the test table using the empty string as the
 * only partition entry.
 *
 * @param testTable     test table the write is applied to
 * @param commitTime    instant time passed through to the write
 * @param operationType write operation type passed through to the write
 * @throws Exception if the underlying test-table write fails
 */
protected void doWriteOperationNonPartitioned(HoodieTestTable testTable, String commitTime, WriteOperationType operationType) throws Exception {
// NOTE(review): asList("") is presumably the partition list (empty name == non-partitioned) and 3 the
// number of files per partition — confirm against HoodieTestTable#doWriteOperation.
testTable.doWriteOperation(commitTime, operationType, emptyList(), asList(""), 3);
}

/**
 * Dispatches a write to either the non-partitioned or the regular write helper.
 *
 * @param testTable      test table the write is applied to
 * @param commitTime     instant time of the write
 * @param operationType  write operation type
 * @param nonPartitioned {@code true} to route to {@code doWriteOperationNonPartitioned},
 *                       {@code false} to route to the three-argument {@code doWriteOperation}
 * @throws Exception if the delegated write fails
 */
protected void doWriteOperation(HoodieTestTable testTable, String commitTime, WriteOperationType operationType, boolean nonPartitioned) throws Exception {
  if (!nonPartitioned) {
    doWriteOperation(testTable, commitTime, operationType);
    return;
  }
  doWriteOperationNonPartitioned(testTable, commitTime, operationType);
}

/**
 * Issues a single write operation against the test table using the fixed entries
 * "p1" and "p2".
 *
 * @param testTable     test table the write is applied to
 * @param commitTime    instant time passed through to the write
 * @param operationType write operation type passed through to the write
 * @throws Exception if the underlying test-table write fails
 */
protected void doWriteOperation(HoodieTestTable testTable, String commitTime, WriteOperationType operationType) throws Exception {
// NOTE(review): asList("p1", "p2") is presumably the partition list and 3 the number of files per
// partition — confirm against HoodieTestTable#doWriteOperation.
testTable.doWriteOperation(commitTime, operationType, emptyList(), asList("p1", "p2"), 3);
}
Expand All @@ -154,16 +166,28 @@ private void doCleanInternal(HoodieTestTable testTable, String commitTime, List<
}
}

/**
 * Runs a compaction through the shared internal helper with the non-partitioned flag on
 * and validation off.
 *
 * @param testTable  test table to compact
 * @param commitTime instant time of the compaction
 * @throws Exception if the compaction fails
 */
protected void doCompactionNonPartitioned(HoodieTestTable testTable, String commitTime) throws Exception {
  final boolean validate = false;
  final boolean nonPartitioned = true;
  doCompactionInternal(testTable, commitTime, validate, nonPartitioned);
}

/**
 * Runs a compaction through the shared internal helper without validating afterwards.
 *
 * @param testTable      test table to compact
 * @param commitTime     instant time of the compaction
 * @param nonPartitioned forwarded unchanged to the internal helper's non-partitioned flag
 * @throws Exception if the compaction fails
 */
protected void doCompaction(HoodieTestTable testTable, String commitTime, boolean nonPartitioned) throws Exception {
  final boolean validate = false;
  doCompactionInternal(testTable, commitTime, validate, nonPartitioned);
}

protected void doCompaction(HoodieTestTable testTable, String commitTime) throws Exception {
doCompactionInternal(testTable, commitTime, false);
doCompactionInternal(testTable, commitTime, false, false);
}

/**
 * Runs a compaction through the shared internal helper with both the validation flag
 * and the non-partitioned flag on.
 *
 * @param testTable  test table to compact
 * @param commitTime instant time of the compaction
 * @throws Exception if the compaction or the subsequent validation fails
 */
protected void doCompactionNonPartitionedAndValidate(HoodieTestTable testTable, String commitTime) throws Exception {
  final boolean validate = true;
  final boolean nonPartitioned = true;
  doCompactionInternal(testTable, commitTime, validate, nonPartitioned);
}

protected void doCompactionAndValidate(HoodieTestTable testTable, String commitTime) throws Exception {
doCompactionInternal(testTable, commitTime, true);
doCompactionInternal(testTable, commitTime, true, false);
}

private void doCompactionInternal(HoodieTestTable testTable, String commitTime, boolean validate) throws Exception {
testTable.doCompaction(commitTime, asList("p1", "p2"));
private void doCompactionInternal(HoodieTestTable testTable, String commitTime, boolean validate, boolean nonPartitioned) throws Exception {
testTable.doCompaction(commitTime, nonPartitioned ? asList("") : asList("p1", "p2"));
if (validate) {
validateMetadata(testTable);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ private void bootstrapAndVerifyFailure() throws Exception {
}

private void doWriteInsertAndUpsert(HoodieTestTable testTable) throws Exception {
doWriteInsertAndUpsert(testTable, "0000100", "0000101");
doWriteInsertAndUpsert(testTable, "0000100", "0000101", false);
}

private HoodieWriteConfig getWriteConfig(int minArchivalCommits, int maxArchivalCommits) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,9 @@ public void validateMetadata(HoodieTestTable testTable, List<String> inflightCom
List<java.nio.file.Path> fsPartitionPaths = testTable.getAllPartitionPaths();
List<String> fsPartitions = new ArrayList<>();
fsPartitionPaths.forEach(entry -> fsPartitions.add(entry.getFileName().toString()));
if (fsPartitions.isEmpty()) {
fsPartitions.add("");
}
List<String> metadataPartitions = tableMetadata.getAllPartitionPaths();

Collections.sort(fsPartitions);
Expand Down Expand Up @@ -618,7 +621,7 @@ protected void validateFilesPerPartition(HoodieTestTable testTable, HoodieTableM
}
}
}
assertEquals(fsStatuses.length, partitionToFilesMap.get(basePath + "/" + partition).length);
assertEquals(fsStatuses.length, partitionToFilesMap.get(partitionPath.toString()).length);

// Block sizes should be valid
Arrays.stream(metaStatuses).forEach(s -> assertTrue(s.getBlockSize() > 0));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public interface HoodieTableMetadata extends Serializable, AutoCloseable {
String RECORDKEY_PARTITION_LIST = "__all_partitions__";
// The partition name used for non-partitioned tables
String NON_PARTITIONED_NAME = ".";
String EMPTY_PARTITION_NAME = "";

// Base path of the Metadata Table relative to the dataset (.hoodie/metadata)
static final String METADATA_TABLE_REL_PATH = HoodieTableMetaClient.METAFOLDER_NAME + Path.SEPARATOR + "metadata";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.hudi.metadata.HoodieTableMetadata.EMPTY_PARTITION_NAME;
import static org.apache.hudi.metadata.HoodieTableMetadata.NON_PARTITIONED_NAME;

/**
Expand Down Expand Up @@ -89,7 +90,7 @@ public static List<HoodieRecord> convertMetadataToRecords(HoodieCommitMetadata c
List<HoodieRecord> records = new LinkedList<>();
List<String> allPartitions = new LinkedList<>();
commitMetadata.getPartitionToWriteStats().forEach((partitionStatName, writeStats) -> {
final String partition = partitionStatName.equals("") ? NON_PARTITIONED_NAME : partitionStatName;
final String partition = partitionStatName.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionStatName;
allPartitions.add(partition);

Map<String, Long> newFiles = new HashMap<>(writeStats.size());
Expand Down Expand Up @@ -133,7 +134,8 @@ public static List<HoodieRecord> convertMetadataToRecords(HoodieCommitMetadata c
public static List<HoodieRecord> convertMetadataToRecords(HoodieCleanMetadata cleanMetadata, String instantTime) {
List<HoodieRecord> records = new LinkedList<>();
int[] fileDeleteCount = {0};
cleanMetadata.getPartitionMetadata().forEach((partition, partitionMetadata) -> {
cleanMetadata.getPartitionMetadata().forEach((partitionName, partitionMetadata) -> {
final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionName;
// Files deleted from a partition
List<String> deletedFiles = partitionMetadata.getDeletePathPatterns();
HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(partition, Option.empty(),
Expand Down Expand Up @@ -282,20 +284,22 @@ private static List<HoodieRecord> convertFilesToRecords(Map<String, List<String>
List<HoodieRecord> records = new LinkedList<>();
int[] fileChangeCount = {0, 0}; // deletes, appends

partitionToDeletedFiles.forEach((partition, deletedFiles) -> {
partitionToDeletedFiles.forEach((partitionName, deletedFiles) -> {
fileChangeCount[0] += deletedFiles.size();
final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionName;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pull this check into lambda

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This has already been refactored into a util method.


Option<Map<String, Long>> filesAdded = Option.empty();
if (partitionToAppendedFiles.containsKey(partition)) {
filesAdded = Option.of(partitionToAppendedFiles.remove(partition));
if (partitionToAppendedFiles.containsKey(partitionName)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is prone to error when refactoring the code. This depends on the code ordering of the partitionToDeletedFiles and partitionToAppendedFiles. Instead we can remap the keys for both at once and avoid all these back and forth lookups.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again, we create the HoodieRecords for the metadata table within the forEach, so we couldn't do it outside.

filesAdded = Option.of(partitionToAppendedFiles.remove(partitionName));
}

HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(partition, filesAdded,
Option.of(new ArrayList<>(deletedFiles)));
records.add(record);
});

partitionToAppendedFiles.forEach((partition, appendedFileMap) -> {
partitionToAppendedFiles.forEach((partitionName, appendedFileMap) -> {
final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionName;
fileChangeCount[1] += appendedFileMap.size();

// Validate that no appended file has been deleted
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,9 @@ public static List<Path> getPartitionPaths(Path basePath) throws IOException {
if (Files.notExists(basePath)) {
return Collections.emptyList();
}
return Files.list(basePath).filter(entry -> !entry.getFileName().toString().equals(HoodieTableMetaClient.METAFOLDER_NAME)).collect(Collectors.toList());
return Files.list(basePath).filter(entry -> (!entry.getFileName().toString().equals(HoodieTableMetaClient.METAFOLDER_NAME)
&& !entry.getFileName().toString().contains("parquet") && !entry.getFileName().toString().contains("log"))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No hardcoding of parquet. Let's please replace it with standard helpers or constants.

&& !entry.getFileName().toString().endsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)).collect(Collectors.toList());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -602,7 +602,7 @@ public Path getPartitionPath(String partition) {
}

public List<java.nio.file.Path> getAllPartitionPaths() throws IOException {
java.nio.file.Path basePathPath = Paths.get(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME).getParent().getParent();
java.nio.file.Path basePathPath = Paths.get(basePath);
return FileCreateUtils.getPartitionPaths(basePathPath);
}

Expand Down Expand Up @@ -660,8 +660,10 @@ public FileStatus[] listAllFilesInPartition(String partitionPath) throws IOExcep
return FileSystemTestUtils.listRecursive(fs, new Path(Paths.get(basePath, partitionPath).toString())).stream()
.filter(entry -> {
boolean toReturn = true;
String filePath = entry.getPath().toString();
String fileName = entry.getPath().getName();
if (fileName.equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)) {
if (fileName.equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE) || (!fileName.contains("log") && !fileName.contains("parquet"))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here

|| filePath.contains("metadata")) {
toReturn = false;
} else {
for (String inflight : inflightCommits) {
Expand Down