diff --git a/fe/fe-core/src/main/java/com/starrocks/analysis/BrokerDesc.java b/fe/fe-core/src/main/java/com/starrocks/analysis/BrokerDesc.java index 5bf80b8e3f5b7..84a5cb4d7c3a0 100644 --- a/fe/fe-core/src/main/java/com/starrocks/analysis/BrokerDesc.java +++ b/fe/fe-core/src/main/java/com/starrocks/analysis/BrokerDesc.java @@ -21,6 +21,7 @@ import com.starrocks.common.io.Text; import com.starrocks.common.io.Writable; import com.starrocks.common.util.PrintableMap; +import com.starrocks.sql.ast.LoadStmt; import java.io.DataInput; import java.io.DataOutput; @@ -74,6 +75,13 @@ public Map getProperties() { return properties; } + public String getMergeConditionStr() { + if (properties.containsKey(LoadStmt.MERGE_CONDITION)) { + return properties.get(LoadStmt.MERGE_CONDITION); + } + return ""; + } + @Override public void write(DataOutput out) throws IOException { Text.writeString(out, name); diff --git a/fe/fe-core/src/main/java/com/starrocks/load/loadv2/BrokerLoadJob.java b/fe/fe-core/src/main/java/com/starrocks/load/loadv2/BrokerLoadJob.java index 4193c7e580b41..a2dd3501ab40b 100644 --- a/fe/fe-core/src/main/java/com/starrocks/load/loadv2/BrokerLoadJob.java +++ b/fe/fe-core/src/main/java/com/starrocks/load/loadv2/BrokerLoadJob.java @@ -226,11 +226,12 @@ private void createLoadingTask(Database db, BrokerPendingTaskAttachment attachme + tableId + " not found"); } + String mergeCondition = (brokerDesc == null) ? "" : brokerDesc.getMergeConditionStr(); // Generate loading task and init the plan of task LoadLoadingTask task = new LoadLoadingTask(db, table, brokerDesc, brokerFileGroups, getDeadlineMs(), loadMemLimit, strictMode, transactionId, this, timezone, timeoutSecond, - createTimestamp, partialUpdate, mergeConditionStr, sessionVariables, + createTimestamp, partialUpdate, mergeCondition, sessionVariables, context, TLoadJobType.BROKER, priority); UUID uuid = UUID.randomUUID(); TUniqueId loadId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()); diff --git a/fe/fe-core/src/main/java/com/starrocks/load/loadv2/LoadJob.java b/fe/fe-core/src/main/java/com/starrocks/load/loadv2/LoadJob.java index 1bad51eabd4cd..035e623866e0b 100644 --- a/fe/fe-core/src/main/java/com/starrocks/load/loadv2/LoadJob.java +++ b/fe/fe-core/src/main/java/com/starrocks/load/loadv2/LoadJob.java @@ -25,8 +25,6 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; -import com.google.gson.JsonObject; -import com.google.gson.JsonParser; import com.google.gson.annotations.SerializedName; import com.starrocks.catalog.AuthorizationInfo; import com.starrocks.catalog.Database; @@ -93,8 +91,6 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements public static final String UNSELECTED_ROWS = "unselected.rows"; public static final String LOADED_BYTES = "loaded.bytes"; - public static final String JSON_MERGE_CONDITION_KEY = "mergeCondition"; - private static final int TASK_SUBMIT_RETRY_NUM = 2; protected long id; @@ -118,7 +114,6 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements // reuse deleteFlag as partialUpdate // @Deprecated // protected boolean deleteFlag = false; - protected String mergeConditionStr = ""; protected long createTimestamp = -1; protected long loadStartTimestamp = -1; @@ -310,10 +305,6 @@ protected void setJobProperties(Map properties) throws DdlExcept partialUpdate = Boolean.valueOf(properties.get(LoadStmt.PARTIAL_UPDATE)); } - if (properties.containsKey(LoadStmt.MERGE_CONDITION)) { - mergeConditionStr = properties.get(LoadStmt.MERGE_CONDITION); - } - if (properties.containsKey(LoadStmt.LOAD_MEM_LIMIT)) { try { loadMemLimit = Long.parseLong(properties.get(LoadStmt.LOAD_MEM_LIMIT)); @@ -1015,9 +1006,6 @@ public void write(DataOutput out) throws IOException { authorizationInfo.write(out); } Text.writeString(out, timezone); - JsonObject jsonObject = new JsonObject(); - jsonObject.addProperty(JSON_MERGE_CONDITION_KEY, mergeConditionStr); - Text.writeString(out, jsonObject.toString()); } public void readFields(DataInput in) throws IOException { @@ -1062,15 +1050,6 @@ public void readFields(DataInput in) throws IOException { if (GlobalStateMgr.getCurrentStateJournalVersion() >= FeMetaVersion.VERSION_61) { timezone = Text.readString(in); } - String mergeConditionJson = Text.readString(in); - try { - JsonObject jsonObject = JsonParser.parseString(mergeConditionJson).getAsJsonObject(); - if (jsonObject.has(JSON_MERGE_CONDITION_KEY)) { - mergeConditionStr = jsonObject.getAsJsonPrimitive(JSON_MERGE_CONDITION_KEY).getAsString(); - } - } catch (Exception e) { - LOG.warn("Load job last string is not json " + mergeConditionJson); - } } public void replayUpdateStateInfo(LoadJobStateUpdateInfo info) {