diff --git a/core/src/main/java/org/opensearch/sql/calcite/udf/udaf/LogPatternAggFunction.java b/core/src/main/java/org/opensearch/sql/calcite/udf/udaf/LogPatternAggFunction.java index b60d53f145f..f93a0e7c49d 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/udf/udaf/LogPatternAggFunction.java +++ b/core/src/main/java/org/opensearch/sql/calcite/udf/udaf/LogPatternAggFunction.java @@ -36,7 +36,7 @@ public LogParserAccumulator init() { @Override public Object result(LogParserAccumulator acc) { - if (acc.size() == 0) { + if (acc.size() == 0 && acc.logSize() == 0) { return null; } @@ -89,7 +89,7 @@ public LogParserAccumulator add( this.variableCountThreshold = variableCountThreshold; this.thresholdPercentage = thresholdPercentage; acc.evaluate(field); - if (bufferLimit > 0 && acc.size() == bufferLimit) { + if (bufferLimit > 0 && acc.logSize() == bufferLimit) { acc.partialMerge( maxSampleCount, variableCountThreshold, thresholdPercentage, showNumberedToken); acc.clearBuffer(); @@ -152,6 +152,10 @@ public static class LogParserAccumulator implements Accumulator { public Map> patternGroupMap = new HashMap<>(); public int size() { + return patternGroupMap.size(); + } + + public int logSize() { return logMessages.size(); } diff --git a/core/src/main/java/org/opensearch/sql/expression/function/PatternParserFunctionImpl.java b/core/src/main/java/org/opensearch/sql/expression/function/PatternParserFunctionImpl.java index 878186cd3b4..e4f7f1f9d1c 100644 --- a/core/src/main/java/org/opensearch/sql/expression/function/PatternParserFunctionImpl.java +++ b/core/src/main/java/org/opensearch/sql/expression/function/PatternParserFunctionImpl.java @@ -105,7 +105,7 @@ public static Object evalAgg( @Parameter(name = "field") String field, @Parameter(name = "aggObject") Object aggObject, @Parameter(name = "showNumberedToken") Boolean showNumberedToken) { - if (Strings.isBlank(field)) { + if (Strings.isBlank(field) || aggObject == null) { return EMPTY_RESULT; } List> aggResult = (List>) aggObject; diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLPatternsIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLPatternsIT.java index cefb46e4b53..46df914e611 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLPatternsIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLPatternsIT.java @@ -230,9 +230,8 @@ public void testBrainLabelMode_NotShowNumberedToken() throws IOException { "BLOCK* NameSystem.allocateBlock:" + " /user/root/sortrand/_temporary/_task_200811092030_0002_r_000296_0/part-00296." + " blk_-6620182933895093708", - "BLOCK* NameSystem.allocateBlock:" - + " /user/root/sortrand/_temporary/_task_<*>_<*>_r_<*>_<*>/part<*>" - + " blk_<*>")); + "<*> NameSystem.allocateBlock:" + + " /user/root/sortrand/_temporary/_task_<*>_<*>_r_<*>_<*>/part<*> blk_<*>")); } @Test @@ -268,21 +267,23 @@ public void testBrainLabelMode_ShowNumberedToken() throws IOException { "BLOCK* NameSystem.allocateBlock:" + " /user/root/sortrand/_temporary/_task_200811092030_0002_r_000296_0/part-00296." + " blk_-6620182933895093708", - "BLOCK* NameSystem.allocateBlock:" - + " /user/root/sortrand/_temporary/_task___r__/part" - + " blk_", + " NameSystem.allocateBlock:" + + " /user/root/sortrand/_temporary/_task___r__/part" + + " blk_", ImmutableMap.of( "", - ImmutableList.of("200811092030"), + ImmutableList.of("BLOCK*"), "", - ImmutableList.of("0002"), + ImmutableList.of("200811092030"), "", - ImmutableList.of("000296"), + ImmutableList.of("0002"), "", - ImmutableList.of("0"), + ImmutableList.of("000296"), "", - ImmutableList.of("-00296."), + ImmutableList.of("0"), "", + ImmutableList.of("-00296."), + "", ImmutableList.of("-6620182933895093708")))); } diff --git a/integ-test/src/yamlRestTest/resources/rest-api-spec/test/issues/4866.yml b/integ-test/src/yamlRestTest/resources/rest-api-spec/test/issues/4866.yml new file mode 100644 index 00000000000..e2ae4c86803 --- /dev/null +++ b/integ-test/src/yamlRestTest/resources/rest-api-spec/test/issues/4866.yml @@ -0,0 +1,65 @@ +setup: + - do: + query.settings: + body: + transient: + plugins.calcite.enabled: true + - do: + bulk: + index: hdfs_logs + refresh: true + body: + - '{ "index": { "_id": 1 } }' + - '{ "date": "20081109", "time": "203615", "pid": 148, "level": "INFO", "component": "dfs.FSNamesystem", "content": "BLOCK* NameSystem.addStoredBlock: blockMap updated: 10.251.31.85:50010 is added to blk_-7017553867379051457 size 67108864" }' + - '{ "index": { "_id": 2 } }' + - '{ "date": "20081109", "time": "204132", "pid": 26, "level": "INFO", "component": "dfs.FSNamesystem", "content": "BLOCK* NameSystem.addStoredBlock: blockMap updated: 10.251.107.19:50010 is added to blk_-3249711809227781266 size 67108864" }' + - '{ "index": { "_id": 3 } }' + - '{ "date": "20081109", "time": "204925", "pid": 663, "level": "WARN", "component": "dfs.DataNode$PacketResponder", "content": "PacketResponder failed for blk_6996194389878584395" }' + - '{ "index": { "_id": 4 } }' + - '{ "date": "20081109", "time": "205035", "pid": 31, "level": "WARN", "component": "dfs.DataNode$PacketResponder", "content": "PacketResponder failed for blk_-1547954353065580372" }' + + +--- +teardown: + - do: + query.settings: + body: + transient: + plugins.calcite.enabled : false + + +--- +"Patterns with specified max_sample_count should return correct result": + - skip: + features: + - headers + - allowed_warnings + - do: + allowed_warnings: + - 'Loading the fielddata on the _id field is deprecated and will be removed in future versions. If you require sorting or aggregating on this field you should also include the id in the body of your documents, and map this field as a keyword field that has [doc_values] enabled' + headers: + Content-Type: 'application/json' + ppl: + body: + query: 'source=hdfs_logs | patterns content method=brain mode=aggregation max_sample_count=2 variable_count_threshold=3' + - match: {"total": 2} + - match: {"schema": [{"name": "patterns_field", "type": "string"}, {"name": "pattern_count", "type": "bigint"}, {"name": "sample_logs", "type": "array"}]} + - match: {"datarows": [ + [ + "PacketResponder failed for blk_<*>", + 2, + [ + "PacketResponder failed for blk_6996194389878584395", + "PacketResponder failed for blk_-1547954353065580372" + ] + ], + [ + "BLOCK* NameSystem.addStoredBlock: blockMap updated: <*IP*> is added to blk_<*> size <*>", + 2, + [ + "BLOCK* NameSystem.addStoredBlock: blockMap updated: 10.251.31.85:50010 is added to blk_-7017553867379051457 size 67108864", + "BLOCK* NameSystem.addStoredBlock: blockMap updated: 10.251.107.19:50010 is added to blk_-3249711809227781266 size 67108864" + ] + ] + ]} + diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java index c83298edeab..d230dca566f 100644 --- a/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java @@ -962,8 +962,7 @@ public UnresolvedPlan visitPatternsCommand(OpenSearchPPLParser.PatternsCommandCo AstDSL.intLiteral(settings.getSettingValue(Key.PATTERN_MAX_SAMPLE_COUNT))); Literal patternBufferLimit = cmdOptions.getOrDefault( - "max_sample_count", - AstDSL.intLiteral(settings.getSettingValue(Key.PATTERN_BUFFER_LIMIT))); + "buffer_limit", AstDSL.intLiteral(settings.getSettingValue(Key.PATTERN_BUFFER_LIMIT))); Literal showNumberedToken = cmdOptions.getOrDefault( "show_numbered_token", diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLPatternsTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLPatternsTest.java index 3d3bb5b6a1b..c272453b829 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLPatternsTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLPatternsTest.java @@ -326,6 +326,39 @@ public void testPatternsAggregationModeWithGroupBy_ShowNumberedToken_ForSimplePa verifyPPLToSparkSQL(root, expectedSparkSql); } + @Test + public void testPatternsAggregationMode_SpecifyAllParameters_ForBrainMethod() { + String ppl = + "source=EMP | patterns ENAME method=BRAIN mode=aggregation max_sample_count=2" + + " buffer_limit=1000 show_numbered_token=false variable_count_threshold=3" + + " frequency_threshold_percentage=0.1"; + RelNode root = getRelNode(ppl); + + String expectedLogical = + "LogicalProject(patterns_field=[SAFE_CAST(ITEM($1, 'pattern'))]," + + " pattern_count=[SAFE_CAST(ITEM($1, 'pattern_count'))]," + + " sample_logs=[SAFE_CAST(ITEM($1, 'sample_logs'))])\n" + + " LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0}])\n" + + " LogicalAggregate(group=[{}], patterns_field=[pattern($0, $1, $2, $3, $4, $5)])\n" + + " LogicalProject(ENAME=[$1], $f8=[2], $f9=[1000], $f10=[false]," + + " $f11=[0.1:DECIMAL(2, 1)], $f12=[3])\n" + + " LogicalTableScan(table=[[scott, EMP]])\n" + + " Uncollect\n" + + " LogicalProject(patterns_field=[$cor0.patterns_field])\n" + + " LogicalValues(tuples=[[{ 0 }]])\n"; + verifyLogical(root, expectedLogical); + + String expectedSparkSql = + "SELECT TRY_CAST(`t20`.`patterns_field`['pattern'] AS STRING) `patterns_field`," + + " TRY_CAST(`t20`.`patterns_field`['pattern_count'] AS BIGINT) `pattern_count`," + + " TRY_CAST(`t20`.`patterns_field`['sample_logs'] AS ARRAY< STRING >) `sample_logs`\n" + + "FROM (SELECT `pattern`(`ENAME`, 2, 1000, FALSE, 0.1, 3) `patterns_field`\n" + + "FROM `scott`.`EMP`) `$cor0`,\n" + + "LATERAL UNNEST((SELECT `$cor0`.`patterns_field`\n" + + "FROM (VALUES (0)) `t` (`ZERO`))) `t20` (`patterns_field`)"; + verifyPPLToSparkSQL(root, expectedSparkSql); + } + @Test public void testPatternsAggregationMode_NotShowNumberedToken_ForBrainMethod() { String ppl = "source=EMP | patterns ENAME method=BRAIN mode=aggregation";