Skip to content

Commit

Permalink
[BugFix] Validate query_timeout at analyze stage (#13653)
Browse files Browse the repository at this point in the history
  • Loading branch information
liuyehcf authored and wanpengfei-git committed Nov 21, 2022
1 parent 741e0ab commit d2634e6
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ public LeaderOpExecutor(StatementBase parsedStmt, OriginStatement originStmt,
// set thriftTimeoutMs to query_timeout + thrift_rpc_timeout_ms
// so that we can return an execution timeout instead of a network timeout
this.thriftTimeoutMs = ctx.getSessionVariable().getQueryTimeoutS() * 1000 + Config.thrift_rpc_timeout_ms;
if (this.thriftTimeoutMs < 0) {
this.thriftTimeoutMs = ctx.getSessionVariable().getQueryTimeoutS() * 1000;
}
this.parsedStmt = parsedStmt;
}

Expand Down
29 changes: 12 additions & 17 deletions fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,6 @@ public class SessionVariable implements Serializable, Writable, Cloneable {
public static final String SQL_SAFE_UPDATES = "sql_safe_updates";
public static final String NET_BUFFER_LENGTH = "net_buffer_length";
public static final String CODEGEN_LEVEL = "codegen_level";
// mem limit can't smaller than bufferpool's default page size
public static final int MIN_EXEC_MEM_LIMIT = 2097152;
public static final String BATCH_SIZE = "batch_size";
public static final String CHUNK_SIZE = "chunk_size";
public static final String DISABLE_STREAMING_PREAGGREGATIONS = "disable_streaming_preaggregations";
Expand Down Expand Up @@ -292,7 +290,6 @@ public class SessionVariable implements Serializable, Writable, Cloneable {
public static final String ENABLE_SCAN_BLOCK_CACHE = "enable_scan_block_cache";
public static final String ENABLE_POPULATE_BLOCK_CACHE = "enable_populate_block_cache";


public static final String ENABLE_QUERY_CACHE = "enable_query_cache";
public static final String QUERY_CACHE_FORCE_POPULATE = "query_cache_force_populate";
public static final String QUERY_CACHE_ENTRY_MAX_BYTES = "query_cache_entry_max_bytes";
Expand All @@ -302,8 +299,10 @@ public class SessionVariable implements Serializable, Writable, Cloneable {
public static final String NESTED_MV_REWRITE_MAX_LEVEL = "nested_mv_rewrite_max_level";
public static final String ENABLE_MATERIALIZED_VIEW_REWRITE = "enable_materialized_view_rewrite";
public static final String ENABLE_MATERIALIZED_VIEW_UNION_REWRITE = "enable_materialized_view_union_rewrite";
public static final String ENABLE_RULE_BASED_MATERIALIZED_VIEW_REWRITE = "enable_rule_based_materialized_view_rewrite";
public static final String ENABLE_COST_BASED_MATERIALIZED_VIEW_REWRITE = "enable_cost_based_materialized_view_rewrite";
public static final String ENABLE_RULE_BASED_MATERIALIZED_VIEW_REWRITE =
"enable_rule_based_materialized_view_rewrite";
public static final String ENABLE_COST_BASED_MATERIALIZED_VIEW_REWRITE =
"enable_cost_based_materialized_view_rewrite";

public static final List<String> DEPRECATED_VARIABLES = ImmutableList.<String>builder()
.add(CODEGEN_LEVEL)
Expand All @@ -322,6 +321,12 @@ public class SessionVariable implements Serializable, Writable, Cloneable {
.add("prefer_join_method")
.add("rewrite_count_distinct_to_bitmap_hll").build();

// Limitations
// mem limit can't smaller than bufferpool's default page size
public static final int MIN_EXEC_MEM_LIMIT = 2097152;
// query timeout cannot greater than one month
public static final int MAX_QUERY_TIMEOUT = 259200;

@VariableMgr.VarAttr(name = ENABLE_PIPELINE, alias = ENABLE_PIPELINE_ENGINE, show = ENABLE_PIPELINE_ENGINE)
private boolean enablePipelineEngine = true;

Expand Down Expand Up @@ -717,7 +722,7 @@ public boolean isEnableSortAggregate() {
public void setEnableSortAggregate(boolean enableSortAggregate) {
this.enableSortAggregate = enableSortAggregate;
}

@VariableMgr.VarAttr(name = ENABLE_SCAN_BLOCK_CACHE)
private boolean useScanBlockCache = false;

Expand Down Expand Up @@ -860,16 +865,10 @@ public void setSqlMode(long sqlMode) {
}

public long getSqlSelectLimit() {
if (sqlSelectLimit < 0) {
return DEFAULT_SELECT_LIMIT;
}
return sqlSelectLimit;
}

public void setSqlSelectLimit(long limit) {
if (limit < 0) {
return;
}
this.sqlSelectLimit = limit;
}

Expand All @@ -890,11 +889,7 @@ public void setInnodbReadOnly(boolean innodbReadOnly) {
}

public void setMaxExecMemByte(long maxExecMemByte) {
if (maxExecMemByte < MIN_EXEC_MEM_LIMIT) {
this.maxExecMemByte = MIN_EXEC_MEM_LIMIT;
} else {
this.maxExecMemByte = maxExecMemByte;
}
this.maxExecMemByte = maxExecMemByte;
}

public void setLoadMemLimit(long loadMemLimit) {
Expand Down
27 changes: 20 additions & 7 deletions fe/fe-core/src/main/java/com/starrocks/sql/ast/SetVar.java
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,11 @@ public void analyze() {

// Check variable load_mem_limit value is valid
if (getVariable().equalsIgnoreCase(SessionVariable.LOAD_MEM_LIMIT)) {
checkNonNegativeLongVariable(SessionVariable.LOAD_MEM_LIMIT);
checkRangeLongVariable(SessionVariable.LOAD_MEM_LIMIT, 0L, null);
}

if (getVariable().equalsIgnoreCase(SessionVariable.QUERY_MEM_LIMIT)) {
checkNonNegativeLongVariable(SessionVariable.QUERY_MEM_LIMIT);
checkRangeLongVariable(SessionVariable.QUERY_MEM_LIMIT, 0L, null);
}

try {
Expand All @@ -147,6 +147,7 @@ public void analyze() {
}

if (getVariable().equalsIgnoreCase(SessionVariable.EXEC_MEM_LIMIT)) {
checkRangeLongVariable(SessionVariable.EXEC_MEM_LIMIT, (long) SessionVariable.MIN_EXEC_MEM_LIMIT, null);
this.expression = new StringLiteral(
Long.toString(ParseUtil.analyzeDataVolumn(getResolvedExpression().getStringValue())));
this.resolvedExpression = (LiteralExpr) this.expression;
Expand All @@ -156,13 +157,22 @@ public void analyze() {
}

if (getVariable().equalsIgnoreCase(SessionVariable.SQL_SELECT_LIMIT)) {
checkNonNegativeLongVariable(SessionVariable.SQL_SELECT_LIMIT);
checkRangeLongVariable(SessionVariable.SQL_SELECT_LIMIT, 0L, null);
}

if (getVariable().equalsIgnoreCase(SessionVariable.QUERY_TIMEOUT)) {
checkRangeLongVariable(SessionVariable.QUERY_TIMEOUT, 1L, (long) SessionVariable.MAX_QUERY_TIMEOUT);
}

if (getVariable().equalsIgnoreCase(SessionVariable.NEW_PLANNER_OPTIMIZER_TIMEOUT)) {
checkRangeLongVariable(SessionVariable.NEW_PLANNER_OPTIMIZER_TIMEOUT, 1L, null);
}

if (getVariable().equalsIgnoreCase(SessionVariable.RESOURCE_GROUP)) {
String wgName = getResolvedExpression().getStringValue();
if (!StringUtils.isEmpty(wgName)) {
ResourceGroup wg = GlobalStateMgr.getCurrentState().getResourceGroupMgr().chooseResourceGroupByName(wgName);
ResourceGroup wg =
GlobalStateMgr.getCurrentState().getResourceGroupMgr().chooseResourceGroupByName(wgName);
if (wg == null) {
throw new SemanticException("resource group not exists: " + wgName);
}
Expand All @@ -174,12 +184,15 @@ public void analyze() {
}
}

private void checkNonNegativeLongVariable(String field) {
private void checkRangeLongVariable(String field, Long min, Long max) {
String value = getResolvedExpression().getStringValue();
try {
long num = Long.parseLong(value);
if (num < 0) {
throw new SemanticException(field + " must be equal or greater than 0.");
if (min != null && num < min) {
throw new SemanticException(String.format("%s must be equal or greater than %d.", field, min));
}
if (max != null && num > max) {
throw new SemanticException(String.format("%s must be equal or smaller than %d.", field, max));
}
} catch (NumberFormatException ex) {
throw new SemanticException(field + " is not a number");
Expand Down
16 changes: 8 additions & 8 deletions fe/fe-core/src/test/java/com/starrocks/qe/VariableMgrTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,12 @@ public void testNormal() throws IllegalAccessException, NoSuchFieldException, Us
}

// Set global variable
SetVar setVar = new SetVar(SetType.GLOBAL, "exec_mem_limit", new IntLiteral(1234L));
SetVar setVar = new SetVar(SetType.GLOBAL, "exec_mem_limit", new IntLiteral(12999934L));
setVar.analyze();
VariableMgr.setVar(var, setVar, false);
Assert.assertEquals(1234L, var.getMaxExecMemByte());
Assert.assertEquals(12999934L, var.getMaxExecMemByte());
var = VariableMgr.newSessionVariable();
Assert.assertEquals(1234L, var.getMaxExecMemByte());
Assert.assertEquals(12999934L, var.getMaxExecMemByte());

SetVar setVar2 = new SetVar(SetType.GLOBAL, "parallel_fragment_exec_instance_num", new IntLiteral(5L));
setVar2.analyze();
Expand All @@ -136,18 +136,18 @@ public void testNormal() throws IllegalAccessException, NoSuchFieldException, Us
Assert.assertEquals("CST", var.getTimeZone());

// Set session variable
setVar = new SetVar(SetType.GLOBAL, "exec_mem_limit", new IntLiteral(1234L));
setVar = new SetVar(SetType.GLOBAL, "exec_mem_limit", new IntLiteral(12999934L));
setVar.analyze();
VariableMgr.setVar(var, setVar, false);
Assert.assertEquals(1234L, var.getMaxExecMemByte());
Assert.assertEquals(12999934L, var.getMaxExecMemByte());

// onlySessionVar
setVar = new SetVar(SetType.GLOBAL, "exec_mem_limit", new IntLiteral(4321L));
setVar = new SetVar(SetType.GLOBAL, "exec_mem_limit", new IntLiteral(12999935L));
setVar.analyze();
VariableMgr.setVar(var, setVar, true);
Assert.assertEquals(4321L, var.getMaxExecMemByte());
Assert.assertEquals(12999935L, var.getMaxExecMemByte());
var = VariableMgr.newSessionVariable();
Assert.assertEquals(1234L, var.getMaxExecMemByte());
Assert.assertEquals(12999934L, var.getMaxExecMemByte());

setVar3 = new SetVar(SetType.SESSION, "time_zone", new StringLiteral("Asia/Jakarta"));
setVar3.analyze();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ public void testSqlSelectLimitSession() throws Exception {
Assert.assertTrue(plan.contains("limit: 8888"));
connectContext.getSessionVariable().setSqlSelectLimit(SessionVariable.DEFAULT_SELECT_LIMIT);

connectContext.getSessionVariable().setSqlSelectLimit(-100);
connectContext.getSessionVariable().setSqlSelectLimit(0);
sql = "select * from test_all_type";
plan = getFragmentPlan(sql);
Assert.assertFalse(plan.contains("limit"));
Expand Down

0 comments on commit d2634e6

Please sign in to comment.