From 8d944498a0463d34e8196f6eca841ce58b6b1982 Mon Sep 17 00:00:00 2001 From: Sagar Sumit Date: Thu, 6 Jan 2022 17:47:41 +0530 Subject: [PATCH] [HUDI-3173] Add INDEX action type and corresponding commit metadata Fix timeline tests Make MetadataPartitionType#all generic Add instant info to plan and address review comments --- hudi-common/pom.xml | 3 + .../main/avro/HoodieArchivedMetaEntry.avsc | 8 +++ .../main/avro/HoodieIndexCommitMetadata.avsc | 51 +++++++++++++++ .../main/avro/HoodieIndexPartitionInfo.avsc | 48 ++++++++++++++ .../src/main/avro/HoodieIndexPlan.avsc | 43 +++++++++++++ .../hudi/common/model/WriteOperationType.java | 4 ++ .../table/timeline/HoodieActiveTimeline.java | 63 ++++++++++++++++++- .../timeline/HoodieArchivedTimeline.java | 7 ++- .../table/timeline/HoodieDefaultTimeline.java | 17 ++++- .../common/table/timeline/HoodieInstant.java | 4 ++ .../common/table/timeline/HoodieTimeline.java | 36 ++++++++++- .../table/timeline/TimelineMetadataUtils.java | 18 ++++++ .../hudi/metadata/BaseTableMetadata.java | 10 +-- .../hudi/metadata/MetadataPartitionType.java | 3 +- .../timeline/TestHoodieActiveTimeline.java | 15 +++-- 15 files changed, 308 insertions(+), 22 deletions(-) create mode 100644 hudi-common/src/main/avro/HoodieIndexCommitMetadata.avsc create mode 100644 hudi-common/src/main/avro/HoodieIndexPartitionInfo.avsc create mode 100644 hudi-common/src/main/avro/HoodieIndexPlan.avsc diff --git a/hudi-common/pom.xml b/hudi-common/pom.xml index e19070a6f9afe..f1abaa779274f 100644 --- a/hudi-common/pom.xml +++ b/hudi-common/pom.xml @@ -148,6 +148,9 @@ ${basedir}/src/main/avro/HoodieClusteringPlan.avsc ${basedir}/src/main/avro/HoodieRequestedReplaceMetadata.avsc ${basedir}/src/main/avro/HoodieMetadata.avsc + ${basedir}/src/main/avro/HoodieIndexPartitionInfo.avsc + ${basedir}/src/main/avro/HoodieIndexPlan.avsc + ${basedir}/src/main/avro/HoodieIndexCommitMetadata.avsc ${basedir}/src/main/avro/HoodieArchivedMetaEntry.avsc diff --git 
a/hudi-common/src/main/avro/HoodieArchivedMetaEntry.avsc b/hudi-common/src/main/avro/HoodieArchivedMetaEntry.avsc index c052147f718ea..81bcaf745e5b8 100644 --- a/hudi-common/src/main/avro/HoodieArchivedMetaEntry.avsc +++ b/hudi-common/src/main/avro/HoodieArchivedMetaEntry.avsc @@ -120,6 +120,14 @@ "HoodieCommitMetadata" ], "default": null + }, + { + "name":"hoodieIndexCommitMetadata", + "type":[ + "null", + "HoodieIndexCommitMetadata" + ], + "default": null } ] } diff --git a/hudi-common/src/main/avro/HoodieIndexCommitMetadata.avsc b/hudi-common/src/main/avro/HoodieIndexCommitMetadata.avsc new file mode 100644 index 0000000000000..d657ff148d567 --- /dev/null +++ b/hudi-common/src/main/avro/HoodieIndexCommitMetadata.avsc @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +{ + "namespace": "org.apache.hudi.avro.model", + "type": "record", + "name": "HoodieIndexCommitMetadata", + "fields": [ + { + "name": "version", + "type": [ + "int", + "null" + ], + "default": 1 + }, + { + "name": "operationType", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name":"partitionToWriteStats", + "type":["null", { + "type":"map", + "values":{ + "type":"array", + "items":"HoodieWriteStat" + } + }], + "default": null + } + ] +} diff --git a/hudi-common/src/main/avro/HoodieIndexPartitionInfo.avsc b/hudi-common/src/main/avro/HoodieIndexPartitionInfo.avsc new file mode 100644 index 0000000000000..0778be8c21762 --- /dev/null +++ b/hudi-common/src/main/avro/HoodieIndexPartitionInfo.avsc @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +{ + "namespace": "org.apache.hudi.avro.model", + "type": "record", + "name": "HoodieIndexPartitionInfo", + "fields": [ + { + "name": "version", + "type": [ + "int", + "null" + ], + "default": 1 + }, + { + "name": "metadataPartitionPath", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "indexUptoInstant", + "type": [ + "null", + "HoodieInstantInfo" + ], + "default": null + } + ] +} diff --git a/hudi-common/src/main/avro/HoodieIndexPlan.avsc b/hudi-common/src/main/avro/HoodieIndexPlan.avsc new file mode 100644 index 0000000000000..9fb7ec311e34a --- /dev/null +++ b/hudi-common/src/main/avro/HoodieIndexPlan.avsc @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +{ + "namespace": "org.apache.hudi.avro.model", + "type": "record", + "name": "HoodieIndexPlan", + "fields": [ + { + "name": "version", + "type": [ + "int", + "null" + ], + "default": 1 + }, + { + "name": "indexPartitionInfos", + "type": [ + "null", + { + "type": "array", + "items": "HoodieIndexPartitionInfo" + } + ], + "default": null + } + ] +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java b/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java index b5a3cc002366e..a75640547e2bd 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java @@ -48,6 +48,8 @@ public enum WriteOperationType { INSERT_OVERWRITE_TABLE("insert_overwrite_table"), // compact COMPACT("compact"), + + INDEX("index"), // used for old version UNKNOWN("unknown"); @@ -86,6 +88,8 @@ public static WriteOperationType fromValue(String value) { return CLUSTER; case "compact": return COMPACT; + case "index": + return INDEX; case "unknown": return UNKNOWN; default: diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java index c7473bd7d59d5..606f1dbfacf01 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java @@ -72,7 +72,8 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { INFLIGHT_COMPACTION_EXTENSION, REQUESTED_COMPACTION_EXTENSION, INFLIGHT_RESTORE_EXTENSION, RESTORE_EXTENSION, ROLLBACK_EXTENSION, REQUESTED_ROLLBACK_EXTENSION, INFLIGHT_ROLLBACK_EXTENSION, - REQUESTED_REPLACE_COMMIT_EXTENSION, INFLIGHT_REPLACE_COMMIT_EXTENSION, REPLACE_COMMIT_EXTENSION)); + REQUESTED_REPLACE_COMMIT_EXTENSION, 
INFLIGHT_REPLACE_COMMIT_EXTENSION, REPLACE_COMMIT_EXTENSION, + REQUESTED_INDEX_COMMIT_EXTENSION, INFLIGHT_INDEX_COMMIT_EXTENSION, INDEX_COMMIT_EXTENSION)); private static final Logger LOG = LogManager.getLogger(HoodieActiveTimeline.class); protected HoodieTableMetaClient metaClient; @@ -98,7 +99,6 @@ public static String createNewInstantTime() { return HoodieInstantTimeGenerator.createNewInstantTime(0); } - /** * Returns next instant time that adds N milliseconds to current time. * Ensures each instant time is atleast 1 second apart since we create instant times at second granularity @@ -599,6 +599,65 @@ public void saveToRollbackRequested(HoodieInstant instant, Option conten createFileInMetaPath(instant.getFileName(), content, false); } + /** + * Transition index instant state from requested to inflight. + * + * @param requestedInstant Requested Instant + * @return inflight instant + */ + public HoodieInstant transitionIndexRequestedToInflight(HoodieInstant requestedInstant, Option data) { + ValidationUtils.checkArgument(requestedInstant.getAction().equals(HoodieTimeline.INDEX_ACTION), + String.format("%s is not equal to %s action", requestedInstant.getAction(), INDEX_ACTION)); + ValidationUtils.checkArgument(requestedInstant.isRequested(), + String.format("Instant %s not in requested state", requestedInstant.getTimestamp())); + HoodieInstant inflightInstant = new HoodieInstant(State.INFLIGHT, INDEX_ACTION, requestedInstant.getTimestamp()); + transitionState(requestedInstant, inflightInstant, data); + return inflightInstant; + } + + /** + * Transition index instant state from inflight to completed. 
+ * @param inflightInstant Inflight Instant + * @return completed instant + */ + public HoodieInstant transitionIndexInflightToComplete(HoodieInstant inflightInstant, Option data) { + ValidationUtils.checkArgument(inflightInstant.getAction().equals(HoodieTimeline.INDEX_ACTION), + String.format("%s is not equal to %s action", inflightInstant.getAction(), INDEX_ACTION)); + ValidationUtils.checkArgument(inflightInstant.isInflight(), + String.format("Instant %s not inflight", inflightInstant.getTimestamp())); + HoodieInstant commitInstant = new HoodieInstant(State.COMPLETED, INDEX_ACTION, inflightInstant.getTimestamp()); + transitionState(inflightInstant, commitInstant, data); + return commitInstant; + } + + /** + * Revert index instant state from inflight to requested. + * @param inflightInstant Inflight Instant + * @return requested instant + */ + public HoodieInstant revertIndexInflightToRequested(HoodieInstant inflightInstant) { + ValidationUtils.checkArgument(inflightInstant.getAction().equals(HoodieTimeline.INDEX_ACTION), + String.format("%s is not equal to %s action", inflightInstant.getAction(), INDEX_ACTION)); + ValidationUtils.checkArgument(inflightInstant.isInflight(), + String.format("Instant %s not inflight", inflightInstant.getTimestamp())); + HoodieInstant requestedInstant = new HoodieInstant(State.REQUESTED, INDEX_ACTION, inflightInstant.getTimestamp()); + if (metaClient.getTimelineLayoutVersion().isNullVersion()) { + transitionState(inflightInstant, requestedInstant, Option.empty()); + } else { + deleteInflight(inflightInstant); + } + return requestedInstant; + } + + /** + * Save content for inflight/requested index instant. 
+ */ + public void saveToPendingIndexCommit(HoodieInstant instant, Option content) { + ValidationUtils.checkArgument(instant.getAction().equals(HoodieTimeline.INDEX_ACTION), + String.format("%s is not equal to %s action", instant.getAction(), INDEX_ACTION)); + createFileInMetaPath(instant.getFileName(), content, false); + } + private void createFileInMetaPath(String filename, Option content, boolean allowOverwrite) { Path fullPath = new Path(metaClient.getMetaPath(), filename); if (allowOverwrite || metaClient.getTimelineLayoutVersion().isNullVersion()) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java index 5ad3fa7a9f215..ae0d7c02bb556 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java @@ -101,7 +101,8 @@ public HoodieArchivedTimeline(HoodieTableMetaClient metaClient) { * * @deprecated */ - public HoodieArchivedTimeline() {} + public HoodieArchivedTimeline() { + } /** * This method is only used when this object is deserialized in a spark executor. 
@@ -190,6 +191,8 @@ private Option getMetadataKey(String action) { return Option.of("hoodieCompactionPlan"); case HoodieTimeline.REPLACE_COMMIT_ACTION: return Option.of("hoodieReplaceCommitMetadata"); + case HoodieTimeline.INDEX_ACTION: + return Option.of("hoodieIndexCommitMetadata"); default: LOG.error(String.format("Unknown action in metadata (%s)", action)); return Option.empty(); @@ -328,7 +331,7 @@ private int getArchivedFileSuffix(FileStatus f) { @Override public HoodieDefaultTimeline getWriteTimeline() { // filter in-memory instants - Set validActions = CollectionUtils.createSet(COMMIT_ACTION, DELTA_COMMIT_ACTION, COMPACTION_ACTION, REPLACE_COMMIT_ACTION); + Set validActions = CollectionUtils.createSet(COMMIT_ACTION, DELTA_COMMIT_ACTION, COMPACTION_ACTION, REPLACE_COMMIT_ACTION, INDEX_ACTION); return new HoodieDefaultTimeline(getInstants().filter(i -> readCommits.keySet().contains(i.getTimestamp())) .filter(s -> validActions.contains(s.getAction())), details); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java index 2cf111e91c812..02c5c49984c8c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java @@ -75,7 +75,8 @@ public void setInstants(List instants) { * * @deprecated */ - public HoodieDefaultTimeline() {} + public HoodieDefaultTimeline() { + } @Override public HoodieTimeline filterInflights() { @@ -108,7 +109,7 @@ public HoodieTimeline filterCompletedAndCompactionInstants() { @Override public HoodieDefaultTimeline getWriteTimeline() { - Set validActions = CollectionUtils.createSet(COMMIT_ACTION, DELTA_COMMIT_ACTION, COMPACTION_ACTION, REPLACE_COMMIT_ACTION); + Set validActions = CollectionUtils.createSet(COMMIT_ACTION, DELTA_COMMIT_ACTION, COMPACTION_ACTION, 
REPLACE_COMMIT_ACTION, INDEX_ACTION); return new HoodieDefaultTimeline(instants.stream().filter(s -> validActions.contains(s.getAction())), details); } @@ -181,6 +182,16 @@ public HoodieTimeline filter(Predicate filter) { return new HoodieDefaultTimeline(instants.stream().filter(filter), details); } + @Override + public HoodieTimeline filterPendingIndexTimeline() { + return new HoodieDefaultTimeline(instants.stream().filter(s -> s.getAction().equals(INDEX_ACTION) && !s.isCompleted()), details); + } + + @Override + public HoodieTimeline filterCompletedIndexTimeline() { + return new HoodieDefaultTimeline(instants.stream().filter(s -> s.getAction().equals(INDEX_ACTION)).filter(HoodieInstant::isCompleted), details); + } + /** * Get all instants (commits, delta commits) that produce new data, in the active timeline. */ @@ -194,7 +205,7 @@ public HoodieTimeline getCommitsTimeline() { */ public HoodieTimeline getAllCommitsTimeline() { return getTimelineOfActions(CollectionUtils.createSet(COMMIT_ACTION, DELTA_COMMIT_ACTION, - CLEAN_ACTION, COMPACTION_ACTION, SAVEPOINT_ACTION, ROLLBACK_ACTION, REPLACE_COMMIT_ACTION)); + CLEAN_ACTION, COMPACTION_ACTION, SAVEPOINT_ACTION, ROLLBACK_ACTION, REPLACE_COMMIT_ACTION, INDEX_ACTION)); } /** diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java index a8df62c6496ae..e170ef465de93 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java @@ -171,6 +171,10 @@ public String getFileName() { return isInflight() ? HoodieTimeline.makeInflightReplaceFileName(timestamp) : isRequested() ? HoodieTimeline.makeRequestedReplaceFileName(timestamp) : HoodieTimeline.makeReplaceFileName(timestamp); + } else if (HoodieTimeline.INDEX_ACTION.equals(action)) { + return isInflight() ? 
HoodieTimeline.makeInflightIndexFileName(timestamp) + : isRequested() ? HoodieTimeline.makeRequestedIndexFileName(timestamp) + : HoodieTimeline.makeIndexCommitFileName(timestamp); } throw new IllegalArgumentException("Cannot get file name for unknown action " + action); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java index 6ea44a83007d1..074bd47954fd3 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java @@ -55,10 +55,11 @@ public interface HoodieTimeline extends Serializable { String COMPACTION_ACTION = "compaction"; String REQUESTED_EXTENSION = ".requested"; String RESTORE_ACTION = "restore"; + String INDEX_ACTION = "index"; String[] VALID_ACTIONS_IN_TIMELINE = {COMMIT_ACTION, DELTA_COMMIT_ACTION, CLEAN_ACTION, SAVEPOINT_ACTION, RESTORE_ACTION, ROLLBACK_ACTION, - COMPACTION_ACTION, REPLACE_COMMIT_ACTION}; + COMPACTION_ACTION, REPLACE_COMMIT_ACTION, INDEX_ACTION}; String COMMIT_EXTENSION = "." + COMMIT_ACTION; String DELTA_COMMIT_EXTENSION = "." + DELTA_COMMIT_ACTION; @@ -83,6 +84,9 @@ public interface HoodieTimeline extends Serializable { String INFLIGHT_REPLACE_COMMIT_EXTENSION = "." + REPLACE_COMMIT_ACTION + INFLIGHT_EXTENSION; String REQUESTED_REPLACE_COMMIT_EXTENSION = "." + REPLACE_COMMIT_ACTION + REQUESTED_EXTENSION; String REPLACE_COMMIT_EXTENSION = "." + REPLACE_COMMIT_ACTION; + String INFLIGHT_INDEX_COMMIT_EXTENSION = "." + INDEX_ACTION + INFLIGHT_EXTENSION; + String REQUESTED_INDEX_COMMIT_EXTENSION = "." + INDEX_ACTION + REQUESTED_EXTENSION; + String INDEX_COMMIT_EXTENSION = "." 
+ INDEX_ACTION; String INVALID_INSTANT_TS = "0"; @@ -197,6 +201,16 @@ public interface HoodieTimeline extends Serializable { */ HoodieTimeline filter(Predicate filter); + /** + * Filter this timeline to just include requested and inflight index instants. + */ + HoodieTimeline filterPendingIndexTimeline(); + + /** + * Filter this timeline to just include completed index instants. + */ + HoodieTimeline filterCompletedIndexTimeline(); + /** * If the timeline has any instants. * @@ -340,6 +354,14 @@ static HoodieInstant getRollbackRequestedInstant(HoodieInstant instant) { return instant.isRequested() ? instant : HoodieTimeline.getRequestedInstant(instant); } + static HoodieInstant getIndexRequestedInstant(final String timestamp) { + return new HoodieInstant(State.REQUESTED, INDEX_ACTION, timestamp); + } + + static HoodieInstant getIndexInflightInstant(final String timestamp) { + return new HoodieInstant(State.INFLIGHT, INDEX_ACTION, timestamp); + } + /** * Returns the inflight instant corresponding to the instant being passed. Takes care of changes in action names * between inflight and completed instants (compaction <=> commit). 
@@ -449,4 +471,16 @@ static String makeFileNameAsComplete(String fileName) { static String makeFileNameAsInflight(String fileName) { return StringUtils.join(fileName, HoodieTimeline.INFLIGHT_EXTENSION); } + + static String makeIndexCommitFileName(String instant) { + return StringUtils.join(instant, HoodieTimeline.INDEX_COMMIT_EXTENSION); + } + + static String makeInflightIndexFileName(String instant) { + return StringUtils.join(instant, HoodieTimeline.INFLIGHT_INDEX_COMMIT_EXTENSION); + } + + static String makeRequestedIndexFileName(String instant) { + return StringUtils.join(instant, HoodieTimeline.REQUESTED_INDEX_COMMIT_EXTENSION); + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java index 32e42ee58ac27..347577489a36c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java @@ -22,6 +22,8 @@ import org.apache.hudi.avro.model.HoodieCleanMetadata; import org.apache.hudi.avro.model.HoodieCleanerPlan; import org.apache.hudi.avro.model.HoodieCompactionPlan; +import org.apache.hudi.avro.model.HoodieIndexCommitMetadata; +import org.apache.hudi.avro.model.HoodieIndexPlan; import org.apache.hudi.avro.model.HoodieInstantInfo; import org.apache.hudi.avro.model.HoodieReplaceCommitMetadata; import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata; @@ -134,6 +136,14 @@ public static Option serializeRequestedReplaceMetadata(HoodieRequestedRe return serializeAvroMetadata(clusteringPlan, HoodieRequestedReplaceMetadata.class); } + public static Option serializeIndexPlan(HoodieIndexPlan indexPlan) throws IOException { + return serializeAvroMetadata(indexPlan, HoodieIndexPlan.class); + } + + public static Option serializeIndexCommitMetadata(HoodieIndexCommitMetadata indexCommitMetadata) 
throws IOException { + return serializeAvroMetadata(indexCommitMetadata, HoodieIndexCommitMetadata.class); + } + public static Option serializeAvroMetadata(T metadata, Class clazz) throws IOException { DatumWriter datumWriter = new SpecificDatumWriter<>(clazz); @@ -177,6 +187,14 @@ public static HoodieReplaceCommitMetadata deserializeHoodieReplaceMetadata(byte[ return deserializeAvroMetadata(bytes, HoodieReplaceCommitMetadata.class); } + public static HoodieIndexPlan deserializeIndexPlan(byte[] bytes) throws IOException { + return deserializeAvroMetadata(bytes, HoodieIndexPlan.class); + } + + public static HoodieIndexCommitMetadata deserializeIndexCommitMetadata(byte[] bytes) throws IOException { + return deserializeAvroMetadata(bytes, HoodieIndexCommitMetadata.class); + } + public static T deserializeAvroMetadata(byte[] bytes, Class clazz) throws IOException { DatumReader reader = new SpecificDatumReader<>(clazz); diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java index ccd421e677651..cd7203d2892a3 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java @@ -156,7 +156,7 @@ protected List fetchAllPartitionPaths() throws IOException { List partitions = Collections.emptyList(); if (hoodieRecord.isPresent()) { - mayBeHandleSpuriousDeletes(hoodieRecord, "\"all partitions\""); + handleSpuriousDeletes(hoodieRecord, "\"all partitions\""); partitions = hoodieRecord.get().getData().getFilenames(); // Partition-less tables have a single empty partition if (partitions.contains(NON_PARTITIONED_NAME)) { @@ -186,7 +186,7 @@ FileStatus[] fetchAllFilesInPartition(Path partitionPath) throws IOException { FileStatus[] statuses = {}; if (hoodieRecord.isPresent()) { - mayBeHandleSpuriousDeletes(hoodieRecord, partitionName); + handleSpuriousDeletes(hoodieRecord, 
partitionName); statuses = hoodieRecord.get().getData().getFileStatuses(hadoopConf.get(), partitionPath); } @@ -221,7 +221,7 @@ Map fetchAllFilesInPartitionPaths(List partitionPath for (Pair>> entry: partitionsFileStatus) { if (entry.getValue().isPresent()) { - mayBeHandleSpuriousDeletes(entry.getValue(), entry.getKey()); + handleSpuriousDeletes(entry.getValue(), entry.getKey()); result.put(partitionInfo.get(entry.getKey()).toString(), entry.getValue().get().getData().getFileStatuses(hadoopConf.get(), partitionInfo.get(entry.getKey()))); } } @@ -231,11 +231,11 @@ Map fetchAllFilesInPartitionPaths(List partitionPath } /** - * May be handle spurious deletes. Depending on config, throw an exception or log a warn msg. + * Handle spurious deletes. Depending on config, throw an exception or log a warn msg. * @param hoodieRecord instance of {@link HoodieRecord} of interest. * @param partitionName partition name of interest. */ - private void mayBeHandleSpuriousDeletes(Option> hoodieRecord, String partitionName) { + private void handleSpuriousDeletes(Option> hoodieRecord, String partitionName) { if (!hoodieRecord.get().getData().getDeletions().isEmpty()) { if (!metadataConfig.ignoreSpuriousDeletes()) { throw new HoodieMetadataException("Metadata record for " + partitionName + " is inconsistent: " diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java b/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java index 380f4d04d34a6..78f30836f87db 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java @@ -20,6 +20,7 @@ import java.util.Arrays; import java.util.List; +import java.util.stream.Collectors; public enum MetadataPartitionType { FILES("files", "files-"); @@ -43,6 +44,6 @@ public String getFileIdPrefix() { } public static List all() { - return 
Arrays.asList(MetadataPartitionType.FILES.partitionPath()); + return Arrays.stream(MetadataPartitionType.values()).map(MetadataPartitionType::partitionPath).collect(Collectors.toList()); } } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java index 9d89c2a6b5feb..12d5e07711c04 100755 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java @@ -217,20 +217,19 @@ public void testTimelineGetOperations() { // Test that various types of getXXX operations from HoodieActiveTimeline // return the correct set of Instant - checkTimeline.accept(timeline.getCommitsTimeline(), - CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION, HoodieTimeline.REPLACE_COMMIT_ACTION)); - checkTimeline.accept(timeline.getWriteTimeline(), - CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION, HoodieTimeline.COMPACTION_ACTION, HoodieTimeline.REPLACE_COMMIT_ACTION)); + checkTimeline.accept(timeline.getCommitsTimeline(), CollectionUtils.createSet( + HoodieTimeline.COMMIT_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION, HoodieTimeline.REPLACE_COMMIT_ACTION)); + checkTimeline.accept(timeline.getWriteTimeline(), CollectionUtils.createSet( + HoodieTimeline.COMMIT_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION, HoodieTimeline.COMPACTION_ACTION, HoodieTimeline.REPLACE_COMMIT_ACTION, HoodieTimeline.INDEX_ACTION)); checkTimeline.accept(timeline.getCommitTimeline(), CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION, HoodieTimeline.REPLACE_COMMIT_ACTION)); checkTimeline.accept(timeline.getDeltaCommitTimeline(), Collections.singleton(HoodieTimeline.DELTA_COMMIT_ACTION)); checkTimeline.accept(timeline.getCleanerTimeline(), 
Collections.singleton(HoodieTimeline.CLEAN_ACTION)); checkTimeline.accept(timeline.getRollbackTimeline(), Collections.singleton(HoodieTimeline.ROLLBACK_ACTION)); checkTimeline.accept(timeline.getRestoreTimeline(), Collections.singleton(HoodieTimeline.RESTORE_ACTION)); checkTimeline.accept(timeline.getSavePointTimeline(), Collections.singleton(HoodieTimeline.SAVEPOINT_ACTION)); - checkTimeline.accept(timeline.getAllCommitsTimeline(), - CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION, - HoodieTimeline.CLEAN_ACTION, HoodieTimeline.COMPACTION_ACTION, HoodieTimeline.REPLACE_COMMIT_ACTION, - HoodieTimeline.SAVEPOINT_ACTION, HoodieTimeline.ROLLBACK_ACTION)); + checkTimeline.accept(timeline.getAllCommitsTimeline(), CollectionUtils.createSet( + HoodieTimeline.COMMIT_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION, HoodieTimeline.CLEAN_ACTION, HoodieTimeline.COMPACTION_ACTION, + HoodieTimeline.REPLACE_COMMIT_ACTION, HoodieTimeline.SAVEPOINT_ACTION, HoodieTimeline.ROLLBACK_ACTION, HoodieTimeline.INDEX_ACTION)); // Get some random Instants Random rand = new Random();