Merged. Changes from all commits (21 commits).

File 1: hudi-cli test
@@ -24,7 +24,7 @@
 import org.apache.hudi.cli.TableHeader;
 import org.apache.hudi.cli.testutils.AbstractShellIntegrationTest;
 import org.apache.hudi.cli.testutils.HoodieTestCommitMetadataGenerator;
-import org.apache.hudi.cli.testutils.HoodieTestReplaceCommitMetadatGenerator;
+import org.apache.hudi.cli.testutils.HoodieTestReplaceCommitMetadataGenerator;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;

@@ -122,7 +122,7 @@ private LinkedHashMap<HoodieInstant, Integer[]> generateMixedData() throws Exception {
     for (Map.Entry<HoodieInstant, Integer[]> entry : replaceCommitData.entrySet()) {
       String key = entry.getKey().getTimestamp();
       Integer[] value = entry.getValue();
-      HoodieTestReplaceCommitMetadatGenerator.createReplaceCommitFileWithMetadata(tablePath, key,
+      HoodieTestReplaceCommitMetadataGenerator.createReplaceCommitFileWithMetadata(tablePath, key,
           Option.of(value[0]), Option.of(value[1]), metaClient);
     }
File 2: HoodieTestReplaceCommitMetadataGenerator (class renamed from HoodieTestReplaceCommitMetadatGenerator)

@@ -35,15 +35,15 @@
 import static org.apache.hudi.common.testutils.FileCreateUtils.baseFileName;
 import static org.apache.hudi.common.util.CollectionUtils.createImmutableList;

-public class HoodieTestReplaceCommitMetadatGenerator extends HoodieTestCommitMetadataGenerator {
+public class HoodieTestReplaceCommitMetadataGenerator extends HoodieTestCommitMetadataGenerator {
   public static void createReplaceCommitFileWithMetadata(String basePath, String commitTime, Option<Integer> writes, Option<Integer> updates,
                                                          HoodieTableMetaClient metaclient) throws Exception {

     HoodieReplaceCommitMetadata replaceMetadata = generateReplaceCommitMetadata(basePath, commitTime, UUID.randomUUID().toString(),
         UUID.randomUUID().toString(), writes, updates);
     HoodieRequestedReplaceMetadata requestedReplaceMetadata = getHoodieRequestedReplaceMetadata();

-    HoodieTestTable.of(metaclient).addReplaceCommit(commitTime, requestedReplaceMetadata, replaceMetadata);
+    HoodieTestTable.of(metaclient).addReplaceCommit(commitTime, Option.ofNullable(requestedReplaceMetadata), Option.empty(), replaceMetadata);
   }

   private static HoodieRequestedReplaceMetadata getHoodieRequestedReplaceMetadata() {
File 3: ReplaceArchivalHelper

@@ -68,6 +68,11 @@ public static org.apache.hudi.avro.model.HoodieReplaceCommitMetadata convertReplaceCommitMetadata(…
   public static boolean deleteReplacedFileGroups(HoodieEngineContext context, HoodieTableMetaClient metaClient,
                                                  TableFileSystemView fileSystemView,
                                                  HoodieInstant instant, List<String> replacedPartitions) {
+    // There is no file id to be replaced in the very first replace commit file for insert overwrite operation
+    if (replacedPartitions.isEmpty()) {
+      LOG.warn("Found no partition files to replace");
+      return true;
+    }
     context.setJobStatus(ReplaceArchivalHelper.class.getSimpleName(), "Delete replaced file groups");
     List<Boolean> f = context.map(replacedPartitions, partition -> {
       Stream<FileSlice> fileSlices = fileSystemView.getReplacedFileGroupsBeforeOrOn(instant.getTimestamp(), partition)
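The contract of this new guard is narrow enough to pin down with a minimal test sketch. Everything below is an assumption rather than part of this PR: the test class and method names, the Mockito-based mocks, and the premise that the test sits in the same package as ReplaceArchivalHelper; import paths follow the usual Hudi layout.

import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verifyNoInteractions;

import java.util.Collections;

import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstant.State;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.junit.jupiter.api.Test;

public class TestReplaceArchivalHelperGuard {

  @Test
  public void emptyReplacedPartitionsShortCircuits() {
    HoodieEngineContext context = mock(HoodieEngineContext.class);
    HoodieTableMetaClient metaClient = mock(HoodieTableMetaClient.class);
    TableFileSystemView fsView = mock(TableFileSystemView.class);
    HoodieInstant instant =
        new HoodieInstant(State.COMPLETED, HoodieTimeline.REPLACE_COMMIT_ACTION, "001");

    // The new guard returns true immediately when there is nothing to replace,
    // e.g. for the very first insert_overwrite replace commit.
    assertTrue(ReplaceArchivalHelper.deleteReplacedFileGroups(
        context, metaClient, fsView, instant, Collections.emptyList()));

    // It fires before setJobStatus or the partition map call, so the engine
    // context and file system view are never touched.
    verifyNoInteractions(context, fsView);
  }
}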
File 4: MetadataConversionUtils

@@ -21,6 +21,7 @@
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import java.io.IOException;
+
 import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;

@@ -37,8 +38,8 @@
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
 import org.apache.hudi.common.util.CleanerUtils;
-import org.apache.hudi.common.util.ClusteringUtils;
 import org.apache.hudi.common.util.CompactionUtils;
+import org.apache.hudi.common.util.Option;

 /**
  * Helper class to convert between different action related payloads and {@link HoodieArchivedMetaEntry}.

@@ -72,10 +73,21 @@ public static HoodieArchivedMetaEntry createMetaWrapper(HoodieInstant hoodieInstant…
         HoodieReplaceCommitMetadata replaceCommitMetadata = HoodieReplaceCommitMetadata
             .fromBytes(metaClient.getActiveTimeline().getInstantDetails(hoodieInstant).get(), HoodieReplaceCommitMetadata.class);
         archivedMetaWrapper.setHoodieReplaceCommitMetadata(ReplaceArchivalHelper.convertReplaceCommitMetadata(replaceCommitMetadata));
+      } else if (hoodieInstant.isInflight()) {
+        // Inflight replacecommit files have the same metadata body as HoodieCommitMetadata,
+        // so we can re-use it without creating a separate inflight extension.
+        // Inflight replacecommit files can also be empty, in the clustering case.
+        Option<HoodieCommitMetadata> inflightCommitMetadata = getInflightReplaceMetadata(metaClient, hoodieInstant);
+        if (inflightCommitMetadata.isPresent()) {
+          archivedMetaWrapper.setHoodieInflightReplaceMetadata(convertCommitMetadata(inflightCommitMetadata.get()));

[Review comment, Contributor] Where is the setHoodieInflightReplaceMetadata function?

+        }
       } else {
-        HoodieRequestedReplaceMetadata requestedReplaceMetadata =
-            ClusteringUtils.getRequestedReplaceMetadata(metaClient, hoodieInstant).get();

[Review comment, @ssdong (author), Apr 10, 2021] Using ClusteringUtils.getRequestedReplaceMetadata is confusing to the reader even though it produces the same result. Part of that method's purpose is to ignore the inflight replacecommit file and delegate to the requested replacecommit file to retrieve the clustering plan. What we need here, however, is merely a fetch-and-deserialize method for the requested metadata. Hence, I have created a separate method for this single purpose.

-        archivedMetaWrapper.setHoodieRequestedReplaceMetadata(requestedReplaceMetadata);
+        // We may have cases with an empty HoodieRequestedReplaceMetadata, e.g. insert_overwrite_table
+        // or insert_overwrite without clustering. However, we should revisit the standardization of
+        // requested commit files.
+        Option<HoodieRequestedReplaceMetadata> requestedReplaceMetadata = getRequestedReplaceMetadata(metaClient, hoodieInstant);

[Review comment, Contributor] How about having a replaceUtils? I think this may be a good time to decouple the replace-commit logic from clustering.

[Reply, @ssdong (author)] MetadataConversionUtils should suffice to group such logic; these methods are all about converting commit files to Avro-schema-based metadata. 🤔

+        if (requestedReplaceMetadata.isPresent()) {
+          archivedMetaWrapper.setHoodieRequestedReplaceMetadata(requestedReplaceMetadata.get());
+        }
       }
       archivedMetaWrapper.setActionType(ActionType.replacecommit.name());
       break;
@@ -107,14 +119,25 @@ public static HoodieArchivedMetaEntry createMetaWrapper(HoodieInstant hoodieInstant…
     return archivedMetaWrapper;
   }

-  public static HoodieArchivedMetaEntry createMetaWrapper(HoodieInstant hoodieInstant,
-                                                          HoodieCommitMetadata hoodieCommitMetadata) {
-    HoodieArchivedMetaEntry archivedMetaWrapper = new HoodieArchivedMetaEntry();
-    archivedMetaWrapper.setCommitTime(hoodieInstant.getTimestamp());
-    archivedMetaWrapper.setActionState(hoodieInstant.getState().name());
-    archivedMetaWrapper.setHoodieCommitMetadata(convertCommitMetadata(hoodieCommitMetadata));
-    archivedMetaWrapper.setActionType(ActionType.commit.name());
-    return archivedMetaWrapper;
+  public static Option<HoodieCommitMetadata> getInflightReplaceMetadata(HoodieTableMetaClient metaClient, HoodieInstant instant) throws IOException {
+    Option<byte[]> inflightContent = metaClient.getActiveTimeline().getInstantDetails(instant);
+    if (!inflightContent.isPresent() || inflightContent.get().length == 0) {
+      // Inflight files can be empty in certain cases, e.g. when users opt in to clustering.

[Review comment, Member] @satishkotha what is the original reason for this?

+      return Option.empty();
+    }
+    return Option.of(HoodieCommitMetadata.fromBytes(inflightContent.get(), HoodieCommitMetadata.class));
+  }
+
+  public static Option<HoodieRequestedReplaceMetadata> getRequestedReplaceMetadata(HoodieTableMetaClient metaClient, HoodieInstant instant) throws IOException {
+    Option<byte[]> requestedContent = metaClient.getActiveTimeline().getInstantDetails(instant);
+    if (!requestedContent.isPresent() || requestedContent.get().length == 0) {
+      // Requested commit files can be empty in certain cases, e.g. insert_overwrite or insert_overwrite_table.
+      // However, it appears requested files are supposed to contain metadata, so we should revisit the
+      // standardization of requested commit files.
+      // TODO: revisit requested commit file standardization, https://issues.apache.org/jira/browse/HUDI-1739
+      return Option.empty();
+    }
+    return Option.of(TimelineMetadataUtils.deserializeRequestedReplaceMetadata(requestedContent.get()));
+  }

   public static org.apache.hudi.avro.model.HoodieCommitMetadata convertCommitMetadata(…
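To spell out the behavioral change in the replacecommit branch above: the removed path called .get() on the Option unconditionally, so an absent or zero-byte requested replacecommit file (insert_overwrite or insert_overwrite_table without clustering) would throw during archival. Below is a caller-side sketch of the new flow; the wrapper class and method name are illustrative, it paraphrases the call sites above rather than quoting them, and the import of MetadataConversionUtils is omitted since its package varies by module.

import java.io.IOException;

import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;

class RequestedReplaceArchivalSketch {
  static void archiveRequestedReplace(HoodieTableMetaClient metaClient, HoodieInstant instant,
                                      HoodieArchivedMetaEntry archivedMetaWrapper) throws IOException {
    // Old (removed) behavior, for contrast:
    //   ClusteringUtils.getRequestedReplaceMetadata(metaClient, instant).get()
    // threw when the requested replacecommit file was empty.
    Option<HoodieRequestedReplaceMetadata> requested =
        MetadataConversionUtils.getRequestedReplaceMetadata(metaClient, instant);
    if (requested.isPresent()) {
      // Set the archived field only when real metadata exists; otherwise the
      // entry leaves it unset, which the new tests assert via assertNull.
      archivedMetaWrapper.setHoodieRequestedReplaceMetadata(requested.get());
    }
  }
}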
File 5: commit archival (archive log writer)

@@ -296,7 +296,6 @@ private boolean deleteAllInstantsOlderorEqualsInAuxMetaFolder(HoodieInstant thre…

   public void archive(HoodieEngineContext context, List<HoodieInstant> instants) throws HoodieCommitException {
     try {
-      HoodieTimeline commitTimeline = metaClient.getActiveTimeline().getAllCommitsTimeline().filterCompletedInstants();
       Schema wrapperSchema = HoodieArchivedMetaEntry.getClassSchema();
       LOG.info("Wrapper schema " + wrapperSchema.toString());
       List<IndexedRecord> records = new ArrayList<>();

@@ -308,7 +307,7 @@ public void archive(HoodieEngineContext context, List<HoodieInstant> instants) throws HoodieCommitException {
         }
         try {
           deleteAnyLeftOverMarkerFiles(context, hoodieInstant);
-          records.add(convertToAvroRecord(commitTimeline, hoodieInstant));
+          records.add(convertToAvroRecord(hoodieInstant));
           if (records.size() >= this.config.getCommitArchivalBatchSize()) {
             writeToFile(wrapperSchema, records);
           }

@@ -365,8 +364,8 @@ private void writeToFile(Schema wrapperSchema, List<IndexedRecord> records) thro…
     }
   }

-  private IndexedRecord convertToAvroRecord(HoodieTimeline commitTimeline, HoodieInstant hoodieInstant)
-      throws IOException {
+  private IndexedRecord convertToAvroRecord(HoodieInstant hoodieInstant)
+      throws IOException {
     return MetadataConversionUtils.createMetaWrapper(hoodieInstant, metaClient);
   }
 }
File 6: test helpers for replace commits

@@ -383,7 +383,7 @@ private void createReplaceRequested(String instantTime) throws Exception {
     requestedReplaceMetadata.setClusteringPlan(clusteringPlan);
     requestedReplaceMetadata.setVersion(TimelineLayoutVersion.CURR_VERSION);
     HoodieTestTable.of(metaClient)
-        .addRequestedReplace(instantTime, requestedReplaceMetadata)
+        .addRequestedReplace(instantTime, Option.of(requestedReplaceMetadata))
         .withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1, fileId2);
   }

@@ -413,7 +413,7 @@ private void createReplace(String instantTime, WriteOperationType writeOperationType…
     requestedReplaceMetadata.setClusteringPlan(clusteringPlan);
     requestedReplaceMetadata.setVersion(TimelineLayoutVersion.CURR_VERSION);
     HoodieTestTable.of(metaClient)
-        .addReplaceCommit(instantTime, requestedReplaceMetadata, replaceMetadata)
+        .addReplaceCommit(instantTime, Option.of(requestedReplaceMetadata), Option.empty(), replaceMetadata)
         .withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1, fileId2);
   }
File 7: tests for MetadataConversionUtils

@@ -20,6 +20,7 @@

 import static org.apache.hudi.common.util.CleanerUtils.convertCleanMetadata;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;

 import java.io.IOException;
 import java.util.ArrayList;
@@ -80,13 +81,43 @@ public void testCompletedClean() throws Exception {
   @Test
   public void testCompletedReplace() throws Exception {
     String newCommitTime = HoodieTestTable.makeNewCommitTime();
-    createReplace(newCommitTime, WriteOperationType.INSERT_OVERWRITE);
+    createReplace(newCommitTime, WriteOperationType.INSERT_OVERWRITE, true);
     HoodieArchivedMetaEntry metaEntry = MetadataConversionUtils.createMetaWrapper(
         new HoodieInstant(State.COMPLETED, HoodieTimeline.REPLACE_COMMIT_ACTION, newCommitTime), metaClient);
     assertEquals(metaEntry.getActionState(), State.COMPLETED.toString());
     assertEquals(metaEntry.getHoodieReplaceCommitMetadata().getOperationType(), WriteOperationType.INSERT_OVERWRITE.toString());
   }

+  @Test
+  public void testEmptyRequestedReplace() throws Exception {
+    String newCommitTime = HoodieTestTable.makeNewCommitTime();
+    createReplace(newCommitTime, WriteOperationType.INSERT_OVERWRITE_TABLE, false);
+    HoodieArchivedMetaEntry metaEntry = MetadataConversionUtils.createMetaWrapper(
+        new HoodieInstant(State.REQUESTED, HoodieTimeline.REPLACE_COMMIT_ACTION, newCommitTime), metaClient);
+    assertEquals(metaEntry.getActionState(), State.REQUESTED.toString());
+    assertNull(metaEntry.getHoodieRequestedReplaceMetadata());
+  }
+
+  @Test
+  public void testEmptyInflightReplace() throws Exception {
+    String newCommitTime = HoodieTestTable.makeNewCommitTime();
+    createReplace(newCommitTime, WriteOperationType.INSERT_OVERWRITE_TABLE, true);
+    HoodieArchivedMetaEntry metaEntry = MetadataConversionUtils.createMetaWrapper(
+        new HoodieInstant(State.INFLIGHT, HoodieTimeline.REPLACE_COMMIT_ACTION, newCommitTime), metaClient);
+    assertEquals(metaEntry.getActionState(), State.INFLIGHT.toString());
+    assertNull(metaEntry.getHoodieInflightReplaceMetadata());
+  }
+
+  @Test
+  public void testNonEmptyInflightReplace() throws Exception {
+    String newCommitTime = HoodieTestTable.makeNewCommitTime();
+    createReplace(newCommitTime, WriteOperationType.INSERT_OVERWRITE_TABLE, false);
+    HoodieArchivedMetaEntry metaEntry = MetadataConversionUtils.createMetaWrapper(
+        new HoodieInstant(State.INFLIGHT, HoodieTimeline.REPLACE_COMMIT_ACTION, newCommitTime), metaClient);
+    assertEquals(metaEntry.getActionState(), State.INFLIGHT.toString());
+    assertEquals(metaEntry.getHoodieInflightReplaceMetadata().getOperationType(), WriteOperationType.INSERT_OVERWRITE_TABLE.name());
+  }
+
   @Test
   public void testCompletedCommitOrDeltaCommit() throws Exception {
     String newCommitTime = HoodieTestTable.makeNewCommitTime();
@@ -169,7 +200,8 @@ private void createCommitMetadata(String instantTime) throws Exception {
         .withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1, fileId2);
   }

-  private void createReplace(String instantTime, WriteOperationType writeOperationType) throws Exception {
+  private void createReplace(String instantTime, WriteOperationType writeOperationType, Boolean isClustering)
+      throws Exception {
     String fileId1 = "file-1";
     String fileId2 = "file-2";

@@ -182,18 +214,29 @@ private void createReplace(String instantTime, WriteOperationType writeOperationType, Boolean isClustering)…
     writeStat.setFileId("file-1");
     replaceMetadata.addWriteStat(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, writeStat);
     replaceMetadata.setOperationType(writeOperationType);
-    // create replace instant to mark fileId1 as deleted
-    HoodieRequestedReplaceMetadata requestedReplaceMetadata = new HoodieRequestedReplaceMetadata();
-    requestedReplaceMetadata.setOperationType(WriteOperationType.INSERT_OVERWRITE.name());
-    HoodieClusteringPlan clusteringPlan = new HoodieClusteringPlan();
-    HoodieClusteringGroup clusteringGroup = new HoodieClusteringGroup();
-    HoodieSliceInfo sliceInfo = new HoodieSliceInfo();
-    clusteringGroup.setSlices(Arrays.asList(sliceInfo));
-    clusteringPlan.setInputGroups(Arrays.asList(clusteringGroup));
-    requestedReplaceMetadata.setClusteringPlan(clusteringPlan);
-    requestedReplaceMetadata.setVersion(TimelineLayoutVersion.CURR_VERSION);
+    // In some cases requestedReplaceMetadata will be null,
+    // e.g. insert_overwrite_table or insert_overwrite without clustering.
+    HoodieRequestedReplaceMetadata requestedReplaceMetadata = null;
+    HoodieCommitMetadata inflightReplaceMetadata = null;
+    if (isClustering) {
+      requestedReplaceMetadata = new HoodieRequestedReplaceMetadata();
+      requestedReplaceMetadata.setOperationType(writeOperationType.name());
+      HoodieClusteringPlan clusteringPlan = new HoodieClusteringPlan();
+      HoodieClusteringGroup clusteringGroup = new HoodieClusteringGroup();
+      HoodieSliceInfo sliceInfo = new HoodieSliceInfo();
+      clusteringGroup.setSlices(Arrays.asList(sliceInfo));
+      clusteringPlan.setInputGroups(Arrays.asList(clusteringGroup));
+      requestedReplaceMetadata.setClusteringPlan(clusteringPlan);
+      requestedReplaceMetadata.setVersion(TimelineLayoutVersion.CURR_VERSION);
+    } else {
+      // inflightReplaceMetadata will be null under clustering but non-null
+      // for insert_overwrite or insert_overwrite_table.
+      inflightReplaceMetadata = new HoodieCommitMetadata();
+      inflightReplaceMetadata.setOperationType(writeOperationType);
+      inflightReplaceMetadata.setCompacted(false);
+    }
     HoodieTestTable.of(metaClient)
-        .addReplaceCommit(instantTime, requestedReplaceMetadata, replaceMetadata)
+        .addReplaceCommit(instantTime, Option.ofNullable(requestedReplaceMetadata), Option.ofNullable(inflightReplaceMetadata), replaceMetadata)
         .withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1, fileId2);
   }
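The switch from Option.of to Option.ofNullable at these call sites carries the fix: on the non-clustering path requestedReplaceMetadata stays null, and on the clustering path inflightReplaceMetadata stays null. A minimal sketch of the distinction, assuming Hudi's org.apache.hudi.common.util.Option mirrors java.util.Optional semantics here (of rejects null, ofNullable maps null to empty):

import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
import org.apache.hudi.common.util.Option;

public class OptionOfNullableSketch {
  public static void main(String[] args) {
    // Non-clustering path in createReplace: no requested payload is built.
    HoodieRequestedReplaceMetadata requested = null;

    // ofNullable degrades to Option.empty() instead of throwing, so the test
    // table writes an empty requested replacecommit file, which is exactly
    // the zero-byte case the new getRequestedReplaceMetadata helper tolerates.
    Option<HoodieRequestedReplaceMetadata> opt = Option.ofNullable(requested);
    System.out.println(opt.isPresent()); // false

    // Option.of(requested) would instead fail fast on null (as Optional.of
    // does), which is why call sites that may pass null switched to ofNullable.
  }
}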