Skip to content

Commit

Permalink
[Feature] support big query log (StarRocks#13352)
Browse files Browse the repository at this point in the history
  • Loading branch information
silverbullet233 authored Nov 16, 2022
1 parent 63792e1 commit e0a5989
Show file tree
Hide file tree
Showing 7 changed files with 205 additions and 0 deletions.
5 changes: 5 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/common/AuditLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public class AuditLog {

public static final AuditLog SLOW_AUDIT = new AuditLog("audit.slow_query");
public static final AuditLog QUERY_AUDIT = new AuditLog("audit.query");
public static final AuditLog BIG_QUERY_AUDIT = new AuditLog("big_query.query");

private Logger logger;

Expand All @@ -36,6 +37,10 @@ public static AuditLog getSlowAudit() {
return SLOW_AUDIT;
}

/**
 * Returns the audit logger that writes big-query records (logger name "big_query.query",
 * see BIG_QUERY_AUDIT above), which end up in fe.big_query.log.
 */
public static AuditLog getBigQueryAudit() {
    return BIG_QUERY_AUDIT;
}

/**
 * Creates an audit log backed by the log4j logger with the given name.
 *
 * @param auditName log4j logger name, e.g. "audit.query" or "big_query.query"
 */
public AuditLog(String auditName) {
    logger = LogManager.getLogger(auditName);
}
Expand Down
41 changes: 41 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/common/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,47 @@ public class Config extends ConfigBase {
@ConfField
public static String dump_log_delete_age = "7d";

/**
 * big_query_log_dir:
 * This specifies the FE big query log dir.
 * The log file fe.big_query.log contains all information about big queries.
 * The structure of each log record is very similar to the audit log.
 * If the cpu cost of a query exceeds big_query_log_cpu_second_threshold,
 * or scan rows exceeds big_query_log_scan_rows_threshold,
 * or scan bytes exceeds big_query_log_scan_bytes_threshold,
 * we will consider it as a big query.
 * These thresholds are defined by the user.
 * <p>
 * big_query_log_roll_num:
 * Maximal FE log files to be kept within a big_query_log_roll_interval.
 * <p>
 * big_query_log_modules:
 * The modules for which big query information is logged; each entry becomes a
 * "big_query.&lt;module&gt;" logger (see Log4jConfig).
 * <p>
 * big_query_log_roll_interval:
 * DAY: log suffix is yyyyMMdd
 * HOUR: log suffix is yyyyMMddHH
 * <p>
 * big_query_log_delete_age:
 * default is 7 days; if a log's last modified time is 7 days ago, it will be deleted.
 * supported formats:
 * 7d 7 days
 * 10h 10 hours
 * 60m 60 mins
 * 120s 120 seconds
 */
@ConfField
public static String big_query_log_dir = StarRocksFE.STARROCKS_HOME_DIR + "/log";
@ConfField
public static int big_query_log_roll_num = 10;
@ConfField
public static String[] big_query_log_modules = {"query"};
@ConfField
public static String big_query_log_roll_interval = "DAY";
@ConfField
public static String big_query_log_delete_age = "7d";


/**
* plugin_dir:
* plugin install directory
Expand Down
43 changes: 43 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/common/Log4jConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,21 @@ public class Log4jConfig extends XmlConfiguration {
" </Delete>\n" +
" </DefaultRolloverStrategy>\n" +
" </RollingFile>\n" +
// Appender for the big query log (fe.big_query.log). The ${...} placeholders are
// substituted in reconfig(). NOTE: the rollover "max" must be ${big_query_roll_num}
// (registered from Config.big_query_log_roll_num); the original ${sys_roll_num}
// silently ignored the big_query_log_roll_num config.
" <RollingFile name=\"BigQueryFile\" fileName=\"${big_query_log_dir}/fe.big_query.log\" filePattern=\"${big_query_log_dir}/fe.big_query.log.${big_query_file_pattern}-%i\">\n" +
" <PatternLayout charset=\"UTF-8\">\n" +
" <Pattern>%d{yyyy-MM-dd HH:mm:ss,SSS} [%c{1}] %m%n</Pattern>\n" +
" </PatternLayout>\n" +
" <Policies>\n" +
" <TimeBasedTriggeringPolicy/>\n" +
" <SizeBasedTriggeringPolicy size=\"${big_query_roll_maxsize}MB\"/>\n" +
" </Policies>\n" +
" <DefaultRolloverStrategy max=\"${big_query_roll_num}\" fileIndex=\"min\">\n" +
" <Delete basePath=\"${big_query_log_dir}/\" maxDepth=\"1\">\n" +
" <IfFileName glob=\"fe.big_query.log.*\" />\n" +
" <IfLastModified age=\"${big_query_log_delete_age}\" />\n" +
" </Delete>\n" +
" </DefaultRolloverStrategy>\n" +
" </RollingFile>\n" +
" </Appenders>\n" +
" <Loggers>\n" +
" <Root level=\"${sys_log_level}\">\n" +
Expand All @@ -115,6 +130,9 @@ public class Log4jConfig extends XmlConfiguration {
" <Logger name=\"dump\" level=\"ERROR\" additivity=\"false\">\n" +
" <AppenderRef ref=\"dumpFile\"/>\n" +
" </Logger>\n" +
" <Logger name=\"big_query\" level=\"ERROR\" additivity=\"false\">\n" +
" <AppenderRef ref=\"BigQueryFile\"/>\n" +
" </Logger>\n" +
" <Logger name=\"org.apache.thrift\" level=\"DEBUG\"> \n" +
" <AppenderRef ref=\"Sys\"/>\n" +
" </Logger>\n" +
Expand All @@ -136,6 +154,7 @@ public class Log4jConfig extends XmlConfiguration {
private static String[] verboseModules;
private static String[] auditModules;
private static String[] dumpModules;
private static String[] bigQueryModules;

private static void reconfig() throws IOException {
String newXmlConfTemplate = xmlConfTemplate;
Expand Down Expand Up @@ -191,6 +210,20 @@ private static void reconfig() throws IOException {
throw new IOException("dump_log_roll_interval config error: " + Config.dump_log_roll_interval);
}

// big query log config
String bigQueryLogDir = Config.big_query_log_dir;
String bigQueryLogRollPattern = "%d{yyyyMMdd}";
String bigQueryLogRollNum = String.valueOf(Config.big_query_log_roll_num);
String bigQueryLogRollMaxSize = String.valueOf(Config.log_roll_size_mb);
String bigQueryLogDeleteAge = String.valueOf(Config.big_query_log_delete_age);
if (Config.big_query_log_roll_interval.equals("HOUR")) {
bigQueryLogRollPattern = "%d{yyyyMMddHH}";
} else if (Config.big_query_log_roll_interval.equals("DAY")) {
bigQueryLogRollPattern = "%d{yyyyMMdd}";
} else {
throw new IOException("big_query_log_roll_interval config error: " + Config.big_query_log_roll_interval);
}

// verbose modules and audit log modules
StringBuilder sb = new StringBuilder();
for (String s : verboseModules) {
Expand All @@ -202,6 +235,9 @@ private static void reconfig() throws IOException {
for (String s : dumpModules) {
sb.append("<Logger name='dump." + s + "' level='INFO'/>");
}
for (String s : bigQueryModules) {
sb.append("<Logger name='big_query." + s + "' level='INFO'/>");
}

newXmlConfTemplate = newXmlConfTemplate.replaceAll("<!--REPLACED BY AUDIT AND VERBOSE MODULE NAMES-->",
sb.toString());
Expand All @@ -226,6 +262,12 @@ private static void reconfig() throws IOException {
properties.put("dump_roll_num", dumpRollNum);
properties.put("dump_log_delete_age", dumpDeleteAge);

properties.put("big_query_log_dir", bigQueryLogDir);
properties.put("big_query_file_pattern", bigQueryLogRollPattern);
properties.put("big_query_roll_maxsize", bigQueryLogRollMaxSize);
properties.put("big_query_roll_num", bigQueryLogRollNum);
properties.put("big_query_log_delete_age", bigQueryLogDeleteAge);

strSub = new StrSubstitutor(new Interpolator(properties));
newXmlConfTemplate = strSub.replace(newXmlConfTemplate);

Expand Down Expand Up @@ -269,6 +311,7 @@ public static synchronized void initLogging() throws IOException {
verboseModules = Config.sys_log_verbose_modules;
auditModules = Config.audit_log_modules;
dumpModules = Config.dump_log_modules;
bigQueryModules = Config.big_query_log_modules;
reconfig();
}

Expand Down
21 changes: 21 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/plugin/AuditEvent.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,12 @@ public enum EventType {
public double planMemCosts = 0.0;
@AuditField(value = "PendingTimeMs")
public long pendingTimeMs = 0;
// Big-query thresholds copied from the session variables when the audit event is built.
// -1 means "not set": AuditLogBuilder.isBigQuery() only applies a threshold when it is >= 0.
@AuditField(value = "BigQueryLogCPUSecondThreshold")
public long bigQueryLogCPUSecondThreshold = -1;
@AuditField(value = "BigQueryLogScanBytesThreshold")
public long bigQueryLogScanBytesThreshold = -1;
@AuditField(value = "BigQueryLogScanRowsThreshold")
public long bigQueryLogScanRowsThreshold = -1;

public static class AuditEventBuilder {

Expand Down Expand Up @@ -242,6 +248,21 @@ public AuditEventBuilder setPendingTimeMs(long pendingTimeMs) {
return this;
}

/** Sets the CPU cost threshold in seconds (-1 = disabled) used to classify a big query. */
public AuditEventBuilder setBigQueryLogCPUSecondThreshold(long bigQueryLogCPUSecondThreshold) {
    auditEvent.bigQueryLogCPUSecondThreshold = bigQueryLogCPUSecondThreshold;
    return this;
}

/** Sets the scanned-bytes threshold (-1 = disabled) used to classify a big query. */
public AuditEventBuilder setBigQueryLogScanBytesThreshold(long bigQueryLogScanBytesThreshold) {
    auditEvent.bigQueryLogScanBytesThreshold = bigQueryLogScanBytesThreshold;
    return this;
}

/** Sets the scanned-rows threshold (-1 = disabled) used to classify a big query. */
public AuditEventBuilder setBigQueryLogScanRowsThreshold(long bigQueryLogScanRowsThreshold) {
    auditEvent.bigQueryLogScanRowsThreshold = bigQueryLogScanRowsThreshold;
    return this;
}

public AuditEvent build() {
return this.auditEvent;
}
Expand Down
30 changes: 30 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/qe/AuditLogBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,14 @@ public void exec(AuditEvent event) {
continue;
}

// fields related to big queries are not written into audit log by default,
// they will be written into big query log.
if (af.value().equals("BigQueryLogCPUSecondThreshold") ||
af.value().equals("BigQueryLogScanBytesThreshold") ||
af.value().equals("BigQueryLogScanRowsThreshold")) {
continue;
}

if (af.value().equals("Time")) {
queryTime = (long) f.get(event);
}
Expand All @@ -89,8 +97,30 @@ public void exec(AuditEvent event) {
if (queryTime > Config.qe_slow_log_ms) {
AuditLog.getSlowAudit().log(auditLog);
}

if (isBigQuery(event)) {
sb.append("|bigQueryLogCPUSecondThreshold=").append(event.bigQueryLogCPUSecondThreshold);
sb.append("|bigQueryLogScanBytesThreshold=").append(event.bigQueryLogScanBytesThreshold);
sb.append("|bigQueryLogScanRowsThreshold=").append(event.bigQueryLogScanRowsThreshold);
String bigQueryLog = sb.toString();
AuditLog.getBigQueryAudit().log(bigQueryLog);
}
} catch (Exception e) {
LOG.debug("failed to process audit event", e);
}
}

/**
 * Decides whether the event qualifies as a big query and should also be written
 * to the big query log. A threshold of -1 (the default) disables that particular
 * check; a threshold >= 0 is compared against the corresponding query statistic.
 */
private boolean isBigQuery(AuditEvent event) {
    long cpuSecondLimit = event.bigQueryLogCPUSecondThreshold;
    long scanBytesLimit = event.bigQueryLogScanBytesThreshold;
    long scanRowsLimit = event.bigQueryLogScanRowsThreshold;
    // the CPU threshold is in seconds while cpuCostNs is in nanoseconds
    boolean cpuExceeded = cpuSecondLimit >= 0 && event.cpuCostNs > cpuSecondLimit * 1000000000L;
    boolean bytesExceeded = scanBytesLimit >= 0 && event.scanBytes > scanBytesLimit;
    boolean rowsExceeded = scanRowsLimit >= 0 && event.scanRows > scanRowsLimit;
    return cpuExceeded || bytesExceeded || rowsExceeded;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,14 @@ public void auditAfterExec(String origStmt, StatementBase parsedStmt, PQueryStat
}
}
ctx.getAuditEventBuilder().setIsQuery(true);
if (ctx.getSessionVariable().isEnableBigQueryLog()) {
ctx.getAuditEventBuilder().setBigQueryLogCPUSecondThreshold(
ctx.getSessionVariable().getBigQueryLogCPUSecondThreshold());
ctx.getAuditEventBuilder().setBigQueryLogScanBytesThreshold(
ctx.getSessionVariable().getBigQueryLogScanBytesThreshold());
ctx.getAuditEventBuilder().setBigQueryLogScanRowsThreshold(
ctx.getSessionVariable().getBigQueryLogScanRowsThreshold());
}
} else {
ctx.getAuditEventBuilder().setIsQuery(false);
}
Expand Down
57 changes: 57 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,11 @@ public class SessionVariable implements Serializable, Writable, Cloneable {
public static final String ENABLE_MATERIALIZED_VIEW_UNION_REWRITE = "enable_materialized_view_union_rewrite";
public static final String ENABLE_RULE_BASED_MATERIALIZED_VIEW_REWRITE = "enable_rule_based_materialized_view_rewrite";

// Session variable names controlling the big query log (fe.big_query.log).
public static final String ENABLE_BIG_QUERY_LOG = "enable_big_query_log";
public static final String BIG_QUERY_LOG_CPU_SECOND_THRESHOLD = "big_query_log_cpu_second_threshold";
public static final String BIG_QUERY_LOG_SCAN_BYTES_THRESHOLD = "big_query_log_scan_bytes_threshold";
public static final String BIG_QUERY_LOG_SCAN_ROWS_THRESHOLD = "big_query_log_scan_rows_threshold";

public static final List<String> DEPRECATED_VARIABLES = ImmutableList.<String>builder()
.add(CODEGEN_LEVEL)
.add(ENABLE_SPILLING)
Expand Down Expand Up @@ -753,6 +758,26 @@ public boolean getUseScanBlockCache() {
@VarAttr(name = ENABLE_RULE_BASED_MATERIALIZED_VIEW_REWRITE)
private boolean enableRuleBasedMaterializedViewRewrite = true;

// if enable_big_query_log = true and the cpu/io cost of a query exceeds the related threshold,
// the query information will be written to the big query log
@VarAttr(name = ENABLE_BIG_QUERY_LOG)
private boolean enableBigQueryLog = true;
// The default value is set for testing:
// if a query needs to perform 10s of computing tasks at full load on three 16-core machines,
// we treat it as a big query, so this value is set to 480 (10 * 16 * 3).
// Users need to set it up according to their own scenario.
@VarAttr(name = BIG_QUERY_LOG_CPU_SECOND_THRESHOLD)
private long bigQueryLogCPUSecondThreshold = 480;
// The default value is set for testing: if a query needs to scan more than 10GB of data,
// we treat it as a big query.
// Users need to set it up according to their own scenario.
@VarAttr(name = BIG_QUERY_LOG_SCAN_BYTES_THRESHOLD)
private long bigQueryLogScanBytesThreshold = 1024L * 1024 * 1024 * 10;
// The default value is set for testing: if a query needs to scan more than 1 billion rows of data,
// we treat it as a big query.
// Users need to set it up according to their own scenario.
@VarAttr(name = BIG_QUERY_LOG_SCAN_ROWS_THRESHOLD)
private long bigQueryLogScanRowsThreshold = 1000000000L;

public boolean getEnablePopulateBlockCache() {
return enablePopulateBlockCache;
}
Expand Down Expand Up @@ -1421,6 +1446,38 @@ public void setEnableRuleBasedMaterializedViewRewrite(boolean enableRuleBasedMat
this.enableRuleBasedMaterializedViewRewrite = enableRuleBasedMaterializedViewRewrite;
}

/** Whether big query logging is enabled for this session. */
public boolean isEnableBigQueryLog() {
    return enableBigQueryLog;
}

public void setEnableBigQueryLog(boolean enableBigQueryLog) {
    this.enableBigQueryLog = enableBigQueryLog;
}

/** CPU cost threshold in seconds above which a query is treated as a big query. */
public long getBigQueryLogCPUSecondThreshold() {
    return this.bigQueryLogCPUSecondThreshold;
}

// NOTE(review): "Cpu" here is inconsistent with "CPU" in the getter above; renaming
// would break existing callers, so it is left as is.
public void setBigQueryLogCpuSecondThreshold(long bigQueryLogCPUSecondThreshold) {
    this.bigQueryLogCPUSecondThreshold = bigQueryLogCPUSecondThreshold;
}

/** Scanned-bytes threshold above which a query is treated as a big query. */
public long getBigQueryLogScanBytesThreshold() {
    return bigQueryLogScanBytesThreshold;
}

public void setBigQueryLogScanBytesThreshold(long bigQueryLogScanBytesThreshold) {
    this.bigQueryLogScanBytesThreshold = bigQueryLogScanBytesThreshold;
}

/** Scanned-rows threshold above which a query is treated as a big query. */
public long getBigQueryLogScanRowsThreshold() {
    return bigQueryLogScanRowsThreshold;
}

public void setBigQueryLogScanRowsThreshold(long bigQueryLogScanRowsThreshold) {
    this.bigQueryLogScanRowsThreshold = bigQueryLogScanRowsThreshold;
}

// Serialize to thrift object
// used for rest api
public TQueryOptions toThrift() {
Expand Down

0 comments on commit e0a5989

Please sign in to comment.