diff --git a/core/src/main/java/org/opensearch/sql/expression/function/BuiltinFunctionName.java b/core/src/main/java/org/opensearch/sql/expression/function/BuiltinFunctionName.java index 257f9fdaa35..16fe2ea151c 100644 --- a/core/src/main/java/org/opensearch/sql/expression/function/BuiltinFunctionName.java +++ b/core/src/main/java/org/opensearch/sql/expression/function/BuiltinFunctionName.java @@ -62,6 +62,7 @@ public enum BuiltinFunctionName { /** Collection functions */ ARRAY(FunctionName.of("array")), ARRAY_LENGTH(FunctionName.of("array_length")), + MVJOIN(FunctionName.of("mvjoin")), FORALL(FunctionName.of("forall")), EXISTS(FunctionName.of("exists")), FILTER(FunctionName.of("filter")), diff --git a/core/src/main/java/org/opensearch/sql/expression/function/PPLFuncImpTable.java b/core/src/main/java/org/opensearch/sql/expression/function/PPLFuncImpTable.java index c2a22497417..59ff2866bee 100644 --- a/core/src/main/java/org/opensearch/sql/expression/function/PPLFuncImpTable.java +++ b/core/src/main/java/org/opensearch/sql/expression/function/PPLFuncImpTable.java @@ -144,6 +144,7 @@ import static org.opensearch.sql.expression.function.BuiltinFunctionName.MULTIPLY; import static org.opensearch.sql.expression.function.BuiltinFunctionName.MULTIPLYFUNCTION; import static org.opensearch.sql.expression.function.BuiltinFunctionName.MULTI_MATCH; +import static org.opensearch.sql.expression.function.BuiltinFunctionName.MVJOIN; import static org.opensearch.sql.expression.function.BuiltinFunctionName.NOT; import static org.opensearch.sql.expression.function.BuiltinFunctionName.NOTEQUAL; import static org.opensearch.sql.expression.function.BuiltinFunctionName.NOW; @@ -816,6 +817,15 @@ void populate() { registerOperator(WEEKOFYEAR, PPLBuiltinOperators.WEEK); registerOperator(INTERNAL_PATTERN_PARSER, PPLBuiltinOperators.PATTERN_PARSER); + + // Register MVJOIN to use Calcite's ARRAY_JOIN + register( + MVJOIN, + (FunctionImp2) + (builder, array, delimiter) -> + builder.makeCall(SqlLibraryOperators.ARRAY_JOIN, array, delimiter), + PPLTypeChecker.family(SqlTypeFamily.ARRAY, SqlTypeFamily.CHARACTER)); + registerOperator(ARRAY, PPLBuiltinOperators.ARRAY); registerOperator(ARRAY_LENGTH, SqlLibraryOperators.ARRAY_LENGTH); registerOperator(FORALL, PPLBuiltinOperators.FORALL); diff --git a/docs/user/ppl/functions/collection.rst b/docs/user/ppl/functions/collection.rst index 05b3c1834bf..95d55fa7e2d 100644 --- a/docs/user/ppl/functions/collection.rst +++ b/docs/user/ppl/functions/collection.rst @@ -198,4 +198,37 @@ Example:: | result | |------------| | 80 | - +------------+ \ No newline at end of file + +------------+ + +MVJOIN +------ + +Description +>>>>>>>>>>> + +Version: 3.3.0 + +Usage: mvjoin(array, delimiter) joins string array elements into a single string, separated by the specified delimiter. NULL elements are excluded from the output. Only string arrays are supported. + +Argument type: array: ARRAY of STRING, delimiter: STRING + +Return type: STRING + +Example:: + + PPL> source=people | eval result = mvjoin(array('a', 'b', 'c'), ',') | fields result | head 1 + fetched rows / total rows = 1/1 + +------------------------------------+ + | result | + |------------------------------------| + | "a,b,c" | + +------------------------------------+ + + PPL> source=accounts | eval names_array = array(firstname, lastname) | eval result = mvjoin(names_array, ', ') | fields result | head 1 + fetched rows / total rows = 1/1 + +------------------------------------------+ + | result | + |------------------------------------------| + | "Amber, Duke" | + +------------------------------------------+ + diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteArrayFunctionIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteArrayFunctionIT.java index 905b46cc36b..7aa448bf45a 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteArrayFunctionIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteArrayFunctionIT.java @@ -10,6 +10,7 @@ import java.io.IOException; import java.util.List; +import org.json.JSONArray; import org.json.JSONObject; import org.junit.jupiter.api.Test; import org.opensearch.client.ResponseException; @@ -21,6 +22,7 @@ public void init() throws Exception { super.init(); enableCalcite(); loadIndex(Index.BANK); + loadIndex(Index.ARRAY); } @Test @@ -241,4 +243,131 @@ public void testReduceWithUDF() throws IOException { verifyDataRows(actual, rows(60)); } + + @Test + public void testMvjoinWithStringArray() throws IOException { + JSONObject actual = + executeQuery( + String.format( + "source=%s | eval result = mvjoin(array('a', 'b', 'c'), ',') | fields result | head" + + " 1", + TEST_INDEX_BANK)); + + verifySchema(actual, schema("result", "string")); + verifyDataRows(actual, rows("a,b,c")); + } + + @Test + public void testMvjoinWithStringifiedNumbers() throws IOException { + // Note: mvjoin only supports string arrays + JSONObject actual = + executeQuery( + String.format( + "source=%s | eval result = mvjoin(array('1', '2', '3'), ' | ') | fields result |" + + " head 1", + TEST_INDEX_BANK)); + + verifySchema(actual, schema("result", "string")); + verifyDataRows(actual, rows("1 | 2 | 3")); + } + + @Test + public void testMvjoinWithMixedStringValues() throws IOException { + // mvjoin only supports string arrays + JSONObject actual = + executeQuery( + String.format( + "source=%s | eval result = mvjoin(array('1', 'text', '2.5'), ';') | fields result |" + + " head 1", + TEST_INDEX_BANK)); + + verifySchema(actual, schema("result", "string")); + verifyDataRows(actual, rows("1;text;2.5")); + } + + @Test + public void testMvjoinWithEmptyArray() throws IOException { + JSONObject actual = + executeQuery( + String.format( + "source=%s | eval result = mvjoin(array(), '-') | fields result | head 1", + TEST_INDEX_BANK)); + + verifySchema(actual, schema("result", "string")); + verifyDataRows(actual, rows("")); + } + + @Test + public void testMvjoinWithStringBooleans() throws IOException { + // mvjoin only supports string arrays + JSONObject actual = + executeQuery( + String.format( + "source=%s | eval result = mvjoin(array('true', 'false', 'true'), '|') | fields" + + " result | head 1", + TEST_INDEX_BANK)); + + verifySchema(actual, schema("result", "string")); + verifyDataRows(actual, rows("true|false|true")); + } + + @Test + public void testMvjoinWithSpecialDelimiters() throws IOException { + JSONObject actual = + executeQuery( + String.format( + "source=%s | eval result = mvjoin(array('apple', 'banana', 'cherry'), ' AND ') |" + + " fields result | head 1", + TEST_INDEX_BANK)); + + verifySchema(actual, schema("result", "string")); + verifyDataRows(actual, rows("apple AND banana AND cherry")); + } + + @Test + public void testMvjoinWithArrayFromRealFields() throws IOException { + // Test mvjoin on arrays created from real fields using array() function + JSONObject actual = + executeQuery( + String.format( + "source=%s | eval names_array = array(firstname, lastname) | eval result =" + + " mvjoin(names_array, ',') | fields firstname, lastname, result | head 1", + TEST_INDEX_BANK)); + + verifySchema( + actual, + schema("firstname", "string"), + schema("lastname", "string"), + schema("result", "string")); + // Verify that mvjoin correctly joins the firstname and lastname fields + JSONArray dataRows = actual.getJSONArray("datarows"); + assertTrue(dataRows.length() > 0); + JSONArray firstRow = dataRows.getJSONArray(0); + assertEquals(firstRow.getString(0) + "," + firstRow.getString(1), firstRow.getString(2)); + } + + @Test + public void testMvjoinWithMultipleRealFields() throws IOException { + // Test mvjoin with arrays created from multiple real fields + JSONObject actual = + executeQuery( + String.format( + "source=%s | eval info_array = array(city, state, employer) | eval result =" + + " mvjoin(info_array, ' | ') | fields city, state, employer, result | head 1", + TEST_INDEX_BANK)); + + verifySchema( + actual, + schema("city", "string"), + schema("state", "string"), + schema("employer", "string"), + schema("result", "string")); + // Verify that mvjoin correctly joins the city, state, and employer fields + JSONArray dataRows = actual.getJSONArray("datarows"); + assertTrue(dataRows.length() > 0); + JSONArray firstRow = dataRows.getJSONArray(0); + assertEquals( + firstRow.getString(0) + " | " + firstRow.getString(1) + " | " + firstRow.getString(2), + firstRow.getString(3)); + } } diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java index a1d5574763f..f2a2fdc53ad 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java @@ -5,6 +5,8 @@ package org.opensearch.sql.calcite.remote; +import static org.junit.Assert.assertTrue; +import static org.opensearch.sql.legacy.TestUtils.*; import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_BANK; import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_LOGS; import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_NESTED_SIMPLE; @@ -558,6 +560,16 @@ public void testExplainAppendCommand() throws IOException { TEST_INDEX_BANK))); } + @Test + public void testMvjoinExplain() throws IOException { + String query = + "source=opensearch-sql_test_index_account | eval result = mvjoin(array('a', 'b', 'c'), ',')" + + " | fields result | head 1"; + var result = explainQueryToString(query); + String expected = loadExpectedPlan("explain_mvjoin.json"); + assertJsonEqualsIgnoreId(expected, result); + } + @Test public void testPushdownLimitIntoAggregation() throws IOException { enabledOnlyWhenPushdownIsEnabled(); diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_mvjoin.json b/integ-test/src/test/resources/expectedOutput/calcite/explain_mvjoin.json new file mode 100644 index 00000000000..a539122e998 --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_mvjoin.json @@ -0,0 +1,6 @@ +{ + "calcite": { + "logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalSort(fetch=[1])\n LogicalProject(result=[ARRAY_JOIN(array('a', 'b', 'c'), ',')])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n", + "physical": "EnumerableCalc(expr#0..16=[{inputs}], expr#17=['a'], expr#18=['b'], expr#19=['c'], expr#20=[array($t17, $t18, $t19)], expr#21=[','], expr#22=[ARRAY_JOIN($t20, $t21)], result=[$t22])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[LIMIT->1, LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":1,\"timeout\":\"1m\"}, requestedTotalSize=1, pageSize=null, startFrom=0)])\n" + } +} \ No newline at end of file diff --git a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_mvjoin.json b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_mvjoin.json new file mode 100644 index 00000000000..f8c0bc1c908 --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_mvjoin.json @@ -0,0 +1,6 @@ +{ + "calcite": { + "logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalSort(fetch=[1])\n LogicalProject(result=[ARRAY_JOIN(array('a', 'b', 'c'), ',')])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n", + "physical": "EnumerableLimit(fetch=[10000])\n EnumerableCalc(expr#0..16=[{inputs}], expr#17=['a'], expr#18=['b'], expr#19=['c'], expr#20=[array($t17, $t18, $t19)], expr#21=[','], expr#22=[ARRAY_JOIN($t20, $t21)], result=[$t22])\n EnumerableLimit(fetch=[1])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n" + } +} \ No newline at end of file diff --git a/ppl/src/main/antlr/OpenSearchPPLLexer.g4 b/ppl/src/main/antlr/OpenSearchPPLLexer.g4 index c86f12dd764..4c37be2f318 100644 --- a/ppl/src/main/antlr/OpenSearchPPLLexer.g4 +++ b/ppl/src/main/antlr/OpenSearchPPLLexer.g4 @@ -421,6 +421,7 @@ ISBLANK: 'ISBLANK'; // COLLECTION FUNCTIONS ARRAY: 'ARRAY'; ARRAY_LENGTH: 'ARRAY_LENGTH'; +MVJOIN: 'MVJOIN'; FORALL: 'FORALL'; FILTER: 'FILTER'; TRANSFORM: 'TRANSFORM'; diff --git a/ppl/src/main/antlr/OpenSearchPPLParser.g4 b/ppl/src/main/antlr/OpenSearchPPLParser.g4 index 87230eedd63..6f1499affcd 100644 --- a/ppl/src/main/antlr/OpenSearchPPLParser.g4 +++ b/ppl/src/main/antlr/OpenSearchPPLParser.g4 @@ -930,6 +930,7 @@ geoipFunctionName collectionFunctionName : ARRAY | ARRAY_LENGTH + | MVJOIN | FORALL | EXISTS | FILTER diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLArrayFunctionTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLArrayFunctionTest.java new file mode 100644 index 00000000000..cd98e18e4be --- /dev/null +++ b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLArrayFunctionTest.java @@ -0,0 +1,109 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.ppl.calcite; + +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.test.CalciteAssert; +import org.junit.Test; + +public class CalcitePPLArrayFunctionTest extends CalcitePPLAbstractTest { + + public CalcitePPLArrayFunctionTest() { + super(CalciteAssert.SchemaSpec.SCOTT_WITH_TEMPORAL); + } + + @Test + public void testMvjoinWithStringArray() { + String ppl = + "source=EMP | eval joined = mvjoin(array('a', 'b', 'c'), ',') | head 1 | fields joined"; + RelNode root = getRelNode(ppl); + + String expectedLogical = + "LogicalProject(joined=[$8])\n" + + " LogicalSort(fetch=[1])\n" + + " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4]," + + " SAL=[$5], COMM=[$6], DEPTNO=[$7], joined=[ARRAY_JOIN(array('a', 'b', 'c'), ',')])\n" + + " LogicalTableScan(table=[[scott, EMP]])\n"; + verifyLogical(root, expectedLogical); + + String expectedResult = "joined=a,b,c\n"; + verifyResult(root, expectedResult); + + String expectedSparkSql = + "SELECT ARRAY_JOIN(`array`('a', 'b', 'c'), ',') `joined`\n" + + "FROM `scott`.`EMP`\n" + + "LIMIT 1"; + verifyPPLToSparkSQL(root, expectedSparkSql); + } + + @Test + public void testMvjoinWithDifferentDelimiter() { + String ppl = + "source=EMP | eval joined = mvjoin(array('apple', 'banana', 'cherry'), ' | ') | head 1 |" + + " fields joined"; + RelNode root = getRelNode(ppl); + + String expectedLogical = + "LogicalProject(joined=[$8])\n" + + " LogicalSort(fetch=[1])\n" + + " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4]," + + " SAL=[$5], COMM=[$6], DEPTNO=[$7], joined=[ARRAY_JOIN(array('apple':VARCHAR," + + " 'banana':VARCHAR, 'cherry':VARCHAR), ' | ':VARCHAR)])\n" + + " LogicalTableScan(table=[[scott, EMP]])\n"; + verifyLogical(root, expectedLogical); + + String expectedResult = "joined=apple | banana | cherry\n"; + verifyResult(root, expectedResult); + + String expectedSparkSql = + "SELECT ARRAY_JOIN(`array`('apple', 'banana', 'cherry'), ' | ') `joined`\n" + + "FROM `scott`.`EMP`\n" + + "LIMIT 1"; + verifyPPLToSparkSQL(root, expectedSparkSql); + } + + @Test + public void testMvjoinWithEmptyArray() { + String ppl = "source=EMP | eval joined = mvjoin(array(), ',') | head 1 | fields joined"; + RelNode root = getRelNode(ppl); + + String expectedLogical = + "LogicalProject(joined=[$8])\n" + + " LogicalSort(fetch=[1])\n" + + " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4]," + + " SAL=[$5], COMM=[$6], DEPTNO=[$7], joined=[ARRAY_JOIN(array(), ',')])\n" + + " LogicalTableScan(table=[[scott, EMP]])\n"; + verifyLogical(root, expectedLogical); + + String expectedResult = "joined=\n"; + verifyResult(root, expectedResult); + + String expectedSparkSql = + "SELECT ARRAY_JOIN(`array`(), ',') `joined`\n" + "FROM `scott`.`EMP`\n" + "LIMIT 1"; + verifyPPLToSparkSQL(root, expectedSparkSql); + } + + @Test + public void testMvjoinWithFieldReference() { + String ppl = + "source=EMP | eval joined = mvjoin(array(ENAME, JOB), '-') | head 1 | fields joined"; + RelNode root = getRelNode(ppl); + + String expectedLogical = + "LogicalProject(joined=[$8])\n" + + " LogicalSort(fetch=[1])\n" + + " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4]," + + " SAL=[$5], COMM=[$6], DEPTNO=[$7], joined=[ARRAY_JOIN(array($1, $2), '-')])\n" + + " LogicalTableScan(table=[[scott, EMP]])\n"; + verifyLogical(root, expectedLogical); + + String expectedSparkSql = + "SELECT ARRAY_JOIN(`array`(`ENAME`, `JOB`), '-') `joined`\n" + + "FROM `scott`.`EMP`\n" + + "LIMIT 1"; + verifyPPLToSparkSQL(root, expectedSparkSql); + } +} diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLFunctionTypeTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLFunctionTypeTest.java index 500d6873f89..8da3818cc04 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLFunctionTypeTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLFunctionTypeTest.java @@ -175,4 +175,12 @@ public void testLog2WithWrongArgShouldThrow() { "source=EMP | eval log2 = log2(ENAME, JOB) | fields log2", "LOG2 function expects {[INTEGER]|[DOUBLE]}, but got [STRING,STRING]"); } + + // mvjoin should reject non-string single values + @Test + public void testMvjoinRejectsNonStringValues() { + verifyQueryThrowsException( + "source=EMP | eval result = mvjoin(42, ',') | fields result | head 1", + "MVJOIN function expects {[ARRAY,STRING]}, but got [INTEGER,STRING]"); + } } diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java index c805e5a5dfb..7e9c1000d00 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java @@ -610,6 +610,14 @@ public void testRexSedMode() { anonymize("source=t | rex field=data mode=sed \"s/sensitive/clean/g\" | fields data")); } + @Test + public void testMvjoin() { + // Test mvjoin with array of strings + assertEquals( + "source=t | eval result=mvjoin(array(***,***,***),***) | fields + result", + anonymize("source=t | eval result=mvjoin(array('a', 'b', 'c'), ',') | fields result")); + } + @Test public void testRexWithOffsetField() { when(settings.getSettingValue(Key.PPL_REX_MAX_MATCH_LIMIT)).thenReturn(10);