hudi-common/src/main/avro/HoodieReplaceMetadata.avsc (44 additions, 0 deletions)
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/*
 * Note that all 'replace' instants are read for every query,
 * so it is important to keep this metadata small. Please be careful
 * before tracking additional information in this file.
 * This will be used for 'insert_overwrite' (RFC-18) and also 'clustering' (RFC-19).
 */
{"namespace": "org.apache.hudi.avro.model",
"type": "record",
"name": "HoodieReplaceMetadata",
"fields": [
{"name": "totalFilesReplaced", "type": "int"},
{"name": "command", "type": "string"},
{"name": "partitionMetadata", "type": {
Contributor: Does the partitionMetadata contain partitionPath -> new file groups?

Member Author: This can be obtained from the related commit file. Every t.replace has a corresponding t.[delta]commit (we decided to write two files to reduce the overhead of reading all commit files in the query path).

Plus, it helps the query side to keep replace metadata files small. I also didn't want to repeat the same information in two places.

Let me know if you think it's important to store new file group info here.

Contributor: High-level question, to make sure we are all on the same page: is this metadata enough to achieve clustering? Do you foresee any changes that need to happen to this metadata to support clustering? The PR mentions that this is for both clustering and overwrite, hence the question.

"type" : "map", "values" : {
"type": "array",
"items": "string"
}
}
},
{
"name":"version",
"type":["int", "null"],
"default": 1
}
]
}
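
A minimal sketch of populating and serializing this metadata via the Avro-generated class (mirroring the test added later in this PR; the wrapper class and all values are illustrative):

import org.apache.hudi.avro.model.HoodieReplaceMetadata;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

class ReplaceMetadataSketch {
  static byte[] buildAndSerialize() throws IOException {
    HoodieReplaceMetadata metadata = new HoodieReplaceMetadata();
    metadata.setVersion(1);
    metadata.setCommand("insert_overwrite"); // or "clustering"
    metadata.setTotalFilesReplaced(2);
    // partition path -> file ids replaced in that partition
    Map<String, List<String>> partitionToReplacedFileIds =
        Collections.singletonMap("2020/07/05", Arrays.asList("file-id-1", "file-id-2"));
    metadata.setPartitionMetadata(partitionToReplacedFileIds);
    // serializeReplaceMetadata returns Option<byte[]>
    return TimelineMetadataUtils.serializeReplaceMetadata(metadata).get();
  }
}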
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java

@@ -65,7 +65,8 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
COMMIT_EXTENSION, INFLIGHT_COMMIT_EXTENSION, REQUESTED_COMMIT_EXTENSION, DELTA_COMMIT_EXTENSION,
INFLIGHT_DELTA_COMMIT_EXTENSION, REQUESTED_DELTA_COMMIT_EXTENSION, SAVEPOINT_EXTENSION,
INFLIGHT_SAVEPOINT_EXTENSION, CLEAN_EXTENSION, REQUESTED_CLEAN_EXTENSION, INFLIGHT_CLEAN_EXTENSION,
INFLIGHT_COMPACTION_EXTENSION, REQUESTED_COMPACTION_EXTENSION, INFLIGHT_RESTORE_EXTENSION, RESTORE_EXTENSION));
INFLIGHT_COMPACTION_EXTENSION, REQUESTED_COMPACTION_EXTENSION, INFLIGHT_RESTORE_EXTENSION, RESTORE_EXTENSION,
Contributor: Since the intention of this PR is simply to introduce a new action, do we need to change VALID_ACTION in this PR? Do the other methods you implemented in the ActiveTimeline require this? I'm wondering if we can avoid this change in case we want to land this PR incrementally.

Member Author: I added this for tests, but I can split this and the related test out into a different review.

Contributor: It's better to avoid this for rollout purposes. If this PR lands before the next one and a release is cut, then we need to worry about the ordering of rollout between readers and writers.

INFLIGHT_REPLACE_EXTENSION, REPLACE_EXTENSION));

private static final Logger LOG = LogManager.getLogger(HoodieActiveTimeline.class);
protected HoodieTableMetaClient metaClient;
@@ -304,6 +305,22 @@ public HoodieInstant transitionCleanRequestedToInflight(HoodieInstant requestedI
return inflight;
}

/**
* Transition Clean State from inflight to Committed.
Contributor: s/Clean/Replace

Member Author: Will update, thanks.

*
* @param inflightInstant Inflight instant
* @param data Extra Metadata
* @return commit instant
*/
public HoodieInstant transitionReplaceInflightToComplete(HoodieInstant inflightInstant, Option<byte[]> data) {
ValidationUtils.checkArgument(inflightInstant.getAction().equals(HoodieTimeline.REPLACE_ACTION));
ValidationUtils.checkArgument(inflightInstant.isInflight());
HoodieInstant commitInstant = new HoodieInstant(State.COMPLETED, REPLACE_ACTION, inflightInstant.getTimestamp());
// Then write to timeline
transitionState(inflightInstant, commitInstant, data);
return commitInstant;
}
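
A minimal usage sketch (the instant time, the replaceMetadata variable, and the surrounding metaClient are illustrative):

// Sketch: complete a replace action on the timeline.
HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
HoodieInstant inflight = new HoodieInstant(State.INFLIGHT, HoodieTimeline.REPLACE_ACTION, "20200804120000");
Option<byte[]> metadataBytes = TimelineMetadataUtils.serializeReplaceMetadata(replaceMetadata); // throws IOException
HoodieInstant completed = activeTimeline.transitionReplaceInflightToComplete(inflight, metadataBytes);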

private void transitionState(HoodieInstant fromInstant, HoodieInstant toInstant, Option<byte[]> data) {
transitionState(fromInstant, toInstant, data, false);
}
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java

@@ -113,6 +113,18 @@ public HoodieDefaultTimeline getCommitsAndCompactionTimeline() {
return new HoodieDefaultTimeline(instants.stream().filter(s -> validActions.contains(s.getAction())), details);
}

@Override
public HoodieTimeline getCompletedAndReplaceTimeline() {
Set<String> commitActions = CollectionUtils.createSet(COMMIT_ACTION, DELTA_COMMIT_ACTION);
Set<String> validCommitTimes = instants.stream().filter(s -> s.isCompleted())
.filter(s -> commitActions.contains(s.getAction()))
.map(s -> s.getTimestamp()).collect(Collectors.toSet());

return new HoodieDefaultTimeline(instants.stream().filter(s -> s.getAction().equals(REPLACE_ACTION))
.filter(s -> s.isCompleted())
.filter(instant -> validCommitTimes.contains(instant.getTimestamp())), details);
}
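
A sketch of how a caller (e.g. the filesystem-view builder mentioned in the review discussion below) might consume this timeline; the loop body is illustrative:

// Sketch: read replace metadata for each valid, completed replace instant.
timeline.getCompletedAndReplaceTimeline().getInstants().forEach(instant -> {
  try {
    HoodieReplaceMetadata metadata = TimelineMetadataUtils.deserializeHoodieReplaceMetadata(
        timeline.getInstantDetails(instant).get());
    // e.g. exclude metadata.getPartitionMetadata() file ids from the view
  } catch (IOException e) {
    throw new UncheckedIOException(e);
  }
});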

@Override
public HoodieTimeline filterPendingCompactionTimeline() {
return new HoodieDefaultTimeline(
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java

@@ -162,6 +162,9 @@ public String getFileName() {
} else if (HoodieTimeline.RESTORE_ACTION.equals(action)) {
return isInflight() ? HoodieTimeline.makeInflightRestoreFileName(timestamp)
: HoodieTimeline.makeRestoreFileName(timestamp);
} else if (HoodieTimeline.REPLACE_ACTION.equals(action)) {
return isInflight() ? HoodieTimeline.makeInflightReplaceFileName(timestamp)
: HoodieTimeline.makeReplaceFileName(timestamp);
}
throw new IllegalArgumentException("Cannot get file name for unknown action " + action);
}
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java

@@ -48,6 +48,7 @@ public interface HoodieTimeline extends Serializable {
String CLEAN_ACTION = "clean";
String ROLLBACK_ACTION = "rollback";
String SAVEPOINT_ACTION = "savepoint";
String REPLACE_ACTION = "replace";
String INFLIGHT_EXTENSION = ".inflight";
// With Async Compaction, compaction instant can be in 3 states :
// (compaction-requested), (compaction-inflight), (completed)
@@ -57,7 +58,7 @@ public interface HoodieTimeline extends Serializable {

String[] VALID_ACTIONS_IN_TIMELINE = {COMMIT_ACTION, DELTA_COMMIT_ACTION,
CLEAN_ACTION, SAVEPOINT_ACTION, RESTORE_ACTION, ROLLBACK_ACTION,
COMPACTION_ACTION};
COMPACTION_ACTION, REPLACE_ACTION};
Contributor: Same comment as above.


String COMMIT_EXTENSION = "." + COMMIT_ACTION;
String DELTA_COMMIT_EXTENSION = "." + DELTA_COMMIT_ACTION;
@@ -78,6 +79,8 @@ public interface HoodieTimeline extends Serializable {
String INFLIGHT_COMPACTION_EXTENSION = StringUtils.join(".", COMPACTION_ACTION, INFLIGHT_EXTENSION);
String INFLIGHT_RESTORE_EXTENSION = "." + RESTORE_ACTION + INFLIGHT_EXTENSION;
String RESTORE_EXTENSION = "." + RESTORE_ACTION;
String INFLIGHT_REPLACE_EXTENSION = "." + REPLACE_ACTION + INFLIGHT_EXTENSION;
String REPLACE_EXTENSION = "." + REPLACE_ACTION;

String INVALID_INSTANT_TS = "0";

@@ -126,6 +129,13 @@ public interface HoodieTimeline extends Serializable {
*/
HoodieTimeline getCommitsAndCompactionTimeline();

/**
 * Filter this timeline to just include completed replace instants that have a
 * matching completed (commit/deltacommit) instant.
 *
 * @return the filtered replace timeline
 */
HoodieTimeline getCompletedAndReplaceTimeline();
Contributor: Maybe you have used this API in a different PR, but where exactly are we going to use this specific method?

Member Author: Yes, this is used to build the filesystem view in #1859; I can move this to that PR.

Contributor: Does the returned timeline contain only replace instants? The naming is confusing to me. How about getValidReplaceTimeline, or anything else that reflects the intention?


/**
* Filter this timeline to just include requested and inflight compaction instants.
*
@@ -348,6 +358,14 @@ static String makeInflightRestoreFileName(String instant) {
return StringUtils.join(instant, HoodieTimeline.INFLIGHT_RESTORE_EXTENSION);
}

static String makeReplaceFileName(String instant) {
return StringUtils.join(instant, HoodieTimeline.REPLACE_EXTENSION);
}

static String makeInflightReplaceFileName(String instant) {
return StringUtils.join(instant, HoodieTimeline.INFLIGHT_REPLACE_EXTENSION);
}
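
For a hypothetical instant time, these helpers simply append the new extensions:

// Illustrative only:
// makeReplaceFileName("20200804120000")         -> "20200804120000.replace"
// makeInflightReplaceFileName("20200804120000") -> "20200804120000.replace.inflight"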

static String makeDeltaFileName(String instantTime) {
return instantTime + HoodieTimeline.DELTA_COMMIT_EXTENSION;
}
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java

@@ -21,6 +21,7 @@
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.avro.model.HoodieReplaceMetadata;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.avro.model.HoodieRollbackPartitionMetadata;
@@ -109,6 +110,10 @@ public static Option<byte[]> serializeRestoreMetadata(HoodieRestoreMetadata rest
return serializeAvroMetadata(restoreMetadata, HoodieRestoreMetadata.class);
}

public static Option<byte[]> serializeReplaceMetadata(HoodieReplaceMetadata metadata) throws IOException {
return serializeAvroMetadata(metadata, HoodieReplaceMetadata.class);
}

public static <T extends SpecificRecordBase> Option<byte[]> serializeAvroMetadata(T metadata, Class<T> clazz)
throws IOException {
DatumWriter<T> datumWriter = new SpecificDatumWriter<>(clazz);
@@ -140,6 +145,10 @@ public static HoodieSavepointMetadata deserializeHoodieSavepointMetadata(byte[]
return deserializeAvroMetadata(bytes, HoodieSavepointMetadata.class);
}

public static HoodieReplaceMetadata deserializeHoodieReplaceMetadata(byte[] bytes) throws IOException {
return deserializeAvroMetadata(bytes, HoodieReplaceMetadata.class);
}

public static <T extends SpecificRecordBase> T deserializeAvroMetadata(byte[] bytes, Class<T> clazz)
throws IOException {
DatumReader<T> reader = new SpecificDatumReader<>(clazz);
hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java

@@ -399,6 +399,43 @@ public void testFiltering() {
.forEach(i -> assertFalse(t2.containsInstant(i)));
}

@Test
public void testReplaceActionsTimeline() {
int instantTime = 1;
List<HoodieInstant> allInstants = new ArrayList<>();
HoodieInstant instant = new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, String.format("%03d", instantTime++));
allInstants.add(instant);
instant = new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, String.format("%03d", instantTime++));
allInstants.add(instant);
instant = new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, String.format("%03d", instantTime++));
allInstants.add(instant);

// create replace instant with matching commit instant
String replaceInstant = instant.getTimestamp();
instant = new HoodieInstant(State.COMPLETED, HoodieTimeline.REPLACE_ACTION, replaceInstant);
allInstants.add(instant);

//replace instant with no matching commit
instant = new HoodieInstant(State.COMPLETED, HoodieTimeline.REPLACE_ACTION, "999");
allInstants.add(instant);

HoodieInstant inflightCommitInstant = new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, String.format("%03d", instantTime++));
allInstants.add(inflightCommitInstant);

// replace instant with matching commit but inflight
instant = new HoodieInstant(State.COMPLETED, HoodieTimeline.REPLACE_ACTION, inflightCommitInstant.getTimestamp());
allInstants.add(instant);

timeline = new HoodieActiveTimeline(metaClient);
timeline.setInstants(allInstants);
List<HoodieInstant> validReplaceInstants =
timeline.getCompletedAndReplaceTimeline().getInstants().collect(Collectors.toList());

assertEquals(1, validReplaceInstants.size());
assertEquals(replaceInstant, validReplaceInstants.get(0).getTimestamp());
assertEquals(HoodieTimeline.REPLACE_ACTION, validReplaceInstants.get(0).getAction());
}

/**
* Returns an exhaustive list of all possible HoodieInstant.
* @return list of HoodieInstant
@@ -416,7 +453,7 @@ private List<HoodieInstant> getAllInstants() {
// not be generating them.
if (state == State.REQUESTED) {
if (action.equals(HoodieTimeline.SAVEPOINT_ACTION) || action.equals(HoodieTimeline.RESTORE_ACTION)
|| action.equals(HoodieTimeline.ROLLBACK_ACTION)) {
|| action.equals(HoodieTimeline.ROLLBACK_ACTION) || action.equals(HoodieTimeline.REPLACE_ACTION)) {
continue;
}
}
hudi-common/src/test/java/org/apache/hudi/common/util/TestSerializationUtils.java

@@ -19,12 +19,23 @@
package org.apache.hudi.common.util;

import org.apache.avro.util.Utf8;
import org.apache.hudi.avro.model.HoodieReplaceMetadata;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.junit.jupiter.api.Test;

import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Random;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -52,6 +63,38 @@ public void testSerDeser() throws IOException {
verifyObject(new LinkedList<>(Arrays.asList(2, 3, 5)));
}

@Test
public void testSerDeserReplaceMetadata() throws Exception {
Random r = new Random();
int numPartitions = 1;
HoodieReplaceMetadata replaceMetadata = new HoodieReplaceMetadata();
int numFiles = 30;
Map<String, List<String>> partitionToReplaceFileId = new HashMap<>();

for (int i = 0; i < numFiles; i++) {
int partition = r.nextInt(numPartitions);
String partitionStr = "2020/07/0" + partition;
partitionToReplaceFileId.putIfAbsent(partitionStr, new ArrayList<>());
partitionToReplaceFileId.get(partitionStr).add(FSUtils.createNewFileIdPfx());
}
String command = "clustering";
replaceMetadata.setVersion(1);
replaceMetadata.setTotalFilesReplaced(numFiles);
replaceMetadata.setCommand(command);
replaceMetadata.setPartitionMetadata(partitionToReplaceFileId);
String filePath = "/tmp/myreplacetest";
byte[] data = TimelineMetadataUtils.serializeReplaceMetadata(replaceMetadata).get();
try (FileOutputStream fos = new FileOutputStream(filePath)) {
fos.write(data);
}

data = Files.readAllBytes(Paths.get(filePath));
HoodieReplaceMetadata replaceMetadataD = TimelineMetadataUtils.deserializeHoodieReplaceMetadata(data);
assertEquals(numFiles, replaceMetadataD.getTotalFilesReplaced());
assertEquals(command, replaceMetadataD.getCommand());
assertEquals(partitionToReplaceFileId, replaceMetadataD.getPartitionMetadata());
}

private <T> void verifyObject(T expectedValue) throws IOException {
byte[] serializedObject = SerializationUtils.serialize(expectedValue);
assertNotNull(serializedObject);
Expand Down