```diff
@@ -11,9 +11,13 @@
 import org.apache.calcite.adapter.enumerable.RexToLixTranslator;
 import org.apache.calcite.linq4j.tree.Expression;
 import org.apache.calcite.linq4j.tree.Expressions;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.rex.RexCall;
-import org.apache.calcite.sql.type.ReturnTypes;
 import org.apache.calcite.sql.type.SqlReturnTypeInference;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.opensearch.sql.calcite.type.ExprSqlType;
+import org.opensearch.sql.calcite.utils.OpenSearchTypeFactory.ExprUDT;
 import org.opensearch.sql.calcite.utils.PPLOperandTypes;
 import org.opensearch.sql.calcite.utils.binning.BinConstants;
 import org.opensearch.sql.expression.function.ImplementorUDF;
@@ -44,7 +48,20 @@ public WidthBucketFunction() {

   @Override
   public SqlReturnTypeInference getReturnTypeInference() {
-    return ReturnTypes.VARCHAR_2000;
+    return (opBinding) -> {
+      RelDataTypeFactory typeFactory = opBinding.getTypeFactory();
+      RelDataType arg0Type = opBinding.getOperandType(0);
+      return dateRelatedType(arg0Type)
+          ? arg0Type
+          : typeFactory.createTypeWithNullability(
+              typeFactory.createSqlType(SqlTypeName.VARCHAR, 2000), true);
+    };
   }
 
+  public static boolean dateRelatedType(RelDataType type) {
+    return type instanceof ExprSqlType exprSqlType
+        && List.of(ExprUDT.EXPR_DATE, ExprUDT.EXPR_TIME, ExprUDT.EXPR_TIMESTAMP)
+            .contains(exprSqlType.getUdt());
+  }
 
   @Override
```
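For reference, here is a minimal standalone sketch of the new inference rule, runnable against Calcite's core artifact alone (the class and method names are illustrative, not part of the PR): date-related operands keep their type, everything else falls back to the legacy nullable VARCHAR(2000) range label.

```java
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
import org.apache.calcite.sql.type.SqlTypeName;

public class BinReturnTypeSketch {
  // Mirrors the branch above: keep the operand type for date-related inputs,
  // otherwise produce a nullable VARCHAR(2000).
  static RelDataType infer(RelDataTypeFactory tf, RelDataType arg0, boolean dateRelated) {
    return dateRelated
        ? arg0
        : tf.createTypeWithNullability(tf.createSqlType(SqlTypeName.VARCHAR, 2000), true);
  }

  public static void main(String[] args) {
    RelDataTypeFactory tf = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
    // `bin @timestamp ...` now keeps TIMESTAMP, so downstream stats keep a timestamp schema.
    System.out.println(infer(tf, tf.createSqlType(SqlTypeName.TIMESTAMP), true));
    // `bin num ...` still yields a string range label such as "10-20".
    System.out.println(infer(tf, tf.createSqlType(SqlTypeName.INTEGER), false));
  }
}
```

This is why the integration tests below change the `@timestamp` schema from `string` to `timestamp`, while the numeric `bin num` YAML test further down keeps `string`.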
```diff
@@ -867,9 +867,8 @@ public void testStatsWithBinsOnTimeField_Count() throws IOException {

     JSONObject result =
         executeQuery("source=events_null | bin @timestamp bins=3 | stats count() by @timestamp");
-    // TODO: @timestamp should keep date as its type, to be addressed by this issue:
-    // https://github.com/opensearch-project/sql/issues/4317
-    verifySchema(result, schema("count()", null, "bigint"), schema("@timestamp", null, "string"));
+    verifySchema(
+        result, schema("count()", null, "bigint"), schema("@timestamp", null, "timestamp"));
     // auto_date_histogram will choose span=5m for bins=3
     verifyDataRows(result, rows(5, "2024-07-01 00:00:00"), rows(1, "2024-07-01 00:05:00"));

@@ -907,10 +906,8 @@ public void testStatsWithBinsOnTimeField_Avg() throws IOException {
     JSONObject result =
         executeQuery(
             "source=events_null | bin @timestamp bins=3 | stats avg(cpu_usage) by @timestamp");
-    // TODO: @timestamp should keep date as its type, to be addressed by this issue:
-    // https://github.com/opensearch-project/sql/issues/4317
     verifySchema(
-        result, schema("avg(cpu_usage)", null, "double"), schema("@timestamp", null, "string"));
+        result, schema("avg(cpu_usage)", null, "double"), schema("@timestamp", null, "timestamp"));
     // auto_date_histogram will choose span=5m for bins=3
     verifyDataRows(result, rows(44.62, "2024-07-01 00:00:00"), rows(50.0, "2024-07-01 00:05:00"));

@@ -951,13 +948,11 @@ public void testStatsWithBinsOnTimeAndTermField_Count() throws IOException {
         executeQuery(
             "source=events_null | bin @timestamp bins=3 | stats bucket_nullable=false count() by"
                 + " region, @timestamp");
-    // TODO: @timestamp should keep date as its type, to be addressed by this issue:
-    // https://github.com/opensearch-project/sql/issues/4317
     verifySchema(
         result,
         schema("count()", null, "bigint"),
         schema("region", null, "string"),
-        schema("@timestamp", null, "string"));
+        schema("@timestamp", null, "timestamp"));
     // auto_date_histogram will choose span=5m for bins=3
     verifyDataRows(
         result,
@@ -976,13 +971,11 @@ public void testStatsWithBinsOnTimeAndTermField_Avg() throws IOException {
         executeQuery(
             "source=events_null | bin @timestamp bins=3 | stats bucket_nullable=false "
                 + " avg(cpu_usage) by region, @timestamp");
-    // TODO: @timestamp should keep date as its type, to be addressed by this issue:
-    // https://github.com/opensearch-project/sql/issues/4317
     verifySchema(
         result,
         schema("avg(cpu_usage)", null, "double"),
         schema("region", null, "string"),
-        schema("@timestamp", null, "string"));
+        schema("@timestamp", null, "timestamp"));
     // auto_date_histogram will choose span=5m for bins=3
     verifyDataRows(
         result,
```
```diff
@@ -4,7 +4,7 @@ calcite:
   LogicalProject(count()=[$2], @timestamp=[$0], region=[$1])
     LogicalAggregate(group=[{0, 1}], count()=[COUNT()])
       LogicalProject(@timestamp=[$15], region=[$7])
-        LogicalFilter(condition=[IS NOT NULL($7)])
+        LogicalFilter(condition=[AND(IS NOT NULL($15), IS NOT NULL($7))])
         LogicalProject(environment=[$0], status_code=[$2], service=[$3], host=[$4], memory_usage=[$5], response_time=[$6], cpu_usage=[$7], region=[$8], bytes_sent=[$9], _id=[$10], _index=[$11], _score=[$12], _maxscore=[$13], _sort=[$14], _routing=[$15], @timestamp=[WIDTH_BUCKET($1, 3, -(MAX($1) OVER (), MIN($1) OVER ()), MAX($1) OVER ())])
           CalciteLogicalIndexScan(table=[[OpenSearch, events]])
 physical: |
```
```diff
@@ -4,7 +4,7 @@ calcite:
   LogicalProject(avg(cpu_usage)=[$2], @timestamp=[$0], region=[$1])
     LogicalAggregate(group=[{0, 1}], avg(cpu_usage)=[AVG($2)])
       LogicalProject(@timestamp=[$15], region=[$7], cpu_usage=[$6])
-        LogicalFilter(condition=[IS NOT NULL($7)])
+        LogicalFilter(condition=[AND(IS NOT NULL($15), IS NOT NULL($7))])
         LogicalProject(environment=[$0], status_code=[$2], service=[$3], host=[$4], memory_usage=[$5], response_time=[$6], cpu_usage=[$7], region=[$8], bytes_sent=[$9], _id=[$10], _index=[$11], _score=[$12], _maxscore=[$13], _sort=[$14], _routing=[$15], @timestamp=[WIDTH_BUCKET($1, 3, -(MAX($1) OVER (), MIN($1) OVER ()), MAX($1) OVER ())])
           CalciteLogicalIndexScan(table=[[OpenSearch, events]])
 physical: |
```
```diff
@@ -0,0 +1,42 @@
+setup:
+  - do:
+      query.settings:
+        body:
+          transient:
+            plugins.calcite.enabled : true
+  - do:
+      bulk:
+        index: test
+        refresh: true
+        body:
+          - '{"index": {}}'
+          - '{"num": 11}'
+          - '{"index": {}}'
+          - '{"num": 15}'
+          - '{"index": {}}'
+          - '{"num": 22}'
+
+---
+teardown:
+  - do:
+      query.settings:
+        body:
+          transient:
+            plugins.calcite.enabled : false
+
+---
+"big decimal literal":
+  - skip:
+      features:
+        - headers
+        - allowed_warnings
+  - do:
+      headers:
+        Content-Type: 'application/json'
+      ppl:
+        body:
+          query: source=test | bin num bins=3 | stats count() by num
+
+  - match: { total: 2 }
+  - match: { "schema": [ { "name": "count()", "type": "bigint" }, { "name": "num", "type": "string" }] }
+  - match: {"datarows": [[2, "10-20"], [1, "20-30"]]}
```

**Reviewer (Member):** What's the result without this patch? Is it a test about overwriting an existing type? If yes, better to match the schema too.

**Author (Collaborator):** Corrected the issue link, #4115. It will throw an exception without this PR.

**Author (Collaborator):** Also added the schema verification in commit acecdd1.

```diff
@@ -85,11 +85,7 @@ public class OpenSearchExprValueFactory {
    * @param typeMapping A data type mapping produced by aggregation.
    */
   public void extendTypeMapping(Map<String, OpenSearchDataType> typeMapping) {
-    for (var field : typeMapping.keySet()) {
-      // Prevent overwriting, because aggregation engine may be not aware
-      // of all niceties of all types.
-      this.typeMapping.putIfAbsent(field, typeMapping.get(field));
-    }
+    this.typeMapping.putAll(typeMapping);
   }
 
   @Getter @Setter private OpenSearchAggregationResponseParser parser;
```

**Reviewer (Member):** Q: should we check `Settings.Key.PPL_SYNTAX_LEGACY_PREFERRED` first?

**Author (Collaborator):** This is actually a bug. We'd better always apply this change regardless of what that configuration is.
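The behavioral difference is easy to reproduce with plain maps. Below is a minimal sketch (using strings as stand-ins for `OpenSearchDataType`), mirroring the scenario in the updated unit test later in this diff, where `avg()` turns an integer field into a double:

```java
import java.util.HashMap;
import java.util.Map;

public class ExtendTypeMappingSketch {
  public static void main(String[] args) {
    // Index mapping: "value" starts out as an integer field.
    Map<String, String> typeMapping = new HashMap<>(Map.of("value", "INTEGER"));
    // The aggregation response reports "value" as DOUBLE (e.g. avg(value))
    // and introduces a new "agg" column.
    Map<String, String> fromAggregation = Map.of("value", "DOUBLE", "agg", "DATE");

    // Old behavior: putIfAbsent keeps the stale INTEGER for "value".
    Map<String, String> oldBehavior = new HashMap<>(typeMapping);
    fromAggregation.forEach(oldBehavior::putIfAbsent);
    System.out.println(oldBehavior.get("value")); // INTEGER (stale)

    // New behavior: putAll lets the aggregation-produced type win.
    typeMapping.putAll(fromAggregation);
    System.out.println(typeMapping.get("value")); // DOUBLE
  }
}
```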
```diff
@@ -16,16 +16,14 @@
 import org.apache.calcite.rel.logical.LogicalAggregate;
 import org.apache.calcite.rel.logical.LogicalFilter;
 import org.apache.calcite.rel.logical.LogicalProject;
-import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.RexCall;
 import org.apache.calcite.rex.RexInputRef;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.fun.SqlStdOperatorTable;
 import org.immutables.value.Value;
 import org.opensearch.sql.ast.expression.Argument;
-import org.opensearch.sql.calcite.type.ExprSqlType;
-import org.opensearch.sql.calcite.utils.OpenSearchTypeFactory.ExprUDT;
+import org.opensearch.sql.expression.function.udf.binning.WidthBucketFunction;
 import org.opensearch.sql.opensearch.storage.scan.CalciteLogicalIndexScan;
 
 /** Planner rule that push a {@link LogicalAggregate} down to {@link CalciteLogicalIndexScan} */
@@ -220,13 +218,8 @@ static boolean containsWidthBucketFuncOnDate(LogicalProject project) {
             expr ->
                 expr instanceof RexCall rexCall
                     && rexCall.getOperator().equals(WIDTH_BUCKET)
-                    && dateRelatedType(rexCall.getOperands().getFirst().getType()));
-  }
-
-  static boolean dateRelatedType(RelDataType type) {
-    return type instanceof ExprSqlType exprSqlType
-        && List.of(ExprUDT.EXPR_DATE, ExprUDT.EXPR_TIME, ExprUDT.EXPR_TIMESTAMP)
-            .contains(exprSqlType.getUdt());
+                    && WidthBucketFunction.dateRelatedType(
+                        rexCall.getOperands().getFirst().getType()));
     }
   }
 }
```

```diff
@@ -972,8 +972,8 @@ public void constructUnsupportedTypeThrowException() {

   @Test
   // aggregation adds info about new columns to the factory,
-  // it is accepted without overwriting existing data.
-  public void factoryMappingsAreExtendableWithoutOverWrite()
+  // it will overwrite existing type to fix https://github.com/opensearch-project/sql/issues/4115
+  public void factoryMappingsAreExtendableWithOverWrite()
       throws NoSuchFieldException, IllegalAccessException {
     var factory =
         new OpenSearchExprValueFactory(Map.of("value", OpenSearchDataType.of(INTEGER)), true);
@@ -990,7 +990,7 @@
         () -> assertEquals(2, mapping.size()),
         () -> assertTrue(mapping.containsKey("value")),
         () -> assertTrue(mapping.containsKey("agg")),
-        () -> assertEquals(OpenSearchDataType.of(INTEGER), mapping.get("value")),
+        () -> assertEquals(OpenSearchDataType.of(DOUBLE), mapping.get("value")),
         () -> assertEquals(OpenSearchDataType.of(DATE), mapping.get("agg")));
   }
```