Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ private PPLReturnTypes() {}
ReturnTypes.explicit(UserDefinedFunctionUtils.NULLABLE_TIME_UDT);
public static final SqlReturnTypeInference TIMESTAMP_FORCE_NULLABLE =
ReturnTypes.explicit(UserDefinedFunctionUtils.NULLABLE_TIMESTAMP_UDT);
public static final SqlReturnTypeInference IP_FORCE_NULLABLE =
ReturnTypes.explicit(UserDefinedFunctionUtils.NULLABLE_IP_UDT);
public static SqlReturnTypeInference INTEGER_FORCE_NULLABLE =
ReturnTypes.INTEGER.andThen(SqlTypeTransforms.FORCE_NULLABLE);
public static SqlReturnTypeInference STRING_FORCE_NULLABLE =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public class UserDefinedFunctionUtils {
TYPE_FACTORY.createUDT(ExprUDT.EXPR_TIMESTAMP, true);
public static final RelDataType NULLABLE_STRING =
TYPE_FACTORY.createTypeWithNullability(TYPE_FACTORY.createSqlType(SqlTypeName.VARCHAR), true);
public static final RelDataType NULLABLE_IP_UDT = TYPE_FACTORY.createUDT(EXPR_IP, true);

public static RelDataType nullablePatternAggList =
createArrayType(
Expand All @@ -79,6 +80,7 @@ public class UserDefinedFunctionUtils {
ImmutableSet.of("match", "match_phrase", "match_bool_prefix", "match_phrase_prefix");
public static Set<String> MULTI_FIELDS_RELEVANCE_FUNCTION_SET =
ImmutableSet.of("simple_query_string", "query_string", "multi_match");
public static String IP_FUNCTION_NAME = "IP";

/**
* Creates a SqlUserDefinedAggFunction that wraps a Java class implementing an aggregate function.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,8 @@ public class PPLBuiltinOperators extends ReflectiveSqlOperatorTable {
.toUDF("TIME");

// IP cast function
public static final SqlOperator IP = new IPFunction().toUDF("IP");
public static final SqlOperator IP =
new IPFunction().toUDF(UserDefinedFunctionUtils.IP_FUNCTION_NAME);
public static final SqlOperator TIME_TO_SEC =
adaptExprMethodToUDF(
DateTimeFunctions.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,30 @@

package org.opensearch.sql.expression.function.udf.ip;

import java.util.Collections;
import java.util.List;
import java.util.Locale;
import org.apache.calcite.adapter.enumerable.NotNullImplementor;
import org.apache.calcite.adapter.enumerable.NullPolicy;
import org.apache.calcite.adapter.enumerable.RexToLixTranslator;
import org.apache.calcite.linq4j.tree.ConstantExpression;
import org.apache.calcite.linq4j.tree.Expression;
import org.apache.calcite.linq4j.tree.Expressions;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.SqlSyntax;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.sql.type.InferTypes;
import org.apache.calcite.sql.type.ReturnTypes;
import org.apache.calcite.sql.type.SqlReturnTypeInference;
import org.apache.calcite.sql.validate.SqlUserDefinedFunction;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.opensearch.sql.data.model.ExprIpValue;
import org.opensearch.sql.data.type.ExprCoreType;
import org.opensearch.sql.expression.function.ImplementorUDF;
import org.opensearch.sql.expression.function.PPLBuiltinOperators;
import org.opensearch.sql.expression.function.UDFOperandMetadata;

/**
Expand All @@ -32,33 +43,80 @@
* </ul>
*/
public class CompareIpFunction extends ImplementorUDF {
private final SqlKind kind;

private CompareIpFunction(ComparisonType comparisonType) {
super(new CompareImplementor(comparisonType), NullPolicy.ANY);
private CompareIpFunction(SqlKind kind) {
super(new CompareImplementor(kind), NullPolicy.ANY);
this.kind = kind;
}

public static CompareIpFunction less() {
return new CompareIpFunction(ComparisonType.LESS);
return new CompareIpFunction(SqlKind.LESS_THAN);
}

public static CompareIpFunction greater() {
return new CompareIpFunction(ComparisonType.GREATER);
return new CompareIpFunction(SqlKind.GREATER_THAN);
}

public static CompareIpFunction lessOrEquals() {
return new CompareIpFunction(ComparisonType.LESS_OR_EQUAL);
return new CompareIpFunction(SqlKind.LESS_THAN_OR_EQUAL);
}

public static CompareIpFunction greaterOrEquals() {
return new CompareIpFunction(ComparisonType.GREATER_OR_EQUAL);
return new CompareIpFunction(SqlKind.GREATER_THAN_OR_EQUAL);
}

public static CompareIpFunction equals() {
return new CompareIpFunction(ComparisonType.EQUALS);
return new CompareIpFunction(SqlKind.EQUALS);
}

public static CompareIpFunction notEquals() {
return new CompareIpFunction(ComparisonType.NOT_EQUALS);
return new CompareIpFunction(SqlKind.NOT_EQUALS);
}

@Override
public SqlUserDefinedFunction toUDF(String functionName, boolean isDeterministic) {
SqlIdentifier udfIdentifier =
new SqlIdentifier(Collections.singletonList(functionName), null, SqlParserPos.ZERO, null);
return new SqlUserDefinedFunction(
udfIdentifier,
kind,
getReturnTypeInference(),
InferTypes.ANY_NULLABLE,
getOperandMetadata(),
getFunction()) {
@Override
public boolean isDeterministic() {
return isDeterministic;
}

@Override
public @Nullable SqlOperator reverse() {
switch (kind) {
case LESS_THAN:
return PPLBuiltinOperators.GREATER_IP;
case GREATER_THAN:
return PPLBuiltinOperators.LESS_IP;
case LESS_THAN_OR_EQUAL:
return PPLBuiltinOperators.GTE_IP;
case GREATER_THAN_OR_EQUAL:
return PPLBuiltinOperators.LTE_IP;
case EQUALS:
return PPLBuiltinOperators.EQUALS_IP;
case NOT_EQUALS:
return PPLBuiltinOperators.NOT_EQUALS_IP;
default:
throw new IllegalArgumentException(
String.format(
Locale.ROOT, "CompareIpFunction is not supposed to be of kind: %s", kind));
}
}

@Override
public SqlSyntax getSyntax() {
return SqlSyntax.BINARY;
}
};
}

@Override
Expand All @@ -72,10 +130,10 @@ public UDFOperandMetadata getOperandMetadata() {
}

public static class CompareImplementor implements NotNullImplementor {
private final ComparisonType comparisonType;
private final SqlKind compareType;

public CompareImplementor(ComparisonType comparisonType) {
this.comparisonType = comparisonType;
public CompareImplementor(SqlKind compareType) {
this.compareType = compareType;
}

@Override
Expand All @@ -88,27 +146,27 @@ public Expression implement(
translatedOperands.get(0),
translatedOperands.get(1));

return generateComparisonExpression(compareResult, comparisonType);
return evalCompareResult(compareResult, compareType);
}

private static Expression generateComparisonExpression(
Expression compareResult, ComparisonType comparisonType) {
final ConstantExpression zero = Expressions.constant(0);
switch (comparisonType) {
private static Expression evalCompareResult(Expression compareResult, SqlKind compareType) {
final ConstantExpression zero = Expressions.constant(0);
switch (compareType) {
case EQUALS:
return Expressions.equal(compareResult, zero);
case NOT_EQUALS:
return Expressions.notEqual(compareResult, zero);
case LESS:
case LESS_THAN:
return Expressions.lessThan(compareResult, zero);
case LESS_OR_EQUAL:
case LESS_THAN_OR_EQUAL:
return Expressions.lessThanOrEqual(compareResult, zero);
case GREATER:
case GREATER_THAN:
return Expressions.greaterThan(compareResult, zero);
case GREATER_OR_EQUAL:
case GREATER_THAN_OR_EQUAL:
return Expressions.greaterThanOrEqual(compareResult, zero);
default:
throw new IllegalArgumentException("Unexpected comparison type: " + comparisonType);
throw new UnsupportedOperationException(
String.format(Locale.ROOT, "Unsupported compare type: %s", compareType));
}
}

Expand All @@ -128,13 +186,4 @@ private static ExprIpValue toExprIpValue(Object obj) {
throw new IllegalArgumentException("Invalid IP type: " + obj);
}
}

public enum ComparisonType {
EQUALS,
NOT_EQUALS,
LESS,
LESS_OR_EQUAL,
GREATER,
GREATER_OR_EQUAL
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
import org.apache.calcite.linq4j.tree.Expression;
import org.apache.calcite.linq4j.tree.Expressions;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.sql.type.ReturnTypes;
import org.apache.calcite.sql.type.SqlReturnTypeInference;
import org.opensearch.sql.calcite.utils.OpenSearchTypeFactory;
import org.opensearch.sql.calcite.utils.PPLReturnTypes;
import org.opensearch.sql.data.model.ExprIpValue;
import org.opensearch.sql.data.type.ExprCoreType;
import org.opensearch.sql.data.type.ExprType;
Expand Down Expand Up @@ -46,8 +46,7 @@ public UDFOperandMetadata getOperandMetadata() {

@Override
public SqlReturnTypeInference getReturnTypeInference() {
return ReturnTypes.explicit(
OpenSearchTypeFactory.TYPE_FACTORY.createUDT(OpenSearchTypeFactory.ExprUDT.EXPR_IP, true));
return PPLReturnTypes.IP_FORCE_NULLABLE;
}

public static class CastImplementor
Expand Down
22 changes: 18 additions & 4 deletions integ-test/src/test/java/org/opensearch/sql/ppl/ExplainIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.junit.Assume;
import org.junit.Ignore;
import org.junit.jupiter.api.Test;
import org.opensearch.client.ResponseException;
Expand Down Expand Up @@ -96,18 +97,31 @@ public void testFilterByCompareStringTimePushDownExplain() throws IOException {

@Test
public void testFilterByCompareIPCoercion() throws IOException {
// Should automatically cast the string literal to IP.
String expected = loadExpectedPlan("explain_filter_compare_ip.json");
// The index of host is flaky (different from test to test)
// Should automatically cast the string literal to IP and pushdown it as a range query
assertJsonEqualsIgnoreFieldIndex(
expected,
loadExpectedPlan("explain_filter_compare_ip.json"),
explainQueryToString(
String.format(
Locale.ROOT,
"source=%s | where host > '1.1.1.1' | fields host",
TEST_INDEX_WEBLOGS)));
}

@Test
public void testFilterByCompareIpv6Swapped() throws IOException {
// Ignored in v2: the serialized string is unstable because of function properties
Assume.assumeTrue(isCalciteEnabled());
// Test swapping ip and string. In v2, this is pushed down as script;
// with Calcite, it will still be pushed down as a range query
assertJsonEqualsIgnoreFieldIndex(
loadExpectedPlan("explain_filter_compare_ipv6_swapped.json"),
explainQueryToString(
String.format(
Locale.ROOT,
"source=%s | where '::ffff:1234' <= host | fields host",
TEST_INDEX_WEBLOGS)));
}

private static void assertJsonEqualsIgnoreFieldIndex(String expected, String actual) throws IOException {
String reorderedExpected = maskIndexAndReorderProject(expected);
String reorderedActual = maskIndexAndReorderProject(actual);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"calcite": {
"logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(host=[$3])\n LogicalFilter(condition=[GREATER_IP($3, IP('1.1.1.1':VARCHAR))])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_weblogs]])\n",
"physical": "CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_weblogs]], PushDownContext=[[PROJECT->[host], SCRIPT->GREATER_IP($0, IP('1.1.1.1':VARCHAR)), LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"query\":{\"script\":{\"script\":{\"source\":\"{\\\"langType\\\":\\\"calcite\\\",\\\"script\\\":\\\"rO0ABXNyABFqYXZhLnV0aWwuQ29sbFNlcleOq7Y6G6gRAwABSQADdGFneHAAAAADdwQAAAAGdAAHcm93VHlwZXQAensKICAiZmllbGRzIjogWwogICAgewogICAgICAidHlwZSI6ICJPVEhFUiIsCiAgICAgICJudWxsYWJsZSI6IHRydWUsCiAgICAgICJuYW1lIjogImhvc3QiCiAgICB9CiAgXSwKICAibnVsbGFibGUiOiBmYWxzZQp9dAAEZXhwcnQDfXsKICAib3AiOiB7CiAgICAibmFtZSI6ICJHUkVBVEVSX0lQIiwKICAgICJraW5kIjogIk9USEVSX0ZVTkNUSU9OIiwKICAgICJzeW50YXgiOiAiRlVOQ1RJT04iCiAgfSwKICAib3BlcmFuZHMiOiBbCiAgICB7CiAgICAgICJpbnB1dCI6IDAsCiAgICAgICJuYW1lIjogIiQwIgogICAgfSwKICAgIHsKICAgICAgIm9wIjogewogICAgICAgICJuYW1lIjogIklQIiwKICAgICAgICAia2luZCI6ICJPVEhFUl9GVU5DVElPTiIsCiAgICAgICAgInN5bnRheCI6ICJGVU5DVElPTiIKICAgICAgfSwKICAgICAgIm9wZXJhbmRzIjogWwogICAgICAgIHsKICAgICAgICAgICJsaXRlcmFsIjogIjEuMS4xLjEiLAogICAgICAgICAgInR5cGUiOiB7CiAgICAgICAgICAgICJ0eXBlIjogIlZBUkNIQVIiLAogICAgICAgICAgICAibnVsbGFibGUiOiBmYWxzZSwKICAgICAgICAgICAgInByZWNpc2lvbiI6IC0xCiAgICAgICAgICB9CiAgICAgICAgfQogICAgICBdLAogICAgICAiY2xhc3MiOiAib3JnLm9wZW5zZWFyY2guc3FsLmV4cHJlc3Npb24uZnVuY3Rpb24uVXNlckRlZmluZWRGdW5jdGlvbkJ1aWxkZXIkMSIsCiAgICAgICJ0eXBlIjogewogICAgICAgICJ0eXBlIjogIk9USEVSIiwKICAgICAgICAibnVsbGFibGUiOiB0cnVlCiAgICAgIH0sCiAgICAgICJkZXRlcm1pbmlzdGljIjogdHJ1ZSwKICAgICAgImR5bmFtaWMiOiBmYWxzZQogICAgfQogIF0sCiAgImNsYXNzIjogIm9yZy5vcGVuc2VhcmNoLnNxbC5leHByZXNzaW9uLmZ1bmN0aW9uLlVzZXJEZWZpbmVkRnVuY3Rpb25CdWlsZGVyJDEiLAogICJ0eXBlIjogewogICAgInR5cGUiOiAiQk9PTEVBTiIsCiAgICAibnVsbGFibGUiOiB0cnVlCiAgfSwKICAiZGV0ZXJtaW5pc3RpYyI6IHRydWUsCiAgImR5bmFtaWMiOiBmYWxzZQp9dAAKZmllbGRUeXBlc3NyABFqYXZhLnV0aWwuSGFzaE1hcAUH2sHDFmDRAwACRgAKbG9hZEZhY3RvckkACXRocmVzaG9sZHhwP0AAAAAAAAx3CAAAABAAAAABdAAEaG9zdH5yAClvcmcub3BlbnNlYXJjaC5zcWwuZGF0YS50eXBlLkV4cHJDb3JlVHlwZQAAAAAAAAAAEgAAeHIADmphdmEubGFuZy5FbnVtAAAAAAAAAAASAAB4cHQAAklQeHg=\\\"}\",\"lang\":\"opensearch_compounded_script\",\"params\":{\"utcTimestamp\":1754322819643187000}},\"boost\":1.0}},\"_source\":{\"includes\":[\"host\"],\"excludes\":[]},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}]}, requestedTotalSize=10000, pageSize=null, startFrom=0)])\n"
"logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(host=[$0])\n LogicalFilter(condition=[GREATER_IP($0, IP('1.1.1.1':VARCHAR))])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_weblogs]])\n",
"physical": "CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_weblogs]], PushDownContext=[[PROJECT->[host], FILTER->GREATER_IP($0, IP('1.1.1.1':VARCHAR)), LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"query\":{\"range\":{\"host\":{\"from\":\"1.1.1.1\",\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"_source\":{\"includes\":[\"host\"],\"excludes\":[]},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}]}, requestedTotalSize=10000, pageSize=null, startFrom=0)])\n"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"calcite": {
"logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(host=[$0])\n LogicalFilter(condition=[LTE_IP(IP('::ffff:1234':VARCHAR), $0)])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_weblogs]])\n",
"physical": "CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_weblogs]], PushDownContext=[[PROJECT->[host], FILTER->LTE_IP(IP('::ffff:1234':VARCHAR), $0), LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"query\":{\"range\":{\"host\":{\"from\":\"::ffff:1234\",\"to\":null,\"include_lower\":true,\"include_upper\":true,\"boost\":1.0}}},\"_source\":{\"includes\":[\"host\"],\"excludes\":[]},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}]}, requestedTotalSize=10000, pageSize=null, startFrom=0)])\n"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"calcite": {
"logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(host=[$0])\n LogicalFilter(condition=[LTE_IP(IP('::ffff:1234':VARCHAR), $0)])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_weblogs]])\n",
"physical": "EnumerableLimit(fetch=[10000])\n EnumerableCalc(expr#0..11=[{inputs}], expr#12=['::ffff:1234':VARCHAR], expr#13=[IP($t12)], expr#14=[LTE_IP($t13, $t0)], host=[$t0], $condition=[$t14])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_weblogs]])\n"
}
}
Loading
Loading