diff --git a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java index 79651154ed63b..1ad38d8f9996d 100644 --- a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java +++ b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java @@ -19,6 +19,7 @@ package org.apache.hudi.aws.sync; import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.hive.HiveSyncConfig; @@ -380,16 +381,22 @@ public void close() { } @Override - public void updateLastCommitTimeSynced(String tableName) { - if (!getActiveTimeline().lastInstant().isPresent()) { - LOG.warn("No commit in active timeline."); - return; + public void updateLastCommitTimeSynced(String tableName, Option hoodieInstantOption) { + + Option lastCommitSynced; + if (hoodieInstantOption.isPresent()) { + lastCommitSynced = hoodieInstantOption; + } else { + lastCommitSynced = getActiveTimeline().lastInstant(); } - final String lastCommitTimestamp = getActiveTimeline().lastInstant().get().getTimestamp(); - try { - updateTableParameters(awsGlue, databaseName, tableName, Collections.singletonMap(HOODIE_LAST_COMMIT_TIME_SYNC, lastCommitTimestamp), false); - } catch (Exception e) { - throw new HoodieGlueSyncException("Fail to update last sync commit time for " + tableId(databaseName, tableName), e); + if (lastCommitSynced.isPresent()) { + try { + updateTableParameters(awsGlue, databaseName, tableName, Collections.singletonMap(HOODIE_LAST_COMMIT_TIME_SYNC, lastCommitSynced.get().getTimestamp()), false); + } catch (Exception e) { + throw new HoodieGlueSyncException("Fail to update last sync commit time for " + tableId(databaseName, tableName), e); + } + } else { + LOG.warn("No commit in active timeline."); } } diff --git a/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AdbSyncTool.java b/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AdbSyncTool.java index 1c578b102cfa0..878bbdee98083 100644 --- a/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AdbSyncTool.java +++ b/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AdbSyncTool.java @@ -20,7 +20,9 @@ import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils; import org.apache.hudi.hive.SchemaDifference; import org.apache.hudi.hive.util.HiveSchemaUtil; @@ -190,21 +192,21 @@ private void syncHoodieTable(String tableName, boolean useRealtimeInputFormat, b LOG.info("Last commit time synced was found:{}", lastCommitTimeSynced.orElse("null")); // Scan synced partitions - List writtenPartitionsSince; + Pair, Option> writtenPartitionsSincePair; if (config.getSplitStrings(META_SYNC_PARTITION_FIELDS).isEmpty()) { - writtenPartitionsSince = new ArrayList<>(); + writtenPartitionsSincePair = Pair.of(new ArrayList<>(), Option.empty()); } else { - writtenPartitionsSince = syncClient.getPartitionsWrittenToSince(lastCommitTimeSynced); + writtenPartitionsSincePair = syncClient.getPartitionsWrittenToSince(lastCommitTimeSynced); } - LOG.info("Scan partitions complete, partitionNum:{}", writtenPartitionsSince.size()); + LOG.info("Scan partitions complete, partitionNum:{}", writtenPartitionsSincePair.getLeft().size()); // Sync the partitions if needed - syncPartitions(tableName, writtenPartitionsSince); + syncPartitions(tableName, writtenPartitionsSincePair.getLeft()); // Update sync commit time // whether to skip syncing commit time stored in tbl properties, since it is time consuming. if (!config.getBoolean(ADB_SYNC_SKIP_LAST_COMMIT_TIME_SYNC)) { - syncClient.updateLastCommitTimeSynced(tableName); + syncClient.updateLastCommitTimeSynced(tableName, writtenPartitionsSincePair.getRight()); } LOG.info("Sync complete for table:{}", tableName); } diff --git a/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/HoodieAdbJdbcClient.java b/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/HoodieAdbJdbcClient.java index 69ccc49528fd6..95456472e3314 100644 --- a/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/HoodieAdbJdbcClient.java +++ b/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/HoodieAdbJdbcClient.java @@ -19,6 +19,7 @@ package org.apache.hudi.sync.adb; import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.ValidationUtils; @@ -259,14 +260,23 @@ public Option getLastCommitTimeSynced(String tableName) { } @Override - public void updateLastCommitTimeSynced(String tableName) { + public void updateLastCommitTimeSynced(String tableName, Option hoodieInstantOption) { // Set the last commit time from the TBLProperties - String lastCommitSynced = getActiveTimeline().lastInstant().get().getTimestamp(); - try { - String sql = constructUpdateTblPropertiesSql(tableName, lastCommitSynced); - executeAdbSql(sql); - } catch (Exception e) { - throw new HoodieHiveSyncException("Fail to get update last commit time synced:" + lastCommitSynced, e); + Option lastCommitSynced; + if (hoodieInstantOption.isPresent()) { + lastCommitSynced = hoodieInstantOption; + } else { + lastCommitSynced = getActiveTimeline().lastInstant(); + } + if (lastCommitSynced.isPresent()) { + try { + String sql = constructUpdateTblPropertiesSql(tableName, lastCommitSynced.get().getTimestamp()); + executeAdbSql(sql); + } catch (Exception e) { + throw new HoodieHiveSyncException("Fail to get update last commit time synced:" + hoodieInstantOption.get(), e); + } + } else { + LOG.warn("No commit in active timeline."); } } diff --git a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncClient.java b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncClient.java index 4c050451c5a22..f830855af9e69 100644 --- a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncClient.java +++ b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncClient.java @@ -21,6 +21,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.TableSchemaResolver; +import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.util.Option; import org.apache.hudi.sync.common.HoodieSyncClient; import org.apache.hudi.sync.common.HoodieSyncException; @@ -74,8 +75,8 @@ public Option getLastCommitTimeSynced(String tableName) { } @Override - public void updateLastCommitTimeSynced(String tableName) { - updateTableProperties(tableName, Collections.singletonMap(HOODIE_LAST_COMMIT_TIME_SYNC, getActiveTimeline().lastInstant().get().getTimestamp())); + public void updateLastCommitTimeSynced(String tableName, Option hoodieInstantOption) { + updateTableProperties(tableName, Collections.singletonMap(HOODIE_LAST_COMMIT_TIME_SYNC, hoodieInstantOption.get().getTimestamp())); } @Override diff --git a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncTool.java b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncTool.java index 567f547a817a0..9d725cc703ab4 100644 --- a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncTool.java +++ b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncTool.java @@ -19,6 +19,7 @@ package org.apache.hudi.sync.datahub; +import org.apache.hudi.common.util.Option; import org.apache.hudi.sync.common.HoodieSyncTool; import org.apache.hudi.sync.datahub.config.DataHubSyncConfig; @@ -53,7 +54,7 @@ public DataHubSyncTool(Properties props) { public void syncHoodieTable() { try (DataHubSyncClient syncClient = new DataHubSyncClient(config)) { syncClient.updateTableSchema(config.getString(META_SYNC_TABLE_NAME), null); - syncClient.updateLastCommitTimeSynced(config.getString(META_SYNC_TABLE_NAME)); + syncClient.updateLastCommitTimeSynced(config.getString(META_SYNC_TABLE_NAME), Option.empty()); } } diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java index b763416e8f255..38325008b5462 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java @@ -20,7 +20,9 @@ import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.InvalidTableException; import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils; @@ -224,14 +226,14 @@ protected void syncHoodieTable(String tableName, boolean useRealtimeInputFormat, lastCommitTimeSynced = syncClient.getLastCommitTimeSynced(tableName); } LOG.info("Last commit time synced was found to be " + lastCommitTimeSynced.orElse("null")); - List writtenPartitionsSince = syncClient.getPartitionsWrittenToSince(lastCommitTimeSynced); - LOG.info("Storage partitions scan complete. Found " + writtenPartitionsSince.size()); + Pair, Option> writtenPartitionsSincePair = syncClient.getPartitionsWrittenToSince(lastCommitTimeSynced); + LOG.info("Storage partitions scan complete. Found " + writtenPartitionsSincePair.getLeft().size()); // Sync the partitions if needed - boolean partitionsChanged = syncPartitions(tableName, writtenPartitionsSince, isDropPartition); + boolean partitionsChanged = syncPartitions(tableName, writtenPartitionsSincePair.getLeft(), isDropPartition); boolean meetSyncConditions = schemaChanged || partitionsChanged; if (!config.getBoolean(META_SYNC_CONDITIONAL_SYNC) || meetSyncConditions) { - syncClient.updateLastCommitTimeSynced(tableName); + syncClient.updateLastCommitTimeSynced(tableName, writtenPartitionsSincePair.getRight()); } LOG.info("Sync complete for " + tableName); } diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java index d5a85adcbacc2..28dbb5dd71097 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java @@ -257,17 +257,24 @@ public void close() { } @Override - public void updateLastCommitTimeSynced(String tableName) { + public void updateLastCommitTimeSynced(String tableName, Option hoodieInstantOption) { // Set the last commit time from the TBLproperties - Option lastCommitSynced = getActiveTimeline().lastInstant().map(HoodieInstant::getTimestamp); + Option lastCommitSynced; + if (hoodieInstantOption.isPresent()) { + lastCommitSynced = hoodieInstantOption; + } else { + lastCommitSynced = getActiveTimeline().lastInstant(); + } if (lastCommitSynced.isPresent()) { try { Table table = client.getTable(databaseName, tableName); - table.putToParameters(HOODIE_LAST_COMMIT_TIME_SYNC, lastCommitSynced.get()); + table.putToParameters(HOODIE_LAST_COMMIT_TIME_SYNC, lastCommitSynced.get().getTimestamp()); client.alter_table(databaseName, tableName, table); } catch (Exception e) { - throw new HoodieHiveSyncException("Failed to get update last commit time synced to " + lastCommitSynced, e); + throw new HoodieHiveSyncException("Failed to get update last commit time synced to " + hoodieInstantOption.get(), e); } + } else { + LOG.warn("No commit in active timeline."); } } diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java index 170597939072a..9b5025c09376a 100644 --- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java +++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java @@ -22,6 +22,7 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.testutils.NetworkTestUtils; import org.apache.hudi.common.testutils.SchemaTestUtil; import org.apache.hudi.common.util.Option; @@ -188,8 +189,8 @@ public void testBasicSync(boolean useSchemaFromCommitMetadata, String syncMode) + "` PARTITION (`datestr`='2050-01-01') SET LOCATION '/some/new/location'"); List hivePartitions = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME); - List writtenPartitionsSince = hiveClient.getPartitionsWrittenToSince(Option.empty()); - List partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince, false); + Pair, Option> writtenPartitionsSincePair = hiveClient.getPartitionsWrittenToSince(Option.empty()); + List partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSincePair.getLeft(), false); assertEquals(1, partitionEvents.size(), "There should be only one partition event"); assertEquals(PartitionEventType.UPDATE, partitionEvents.iterator().next().eventType, "The one partition event must of type UPDATE"); @@ -466,10 +467,10 @@ public void testSyncIncremental(String syncMode) throws Exception { // Lets do the sync reSyncHiveTable(); - List writtenPartitionsSince = hiveClient.getPartitionsWrittenToSince(Option.of(commitTime1)); - assertEquals(1, writtenPartitionsSince.size(), "We should have one partition written after 100 commit"); + Pair, Option> writtenPartitionsSincePair = hiveClient.getPartitionsWrittenToSince(Option.of(commitTime1)); + assertEquals(1, writtenPartitionsSincePair.getLeft().size(), "We should have one partition written after 100 commit"); List hivePartitions = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME); - List partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince, false); + List partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSincePair.getLeft(), false); assertEquals(1, partitionEvents.size(), "There should be only one partition event"); assertEquals(PartitionEventType.ADD, partitionEvents.iterator().next().eventType, "The one partition event must of type ADD"); @@ -745,10 +746,10 @@ public void testMultiPartitionKeySync(String syncMode) throws Exception { HiveTestUtil.addCOWPartition("2010/01/02", true, true, commitTime2); reinitHiveSyncClient(); - List writtenPartitionsSince = hiveClient.getPartitionsWrittenToSince(Option.of(instantTime)); - assertEquals(1, writtenPartitionsSince.size(), "We should have one partition written after 100 commit"); + Pair, Option> writtenPartitionsSincePair = hiveClient.getPartitionsWrittenToSince(Option.of(instantTime)); + assertEquals(1, writtenPartitionsSincePair.getLeft().size(), "We should have one partition written after 100 commit"); List hivePartitions = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME); - List partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince, false); + List partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSincePair.getLeft(), false); assertEquals(1, partitionEvents.size(), "There should be only one partition event"); assertEquals(PartitionEventType.ADD, partitionEvents.iterator().next().eventType, "The one partition event must of type ADD"); @@ -775,7 +776,7 @@ public void testMultiPartitionKeySync(String syncMode) throws Exception { "Table partitions should match the number of partitions we wrote"); assertEquals(commitTime3, hiveClient.getLastCommitTimeSynced(HiveTestUtil.TABLE_NAME).get(), "The last commit that was synced should be updated in the TBLPROPERTIES"); - assertEquals(1, hiveClient.getPartitionsWrittenToSince(Option.of(commitTime2)).size()); + assertEquals(1, hiveClient.getPartitionsWrittenToSince(Option.of(commitTime2)).getLeft().size()); } @ParameterizedTest diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieMetaSyncOperations.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieMetaSyncOperations.java index 1c16dd13edaa4..4db3e7e130bf8 100644 --- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieMetaSyncOperations.java +++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieMetaSyncOperations.java @@ -19,6 +19,7 @@ package org.apache.hudi.sync.common; +import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.util.Option; import org.apache.hudi.sync.common.model.FieldSchema; import org.apache.hudi.sync.common.model.Partition; @@ -162,7 +163,7 @@ default Option getLastCommitTimeSynced(String tableName) { /** * Update the timestamp of last sync. */ - default void updateLastCommitTimeSynced(String tableName) { + default void updateLastCommitTimeSynced(String tableName, Option hoodieInstantOption) { } diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java index 32ade18d08117..3d3651394640c 100644 --- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java +++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java @@ -25,10 +25,12 @@ import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.TableSchemaResolver; +import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.TimelineUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ReflectionUtils; +import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.metadata.HoodieTableMetadataUtil; import org.apache.hudi.sync.common.model.Partition; import org.apache.hudi.sync.common.model.PartitionEvent; @@ -106,20 +108,22 @@ public MessageType getStorageSchema() { } } - public List getPartitionsWrittenToSince(Option lastCommitTimeSynced) { + public Pair, Option> getPartitionsWrittenToSince(Option lastCommitTimeSynced) { if (!lastCommitTimeSynced.isPresent()) { LOG.info("Last commit time synced is not known, listing all partitions in " + config.getString(META_SYNC_BASE_PATH) + ",FS :" + config.getHadoopFileSystem()); HoodieLocalEngineContext engineContext = new HoodieLocalEngineContext(metaClient.getHadoopConf()); - return FSUtils.getAllPartitionPaths(engineContext, + return Pair.of(FSUtils.getAllPartitionPaths(engineContext, config.getString(META_SYNC_BASE_PATH), config.getBoolean(META_SYNC_USE_FILE_LISTING_FROM_METADATA), - config.getBoolean(META_SYNC_ASSUME_DATE_PARTITION)); + config.getBoolean(META_SYNC_ASSUME_DATE_PARTITION)), Option.empty()); } else { + Option hoodieInstantOption = metaClient.getActiveTimeline().getCommitsTimeline().lastInstant(); + HoodieTimeline instantsInRange = metaClient.getActiveTimeline().getCommitsTimeline() + .findInstantsInRange(lastCommitTimeSynced.get(), hoodieInstantOption.get().getTimestamp()); LOG.info("Last commit time synced is " + lastCommitTimeSynced.get() + ", Getting commits since then"); - return TimelineUtils.getPartitionsWritten(metaClient.getActiveTimeline().getCommitsTimeline() - .findInstantsAfter(lastCommitTimeSynced.get(), Integer.MAX_VALUE)); + return Pair.of(TimelineUtils.getPartitionsWritten(instantsInRange), hoodieInstantOption); } }