Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,7 @@ public void startServer() throws IOException {
.serverPort(writeConfig.getEmbeddedTimelineServerPort())
.numThreads(writeConfig.getEmbeddedTimelineServerThreads())
.compress(writeConfig.getEmbeddedTimelineServerCompressOutput())
.async(writeConfig.getEmbeddedTimelineServerUseAsync())
.refreshTimelineBasedOnLatestCommit(writeConfig.isRefreshTimelineServerBasedOnLatestCommit());
.async(writeConfig.getEmbeddedTimelineServerUseAsync());
// Only passing marker-related write configs to timeline server
// if timeline-server-based markers are used.
if (writeConfig.getMarkersType() == MarkerType.TIMELINE_SERVER_BASED) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -359,11 +359,6 @@ public class HoodieWriteConfig extends HoodieConfig {
.withDocumentation("Timeline archiving removes older instants from the timeline, after each write operation, to minimize metadata overhead. "
+ "Controls whether or not, the write should be failed as well, if such archiving fails.");

public static final ConfigProperty<Boolean> REFRESH_TIMELINE_SERVER_BASED_ON_LATEST_COMMIT = ConfigProperty
.key("hoodie.refresh.timeline.server.based.on.latest.commit")
.defaultValue(true)
.withDocumentation("Refresh timeline in timeline server based on latest commit apart from timeline hash difference. By default (true).");

public static final ConfigProperty<Long> INITIAL_CONSISTENCY_CHECK_INTERVAL_MS = ConfigProperty
.key("hoodie.consistency.check.initial_interval_ms")
.defaultValue(2000L)
Expand Down Expand Up @@ -1105,10 +1100,6 @@ public boolean isFailOnTimelineArchivingEnabled() {
return getBoolean(FAIL_ON_TIMELINE_ARCHIVING_ENABLE);
}

public boolean isRefreshTimelineServerBasedOnLatestCommit() {
return getBoolean(REFRESH_TIMELINE_SERVER_BASED_ON_LATEST_COMMIT);
}

public int getMaxConsistencyChecks() {
return getInt(MAX_CONSISTENCY_CHECKS);
}
Expand Down Expand Up @@ -2514,11 +2505,6 @@ public Builder withAutoAdjustLockConfigs(boolean autoAdjustLockConfigs) {
return this;
}

public Builder withRefreshTimelineServerBasedOnLatestCommit(boolean refreshTimelineServerBasedOnLatestCommit) {
writeConfig.setValue(REFRESH_TIMELINE_SERVER_BASED_ON_LATEST_COMMIT, Boolean.toString(refreshTimelineServerBasedOnLatestCommit));
return this;
}

protected void setDefaults() {
writeConfig.setDefaultValue(MARKERS_TYPE, getDefaultMarkersType(engineType));
// Check for mandatory properties
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -515,13 +515,7 @@ private HoodieWriteConfig.Builder getWriteConfigBuilder(String schema) {
return getConfigBuilder(schema)
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(IndexType.INMEMORY).build())
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(1).build())
.withAvroSchemaValidate(true)
// The test has rollback instants on the timeline,
// these rollback instants use real time as instant time, whose instant time is always greater than
// the normal commits instant time, this breaks the refresh rule introduced in HUDI-2761:
// The last client instant is always the rollback instant but not the normal commit.
// Always refresh the timeline when client and server have different timeline.
.withRefreshTimelineServerBasedOnLatestCommit(false);
.withAvroSchemaValidate(true);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
public static final String FILEID_PARAM = "fileid";
public static final String LAST_INSTANT_TS = "lastinstantts";
public static final String TIMELINE_HASH = "timelinehash";
public static final String NUM_INSTANTS = "numinstants";
public static final String REFRESH_OFF = "refreshoff";
public static final String INCLUDE_FILES_IN_PENDING_COMPACTION_PARAM = "includependingcompaction";

Expand Down Expand Up @@ -163,7 +162,6 @@ private <T> T executeRequest(String requestPath, Map<String, String> queryParame
// Adding mandatory parameters - Last instants affecting file-slice
timeline.lastInstant().ifPresent(instant -> builder.addParameter(LAST_INSTANT_TS, instant.getTimestamp()));
builder.addParameter(TIMELINE_HASH, timeline.getTimelineHash());
builder.addParameter(NUM_INSTANTS, timeline.countInstants() + "");

String url = builder.toString();
LOG.info("Sending request : (" + url + ")");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,6 @@ private boolean isLocalViewBehind(Context ctx) {
String lastKnownInstantFromClient =
ctx.queryParam(RemoteHoodieTableFileSystemView.LAST_INSTANT_TS, HoodieTimeline.INVALID_INSTANT_TS);
String timelineHashFromClient = ctx.queryParam(RemoteHoodieTableFileSystemView.TIMELINE_HASH, "");
String numInstantsFromClient = ctx.queryParam(RemoteHoodieTableFileSystemView.NUM_INSTANTS, "-1");
HoodieTimeline localTimeline =
viewManager.getFileSystemView(basePath).getTimeline().filterCompletedAndCompactionInstants();
if (LOG.isDebugEnabled()) {
Expand All @@ -135,33 +134,15 @@ private boolean isLocalViewBehind(Context ctx) {
}

String localTimelineHash = localTimeline.getTimelineHash();
// refresh if timeline hash mismatches and if local's last known instant < client's last known instant (if config is enabled)
if (!localTimelineHash.equals(timelineHashFromClient)
&& (!timelineServiceConfig.refreshTimelineBasedOnLatestCommit
|| localTimelineBehind(localTimeline, lastKnownInstantFromClient, numInstantsFromClient))) {
// refresh if timeline hash mismatches
if (!localTimelineHash.equals(timelineHashFromClient)) {
return true;
}

// As a safety check, even if hash is same, ensure instant is present
return !localTimeline.containsOrBeforeTimelineStarts(lastKnownInstantFromClient);
}

private static boolean localTimelineBehind(HoodieTimeline localTimeline, String lastKnownInstantFromClient, String numInstantsFromClient) {
String localLastKnownInstant = localTimeline.lastInstant().isPresent() ? localTimeline.lastInstant().get().getTimestamp()
: HoodieTimeline.INVALID_INSTANT_TS;
// Why comparing the num commits ?
// Assumes there are 4 commits on the timeline:
// timestamp(action): ts_0(commit), ts_1(commit), ts_2(clean), ts_3(commit)
// when ts_1 is in INFLIGHT state, ts_2 clean action is already finished,
// after ts_1 triggers #sync, the local timeline is refreshed as [ts_0, ts_2],
// when ts_1 switches state from INFLIGHT to COMPLETED, no #sync triggers.
// at ts_3, when the fs view snapshot is requested, the ts_3 client timeline should be [ts_0, ts_1, ts_2],
// if we only compare the latest commit, the local timeline is NOT behind, but the fs view is not complete
// because ts_1 is lost.
return HoodieTimeline.compareTimestamps(localLastKnownInstant, HoodieTimeline.LESSER_THAN, lastKnownInstantFromClient)
|| localTimeline.countInstants() < Integer.parseInt(numInstantsFromClient);
}

/**
* Syncs data-set view if local view is behind.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,6 @@ public static class Config implements Serializable {
@Parameter(names = {"--marker-parallelism", "-mdp"}, description = "Parallelism to use for reading and deleting marker files")
public int markerParallelism = 100;

@Parameter(names = {"--refreshTimelineBasedOnLatestCommit"}, description = "Refresh local timeline based on latest commit in addition to timeline hash value")
public boolean refreshTimelineBasedOnLatestCommit = true;

@Parameter(names = {"--help", "-h"})
public Boolean help = false;

Expand All @@ -150,7 +147,6 @@ public static class Builder {
private int markerBatchNumThreads = 20;
private long markerBatchIntervalMs = 50L;
private int markerParallelism = 100;
private boolean refreshTimelineBasedOnLatestCommit = true;

public Builder() {
}
Expand Down Expand Up @@ -200,11 +196,6 @@ public Builder compress(boolean compress) {
return this;
}

public Builder refreshTimelineBasedOnLatestCommit(boolean refreshTimelineBasedOnLatestCommit) {
this.refreshTimelineBasedOnLatestCommit = refreshTimelineBasedOnLatestCommit;
return this;
}

public Builder enableMarkerRequests(boolean enableMarkerRequests) {
this.enableMarkerRequests = enableMarkerRequests;
return this;
Expand Down Expand Up @@ -240,7 +231,6 @@ public Config build() {
config.markerBatchNumThreads = this.markerBatchNumThreads;
config.markerBatchIntervalMs = this.markerBatchIntervalMs;
config.markerParallelism = this.markerParallelism;
config.refreshTimelineBasedOnLatestCommit = this.refreshTimelineBasedOnLatestCommit;
return config;
}
}
Expand Down