diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java index c94f30b3daeca..317ae68ed6148 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java @@ -18,11 +18,19 @@ package org.apache.hudi.table; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.avro.Schema; +import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; import org.apache.hudi.avro.model.HoodieArchivedMetaEntry; import org.apache.hudi.avro.model.HoodieCompactionPlan; +import org.apache.hudi.avro.model.HoodieReplaceMetadata; import org.apache.hudi.avro.model.HoodieRollbackMetadata; import org.apache.hudi.avro.model.HoodieSavepointMetadata; +import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.ActionType; import org.apache.hudi.common.model.HoodieArchivedLogFile; import org.apache.hudi.common.model.HoodieCommitMetadata; @@ -46,12 +54,6 @@ import org.apache.hudi.exception.HoodieCommitException; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; - -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.avro.Schema; -import org.apache.avro.generic.IndexedRecord; -import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -62,8 +64,10 @@ import java.util.Collections; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -263,6 +267,7 @@ public void archive(List instants) throws HoodieCommitException { LOG.info("Wrapper schema " + wrapperSchema.toString()); List records = new ArrayList<>(); for (HoodieInstant hoodieInstant : instants) { + deleteReplacedFiles(hoodieInstant); try { records.add(convertToAvroRecord(commitTimeline, hoodieInstant)); if (records.size() >= this.config.getCommitArchivalBatchSize()) { @@ -285,6 +290,43 @@ public Path getArchiveFilePath() { return archiveFilePath; } + private void deleteReplacedFiles(HoodieInstant instant) { + if (!instant.isCompleted() || !HoodieTimeline.COMMIT_ACTION.equals(instant.getAction())) { + // only delete files for completed instants + return; + } + Option replaceInstantOption = metaClient.getActiveTimeline().getCompletedAndReplaceTimeline() + .filter(replaceInstant -> replaceInstant.getTimestamp().equals(instant.getTimestamp())).firstInstant(); + + replaceInstantOption.ifPresent(replaceInstant -> { + try { + HoodieReplaceMetadata metadata = TimelineMetadataUtils.deserializeHoodieReplaceMetadata( + metaClient.getActiveTimeline().getInstantDetails(replaceInstant).get()); + + metadata.getPartitionMetadata().entrySet().stream().forEach(entry -> + deleteFileGroups(entry.getKey(), new HashSet<>(entry.getValue()), instant) + ); + } catch (IOException e) { + throw new HoodieCommitException("Failed to archive because cannot delete replace files", e); + } + }); + } + + private void deleteFileGroups(String partitionPath, Set fileIdsToDelete, HoodieInstant instant) { + try { + FileStatus[] statuses = 
metaClient.getFs().listStatus(FSUtils.getPartitionPath(metaClient.getBasePath(), partitionPath)); + for (FileStatus status : statuses) { + String fileId = FSUtils.getFileIdFromFilePath(status.getPath()); + if (fileIdsToDelete.contains(fileId)) { + LOG.info("Delete " + status.getPath() + " to archive " + instant); + metaClient.getFs().delete(status.getPath()); + } + } + } catch (IOException e) { + LOG.error("unable to delete file groups that are replaced", e); + } + } + private void writeToFile(Schema wrapperSchema, List records) throws Exception { if (records.size() > 0) { Map header = new HashMap<>(); diff --git a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java index 127cface3cc70..638779d46a0f8 100644 --- a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java +++ b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java @@ -18,6 +18,8 @@ package org.apache.hudi.io; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; @@ -40,10 +42,13 @@ import org.junit.jupiter.api.Test; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.stream.Collectors; @@ -208,6 +213,40 @@ public void testArchiveTableWithArchival() throws IOException { verifyInflightInstants(metaClient, 2); } + @Test + public void testArchiveTableWithReplacedFiles() throws IOException { + HoodieTestUtils.init(hadoopConf, basePath); + HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath) + .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2).forTable("test-trip-table") + .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build()) + .build(); + + int numCommits = 4; + int commitInstant = 100; + for (int i = 0; i < numCommits; i++) { + createCommitAndReplaceMetadata(commitInstant); + commitInstant += 100; + } + + metaClient = HoodieTableMetaClient.reload(metaClient); + HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); + assertEquals(4, timeline.countInstants(), "Loaded 4 commits and the count should match"); + HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, metaClient); + boolean result = archiveLog.archiveIfRequired(hadoopConf); + assertTrue(result); + + FileStatus[] allFiles = metaClient.getFs().listStatus(new Path(basePath + "/" + HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)); + Set allFileIds = Arrays.stream(allFiles).map(fs -> FSUtils.getFileIdFromFilePath(fs.getPath())).collect(Collectors.toSet()); + + // verify 100-1,200-1 are deleted by archival + assertFalse(allFileIds.contains("file-100-1")); + assertFalse(allFileIds.contains("file-200-1")); + assertTrue(allFileIds.contains("file-100-2")); + assertTrue(allFileIds.contains("file-200-2")); + assertTrue(allFileIds.contains("file-300-1")); + assertTrue(allFileIds.contains("file-400-1")); + } + @Test public void testArchiveTableWithNoArchival() throws IOException { HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath) @@ -430,4 +469,19 
@@ public void testConvertCommitMetadata() { org.apache.hudi.avro.model.HoodieCommitMetadata expectedCommitMetadata = archiveLog.convertCommitMetadata(hoodieCommitMetadata); assertEquals(expectedCommitMetadata.getOperationType(), WriteOperationType.INSERT.toString()); } + + private void createCommitAndReplaceMetadata(int commitInstant) throws IOException { + String commitTime = "" + commitInstant; + String fileId1 = "file-" + commitInstant + "-1"; + String fileId2 = "file-" + commitInstant + "-2"; + + // create replace instant to mark fileId1 as deleted + Map> partitionToReplaceFiles = new HashMap<>(); + partitionToReplaceFiles.putIfAbsent(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, new ArrayList<>()); + partitionToReplaceFiles.get(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).add(fileId1); + HoodieTestUtils.createReplaceInstant(partitionToReplaceFiles, commitTime, metaClient); + HoodieTestDataGenerator.createCommitFile(basePath, commitTime, dfs.getConf()); + HoodieTestDataGenerator.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, commitTime, fileId1, dfs.getConf()); + HoodieTestDataGenerator.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, commitTime, fileId2, dfs.getConf()); + } } diff --git a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieTestDataGenerator.java b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieTestDataGenerator.java index 0eb88e24d2fab..a151f43642044 100644 --- a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieTestDataGenerator.java +++ b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieTestDataGenerator.java @@ -365,6 +365,14 @@ public static void createCompactionRequestedFile(String basePath, String instant createEmptyFile(basePath, commitFile, configuration); } + public static void createDataFile(String basePath, String partitionPath, String instantTime, String fileID, Configuration configuration) + throws IOException { + + Path dataFilePath = new Path(basePath + "/" + partitionPath + "/" + + FSUtils.makeDataFileName(instantTime, "1_0_1", fileID)); + createEmptyFile(basePath, dataFilePath, configuration); + } + private static void createEmptyFile(String basePath, Path filePath, Configuration configuration) throws IOException { FileSystem fs = FSUtils.getFs(basePath, configuration); FSDataOutputStream os = fs.create(filePath, true); diff --git a/hudi-common/src/main/avro/HoodieReplaceMetadata.avsc b/hudi-common/src/main/avro/HoodieReplaceMetadata.avsc new file mode 100644 index 0000000000000..689a3e0073210 --- /dev/null +++ b/hudi-common/src/main/avro/HoodieReplaceMetadata.avsc @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + /* + * Note that all 'replace' instants are read for every query + * So it is important to keep this small. Please be careful + * before tracking additional information in this file. + * This will be used for 'insert_overwrite' (RFC-18) and also 'clustering' (RFC-19) + */ +{"namespace": "org.apache.hudi.avro.model", + "type": "record", + "name": "HoodieReplaceMetadata", + "fields": [ + {"name": "totalFileGroupsReplaced", "type": "int"}, + {"name": "command", "type": "string"}, + {"name": "partitionMetadata", "type": { + "type" : "map", "values" : { + "type": "array", + "items": "string" + } + } + }, + { + "name":"version", + "type":["int", "null"], + "default": 1 + } + ] +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java index 36e2b3de1e4ed..ae22a439a8b9f 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java @@ -65,7 +65,8 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { COMMIT_EXTENSION, INFLIGHT_COMMIT_EXTENSION, REQUESTED_COMMIT_EXTENSION, DELTA_COMMIT_EXTENSION, INFLIGHT_DELTA_COMMIT_EXTENSION, REQUESTED_DELTA_COMMIT_EXTENSION, SAVEPOINT_EXTENSION, INFLIGHT_SAVEPOINT_EXTENSION, CLEAN_EXTENSION, REQUESTED_CLEAN_EXTENSION, INFLIGHT_CLEAN_EXTENSION, - INFLIGHT_COMPACTION_EXTENSION, REQUESTED_COMPACTION_EXTENSION, INFLIGHT_RESTORE_EXTENSION, RESTORE_EXTENSION)); + INFLIGHT_COMPACTION_EXTENSION, REQUESTED_COMPACTION_EXTENSION, INFLIGHT_RESTORE_EXTENSION, RESTORE_EXTENSION, + INFLIGHT_REPLACE_EXTENSION, REPLACE_EXTENSION)); private static final Logger LOG = LogManager.getLogger(HoodieActiveTimeline.class); protected HoodieTableMetaClient metaClient; @@ -304,6 +305,22 @@ public HoodieInstant transitionCleanRequestedToInflight(HoodieInstant requestedI return inflight; } + /** + * Transition Replace State from inflight to Committed.
+ * + * @param inflightInstant Inflight instant + * @param data Extra Metadata + * @return commit instant + */ + public HoodieInstant transitionReplaceInflightToComplete(HoodieInstant inflightInstant, Option data) { + ValidationUtils.checkArgument(inflightInstant.getAction().equals(HoodieTimeline.REPLACE_ACTION)); + ValidationUtils.checkArgument(inflightInstant.isInflight()); + HoodieInstant commitInstant = new HoodieInstant(State.COMPLETED, REPLACE_ACTION, inflightInstant.getTimestamp()); + // Then write to timeline + transitionState(inflightInstant, commitInstant, data); + return commitInstant; + } + private void transitionState(HoodieInstant fromInstant, HoodieInstant toInstant, Option data) { transitionState(fromInstant, toInstant, data, false); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java index c7a6230ffd93c..2406c3d7f72cf 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java @@ -113,6 +113,24 @@ public HoodieDefaultTimeline getCommitsAndCompactionTimeline() { return new HoodieDefaultTimeline(instants.stream().filter(s -> validActions.contains(s.getAction())), details); } + @Override + public HoodieDefaultTimeline getCommitsReplaceAndCompactionTimeline() { + Set validActions = CollectionUtils.createSet(COMMIT_ACTION, DELTA_COMMIT_ACTION, COMPACTION_ACTION, REPLACE_ACTION); + return new HoodieDefaultTimeline(instants.stream().filter(s -> validActions.contains(s.getAction())), details); + } + + @Override + public HoodieTimeline getCompletedAndReplaceTimeline() { + Set commitActions = CollectionUtils.createSet(COMMIT_ACTION, DELTA_COMMIT_ACTION); + Set validCommitTimes = instants.stream().filter(s -> s.isCompleted()) + .filter(s -> commitActions.contains(s.getAction())) + .map(s -> s.getTimestamp()).collect(Collectors.toSet()); + + return new HoodieDefaultTimeline(instants.stream().filter(s -> s.getAction().equals(REPLACE_ACTION)) + .filter(s -> s.isCompleted()) + .filter(instant -> validCommitTimes.contains(instant.getTimestamp())), details); + } + @Override public HoodieTimeline filterPendingCompactionTimeline() { return new HoodieDefaultTimeline( diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java index aea414bc7e227..909e5f5bd1bd1 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java @@ -162,6 +162,9 @@ public String getFileName() { } else if (HoodieTimeline.RESTORE_ACTION.equals(action)) { return isInflight() ? HoodieTimeline.makeInflightRestoreFileName(timestamp) : HoodieTimeline.makeRestoreFileName(timestamp); + } else if (HoodieTimeline.REPLACE_ACTION.equals(action)) { + return isInflight() ? 
HoodieTimeline.makeInflightReplaceFileName(timestamp) + : HoodieTimeline.makeReplaceFileName(timestamp); } throw new IllegalArgumentException("Cannot get file name for unknown action " + action); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java index 151c18bc28445..17a3fe979f33d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java @@ -48,6 +48,7 @@ public interface HoodieTimeline extends Serializable { String CLEAN_ACTION = "clean"; String ROLLBACK_ACTION = "rollback"; String SAVEPOINT_ACTION = "savepoint"; + String REPLACE_ACTION = "replace"; String INFLIGHT_EXTENSION = ".inflight"; // With Async Compaction, compaction instant can be in 3 states : // (compaction-requested), (compaction-inflight), (completed) @@ -57,7 +58,7 @@ public interface HoodieTimeline extends Serializable { String[] VALID_ACTIONS_IN_TIMELINE = {COMMIT_ACTION, DELTA_COMMIT_ACTION, CLEAN_ACTION, SAVEPOINT_ACTION, RESTORE_ACTION, ROLLBACK_ACTION, - COMPACTION_ACTION}; + COMPACTION_ACTION, REPLACE_ACTION}; String COMMIT_EXTENSION = "." + COMMIT_ACTION; String DELTA_COMMIT_EXTENSION = "." + DELTA_COMMIT_ACTION; @@ -78,6 +79,8 @@ public interface HoodieTimeline extends Serializable { String INFLIGHT_COMPACTION_EXTENSION = StringUtils.join(".", COMPACTION_ACTION, INFLIGHT_EXTENSION); String INFLIGHT_RESTORE_EXTENSION = "." + RESTORE_ACTION + INFLIGHT_EXTENSION; String RESTORE_EXTENSION = "." + RESTORE_ACTION; + String INFLIGHT_REPLACE_EXTENSION = "." + REPLACE_ACTION + INFLIGHT_EXTENSION; + String REPLACE_EXTENSION = "." + REPLACE_ACTION; String INVALID_INSTANT_TS = "0"; @@ -126,6 +129,21 @@ public interface HoodieTimeline extends Serializable { */ HoodieTimeline getCommitsAndCompactionTimeline(); + /** + * Timeline to just include commits (commit/deltacommit), replace and compaction actions. + * + * @return + */ + HoodieDefaultTimeline getCommitsReplaceAndCompactionTimeline(); + + + /** + * Timeline to just include replace instants that have valid (commit/deltacommit) actions. + * + * @return + */ + HoodieTimeline getCompletedAndReplaceTimeline(); + /** * Filter this timeline to just include requested and inflight compaction instants. 
* @@ -348,6 +366,14 @@ static String makeInflightRestoreFileName(String instant) { return StringUtils.join(instant, HoodieTimeline.INFLIGHT_RESTORE_EXTENSION); } + static String makeReplaceFileName(String instant) { + return StringUtils.join(instant, HoodieTimeline.REPLACE_EXTENSION); + } + + static String makeInflightReplaceFileName(String instant) { + return StringUtils.join(instant, HoodieTimeline.INFLIGHT_REPLACE_EXTENSION); + } + static String makeDeltaFileName(String instantTime) { return instantTime + HoodieTimeline.DELTA_COMMIT_EXTENSION; } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java index f95bfc3427dd9..4e823002fa4c5 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java @@ -21,6 +21,7 @@ import org.apache.hudi.avro.model.HoodieCleanMetadata; import org.apache.hudi.avro.model.HoodieCleanerPlan; import org.apache.hudi.avro.model.HoodieCompactionPlan; +import org.apache.hudi.avro.model.HoodieReplaceMetadata; import org.apache.hudi.avro.model.HoodieRestoreMetadata; import org.apache.hudi.avro.model.HoodieRollbackMetadata; import org.apache.hudi.avro.model.HoodieRollbackPartitionMetadata; @@ -109,6 +110,10 @@ public static Option serializeRestoreMetadata(HoodieRestoreMetadata rest return serializeAvroMetadata(restoreMetadata, HoodieRestoreMetadata.class); } + public static Option serializeReplaceMetadata(HoodieReplaceMetadata metadata) throws IOException { + return serializeAvroMetadata(metadata, HoodieReplaceMetadata.class); + } + public static Option serializeAvroMetadata(T metadata, Class clazz) throws IOException { DatumWriter datumWriter = new SpecificDatumWriter<>(clazz); @@ -140,6 +145,10 @@ public static HoodieSavepointMetadata deserializeHoodieSavepointMetadata(byte[] return deserializeAvroMetadata(bytes, HoodieSavepointMetadata.class); } + public static HoodieReplaceMetadata deserializeHoodieReplaceMetadata(byte[] bytes) throws IOException { + return deserializeAvroMetadata(bytes, HoodieReplaceMetadata.class); + } + public static T deserializeAvroMetadata(byte[] bytes, Class clazz) throws IOException { DatumReader reader = new SpecificDatumReader<>(clazz); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java index 4fdf20b1842e5..48534e139abb1 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java @@ -18,6 +18,9 @@ package org.apache.hudi.common.table.view; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hudi.avro.model.HoodieReplaceMetadata; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.CompactionOperation; import org.apache.hudi.common.model.FileSlice; @@ -28,20 +31,19 @@ import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; import org.apache.hudi.common.util.CompactionUtils; import 
org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieIOException; - -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import java.io.IOException; import java.io.Serializable; +import java.util.AbstractMap; import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; @@ -103,7 +105,7 @@ protected void init(HoodieTableMetaClient metaClient, HoodieTimeline visibleActi * @param visibleActiveTimeline Visible Active Timeline */ protected void refreshTimeline(HoodieTimeline visibleActiveTimeline) { - this.visibleCommitsAndCompactionTimeline = visibleActiveTimeline.getCommitsAndCompactionTimeline(); + this.visibleCommitsAndCompactionTimeline = visibleActiveTimeline.getCommitsReplaceAndCompactionTimeline(); } /** @@ -111,6 +113,11 @@ protected void refreshTimeline(HoodieTimeline visibleActiveTimeline) { */ protected List addFilesToView(FileStatus[] statuses) { HoodieTimer timer = new HoodieTimer().startTimer(); + final Map> partitionFileIdsToExclude = getFileIdsToExclude(visibleCommitsAndCompactionTimeline); + partitionFileIdsToExclude.entrySet().stream().forEach(entry -> { + storePartitionExcludedFiles(entry.getKey(), entry.getValue()); + }); + List fileGroups = buildFileGroups(statuses, visibleCommitsAndCompactionTimeline, true); long fgBuildTimeTakenMs = timer.endTimer(); timer.startTimer(); @@ -156,28 +163,56 @@ protected List buildFileGroups(Stream baseFileS List fileGroups = new ArrayList<>(); fileIdSet.forEach(pair -> { String fileId = pair.getValue(); - HoodieFileGroup group = new HoodieFileGroup(pair.getKey(), fileId, timeline); - if (baseFiles.containsKey(pair)) { - baseFiles.get(pair).forEach(group::addBaseFile); - } - if (logFiles.containsKey(pair)) { - logFiles.get(pair).forEach(group::addLogFile); - } - if (addPendingCompactionFileSlice) { - Option> pendingCompaction = - getPendingCompactionOperationWithInstant(group.getFileGroupId()); - if (pendingCompaction.isPresent()) { - // If there is no delta-commit after compaction request, this step would ensure a new file-slice appears - // so that any new ingestion uses the correct base-instant - group.addNewFileSliceAtInstant(pendingCompaction.get().getKey()); + String partitionPath = pair.getKey(); + if (isExcludeFileGroup(partitionPath, fileId)) { + LOG.info("excluding file " + fileId + " from view for partition " + partitionPath); + } else { + HoodieFileGroup group = new HoodieFileGroup(partitionPath, fileId, timeline); + if (baseFiles.containsKey(pair)) { + baseFiles.get(pair).forEach(group::addBaseFile); + } + if (logFiles.containsKey(pair)) { + logFiles.get(pair).forEach(group::addLogFile); + } + if (addPendingCompactionFileSlice) { + Option> pendingCompaction = + getPendingCompactionOperationWithInstant(group.getFileGroupId()); + if (pendingCompaction.isPresent()) { + // If there is no delta-commit after compaction request, this step would ensure a new file-slice appears + // so that any new ingestion uses the correct base-instant + group.addNewFileSliceAtInstant(pendingCompaction.get().getKey()); + } } + fileGroups.add(group); } - fileGroups.add(group); }); return fileGroups; } + /** + * Get file groups to exclude by looking at all commit instants. 
+ */ + private Map> getFileIdsToExclude(HoodieTimeline timeline) { + // for each REPLACE instant, get map of (partitionPath -> deleteFileGroup) + Stream>> resultStream = timeline.getCompletedAndReplaceTimeline().getInstants().flatMap(instant -> { + try { + HoodieReplaceMetadata replaceMetadata = TimelineMetadataUtils.deserializeHoodieReplaceMetadata( + timeline.getInstantDetails(instant).get()); + + // get delete file groups for each partition + return replaceMetadata.getPartitionMetadata().entrySet().stream() + .map(entry -> new AbstractMap.SimpleEntry<>(entry.getKey(), new HashSet<>(entry.getValue()))); + } catch (IOException e) { + throw new HoodieIOException("error reading commit metadata for " + instant); + } + }); + + // merge all maps + return resultStream.collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue(), + (v1, v2) -> Stream.concat(v1.stream(), v2.stream()).collect(Collectors.toSet()))); + } + /** * Clears the partition Map and reset view states. */ @@ -623,6 +658,11 @@ protected abstract Option> getPendingCompactio */ abstract Stream fetchAllStoredFileGroups(); + abstract boolean isExcludeFileGroup(String partitionPath, String fileId); + + protected abstract void storePartitionExcludedFiles(final String partition, final Set fileIdsToExclude); + + /** * Check if the view is already closed. */ diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java index 56ae22d5cd43a..69a0cf0a1cf9a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java @@ -34,8 +34,10 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -52,6 +54,7 @@ public class HoodieTableFileSystemView extends IncrementalTimelineSyncFileSystem // mapping from partition paths to file groups contained within them protected Map> partitionToFileGroupsMap; + protected Map> partitionToExcludeFileGroupsMap; /** * PartitionPath + File-Id to pending compaction instant time. 
@@ -86,6 +89,7 @@ public HoodieTableFileSystemView(HoodieTableMetaClient metaClient, HoodieTimelin @Override public void init(HoodieTableMetaClient metaClient, HoodieTimeline visibleActiveTimeline) { this.partitionToFileGroupsMap = createPartitionToFileGroups(); + this.partitionToExcludeFileGroupsMap = createPartitionToExcludeFileGroups(); super.init(metaClient, visibleActiveTimeline); } @@ -99,12 +103,17 @@ public void init(HoodieTableMetaClient metaClient, HoodieTimeline visibleActiveT protected void resetViewState() { this.fgIdToPendingCompaction = null; this.partitionToFileGroupsMap = null; + this.partitionToExcludeFileGroupsMap = null; } protected Map> createPartitionToFileGroups() { return new ConcurrentHashMap<>(); } + protected Map> createPartitionToExcludeFileGroups() { + return new ConcurrentHashMap<>(); + } + protected Map> createFileIdToPendingCompactionMap( Map> fileIdToPendingCompaction) { return fileIdToPendingCompaction; @@ -202,11 +211,27 @@ protected void storePartitionView(String partitionPath, List fi partitionToFileGroupsMap.put(partitionPath, newList); } + @Override + protected void storePartitionExcludedFiles(final String partition, final Set fileIdsToExclude) { + LOG.info("Ignore file-ids for partition :" + partition + ", #FileGroups=" + fileIdsToExclude.size()); + partitionToExcludeFileGroupsMap.put(partition, fileIdsToExclude); + } + @Override public Stream fetchAllStoredFileGroups() { return partitionToFileGroupsMap.values().stream().flatMap(Collection::stream); } + @Override + public boolean isExcludeFileGroup(String partitionPath, String fileId) { + return partitionToExcludeFileGroupsMap.getOrDefault(partitionPath, Collections.emptySet()).contains(fileId); + } + + @Override + public Stream getAllExcludeFileGroups(String partitionPath) { + return partitionToExcludeFileGroupsMap.getOrDefault(partitionPath, Collections.emptySet()).stream(); + } + @Override public void close() { closed = true; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java index 200d4216914cf..3a6710c071604 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java @@ -193,6 +193,11 @@ public Stream getAllFileGroups(String partitionPath) { return execute(partitionPath, preferredView::getAllFileGroups, secondaryView::getAllFileGroups); } + @Override + public Stream getAllExcludeFileGroups(final String partitionPath) { + return execute(partitionPath, preferredView::getAllExcludeFileGroups, secondaryView::getAllExcludeFileGroups); + } + @Override public Stream> getPendingCompactionOperations() { return execute(preferredView::getPendingCompactionOperations, secondaryView::getPendingCompactionOperations); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java index c99cc98861bba..1607452263293 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java @@ -18,6 +18,11 @@ package org.apache.hudi.common.table.view; +import com.fasterxml.jackson.core.type.TypeReference; +import 
com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.http.client.fluent.Request; +import org.apache.http.client.fluent.Response; +import org.apache.http.client.utils.URIBuilder; import org.apache.hudi.common.model.CompactionOperation; import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieBaseFile; @@ -36,12 +41,6 @@ import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieRemoteException; - -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.http.client.fluent.Request; -import org.apache.http.client.fluent.Response; -import org.apache.http.client.utils.URIBuilder; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -87,6 +86,9 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView, public static final String ALL_FILEGROUPS_FOR_PARTITION_URL = String.format("%s/%s", BASE_URL, "filegroups/all/partition/"); + public static final String ALL_EXCLUDE_FILEGROUPS_FOR_PARTITION_URL = + String.format("%s/%s", BASE_URL, "filegroups/exclude/partition/"); + public static final String LAST_INSTANT = String.format("%s/%s", BASE_URL, "timeline/instant/last"); public static final String LAST_INSTANTS = String.format("%s/%s", BASE_URL, "timeline/instants/last"); @@ -355,6 +357,18 @@ public Stream getAllFileGroups(String partitionPath) { } } + @Override + public Stream getAllExcludeFileGroups(final String partitionPath) { + Map paramsMap = getParamsWithPartitionPath(partitionPath); + try { + List fileGroups = executeRequest(ALL_EXCLUDE_FILEGROUPS_FOR_PARTITION_URL, paramsMap, + new TypeReference>() {}, RequestMethod.GET); + return fileGroups.stream(); + } catch (IOException e) { + throw new HoodieRemoteException(e); + } + } + public boolean refresh() { Map paramsMap = getParams(); try { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RocksDbBasedFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RocksDbBasedFileSystemView.java index 00172d714ae5a..b26a4b61d490d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RocksDbBasedFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RocksDbBasedFileSystemView.java @@ -38,10 +38,13 @@ import org.apache.log4j.Logger; import java.io.Serializable; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -68,10 +71,13 @@ public class RocksDbBasedFileSystemView extends IncrementalTimelineSyncFileSyste private boolean closed = false; + protected Map> partitionToExcludeFileGroupsMap; + public RocksDbBasedFileSystemView(HoodieTableMetaClient metaClient, HoodieTimeline visibleActiveTimeline, FileSystemViewStorageConfig config) { super(config.isIncrementalTimelineSyncEnabled()); this.config = config; + this.partitionToExcludeFileGroupsMap = new ConcurrentHashMap<>(); this.schemaHelper = new RocksDBSchemaHelper(metaClient); this.rocksDB = new RocksDBDAO(metaClient.getBasePath(), config.getRocksdbBasePath()); init(metaClient, visibleActiveTimeline); @@ -187,6 +193,25 @@ protected void storePartitionView(String partitionPath, List fi + config.getRocksdbBasePath() + ", Total file-groups=" + fileGroups.size()); } + 
@Override + protected void storePartitionExcludedFiles(final String partition, final Set fileIdsToExclude) { + //TODO: should we change this to store info in rocksdb instead of in-memory representation? + // this is expected to be small size, so probably not needed. + LOG.info("Ignore file-ids for partition :" + partition + ", #FileGroups=" + fileIdsToExclude.size()); + partitionToExcludeFileGroupsMap.put(partition, fileIdsToExclude); + + } + + @Override + public boolean isExcludeFileGroup(String partitionPath, String fileId) { + return partitionToExcludeFileGroupsMap.getOrDefault(partitionPath, Collections.emptySet()).contains(fileId); + } + + @Override + public Stream getAllExcludeFileGroups(String partitionPath) { + return partitionToExcludeFileGroupsMap.getOrDefault(partitionPath, Collections.emptySet()).stream(); + } + @Override /* * This is overridden to incrementally apply file-slices to rocks DB diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/SpillableMapBasedFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/SpillableMapBasedFileSystemView.java index 4ae4ee4442e3e..5a43ba00690c0 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/SpillableMapBasedFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/SpillableMapBasedFileSystemView.java @@ -35,6 +35,7 @@ import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.stream.Stream; /** @@ -76,6 +77,13 @@ protected Map> createPartitionToFileGroups() { } } + @Override + protected Map> createPartitionToExcludeFileGroups() { + // TODO should we create another spillable directory under baseStoreDir? + // the exclude file group is expected to be small, so use parent class in-memory representation + return super.createPartitionToExcludeFileGroups(); + } + @Override protected Map> createFileIdToPendingCompactionMap( Map> fgIdToPendingCompaction) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java index 414f8e906e266..0719f52f6155b 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java @@ -148,6 +148,9 @@ interface SliceView extends SliceViewWithLatestSlice { */ Stream getAllFileGroups(String partitionPath); + Stream getAllExcludeFileGroups(String partitionPath); + + /** * Return Pending Compaction Operations. 
* diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java index 34c705516e077..019f8a4dd20a3 100755 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java @@ -399,6 +399,43 @@ public void testFiltering() { .forEach(i -> assertFalse(t2.containsInstant(i))); } + @Test + public void testReplaceActionsTimeline() { + int instantTime = 1; + List allInstants = new ArrayList<>(); + HoodieInstant instant = new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, String.format("%03d", instantTime++)); + allInstants.add(instant); + instant = new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, String.format("%03d", instantTime++)); + allInstants.add(instant); + instant = new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, String.format("%03d", instantTime++)); + allInstants.add(instant); + + // create replace instant with maching commit instant + String replaceInstant = instant.getTimestamp(); + instant = new HoodieInstant(State.COMPLETED, HoodieTimeline.REPLACE_ACTION, replaceInstant); + allInstants.add(instant); + + //replace instant with no matching commit + instant = new HoodieInstant(State.COMPLETED, HoodieTimeline.REPLACE_ACTION, "999"); + allInstants.add(instant); + + HoodieInstant inflightCommitInstant = new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, String.format("%03d", instantTime++)); + allInstants.add(instant); + + // replace instant with matching commit but inflight + instant = new HoodieInstant(State.COMPLETED, HoodieTimeline.REPLACE_ACTION, inflightCommitInstant.getTimestamp()); + allInstants.add(instant); + + timeline = new HoodieActiveTimeline(metaClient); + timeline.setInstants(allInstants); + List validReplaceInstants = + timeline.getCompletedAndReplaceTimeline().getInstants().collect(Collectors.toList()); + + assertEquals(1, validReplaceInstants.size()); + assertEquals(replaceInstant, validReplaceInstants.get(0).getTimestamp()); + assertEquals(HoodieTimeline.REPLACE_ACTION, validReplaceInstants.get(0).getAction()); + } + /** * Returns an exhaustive list of all possible HoodieInstant. * @return list of HoodieInstant @@ -416,7 +453,7 @@ private List getAllInstants() { // not be generating them. 
if (state == State.REQUESTED) { if (action.equals(HoodieTimeline.SAVEPOINT_ACTION) || action.equals(HoodieTimeline.RESTORE_ACTION) - || action.equals(HoodieTimeline.ROLLBACK_ACTION)) { + || action.equals(HoodieTimeline.ROLLBACK_ACTION) || action.equals(HoodieTimeline.REPLACE_ACTION)) { continue; } } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java index 5e833791067cb..79346c1dc929f 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java @@ -35,6 +35,7 @@ import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView; import org.apache.hudi.common.table.view.TableFileSystemView.SliceView; import org.apache.hudi.common.testutils.HoodieCommonTestHarness; +import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.util.CompactionUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; @@ -51,6 +52,7 @@ import java.nio.file.Paths; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -1163,6 +1165,150 @@ private static void saveAsComplete(HoodieActiveTimeline timeline, HoodieInstant } } + @Test + public void testReplaceWithTimeTravel() throws IOException { + String partitionPath1 = "2020/06/27"; + new File(basePath + "/" + partitionPath1).mkdirs(); + + // create 2 fileId in partition1 - fileId1 is replaced later on. + String fileId1 = UUID.randomUUID().toString(); + String fileId2 = UUID.randomUUID().toString(); + + assertFalse(roView.getLatestBaseFiles(partitionPath1) + .anyMatch(dfile -> dfile.getFileId().equals(fileId1) || dfile.getFileId().equals(fileId2)), + "No commit, should not find any data file"); + // Only one commit + String commitTime1 = "1"; + String fileName1 = FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, fileId1); + String fileName2 = FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, fileId2); + new File(basePath + "/" + partitionPath1 + "/" + fileName1).createNewFile(); + new File(basePath + "/" + partitionPath1 + "/" + fileName2).createNewFile(); + + HoodieActiveTimeline commitTimeline = metaClient.getActiveTimeline(); + HoodieInstant instant1 = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, commitTime1); + saveAsComplete(commitTimeline, instant1, Option.empty()); + refreshFsView(); + assertEquals(1, roView.getLatestBaseFiles(partitionPath1) + .filter(dfile -> dfile.getFileId().equals(fileId1)).count()); + assertEquals(1, roView.getLatestBaseFiles(partitionPath1) + .filter(dfile -> dfile.getFileId().equals(fileId2)).count()); + + // create commit2 - fileId1 is replaced by fileId3,fileId4 + String fileId3 = UUID.randomUUID().toString(); + String fileId4 = UUID.randomUUID().toString(); + String fileName3 = FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, fileId3); + String fileName4 = FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, fileId4); + new File(basePath + "/" + partitionPath1 + "/" + fileName3).createNewFile(); + new File(basePath + "/" + partitionPath1 + "/" + fileName4).createNewFile(); + + String commitTime2 = "2"; + Map> partitionToReplaceFiles = new HashMap<>(); + partitionToReplaceFiles.putIfAbsent(partitionPath1, new ArrayList<>()); + 
partitionToReplaceFiles.get(partitionPath1).add(fileId1); + HoodieTestUtils.createReplaceInstant(partitionToReplaceFiles, commitTime2, metaClient); + commitTimeline = metaClient.getActiveTimeline(); + HoodieInstant instant2 = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, commitTime2); + saveAsComplete(commitTimeline, instant2, Option.empty()); + + //make sure view doesnt include fileId1 + refreshFsView(); + assertEquals(0, roView.getLatestBaseFiles(partitionPath1) + .filter(dfile -> dfile.getFileId().equals(fileId1)).count()); + assertEquals(1, roView.getLatestBaseFiles(partitionPath1) + .filter(dfile -> dfile.getFileId().equals(fileId2)).count()); + assertEquals(1, roView.getLatestBaseFiles(partitionPath1) + .filter(dfile -> dfile.getFileId().equals(fileId3)).count()); + assertEquals(1, roView.getLatestBaseFiles(partitionPath1) + .filter(dfile -> dfile.getFileId().equals(fileId4)).count()); + + //exclude commit 2 and make sure fileId1 shows up in view. + BaseFileOnlyView filteredView = getFileSystemView(metaClient.getActiveTimeline().findInstantsBefore("2")); + assertEquals(1, filteredView.getLatestBaseFiles(partitionPath1) + .filter(dfile -> dfile.getFileId().equals(fileId1)).count()); + assertEquals(1, filteredView.getLatestBaseFiles(partitionPath1) + .filter(dfile -> dfile.getFileId().equals(fileId2)).count()); + assertEquals(1, filteredView.getLatestBaseFiles(partitionPath1) + .filter(dfile -> dfile.getFileId().equals(fileId3)).count()); + assertEquals(1, filteredView.getLatestBaseFiles(partitionPath1) + .filter(dfile -> dfile.getFileId().equals(fileId4)).count()); + } + + @Test + public void testReplaceFileIdIsExcludedInView() throws IOException { + String partitionPath1 = "2020/06/27"; + String partitionPath2 = "2020/07/14"; + new File(basePath + "/" + partitionPath1).mkdirs(); + new File(basePath + "/" + partitionPath2).mkdirs(); + + // create 2 fileId in partition1 - fileId1 is replaced later on. + String fileId1 = UUID.randomUUID().toString(); + String fileId2 = UUID.randomUUID().toString(); + + // create 2 fileId in partition2 - fileId3, fileId4 is replaced later on. 
+ String fileId3 = UUID.randomUUID().toString(); + String fileId4 = UUID.randomUUID().toString(); + + assertFalse(roView.getLatestBaseFiles(partitionPath1) + .anyMatch(dfile -> dfile.getFileId().equals(fileId1) || dfile.getFileId().equals(fileId2)), + "No commit, should not find any data file"); + assertFalse(roView.getLatestBaseFiles(partitionPath2) + .anyMatch(dfile -> dfile.getFileId().equals(fileId3) || dfile.getFileId().equals(fileId4)), + "No commit, should not find any data file"); + + // Only one commit + String commitTime1 = "1"; + String fileName1 = FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, fileId1); + String fileName2 = FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, fileId2); + String fileName3 = FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, fileId3); + String fileName4 = FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, fileId4); + new File(basePath + "/" + partitionPath1 + "/" + fileName1).createNewFile(); + new File(basePath + "/" + partitionPath1 + "/" + fileName2).createNewFile(); + new File(basePath + "/" + partitionPath2 + "/" + fileName3).createNewFile(); + new File(basePath + "/" + partitionPath2 + "/" + fileName4).createNewFile(); + + Map> partitionToReplaceFiles = new HashMap<>(); + partitionToReplaceFiles.putIfAbsent(partitionPath1, new ArrayList<>()); + partitionToReplaceFiles.putIfAbsent(partitionPath2, new ArrayList<>()); + partitionToReplaceFiles.get(partitionPath1).add(fileId1); + partitionToReplaceFiles.get(partitionPath2).add(fileId3); + partitionToReplaceFiles.get(partitionPath2).add(fileId4); + HoodieTestUtils.createReplaceInstant(partitionToReplaceFiles, commitTime1, metaClient); + + HoodieActiveTimeline commitTimeline = metaClient.getActiveTimeline(); + HoodieInstant instant1 = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, commitTime1); + saveAsComplete(commitTimeline, instant1, Option.empty()); + refreshFsView(); + assertEquals(0, roView.getLatestBaseFiles(partitionPath1) + .filter(dfile -> dfile.getFileId().equals(fileId1)).count()); + assertEquals(fileName2, roView.getLatestBaseFiles(partitionPath1) + .filter(dfile -> dfile.getFileId().equals(fileId2)).findFirst().get().getFileName()); + assertEquals(0, roView.getLatestBaseFiles(partitionPath2) + .filter(dfile -> dfile.getFileId().equals(fileId3)).count()); + assertEquals(0, roView.getLatestBaseFiles(partitionPath2) + .filter(dfile -> dfile.getFileId().equals(fileId4)).count()); + + assertEquals(1, fsView.getAllExcludeFileGroups(partitionPath1).count()); + assertEquals(fileId1, fsView.getAllExcludeFileGroups(partitionPath1).findFirst().get()); + assertEquals(2, fsView.getAllExcludeFileGroups(partitionPath2).count()); + + //remove replace instant + HoodieInstant replaceInstant = metaClient.getActiveTimeline().getCompletedAndReplaceTimeline().firstInstant().get(); + assertTrue(new File(metaClient.getMetaPath() + "/" + replaceInstant.getFileName()).delete()); + refreshFsView(); + // verify fileId1 shows up in timeline + assertEquals(fileName1, roView.getLatestBaseFiles(partitionPath1) + .filter(dfile -> dfile.getFileId().equals(fileId1)).findFirst().get().getFileName()); + assertEquals(fileName2, roView.getLatestBaseFiles(partitionPath1) + .filter(dfile -> dfile.getFileId().equals(fileId2)).findFirst().get().getFileName()); + assertEquals(fileName3, roView.getLatestBaseFiles(partitionPath2) + .filter(dfile -> dfile.getFileId().equals(fileId3)).findFirst().get().getFileName()); + assertEquals(fileName4, roView.getLatestBaseFiles(partitionPath2) + .filter(dfile 
-> dfile.getFileId().equals(fileId4)).findFirst().get().getFileName()); + + assertEquals(0, fsView.getAllExcludeFileGroups(partitionPath1).count()); + assertEquals(0, fsView.getAllExcludeFileGroups(partitionPath2).count()); + } + @Override protected HoodieTableType getTableType() { return HoodieTableType.MERGE_ON_READ; diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java index caae246eccf51..8099fc05ba0de 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java @@ -23,6 +23,7 @@ import org.apache.hudi.avro.model.HoodieCleanMetadata; import org.apache.hudi.avro.model.HoodieCleanerPlan; import org.apache.hudi.avro.model.HoodieCompactionPlan; +import org.apache.hudi.avro.model.HoodieReplaceMetadata; import org.apache.hudi.common.HoodieCleanStat; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.FileSlice; @@ -205,6 +206,21 @@ public static void createInflightCommitFiles(String basePath, String... instantT } } + public static void createReplaceInstant(Map> partitionToReplaceFiles, + String instantTime, + HoodieTableMetaClient metaClient) throws IOException { + HoodieReplaceMetadata replaceMetadata = new HoodieReplaceMetadata(); + replaceMetadata.setVersion(1); + replaceMetadata.setPartitionMetadata(partitionToReplaceFiles); + replaceMetadata.setTotalFileGroupsReplaced(1); + replaceMetadata.setCommand("test"); + + HoodieActiveTimeline commitTimeline = metaClient.getActiveTimeline(); + HoodieInstant replaceInstant = new HoodieInstant(true, HoodieTimeline.REPLACE_ACTION, instantTime); + commitTimeline.createNewInstant(replaceInstant); + commitTimeline.transitionReplaceInflightToComplete(replaceInstant, TimelineMetadataUtils.serializeReplaceMetadata(replaceMetadata)); + } + public static void createPendingCleanFiles(HoodieTableMetaClient metaClient, String... 
instantTimes) { for (String instantTime : instantTimes) { Arrays.asList(HoodieTimeline.makeRequestedCleanerFileName(instantTime), diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestSerializationUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestSerializationUtils.java index 9d6c1b81b044c..d7dcbd49f58c6 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestSerializationUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestSerializationUtils.java @@ -19,12 +19,23 @@ package org.apache.hudi.common.util; import org.apache.avro.util.Utf8; +import org.apache.hudi.avro.model.HoodieReplaceMetadata; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; import org.junit.jupiter.api.Test; +import java.io.FileOutputStream; import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.LinkedList; +import java.util.List; +import java.util.Map; import java.util.Objects; +import java.util.Random; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -52,6 +63,38 @@ public void testSerDeser() throws IOException { verifyObject(new LinkedList<>(Arrays.asList(2, 3, 5))); } + @Test + public void testSerDeserReplaceMetadata() throws Exception { + Random r = new Random(); + int numPartitions = 1; + HoodieReplaceMetadata replaceMetadata = new HoodieReplaceMetadata(); + int numFiles = 30; + Map> partitionToReplaceFileId = new HashMap<>(); + + for (int i = 0; i < numFiles; i++) { + int partition = r.nextInt(numPartitions); + String partitionStr = "2020/07/0" + partition; + partitionToReplaceFileId.putIfAbsent(partitionStr, new ArrayList<>()); + partitionToReplaceFileId.get(partitionStr).add(FSUtils.createNewFileIdPfx()); + } + String command = "clustering"; + replaceMetadata.setVersion(1); + replaceMetadata.setTotalFileGroupsReplaced(numFiles); + replaceMetadata.setCommand(command); + replaceMetadata.setPartitionMetadata(partitionToReplaceFileId); + String filePath = "/tmp/myreplacetest"; + byte[] data = TimelineMetadataUtils.serializeReplaceMetadata(replaceMetadata).get(); + FileOutputStream fos = new FileOutputStream(filePath); + fos.write(data); + fos.close(); + + data = Files.readAllBytes(Paths.get(filePath)); + HoodieReplaceMetadata replaceMetadataD = TimelineMetadataUtils.deserializeHoodieReplaceMetadata(data); + assertEquals(replaceMetadataD.getTotalFileGroupsReplaced(), numFiles); + assertEquals(command, replaceMetadataD.getCommand()); + assertEquals(partitionToReplaceFileId, replaceMetadataD.getPartitionMetadata()); + } + private void verifyObject(T expectedValue) throws IOException { byte[] serializedObject = SerializationUtils.serialize(expectedValue); assertNotNull(serializedObject); diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/FileSystemViewHandler.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/FileSystemViewHandler.java index 683eb0658ced6..1b5a12e640620 100644 --- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/FileSystemViewHandler.java +++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/FileSystemViewHandler.java @@ -284,6 +284,13 @@ private void registerFileSlicesAPI() { writeValueAsString(ctx, dtos); }, true)); + 
app.get(RemoteHoodieTableFileSystemView.ALL_EXCLUDE_FILEGROUPS_FOR_PARTITION_URL, new ViewHandler(ctx -> { + List excludeFileGroups = sliceHandler.getExcludeFileGroups( + ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(), + ctx.queryParam(RemoteHoodieTableFileSystemView.PARTITION_PARAM,"")); + writeValueAsString(ctx, excludeFileGroups); + }, true)); + app.post(RemoteHoodieTableFileSystemView.REFRESH_TABLE, new ViewHandler(ctx -> { boolean success = sliceHandler .refreshTable(ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow()); diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/FileSliceHandler.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/FileSliceHandler.java index c6c3afcebed25..e8e05818e9ae4 100644 --- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/FileSliceHandler.java +++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/FileSliceHandler.java @@ -89,6 +89,10 @@ public List getAllFileGroups(String basePath, String partitionPath .collect(Collectors.toList()); } + public List getExcludeFileGroups(String basePath, String partitionPath) { + return viewManager.getFileSystemView(basePath).getAllExcludeFileGroups(partitionPath).collect(Collectors.toList()); + } + public boolean refreshTable(String basePath) { viewManager.clearFileSystemView(basePath); return true;
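
End-to-end usage sketch (not part of the patch): the snippet below strings together the pieces this diff introduces — writing a 'replace' instant that records replaced file groups via HoodieReplaceMetadata, and reading completed replace instants back the same way AbstractTableFileSystemView.getFileIdsToExclude and HoodieTimelineArchiveLog.deleteReplacedFiles do. The write path mirrors the new HoodieTestUtils.createReplaceInstant helper; the metaClient, instant time, partition path, and file id are assumed to be supplied by the caller, and the "insert_overwrite" command string is an illustrative value (the patch itself only uses "test" and "clustering").

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hudi.avro.model.HoodieReplaceMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;

public class ReplaceInstantSketch {

  // Write path: record that fileId was replaced in partitionPath at instantTime.
  static void writeReplaceInstant(HoodieTableMetaClient metaClient, String instantTime,
      String partitionPath, String fileId) throws IOException {
    Map<String, List<String>> partitionToReplacedFileIds = new HashMap<>();
    partitionToReplacedFileIds.put(partitionPath, Arrays.asList(fileId));

    HoodieReplaceMetadata replaceMetadata = new HoodieReplaceMetadata();
    replaceMetadata.setVersion(1);
    replaceMetadata.setCommand("insert_overwrite"); // illustrative command label
    replaceMetadata.setTotalFileGroupsReplaced(1);
    replaceMetadata.setPartitionMetadata(partitionToReplacedFileIds);

    HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
    HoodieInstant inflight = new HoodieInstant(true, HoodieTimeline.REPLACE_ACTION, instantTime);
    timeline.createNewInstant(inflight);
    // serializeReplaceMetadata returns Option<byte[]>, which the transition consumes as instant details
    timeline.transitionReplaceInflightToComplete(inflight,
        TimelineMetadataUtils.serializeReplaceMetadata(replaceMetadata));
  }

  // Read path: list replaced file ids per partition from completed replace instants,
  // the same way the file-system view computes its exclusion set.
  static void printReplacedFileIds(HoodieTableMetaClient metaClient) {
    HoodieTimeline replaceTimeline = metaClient.getActiveTimeline().getCompletedAndReplaceTimeline();
    replaceTimeline.getInstants().forEach(instant -> {
      try {
        HoodieReplaceMetadata metadata = TimelineMetadataUtils.deserializeHoodieReplaceMetadata(
            replaceTimeline.getInstantDetails(instant).get());
        metadata.getPartitionMetadata().forEach((partition, fileIds) ->
            System.out.println(instant + " replaced " + fileIds + " in partition " + partition));
      } catch (IOException e) {
        throw new RuntimeException("Could not read replace metadata for " + instant, e);
      }
    });
  }
}

Note that getCompletedAndReplaceTimeline() only returns replace instants whose timestamp also has a completed commit/deltacommit, so a replace marker whose accompanying commit never completed is ignored by both the file-system view and the archival cleanup above.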