diff --git a/core/src/main/java/org/opensearch/sql/expression/function/udf/binning/WidthBucketFunction.java b/core/src/main/java/org/opensearch/sql/expression/function/udf/binning/WidthBucketFunction.java index ef68b17fa14..160827c7961 100644 --- a/core/src/main/java/org/opensearch/sql/expression/function/udf/binning/WidthBucketFunction.java +++ b/core/src/main/java/org/opensearch/sql/expression/function/udf/binning/WidthBucketFunction.java @@ -11,9 +11,13 @@ import org.apache.calcite.adapter.enumerable.RexToLixTranslator; import org.apache.calcite.linq4j.tree.Expression; import org.apache.calcite.linq4j.tree.Expressions; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.rex.RexCall; -import org.apache.calcite.sql.type.ReturnTypes; import org.apache.calcite.sql.type.SqlReturnTypeInference; +import org.apache.calcite.sql.type.SqlTypeName; +import org.opensearch.sql.calcite.type.ExprSqlType; +import org.opensearch.sql.calcite.utils.OpenSearchTypeFactory.ExprUDT; import org.opensearch.sql.calcite.utils.PPLOperandTypes; import org.opensearch.sql.calcite.utils.binning.BinConstants; import org.opensearch.sql.expression.function.ImplementorUDF; @@ -44,7 +48,20 @@ public WidthBucketFunction() { @Override public SqlReturnTypeInference getReturnTypeInference() { - return ReturnTypes.VARCHAR_2000; + return (opBinding) -> { + RelDataTypeFactory typeFactory = opBinding.getTypeFactory(); + RelDataType arg0Type = opBinding.getOperandType(0); + return dateRelatedType(arg0Type) + ? arg0Type + : typeFactory.createTypeWithNullability( + typeFactory.createSqlType(SqlTypeName.VARCHAR, 2000), true); + }; + } + + public static boolean dateRelatedType(RelDataType type) { + return type instanceof ExprSqlType exprSqlType + && List.of(ExprUDT.EXPR_DATE, ExprUDT.EXPR_TIME, ExprUDT.EXPR_TIMESTAMP) + .contains(exprSqlType.getUdt()); } @Override diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteBinCommandIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteBinCommandIT.java index 3f0adb08432..13e6b4a47e1 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteBinCommandIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteBinCommandIT.java @@ -867,9 +867,8 @@ public void testStatsWithBinsOnTimeField_Count() throws IOException { JSONObject result = executeQuery("source=events_null | bin @timestamp bins=3 | stats count() by @timestamp"); - // TODO: @timestamp should keep date as its type, to be addressed by this issue: - // https://github.com/opensearch-project/sql/issues/4317 - verifySchema(result, schema("count()", null, "bigint"), schema("@timestamp", null, "string")); + verifySchema( + result, schema("count()", null, "bigint"), schema("@timestamp", null, "timestamp")); // auto_date_histogram will choose span=5m for bins=3 verifyDataRows(result, rows(5, "2024-07-01 00:00:00"), rows(1, "2024-07-01 00:05:00")); @@ -907,10 +906,8 @@ public void testStatsWithBinsOnTimeField_Avg() throws IOException { JSONObject result = executeQuery( "source=events_null | bin @timestamp bins=3 | stats avg(cpu_usage) by @timestamp"); - // TODO: @timestamp should keep date as its type, to be addressed by this issue: - // https://github.com/opensearch-project/sql/issues/4317 verifySchema( - result, schema("avg(cpu_usage)", null, "double"), schema("@timestamp", null, "string")); + result, schema("avg(cpu_usage)", null, "double"), schema("@timestamp", null, "timestamp")); // auto_date_histogram will choose span=5m for bins=3 verifyDataRows(result, rows(44.62, "2024-07-01 00:00:00"), rows(50.0, "2024-07-01 00:05:00")); @@ -951,13 +948,11 @@ public void testStatsWithBinsOnTimeAndTermField_Count() throws IOException { executeQuery( "source=events_null | bin @timestamp bins=3 | stats bucket_nullable=false count() by" + " region, @timestamp"); - // TODO: @timestamp should keep date as its type, to be addressed by this issue: - // https://github.com/opensearch-project/sql/issues/4317 verifySchema( result, schema("count()", null, "bigint"), schema("region", null, "string"), - schema("@timestamp", null, "string")); + schema("@timestamp", null, "timestamp")); // auto_date_histogram will choose span=5m for bins=3 verifyDataRows( result, @@ -976,13 +971,11 @@ public void testStatsWithBinsOnTimeAndTermField_Avg() throws IOException { executeQuery( "source=events_null | bin @timestamp bins=3 | stats bucket_nullable=false " + " avg(cpu_usage) by region, @timestamp"); - // TODO: @timestamp should keep date as its type, to be addressed by this issue: - // https://github.com/opensearch-project/sql/issues/4317 verifySchema( result, schema("avg(cpu_usage)", null, "double"), schema("region", null, "string"), - schema("@timestamp", null, "string")); + schema("@timestamp", null, "timestamp")); // auto_date_histogram will choose span=5m for bins=3 verifyDataRows( result, diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_stats_bins_on_time_and_term.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_stats_bins_on_time_and_term.yaml index a285a731ab9..8d3e77e622e 100644 --- a/integ-test/src/test/resources/expectedOutput/calcite/explain_stats_bins_on_time_and_term.yaml +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_stats_bins_on_time_and_term.yaml @@ -4,7 +4,7 @@ calcite: LogicalProject(count()=[$2], @timestamp=[$0], region=[$1]) LogicalAggregate(group=[{0, 1}], count()=[COUNT()]) LogicalProject(@timestamp=[$15], region=[$7]) - LogicalFilter(condition=[IS NOT NULL($7)]) + LogicalFilter(condition=[AND(IS NOT NULL($15), IS NOT NULL($7))]) LogicalProject(environment=[$0], status_code=[$2], service=[$3], host=[$4], memory_usage=[$5], response_time=[$6], cpu_usage=[$7], region=[$8], bytes_sent=[$9], _id=[$10], _index=[$11], _score=[$12], _maxscore=[$13], _sort=[$14], _routing=[$15], @timestamp=[WIDTH_BUCKET($1, 3, -(MAX($1) OVER (), MIN($1) OVER ()), MAX($1) OVER ())]) CalciteLogicalIndexScan(table=[[OpenSearch, events]]) physical: | diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_stats_bins_on_time_and_term2.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_stats_bins_on_time_and_term2.yaml index 147902bdf0d..ffc24ee8939 100644 --- a/integ-test/src/test/resources/expectedOutput/calcite/explain_stats_bins_on_time_and_term2.yaml +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_stats_bins_on_time_and_term2.yaml @@ -4,7 +4,7 @@ calcite: LogicalProject(avg(cpu_usage)=[$2], @timestamp=[$0], region=[$1]) LogicalAggregate(group=[{0, 1}], avg(cpu_usage)=[AVG($2)]) LogicalProject(@timestamp=[$15], region=[$7], cpu_usage=[$6]) - LogicalFilter(condition=[IS NOT NULL($7)]) + LogicalFilter(condition=[AND(IS NOT NULL($15), IS NOT NULL($7))]) LogicalProject(environment=[$0], status_code=[$2], service=[$3], host=[$4], memory_usage=[$5], response_time=[$6], cpu_usage=[$7], region=[$8], bytes_sent=[$9], _id=[$10], _index=[$11], _score=[$12], _maxscore=[$13], _sort=[$14], _routing=[$15], @timestamp=[WIDTH_BUCKET($1, 3, -(MAX($1) OVER (), MIN($1) OVER ()), MAX($1) OVER ())]) CalciteLogicalIndexScan(table=[[OpenSearch, events]]) physical: | diff --git a/integ-test/src/yamlRestTest/resources/rest-api-spec/test/issues/4415.yml b/integ-test/src/yamlRestTest/resources/rest-api-spec/test/issues/4415.yml new file mode 100644 index 00000000000..4a7f2426426 --- /dev/null +++ b/integ-test/src/yamlRestTest/resources/rest-api-spec/test/issues/4415.yml @@ -0,0 +1,42 @@ +setup: + - do: + query.settings: + body: + transient: + plugins.calcite.enabled : true + - do: + bulk: + index: test + refresh: true + body: + - '{"index": {}}' + - '{"num": 11}' + - '{"index": {}}' + - '{"num": 15}' + - '{"index": {}}' + - '{"num": 22}' + +--- +teardown: + - do: + query.settings: + body: + transient: + plugins.calcite.enabled : false + +--- +"big decimal literal": + - skip: + features: + - headers + - allowed_warnings + - do: + headers: + Content-Type: 'application/json' + ppl: + body: + query: source=test | bin num bins=3 | stats count() by num + + - match: { total: 2 } + - match: { "schema": [ { "name": "count()", "type": "bigint" }, { "name": "num", "type": "string" }] } + - match: {"datarows": [[2, "10-20"], [1, "20-30"]]} diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/data/value/OpenSearchExprValueFactory.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/data/value/OpenSearchExprValueFactory.java index 6fb08c6ab69..f303cb725e0 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/data/value/OpenSearchExprValueFactory.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/data/value/OpenSearchExprValueFactory.java @@ -85,11 +85,7 @@ public class OpenSearchExprValueFactory { * @param typeMapping A data type mapping produced by aggregation. */ public void extendTypeMapping(Map typeMapping) { - for (var field : typeMapping.keySet()) { - // Prevent overwriting, because aggregation engine may be not aware - // of all niceties of all types. - this.typeMapping.putIfAbsent(field, typeMapping.get(field)); - } + this.typeMapping.putAll(typeMapping); } @Getter @Setter private OpenSearchAggregationResponseParser parser; diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchAggregateIndexScanRule.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchAggregateIndexScanRule.java index 0e9f68dfc3d..51539314718 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchAggregateIndexScanRule.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchAggregateIndexScanRule.java @@ -16,7 +16,6 @@ import org.apache.calcite.rel.logical.LogicalAggregate; import org.apache.calcite.rel.logical.LogicalFilter; import org.apache.calcite.rel.logical.LogicalProject; -import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rex.RexCall; import org.apache.calcite.rex.RexInputRef; import org.apache.calcite.rex.RexNode; @@ -24,8 +23,7 @@ import org.apache.calcite.sql.fun.SqlStdOperatorTable; import org.immutables.value.Value; import org.opensearch.sql.ast.expression.Argument; -import org.opensearch.sql.calcite.type.ExprSqlType; -import org.opensearch.sql.calcite.utils.OpenSearchTypeFactory.ExprUDT; +import org.opensearch.sql.expression.function.udf.binning.WidthBucketFunction; import org.opensearch.sql.opensearch.storage.scan.CalciteLogicalIndexScan; /** Planner rule that push a {@link LogicalAggregate} down to {@link CalciteLogicalIndexScan} */ @@ -220,13 +218,8 @@ static boolean containsWidthBucketFuncOnDate(LogicalProject project) { expr -> expr instanceof RexCall rexCall && rexCall.getOperator().equals(WIDTH_BUCKET) - && dateRelatedType(rexCall.getOperands().getFirst().getType())); - } - - static boolean dateRelatedType(RelDataType type) { - return type instanceof ExprSqlType exprSqlType - && List.of(ExprUDT.EXPR_DATE, ExprUDT.EXPR_TIME, ExprUDT.EXPR_TIMESTAMP) - .contains(exprSqlType.getUdt()); + && WidthBucketFunction.dateRelatedType( + rexCall.getOperands().getFirst().getType())); } } } diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/data/value/OpenSearchExprValueFactoryTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/data/value/OpenSearchExprValueFactoryTest.java index e8588fa778c..32ba07d4d53 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/data/value/OpenSearchExprValueFactoryTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/data/value/OpenSearchExprValueFactoryTest.java @@ -972,8 +972,8 @@ public void constructUnsupportedTypeThrowException() { @Test // aggregation adds info about new columns to the factory, - // it is accepted without overwriting existing data. - public void factoryMappingsAreExtendableWithoutOverWrite() + // it will overwrite existing type to fix https://github.com/opensearch-project/sql/issues/4115 + public void factoryMappingsAreExtendableWithOverWrite() throws NoSuchFieldException, IllegalAccessException { var factory = new OpenSearchExprValueFactory(Map.of("value", OpenSearchDataType.of(INTEGER)), true); @@ -990,7 +990,7 @@ public void factoryMappingsAreExtendableWithoutOverWrite() () -> assertEquals(2, mapping.size()), () -> assertTrue(mapping.containsKey("value")), () -> assertTrue(mapping.containsKey("agg")), - () -> assertEquals(OpenSearchDataType.of(INTEGER), mapping.get("value")), + () -> assertEquals(OpenSearchDataType.of(DOUBLE), mapping.get("value")), () -> assertEquals(OpenSearchDataType.of(DATE), mapping.get("agg"))); }