Skip to content

Commit

Permalink
[BugFix] Fix fe meta error when use merge condition (#13337)
Browse files Browse the repository at this point in the history
(cherry picked from commit 2f9711e)
  • Loading branch information
caneGuy authored and mergify[bot] committed Nov 15, 2022
1 parent 84f057c commit 4f63108
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.starrocks.common.io.Text;
import com.starrocks.common.io.Writable;
import com.starrocks.common.util.PrintableMap;
import com.starrocks.sql.ast.LoadStmt;

import java.io.DataInput;
import java.io.DataOutput;
Expand Down Expand Up @@ -74,6 +75,13 @@ public Map<String, String> getProperties() {
return properties;
}

public String getMergeConditionStr() {
if (properties.containsKey(LoadStmt.MERGE_CONDITION)) {
return properties.get(LoadStmt.MERGE_CONDITION);
}
return "";
}

@Override
public void write(DataOutput out) throws IOException {
Text.writeString(out, name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,11 +226,12 @@ private void createLoadingTask(Database db, BrokerPendingTaskAttachment attachme
+ tableId + " not found");
}

String mergeCondition = (brokerDesc == null) ? "" : brokerDesc.getMergeConditionStr();
// Generate loading task and init the plan of task
LoadLoadingTask task = new LoadLoadingTask(db, table, brokerDesc,
brokerFileGroups, getDeadlineMs(), loadMemLimit,
strictMode, transactionId, this, timezone, timeoutSecond,
createTimestamp, partialUpdate, mergeConditionStr, sessionVariables,
createTimestamp, partialUpdate, mergeCondition, sessionVariables,
context, TLoadJobType.BROKER, priority);
UUID uuid = UUID.randomUUID();
TUniqueId loadId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
Expand Down
21 changes: 0 additions & 21 deletions fe/fe-core/src/main/java/com/starrocks/load/loadv2/LoadJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.annotations.SerializedName;
import com.starrocks.catalog.AuthorizationInfo;
import com.starrocks.catalog.Database;
Expand Down Expand Up @@ -93,8 +91,6 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
public static final String UNSELECTED_ROWS = "unselected.rows";
public static final String LOADED_BYTES = "loaded.bytes";

public static final String JSON_MERGE_CONDITION_KEY = "mergeCondition";

private static final int TASK_SUBMIT_RETRY_NUM = 2;

protected long id;
Expand All @@ -118,7 +114,6 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
// reuse deleteFlag as partialUpdate
// @Deprecated
// protected boolean deleteFlag = false;
protected String mergeConditionStr = "";

protected long createTimestamp = -1;
protected long loadStartTimestamp = -1;
Expand Down Expand Up @@ -310,10 +305,6 @@ protected void setJobProperties(Map<String, String> properties) throws DdlExcept
partialUpdate = Boolean.valueOf(properties.get(LoadStmt.PARTIAL_UPDATE));
}

if (properties.containsKey(LoadStmt.MERGE_CONDITION)) {
mergeConditionStr = properties.get(LoadStmt.MERGE_CONDITION);
}

if (properties.containsKey(LoadStmt.LOAD_MEM_LIMIT)) {
try {
loadMemLimit = Long.parseLong(properties.get(LoadStmt.LOAD_MEM_LIMIT));
Expand Down Expand Up @@ -1015,9 +1006,6 @@ public void write(DataOutput out) throws IOException {
authorizationInfo.write(out);
}
Text.writeString(out, timezone);
JsonObject jsonObject = new JsonObject();
jsonObject.addProperty(JSON_MERGE_CONDITION_KEY, mergeConditionStr);
Text.writeString(out, jsonObject.toString());
}

public void readFields(DataInput in) throws IOException {
Expand Down Expand Up @@ -1062,15 +1050,6 @@ public void readFields(DataInput in) throws IOException {
if (GlobalStateMgr.getCurrentStateJournalVersion() >= FeMetaVersion.VERSION_61) {
timezone = Text.readString(in);
}
String mergeConditionJson = Text.readString(in);
try {
JsonObject jsonObject = JsonParser.parseString(mergeConditionJson).getAsJsonObject();
if (jsonObject.has(JSON_MERGE_CONDITION_KEY)) {
mergeConditionStr = jsonObject.getAsJsonPrimitive(JSON_MERGE_CONDITION_KEY).getAsString();
}
} catch (Exception e) {
LOG.warn("Load job last string is not json " + mergeConditionJson);
}
}

public void replayUpdateStateInfo(LoadJobStateUpdateInfo info) {
Expand Down

0 comments on commit 4f63108

Please sign in to comment.