@@ -36,7 +36,7 @@ public LogParserAccumulator init() {
 
   @Override
   public Object result(LogParserAccumulator acc) {
-    if (acc.size() == 0) {
+    if (acc.size() == 0 && acc.logSize() == 0) {
       return null;
     }
 
@@ -89,7 +89,7 @@ public LogParserAccumulator add(
     this.variableCountThreshold = variableCountThreshold;
     this.thresholdPercentage = thresholdPercentage;
     acc.evaluate(field);
-    if (bufferLimit > 0 && acc.size() == bufferLimit) {
+    if (bufferLimit > 0 && acc.logSize() == bufferLimit) {
       acc.partialMerge(
           maxSampleCount, variableCountThreshold, thresholdPercentage, showNumberedToken);
       acc.clearBuffer();
@@ -152,6 +152,10 @@ public static class LogParserAccumulator implements Accumulator {
     public Map<String, Map<String, Object>> patternGroupMap = new HashMap<>();
 
     public int size() {
+      return patternGroupMap.size();
+    }
+
+    public int logSize() {
       return logMessages.size();
     }
 
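Taken together, the hunks above split the accumulator's notion of "size" in two: size() now counts merged pattern groups, while the new logSize() counts raw messages still waiting in the buffer. result() must check both, because after a partial merge the buffer is empty while patternGroupMap holds state, and before the first merge the opposite is true. A minimal sketch of that flow, with simplified, hypothetical types (the real accumulator carries more state):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, pared-down model of LogParserAccumulator; only the two
// counters and the flush path relevant to this fix are shown.
class AccumulatorSketch {
  final List<String> logMessages = new ArrayList<>();                       // unmerged buffer
  final Map<String, Map<String, Object>> patternGroupMap = new HashMap<>(); // merged groups

  int size() { return patternGroupMap.size(); }
  int logSize() { return logMessages.size(); }

  void add(String message, int bufferLimit) {
    logMessages.add(message);
    if (bufferLimit > 0 && logSize() == bufferLimit) {
      partialMerge();      // fold the buffer into patternGroupMap
      logMessages.clear(); // corresponds to acc.clearBuffer()
    }
  }

  Object result() {
    // Empty only when BOTH the merged groups and the buffer are empty;
    // checking size() alone would wrongly return null for buffered,
    // not-yet-flushed input.
    if (size() == 0 && logSize() == 0) {
      return null;
    }
    return patternGroupMap; // stand-in for the real aggregation output
  }

  void partialMerge() {
    // Stand-in for the real Brain merge: key trivially by message.
    for (String msg : logMessages) {
      patternGroupMap.computeIfAbsent(msg, k -> new HashMap<>());
    }
  }
}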
@@ -105,7 +105,7 @@ public static Object evalAgg(
       @Parameter(name = "field") String field,
       @Parameter(name = "aggObject") Object aggObject,
       @Parameter(name = "showNumberedToken") Boolean showNumberedToken) {
-    if (Strings.isBlank(field)) {
+    if (Strings.isBlank(field) || aggObject == null) {
       return EMPTY_RESULT;
     }
     List<Map<String, Object>> aggResult = (List<Map<String, Object>>) aggObject;
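The new aggObject == null guard in evalAgg closes a hole on empty aggregation results. Note that in Java the unchecked cast itself tolerates null; it is the first use of aggResult afterwards that would throw a NullPointerException, so returning EMPTY_RESULT up front is the cheap, correct exit. A hedged sketch of the method's shape (names mirror the diff; the enclosing class and the EMPTY_RESULT value are assumptions):

import java.util.Collections;
import java.util.List;
import java.util.Map;

class EvalAggSketch {
  static final Object EMPTY_RESULT = Collections.emptyList(); // assumed sentinel

  @SuppressWarnings("unchecked")
  static Object evalAgg(String field, Object aggObject) {
    // Bail out before the cast: a blank field or a null aggregation
    // result means there is nothing to post-process.
    if (field == null || field.isBlank() || aggObject == null) {
      return EMPTY_RESULT;
    }
    List<Map<String, Object>> aggResult = (List<Map<String, Object>>) aggObject;
    return aggResult; // the real method reshapes each pattern row here
  }
}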
Expand Up @@ -230,9 +230,8 @@ public void testBrainLabelMode_NotShowNumberedToken() throws IOException {
"BLOCK* NameSystem.allocateBlock:"
+ " /user/root/sortrand/_temporary/_task_200811092030_0002_r_000296_0/part-00296."
+ " blk_-6620182933895093708",
"BLOCK* NameSystem.allocateBlock:"
+ " /user/root/sortrand/_temporary/_task_<*>_<*>_r_<*>_<*>/part<*>"
+ " blk_<*>"));
"<*> NameSystem.allocateBlock:"
+ " /user/root/sortrand/_temporary/_task_<*>_<*>_r_<*>_<*>/part<*> blk_<*>"));
}

@Test
@@ -268,21 +267,23 @@ public void testBrainLabelMode_ShowNumberedToken() throws IOException {
             "BLOCK* NameSystem.allocateBlock:"
                 + " /user/root/sortrand/_temporary/_task_200811092030_0002_r_000296_0/part-00296."
                 + " blk_-6620182933895093708",
-            "BLOCK* NameSystem.allocateBlock:"
-                + " /user/root/sortrand/_temporary/_task_<token1>_<token2>_r_<token3>_<token4>/part<token5>"
-                + " blk_<token6>",
+            "<token1> NameSystem.allocateBlock:"
+                + " /user/root/sortrand/_temporary/_task_<token2>_<token3>_r_<token4>_<token5>/part<token6>"
+                + " blk_<token7>",
             ImmutableMap.of(
                 "<token1>",
-                ImmutableList.of("200811092030"),
+                ImmutableList.of("BLOCK*"),
                 "<token2>",
-                ImmutableList.of("0002"),
+                ImmutableList.of("200811092030"),
                 "<token3>",
-                ImmutableList.of("000296"),
+                ImmutableList.of("0002"),
                 "<token4>",
-                ImmutableList.of("0"),
+                ImmutableList.of("000296"),
                 "<token5>",
-                ImmutableList.of("-00296."),
+                ImmutableList.of("0"),
                 "<token6>",
+                ImmutableList.of("-00296."),
+                "<token7>",
                 ImmutableList.of("-6620182933895093708"))));
   }
 
@@ -0,0 +1,65 @@
+setup:
+  - do:
+      query.settings:
+        body:
+          transient:
+            plugins.calcite.enabled: true
+  - do:
+      bulk:
+        index: hdfs_logs
+        refresh: true
+        body:
+          - '{ "index": { "_id": 1 } }'
+          - '{ "date": "20081109", "time": "203615", "pid": 148, "level": "INFO", "component": "dfs.FSNamesystem", "content": "BLOCK* NameSystem.addStoredBlock: blockMap updated: 10.251.31.85:50010 is added to blk_-7017553867379051457 size 67108864" }'
+          - '{ "index": { "_id": 2 } }'
+          - '{ "date": "20081109", "time": "204132", "pid": 26, "level": "INFO", "component": "dfs.FSNamesystem", "content": "BLOCK* NameSystem.addStoredBlock: blockMap updated: 10.251.107.19:50010 is added to blk_-3249711809227781266 size 67108864" }'
+          - '{ "index": { "_id": 3 } }'
+          - '{ "date": "20081109", "time": "204925", "pid": 663, "level": "WARN", "component": "dfs.DataNode$PacketResponder", "content": "PacketResponder failed for blk_6996194389878584395" }'
+          - '{ "index": { "_id": 4 } }'
+          - '{ "date": "20081109", "time": "205035", "pid": 31, "level": "WARN", "component": "dfs.DataNode$PacketResponder", "content": "PacketResponder failed for blk_-1547954353065580372" }'
+
+---
+teardown:
+  - do:
+      query.settings:
+        body:
+          transient:
+            plugins.calcite.enabled: false
+
+---
+"Patterns with specified max_sample_count should return correct result":
+  - skip:
+      features:
+        - headers
+        - allowed_warnings
+  - do:
+      allowed_warnings:
+        - 'Loading the fielddata on the _id field is deprecated and will be removed in future versions. If you require sorting or aggregating on this field you should also include the id in the body of your documents, and map this field as a keyword field that has [doc_values] enabled'
+      headers:
+        Content-Type: 'application/json'
+      ppl:
+        body:
+          query: 'source=hdfs_logs | patterns content method=brain mode=aggregation max_sample_count=2 variable_count_threshold=3'
+  - match: {"total": 2}
+  - match: {"schema": [{"name": "patterns_field", "type": "string"}, {"name": "pattern_count", "type": "bigint"}, {"name": "sample_logs", "type": "array"}]}
+  - match: {"datarows": [
+      [
+        "PacketResponder failed for blk_<*>",
+        2,
+        [
+          "PacketResponder failed for blk_6996194389878584395",
+          "PacketResponder failed for blk_-1547954353065580372"
+        ]
+      ],
+      [
+        "BLOCK* NameSystem.addStoredBlock: blockMap updated: <*IP*> is added to blk_<*> size <*>",
+        2,
+        [
+          "BLOCK* NameSystem.addStoredBlock: blockMap updated: 10.251.31.85:50010 is added to blk_-7017553867379051457 size 67108864",
+          "BLOCK* NameSystem.addStoredBlock: blockMap updated: 10.251.107.19:50010 is added to blk_-3249711809227781266 size 67108864"
+        ]
+      ]
+    ]}

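The assertions above pin max_sample_count=2, and each pattern's sample_logs array indeed carries exactly two raw messages. A small sketch of the capping rule this behavior implies (an assumption about the implementation, not the plugin's actual code):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class SampleCapSketch {
  // Keep at most maxSampleCount raw logs per pattern, in arrival order;
  // later matches still count toward pattern_count but add no samples.
  static void addSample(
      Map<String, List<String>> samplesByPattern,
      String pattern,
      String rawLog,
      int maxSampleCount) {
    List<String> samples =
        samplesByPattern.computeIfAbsent(pattern, k -> new ArrayList<>());
    if (samples.size() < maxSampleCount) {
      samples.add(rawLog);
    }
  }
}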
@@ -962,8 +962,7 @@ public UnresolvedPlan visitPatternsCommand(OpenSearchPPLParser.PatternsCommandCo
             AstDSL.intLiteral(settings.getSettingValue(Key.PATTERN_MAX_SAMPLE_COUNT)));
     Literal patternBufferLimit =
         cmdOptions.getOrDefault(
-            "max_sample_count",
-            AstDSL.intLiteral(settings.getSettingValue(Key.PATTERN_BUFFER_LIMIT)));
+            "buffer_limit", AstDSL.intLiteral(settings.getSettingValue(Key.PATTERN_BUFFER_LIMIT)));
     Literal showNumberedToken =
         cmdOptions.getOrDefault(
             "show_numbered_token",
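This one-line fix matters more than it looks. With the old "max_sample_count" key, a user-supplied buffer_limit was silently ignored, and a user-supplied max_sample_count doubled as the buffer limit. A runnable illustration of the two getOrDefault lookups (values hypothetical):

import java.util.Map;

class OptionLookupDemo {
  public static void main(String[] args) {
    // Pretend the user wrote: patterns ... max_sample_count=2
    Map<String, Integer> cmdOptions = Map.of("max_sample_count", 2);
    int defaultBufferLimit = 100_000; // stand-in for Key.PATTERN_BUFFER_LIMIT

    // Buggy lookup: probes the wrong key, so the buffer limit
    // collapses to 2 whenever max_sample_count is supplied.
    int buggy = cmdOptions.getOrDefault("max_sample_count", defaultBufferLimit);

    // Fixed lookup: reads buffer_limit, falling back to the default.
    int fixed = cmdOptions.getOrDefault("buffer_limit", defaultBufferLimit);

    System.out.println(buggy + " vs " + fixed); // prints: 2 vs 100000
  }
}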
@@ -326,6 +326,39 @@ public void testPatternsAggregationModeWithGroupBy_ShowNumberedToken_ForSimplePa
     verifyPPLToSparkSQL(root, expectedSparkSql);
   }
 
+  @Test
+  public void testPatternsAggregationMode_SpecifyAllParameters_ForBrainMethod() {
+    String ppl =
+        "source=EMP | patterns ENAME method=BRAIN mode=aggregation max_sample_count=2"
+            + " buffer_limit=1000 show_numbered_token=false variable_count_threshold=3"
+            + " frequency_threshold_percentage=0.1";
+    RelNode root = getRelNode(ppl);
+
+    String expectedLogical =
+        "LogicalProject(patterns_field=[SAFE_CAST(ITEM($1, 'pattern'))],"
+            + " pattern_count=[SAFE_CAST(ITEM($1, 'pattern_count'))],"
+            + " sample_logs=[SAFE_CAST(ITEM($1, 'sample_logs'))])\n"
+            + " LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0}])\n"
+            + " LogicalAggregate(group=[{}], patterns_field=[pattern($0, $1, $2, $3, $4, $5)])\n"
+            + " LogicalProject(ENAME=[$1], $f8=[2], $f9=[1000], $f10=[false],"
+            + " $f11=[0.1:DECIMAL(2, 1)], $f12=[3])\n"
+            + " LogicalTableScan(table=[[scott, EMP]])\n"
+            + " Uncollect\n"
+            + " LogicalProject(patterns_field=[$cor0.patterns_field])\n"
+            + " LogicalValues(tuples=[[{ 0 }]])\n";
+    verifyLogical(root, expectedLogical);
+
+    String expectedSparkSql =
+        "SELECT TRY_CAST(`t20`.`patterns_field`['pattern'] AS STRING) `patterns_field`,"
+            + " TRY_CAST(`t20`.`patterns_field`['pattern_count'] AS BIGINT) `pattern_count`,"
+            + " TRY_CAST(`t20`.`patterns_field`['sample_logs'] AS ARRAY< STRING >) `sample_logs`\n"
+            + "FROM (SELECT `pattern`(`ENAME`, 2, 1000, FALSE, 0.1, 3) `patterns_field`\n"
+            + "FROM `scott`.`EMP`) `$cor0`,\n"
+            + "LATERAL UNNEST((SELECT `$cor0`.`patterns_field`\n"
+            + "FROM (VALUES (0)) `t` (`ZERO`))) `t20` (`patterns_field`)";
+    verifyPPLToSparkSQL(root, expectedSparkSql);
+  }
+
   @Test
   public void testPatternsAggregationMode_NotShowNumberedToken_ForBrainMethod() {
     String ppl = "source=EMP | patterns ENAME method=BRAIN mode=aggregation";
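For orientation, the positional arguments of the pattern(...) aggregate can be read off the expected plans in this test (a summary of the test's own strings, not a documented contract):

// pattern($0, $1, $2, $3, $4, $5)  ==  pattern(ENAME, 2, 1000, FALSE, 0.1, 3)
//   $0 -> ENAME  (field being parsed)
//   $1 -> 2      (max_sample_count)
//   $2 -> 1000   (buffer_limit)
//   $3 -> false  (show_numbered_token)
//   $4 -> 0.1    (frequency_threshold_percentage)
//   $5 -> 3      (variable_count_threshold)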