Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ public class PinotConfig
private boolean useDateTrunc;
private int nonAggregateLimitForBrokerQueries = DEFAULT_NON_AGGREGATE_LIMIT_FOR_BROKER_QUERIES;
private boolean pushdownTopNBrokerQueries = true;
private boolean pushdownProjectExpressions = true;
private String grpcHost;
private int grpcPort = DEFAULT_PROXY_GRPC_PORT;
private boolean useProxy;
Expand Down Expand Up @@ -515,6 +516,18 @@ public PinotConfig setPushdownTopNBrokerQueries(boolean pushdownTopNBrokerQuerie
return this;
}

public boolean isPushdownProjectExpressions()
{
return pushdownProjectExpressions;
}

@Config("pinot.pushdown-project-expressions")
public PinotConfig setPushdownProjectExpressions(boolean pushdownProjectExpressions)
{
this.pushdownProjectExpressions = pushdownProjectExpressions;
return this;
}

public boolean isUseStreamingForSegmentQueries()
{
return useStreamingForSegmentQueries;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public class PinotSessionProperties
public static final String USE_PINOT_SQL_FOR_BROKER_QUERIES = "use_pinot_sql_for_broker_queries";
public static final String NON_AGGREGATE_LIMIT_FOR_BROKER_QUERIES = "non_aggregate_limit_for_broker_queries";
public static final String PUSHDOWN_TOPN_BROKER_QUERIES = "pushdown_topn_broker_queries";
public static final String PUSHDOWN_PROJECT_EXPRESSIONS = "pushdown_project_expressions";
public static final String FORBID_SEGMENT_QUERIES = "forbid_segment_queries";
public static final String NUM_SEGMENTS_PER_SPLIT = "num_segments_per_split";
public static final String TOPN_LARGE = "topn_large";
Expand Down Expand Up @@ -105,6 +106,11 @@ public static boolean getPushdownTopnBrokerQueries(ConnectorSession session)
return session.getProperty(PUSHDOWN_TOPN_BROKER_QUERIES, Boolean.class);
}

public static boolean getPushdownProjectExpressions(ConnectorSession session)
{
return session.getProperty(PUSHDOWN_PROJECT_EXPRESSIONS, Boolean.class);
}

public static int getTopNLarge(ConnectorSession session)
{
return session.getProperty(TOPN_LARGE, Integer.class);
Expand Down Expand Up @@ -184,6 +190,11 @@ public PinotSessionProperties(PinotConfig pinotConfig)
"Push down order by to pinot broker for top queries",
pinotConfig.isPushdownTopNBrokerQueries(),
false),
booleanProperty(
PUSHDOWN_PROJECT_EXPRESSIONS,
"Push down expressions in projection to Pinot broker",
pinotConfig.isPushdownProjectExpressions(),
false),
new PropertyMetadata<>(
CONNECTION_TIMEOUT,
"Connection Timeout to talk to Pinot servers",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.facebook.presto.pinot.PinotException;
import com.facebook.presto.pinot.PinotSessionProperties;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.function.FunctionHandle;
import com.facebook.presto.spi.function.FunctionMetadata;
import com.facebook.presto.spi.function.FunctionMetadataManager;
import com.facebook.presto.spi.function.StandardFunctionResolution;
Expand All @@ -29,7 +30,6 @@
import io.airlift.slice.Slice;

import java.time.ZoneId;
import java.util.List;
import java.util.Map;
import java.util.Optional;

Expand All @@ -43,12 +43,6 @@
public class PinotAggregationProjectConverter
extends PinotProjectExpressionConverter
{
// Pinot does not support modulus yet
private static final Map<String, String> PRESTO_TO_PINOT_OPERATORS = ImmutableMap.of(
"-", "SUB",
"+", "ADD",
"*", "MULT",
"/", "DIV");
private static final String FROM_UNIXTIME = "from_unixtime";

private static final Map<String, String> PRESTO_TO_PINOT_ARRAY_AGGREGATIONS = ImmutableMap.<String, String>builder()
Expand All @@ -58,8 +52,6 @@ public class PinotAggregationProjectConverter
.put("array_sum", "arraySum")
.build();

private final FunctionMetadataManager functionMetadataManager;
private final ConnectorSession session;
private final VariableReferenceExpression arrayVariableHint;

public PinotAggregationProjectConverter(TypeManager typeManager, FunctionMetadataManager functionMetadataManager, StandardFunctionResolution standardFunctionResolution, ConnectorSession session)
Expand All @@ -69,9 +61,7 @@ public PinotAggregationProjectConverter(TypeManager typeManager, FunctionMetadat

public PinotAggregationProjectConverter(TypeManager typeManager, FunctionMetadataManager functionMetadataManager, StandardFunctionResolution standardFunctionResolution, ConnectorSession session, VariableReferenceExpression arrayVariableHint)
{
super(typeManager, standardFunctionResolution);
this.functionMetadataManager = requireNonNull(functionMetadataManager, "functionMetadataManager is null");
this.session = requireNonNull(session, "session is null");
super(typeManager, functionMetadataManager, standardFunctionResolution, session);
this.arrayVariableHint = arrayVariableHint;
}

Expand All @@ -80,11 +70,15 @@ public PinotExpression visitCall(
CallExpression call,
Map<VariableReferenceExpression, PinotQueryGeneratorContext.Selection> context)
{
Optional<PinotExpression> basicCallHandlingResult = basicCallHandling(call, context);
if (basicCallHandlingResult.isPresent()) {
return basicCallHandlingResult.get();
FunctionHandle functionHandle = call.getFunctionHandle();
if (standardFunctionResolution.isCastFunction(functionHandle)) {
return handleCast(call, context);
}
FunctionMetadata functionMetadata = functionMetadataManager.getFunctionMetadata(call.getFunctionHandle());
if (standardFunctionResolution.isNotFunction(functionHandle) || standardFunctionResolution.isBetweenFunction(functionHandle)) {
throw new PinotException(PINOT_UNSUPPORTED_EXPRESSION, Optional.empty(), "Unsupported function in pinot aggregation: " + functionHandle);
}

FunctionMetadata functionMetadata = functionMetadataManager.getFunctionMetadata(functionHandle);
Optional<OperatorType> operatorTypeOptional = functionMetadata.getOperatorType();
if (operatorTypeOptional.isPresent()) {
OperatorType operatorType = operatorTypeOptional.get();
Expand Down Expand Up @@ -204,29 +198,6 @@ private PinotExpression handleDateTruncationViaDateTruncation(
return derived("dateTrunc(" + inputColumn + "," + inputFormat + ", " + inputTimeZone + ", " + getStringFromConstant(intervalParameter) + ")");
}

private PinotExpression handleArithmeticExpression(
CallExpression expression,
OperatorType operatorType,
Map<VariableReferenceExpression, PinotQueryGeneratorContext.Selection> context)
{
List<RowExpression> arguments = expression.getArguments();
if (arguments.size() == 1) {
String prefix = operatorType == OperatorType.NEGATION ? "-" : "";
return derived(prefix + arguments.get(0).accept(this, context).getDefinition());
}
if (arguments.size() == 2) {
PinotExpression left = arguments.get(0).accept(this, context);
PinotExpression right = arguments.get(1).accept(this, context);
String prestoOperator = operatorType.getOperator();
String pinotOperator = PRESTO_TO_PINOT_OPERATORS.get(prestoOperator);
if (pinotOperator == null) {
throw new PinotException(PINOT_UNSUPPORTED_EXPRESSION, Optional.empty(), "Unsupported binary expression " + prestoOperator);
}
return derived(format("%s(%s, %s)", pinotOperator, left.getDefinition(), right.getDefinition()));
}
throw new PinotException(PINOT_UNSUPPORTED_EXPRESSION, Optional.empty(), format("Don't know how to interpret %s as an arithmetic expression", expression));
}

private PinotExpression handleFunction(
CallExpression function,
Map<VariableReferenceExpression, PinotQueryGeneratorContext.Selection> context)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ private PinotExpression handleNot(CallExpression not, Function<VariableReference
RowExpression input = not.getArguments().get(0);
if (input instanceof SpecialFormExpression) {
SpecialFormExpression specialFormExpression = (SpecialFormExpression) input;
// NOT operator is only supported on top of the IN expression
// NOT operator is supported on top of IN and IS_NULL
if (specialFormExpression.getForm() == SpecialFormExpression.Form.IN) {
return handleIn(specialFormExpression, false, context);
}
Expand All @@ -262,7 +262,7 @@ private PinotExpression handleNot(CallExpression not, Function<VariableReference
}
}

throw new PinotException(PINOT_UNSUPPORTED_EXPRESSION, Optional.empty(), format("NOT operator is supported only on top of IN operator. Received: %s", not));
throw new PinotException(PINOT_UNSUPPORTED_EXPRESSION, Optional.empty(), format("NOT operator is supported only on top of IN and IS_NULL operator. Received: %s", not));
}

private PinotExpression handleCast(CallExpression cast, Function<VariableReferenceExpression, Selection> context)
Expand Down Expand Up @@ -313,24 +313,22 @@ public PinotExpression visitCall(CallExpression call, Function<VariableReference
if (standardFunctionResolution.isBetweenFunction(functionHandle)) {
return handleBetween(call, context);
}
if (standardFunctionResolution.isArithmeticFunction(functionHandle)) {
throw new PinotException(PINOT_UNSUPPORTED_EXPRESSION, Optional.empty(), "Arithmetic expressions are not supported in filter: " + call);
}

FunctionMetadata functionMetadata = functionMetadataManager.getFunctionMetadata(call.getFunctionHandle());
Optional<OperatorType> operatorTypeOptional = functionMetadata.getOperatorType();
if (operatorTypeOptional.isPresent()) {
OperatorType operatorType = operatorTypeOptional.get();
if (operatorType.isArithmeticOperator()) {
throw new PinotException(PINOT_UNSUPPORTED_EXPRESSION, Optional.empty(), "Arithmetic expressions are not supported in filter: " + call);
}
if (operatorType.isComparisonOperator()) {
return handleLogicalBinary(operatorType.getOperator(), call, context);
}
Optional<OperatorType> operatorType = functionMetadata.getOperatorType();
if (standardFunctionResolution.isComparisonFunction(functionHandle) && operatorType.isPresent()) {
return handleLogicalBinary(operatorType.get().getOperator(), call, context);
}
if ("contains".equals(functionMetadata.getName().getObjectName())) {
return handleContains(call, context);
}
// Handle queries like `eventTimestamp < 1391126400000`.
// Otherwise TypeManager.canCoerce(...) will return false and directly fail this query.
if (functionMetadata.getName().getObjectName().equalsIgnoreCase("$literal$timestamp") ||
functionMetadata.getName().getObjectName().equalsIgnoreCase("$literal$date")) {
functionMetadata.getName().getObjectName().equalsIgnoreCase("$literal$date")) {
return handleDateAndTimestampMagicLiteralFunction(call, context);
}
throw new PinotException(PINOT_UNSUPPORTED_EXPRESSION, Optional.empty(), format("function %s not supported in filter", call));
Expand Down Expand Up @@ -374,19 +372,44 @@ public PinotExpression visitVariableReference(VariableReferenceExpression refere
return new PinotExpression(input.getDefinition(), input.getOrigin());
}

private String getExpressionOrConstantString(RowExpression expression, Function<VariableReferenceExpression, Selection> context)
{
if (expression instanceof ConstantExpression) {
return new PinotExpression(
getLiteralAsString((ConstantExpression) expression),
PinotQueryGeneratorContext.Origin.LITERAL
).getDefinition();
}
return expression.accept(this, context).getDefinition();
}

@Override
public PinotExpression visitSpecialForm(SpecialFormExpression specialForm, Function<VariableReferenceExpression, Selection> context)
{
switch (specialForm.getForm()) {
case IF:
case NULL_IF:
case SWITCH:
case WHEN:
case COALESCE:
case DEREFERENCE:
case ROW_CONSTRUCTOR:
case BIND:
throw new PinotException(PINOT_UNSUPPORTED_EXPRESSION, Optional.empty(), "Pinot does not support the special form " + specialForm);
case SWITCH:
int numArguments = specialForm.getArguments().size();
String searchExpression = getExpressionOrConstantString(specialForm.getArguments().get(0), context);
return derived(format(
"CASE %s %s ELSE %s END",
searchExpression,
specialForm.getArguments().subList(1, numArguments - 1).stream()
.map(argument -> argument.accept(this, context).getDefinition())
.collect(Collectors.joining(" ")),
getExpressionOrConstantString(specialForm.getArguments().get(numArguments - 1), context)));
case WHEN:
return derived(format(
"%s %s THEN %s",
specialForm.getForm().toString(),
getExpressionOrConstantString(specialForm.getArguments().get(0), context),
getExpressionOrConstantString(specialForm.getArguments().get(1), context)));
case IN:
return handleIn(specialForm, true, context);
case AND:
Expand Down
Loading