Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public enum BuiltinFunctionName {
MVAPPEND(FunctionName.of("mvappend")),
MVJOIN(FunctionName.of("mvjoin")),
MVINDEX(FunctionName.of("mvindex")),
MVDEDUP(FunctionName.of("mvdedup")),
FORALL(FunctionName.of("forall")),
EXISTS(FunctionName.of("exists")),
FILTER(FunctionName.of("filter")),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@
import static org.opensearch.sql.expression.function.BuiltinFunctionName.MULTIPLYFUNCTION;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.MULTI_MATCH;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.MVAPPEND;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.MVDEDUP;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.MVINDEX;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.MVJOIN;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.NOT;
Expand Down Expand Up @@ -989,6 +990,7 @@ void populate() {

registerOperator(ARRAY, PPLBuiltinOperators.ARRAY);
registerOperator(MVAPPEND, PPLBuiltinOperators.MVAPPEND);
registerOperator(MVDEDUP, SqlLibraryOperators.ARRAY_DISTINCT);
registerOperator(MAP_APPEND, PPLBuiltinOperators.MAP_APPEND);
registerOperator(MAP_CONCAT, SqlLibraryOperators.MAP_CONCAT);
registerOperator(MAP_REMOVE, PPLBuiltinOperators.MAP_REMOVE);
Expand Down
38 changes: 38 additions & 0 deletions docs/user/ppl/functions/collection.rst
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,44 @@ Example::
| [1,text,2.5] |
+--------------+

MVDEDUP
-------

Description
>>>>>>>>>>>

Usage: mvdedup(array) removes duplicate values from a multivalue array while preserving the order of first occurrence. NULL elements are filtered out. Returns an array with duplicates removed, or null if the input is null.

Argument type: array: ARRAY

Return type: ARRAY

Example::

    os> source=people | eval array = array(1, 2, 2, 3, 1, 4), result = mvdedup(array) | fields result | head 1
    fetched rows / total rows = 1/1
    +-----------+
    | result    |
    |-----------|
    | [1,2,3,4] |
    +-----------+

    os> source=people | eval array = array('z', 'a', 'z', 'b', 'a', 'c'), result = mvdedup(array) | fields result | head 1
    fetched rows / total rows = 1/1
    +-----------+
    | result    |
    |-----------|
    | [z,a,b,c] |
    +-----------+

    os> source=people | eval array = array(), result = mvdedup(array) | fields result | head 1
    fetched rows / total rows = 1/1
    +--------+
    | result |
    |--------|
    | []     |
    +--------+

MVINDEX
-------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -489,4 +489,82 @@ public void testMvindexRangeSingleElement() throws IOException {
verifySchema(actual, schema("result", "array"));
verifyDataRows(actual, rows(List.of(3)));
}

/** mvdedup on an integer array with repeated values keeps one copy of each value. */
@Test
public void testMvdedupWithDuplicates() throws IOException {
  String query =
      String.format(
          "source=%s | eval arr = array(1, 2, 2, 3, 1, 4), result = mvdedup(arr)"
              + " | head 1 | fields result",
          TEST_INDEX_BANK);
  JSONObject response = executeQuery(query);

  // The result column is typed as an array and holds the four distinct values.
  verifySchema(response, schema("result", "array"));
  verifyDataRows(response, rows(List.of(1, 2, 3, 4)));
}

/** mvdedup on an array whose values are already unique returns the same values. */
@Test
public void testMvdedupWithNoDuplicates() throws IOException {
  String query =
      String.format(
          "source=%s | eval arr = array(1, 2, 3, 4), result = mvdedup(arr)"
              + " | head 1 | fields result",
          TEST_INDEX_BANK);
  JSONObject response = executeQuery(query);

  verifySchema(response, schema("result", "array"));
  verifyDataRows(response, rows(List.of(1, 2, 3, 4)));
}

/** mvdedup on an array made of one repeated value yields a single-element array. */
@Test
public void testMvdedupWithAllDuplicates() throws IOException {
  String query =
      String.format(
          "source=%s | eval arr = array(5, 5, 5, 5), result = mvdedup(arr)"
              + " | head 1 | fields result",
          TEST_INDEX_BANK);
  JSONObject response = executeQuery(query);

  verifySchema(response, schema("result", "array"));
  verifyDataRows(response, rows(List.of(5)));
}

/** mvdedup on an empty array yields an empty array. */
@Test
public void testMvdedupWithEmptyArray() throws IOException {
  String query =
      String.format(
          "source=%s | eval arr = array(), result = mvdedup(arr) | head 1 | fields result",
          TEST_INDEX_BANK);
  JSONObject response = executeQuery(query);

  verifySchema(response, schema("result", "array"));
  verifyDataRows(response, rows(List.of()));
}

/** mvdedup on a string array with repeated entries keeps one copy of each string. */
@Test
public void testMvdedupWithStrings() throws IOException {
  String query =
      String.format(
          "source=%s | eval arr = array('apple', 'banana', 'apple', 'cherry', 'banana')"
              + ", result = mvdedup(arr) | head 1 | fields result",
          TEST_INDEX_BANK);
  JSONObject response = executeQuery(query);

  verifySchema(response, schema("result", "array"));
  verifyDataRows(response, rows(List.of("apple", "banana", "cherry")));
}

/** mvdedup keeps the surviving values in the order in which they first appear. */
@Test
public void testMvdedupPreservesOrder() throws IOException {
  String query =
      String.format(
          "source=%s | eval arr = array('z', 'a', 'z', 'b', 'a', 'c')"
              + ", result = mvdedup(arr) | head 1 | fields result",
          TEST_INDEX_BANK);
  JSONObject response = executeQuery(query);

  verifySchema(response, schema("result", "array"));
  // Expected order follows first occurrence in the input: z, a, b, c.
  verifyDataRows(response, rows(List.of("z", "a", "b", "c")));
}
}
1 change: 1 addition & 0 deletions ppl/src/main/antlr/OpenSearchPPLLexer.g4
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,7 @@ ARRAY_LENGTH: 'ARRAY_LENGTH';
MVAPPEND: 'MVAPPEND';
MVJOIN: 'MVJOIN';
MVINDEX: 'MVINDEX';
MVDEDUP: 'MVDEDUP';
FORALL: 'FORALL';
FILTER: 'FILTER';
TRANSFORM: 'TRANSFORM';
Expand Down
1 change: 1 addition & 0 deletions ppl/src/main/antlr/OpenSearchPPLParser.g4
Original file line number Diff line number Diff line change
Expand Up @@ -1095,6 +1095,7 @@ collectionFunctionName
| MVAPPEND
| MVJOIN
| MVINDEX
| MVDEDUP
| FORALL
| EXISTS
| FILTER
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,4 +214,80 @@ public void testMvindexRangeNegative() {
+ "LIMIT 1";
verifyPPLToSparkSQL(root, expectedSparkSql);
}

/**
 * Checks the logical plan, the execution result and the Spark SQL translation produced for
 * {@code mvdedup} applied to an integer array that contains repeated values.
 */
@Test
public void testMvdedupWithDuplicates() {
  String ppl =
      "source=EMP | eval arr = array(1, 2, 2, 3, 1, 4), result = mvdedup(arr) | head 1 |"
          + " fields result";
  RelNode root = getRelNode(ppl);

  // Calcite's plan dump indents every nesting level by two additional spaces, so the leading
  // whitespace inside these literals is significant (assumes verifyLogical compares the plan
  // text verbatim -- confirm in the test base class).
  String expectedLogical =
      "LogicalProject(result=[$9])\n"
          + "  LogicalSort(fetch=[1])\n"
          + "    LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4],"
          + " SAL=[$5], COMM=[$6], DEPTNO=[$7], arr=[array(1, 2, 2, 3, 1, 4)],"
          + " result=[ARRAY_DISTINCT(array(1, 2, 2, 3, 1, 4))])\n"
          + "      LogicalTableScan(table=[[scott, EMP]])\n";
  verifyLogical(root, expectedLogical);

  String expectedResult = "result=[1, 2, 3, 4]\n";
  verifyResult(root, expectedResult);

  String expectedSparkSql =
      "SELECT ARRAY_DISTINCT(ARRAY(1, 2, 2, 3, 1, 4)) `result`\n"
          + "FROM `scott`.`EMP`\n"
          + "LIMIT 1";
  verifyPPLToSparkSQL(root, expectedSparkSql);
}

/**
 * Checks the logical plan, the execution result and the Spark SQL translation produced for
 * {@code mvdedup} applied to an integer array whose values are already unique.
 */
@Test
public void testMvdedupWithNoDuplicates() {
  String ppl =
      "source=EMP | eval arr = array(1, 2, 3, 4), result = mvdedup(arr) | head 1 | fields"
          + " result";
  RelNode root = getRelNode(ppl);

  // Calcite's plan dump indents every nesting level by two additional spaces, so the leading
  // whitespace inside these literals is significant (assumes verifyLogical compares the plan
  // text verbatim -- confirm in the test base class).
  String expectedLogical =
      "LogicalProject(result=[$9])\n"
          + "  LogicalSort(fetch=[1])\n"
          + "    LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4],"
          + " SAL=[$5], COMM=[$6], DEPTNO=[$7], arr=[array(1, 2, 3, 4)],"
          + " result=[ARRAY_DISTINCT(array(1, 2, 3, 4))])\n"
          + "      LogicalTableScan(table=[[scott, EMP]])\n";
  verifyLogical(root, expectedLogical);

  String expectedResult = "result=[1, 2, 3, 4]\n";
  verifyResult(root, expectedResult);

  String expectedSparkSql =
      "SELECT ARRAY_DISTINCT(ARRAY(1, 2, 3, 4)) `result`\n"
          + "FROM `scott`.`EMP`\n"
          + "LIMIT 1";
  verifyPPLToSparkSQL(root, expectedSparkSql);
}

/**
 * Checks the logical plan, the execution result and the Spark SQL translation produced for
 * {@code mvdedup} applied to a string array, where the expected result lists the values in
 * first-occurrence order.
 */
@Test
public void testMvdedupPreservesOrder() {
  String ppl =
      "source=EMP | eval arr = array('z', 'a', 'z', 'b', 'a', 'c'), result = mvdedup(arr) |"
          + " head 1 | fields result";
  RelNode root = getRelNode(ppl);

  // Calcite's plan dump indents every nesting level by two additional spaces, so the leading
  // whitespace inside these literals is significant (assumes verifyLogical compares the plan
  // text verbatim -- confirm in the test base class).
  String expectedLogical =
      "LogicalProject(result=[$9])\n"
          + "  LogicalSort(fetch=[1])\n"
          + "    LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4],"
          + " SAL=[$5], COMM=[$6], DEPTNO=[$7], arr=[array('z', 'a', 'z', 'b', 'a', 'c')],"
          + " result=[ARRAY_DISTINCT(array('z', 'a', 'z', 'b', 'a', 'c'))])\n"
          + "      LogicalTableScan(table=[[scott, EMP]])\n";
  verifyLogical(root, expectedLogical);

  String expectedResult = "result=[z, a, b, c]\n";
  verifyResult(root, expectedResult);

  String expectedSparkSql =
      "SELECT ARRAY_DISTINCT(ARRAY('z', 'a', 'z', 'b', 'a', 'c')) `result`\n"
          + "FROM `scott`.`EMP`\n"
          + "LIMIT 1";
  verifyPPLToSparkSQL(root, expectedSparkSql);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -822,6 +822,15 @@ public void testMvindex() {
anonymize("source=t | eval result=mvindex(array(1, 2, 3, 4, 5), 1, 3) | fields result"));
}

/** Anonymizing an mvdedup call over an array literal masks each of the array's elements. */
@Test
public void testMvdedup() {
  // Input array contains duplicates; each of its six literals is expected to be masked.
  String query = "source=t | eval result=mvdedup(array(1, 2, 2, 3, 1, 4)) | fields result";
  String expected =
      "source=table | eval identifier=mvdedup(array(***,***,***,***,***,***))"
          + " | fields + identifier";

  assertEquals(expected, anonymize(query));
}

@Test
public void testRexWithOffsetField() {
when(settings.getSettingValue(Key.PPL_REX_MAX_MATCH_LIMIT)).thenReturn(10);
Expand Down
Loading