diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/FileSystemViewCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/FileSystemViewCommand.java index a506c8030a557..056822898b45f 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/FileSystemViewCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/FileSystemViewCommand.java @@ -269,7 +269,7 @@ private HoodieTableFileSystemView buildFileSystemView(String globRegex, String m } HoodieTimeline filteredTimeline = new HoodieDefaultTimeline(instantsStream, - (Function> & Serializable) metaClient.getActiveTimeline()::getInstantDetails); + (Function> & Serializable) metaClient.getActiveTimeline()::getInstantDetails, timeline.getLastUpdateTime()); return new HoodieTableFileSystemView(metaClient, filteredTimeline, statuses.toArray(new FileStatus[0])); } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java index 138e40a90c6e0..bda113eeef649 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java @@ -491,7 +491,7 @@ private boolean deleteAllInstantsOlderOrEqualsInAuxMetaFolder(HoodieInstant thre metaClient.scanHoodieInstantsFromFileSystem( new Path(metaClient.getMetaAuxiliaryPath()), HoodieActiveTimeline.VALID_EXTENSIONS_IN_ACTIVE_TIMELINE, - false); + false).getKey(); } catch (FileNotFoundException e) { /* * On some FSs deletion of all files in the directory can auto remove the directory itself. diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java index b9a3673960fb3..d71a831e6f4d4 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java @@ -39,6 +39,7 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.TableNotFoundException; @@ -55,6 +56,7 @@ import java.util.Arrays; import java.util.List; import java.util.Objects; +import java.util.OptionalLong; import java.util.Properties; import java.util.Set; import java.util.stream.Collectors; @@ -489,8 +491,8 @@ public String getCommitActionType() { * @return List of Hoodie Instants generated * @throws IOException in case of failure */ - public List scanHoodieInstantsFromFileSystem(Set includedExtensions, - boolean applyLayoutVersionFilters) throws IOException { + public Pair, Long> scanHoodieInstantsFromFileSystem(Set includedExtensions, + boolean applyLayoutVersionFilters) throws IOException { return scanHoodieInstantsFromFileSystem(new Path(metaPath), includedExtensions, applyLayoutVersionFilters); } @@ -504,20 +506,22 @@ public List scanHoodieInstantsFromFileSystem(Set included * @return List of Hoodie Instants generated * @throws IOException in case of failure */ - public List scanHoodieInstantsFromFileSystem(Path timelinePath, Set includedExtensions, + public Pair, Long> scanHoodieInstantsFromFileSystem(Path timelinePath, Set includedExtensions, boolean applyLayoutVersionFilters) throws IOException { - Stream instantStream = Arrays.stream( - HoodieTableMetaClient - .scanFiles(getFs(), timelinePath, path -> { - // Include only the meta files with extensions that needs to be included - String extension = HoodieInstant.getTimelineFileExtension(path.getName()); - return includedExtensions.contains(extension); - })).map(HoodieInstant::new); + + FileStatus[] instantsFileStatus = HoodieTableMetaClient + .scanFiles(getFs(), timelinePath, path -> { + // Include only the meta files with extensions that needs to be included + String extension = HoodieInstant.getTimelineFileExtension(path.getName()); + return includedExtensions.contains(extension); + }); + OptionalLong lastUpdatedTime = Arrays.stream(instantsFileStatus).mapToLong(FileStatus::getModificationTime).max(); + Stream instantStream = Arrays.stream(instantsFileStatus).map(HoodieInstant::new); if (applyLayoutVersionFilters) { instantStream = TimelineLayout.getLayout(getTimelineLayoutVersion()).filterHoodieInstants(instantStream); } - return instantStream.sorted().collect(Collectors.toList()); + return Pair.of(instantStream.sorted().collect(Collectors.toList()), lastUpdatedTime.orElse(0L)); } @Override diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java index 1fa3845bf38f7..63d86ac6acb7d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java @@ -118,7 +118,8 @@ protected HoodieActiveTimeline(HoodieTableMetaClient metaClient, Set inc // Filter all the filter in the metapath and include only the extensions passed and // convert them into HoodieInstant try { - this.setInstants(metaClient.scanHoodieInstantsFromFileSystem(includedExtensions, applyLayoutFilters)); + Pair, Long> instantsResult = metaClient.scanHoodieInstantsFromFileSystem(includedExtensions, applyLayoutFilters); + this.setInstants(instantsResult.getKey(), instantsResult.getValue()); } catch (IOException e) { throw new HoodieIOException("Failed to scan metadata", e); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java index 5ad3fa7a9f215..2ad397aba043e 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java @@ -31,6 +31,7 @@ import org.apache.hudi.common.util.FileIOUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieIOException; import org.apache.avro.generic.GenericRecord; @@ -90,7 +91,8 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline { */ public HoodieArchivedTimeline(HoodieTableMetaClient metaClient) { this.metaClient = metaClient; - setInstants(this.loadInstants(false)); + Pair, Long> instantsResult = this.loadInstants(false); + setInstants(instantsResult.getKey(), instantsResult.getValue()); // multiple casts will make this lambda serializable - // http://docs.oracle.com/javase/specs/jls/se8/html/jls-15.html#jls-15.16 this.details = (Function> & Serializable) this::getInstantDetails; @@ -196,15 +198,15 @@ private Option getMetadataKey(String action) { } } - private List loadInstants(boolean loadInstantDetails) { + private Pair, Long> loadInstants(boolean loadInstantDetails) { return loadInstants(null, loadInstantDetails); } - private List loadInstants(String startTs, String endTs) { + private Pair, Long> loadInstants(String startTs, String endTs) { return loadInstants(new TimeRangeFilter(startTs, endTs), true); } - private List loadInstants(TimeRangeFilter filter, boolean loadInstantDetails) { + private Pair, Long> loadInstants(TimeRangeFilter filter, boolean loadInstantDetails) { return loadInstants(filter, loadInstantDetails, record -> true); } @@ -214,18 +216,21 @@ private List loadInstants(TimeRangeFilter filter, boolean loadIns * If filter is specified, only the filtered instants are loaded * If commitsFilter is specified, only the filtered records are loaded */ - private List loadInstants(TimeRangeFilter filter, boolean loadInstantDetails, + private Pair, Long> loadInstants(TimeRangeFilter filter, boolean loadInstantDetails, Function commitsFilter) { try { // List all files FileStatus[] fsStatuses = metaClient.getFs().globStatus( new Path(metaClient.getArchivePath() + "/.commits_.archive*")); + long lastUpdateTime = 0L; // Sort files by version suffix in reverse (implies reverse chronological order) Arrays.sort(fsStatuses, new ArchiveFileVersionComparator()); Set instantsInRange = new HashSet<>(); for (FileStatus fs : fsStatuses) { + // update lastUpdateTime + lastUpdateTime = Math.max(lastUpdateTime, fs.getModificationTime()); // Read the archived file try (HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(metaClient.getFs(), new HoodieLogFile(fs.getPath()), HoodieArchivedMetaEntry.getClassSchema())) { @@ -279,7 +284,7 @@ private List loadInstants(TimeRangeFilter filter, boolean loadIns ArrayList result = new ArrayList<>(instantsInRange); Collections.sort(result); - return result; + return Pair.of(result, lastUpdateTime); } catch (IOException e) { throw new HoodieIOException( "Could not load archived commit timeline from path " + metaClient.getArchivePath(), e); @@ -331,6 +336,6 @@ public HoodieDefaultTimeline getWriteTimeline() { Set validActions = CollectionUtils.createSet(COMMIT_ACTION, DELTA_COMMIT_ACTION, COMPACTION_ACTION, REPLACE_COMMIT_ACTION); return new HoodieDefaultTimeline(getInstants().filter(i -> readCommits.keySet().contains(i.getTimestamp())) - .filter(s -> validActions.contains(s.getAction())), details); + .filter(s -> validActions.contains(s.getAction())), details, getLastUpdateTime()); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java index 2cf111e91c812..13597d1588261 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java @@ -51,14 +51,20 @@ public class HoodieDefaultTimeline implements HoodieTimeline { protected transient Function> details; private List instants; private String timelineHash; + private long lastUpdatedTime; - public HoodieDefaultTimeline(Stream instants, Function> details) { + public HoodieDefaultTimeline(Stream instants, Function> details, long lastUpdatedTime) { this.details = details; - setInstants(instants.collect(Collectors.toList())); + setInstants(instants.collect(Collectors.toList()), lastUpdatedTime); } public void setInstants(List instants) { + setInstants(instants, System.currentTimeMillis()); + } + + public void setInstants(List instants, long lastUpdatedTime) { this.instants = instants; + this.lastUpdatedTime = lastUpdatedTime; final MessageDigest md; try { md = MessageDigest.getInstance(HASHING_ALGORITHM); @@ -79,106 +85,106 @@ public HoodieDefaultTimeline() {} @Override public HoodieTimeline filterInflights() { - return new HoodieDefaultTimeline(instants.stream().filter(HoodieInstant::isInflight), details); + return new HoodieDefaultTimeline(instants.stream().filter(HoodieInstant::isInflight), details, lastUpdatedTime); } @Override public HoodieTimeline filterInflightsAndRequested() { return new HoodieDefaultTimeline( instants.stream().filter(i -> i.getState().equals(State.REQUESTED) || i.getState().equals(State.INFLIGHT)), - details); + details, lastUpdatedTime); } @Override public HoodieTimeline filterPendingExcludingCompaction() { return new HoodieDefaultTimeline(instants.stream().filter(instant -> (!instant.isCompleted()) - && (!instant.getAction().equals(HoodieTimeline.COMPACTION_ACTION))), details); + && (!instant.getAction().equals(HoodieTimeline.COMPACTION_ACTION))), details, lastUpdatedTime); } @Override public HoodieTimeline filterCompletedInstants() { - return new HoodieDefaultTimeline(instants.stream().filter(HoodieInstant::isCompleted), details); + return new HoodieDefaultTimeline(instants.stream().filter(HoodieInstant::isCompleted), details, lastUpdatedTime); } @Override public HoodieTimeline filterCompletedAndCompactionInstants() { return new HoodieDefaultTimeline(instants.stream().filter(s -> s.isCompleted() - || s.getAction().equals(HoodieTimeline.COMPACTION_ACTION)), details); + || s.getAction().equals(HoodieTimeline.COMPACTION_ACTION)), details, lastUpdatedTime); } @Override public HoodieDefaultTimeline getWriteTimeline() { Set validActions = CollectionUtils.createSet(COMMIT_ACTION, DELTA_COMMIT_ACTION, COMPACTION_ACTION, REPLACE_COMMIT_ACTION); - return new HoodieDefaultTimeline(instants.stream().filter(s -> validActions.contains(s.getAction())), details); + return new HoodieDefaultTimeline(instants.stream().filter(s -> validActions.contains(s.getAction())), details, lastUpdatedTime); } @Override public HoodieTimeline getCompletedReplaceTimeline() { return new HoodieDefaultTimeline( - instants.stream().filter(s -> s.getAction().equals(REPLACE_COMMIT_ACTION)).filter(HoodieInstant::isCompleted), details); + instants.stream().filter(s -> s.getAction().equals(REPLACE_COMMIT_ACTION)).filter(HoodieInstant::isCompleted), details, lastUpdatedTime); } @Override public HoodieTimeline filterPendingReplaceTimeline() { return new HoodieDefaultTimeline(instants.stream().filter( - s -> s.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION) && !s.isCompleted()), details); + s -> s.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION) && !s.isCompleted()), details, lastUpdatedTime); } @Override public HoodieTimeline filterPendingRollbackTimeline() { return new HoodieDefaultTimeline(instants.stream().filter( - s -> s.getAction().equals(HoodieTimeline.ROLLBACK_ACTION) && !s.isCompleted()), details); + s -> s.getAction().equals(HoodieTimeline.ROLLBACK_ACTION) && !s.isCompleted()), details, lastUpdatedTime); } @Override public HoodieTimeline filterPendingCompactionTimeline() { return new HoodieDefaultTimeline( - instants.stream().filter(s -> s.getAction().equals(HoodieTimeline.COMPACTION_ACTION) && !s.isCompleted()), details); + instants.stream().filter(s -> s.getAction().equals(HoodieTimeline.COMPACTION_ACTION) && !s.isCompleted()), details, lastUpdatedTime); } @Override public HoodieDefaultTimeline findInstantsInRange(String startTs, String endTs) { return new HoodieDefaultTimeline( - instants.stream().filter(s -> HoodieTimeline.isInRange(s.getTimestamp(), startTs, endTs)), details); + instants.stream().filter(s -> HoodieTimeline.isInRange(s.getTimestamp(), startTs, endTs)), details, lastUpdatedTime); } @Override public HoodieDefaultTimeline findInstantsAfter(String instantTime, int numCommits) { return new HoodieDefaultTimeline(instants.stream() .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN, instantTime)).limit(numCommits), - details); + details, lastUpdatedTime); } @Override public HoodieTimeline findInstantsAfter(String instantTime) { return new HoodieDefaultTimeline(instants.stream() - .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN, instantTime)), details); + .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN, instantTime)), details, lastUpdatedTime); } @Override public HoodieDefaultTimeline findInstantsAfterOrEquals(String commitTime, int numCommits) { return new HoodieDefaultTimeline(instants.stream() .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN_OR_EQUALS, commitTime)) - .limit(numCommits), details); + .limit(numCommits), details, lastUpdatedTime); } @Override public HoodieDefaultTimeline findInstantsBefore(String instantTime) { return new HoodieDefaultTimeline(instants.stream() .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), LESSER_THAN, instantTime)), - details); + details, lastUpdatedTime); } @Override public HoodieDefaultTimeline findInstantsBeforeOrEquals(String instantTime) { return new HoodieDefaultTimeline(instants.stream() .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), LESSER_THAN_OR_EQUALS, instantTime)), - details); + details, lastUpdatedTime); } @Override public HoodieTimeline filter(Predicate filter) { - return new HoodieDefaultTimeline(instants.stream().filter(filter), details); + return new HoodieDefaultTimeline(instants.stream().filter(filter), details, lastUpdatedTime); } /** @@ -209,7 +215,7 @@ public HoodieTimeline getCommitTimeline() { */ public HoodieTimeline getDeltaCommitTimeline() { return new HoodieDefaultTimeline(filterInstantsByAction(DELTA_COMMIT_ACTION), - (Function> & Serializable) this::getInstantDetails); + (Function> & Serializable) this::getInstantDetails, lastUpdatedTime); } /** @@ -219,7 +225,7 @@ public HoodieTimeline getDeltaCommitTimeline() { */ public HoodieTimeline getTimelineOfActions(Set actions) { return new HoodieDefaultTimeline(getInstants().filter(s -> actions.contains(s.getAction())), - (Function> & Serializable) this::getInstantDetails); + (Function> & Serializable) this::getInstantDetails, lastUpdatedTime); } /** @@ -227,7 +233,7 @@ public HoodieTimeline getTimelineOfActions(Set actions) { */ public HoodieTimeline getCleanerTimeline() { return new HoodieDefaultTimeline(filterInstantsByAction(CLEAN_ACTION), - (Function> & Serializable) this::getInstantDetails); + (Function> & Serializable) this::getInstantDetails, lastUpdatedTime); } /** @@ -235,7 +241,7 @@ public HoodieTimeline getCleanerTimeline() { */ public HoodieTimeline getRollbackTimeline() { return new HoodieDefaultTimeline(filterInstantsByAction(ROLLBACK_ACTION), - (Function> & Serializable) this::getInstantDetails); + (Function> & Serializable) this::getInstantDetails, lastUpdatedTime); } /** @@ -250,7 +256,7 @@ public HoodieTimeline getRollbackAndRestoreTimeline() { */ public HoodieTimeline getSavePointTimeline() { return new HoodieDefaultTimeline(filterInstantsByAction(SAVEPOINT_ACTION), - (Function> & Serializable) this::getInstantDetails); + (Function> & Serializable) this::getInstantDetails, lastUpdatedTime); } /** @@ -258,7 +264,7 @@ public HoodieTimeline getSavePointTimeline() { */ public HoodieTimeline getRestoreTimeline() { return new HoodieDefaultTimeline(filterInstantsByAction(RESTORE_ACTION), - (Function> & Serializable) this::getInstantDetails); + (Function> & Serializable) this::getInstantDetails, lastUpdatedTime); } protected Stream filterInstantsByAction(String action) { @@ -351,6 +357,11 @@ public Option getInstantDetails(HoodieInstant instant) { return details.apply(instant); } + @Override + public long getLastUpdateTime() { + return lastUpdatedTime; + } + @Override public boolean isEmpty(HoodieInstant instant) { return getInstantDetails(instant).get().length == 0; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java index 25b9c2ec6f2e4..f2cf863916c39 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java @@ -282,6 +282,8 @@ public interface HoodieTimeline extends Serializable { */ Option getInstantDetails(HoodieInstant instant); + long getLastUpdateTime(); + boolean isEmpty(HoodieInstant instant); /** diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/dto/TimelineDTO.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/dto/TimelineDTO.java index b25b8e4bc51da..2758aa7814f70 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/dto/TimelineDTO.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/dto/TimelineDTO.java @@ -37,15 +37,19 @@ public class TimelineDTO { @JsonProperty("instants") List instants; + @JsonProperty("lastUpdatedTime") + Long lastUpdatedTime; + public static TimelineDTO fromTimeline(HoodieTimeline timeline) { TimelineDTO dto = new TimelineDTO(); dto.instants = timeline.getInstants().map(InstantDTO::fromInstant).collect(Collectors.toList()); + dto.lastUpdatedTime = timeline.getLastUpdateTime(); return dto; } public static HoodieTimeline toTimeline(TimelineDTO dto, HoodieTableMetaClient metaClient) { // TODO: For Now, we will assume, only active-timeline will be transferred. return new HoodieDefaultTimeline(dto.instants.stream().map(InstantDTO::toInstant), - metaClient.getActiveTimeline()::getInstantDetails); + metaClient.getActiveTimeline()::getInstantDetails, dto.lastUpdatedTime); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java index 099b79cbba0ab..132e72332ca59 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java @@ -118,7 +118,7 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView, public static final String TIMELINE_HASH = "timelinehash"; public static final String REFRESH_OFF = "refreshoff"; public static final String INCLUDE_FILES_IN_PENDING_COMPACTION_PARAM = "includependingcompaction"; - + public static final String TIMELINE_LAST_UPDATED_TIME = "timelinelastupdatedtime"; private static final Logger LOG = LogManager.getLogger(RemoteHoodieTableFileSystemView.class); @@ -162,6 +162,7 @@ private T executeRequest(String requestPath, Map queryParame // Adding mandatory parameters - Last instants affecting file-slice timeline.lastInstant().ifPresent(instant -> builder.addParameter(LAST_INSTANT_TS, instant.getTimestamp())); builder.addParameter(TIMELINE_HASH, timeline.getTimelineHash()); + builder.addParameter(TIMELINE_LAST_UPDATED_TIME, Long.toString(timeline.getLastUpdateTime())); String url = builder.toString(); LOG.info("Sending request : (" + url + ")"); diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java index 5fd59bc932872..845d0ca5d8f1d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java @@ -748,7 +748,7 @@ public static HoodieTableFileSystemView getFileSystemView(HoodieTableMetaClient if (timeline.empty()) { final HoodieInstant instant = new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, HoodieActiveTimeline.createNewInstantTime()); - timeline = new HoodieDefaultTimeline(Arrays.asList(instant).stream(), metaClient.getActiveTimeline()::getInstantDetails); + timeline = new HoodieDefaultTimeline(Arrays.asList(instant).stream(), metaClient.getActiveTimeline()::getInstantDetails, metaClient.getActiveTimeline().getLastUpdateTime()); } return new HoodieTableFileSystemView(metaClient, timeline); } diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java index 4744fbb6b4e1c..bc248416a7c4b 100644 --- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java +++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java @@ -121,11 +121,15 @@ private boolean isLocalViewBehind(Context ctx) { String lastKnownInstantFromClient = ctx.queryParam(RemoteHoodieTableFileSystemView.LAST_INSTANT_TS, HoodieTimeline.INVALID_INSTANT_TS); String timelineHashFromClient = ctx.queryParam(RemoteHoodieTableFileSystemView.TIMELINE_HASH, ""); + long timelineLastUpdatedTime = + Long.parseLong(ctx.queryParam(RemoteHoodieTableFileSystemView.TIMELINE_LAST_UPDATED_TIME, + Long.toString(Long.MAX_VALUE))); HoodieTimeline localTimeline = viewManager.getFileSystemView(basePath).getTimeline().filterCompletedAndCompactionInstants(); if (LOG.isDebugEnabled()) { LOG.debug("Client [ LastTs=" + lastKnownInstantFromClient + ", TimelineHash=" + timelineHashFromClient - + "], localTimeline=" + localTimeline.getInstants().collect(Collectors.toList())); + + ", LastUpdatedTime=" + timelineLastUpdatedTime + "], localTimeline=" + + localTimeline.getInstants().collect(Collectors.toList())); } if ((localTimeline.getInstants().count() == 0) @@ -134,7 +138,8 @@ private boolean isLocalViewBehind(Context ctx) { } String localTimelineHash = localTimeline.getTimelineHash(); - if (!localTimelineHash.equals(timelineHashFromClient)) { + // refresh if timeline hash mismatches and if local's local updated time is < client's last updated time for timeline + if (!localTimelineHash.equals(timelineHashFromClient) && localTimeline.getLastUpdateTime() < timelineLastUpdatedTime) { return true; } diff --git a/hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/functional/TestRemoteHoodieTableFileSystemView.java b/hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/functional/TestRemoteHoodieTableFileSystemView.java index f9a6172b5ec39..021643e802311 100644 --- a/hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/functional/TestRemoteHoodieTableFileSystemView.java +++ b/hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/functional/TestRemoteHoodieTableFileSystemView.java @@ -42,9 +42,6 @@ public class TestRemoteHoodieTableFileSystemView extends TestHoodieTableFileSyst private static final Logger LOG = LogManager.getLogger(TestRemoteHoodieTableFileSystemView.class); - private TimelineService server; - private RemoteHoodieTableFileSystemView view; - protected SyncableFileSystemView getFileSystemView(HoodieTimeline timeline) { FileSystemViewStorageConfig sConf = FileSystemViewStorageConfig.newBuilder().withStorageType(FileSystemViewStorageType.SPILLABLE_DISK).build(); @@ -52,6 +49,7 @@ protected SyncableFileSystemView getFileSystemView(HoodieTimeline timeline) { HoodieCommonConfig commonConfig = HoodieCommonConfig.newBuilder().build(); HoodieLocalEngineContext localEngineContext = new HoodieLocalEngineContext(metaClient.getHadoopConf()); + TimelineService server; try { server = new TimelineService(localEngineContext, new Configuration(), TimelineService.Config.builder().serverPort(0).build(), FileSystem.get(new Configuration()), @@ -61,7 +59,6 @@ protected SyncableFileSystemView getFileSystemView(HoodieTimeline timeline) { throw new RuntimeException(ex); } LOG.info("Connecting to Timeline Server :" + server.getServerPort()); - view = new RemoteHoodieTableFileSystemView("localhost", server.getServerPort(), metaClient); - return view; + return new RemoteHoodieTableFileSystemView("localhost", server.getServerPort(), metaClient); } }