Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,13 @@
import org.apache.calcite.adapter.enumerable.RexToLixTranslator;
import org.apache.calcite.linq4j.tree.Expression;
import org.apache.calcite.linq4j.tree.Expressions;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.sql.type.ReturnTypes;
import org.apache.calcite.sql.type.SqlReturnTypeInference;
import org.apache.calcite.sql.type.SqlTypeName;
import org.opensearch.sql.calcite.type.ExprSqlType;
import org.opensearch.sql.calcite.utils.OpenSearchTypeFactory.ExprUDT;
import org.opensearch.sql.calcite.utils.PPLOperandTypes;
import org.opensearch.sql.calcite.utils.binning.BinConstants;
import org.opensearch.sql.expression.function.ImplementorUDF;
Expand Down Expand Up @@ -44,7 +48,20 @@ public WidthBucketFunction() {

@Override
public SqlReturnTypeInference getReturnTypeInference() {
  // NOTE(review): the scraped diff left the removed `return ReturnTypes.VARCHAR_2000;`
  // line in place before the new lambda, which is unreachable code and does not
  // compile; only the new implementation is kept here.
  //
  // Return-type rule for width_bucket:
  //  - date/time/timestamp input: keep the operand's own type so downstream
  //    consumers (e.g. stats-by schemas) preserve temporal semantics;
  //  - any other input: bin labels are rendered as strings such as "10-20",
  //    so infer a nullable VARCHAR(2000).
  return (opBinding) -> {
    RelDataTypeFactory typeFactory = opBinding.getTypeFactory();
    RelDataType arg0Type = opBinding.getOperandType(0);
    return dateRelatedType(arg0Type)
        ? arg0Type
        : typeFactory.createTypeWithNullability(
            typeFactory.createSqlType(SqlTypeName.VARCHAR, 2000), true);
  };
}

/**
 * Returns whether the given Calcite type is one of the OpenSearch date-related
 * UDTs (date, time, or timestamp).
 *
 * @param type the relational type to inspect
 * @return true only for an {@code ExprSqlType} whose UDT is EXPR_DATE,
 *     EXPR_TIME, or EXPR_TIMESTAMP
 */
public static boolean dateRelatedType(RelDataType type) {
  if (!(type instanceof ExprSqlType)) {
    return false;
  }
  ExprUDT udt = ((ExprSqlType) type).getUdt();
  return udt == ExprUDT.EXPR_DATE || udt == ExprUDT.EXPR_TIME || udt == ExprUDT.EXPR_TIMESTAMP;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -867,9 +867,8 @@ public void testStatsWithBinsOnTimeField_Count() throws IOException {

JSONObject result =
executeQuery("source=events_null | bin @timestamp bins=3 | stats count() by @timestamp");
// TODO: @timestamp should keep date as its type, to be addressed by this issue:
// https://github.com/opensearch-project/sql/issues/4317
verifySchema(result, schema("count()", null, "bigint"), schema("@timestamp", null, "string"));
verifySchema(
result, schema("count()", null, "bigint"), schema("@timestamp", null, "timestamp"));
// auto_date_histogram will choose span=5m for bins=3
verifyDataRows(result, rows(5, "2024-07-01 00:00:00"), rows(1, "2024-07-01 00:05:00"));

Expand Down Expand Up @@ -907,10 +906,8 @@ public void testStatsWithBinsOnTimeField_Avg() throws IOException {
JSONObject result =
executeQuery(
"source=events_null | bin @timestamp bins=3 | stats avg(cpu_usage) by @timestamp");
// TODO: @timestamp should keep date as its type, to be addressed by this issue:
// https://github.com/opensearch-project/sql/issues/4317
verifySchema(
result, schema("avg(cpu_usage)", null, "double"), schema("@timestamp", null, "string"));
result, schema("avg(cpu_usage)", null, "double"), schema("@timestamp", null, "timestamp"));
// auto_date_histogram will choose span=5m for bins=3
verifyDataRows(result, rows(44.62, "2024-07-01 00:00:00"), rows(50.0, "2024-07-01 00:05:00"));

Expand Down Expand Up @@ -951,13 +948,11 @@ public void testStatsWithBinsOnTimeAndTermField_Count() throws IOException {
executeQuery(
"source=events_null | bin @timestamp bins=3 | stats bucket_nullable=false count() by"
+ " region, @timestamp");
// TODO: @timestamp should keep date as its type, to be addressed by this issue:
// https://github.com/opensearch-project/sql/issues/4317
verifySchema(
result,
schema("count()", null, "bigint"),
schema("region", null, "string"),
schema("@timestamp", null, "string"));
schema("@timestamp", null, "timestamp"));
// auto_date_histogram will choose span=5m for bins=3
verifyDataRows(
result,
Expand All @@ -976,13 +971,11 @@ public void testStatsWithBinsOnTimeAndTermField_Avg() throws IOException {
executeQuery(
"source=events_null | bin @timestamp bins=3 | stats bucket_nullable=false "
+ " avg(cpu_usage) by region, @timestamp");
// TODO: @timestamp should keep date as its type, to be addressed by this issue:
// https://github.com/opensearch-project/sql/issues/4317
verifySchema(
result,
schema("avg(cpu_usage)", null, "double"),
schema("region", null, "string"),
schema("@timestamp", null, "string"));
schema("@timestamp", null, "timestamp"));
// auto_date_histogram will choose span=5m for bins=3
verifyDataRows(
result,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# Enable the Calcite engine before the suite and index three numeric docs
# used by the bin/stats query below.
setup:
  - do:
      query.settings:
        body:
          transient:
            plugins.calcite.enabled : true
  - do:
      bulk:
        index: test
        refresh: true
        body:
          - '{"index": {}}'
          - '{"num": 11}'
          - '{"index": {}}'
          - '{"num": 15}'
          - '{"index": {}}'
          - '{"num": 22}'

---
# Restore the Calcite setting so other suites are unaffected.
teardown:
  - do:
      query.settings:
        body:
          transient:
            plugins.calcite.enabled : false

---
# NOTE(review): the test name "big decimal literal" does not match the body,
# which exercises `bin ... bins=3` bucketing — presumably copied from another
# test; confirm and rename.
"big decimal literal":
  - skip:
      features:
        - headers
        - allowed_warnings
  - do:
      headers:
        Content-Type: 'application/json'
      ppl:
        body:
          query: source=test | bin num bins=3 | stats count() by num
  # Values 11 and 15 fall into the "10-20" bucket, 22 into "20-30".
  - match: { total: 2 }
  - match: {"datarows": [[2, "10-20"], [1, "20-30"]]}
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,7 @@ public class OpenSearchExprValueFactory {
* @param typeMapping A data type mapping produced by aggregation.
*/
public void extendTypeMapping(Map<String, OpenSearchDataType> typeMapping) {
  // NOTE(review): the scraped diff retained both the old putIfAbsent loop and
  // the new putAll call; only the new behavior is kept. New mappings now
  // OVERWRITE existing entries — per the accompanying test changes, aggregation
  // results may carry refined field types (e.g. timestamp instead of string),
  // so they must take precedence over the previously known mapping.
  this.typeMapping.putAll(typeMapping);
}

@Getter @Setter private OpenSearchAggregationResponseParser parser;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,14 @@
import org.apache.calcite.rel.logical.LogicalAggregate;
import org.apache.calcite.rel.logical.LogicalFilter;
import org.apache.calcite.rel.logical.LogicalProject;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.immutables.value.Value;
import org.opensearch.sql.ast.expression.Argument;
import org.opensearch.sql.calcite.type.ExprSqlType;
import org.opensearch.sql.calcite.utils.OpenSearchTypeFactory.ExprUDT;
import org.opensearch.sql.expression.function.udf.binning.WidthBucketFunction;
import org.opensearch.sql.opensearch.storage.scan.CalciteLogicalIndexScan;

/** Planner rule that push a {@link LogicalAggregate} down to {@link CalciteLogicalIndexScan} */
Expand Down Expand Up @@ -220,13 +218,8 @@ static boolean containsWidthBucketFuncOnDate(LogicalProject project) {
expr ->
expr instanceof RexCall rexCall
&& rexCall.getOperator().equals(WIDTH_BUCKET)
&& dateRelatedType(rexCall.getOperands().getFirst().getType()));
}

static boolean dateRelatedType(RelDataType type) {
return type instanceof ExprSqlType exprSqlType
&& List.of(ExprUDT.EXPR_DATE, ExprUDT.EXPR_TIME, ExprUDT.EXPR_TIMESTAMP)
.contains(exprSqlType.getUdt());
&& WidthBucketFunction.dateRelatedType(
rexCall.getOperands().getFirst().getType()));
}
}
}
Loading