diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
index 099b79cbba0ab..d51b27d7a14f7 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
@@ -116,6 +116,7 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
   public static final String FILEID_PARAM = "fileid";
   public static final String LAST_INSTANT_TS = "lastinstantts";
   public static final String TIMELINE_HASH = "timelinehash";
+  public static final String NUM_INSTANTS = "numinstants";
   public static final String REFRESH_OFF = "refreshoff";
   public static final String INCLUDE_FILES_IN_PENDING_COMPACTION_PARAM = "includependingcompaction";
 
@@ -162,6 +163,7 @@ private T executeRequest(String requestPath, Map queryParame
     // Adding mandatory parameters - Last instants affecting file-slice
     timeline.lastInstant().ifPresent(instant -> builder.addParameter(LAST_INSTANT_TS, instant.getTimestamp()));
     builder.addParameter(TIMELINE_HASH, timeline.getTimelineHash());
+    builder.addParameter(NUM_INSTANTS, String.valueOf(timeline.countInstants()));
 
     String url = builder.toString();
     LOG.info("Sending request : (" + url + ")");
diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
index 1d3bb583a0861..08dadae74d252 100644
--- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
+++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
@@ -121,10 +121,9 @@ private boolean isLocalViewBehind(Context ctx) {
     String lastKnownInstantFromClient =
         ctx.queryParam(RemoteHoodieTableFileSystemView.LAST_INSTANT_TS, HoodieTimeline.INVALID_INSTANT_TS);
     String timelineHashFromClient = ctx.queryParam(RemoteHoodieTableFileSystemView.TIMELINE_HASH, "");
+    String numInstantsFromClient = ctx.queryParam(RemoteHoodieTableFileSystemView.NUM_INSTANTS, "-1");
     HoodieTimeline localTimeline =
         viewManager.getFileSystemView(basePath).getTimeline().filterCompletedAndCompactionInstants();
-    String localLastKnownInstant = localTimeline.lastInstant().isPresent() ? localTimeline.lastInstant().get().getTimestamp()
-        : HoodieTimeline.INVALID_INSTANT_TS;
     if (LOG.isDebugEnabled()) {
       LOG.debug("Client [ LastTs=" + lastKnownInstantFromClient + ", TimelineHash=" + timelineHashFromClient
           + "], localTimeline=" + localTimeline.getInstants().collect(Collectors.toList()));
@@ -138,7 +137,8 @@ private boolean isLocalViewBehind(Context ctx) {
     String localTimelineHash = localTimeline.getTimelineHash();
     // refresh if timeline hash mismatches and if local's last known instant < client's last known instant (if config is enabled)
     if (!localTimelineHash.equals(timelineHashFromClient)
-        && (!timelineServiceConfig.refreshTimelineBasedOnLatestCommit || HoodieTimeline.compareTimestamps(localLastKnownInstant, HoodieTimeline.LESSER_THAN, lastKnownInstantFromClient))) {
+        && (!timelineServiceConfig.refreshTimelineBasedOnLatestCommit
+            || localTimelineBehind(localTimeline, lastKnownInstantFromClient, numInstantsFromClient))) {
       return true;
     }
 
@@ -146,6 +146,40 @@ private boolean isLocalViewBehind(Context ctx) {
     return !localTimeline.containsOrBeforeTimelineStarts(lastKnownInstantFromClient);
   }
 
+  /**
+   * Tells whether the given local timeline lags behind the timeline described by the client's request parameters.
+   *
+   * @param localTimeline              the local timeline to check
+   * @param lastKnownInstantFromClient timestamp of the last instant known to the client
+   * @param numInstantsFromClient      instant count sent by the client as a string; the caller passes "-1" when it is absent
+   * @return true if the local last instant is older than the client's, or the local timeline holds fewer instants
+   */
+  private static boolean localTimelineBehind(HoodieTimeline localTimeline, String lastKnownInstantFromClient, String numInstantsFromClient) {
+    String localLastKnownInstant = localTimeline.lastInstant().isPresent() ? localTimeline.lastInstant().get().getTimestamp()
+        : HoodieTimeline.INVALID_INSTANT_TS;
+    if (HoodieTimeline.compareTimestamps(localLastKnownInstant, HoodieTimeline.LESSER_THAN, lastKnownInstantFromClient)) {
+      return true;
+    }
+    // Why also compare the number of instants?
+    // Assume there are 4 instants on the timeline:
+    // timestamp(action): ts_0(commit), ts_1(commit), ts_2(clean), ts_3(commit)
+    // While ts_1 is still in INFLIGHT state, the ts_2 clean action has already finished.
+    // After ts_1 triggers #sync, the local timeline is refreshed as [ts_0, ts_2].
+    // When ts_1 switches state from INFLIGHT to COMPLETED, no #sync is triggered.
+    // At ts_3, when the fs view snapshot is requested, the client timeline is [ts_0, ts_1, ts_2].
+    // If we only compared the latest instant, the local timeline would NOT look behind, yet the fs view
+    // would be incomplete because ts_1 is missing.
+    int clientNumInstants;
+    try {
+      clientNumInstants = Integer.parseInt(numInstantsFromClient);
+    } catch (NumberFormatException ignored) {
+      // The count is parsed from a client-supplied string; treat a malformed value like an absent one ("-1")
+      // rather than failing the request, so the decision rests on the last-instant comparison above.
+      clientNumInstants = -1;
+    }
+    return localTimeline.countInstants() < clientNumInstants;
+  }
+
   /**
    * Syncs data-set view if local view is behind.
    */