diff --git a/hudi-common/src/main/avro/HoodieReplaceMetadata.avsc b/hudi-common/src/main/avro/HoodieReplaceMetadata.avsc
new file mode 100644
index 0000000000000..ce2db3cb9dd0e
--- /dev/null
+++ b/hudi-common/src/main/avro/HoodieReplaceMetadata.avsc
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ /*
+ * Note that all 'replace' instants are read for every query,
+ * so it is important to keep this small. Please be careful
+ * before tracking additional information in this file.
+ * This will be used for 'insert_overwrite' (RFC-18) and also 'clustering' (RFC-19)
+ */
+{"namespace": "org.apache.hudi.avro.model",
+ "type": "record",
+ "name": "HoodieReplaceMetadata",
+ "fields": [
+     {"name": "totalFilesReplaced", "type": "int"},
+     {"name": "command", "type": "string"},
+     {"name": "partitionMetadata", "type": {
+        "type" : "map", "values" : {
+            "type": "array",
+            "items": "string"
+         }
+       }
+     },
+     {
+        "name":"version",
+        "type":["int", "null"],
+        "default": 1
+     }
+ ]
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
index 36e2b3de1e4ed..ae22a439a8b9f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
@@ -65,7 +65,8 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
       COMMIT_EXTENSION, INFLIGHT_COMMIT_EXTENSION, REQUESTED_COMMIT_EXTENSION, DELTA_COMMIT_EXTENSION,
       INFLIGHT_DELTA_COMMIT_EXTENSION, REQUESTED_DELTA_COMMIT_EXTENSION, SAVEPOINT_EXTENSION,
       INFLIGHT_SAVEPOINT_EXTENSION, CLEAN_EXTENSION, REQUESTED_CLEAN_EXTENSION, INFLIGHT_CLEAN_EXTENSION,
-      INFLIGHT_COMPACTION_EXTENSION, REQUESTED_COMPACTION_EXTENSION, INFLIGHT_RESTORE_EXTENSION, RESTORE_EXTENSION));
+      INFLIGHT_COMPACTION_EXTENSION, REQUESTED_COMPACTION_EXTENSION, INFLIGHT_RESTORE_EXTENSION, RESTORE_EXTENSION,
+      INFLIGHT_REPLACE_EXTENSION, REPLACE_EXTENSION));
 
   private static final Logger LOG = LogManager.getLogger(HoodieActiveTimeline.class);
   protected HoodieTableMetaClient metaClient;
@@ -304,6 +305,22 @@ public HoodieInstant transitionCleanRequestedToInflight(HoodieInstant requestedI
     return inflight;
   }
 
+  /**
+   * Transition Replace State from inflight to Completed.
+   *
+   * @param inflightInstant Inflight instant
+   * @param data Extra Metadata
+   * @return completed replace instant
+   */
+  public HoodieInstant transitionReplaceInflightToComplete(HoodieInstant inflightInstant, Option<byte[]> data) {
+    ValidationUtils.checkArgument(inflightInstant.getAction().equals(HoodieTimeline.REPLACE_ACTION));
+    ValidationUtils.checkArgument(inflightInstant.isInflight());
+    HoodieInstant commitInstant = new HoodieInstant(State.COMPLETED, REPLACE_ACTION, inflightInstant.getTimestamp());
+    // Then write to timeline
+    transitionState(inflightInstant, commitInstant, data);
+    return commitInstant;
+  }
+
   private void transitionState(HoodieInstant fromInstant, HoodieInstant toInstant, Option<byte[]> data) {
     transitionState(fromInstant, toInstant, data, false);
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
index c7a6230ffd93c..1aa8e586370bb 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
@@ -113,6 +113,18 @@ public HoodieDefaultTimeline getCommitsAndCompactionTimeline() {
     return new HoodieDefaultTimeline(instants.stream().filter(s -> validActions.contains(s.getAction())), details);
   }
 
+  @Override
+  public HoodieTimeline getCompletedAndReplaceTimeline() {
+    Set<String> commitActions = CollectionUtils.createSet(COMMIT_ACTION, DELTA_COMMIT_ACTION);
+    Set<String> validCommitTimes = instants.stream().filter(s -> s.isCompleted())
+        .filter(s -> commitActions.contains(s.getAction()))
+        .map(s -> s.getTimestamp()).collect(Collectors.toSet());
+
+    return new HoodieDefaultTimeline(instants.stream().filter(s -> s.getAction().equals(REPLACE_ACTION))
+        .filter(s -> s.isCompleted())
+        .filter(instant -> validCommitTimes.contains(instant.getTimestamp())), details);
+  }
+
   @Override
   public HoodieTimeline filterPendingCompactionTimeline() {
     return new HoodieDefaultTimeline(
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java
index aea414bc7e227..909e5f5bd1bd1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java
@@ -162,6 +162,9 @@ public String getFileName() {
     } else if (HoodieTimeline.RESTORE_ACTION.equals(action)) {
       return isInflight() ? HoodieTimeline.makeInflightRestoreFileName(timestamp)
           : HoodieTimeline.makeRestoreFileName(timestamp);
+    } else if (HoodieTimeline.REPLACE_ACTION.equals(action)) {
+      return isInflight() ? HoodieTimeline.makeInflightReplaceFileName(timestamp)
+          : HoodieTimeline.makeReplaceFileName(timestamp);
     }
     throw new IllegalArgumentException("Cannot get file name for unknown action " + action);
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
index 151c18bc28445..6fddbce029d82 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
@@ -48,6 +48,7 @@ public interface HoodieTimeline extends Serializable {
   String CLEAN_ACTION = "clean";
   String ROLLBACK_ACTION = "rollback";
   String SAVEPOINT_ACTION = "savepoint";
+  String REPLACE_ACTION = "replace";
   String INFLIGHT_EXTENSION = ".inflight";
   // With Async Compaction, compaction instant can be in 3 states :
   // (compaction-requested), (compaction-inflight), (completed)
@@ -57,7 +58,7 @@ public interface HoodieTimeline extends Serializable {
 
   String[] VALID_ACTIONS_IN_TIMELINE = {COMMIT_ACTION, DELTA_COMMIT_ACTION,
       CLEAN_ACTION, SAVEPOINT_ACTION, RESTORE_ACTION, ROLLBACK_ACTION,
-      COMPACTION_ACTION};
+      COMPACTION_ACTION, REPLACE_ACTION};
 
   String COMMIT_EXTENSION = "." + COMMIT_ACTION;
   String DELTA_COMMIT_EXTENSION = "." + DELTA_COMMIT_ACTION;
@@ -78,6 +79,8 @@ public interface HoodieTimeline extends Serializable {
   String INFLIGHT_COMPACTION_EXTENSION = StringUtils.join(".", COMPACTION_ACTION, INFLIGHT_EXTENSION);
   String INFLIGHT_RESTORE_EXTENSION = "." + RESTORE_ACTION + INFLIGHT_EXTENSION;
   String RESTORE_EXTENSION = "." + RESTORE_ACTION;
+  String INFLIGHT_REPLACE_EXTENSION = "." + REPLACE_ACTION + INFLIGHT_EXTENSION;
+  String REPLACE_EXTENSION = "." + REPLACE_ACTION;
 
   String INVALID_INSTANT_TS = "0";
 
@@ -126,6 +129,13 @@ public interface HoodieTimeline extends Serializable {
    */
   HoodieTimeline getCommitsAndCompactionTimeline();
 
+  /**
+   * Timeline to just include replace instants that have valid (commit/deltacommit) actions.
+   *
+   * @return timeline of completed replace instants whose timestamp matches a completed commit/deltacommit
+   */
+  HoodieTimeline getCompletedAndReplaceTimeline();
+
   /**
    * Filter this timeline to just include requested and inflight compaction instants.
    *
@@ -348,6 +358,14 @@ static String makeInflightRestoreFileName(String instant) {
     return StringUtils.join(instant, HoodieTimeline.INFLIGHT_RESTORE_EXTENSION);
   }
 
+  static String makeReplaceFileName(String instant) {
+    return StringUtils.join(instant, HoodieTimeline.REPLACE_EXTENSION);
+  }
+
+  static String makeInflightReplaceFileName(String instant) {
+    return StringUtils.join(instant, HoodieTimeline.INFLIGHT_REPLACE_EXTENSION);
+  }
+
   static String makeDeltaFileName(String instantTime) {
     return instantTime + HoodieTimeline.DELTA_COMMIT_EXTENSION;
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java
index f95bfc3427dd9..4e823002fa4c5 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java
@@ -21,6 +21,7 @@
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieCleanerPlan;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.avro.model.HoodieReplaceMetadata;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackPartitionMetadata;
@@ -109,6 +110,10 @@ public static Option<byte[]> serializeRestoreMetadata(HoodieRestoreMetadata rest
     return serializeAvroMetadata(restoreMetadata, HoodieRestoreMetadata.class);
   }
 
+  public static Option<byte[]> serializeReplaceMetadata(HoodieReplaceMetadata metadata) throws IOException {
+    return serializeAvroMetadata(metadata, HoodieReplaceMetadata.class);
+  }
+
   public static <T extends SpecificRecordBase> Option<byte[]> serializeAvroMetadata(T metadata, Class<T> clazz)
       throws IOException {
     DatumWriter<T> datumWriter = new SpecificDatumWriter<>(clazz);
@@ -140,6 +145,10 @@ public static HoodieSavepointMetadata deserializeHoodieSavepointMetadata(byte[]
     return deserializeAvroMetadata(bytes, HoodieSavepointMetadata.class);
   }
 
+  public static HoodieReplaceMetadata deserializeHoodieReplaceMetadata(byte[] bytes) throws IOException {
+    return deserializeAvroMetadata(bytes, HoodieReplaceMetadata.class);
+  }
+
   public static <T extends SpecificRecordBase> T deserializeAvroMetadata(byte[] bytes, Class<T> clazz)
       throws IOException {
     DatumReader<T> reader = new SpecificDatumReader<>(clazz);
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
index 34c705516e077..019f8a4dd20a3 100755
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
@@ -399,6 +399,43 @@ public void testFiltering() {
         .forEach(i -> assertFalse(t2.containsInstant(i)));
   }
 
+  @Test
+  public void testReplaceActionsTimeline() {
+    int instantTime = 1;
+    List<HoodieInstant> allInstants = new ArrayList<>();
+    HoodieInstant instant = new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, String.format("%03d", instantTime++));
+    allInstants.add(instant);
+    instant = new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, String.format("%03d", instantTime++));
+    allInstants.add(instant);
+    instant = new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, String.format("%03d", instantTime++));
+    allInstants.add(instant);
+
+    // create replace instant with matching commit instant
+    String replaceInstant = instant.getTimestamp();
+    instant = new HoodieInstant(State.COMPLETED, HoodieTimeline.REPLACE_ACTION, replaceInstant);
+    allInstants.add(instant);
+
+    // replace instant with no matching commit
+    instant = new HoodieInstant(State.COMPLETED, HoodieTimeline.REPLACE_ACTION, "999");
+    allInstants.add(instant);
+
+    HoodieInstant inflightCommitInstant = new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, String.format("%03d", instantTime++));
+    allInstants.add(inflightCommitInstant);
+
+    // replace instant with matching commit but inflight
+    instant = new HoodieInstant(State.COMPLETED, HoodieTimeline.REPLACE_ACTION, inflightCommitInstant.getTimestamp());
+    allInstants.add(instant);
+
+    timeline = new HoodieActiveTimeline(metaClient);
+    timeline.setInstants(allInstants);
+    List<HoodieInstant> validReplaceInstants =
+        timeline.getCompletedAndReplaceTimeline().getInstants().collect(Collectors.toList());
+
+    assertEquals(1, validReplaceInstants.size());
+    assertEquals(replaceInstant, validReplaceInstants.get(0).getTimestamp());
+    assertEquals(HoodieTimeline.REPLACE_ACTION, validReplaceInstants.get(0).getAction());
+  }
+
   /**
    * Returns an exhaustive list of all possible HoodieInstant.
    * @return list of HoodieInstant
@@ -416,7 +453,7 @@ private List<HoodieInstant> getAllInstants() {
         // not be generating them.
         if (state == State.REQUESTED) {
           if (action.equals(HoodieTimeline.SAVEPOINT_ACTION) || action.equals(HoodieTimeline.RESTORE_ACTION)
-              || action.equals(HoodieTimeline.ROLLBACK_ACTION)) {
+              || action.equals(HoodieTimeline.ROLLBACK_ACTION) || action.equals(HoodieTimeline.REPLACE_ACTION)) {
             continue;
           }
         }
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestSerializationUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestSerializationUtils.java
index 9d6c1b81b044c..716e8de46e2ab 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestSerializationUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestSerializationUtils.java
@@ -19,12 +19,23 @@
 package org.apache.hudi.common.util;
 
 import org.apache.avro.util.Utf8;
+import org.apache.hudi.avro.model.HoodieReplaceMetadata;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
 import org.junit.jupiter.api.Test;
 
+import java.io.FileOutputStream;
 import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
 import java.util.Objects;
+import java.util.Random;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -52,6 +63,38 @@ public void testSerDeser() throws IOException {
     verifyObject(new LinkedList<>(Arrays.asList(2, 3, 5)));
   }
 
+  @Test
+  public void testSerDeserReplaceMetadata() throws Exception {
+    Random r = new Random();
+    int numPartitions = 1;
+    HoodieReplaceMetadata replaceMetadata = new HoodieReplaceMetadata();
+    int numFiles = 30;
+    Map<String, List<String>> partitionToReplaceFileId = new HashMap<>();
+
+    for (int i = 0; i < numFiles; i++) {
+      int partition = r.nextInt(numPartitions);
+      String partitionStr = "2020/07/0" + partition;
+      partitionToReplaceFileId.putIfAbsent(partitionStr, new ArrayList<>());
+      partitionToReplaceFileId.get(partitionStr).add(FSUtils.createNewFileIdPfx());
+    }
+    String command = "clustering";
+    replaceMetadata.setVersion(1);
+    replaceMetadata.setTotalFilesReplaced(numFiles);
+    replaceMetadata.setCommand(command);
+    replaceMetadata.setPartitionMetadata(partitionToReplaceFileId);
+    String filePath = Files.createTempFile("myreplacetest", ".tmp").toString();
+    byte[] data = TimelineMetadataUtils.serializeReplaceMetadata(replaceMetadata).get();
+    try (FileOutputStream fos = new FileOutputStream(filePath)) {
+      fos.write(data);
+    }
+
+    data = Files.readAllBytes(Paths.get(filePath));
+    HoodieReplaceMetadata replaceMetadataD = TimelineMetadataUtils.deserializeHoodieReplaceMetadata(data);
+    assertEquals(numFiles, replaceMetadataD.getTotalFilesReplaced());
+    assertEquals(command, replaceMetadataD.getCommand());
+    assertEquals(partitionToReplaceFileId, replaceMetadataD.getPartitionMetadata());
+  }
+
   private <T> void verifyObject(T expectedValue) throws IOException {
     byte[] serializedObject = SerializationUtils.serialize(expectedValue);
     assertNotNull(serializedObject);