Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,10 @@ public String doHttpActionWithHeaders(
Optional<String> rpcService)
{
requestBuilder = requestBuilder
.setHeader(HttpHeaders.CONTENT_TYPE, APPLICATION_JSON)
.setHeader(HttpHeaders.ACCEPT, APPLICATION_JSON);
if (requestBody.isPresent()) {
requestBuilder.setHeader(HttpHeaders.CONTENT_TYPE, APPLICATION_JSON);
}
if (rpcService.isPresent()) {
requestBuilder
.setHeader(pinotConfig.getCallerHeaderParam(), pinotConfig.getCallerHeaderValue())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public class PinotConfig
private Duration metadataCacheExpiry = new Duration(2, TimeUnit.MINUTES);

private boolean allowMultipleAggregations;
private boolean preferBrokerQueries = true;
private boolean forbidBrokerQueries;
private boolean forbidSegmentQueries;
private int numSegmentsPerSplit = 1;
private boolean ignoreEmptyResponses;
Expand Down Expand Up @@ -319,15 +319,15 @@ public PinotConfig setCallerHeaderParam(String callerHeaderParam)
return this;
}

public boolean isPreferBrokerQueries()
public boolean isForbidBrokerQueries()
{
return preferBrokerQueries;
return forbidBrokerQueries;
}

@Config("pinot.prefer-broker-queries")
public PinotConfig setPreferBrokerQueries(boolean preferBrokerQueries)
@Config("pinot.forbid-broker-queries")
public PinotConfig setForbidBrokerQueries(boolean forbidBrokerQueries)
{
this.preferBrokerQueries = preferBrokerQueries;
this.forbidBrokerQueries = forbidBrokerQueries;
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
public class PinotSessionProperties
{
private static final String CONNECTION_TIMEOUT = "connection_timeout";
private static final String PREFER_BROKER_QUERIES = "prefer_broker_queries";
private static final String FORBID_BROKER_QUERIES = "forbid_broker_queries";
private static final String IGNORE_EMPTY_RESPONSES = "ignore_empty_responses";
private static final String RETRY_COUNT = "retry_count";
private static final String USE_DATE_TRUNC = "use_date_trunc";
Expand All @@ -53,9 +53,9 @@ public static int getNumSegmentsPerSplit(ConnectorSession session)
return segmentsPerSplit <= 0 ? Integer.MAX_VALUE : segmentsPerSplit;
}

public static boolean isPreferBrokerQueries(ConnectorSession session)
public static boolean isForbidBrokerQueries(ConnectorSession session)
{
return session.getProperty(PREFER_BROKER_QUERIES, Boolean.class);
return session.getProperty(FORBID_BROKER_QUERIES, Boolean.class);
}

public static boolean isForbidSegmentQueries(ConnectorSession session)
Expand Down Expand Up @@ -93,9 +93,9 @@ public PinotSessionProperties(PinotConfig pinotConfig)
{
sessionProperties = ImmutableList.of(
booleanProperty(
PREFER_BROKER_QUERIES,
"Prefer queries to broker even when parallel scan is enabled for aggregation queries",
pinotConfig.isPreferBrokerQueries(),
FORBID_BROKER_QUERIES,
"Forbid queries to the broker",
pinotConfig.isForbidBrokerQueries(),
false),
booleanProperty(
FORBID_SEGMENT_QUERIES,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,11 @@ public PinotQueryGeneratorContext getContext()
public Optional<PinotQueryGeneratorResult> generate(PlanNode plan, ConnectorSession session)
{
try {
boolean preferBrokerQueries = PinotSessionProperties.isPreferBrokerQueries(session);
PinotQueryGeneratorContext context = requireNonNull(plan.accept(new PinotQueryPlanVisitor(session, preferBrokerQueries), new PinotQueryGeneratorContext()), "Resulting context is null");
boolean isQueryShort = context.isQueryShort(PinotSessionProperties.getNonAggregateLimitForBrokerQueries(session));
return Optional.of(new PinotQueryGeneratorResult(context.toQuery(pinotConfig, preferBrokerQueries, isQueryShort), context));
PinotQueryGeneratorContext context = requireNonNull(plan.accept(
new PinotQueryPlanVisitor(session),
new PinotQueryGeneratorContext()),
"Resulting context is null");
return Optional.of(new PinotQueryGeneratorResult(context.toQuery(pinotConfig, session), context));
}
catch (PinotException e) {
log.debug(e, "Possibly benign error when pushing plan into scan node %s", plan);
Expand Down Expand Up @@ -221,12 +222,12 @@ class PinotQueryPlanVisitor
extends PlanVisitor<PinotQueryGeneratorContext, PinotQueryGeneratorContext>
{
private final ConnectorSession session;
private final boolean preferBrokerQueries;
private final boolean forbidBrokerQueries;

protected PinotQueryPlanVisitor(ConnectorSession session, boolean preferBrokerQueries)
protected PinotQueryPlanVisitor(ConnectorSession session)
{
this.session = session;
this.preferBrokerQueries = preferBrokerQueries;
this.forbidBrokerQueries = PinotSessionProperties.isForbidBrokerQueries(session);
}

@Override
Expand Down Expand Up @@ -400,7 +401,7 @@ public PinotQueryGeneratorContext visitAggregation(AggregationNode node, PinotQu
PinotQueryGeneratorContext context = node.getSource().accept(this, contextIn.withVariablesInAggregation(variablesInAggregation));
requireNonNull(context, "context is null");
checkSupported(!node.getStep().isOutputPartial(), "partial aggregations are not supported in Pinot pushdown framework");
checkSupported(preferBrokerQueries, "Cannot push aggregation in segment mode");
checkSupported(!forbidBrokerQueries, "Cannot push aggregation in segment mode");

// 2nd pass
LinkedHashMap<VariableReferenceExpression, Selection> newSelections = new LinkedHashMap<>();
Expand Down Expand Up @@ -448,7 +449,7 @@ public PinotQueryGeneratorContext visitAggregation(AggregationNode node, PinotQu
public PinotQueryGeneratorContext visitLimit(LimitNode node, PinotQueryGeneratorContext context)
{
checkSupported(!node.isPartial(), String.format("pinot query generator cannot handle partial limit"));
checkSupported(preferBrokerQueries, "Cannot push limit in segment mode");
checkSupported(!forbidBrokerQueries, "Cannot push limit in segment mode");
context = node.getSource().accept(this, context);
requireNonNull(context, "context is null");
return context.withLimit(node.getCount()).withOutputColumns(node.getOutputVariables());
Expand All @@ -459,7 +460,7 @@ public PinotQueryGeneratorContext visitTopN(TopNNode node, PinotQueryGeneratorCo
{
context = node.getSource().accept(this, context);
requireNonNull(context, "context is null");
checkSupported(preferBrokerQueries, "Cannot push topn in segment mode");
checkSupported(!forbidBrokerQueries, "Cannot push topn in segment mode");
checkSupported(node.getStep().equals(TopNNode.Step.SINGLE), "Can only push single logical topn in");
return context.withTopN(getOrderingScheme(node), node.getCount()).withOutputColumns(node.getOutputVariables());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@
import com.facebook.presto.pinot.PinotColumnHandle;
import com.facebook.presto.pinot.PinotConfig;
import com.facebook.presto.pinot.PinotException;
import com.facebook.presto.pinot.PinotSessionProperties;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.block.SortOrder;
import com.facebook.presto.spi.relation.VariableReferenceExpression;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
Expand All @@ -38,7 +41,6 @@
import static com.google.common.base.MoreObjects.toStringHelper;
import static java.lang.StrictMath.toIntExact;
import static java.lang.String.format;
import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;

/**
Expand All @@ -60,18 +62,15 @@ public class PinotQueryGeneratorContext
private final OptionalInt limit;
private final int aggregations;

public boolean isQueryShort(int nonAggregateRowLimit)
{
return hasAggregation() || limit.orElse(Integer.MAX_VALUE) < nonAggregateRowLimit;
}

@Override
public String toString()
{
return toStringHelper(this)
.add("selections", selections)
.add("groupByColumns", groupByColumns)
.add("topNColumnOrderingMap", topNColumnOrderingMap)
.add("hiddenColumnSet", hiddenColumnSet)
.add("variablesInAggregation", variablesInAggregation)
.add("from", from)
.add("filter", filter)
.add("limit", limit)
Expand Down Expand Up @@ -261,9 +260,11 @@ Set<VariableReferenceExpression> getVariablesInAggregation()
/**
* Convert the current context to a PQL
*/
public PinotQueryGenerator.GeneratedPql toQuery(PinotConfig pinotConfig, boolean preferBrokerQueries, boolean isQueryShort)
public PinotQueryGenerator.GeneratedPql toQuery(PinotConfig pinotConfig, ConnectorSession session)
{
boolean forBroker = preferBrokerQueries && isQueryShort;
int nonAggregateShortQueryLimit = PinotSessionProperties.getNonAggregateLimitForBrokerQueries(session);
boolean isQueryShort = hasAggregation() || limit.orElse(Integer.MAX_VALUE) < nonAggregateShortQueryLimit;
boolean forBroker = !PinotSessionProperties.isForbidBrokerQueries(session) && isQueryShort;
if (!pinotConfig.isAllowMultipleAggregations() && aggregations > 1 && !groupByColumns.isEmpty()) {
throw new PinotException(PINOT_QUERY_GENERATOR_FAILURE, Optional.empty(), "Multiple aggregates in the presence of group by is forbidden");
}
Expand All @@ -276,6 +277,9 @@ public PinotQueryGenerator.GeneratedPql toQuery(PinotConfig pinotConfig, boolean
.filter(s -> !groupByColumns.contains(s.getKey())) // remove the group by columns from the query as Pinot barfs if the group by column is an expression
.map(s -> s.getValue().getDefinition())
.collect(Collectors.joining(", "));
if (expressions.isEmpty()) {
throw new PinotException(PINOT_QUERY_GENERATOR_FAILURE, Optional.empty(), "Empty PQL expressions: " + toString());
}

String tableName = from.orElseThrow(() -> new PinotException(PINOT_QUERY_GENERATOR_FAILURE, Optional.empty(), "Table name not encountered yet"));
String query = "SELECT " + expressions + " FROM " + tableName + (forBroker ? "" : TABLE_NAME_SUFFIX_TEMPLATE);
Expand Down Expand Up @@ -338,11 +342,11 @@ else if (!groupByColumns.isEmpty()) {
query += " " + limitKeyWord + " " + queryLimit;
}

List<PinotColumnHandle> columnHandles = ImmutableList.copyOf(getAssignments().values());
return new PinotQueryGenerator.GeneratedPql(tableName, query, getIndicesMappingFromPinotSchemaToPrestoSchema(query, columnHandles), groupByColumns.size(), filter.isPresent(), isQueryShort);
List<Integer> indices = getIndicesMappingFromPinotSchemaToPrestoSchema(query, getAssignments());
return new PinotQueryGenerator.GeneratedPql(tableName, query, indices, groupByColumns.size(), filter.isPresent(), isQueryShort);
}

private List<Integer> getIndicesMappingFromPinotSchemaToPrestoSchema(String query, List<PinotColumnHandle> handles)
private List<Integer> getIndicesMappingFromPinotSchemaToPrestoSchema(String query, Map<VariableReferenceExpression, PinotColumnHandle> assignments)
{
LinkedHashMap<VariableReferenceExpression, Selection> expressionsInPinotOrder = new LinkedHashMap<>();
for (VariableReferenceExpression groupByColumn : groupByColumns) {
Expand All @@ -358,33 +362,37 @@ private List<Integer> getIndicesMappingFromPinotSchemaToPrestoSchema(String quer
expressionsInPinotOrder.putAll(selections);

checkSupported(
handles.size() == expressionsInPinotOrder.keySet().stream().filter(key -> !hiddenColumnSet.contains(key)).count(),
assignments.size() == expressionsInPinotOrder.keySet().stream().filter(key -> !hiddenColumnSet.contains(key)).count(),
"Expected returned expressions %s to match selections %s",
Joiner.on(",").withKeyValueSeparator(":").join(expressionsInPinotOrder), Joiner.on(",").join(handles));

Map<VariableReferenceExpression, Integer> nameToIndex = new HashMap<>();
for (int i = 0; i < handles.size(); i++) {
PinotColumnHandle columnHandle = handles.get(i);
VariableReferenceExpression columnName = new VariableReferenceExpression(columnHandle.getColumnName().toLowerCase(ENGLISH), columnHandle.getDataType());
Integer previous = nameToIndex.put(columnName, i);
Joiner.on(",").withKeyValueSeparator(":").join(expressionsInPinotOrder),
Joiner.on(",").withKeyValueSeparator("=").join(assignments));

Map<VariableReferenceExpression, Integer> assignmentToIndex = new HashMap<>();
Iterator<Map.Entry<VariableReferenceExpression, PinotColumnHandle>> assignmentsIterator = assignments.entrySet().iterator();
for (int i = 0; i < assignments.size(); i++) {
VariableReferenceExpression key = assignmentsIterator.next().getKey();
Integer previous = assignmentToIndex.put(key, i);
if (previous != null) {
throw new PinotException(PINOT_UNSUPPORTED_EXPRESSION, Optional.of(query), format("Expected Pinot column handle %s to occur only once, but we have: %s", columnName, Joiner.on(",").join(handles)));
throw new PinotException(PINOT_UNSUPPORTED_EXPRESSION, Optional.of(query), format("Expected Pinot column handle %s to occur only once, but we have: %s", key, Joiner.on(",").withKeyValueSeparator("=").join(assignments)));
}
}

ImmutableList.Builder<Integer> outputIndices = ImmutableList.builder();
for (Map.Entry<VariableReferenceExpression, Selection> expression : expressionsInPinotOrder.entrySet()) {
Integer index = nameToIndex.get(expression.getKey());
Integer index;
if (hiddenColumnSet.contains(expression.getKey())) {
index = -1; // negative output index means to skip this value returned by pinot at query time
}
else {
index = assignmentToIndex.get(expression.getKey());
}
if (index == null) {
throw new PinotException(
PINOT_UNSUPPORTED_EXPRESSION, Optional.of(query),
format(
"Expected to find a Pinot column handle for the expression %s, but we have %s",
expression,
Joiner.on(",").withKeyValueSeparator(":").join(nameToIndex)));
Joiner.on(",").withKeyValueSeparator(":").join(assignmentToIndex)));
}
outputIndices.add(index);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public void testDefaults()
.setCallerHeaderParam("RPC-Caller")
.setMetadataCacheExpiry(new Duration(2, TimeUnit.MINUTES))
.setAllowMultipleAggregations(false)
.setPreferBrokerQueries(true)
.setForbidBrokerQueries(false)
.setRestProxyServiceForQuery(null)
.setRestProxyUrl(null)
.setNumSegmentsPerSplit(1)
Expand Down Expand Up @@ -77,7 +77,7 @@ public void testExplicitPropertyMappings()
.put("pinot.caller-header-param", "myParam")
.put("pinot.service-header-param", "myServiceHeader")
.put("pinot.allow-multiple-aggregations", "true")
.put("pinot.prefer-broker-queries", "false")
.put("pinot.forbid-broker-queries", "true")
.put("pinot.rest-proxy-url", "localhost:1111")
.put("pinot.rest-proxy-service-for-query", "pinot-rest-proxy-service")
.put("pinot.num-segments-per-split", "2")
Expand Down Expand Up @@ -108,7 +108,7 @@ public void testExplicitPropertyMappings()
.setCallerHeaderParam("myParam")
.setMetadataCacheExpiry(new Duration(1, TimeUnit.MINUTES))
.setAllowMultipleAggregations(true)
.setPreferBrokerQueries(false)
.setForbidBrokerQueries(true)
.setRestProxyServiceForQuery("pinot-rest-proxy-service")
.setNumSegmentsPerSplit(2)
.setIgnoreEmptyResponses(true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public void testRealtimeSegmentSplitsOneSegmentPerServer()

private void testSegmentSplitsHelperNoFilter(PinotTableHandle table, int segmentsPerSplit, int expectedNumSplits, boolean expectFilter)
{
PinotConfig pinotConfig = new PinotConfig().setPreferBrokerQueries(false);
PinotConfig pinotConfig = new PinotConfig().setForbidBrokerQueries(false);
SessionHolder sessionHolder = new SessionHolder(pinotConfig);
PlanBuilder planBuilder = createPlanBuilder(sessionHolder);
PlanNode plan = tableScan(planBuilder, table, regionId, city, fare, secondsSinceEpoch);
Expand All @@ -70,7 +70,7 @@ private void testSegmentSplitsHelperNoFilter(PinotTableHandle table, int segment

private void testSegmentSplitsHelperWithFilter(PinotTableHandle table, int segmentsPerSplit, int expectedNumSplits)
{
PinotConfig pinotConfig = new PinotConfig().setPreferBrokerQueries(false);
PinotConfig pinotConfig = new PinotConfig().setForbidBrokerQueries(false);
SessionHolder sessionHolder = new SessionHolder(pinotConfig);
PlanBuilder planBuilder = createPlanBuilder(sessionHolder);
PlanNode plan = filter(planBuilder, tableScan(planBuilder, table, regionId, city, fare, secondsSinceEpoch), getRowExpression("city = 'Boston'", sessionHolder));
Expand Down
Loading