Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public enum BuiltinFunctionName {
/** Collection functions */
ARRAY(FunctionName.of("array")),
ARRAY_LENGTH(FunctionName.of("array_length")),
MVJOIN(FunctionName.of("mvjoin")),
FORALL(FunctionName.of("forall")),
EXISTS(FunctionName.of("exists")),
FILTER(FunctionName.of("filter")),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@
import static org.opensearch.sql.expression.function.BuiltinFunctionName.MULTIPLY;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.MULTIPLYFUNCTION;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.MULTI_MATCH;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.MVJOIN;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.NOT;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.NOTEQUAL;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.NOW;
Expand Down Expand Up @@ -816,6 +817,15 @@ void populate() {
registerOperator(WEEKOFYEAR, PPLBuiltinOperators.WEEK);

registerOperator(INTERNAL_PATTERN_PARSER, PPLBuiltinOperators.PATTERN_PARSER);

// Register MVJOIN to use Calcite's ARRAY_JOIN
register(
MVJOIN,
(FunctionImp2)
(builder, array, delimiter) ->
builder.makeCall(SqlLibraryOperators.ARRAY_JOIN, array, delimiter),
PPLTypeChecker.family(SqlTypeFamily.ARRAY, SqlTypeFamily.CHARACTER));

registerOperator(ARRAY, PPLBuiltinOperators.ARRAY);
registerOperator(ARRAY_LENGTH, SqlLibraryOperators.ARRAY_LENGTH);
registerOperator(FORALL, PPLBuiltinOperators.FORALL);
Expand Down
35 changes: 34 additions & 1 deletion docs/user/ppl/functions/collection.rst
Original file line number Diff line number Diff line change
Expand Up @@ -198,4 +198,37 @@ Example::
| result |
|------------|
| 80 |
+------------+
+------------+

MVJOIN
------

Description
>>>>>>>>>>>

Version: 3.3.0

Usage: mvjoin(array, delimiter) joins string array elements into a single string, separated by the specified delimiter. NULL elements are excluded from the output. Only string arrays are supported.

Argument type: array: ARRAY of STRING, delimiter: STRING

Return type: STRING

Example::

PPL> source=people | eval result = mvjoin(array('a', 'b', 'c'), ',') | fields result | head 1
fetched rows / total rows = 1/1
+------------------------------------+
| result |
|------------------------------------|
| "a,b,c" |
+------------------------------------+

PPL> source=accounts | eval names_array = array(firstname, lastname) | eval result = mvjoin(names_array, ', ') | fields result | head 1
fetched rows / total rows = 1/1
+------------------------------------------+
| result |
|------------------------------------------|
| "Amber, Duke" |
+------------------------------------------+

Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import java.io.IOException;
import java.util.List;
import org.json.JSONArray;
import org.json.JSONObject;
import org.junit.jupiter.api.Test;
import org.opensearch.client.ResponseException;
Expand All @@ -21,6 +22,7 @@ public void init() throws Exception {
super.init();
enableCalcite();
loadIndex(Index.BANK);
loadIndex(Index.ARRAY);
}

@Test
Expand Down Expand Up @@ -241,4 +243,131 @@ public void testReduceWithUDF() throws IOException {

verifyDataRows(actual, rows(60));
}

@Test
public void testMvjoinWithStringArray() throws IOException {
JSONObject actual =
executeQuery(
String.format(
"source=%s | eval result = mvjoin(array('a', 'b', 'c'), ',') | fields result | head"
+ " 1",
TEST_INDEX_BANK));

verifySchema(actual, schema("result", "string"));
verifyDataRows(actual, rows("a,b,c"));
}

@Test
public void testMvjoinWithStringifiedNumbers() throws IOException {
// Note: mvjoin only supports string arrays
JSONObject actual =
executeQuery(
String.format(
"source=%s | eval result = mvjoin(array('1', '2', '3'), ' | ') | fields result |"
+ " head 1",
TEST_INDEX_BANK));

verifySchema(actual, schema("result", "string"));
verifyDataRows(actual, rows("1 | 2 | 3"));
}

@Test
public void testMvjoinWithMixedStringValues() throws IOException {
// mvjoin only supports string arrays
JSONObject actual =
executeQuery(
String.format(
"source=%s | eval result = mvjoin(array('1', 'text', '2.5'), ';') | fields result |"
+ " head 1",
TEST_INDEX_BANK));

verifySchema(actual, schema("result", "string"));
verifyDataRows(actual, rows("1;text;2.5"));
}

@Test
public void testMvjoinWithEmptyArray() throws IOException {
JSONObject actual =
executeQuery(
String.format(
"source=%s | eval result = mvjoin(array(), '-') | fields result | head 1",
TEST_INDEX_BANK));

verifySchema(actual, schema("result", "string"));
verifyDataRows(actual, rows(""));
}

@Test
public void testMvjoinWithStringBooleans() throws IOException {
// mvjoin only supports string arrays
JSONObject actual =
executeQuery(
String.format(
"source=%s | eval result = mvjoin(array('true', 'false', 'true'), '|') | fields"
+ " result | head 1",
TEST_INDEX_BANK));

verifySchema(actual, schema("result", "string"));
verifyDataRows(actual, rows("true|false|true"));
}

@Test
public void testMvjoinWithSpecialDelimiters() throws IOException {
JSONObject actual =
executeQuery(
String.format(
"source=%s | eval result = mvjoin(array('apple', 'banana', 'cherry'), ' AND ') |"
+ " fields result | head 1",
TEST_INDEX_BANK));

verifySchema(actual, schema("result", "string"));
verifyDataRows(actual, rows("apple AND banana AND cherry"));
}

@Test
public void testMvjoinWithArrayFromRealFields() throws IOException {
// Test mvjoin on arrays created from real fields using array() function
JSONObject actual =
executeQuery(
String.format(
"source=%s | eval names_array = array(firstname, lastname) | eval result ="
+ " mvjoin(names_array, ',') | fields firstname, lastname, result | head 1",
TEST_INDEX_BANK));

verifySchema(
actual,
schema("firstname", "string"),
schema("lastname", "string"),
schema("result", "string"));
// Verify that mvjoin correctly joins the firstname and lastname fields
JSONArray dataRows = actual.getJSONArray("datarows");
assertTrue(dataRows.length() > 0);
JSONArray firstRow = dataRows.getJSONArray(0);
assertEquals(firstRow.getString(0) + "," + firstRow.getString(1), firstRow.getString(2));
}

@Test
public void testMvjoinWithMultipleRealFields() throws IOException {
// Test mvjoin with arrays created from multiple real fields
JSONObject actual =
executeQuery(
String.format(
"source=%s | eval info_array = array(city, state, employer) | eval result ="
+ " mvjoin(info_array, ' | ') | fields city, state, employer, result | head 1",
TEST_INDEX_BANK));

verifySchema(
actual,
schema("city", "string"),
schema("state", "string"),
schema("employer", "string"),
schema("result", "string"));
// Verify that mvjoin correctly joins the city, state, and employer fields
JSONArray dataRows = actual.getJSONArray("datarows");
assertTrue(dataRows.length() > 0);
JSONArray firstRow = dataRows.getJSONArray(0);
assertEquals(
firstRow.getString(0) + " | " + firstRow.getString(1) + " | " + firstRow.getString(2),
firstRow.getString(3));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package org.opensearch.sql.calcite.remote;

import static org.junit.Assert.assertTrue;
import static org.opensearch.sql.legacy.TestUtils.*;
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_BANK;
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_LOGS;
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_NESTED_SIMPLE;
Expand Down Expand Up @@ -558,6 +560,16 @@ public void testExplainAppendCommand() throws IOException {
TEST_INDEX_BANK)));
}

@Test
public void testMvjoinExplain() throws IOException {
String query =
"source=opensearch-sql_test_index_account | eval result = mvjoin(array('a', 'b', 'c'), ',')"
+ " | fields result | head 1";
var result = explainQueryToString(query);
String expected = loadExpectedPlan("explain_mvjoin.json");
assertJsonEqualsIgnoreId(expected, result);
}

@Test
public void testPushdownLimitIntoAggregation() throws IOException {
enabledOnlyWhenPushdownIsEnabled();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"calcite": {
"logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalSort(fetch=[1])\n LogicalProject(result=[ARRAY_JOIN(array('a', 'b', 'c'), ',')])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n",
"physical": "EnumerableCalc(expr#0..16=[{inputs}], expr#17=['a'], expr#18=['b'], expr#19=['c'], expr#20=[array($t17, $t18, $t19)], expr#21=[','], expr#22=[ARRAY_JOIN($t20, $t21)], result=[$t22])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[LIMIT->1, LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":1,\"timeout\":\"1m\"}, requestedTotalSize=1, pageSize=null, startFrom=0)])\n"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"calcite": {
"logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalSort(fetch=[1])\n LogicalProject(result=[ARRAY_JOIN(array('a', 'b', 'c'), ',')])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n",
"physical": "EnumerableLimit(fetch=[10000])\n EnumerableCalc(expr#0..16=[{inputs}], expr#17=['a'], expr#18=['b'], expr#19=['c'], expr#20=[array($t17, $t18, $t19)], expr#21=[','], expr#22=[ARRAY_JOIN($t20, $t21)], result=[$t22])\n EnumerableLimit(fetch=[1])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n"
}
}
1 change: 1 addition & 0 deletions ppl/src/main/antlr/OpenSearchPPLLexer.g4
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,7 @@ ISBLANK: 'ISBLANK';
// COLLECTION FUNCTIONS
ARRAY: 'ARRAY';
ARRAY_LENGTH: 'ARRAY_LENGTH';
MVJOIN: 'MVJOIN';
FORALL: 'FORALL';
FILTER: 'FILTER';
TRANSFORM: 'TRANSFORM';
Expand Down
1 change: 1 addition & 0 deletions ppl/src/main/antlr/OpenSearchPPLParser.g4
Original file line number Diff line number Diff line change
Expand Up @@ -930,6 +930,7 @@ geoipFunctionName
collectionFunctionName
: ARRAY
| ARRAY_LENGTH
| MVJOIN
| FORALL
| EXISTS
| FILTER
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.ppl.calcite;

import org.apache.calcite.rel.RelNode;
import org.apache.calcite.test.CalciteAssert;
import org.junit.Test;

public class CalcitePPLArrayFunctionTest extends CalcitePPLAbstractTest {

public CalcitePPLArrayFunctionTest() {
super(CalciteAssert.SchemaSpec.SCOTT_WITH_TEMPORAL);
}

@Test
public void testMvjoinWithStringArray() {
String ppl =
"source=EMP | eval joined = mvjoin(array('a', 'b', 'c'), ',') | head 1 | fields joined";
RelNode root = getRelNode(ppl);

String expectedLogical =
"LogicalProject(joined=[$8])\n"
+ " LogicalSort(fetch=[1])\n"
+ " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4],"
+ " SAL=[$5], COMM=[$6], DEPTNO=[$7], joined=[ARRAY_JOIN(array('a', 'b', 'c'), ',')])\n"
+ " LogicalTableScan(table=[[scott, EMP]])\n";
verifyLogical(root, expectedLogical);

String expectedResult = "joined=a,b,c\n";
verifyResult(root, expectedResult);

String expectedSparkSql =
"SELECT ARRAY_JOIN(`array`('a', 'b', 'c'), ',') `joined`\n"
+ "FROM `scott`.`EMP`\n"
+ "LIMIT 1";
verifyPPLToSparkSQL(root, expectedSparkSql);
}

@Test
public void testMvjoinWithDifferentDelimiter() {
String ppl =
"source=EMP | eval joined = mvjoin(array('apple', 'banana', 'cherry'), ' | ') | head 1 |"
+ " fields joined";
RelNode root = getRelNode(ppl);

String expectedLogical =
"LogicalProject(joined=[$8])\n"
+ " LogicalSort(fetch=[1])\n"
+ " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4],"
+ " SAL=[$5], COMM=[$6], DEPTNO=[$7], joined=[ARRAY_JOIN(array('apple':VARCHAR,"
+ " 'banana':VARCHAR, 'cherry':VARCHAR), ' | ':VARCHAR)])\n"
+ " LogicalTableScan(table=[[scott, EMP]])\n";
verifyLogical(root, expectedLogical);

String expectedResult = "joined=apple | banana | cherry\n";
verifyResult(root, expectedResult);

String expectedSparkSql =
"SELECT ARRAY_JOIN(`array`('apple', 'banana', 'cherry'), ' | ') `joined`\n"
+ "FROM `scott`.`EMP`\n"
+ "LIMIT 1";
verifyPPLToSparkSQL(root, expectedSparkSql);
}

@Test
public void testMvjoinWithEmptyArray() {
String ppl = "source=EMP | eval joined = mvjoin(array(), ',') | head 1 | fields joined";
RelNode root = getRelNode(ppl);

String expectedLogical =
"LogicalProject(joined=[$8])\n"
+ " LogicalSort(fetch=[1])\n"
+ " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4],"
+ " SAL=[$5], COMM=[$6], DEPTNO=[$7], joined=[ARRAY_JOIN(array(), ',')])\n"
+ " LogicalTableScan(table=[[scott, EMP]])\n";
verifyLogical(root, expectedLogical);

String expectedResult = "joined=\n";
verifyResult(root, expectedResult);

String expectedSparkSql =
"SELECT ARRAY_JOIN(`array`(), ',') `joined`\n" + "FROM `scott`.`EMP`\n" + "LIMIT 1";
verifyPPLToSparkSQL(root, expectedSparkSql);
}

@Test
public void testMvjoinWithFieldReference() {
String ppl =
"source=EMP | eval joined = mvjoin(array(ENAME, JOB), '-') | head 1 | fields joined";
RelNode root = getRelNode(ppl);

String expectedLogical =
"LogicalProject(joined=[$8])\n"
+ " LogicalSort(fetch=[1])\n"
+ " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4],"
+ " SAL=[$5], COMM=[$6], DEPTNO=[$7], joined=[ARRAY_JOIN(array($1, $2), '-')])\n"
+ " LogicalTableScan(table=[[scott, EMP]])\n";
verifyLogical(root, expectedLogical);

String expectedSparkSql =
"SELECT ARRAY_JOIN(`array`(`ENAME`, `JOB`), '-') `joined`\n"
+ "FROM `scott`.`EMP`\n"
+ "LIMIT 1";
verifyPPLToSparkSQL(root, expectedSparkSql);
}
}
Loading
Loading