@@ -400,9 +400,14 @@ private IndexedRecord convertToAvroRecord(HoodieTimeline commitTimeline, HoodieInstant hoodieInstant)
         break;
       }
       case HoodieTimeline.REPLACE_COMMIT_ACTION: {
-        HoodieReplaceCommitMetadata replaceCommitMetadata = HoodieReplaceCommitMetadata
-            .fromBytes(commitTimeline.getInstantDetails(hoodieInstant).get(), HoodieReplaceCommitMetadata.class);
-        archivedMetaWrapper.setHoodieReplaceCommitMetadata(ReplaceArchivalHelper.convertReplaceCommitMetadata(replaceCommitMetadata));
+        if (hoodieInstant.isRequested()) {
+          archivedMetaWrapper.setHoodieRequestedReplaceMetadata(
+              TimelineMetadataUtils.deserializeRequestedReplaceMetadata(commitTimeline.getInstantDetails(hoodieInstant).get()));
+        } else if (hoodieInstant.isCompleted()) {
+          HoodieReplaceCommitMetadata replaceCommitMetadata = HoodieReplaceCommitMetadata
+              .fromBytes(commitTimeline.getInstantDetails(hoodieInstant).get(), HoodieReplaceCommitMetadata.class);
+          archivedMetaWrapper.setHoodieReplaceCommitMetadata(ReplaceArchivalHelper.convertReplaceCommitMetadata(replaceCommitMetadata));
+        }
        archivedMetaWrapper.setActionType(ActionType.replacecommit.name());
        break;
      }
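For context, the requested-replace payload that this branch deserializes round-trips through TimelineMetadataUtils. A minimal sketch of that round trip (builder fields borrowed from the test change below; the wrapper class and main are hypothetical scaffolding):

import java.util.Collections;

import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;

public class RequestedReplaceRoundTrip {
  public static void main(String[] args) throws Exception {
    // Build the payload the same way the updated test below does.
    HoodieRequestedReplaceMetadata requested = HoodieRequestedReplaceMetadata.newBuilder()
        .setOperationType(WriteOperationType.INSERT_OVERWRITE.toString())
        .setVersion(1) // version of this Avro structure, not the timeline layout
        .setExtraMetadata(Collections.emptyMap())
        .build();
    // serializeRequestedReplaceMetadata returns Option<byte[]>; these bytes are
    // what getInstantDetails(...) yields for a requested replace instant.
    byte[] bytes = TimelineMetadataUtils.serializeRequestedReplaceMetadata(requested).get();
    HoodieRequestedReplaceMetadata roundTripped =
        TimelineMetadataUtils.deserializeRequestedReplaceMetadata(bytes);
    System.out.println(roundTripped.getOperationType());
  }
}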
@@ -21,6 +21,7 @@
 import org.apache.hudi.avro.model.HoodieActionInstant;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieCleanerPlan;
+import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
 import org.apache.hudi.common.HoodieCleanStat;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
@@ -495,11 +496,16 @@ private void createReplaceMetadata(String instantTime) throws Exception {
     String fileId2 = "file-" + instantTime + "-2";
 
     // create replace instant to mark fileId1 as deleted
+    HoodieRequestedReplaceMetadata requestedReplaceMetadata = HoodieRequestedReplaceMetadata.newBuilder()
+        .setOperationType(WriteOperationType.INSERT_OVERWRITE.toString())
+        .setVersion(1)
+        .setExtraMetadata(Collections.emptyMap())
+        .build();
     HoodieReplaceCommitMetadata replaceMetadata = new HoodieReplaceCommitMetadata();
     replaceMetadata.addReplaceFileId(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1);
     replaceMetadata.setOperationType(WriteOperationType.INSERT_OVERWRITE);
     HoodieTestTable.of(metaClient)
-        .addReplaceCommit(instantTime, replaceMetadata)
+        .addReplaceCommit(instantTime, requestedReplaceMetadata, replaceMetadata)
         .withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1, fileId2);
   }

Review thread on the .setVersion(1) line:

Contributor: Should this be the CURRENT_VERSION from timeline layout?

Member (author): @n3nash This is not related to the timeline layout version. It tracks the version of HoodieRequestedReplaceMetadata itself: if we add new fields to the replace-metadata Avro structure, we can bump this version.

In the test we were actually already configured to use the default timeline layout version 1, but the test was only creating the commit file, not the requested file. I modified it to also create requested files, as happens with timeline layout version 1 (see the sketch just below).
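As the author's reply explains, timeline layout version 1 materializes requested, inflight, and completed instant files. A minimal sketch of the sequence addReplaceCommit drives, assuming the three static helpers changed further below are in scope (the file-name suffixes in the comments reflect Hudi's usual timeline extensions):

import java.io.IOException;

import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;

// A minimal sketch of the three instant files a replace commit now leaves
// on the timeline under layout version 1.
static void writeReplaceInstant(String basePath, String instantTime,
                                HoodieRequestedReplaceMetadata requested,
                                HoodieReplaceCommitMetadata completed) throws IOException {
  createRequestedReplaceCommit(basePath, instantTime, requested); // <instant>.replacecommit.requested
  createInflightReplaceCommit(basePath, instantTime);             // <instant>.replacecommit.inflight
  createReplaceCommit(basePath, instantTime, completed);          // <instant>.replacecommit
}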
@@ -24,9 +24,14 @@
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieCleanPartitionMetadata;
 import org.apache.hudi.avro.model.HoodieCleanerPlan;
+import org.apache.hudi.avro.model.HoodieClusteringGroup;
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.avro.model.HoodieClusteringStrategy;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.avro.model.HoodieFileStatus;
+import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.avro.model.HoodieSliceInfo;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.HoodieCleanStat;
@@ -845,7 +850,8 @@ public void testCleanWithReplaceCommits() throws Exception {
     // make next replacecommit, with 1 clustering operation. logically delete p0. No change to p1
     Map<String, String> partitionAndFileId002 = testTable.forReplaceCommit("00000000000002").getFileIdsWithBaseFilesInPartitions(p0);
     String file2P0C1 = partitionAndFileId002.get(p0);
-    testTable.addReplaceCommit("00000000000002", generateReplaceCommitMetadata(p0, file1P0C0, file2P0C1));
+    Pair<HoodieRequestedReplaceMetadata, HoodieReplaceCommitMetadata> replaceMetadata = generateReplaceCommitMetadata(p0, file1P0C0, file2P0C1);
+    testTable.addReplaceCommit("00000000000002", replaceMetadata.getKey(), replaceMetadata.getValue());
 
     // run cleaner
     List<HoodieCleanStat> hoodieCleanStatsTwo = runCleaner(config);
@@ -857,7 +863,8 @@
     // make next replacecommit, with 1 clustering operation. Replace data in p1. No change to p0
     Map<String, String> partitionAndFileId003 = testTable.forReplaceCommit("00000000000003").getFileIdsWithBaseFilesInPartitions(p1);
     String file3P1C2 = partitionAndFileId003.get(p1);
-    testTable.addReplaceCommit("00000000000003", generateReplaceCommitMetadata(p1, file1P1C0, file3P1C2));
+    replaceMetadata = generateReplaceCommitMetadata(p1, file1P1C0, file3P1C2);
+    testTable.addReplaceCommit("00000000000003", replaceMetadata.getKey(), replaceMetadata.getValue());
 
     // run cleaner
     List<HoodieCleanStat> hoodieCleanStatsThree = runCleaner(config);
@@ -870,7 +877,8 @@
     // make next replacecommit, with 1 clustering operation. Replace data in p0 again
     Map<String, String> partitionAndFileId004 = testTable.forReplaceCommit("00000000000004").getFileIdsWithBaseFilesInPartitions(p0);
     String file4P0C3 = partitionAndFileId004.get(p0);
-    testTable.addReplaceCommit("00000000000004", generateReplaceCommitMetadata(p0, file2P0C1, file4P0C3));
+    replaceMetadata = generateReplaceCommitMetadata(p0, file2P0C1, file4P0C3);
+    testTable.addReplaceCommit("00000000000004", replaceMetadata.getKey(), replaceMetadata.getValue());
 
     // run cleaner
     List<HoodieCleanStat> hoodieCleanStatsFour = runCleaner(config);
@@ -884,7 +892,8 @@
     // make next replacecommit, with 1 clustering operation. Replace all data in p1. no new files created
     Map<String, String> partitionAndFileId005 = testTable.forReplaceCommit("00000000000005").getFileIdsWithBaseFilesInPartitions(p1);
     String file4P1C4 = partitionAndFileId005.get(p1);
-    testTable.addReplaceCommit("00000000000005", generateReplaceCommitMetadata(p1, file3P1C2, file4P1C4));
+    replaceMetadata = generateReplaceCommitMetadata(p1, file3P1C2, file4P1C4);
+    testTable.addReplaceCommit("00000000000005", replaceMetadata.getKey(), replaceMetadata.getValue());
 
     List<HoodieCleanStat> hoodieCleanStatsFive = runCleaner(config, 2);
     assertTrue(testTable.baseFileExists(p0, "00000000000004", file4P0C3));
@@ -894,7 +903,23 @@ public void testCleanWithReplaceCommits() throws Exception {
     assertFalse(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
   }
 
-  private HoodieReplaceCommitMetadata generateReplaceCommitMetadata(String partition, String replacedFileId, String newFileId) {
+  private Pair<HoodieRequestedReplaceMetadata, HoodieReplaceCommitMetadata> generateReplaceCommitMetadata(String partition,
+                                                                                                          String replacedFileId,
+                                                                                                          String newFileId) {
+    HoodieRequestedReplaceMetadata requestedReplaceMetadata = new HoodieRequestedReplaceMetadata();
+    requestedReplaceMetadata.setOperationType(WriteOperationType.CLUSTER.toString());
+    requestedReplaceMetadata.setVersion(1);
+    HoodieSliceInfo sliceInfo = HoodieSliceInfo.newBuilder().setFileId(replacedFileId).build();
+    List<HoodieClusteringGroup> clusteringGroups = new ArrayList<>();
+    clusteringGroups.add(HoodieClusteringGroup.newBuilder()
+        .setVersion(1).setNumOutputFileGroups(1).setMetrics(Collections.emptyMap())
+        .setSlices(Collections.singletonList(sliceInfo)).build());
+    requestedReplaceMetadata.setExtraMetadata(Collections.emptyMap());
+    requestedReplaceMetadata.setClusteringPlan(HoodieClusteringPlan.newBuilder()
+        .setVersion(1).setExtraMetadata(Collections.emptyMap())
+        .setStrategy(HoodieClusteringStrategy.newBuilder().setStrategyClassName("").setVersion(1).build())
+        .setInputGroups(clusteringGroups).build());
+
     HoodieReplaceCommitMetadata replaceMetadata = new HoodieReplaceCommitMetadata();
     replaceMetadata.addReplaceFileId(partition, replacedFileId);
     replaceMetadata.setOperationType(WriteOperationType.CLUSTER);
@@ -905,7 +930,7 @@ private HoodieReplaceCommitMetadata generateReplaceCommitMetadata(String partition, String replacedFileId, String newFileId) {
       writeStat.setFileId(newFileId);
       replaceMetadata.addWriteStat(partition, writeStat);
     }
-    return replaceMetadata;
+    return Pair.of(requestedReplaceMetadata, replaceMetadata);
   }
 
   @Test
2 changes: 1 addition & 1 deletion hudi-common/pom.xml

HoodieArchivedMetaEntry now embeds HoodieRequestedReplaceMetadata, so its import moves after that schema; the avro-maven-plugin resolves imported schemas in declaration order.

@@ -77,7 +77,6 @@
             <import>${basedir}/src/main/avro/HoodieRollbackMetadata.avsc</import>
             <import>${basedir}/src/main/avro/HoodieRestoreMetadata.avsc</import>
             <import>${basedir}/src/main/avro/HoodieReplaceCommitMetadata.avsc</import>
-            <import>${basedir}/src/main/avro/HoodieArchivedMetaEntry.avsc</import>
             <import>${basedir}/src/main/avro/HoodiePath.avsc</import>
             <import>${basedir}/src/main/avro/HoodieFSPermission.avsc</import>
             <import>${basedir}/src/main/avro/HoodieFileStatus.avsc</import>
@@ -90,6 +89,7 @@
             <import>${basedir}/src/main/avro/HoodieClusteringPlan.avsc</import>
             <import>${basedir}/src/main/avro/HoodieRequestedReplaceMetadata.avsc</import>
             <import>${basedir}/src/main/avro/HoodieMetadata.avsc</import>
+            <import>${basedir}/src/main/avro/HoodieArchivedMetaEntry.avsc</import>
           </imports>
         </configuration>
       </plugin>
8 changes: 8 additions & 0 deletions hudi-common/src/main/avro/HoodieArchivedMetaEntry.avsc

@@ -104,6 +104,14 @@
         "HoodieReplaceCommitMetadata"
       ],
       "default": null
+    },
+    {
+      "name":"hoodieRequestedReplaceMetadata",
+      "type":[
+        "null",
+        "HoodieRequestedReplaceMetadata"
+      ],
+      "default": null
     }
   ]
 }
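Because the new union field defaults to null, archived entries written before this change still deserialize cleanly. A minimal consumer-side sketch, assuming the Avro-generated getter that mirrors the setHoodieRequestedReplaceMetadata call above:

import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
import org.apache.hudi.common.util.Option;

// Entries archived before this schema change come back with the field
// as null, so callers should treat it as optional.
static Option<HoodieRequestedReplaceMetadata> requestedReplaceMetadata(HoodieArchivedMetaEntry entry) {
  return Option.ofNullable(entry.getHoodieRequestedReplaceMetadata());
}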
@@ -21,13 +21,15 @@
 
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieCleanerPlan;
+import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodiePartitionMetadata;
 import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.model.IOType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.table.view.TableFileSystemView;
 import org.apache.hudi.exception.HoodieException;
@@ -147,8 +149,9 @@ public static void createReplaceCommit(String basePath, String instantTime, HoodieReplaceCommitMetadata metadata) throws IOException {
     createMetaFile(basePath, instantTime, HoodieTimeline.REPLACE_COMMIT_EXTENSION, metadata.toJsonString().getBytes(StandardCharsets.UTF_8));
   }
 
-  public static void createRequestedReplaceCommit(String basePath, String instantTime) throws IOException {
-    createMetaFile(basePath, instantTime, HoodieTimeline.REQUESTED_REPLACE_COMMIT_EXTENSION);
+  public static void createRequestedReplaceCommit(String basePath, String instantTime, HoodieRequestedReplaceMetadata requestedMetadata) throws IOException {
+    createMetaFile(basePath, instantTime, HoodieTimeline.REQUESTED_REPLACE_COMMIT_EXTENSION,
+        TimelineMetadataUtils.serializeRequestedReplaceMetadata(requestedMetadata).get());
   }
 
   public static void createInflightReplaceCommit(String basePath, String instantTime) throws IOException {
@@ -22,6 +22,7 @@
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieCleanerPlan;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
@@ -163,8 +164,8 @@ public HoodieTestTable addDeltaCommit(String instantTime) throws Exception {
     return this;
   }
 
-  public HoodieTestTable addReplaceCommit(String instantTime, HoodieReplaceCommitMetadata metadata) throws Exception {
-    createRequestedReplaceCommit(basePath, instantTime);
+  public HoodieTestTable addReplaceCommit(String instantTime, HoodieRequestedReplaceMetadata requestedReplaceMetadata, HoodieReplaceCommitMetadata metadata) throws Exception {
+    createRequestedReplaceCommit(basePath, instantTime, requestedReplaceMetadata);
     createInflightReplaceCommit(basePath, instantTime);
     createReplaceCommit(basePath, instantTime, metadata);
     currentInstantTime = instantTime;