From 50af8bfdc76783671376a715e302975531efba31 Mon Sep 17 00:00:00 2001 From: Kai Huang Date: Tue, 18 Nov 2025 13:30:42 -0800 Subject: [PATCH 1/4] Support eval function Signed-off-by: Kai Huang --- .../function/BuiltinFunctionName.java | 1 + .../function/CollectionUDF/MVDedupCore.java | 50 ++++++++++++ .../CollectionUDF/MVDedupFunctionImpl.java | 78 +++++++++++++++++++ .../function/PPLBuiltinOperators.java | 2 + .../expression/function/PPLFuncImpTable.java | 2 + .../remote/CalciteArrayFunctionIT.java | 78 +++++++++++++++++++ ppl/src/main/antlr/OpenSearchPPLLexer.g4 | 1 + ppl/src/main/antlr/OpenSearchPPLParser.g4 | 1 + .../calcite/CalcitePPLArrayFunctionTest.java | 74 ++++++++++++++++++ 9 files changed, 287 insertions(+) create mode 100644 core/src/main/java/org/opensearch/sql/expression/function/CollectionUDF/MVDedupCore.java create mode 100644 core/src/main/java/org/opensearch/sql/expression/function/CollectionUDF/MVDedupFunctionImpl.java diff --git a/core/src/main/java/org/opensearch/sql/expression/function/BuiltinFunctionName.java b/core/src/main/java/org/opensearch/sql/expression/function/BuiltinFunctionName.java index e2508af219b..d30af69d32e 100644 --- a/core/src/main/java/org/opensearch/sql/expression/function/BuiltinFunctionName.java +++ b/core/src/main/java/org/opensearch/sql/expression/function/BuiltinFunctionName.java @@ -75,6 +75,7 @@ public enum BuiltinFunctionName { MVAPPEND(FunctionName.of("mvappend")), MVJOIN(FunctionName.of("mvjoin")), MVINDEX(FunctionName.of("mvindex")), + MVDEDUP(FunctionName.of("mvdedup")), FORALL(FunctionName.of("forall")), EXISTS(FunctionName.of("exists")), FILTER(FunctionName.of("filter")), diff --git a/core/src/main/java/org/opensearch/sql/expression/function/CollectionUDF/MVDedupCore.java b/core/src/main/java/org/opensearch/sql/expression/function/CollectionUDF/MVDedupCore.java new file mode 100644 index 00000000000..4cc7879ddc9 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/expression/function/CollectionUDF/MVDedupCore.java @@ -0,0 +1,50 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.expression.function.CollectionUDF; + +import java.util.ArrayList; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Set; + +/** Core logic for `mvdedup` command to remove duplicate values from a multivalue field */ +public class MVDedupCore { + + /** + * Remove duplicate elements from the input array while preserving order. Null input returns null. + * Empty array returns empty array. See {@ref MVDedupFunctionImplTest} for detailed behavior. + * + * @param array input array (can be null) + * @return array with duplicates removed, or null if input is null + */ + public static List removeDuplicates(Object array) { + if (array == null) { + return null; + } + + if (!(array instanceof List)) { + // If not a list, wrap it in a list + List result = new ArrayList<>(); + result.add(array); + return result; + } + + List inputList = (List) array; + if (inputList.isEmpty()) { + return new ArrayList<>(); + } + + // Use LinkedHashSet to preserve insertion order while removing duplicates + Set seen = new LinkedHashSet<>(); + for (Object item : inputList) { + if (item != null) { + seen.add(item); + } + } + + return new ArrayList<>(seen); + } +} diff --git a/core/src/main/java/org/opensearch/sql/expression/function/CollectionUDF/MVDedupFunctionImpl.java b/core/src/main/java/org/opensearch/sql/expression/function/CollectionUDF/MVDedupFunctionImpl.java new file mode 100644 index 00000000000..1015da27d42 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/expression/function/CollectionUDF/MVDedupFunctionImpl.java @@ -0,0 +1,78 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.expression.function.CollectionUDF; + +import static org.apache.calcite.sql.type.SqlTypeUtil.createArrayType; + +import java.util.List; +import org.apache.calcite.adapter.enumerable.NotNullImplementor; +import org.apache.calcite.adapter.enumerable.NullPolicy; +import org.apache.calcite.adapter.enumerable.RexToLixTranslator; +import org.apache.calcite.linq4j.tree.Expression; +import org.apache.calcite.linq4j.tree.Expressions; +import org.apache.calcite.linq4j.tree.Types; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.sql.type.SqlReturnTypeInference; +import org.apache.calcite.sql.type.SqlTypeName; +import org.opensearch.sql.expression.function.ImplementorUDF; +import org.opensearch.sql.expression.function.UDFOperandMetadata; + +/** + * MVDedup function that removes duplicate values from a multivalue array while preserving order. + * Returns an array with duplicates removed or null for consistent type behavior. + */ +public class MVDedupFunctionImpl extends ImplementorUDF { + + public MVDedupFunctionImpl() { + super(new MVDedupImplementor(), NullPolicy.ARG0); + } + + @Override + public SqlReturnTypeInference getReturnTypeInference() { + return sqlOperatorBinding -> { + RelDataTypeFactory typeFactory = sqlOperatorBinding.getTypeFactory(); + + if (sqlOperatorBinding.getOperandCount() == 0) { + return typeFactory.createSqlType(SqlTypeName.NULL); + } + + RelDataType operandType = sqlOperatorBinding.getOperandType(0); + + // If operand is already an array, return the same array type + if (!operandType.isStruct() && operandType.getComponentType() != null) { + return createArrayType( + typeFactory, + typeFactory.createTypeWithNullability(operandType.getComponentType(), true), + true); + } + + // If operand is not an array, wrap it in an array type + return createArrayType( + typeFactory, typeFactory.createTypeWithNullability(operandType, true), true); + }; + } + + @Override + public UDFOperandMetadata getOperandMetadata() { + return null; + } + + public static class MVDedupImplementor implements NotNullImplementor { + @Override + public Expression implement( + RexToLixTranslator translator, RexCall call, List translatedOperands) { + return Expressions.call( + Types.lookupMethod(MVDedupFunctionImpl.class, "mvdedup", Object.class), + translatedOperands.get(0)); + } + } + + public static Object mvdedup(Object array) { + return MVDedupCore.removeDuplicates(array); + } +} diff --git a/core/src/main/java/org/opensearch/sql/expression/function/PPLBuiltinOperators.java b/core/src/main/java/org/opensearch/sql/expression/function/PPLBuiltinOperators.java index 7b7b591874b..a715472b732 100644 --- a/core/src/main/java/org/opensearch/sql/expression/function/PPLBuiltinOperators.java +++ b/core/src/main/java/org/opensearch/sql/expression/function/PPLBuiltinOperators.java @@ -47,6 +47,7 @@ import org.opensearch.sql.expression.function.CollectionUDF.FilterFunctionImpl; import org.opensearch.sql.expression.function.CollectionUDF.ForallFunctionImpl; import org.opensearch.sql.expression.function.CollectionUDF.MVAppendFunctionImpl; +import org.opensearch.sql.expression.function.CollectionUDF.MVDedupFunctionImpl; import org.opensearch.sql.expression.function.CollectionUDF.MapAppendFunctionImpl; import org.opensearch.sql.expression.function.CollectionUDF.MapRemoveFunctionImpl; import org.opensearch.sql.expression.function.CollectionUDF.ReduceFunctionImpl; @@ -391,6 +392,7 @@ public class PPLBuiltinOperators extends ReflectiveSqlOperatorTable { public static final SqlOperator MAP_APPEND = new MapAppendFunctionImpl().toUDF("map_append"); public static final SqlOperator MAP_REMOVE = new MapRemoveFunctionImpl().toUDF("MAP_REMOVE"); public static final SqlOperator MVAPPEND = new MVAppendFunctionImpl().toUDF("mvappend"); + public static final SqlOperator MVDEDUP = new MVDedupFunctionImpl().toUDF("mvdedup"); public static final SqlOperator FILTER = new FilterFunctionImpl().toUDF("filter"); public static final SqlOperator TRANSFORM = new TransformFunctionImpl().toUDF("transform"); public static final SqlOperator REDUCE = new ReduceFunctionImpl().toUDF("reduce"); diff --git a/core/src/main/java/org/opensearch/sql/expression/function/PPLFuncImpTable.java b/core/src/main/java/org/opensearch/sql/expression/function/PPLFuncImpTable.java index bb5160f0a1a..35d149854cb 100644 --- a/core/src/main/java/org/opensearch/sql/expression/function/PPLFuncImpTable.java +++ b/core/src/main/java/org/opensearch/sql/expression/function/PPLFuncImpTable.java @@ -150,6 +150,7 @@ import static org.opensearch.sql.expression.function.BuiltinFunctionName.MULTIPLYFUNCTION; import static org.opensearch.sql.expression.function.BuiltinFunctionName.MULTI_MATCH; import static org.opensearch.sql.expression.function.BuiltinFunctionName.MVAPPEND; +import static org.opensearch.sql.expression.function.BuiltinFunctionName.MVDEDUP; import static org.opensearch.sql.expression.function.BuiltinFunctionName.MVINDEX; import static org.opensearch.sql.expression.function.BuiltinFunctionName.MVJOIN; import static org.opensearch.sql.expression.function.BuiltinFunctionName.NOT; @@ -989,6 +990,7 @@ void populate() { registerOperator(ARRAY, PPLBuiltinOperators.ARRAY); registerOperator(MVAPPEND, PPLBuiltinOperators.MVAPPEND); + registerOperator(MVDEDUP, PPLBuiltinOperators.MVDEDUP); registerOperator(MAP_APPEND, PPLBuiltinOperators.MAP_APPEND); registerOperator(MAP_CONCAT, SqlLibraryOperators.MAP_CONCAT); registerOperator(MAP_REMOVE, PPLBuiltinOperators.MAP_REMOVE); diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteArrayFunctionIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteArrayFunctionIT.java index c829565768f..52a6e181e20 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteArrayFunctionIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteArrayFunctionIT.java @@ -489,4 +489,82 @@ public void testMvindexRangeSingleElement() throws IOException { verifySchema(actual, schema("result", "array")); verifyDataRows(actual, rows(List.of(3))); } + + @Test + public void testMvdedupWithDuplicates() throws IOException { + JSONObject actual = + executeQuery( + String.format( + "source=%s | eval arr = array(1, 2, 2, 3, 1, 4), result = mvdedup(arr) | head 1 |" + + " fields result", + TEST_INDEX_BANK)); + + verifySchema(actual, schema("result", "array")); + verifyDataRows(actual, rows(List.of(1, 2, 3, 4))); + } + + @Test + public void testMvdedupWithNoDuplicates() throws IOException { + JSONObject actual = + executeQuery( + String.format( + "source=%s | eval arr = array(1, 2, 3, 4), result = mvdedup(arr) | head 1 |" + + " fields result", + TEST_INDEX_BANK)); + + verifySchema(actual, schema("result", "array")); + verifyDataRows(actual, rows(List.of(1, 2, 3, 4))); + } + + @Test + public void testMvdedupWithAllDuplicates() throws IOException { + JSONObject actual = + executeQuery( + String.format( + "source=%s | eval arr = array(5, 5, 5, 5), result = mvdedup(arr) | head 1 |" + + " fields result", + TEST_INDEX_BANK)); + + verifySchema(actual, schema("result", "array")); + verifyDataRows(actual, rows(List.of(5))); + } + + @Test + public void testMvdedupWithEmptyArray() throws IOException { + JSONObject actual = + executeQuery( + String.format( + "source=%s | eval arr = array(), result = mvdedup(arr) | head 1 | fields result", + TEST_INDEX_BANK)); + + verifySchema(actual, schema("result", "array")); + verifyDataRows(actual, rows(List.of())); + } + + @Test + public void testMvdedupWithStrings() throws IOException { + JSONObject actual = + executeQuery( + String.format( + "source=%s | eval arr = array('apple', 'banana', 'apple', 'cherry', 'banana')," + + " result = mvdedup(arr) | head 1 | fields result", + TEST_INDEX_BANK)); + + verifySchema(actual, schema("result", "array")); + verifyDataRows(actual, rows(List.of("apple", "banana", "cherry"))); + } + + @Test + public void testMvdedupPreservesOrder() throws IOException { + JSONObject actual = + executeQuery( + String.format( + "source=%s | eval arr = array('z', 'a', 'z', 'b', 'a', 'c'), result =" + + " mvdedup(arr) | head 1 | fields result", + TEST_INDEX_BANK)); + + verifySchema(actual, schema("result", "array")); + // Should preserve first occurrence order: z, a, b, c + verifyDataRows(actual, rows(List.of("z", "a", "b", "c"))); + } } diff --git a/ppl/src/main/antlr/OpenSearchPPLLexer.g4 b/ppl/src/main/antlr/OpenSearchPPLLexer.g4 index 2e0643fa283..ebe0fcb4f21 100644 --- a/ppl/src/main/antlr/OpenSearchPPLLexer.g4 +++ b/ppl/src/main/antlr/OpenSearchPPLLexer.g4 @@ -442,6 +442,7 @@ ARRAY_LENGTH: 'ARRAY_LENGTH'; MVAPPEND: 'MVAPPEND'; MVJOIN: 'MVJOIN'; MVINDEX: 'MVINDEX'; +MVDEDUP: 'MVDEDUP'; FORALL: 'FORALL'; FILTER: 'FILTER'; TRANSFORM: 'TRANSFORM'; diff --git a/ppl/src/main/antlr/OpenSearchPPLParser.g4 b/ppl/src/main/antlr/OpenSearchPPLParser.g4 index 494adb15717..d1775578b7b 100644 --- a/ppl/src/main/antlr/OpenSearchPPLParser.g4 +++ b/ppl/src/main/antlr/OpenSearchPPLParser.g4 @@ -1095,6 +1095,7 @@ collectionFunctionName | MVAPPEND | MVJOIN | MVINDEX + | MVDEDUP | FORALL | EXISTS | FILTER diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLArrayFunctionTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLArrayFunctionTest.java index bffa20175d5..e1acd96f7ca 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLArrayFunctionTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLArrayFunctionTest.java @@ -214,4 +214,78 @@ public void testMvindexRangeNegative() { + "LIMIT 1"; verifyPPLToSparkSQL(root, expectedSparkSql); } + + @Test + public void testMvdedupWithDuplicates() { + String ppl = + "source=EMP | eval arr = array(1, 2, 2, 3, 1, 4), result = mvdedup(arr) | head 1 |" + + " fields result"; + RelNode root = getRelNode(ppl); + + String expectedLogical = + "LogicalProject(result=[$9])\n" + + " LogicalSort(fetch=[1])\n" + + " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4]," + + " SAL=[$5], COMM=[$6], DEPTNO=[$7], arr=[array(1, 2, 2, 3, 1, 4)]," + + " result=[mvdedup(array(1, 2, 2, 3, 1, 4))])\n" + + " LogicalTableScan(table=[[scott, EMP]])\n"; + verifyLogical(root, expectedLogical); + + String expectedResult = "result=[1, 2, 3, 4]\n"; + verifyResult(root, expectedResult); + + String expectedSparkSql = + "SELECT MVDEDUP(ARRAY(1, 2, 2, 3, 1, 4)) `result`\n" + "FROM `scott`.`EMP`\n" + "LIMIT 1"; + verifyPPLToSparkSQL(root, expectedSparkSql); + } + + @Test + public void testMvdedupWithNoDuplicates() { + String ppl = + "source=EMP | eval arr = array(1, 2, 3, 4), result = mvdedup(arr) | head 1 | fields" + + " result"; + RelNode root = getRelNode(ppl); + + String expectedLogical = + "LogicalProject(result=[$9])\n" + + " LogicalSort(fetch=[1])\n" + + " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4]," + + " SAL=[$5], COMM=[$6], DEPTNO=[$7], arr=[array(1, 2, 3, 4)]," + + " result=[mvdedup(array(1, 2, 3, 4))])\n" + + " LogicalTableScan(table=[[scott, EMP]])\n"; + verifyLogical(root, expectedLogical); + + String expectedResult = "result=[1, 2, 3, 4]\n"; + verifyResult(root, expectedResult); + + String expectedSparkSql = + "SELECT MVDEDUP(ARRAY(1, 2, 3, 4)) `result`\n" + "FROM `scott`.`EMP`\n" + "LIMIT 1"; + verifyPPLToSparkSQL(root, expectedSparkSql); + } + + @Test + public void testMvdedupPreservesOrder() { + String ppl = + "source=EMP | eval arr = array('z', 'a', 'z', 'b', 'a', 'c'), result = mvdedup(arr) |" + + " head 1 | fields result"; + RelNode root = getRelNode(ppl); + + String expectedLogical = + "LogicalProject(result=[$9])\n" + + " LogicalSort(fetch=[1])\n" + + " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4]," + + " SAL=[$5], COMM=[$6], DEPTNO=[$7], arr=[array('z', 'a', 'z', 'b', 'a', 'c')]," + + " result=[mvdedup(array('z', 'a', 'z', 'b', 'a', 'c'))])\n" + + " LogicalTableScan(table=[[scott, EMP]])\n"; + verifyLogical(root, expectedLogical); + + String expectedResult = "result=[z, a, b, c]\n"; + verifyResult(root, expectedResult); + + String expectedSparkSql = + "SELECT MVDEDUP(ARRAY('z', 'a', 'z', 'b', 'a', 'c')) `result`\n" + + "FROM `scott`.`EMP`\n" + + "LIMIT 1"; + verifyPPLToSparkSQL(root, expectedSparkSql); + } } From 40e0aa4bbc2d345de3f3b9a86f4a4ba7c8d624ba Mon Sep 17 00:00:00 2001 From: Kai Huang Date: Wed, 19 Nov 2025 10:57:26 -0800 Subject: [PATCH 2/4] Updates Signed-off-by: Kai Huang --- docs/user/ppl/functions/collection.rst | 38 +++++++++++++++++++ .../ppl/utils/PPLQueryDataAnonymizerTest.java | 9 +++++ 2 files changed, 47 insertions(+) diff --git a/docs/user/ppl/functions/collection.rst b/docs/user/ppl/functions/collection.rst index 5c2b7c30f74..34c02074641 100644 --- a/docs/user/ppl/functions/collection.rst +++ b/docs/user/ppl/functions/collection.rst @@ -302,6 +302,44 @@ Example:: | [1,text,2.5] | +--------------+ +MVDEDUP +------- + +Description +>>>>>>>>>>> + +Usage: mvdedup(array) removes duplicate values from a multivalue array while preserving the order of first occurrence. NULL elements are filtered out. Returns an array with duplicates removed, or null if the input is null. + +Argument type: array: ARRAY + +Return type: ARRAY + +Example:: + + os> source=people | eval array = array(1, 2, 2, 3, 1, 4), result = mvdedup(array) | fields result | head 1 + fetched rows / total rows = 1/1 + +-----------+ + | result | + |-----------| + | [1,2,3,4] | + +-----------+ + + os> source=people | eval array = array('z', 'a', 'z', 'b', 'a', 'c'), result = mvdedup(array) | fields result | head 1 + fetched rows / total rows = 1/1 + +-----------+ + | result | + |-----------| + | [z,a,b,c] | + +-----------+ + + os> source=people | eval array = array(), result = mvdedup(array) | fields result | head 1 + fetched rows / total rows = 1/1 + +--------+ + | result | + |--------| + | [] | + +--------+ + MVINDEX ------- diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java index f205b9fe0cc..767171debd5 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java @@ -822,6 +822,15 @@ public void testMvindex() { anonymize("source=t | eval result=mvindex(array(1, 2, 3, 4, 5), 1, 3) | fields result")); } + @Test + public void testMvdedup() { + // Test mvdedup with array containing duplicates + assertEquals( + "source=table | eval identifier=mvdedup(array(***,***,***,***,***,***)) | fields +" + + " identifier", + anonymize("source=t | eval result=mvdedup(array(1, 2, 2, 3, 1, 4)) | fields result")); + } + @Test public void testRexWithOffsetField() { when(settings.getSettingValue(Key.PPL_REX_MAX_MATCH_LIMIT)).thenReturn(10); From 9e3302c15eeeb3c7df135ba68be3a4ac2db4acef Mon Sep 17 00:00:00 2001 From: Kai Huang Date: Wed, 19 Nov 2025 11:11:45 -0800 Subject: [PATCH 3/4] update javadoc Signed-off-by: Kai Huang --- .../sql/expression/function/CollectionUDF/MVDedupCore.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/opensearch/sql/expression/function/CollectionUDF/MVDedupCore.java b/core/src/main/java/org/opensearch/sql/expression/function/CollectionUDF/MVDedupCore.java index 4cc7879ddc9..9f4d929bfa1 100644 --- a/core/src/main/java/org/opensearch/sql/expression/function/CollectionUDF/MVDedupCore.java +++ b/core/src/main/java/org/opensearch/sql/expression/function/CollectionUDF/MVDedupCore.java @@ -15,10 +15,10 @@ public class MVDedupCore { /** * Remove duplicate elements from the input array while preserving order. Null input returns null. - * Empty array returns empty array. See {@ref MVDedupFunctionImplTest} for detailed behavior. + * Empty array returns empty array. Null elements within the array are filtered out. * * @param array input array (can be null) - * @return array with duplicates removed, or null if input is null + * @return array with duplicates removed and null elements filtered out, or null if input is null */ public static List removeDuplicates(Object array) { if (array == null) { From 234f115abf99d2f0ca9fd9dba03bd739745f1ee0 Mon Sep 17 00:00:00 2001 From: Kai Huang Date: Thu, 20 Nov 2025 10:38:22 -0800 Subject: [PATCH 4/4] Update to use ARRAY_DISTINCT Signed-off-by: Kai Huang --- .../function/CollectionUDF/MVDedupCore.java | 50 ------------ .../CollectionUDF/MVDedupFunctionImpl.java | 78 ------------------- .../function/PPLBuiltinOperators.java | 2 - .../expression/function/PPLFuncImpTable.java | 2 +- .../calcite/CalcitePPLArrayFunctionTest.java | 14 ++-- 5 files changed, 9 insertions(+), 137 deletions(-) delete mode 100644 core/src/main/java/org/opensearch/sql/expression/function/CollectionUDF/MVDedupCore.java delete mode 100644 core/src/main/java/org/opensearch/sql/expression/function/CollectionUDF/MVDedupFunctionImpl.java diff --git a/core/src/main/java/org/opensearch/sql/expression/function/CollectionUDF/MVDedupCore.java b/core/src/main/java/org/opensearch/sql/expression/function/CollectionUDF/MVDedupCore.java deleted file mode 100644 index 9f4d929bfa1..00000000000 --- a/core/src/main/java/org/opensearch/sql/expression/function/CollectionUDF/MVDedupCore.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.expression.function.CollectionUDF; - -import java.util.ArrayList; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Set; - -/** Core logic for `mvdedup` command to remove duplicate values from a multivalue field */ -public class MVDedupCore { - - /** - * Remove duplicate elements from the input array while preserving order. Null input returns null. - * Empty array returns empty array. Null elements within the array are filtered out. - * - * @param array input array (can be null) - * @return array with duplicates removed and null elements filtered out, or null if input is null - */ - public static List removeDuplicates(Object array) { - if (array == null) { - return null; - } - - if (!(array instanceof List)) { - // If not a list, wrap it in a list - List result = new ArrayList<>(); - result.add(array); - return result; - } - - List inputList = (List) array; - if (inputList.isEmpty()) { - return new ArrayList<>(); - } - - // Use LinkedHashSet to preserve insertion order while removing duplicates - Set seen = new LinkedHashSet<>(); - for (Object item : inputList) { - if (item != null) { - seen.add(item); - } - } - - return new ArrayList<>(seen); - } -} diff --git a/core/src/main/java/org/opensearch/sql/expression/function/CollectionUDF/MVDedupFunctionImpl.java b/core/src/main/java/org/opensearch/sql/expression/function/CollectionUDF/MVDedupFunctionImpl.java deleted file mode 100644 index 1015da27d42..00000000000 --- a/core/src/main/java/org/opensearch/sql/expression/function/CollectionUDF/MVDedupFunctionImpl.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.expression.function.CollectionUDF; - -import static org.apache.calcite.sql.type.SqlTypeUtil.createArrayType; - -import java.util.List; -import org.apache.calcite.adapter.enumerable.NotNullImplementor; -import org.apache.calcite.adapter.enumerable.NullPolicy; -import org.apache.calcite.adapter.enumerable.RexToLixTranslator; -import org.apache.calcite.linq4j.tree.Expression; -import org.apache.calcite.linq4j.tree.Expressions; -import org.apache.calcite.linq4j.tree.Types; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rel.type.RelDataTypeFactory; -import org.apache.calcite.rex.RexCall; -import org.apache.calcite.sql.type.SqlReturnTypeInference; -import org.apache.calcite.sql.type.SqlTypeName; -import org.opensearch.sql.expression.function.ImplementorUDF; -import org.opensearch.sql.expression.function.UDFOperandMetadata; - -/** - * MVDedup function that removes duplicate values from a multivalue array while preserving order. - * Returns an array with duplicates removed or null for consistent type behavior. - */ -public class MVDedupFunctionImpl extends ImplementorUDF { - - public MVDedupFunctionImpl() { - super(new MVDedupImplementor(), NullPolicy.ARG0); - } - - @Override - public SqlReturnTypeInference getReturnTypeInference() { - return sqlOperatorBinding -> { - RelDataTypeFactory typeFactory = sqlOperatorBinding.getTypeFactory(); - - if (sqlOperatorBinding.getOperandCount() == 0) { - return typeFactory.createSqlType(SqlTypeName.NULL); - } - - RelDataType operandType = sqlOperatorBinding.getOperandType(0); - - // If operand is already an array, return the same array type - if (!operandType.isStruct() && operandType.getComponentType() != null) { - return createArrayType( - typeFactory, - typeFactory.createTypeWithNullability(operandType.getComponentType(), true), - true); - } - - // If operand is not an array, wrap it in an array type - return createArrayType( - typeFactory, typeFactory.createTypeWithNullability(operandType, true), true); - }; - } - - @Override - public UDFOperandMetadata getOperandMetadata() { - return null; - } - - public static class MVDedupImplementor implements NotNullImplementor { - @Override - public Expression implement( - RexToLixTranslator translator, RexCall call, List translatedOperands) { - return Expressions.call( - Types.lookupMethod(MVDedupFunctionImpl.class, "mvdedup", Object.class), - translatedOperands.get(0)); - } - } - - public static Object mvdedup(Object array) { - return MVDedupCore.removeDuplicates(array); - } -} diff --git a/core/src/main/java/org/opensearch/sql/expression/function/PPLBuiltinOperators.java b/core/src/main/java/org/opensearch/sql/expression/function/PPLBuiltinOperators.java index a715472b732..7b7b591874b 100644 --- a/core/src/main/java/org/opensearch/sql/expression/function/PPLBuiltinOperators.java +++ b/core/src/main/java/org/opensearch/sql/expression/function/PPLBuiltinOperators.java @@ -47,7 +47,6 @@ import org.opensearch.sql.expression.function.CollectionUDF.FilterFunctionImpl; import org.opensearch.sql.expression.function.CollectionUDF.ForallFunctionImpl; import org.opensearch.sql.expression.function.CollectionUDF.MVAppendFunctionImpl; -import org.opensearch.sql.expression.function.CollectionUDF.MVDedupFunctionImpl; import org.opensearch.sql.expression.function.CollectionUDF.MapAppendFunctionImpl; import org.opensearch.sql.expression.function.CollectionUDF.MapRemoveFunctionImpl; import org.opensearch.sql.expression.function.CollectionUDF.ReduceFunctionImpl; @@ -392,7 +391,6 @@ public class PPLBuiltinOperators extends ReflectiveSqlOperatorTable { public static final SqlOperator MAP_APPEND = new MapAppendFunctionImpl().toUDF("map_append"); public static final SqlOperator MAP_REMOVE = new MapRemoveFunctionImpl().toUDF("MAP_REMOVE"); public static final SqlOperator MVAPPEND = new MVAppendFunctionImpl().toUDF("mvappend"); - public static final SqlOperator MVDEDUP = new MVDedupFunctionImpl().toUDF("mvdedup"); public static final SqlOperator FILTER = new FilterFunctionImpl().toUDF("filter"); public static final SqlOperator TRANSFORM = new TransformFunctionImpl().toUDF("transform"); public static final SqlOperator REDUCE = new ReduceFunctionImpl().toUDF("reduce"); diff --git a/core/src/main/java/org/opensearch/sql/expression/function/PPLFuncImpTable.java b/core/src/main/java/org/opensearch/sql/expression/function/PPLFuncImpTable.java index 35d149854cb..a2ea4cfcb30 100644 --- a/core/src/main/java/org/opensearch/sql/expression/function/PPLFuncImpTable.java +++ b/core/src/main/java/org/opensearch/sql/expression/function/PPLFuncImpTable.java @@ -990,7 +990,7 @@ void populate() { registerOperator(ARRAY, PPLBuiltinOperators.ARRAY); registerOperator(MVAPPEND, PPLBuiltinOperators.MVAPPEND); - registerOperator(MVDEDUP, PPLBuiltinOperators.MVDEDUP); + registerOperator(MVDEDUP, SqlLibraryOperators.ARRAY_DISTINCT); registerOperator(MAP_APPEND, PPLBuiltinOperators.MAP_APPEND); registerOperator(MAP_CONCAT, SqlLibraryOperators.MAP_CONCAT); registerOperator(MAP_REMOVE, PPLBuiltinOperators.MAP_REMOVE); diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLArrayFunctionTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLArrayFunctionTest.java index e1acd96f7ca..176fb534f37 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLArrayFunctionTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLArrayFunctionTest.java @@ -227,7 +227,7 @@ public void testMvdedupWithDuplicates() { + " LogicalSort(fetch=[1])\n" + " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4]," + " SAL=[$5], COMM=[$6], DEPTNO=[$7], arr=[array(1, 2, 2, 3, 1, 4)]," - + " result=[mvdedup(array(1, 2, 2, 3, 1, 4))])\n" + + " result=[ARRAY_DISTINCT(array(1, 2, 2, 3, 1, 4))])\n" + " LogicalTableScan(table=[[scott, EMP]])\n"; verifyLogical(root, expectedLogical); @@ -235,7 +235,9 @@ public void testMvdedupWithDuplicates() { verifyResult(root, expectedResult); String expectedSparkSql = - "SELECT MVDEDUP(ARRAY(1, 2, 2, 3, 1, 4)) `result`\n" + "FROM `scott`.`EMP`\n" + "LIMIT 1"; + "SELECT ARRAY_DISTINCT(ARRAY(1, 2, 2, 3, 1, 4)) `result`\n" + + "FROM `scott`.`EMP`\n" + + "LIMIT 1"; verifyPPLToSparkSQL(root, expectedSparkSql); } @@ -251,7 +253,7 @@ public void testMvdedupWithNoDuplicates() { + " LogicalSort(fetch=[1])\n" + " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4]," + " SAL=[$5], COMM=[$6], DEPTNO=[$7], arr=[array(1, 2, 3, 4)]," - + " result=[mvdedup(array(1, 2, 3, 4))])\n" + + " result=[ARRAY_DISTINCT(array(1, 2, 3, 4))])\n" + " LogicalTableScan(table=[[scott, EMP]])\n"; verifyLogical(root, expectedLogical); @@ -259,7 +261,7 @@ public void testMvdedupWithNoDuplicates() { verifyResult(root, expectedResult); String expectedSparkSql = - "SELECT MVDEDUP(ARRAY(1, 2, 3, 4)) `result`\n" + "FROM `scott`.`EMP`\n" + "LIMIT 1"; + "SELECT ARRAY_DISTINCT(ARRAY(1, 2, 3, 4)) `result`\n" + "FROM `scott`.`EMP`\n" + "LIMIT 1"; verifyPPLToSparkSQL(root, expectedSparkSql); } @@ -275,7 +277,7 @@ public void testMvdedupPreservesOrder() { + " LogicalSort(fetch=[1])\n" + " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4]," + " SAL=[$5], COMM=[$6], DEPTNO=[$7], arr=[array('z', 'a', 'z', 'b', 'a', 'c')]," - + " result=[mvdedup(array('z', 'a', 'z', 'b', 'a', 'c'))])\n" + + " result=[ARRAY_DISTINCT(array('z', 'a', 'z', 'b', 'a', 'c'))])\n" + " LogicalTableScan(table=[[scott, EMP]])\n"; verifyLogical(root, expectedLogical); @@ -283,7 +285,7 @@ public void testMvdedupPreservesOrder() { verifyResult(root, expectedResult); String expectedSparkSql = - "SELECT MVDEDUP(ARRAY('z', 'a', 'z', 'b', 'a', 'c')) `result`\n" + "SELECT ARRAY_DISTINCT(ARRAY('z', 'a', 'z', 'b', 'a', 'c')) `result`\n" + "FROM `scott`.`EMP`\n" + "LIMIT 1"; verifyPPLToSparkSQL(root, expectedSparkSql);