From 867d1a1df76f088f3a47a1ee7e5b86d1c8b750e5 Mon Sep 17 00:00:00 2001 From: Sagar Sumit Date: Tue, 13 Sep 2022 13:28:35 +0530 Subject: [PATCH 1/4] [HUDI-4832] Fix drop partition meta sync --- .../metadata/HoodieTableMetadataUtil.java | 18 ------- .../org/apache/hudi/hive/HiveSyncTool.java | 12 ++--- .../apache/hudi/hive/TestHiveSyncTool.java | 33 ++++++++---- .../hudi/hive/testutils/HiveTestUtil.java | 2 +- .../hudi/sync/common/HoodieSyncClient.java | 50 +++++++++++++++---- 5 files changed, 72 insertions(+), 43 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java index c7a0df5d6ad33..19634700fb927 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java @@ -1357,22 +1357,4 @@ public static Set getInflightAndCompletedMetadataPartitions(HoodieTableC inflightAndCompletedPartitions.addAll(tableConfig.getMetadataPartitions()); return inflightAndCompletedPartitions; } - - /** - * Get Last commit's Metadata. - */ - public static Option getLatestCommitMetadata(HoodieTableMetaClient metaClient) { - try { - HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); - if (timeline.lastInstant().isPresent()) { - HoodieInstant instant = timeline.lastInstant().get(); - byte[] data = timeline.getInstantDetails(instant).get(); - return Option.of(HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class)); - } else { - return Option.empty(); - } - } catch (Exception e) { - throw new HoodieException("Failed to get commit metadata", e); - } - } } diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java index 0374686b7166b..2d81b818a032f 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java @@ -43,6 +43,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.Set; import java.util.stream.Collectors; import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_AUTO_CREATE_DATABASE; @@ -199,9 +200,6 @@ protected void syncHoodieTable(String tableName, boolean useRealtimeInputFormat, // Check if the necessary table exists boolean tableExists = syncClient.tableExists(tableName); - // check if isDropPartition - boolean isDropPartition = syncClient.isDropPartition(); - // Get the parquet schema for this table looking at the latest commit MessageType schema = syncClient.getStorageSchema(); @@ -229,7 +227,9 @@ protected void syncHoodieTable(String tableName, boolean useRealtimeInputFormat, LOG.info("Storage partitions scan complete. Found " + writtenPartitionsSince.size()); // Sync the partitions if needed - boolean partitionsChanged = syncPartitions(tableName, writtenPartitionsSince, isDropPartition); + // find dropped partitions, if any, in the latest commit + Set droppedPartitions = syncClient.getDroppedPartitions(); + boolean partitionsChanged = syncPartitions(tableName, writtenPartitionsSince, droppedPartitions); boolean meetSyncConditions = schemaChanged || partitionsChanged; if (!config.getBoolean(META_SYNC_CONDITIONAL_SYNC) || meetSyncConditions) { syncClient.updateLastCommitTimeSynced(tableName); @@ -310,12 +310,12 @@ private boolean syncSchema(String tableName, boolean tableExists, boolean useRea * Syncs the list of storage partitions passed in (checks if the partition is in hive, if not adds it or if the * partition path does not match, it updates the partition path). */ - private boolean syncPartitions(String tableName, List writtenPartitionsSince, boolean isDropPartition) { + private boolean syncPartitions(String tableName, List writtenPartitionsSince, Set droppedPartitions) { boolean partitionsChanged; try { List hivePartitions = syncClient.getAllPartitions(tableName); List partitionEvents = - syncClient.getPartitionEvents(hivePartitions, writtenPartitionsSince, isDropPartition); + syncClient.getPartitionEvents(hivePartitions, writtenPartitionsSince, droppedPartitions); List newPartitions = filterPartitions(partitionEvents, PartitionEventType.ADD); if (!newPartitions.isEmpty()) { diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java index 0673e08489ef7..f532fec6dc38d 100644 --- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java +++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java @@ -88,8 +88,8 @@ public class TestHiveSyncTool { private static final List SYNC_MODES = Arrays.asList( - "hiveql", - "hms", + /*"hiveql", + "hms",*/ "jdbc"); private static Iterable syncMode() { @@ -198,7 +198,7 @@ public void testBasicSync(boolean useSchemaFromCommitMetadata, String syncMode) hivePartitions = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME); List writtenPartitionsSince = hiveClient.getPartitionsWrittenToSince(Option.empty()); - List partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince, false); + List partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince, Collections.emptySet()); assertEquals(1, partitionEvents.size(), "There should be only one partition event"); assertEquals(PartitionEventType.UPDATE, partitionEvents.iterator().next().eventType, "The one partition event must of type UPDATE"); @@ -478,7 +478,7 @@ public void testSyncIncremental(String syncMode) throws Exception { List writtenPartitionsSince = hiveClient.getPartitionsWrittenToSince(Option.of(commitTime1)); assertEquals(1, writtenPartitionsSince.size(), "We should have one partition written after 100 commit"); List hivePartitions = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME); - List partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince, false); + List partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince, Collections.emptySet()); assertEquals(1, partitionEvents.size(), "There should be only one partition event"); assertEquals(PartitionEventType.ADD, partitionEvents.iterator().next().eventType, "The one partition event must of type ADD"); @@ -757,7 +757,7 @@ public void testMultiPartitionKeySync(String syncMode) throws Exception { List writtenPartitionsSince = hiveClient.getPartitionsWrittenToSince(Option.of(instantTime)); assertEquals(1, writtenPartitionsSince.size(), "We should have one partition written after 100 commit"); List hivePartitions = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME); - List partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince, false); + List partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince, Collections.emptySet()); assertEquals(1, partitionEvents.size(), "There should be only one partition event"); assertEquals(PartitionEventType.ADD, partitionEvents.iterator().next().eventType, "The one partition event must of type ADD"); @@ -854,17 +854,32 @@ public void testDropPartition(String syncMode) throws Exception { "Table partitions should match the number of partitions we wrote"); assertEquals(instantTime, hiveClient.getLastCommitTimeSynced(HiveTestUtil.TABLE_NAME).get(), "The last commit that was synced should be updated in the TBLPROPERTIES"); + // add a partition but do not sync + String instantTime2 = "101"; + String newPartition = "2010/02/01"; + HiveTestUtil.addCOWPartition(newPartition, true, true, instantTime2); + HiveTestUtil.getCreatedTablesSet().add(HiveTestUtil.DB_NAME + "." + HiveTestUtil.TABLE_NAME); + partitions = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME); + assertEquals(1, partitions.size(), + "Table partitions should match the number of partitions we wrote"); + assertEquals(instantTime, hiveClient.getLastCommitTimeSynced(HiveTestUtil.TABLE_NAME).get(), + "The last commit that was synced should be updated in the TBLPROPERTIES"); + + // create a replace commit to delete current partitions String partitiontoDelete = partitions.get(0).getValues().get(0).replace("-", "/"); - // create a replace commit to delete current partitions+ - HiveTestUtil.createReplaceCommit("101", partitiontoDelete, WriteOperationType.DELETE_PARTITION, true, true); + String instantTime3 = "102"; + HiveTestUtil.createReplaceCommit(instantTime3, partitiontoDelete, WriteOperationType.DELETE_PARTITION, true, true); // sync drop partitions reinitHiveSyncClient(); reSyncHiveTable(); List hivePartitions = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME); - assertEquals(0, hivePartitions.size(), - "Table should have 0 partition because of the drop the only one partition"); + assertEquals(1, hivePartitions.size(), + "Table should have 1 partition that was added for instant " + instantTime2); + assertEquals(newPartition, hivePartitions.get(0).getValues().get(0).replace("-", "/")); + assertEquals(instantTime3, hiveClient.getLastCommitTimeSynced(HiveTestUtil.TABLE_NAME).get(), + "The last commit that was synced should be updated in the TBLPROPERTIES"); } @ParameterizedTest diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java index f3c8b3da5e380..a58f835dab29e 100644 --- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java +++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java @@ -493,7 +493,7 @@ public static void createCommitFile(HoodieCommitMetadata commitMetadata, String fsout.close(); } - public static void createReplaceCommitFile(HoodieCommitMetadata commitMetadata, String instantTime) throws IOException { + public static void createReplaceCommitFile(HoodieReplaceCommitMetadata commitMetadata, String instantTime) throws IOException { byte[] bytes = commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8); Path fullPath = new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + HoodieTimeline.makeReplaceFileName(instantTime)); diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java index 32ade18d08117..4156329b4148d 100644 --- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java +++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java @@ -21,15 +21,17 @@ import org.apache.hudi.common.engine.HoodieLocalEngineContext; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieReplaceCommitMetadata; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.TableSchemaResolver; +import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.TimelineUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ReflectionUtils; -import org.apache.hudi.metadata.HoodieTableMetadataUtil; +import org.apache.hudi.exception.HoodieException; import org.apache.hudi.sync.common.model.Partition; import org.apache.hudi.sync.common.model.PartitionEvent; import org.apache.hudi.sync.common.model.PartitionValueExtractor; @@ -40,9 +42,11 @@ import org.apache.parquet.schema.MessageType; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_ASSUME_DATE_PARTITION; import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH; @@ -83,18 +87,24 @@ public boolean isBootstrap() { return metaClient.getTableConfig().getBootstrapBasePath().isPresent(); } - public boolean isDropPartition() { + /** + * Get the set of dropped partitions based on the latest commit metadata. + * Returns empty set if the latest commit was not due to DELETE_PARTITION operation. + */ + public Set getDroppedPartitions() { try { - Option hoodieCommitMetadata = HoodieTableMetadataUtil.getLatestCommitMetadata(metaClient); + Option hoodieCommitMetadata = getLatestCommitMetadata(metaClient); if (hoodieCommitMetadata.isPresent() && WriteOperationType.DELETE_PARTITION.equals(hoodieCommitMetadata.get().getOperationType())) { - return true; + Map> partitionToReplaceFileIds = + ((HoodieReplaceCommitMetadata) hoodieCommitMetadata.get()).getPartitionToReplaceFileIds(); + return partitionToReplaceFileIds.keySet(); } } catch (Exception e) { throw new HoodieSyncException("Failed to get commit metadata", e); } - return false; + return Collections.emptySet(); } @Override @@ -118,8 +128,11 @@ public List getPartitionsWrittenToSince(Option lastCommitTimeSyn config.getBoolean(META_SYNC_ASSUME_DATE_PARTITION)); } else { LOG.info("Last commit time synced is " + lastCommitTimeSynced.get() + ", Getting commits since then"); - return TimelineUtils.getPartitionsWritten(metaClient.getActiveTimeline().getCommitsTimeline() - .findInstantsAfter(lastCommitTimeSynced.get(), Integer.MAX_VALUE)); + return TimelineUtils.getPartitionsWritten( + metaClient.getArchivedTimeline(lastCommitTimeSynced.get()) + .mergeTimeline(metaClient.getActiveTimeline()) + .getCommitsTimeline() + .findInstantsAfter(lastCommitTimeSynced.get(), Integer.MAX_VALUE)); } } @@ -127,7 +140,7 @@ public List getPartitionsWrittenToSince(Option lastCommitTimeSyn * Iterate over the storage partitions and find if there are any new partitions that need to be added or updated. * Generate a list of PartitionEvent based on the changes required. */ - public List getPartitionEvents(List tablePartitions, List partitionStoragePartitions, boolean isDropPartition) { + public List getPartitionEvents(List tablePartitions, List partitionStoragePartitions, Set droppedPartitions) { Map paths = new HashMap<>(); for (Partition tablePartition : tablePartitions) { List hivePartitionValues = tablePartition.getValues(); @@ -143,7 +156,7 @@ public List getPartitionEvents(List tablePartitions, // Check if the partition values or if hdfs path is the same List storagePartitionValues = partitionValueExtractor.extractPartitionValuesInPath(storagePartition); - if (isDropPartition) { + if (droppedPartitions.contains(storagePartition)) { events.add(PartitionEvent.newPartitionDropEvent(storagePartition)); } else { if (!storagePartitionValues.isEmpty()) { @@ -158,4 +171,23 @@ public List getPartitionEvents(List tablePartitions, } return events; } + + /** + * Get Last commit's Metadata. + */ + private static Option getLatestCommitMetadata(HoodieTableMetaClient metaClient) { + try { + HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); + if (timeline.lastInstant().isPresent()) { + HoodieInstant instant = timeline.lastInstant().get(); + byte[] data = timeline.getInstantDetails(instant).get(); + return HoodieTimeline.REPLACE_COMMIT_ACTION.equals(instant.getAction()) ? Option.of(HoodieReplaceCommitMetadata.fromBytes(data, HoodieReplaceCommitMetadata.class)) : + Option.of(HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class)); + } else { + return Option.empty(); + } + } catch (Exception e) { + throw new HoodieException("Failed to get commit metadata", e); + } + } } From 808b4b33eac28357d213c7b2fd3a0d1a5b250dfd Mon Sep 17 00:00:00 2001 From: Sagar Sumit Date: Thu, 15 Sep 2022 18:53:49 +0530 Subject: [PATCH 2/4] Address feedback --- .../src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java index f532fec6dc38d..c3931b7c3b877 100644 --- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java +++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java @@ -88,8 +88,8 @@ public class TestHiveSyncTool { private static final List SYNC_MODES = Arrays.asList( - /*"hiveql", - "hms",*/ + "hiveql", + "hms", "jdbc"); private static Iterable syncMode() { From a6b676c8a983aad9ef485d73ec1dc7dd462a055a Mon Sep 17 00:00:00 2001 From: Sagar Sumit Date: Fri, 16 Sep 2022 13:43:05 +0530 Subject: [PATCH 3/4] Pick dropped partitions since last synced commit --- .../common/table/timeline/TimelineUtils.java | 24 ++++++++++++++++ .../org/apache/hudi/hive/HiveSyncTool.java | 2 +- .../apache/hudi/hive/TestHiveSyncTool.java | 13 +++++---- .../hudi/sync/common/HoodieSyncClient.java | 28 +++++++------------ 4 files changed, 42 insertions(+), 25 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java index 9f5f4c23d0761..d4ccb591b0622 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java @@ -26,6 +26,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.HoodieIOException; + import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -57,6 +58,29 @@ public static List getPartitionsWritten(HoodieTimeline timeline) { return getAffectedPartitions(timelineToSync); } + /** + * Returns partitions that have been deleted or marked for deletion in the given timeline. + * Does not include internal operations such as clean in the timeline. + */ + public static List getPartitionsDropped(HoodieTimeline timeline) { + HoodieTimeline replaceCommitTimeline = timeline.getWriteTimeline().filterCompletedInstants().getCompletedReplaceTimeline(); + + return replaceCommitTimeline.getInstants().flatMap(instant -> { + try { + HoodieReplaceCommitMetadata commitMetadata = HoodieReplaceCommitMetadata.fromBytes( + replaceCommitTimeline.getInstantDetails(instant).get(), HoodieReplaceCommitMetadata.class); + if (WriteOperationType.DELETE_PARTITION.equals(commitMetadata.getOperationType())) { + Map> partitionToReplaceFileIds = commitMetadata.getPartitionToReplaceFileIds(); + return partitionToReplaceFileIds.keySet().stream(); + } else { + return Stream.empty(); + } + } catch (IOException e) { + throw new HoodieIOException("Failed to get partitions modified at " + instant, e); + } + }).distinct().filter(partition -> !partition.isEmpty()).collect(Collectors.toList()); + } + /** * Returns partitions that have been modified including internal operations such as clean in the passed timeline. */ diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java index 2d81b818a032f..919a3783c5ebe 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java @@ -228,7 +228,7 @@ protected void syncHoodieTable(String tableName, boolean useRealtimeInputFormat, // Sync the partitions if needed // find dropped partitions, if any, in the latest commit - Set droppedPartitions = syncClient.getDroppedPartitions(); + Set droppedPartitions = syncClient.getDroppedPartitions(lastCommitTimeSynced); boolean partitionsChanged = syncPartitions(tableName, writtenPartitionsSince, droppedPartitions); boolean meetSyncConditions = schemaChanged || partitionsChanged; if (!config.getBoolean(META_SYNC_CONDITIONAL_SYNC) || meetSyncConditions) { diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java index c3931b7c3b877..f941349925661 100644 --- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java +++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java @@ -865,20 +865,21 @@ public void testDropPartition(String syncMode) throws Exception { assertEquals(instantTime, hiveClient.getLastCommitTimeSynced(HiveTestUtil.TABLE_NAME).get(), "The last commit that was synced should be updated in the TBLPROPERTIES"); - // create a replace commit to delete current partitions + // create two replace commits to delete current partitions, but do not sync in between String partitiontoDelete = partitions.get(0).getValues().get(0).replace("-", "/"); String instantTime3 = "102"; HiveTestUtil.createReplaceCommit(instantTime3, partitiontoDelete, WriteOperationType.DELETE_PARTITION, true, true); + String instantTime4 = "103"; + HiveTestUtil.createReplaceCommit(instantTime4, newPartition, WriteOperationType.DELETE_PARTITION, true, true); - // sync drop partitions + // now run hive sync reinitHiveSyncClient(); reSyncHiveTable(); List hivePartitions = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME); - assertEquals(1, hivePartitions.size(), - "Table should have 1 partition that was added for instant " + instantTime2); - assertEquals(newPartition, hivePartitions.get(0).getValues().get(0).replace("-", "/")); - assertEquals(instantTime3, hiveClient.getLastCommitTimeSynced(HiveTestUtil.TABLE_NAME).get(), + assertEquals(0, hivePartitions.size(), + "Table should have no partitions"); + assertEquals(instantTime4, hiveClient.getLastCommitTimeSynced(HiveTestUtil.TABLE_NAME).get(), "The last commit that was synced should be updated in the TBLPROPERTIES"); } diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java index 4156329b4148d..48efd0af31cb5 100644 --- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java +++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java @@ -23,7 +23,6 @@ import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieReplaceCommitMetadata; import org.apache.hudi.common.model.HoodieTableType; -import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.TableSchemaResolver; import org.apache.hudi.common.table.timeline.HoodieInstant; @@ -42,8 +41,8 @@ import org.apache.parquet.schema.MessageType; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -88,23 +87,16 @@ public boolean isBootstrap() { } /** - * Get the set of dropped partitions based on the latest commit metadata. - * Returns empty set if the latest commit was not due to DELETE_PARTITION operation. + * Get the set of dropped partitions since the last synced commit. + * If last sync time is not known then consider only active timeline. + * Going through archive timeline is a costly operation, and it should be avoided unless some start time is given. */ - public Set getDroppedPartitions() { - try { - Option hoodieCommitMetadata = getLatestCommitMetadata(metaClient); - - if (hoodieCommitMetadata.isPresent() - && WriteOperationType.DELETE_PARTITION.equals(hoodieCommitMetadata.get().getOperationType())) { - Map> partitionToReplaceFileIds = - ((HoodieReplaceCommitMetadata) hoodieCommitMetadata.get()).getPartitionToReplaceFileIds(); - return partitionToReplaceFileIds.keySet(); - } - } catch (Exception e) { - throw new HoodieSyncException("Failed to get commit metadata", e); - } - return Collections.emptySet(); + public Set getDroppedPartitions(Option lastCommitTimeSynced) { + HoodieTimeline timeline = lastCommitTimeSynced.isPresent() ? metaClient.getArchivedTimeline(lastCommitTimeSynced.get()) + .mergeTimeline(metaClient.getActiveTimeline()) + .getCommitsTimeline() + .findInstantsAfter(lastCommitTimeSynced.get(), Integer.MAX_VALUE) : metaClient.getActiveTimeline(); + return new HashSet<>(TimelineUtils.getPartitionsDropped(timeline)); } @Override From 05774542e7e99184781292e69747f7b20de24e22 Mon Sep 17 00:00:00 2001 From: Sagar Sumit Date: Mon, 19 Sep 2022 11:54:05 +0530 Subject: [PATCH 4/4] Address feedback, remove unused method, rename methods --- .../common/table/timeline/TimelineUtils.java | 4 +-- .../hudi/common/table/TestTimelineUtils.java | 4 +-- .../org/apache/hudi/sync/adb/AdbSyncTool.java | 2 +- .../org/apache/hudi/hive/HiveSyncTool.java | 4 +-- .../apache/hudi/hive/TestHiveSyncTool.java | 8 ++--- .../hudi/sync/common/HoodieSyncClient.java | 31 +++---------------- 6 files changed, 15 insertions(+), 38 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java index d4ccb591b0622..75493e7b463e1 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java @@ -53,7 +53,7 @@ public class TimelineUtils { * Returns partitions that have new data strictly after commitTime. * Does not include internal operations such as clean in the timeline. */ - public static List getPartitionsWritten(HoodieTimeline timeline) { + public static List getWrittenPartitions(HoodieTimeline timeline) { HoodieTimeline timelineToSync = timeline.getWriteTimeline(); return getAffectedPartitions(timelineToSync); } @@ -62,7 +62,7 @@ public static List getPartitionsWritten(HoodieTimeline timeline) { * Returns partitions that have been deleted or marked for deletion in the given timeline. * Does not include internal operations such as clean in the timeline. */ - public static List getPartitionsDropped(HoodieTimeline timeline) { + public static List getDroppedPartitions(HoodieTimeline timeline) { HoodieTimeline replaceCommitTimeline = timeline.getWriteTimeline().filterCompletedInstants().getCompletedReplaceTimeline(); return replaceCommitTimeline.getInstants().flatMap(instant -> { diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java index 380c4c5212553..da078372b5c3b 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java @@ -130,11 +130,11 @@ public void testGetPartitions() throws IOException { assertEquals(partitions, Arrays.asList(new String[] {"0", "2", "3", "4"})); // verify only commit actions - partitions = TimelineUtils.getPartitionsWritten(metaClient.getActiveTimeline().findInstantsAfter("1", 10)); + partitions = TimelineUtils.getWrittenPartitions(metaClient.getActiveTimeline().findInstantsAfter("1", 10)); assertEquals(4, partitions.size()); assertEquals(partitions, Arrays.asList(new String[] {"2", "3", "4", "5"})); - partitions = TimelineUtils.getPartitionsWritten(metaClient.getActiveTimeline().findInstantsInRange("1", "4")); + partitions = TimelineUtils.getWrittenPartitions(metaClient.getActiveTimeline().findInstantsInRange("1", "4")); assertEquals(3, partitions.size()); assertEquals(partitions, Arrays.asList(new String[] {"2", "3", "4"})); } diff --git a/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AdbSyncTool.java b/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AdbSyncTool.java index 1c578b102cfa0..5f4a36631a22d 100644 --- a/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AdbSyncTool.java +++ b/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AdbSyncTool.java @@ -194,7 +194,7 @@ private void syncHoodieTable(String tableName, boolean useRealtimeInputFormat, b if (config.getSplitStrings(META_SYNC_PARTITION_FIELDS).isEmpty()) { writtenPartitionsSince = new ArrayList<>(); } else { - writtenPartitionsSince = syncClient.getPartitionsWrittenToSince(lastCommitTimeSynced); + writtenPartitionsSince = syncClient.getWrittenPartitionsSince(lastCommitTimeSynced); } LOG.info("Scan partitions complete, partitionNum:{}", writtenPartitionsSince.size()); diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java index 919a3783c5ebe..adfe52f920d1c 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java @@ -223,12 +223,12 @@ protected void syncHoodieTable(String tableName, boolean useRealtimeInputFormat, lastCommitTimeSynced = syncClient.getLastCommitTimeSynced(tableName); } LOG.info("Last commit time synced was found to be " + lastCommitTimeSynced.orElse("null")); - List writtenPartitionsSince = syncClient.getPartitionsWrittenToSince(lastCommitTimeSynced); + List writtenPartitionsSince = syncClient.getWrittenPartitionsSince(lastCommitTimeSynced); LOG.info("Storage partitions scan complete. Found " + writtenPartitionsSince.size()); // Sync the partitions if needed // find dropped partitions, if any, in the latest commit - Set droppedPartitions = syncClient.getDroppedPartitions(lastCommitTimeSynced); + Set droppedPartitions = syncClient.getDroppedPartitionsSince(lastCommitTimeSynced); boolean partitionsChanged = syncPartitions(tableName, writtenPartitionsSince, droppedPartitions); boolean meetSyncConditions = schemaChanged || partitionsChanged; if (!config.getBoolean(META_SYNC_CONDITIONAL_SYNC) || meetSyncConditions) { diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java index f941349925661..1d454786859e9 100644 --- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java +++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java @@ -197,7 +197,7 @@ public void testBasicSync(boolean useSchemaFromCommitMetadata, String syncMode) + "` PARTITION (`datestr`='2050-01-01') SET LOCATION '/some/new/location'"); hivePartitions = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME); - List writtenPartitionsSince = hiveClient.getPartitionsWrittenToSince(Option.empty()); + List writtenPartitionsSince = hiveClient.getWrittenPartitionsSince(Option.empty()); List partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince, Collections.emptySet()); assertEquals(1, partitionEvents.size(), "There should be only one partition event"); assertEquals(PartitionEventType.UPDATE, partitionEvents.iterator().next().eventType, @@ -475,7 +475,7 @@ public void testSyncIncremental(String syncMode) throws Exception { // Lets do the sync reSyncHiveTable(); - List writtenPartitionsSince = hiveClient.getPartitionsWrittenToSince(Option.of(commitTime1)); + List writtenPartitionsSince = hiveClient.getWrittenPartitionsSince(Option.of(commitTime1)); assertEquals(1, writtenPartitionsSince.size(), "We should have one partition written after 100 commit"); List hivePartitions = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME); List partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince, Collections.emptySet()); @@ -754,7 +754,7 @@ public void testMultiPartitionKeySync(String syncMode) throws Exception { HiveTestUtil.addCOWPartition("2010/01/02", true, true, commitTime2); reinitHiveSyncClient(); - List writtenPartitionsSince = hiveClient.getPartitionsWrittenToSince(Option.of(instantTime)); + List writtenPartitionsSince = hiveClient.getWrittenPartitionsSince(Option.of(instantTime)); assertEquals(1, writtenPartitionsSince.size(), "We should have one partition written after 100 commit"); List hivePartitions = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME); List partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince, Collections.emptySet()); @@ -784,7 +784,7 @@ public void testMultiPartitionKeySync(String syncMode) throws Exception { "Table partitions should match the number of partitions we wrote"); assertEquals(commitTime3, hiveClient.getLastCommitTimeSynced(HiveTestUtil.TABLE_NAME).get(), "The last commit that was synced should be updated in the TBLPROPERTIES"); - assertEquals(1, hiveClient.getPartitionsWrittenToSince(Option.of(commitTime2)).size()); + assertEquals(1, hiveClient.getWrittenPartitionsSince(Option.of(commitTime2)).size()); } @ParameterizedTest diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java index 48efd0af31cb5..af06f5908ce39 100644 --- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java +++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java @@ -20,17 +20,13 @@ import org.apache.hudi.common.engine.HoodieLocalEngineContext; import org.apache.hudi.common.fs.FSUtils; -import org.apache.hudi.common.model.HoodieCommitMetadata; -import org.apache.hudi.common.model.HoodieReplaceCommitMetadata; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.TableSchemaResolver; -import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.TimelineUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ReflectionUtils; -import org.apache.hudi.exception.HoodieException; import org.apache.hudi.sync.common.model.Partition; import org.apache.hudi.sync.common.model.PartitionEvent; import org.apache.hudi.sync.common.model.PartitionValueExtractor; @@ -91,12 +87,12 @@ public boolean isBootstrap() { * If last sync time is not known then consider only active timeline. * Going through archive timeline is a costly operation, and it should be avoided unless some start time is given. */ - public Set getDroppedPartitions(Option lastCommitTimeSynced) { + public Set getDroppedPartitionsSince(Option lastCommitTimeSynced) { HoodieTimeline timeline = lastCommitTimeSynced.isPresent() ? metaClient.getArchivedTimeline(lastCommitTimeSynced.get()) .mergeTimeline(metaClient.getActiveTimeline()) .getCommitsTimeline() .findInstantsAfter(lastCommitTimeSynced.get(), Integer.MAX_VALUE) : metaClient.getActiveTimeline(); - return new HashSet<>(TimelineUtils.getPartitionsDropped(timeline)); + return new HashSet<>(TimelineUtils.getDroppedPartitions(timeline)); } @Override @@ -108,7 +104,7 @@ public MessageType getStorageSchema() { } } - public List getPartitionsWrittenToSince(Option lastCommitTimeSynced) { + public List getWrittenPartitionsSince(Option lastCommitTimeSynced) { if (!lastCommitTimeSynced.isPresent()) { LOG.info("Last commit time synced is not known, listing all partitions in " + config.getString(META_SYNC_BASE_PATH) @@ -120,7 +116,7 @@ public List getPartitionsWrittenToSince(Option lastCommitTimeSyn config.getBoolean(META_SYNC_ASSUME_DATE_PARTITION)); } else { LOG.info("Last commit time synced is " + lastCommitTimeSynced.get() + ", Getting commits since then"); - return TimelineUtils.getPartitionsWritten( + return TimelineUtils.getWrittenPartitions( metaClient.getArchivedTimeline(lastCommitTimeSynced.get()) .mergeTimeline(metaClient.getActiveTimeline()) .getCommitsTimeline() @@ -163,23 +159,4 @@ public List getPartitionEvents(List tablePartitions, } return events; } - - /** - * Get Last commit's Metadata. - */ - private static Option getLatestCommitMetadata(HoodieTableMetaClient metaClient) { - try { - HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); - if (timeline.lastInstant().isPresent()) { - HoodieInstant instant = timeline.lastInstant().get(); - byte[] data = timeline.getInstantDetails(instant).get(); - return HoodieTimeline.REPLACE_COMMIT_ACTION.equals(instant.getAction()) ? Option.of(HoodieReplaceCommitMetadata.fromBytes(data, HoodieReplaceCommitMetadata.class)) : - Option.of(HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class)); - } else { - return Option.empty(); - } - } catch (Exception e) { - throw new HoodieException("Failed to get commit metadata", e); - } - } }