diff --git a/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/PinotClusterInfoFetcher.java b/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/PinotClusterInfoFetcher.java index 8189029dd5997..a533212f73f60 100644 --- a/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/PinotClusterInfoFetcher.java +++ b/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/PinotClusterInfoFetcher.java @@ -124,8 +124,10 @@ public String doHttpActionWithHeaders( Optional rpcService) { requestBuilder = requestBuilder - .setHeader(HttpHeaders.CONTENT_TYPE, APPLICATION_JSON) .setHeader(HttpHeaders.ACCEPT, APPLICATION_JSON); + if (requestBody.isPresent()) { + requestBuilder.setHeader(HttpHeaders.CONTENT_TYPE, APPLICATION_JSON); + } if (rpcService.isPresent()) { requestBuilder .setHeader(pinotConfig.getCallerHeaderParam(), pinotConfig.getCallerHeaderValue()) diff --git a/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/PinotConfig.java b/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/PinotConfig.java index 77697ee59ede8..a11b1080858c7 100644 --- a/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/PinotConfig.java +++ b/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/PinotConfig.java @@ -74,7 +74,7 @@ public class PinotConfig private Duration metadataCacheExpiry = new Duration(2, TimeUnit.MINUTES); private boolean allowMultipleAggregations; - private boolean preferBrokerQueries = true; + private boolean forbidBrokerQueries; private boolean forbidSegmentQueries; private int numSegmentsPerSplit = 1; private boolean ignoreEmptyResponses; @@ -319,15 +319,15 @@ public PinotConfig setCallerHeaderParam(String callerHeaderParam) return this; } - public boolean isPreferBrokerQueries() + public boolean isForbidBrokerQueries() { - return preferBrokerQueries; + return forbidBrokerQueries; } - @Config("pinot.prefer-broker-queries") - public PinotConfig setPreferBrokerQueries(boolean preferBrokerQueries) + @Config("pinot.forbid-broker-queries") + public PinotConfig setForbidBrokerQueries(boolean forbidBrokerQueries) { - this.preferBrokerQueries = preferBrokerQueries; + this.forbidBrokerQueries = forbidBrokerQueries; return this; } diff --git a/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/PinotSessionProperties.java b/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/PinotSessionProperties.java index d6bfaff0d49ce..ea7ed2bc16251 100644 --- a/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/PinotSessionProperties.java +++ b/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/PinotSessionProperties.java @@ -33,7 +33,7 @@ public class PinotSessionProperties { private static final String CONNECTION_TIMEOUT = "connection_timeout"; - private static final String PREFER_BROKER_QUERIES = "prefer_broker_queries"; + private static final String FORBID_BROKER_QUERIES = "forbid_broker_queries"; private static final String IGNORE_EMPTY_RESPONSES = "ignore_empty_responses"; private static final String RETRY_COUNT = "retry_count"; private static final String USE_DATE_TRUNC = "use_date_trunc"; @@ -53,9 +53,9 @@ public static int getNumSegmentsPerSplit(ConnectorSession session) return segmentsPerSplit <= 0 ? Integer.MAX_VALUE : segmentsPerSplit; } - public static boolean isPreferBrokerQueries(ConnectorSession session) + public static boolean isForbidBrokerQueries(ConnectorSession session) { - return session.getProperty(PREFER_BROKER_QUERIES, Boolean.class); + return session.getProperty(FORBID_BROKER_QUERIES, Boolean.class); } public static boolean isForbidSegmentQueries(ConnectorSession session) @@ -93,9 +93,9 @@ public PinotSessionProperties(PinotConfig pinotConfig) { sessionProperties = ImmutableList.of( booleanProperty( - PREFER_BROKER_QUERIES, - "Prefer queries to broker even when parallel scan is enabled for aggregation queries", - pinotConfig.isPreferBrokerQueries(), + FORBID_BROKER_QUERIES, + "Forbid queries to the broker", + pinotConfig.isForbidBrokerQueries(), false), booleanProperty( FORBID_SEGMENT_QUERIES, diff --git a/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/query/PinotQueryGenerator.java b/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/query/PinotQueryGenerator.java index c98e409c323e2..fceb28bb554b2 100644 --- a/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/query/PinotQueryGenerator.java +++ b/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/query/PinotQueryGenerator.java @@ -130,10 +130,11 @@ public PinotQueryGeneratorContext getContext() public Optional generate(PlanNode plan, ConnectorSession session) { try { - boolean preferBrokerQueries = PinotSessionProperties.isPreferBrokerQueries(session); - PinotQueryGeneratorContext context = requireNonNull(plan.accept(new PinotQueryPlanVisitor(session, preferBrokerQueries), new PinotQueryGeneratorContext()), "Resulting context is null"); - boolean isQueryShort = context.isQueryShort(PinotSessionProperties.getNonAggregateLimitForBrokerQueries(session)); - return Optional.of(new PinotQueryGeneratorResult(context.toQuery(pinotConfig, preferBrokerQueries, isQueryShort), context)); + PinotQueryGeneratorContext context = requireNonNull(plan.accept( + new PinotQueryPlanVisitor(session), + new PinotQueryGeneratorContext()), + "Resulting context is null"); + return Optional.of(new PinotQueryGeneratorResult(context.toQuery(pinotConfig, session), context)); } catch (PinotException e) { log.debug(e, "Possibly benign error when pushing plan into scan node %s", plan); @@ -221,12 +222,12 @@ class PinotQueryPlanVisitor extends PlanVisitor { private final ConnectorSession session; - private final boolean preferBrokerQueries; + private final boolean forbidBrokerQueries; - protected PinotQueryPlanVisitor(ConnectorSession session, boolean preferBrokerQueries) + protected PinotQueryPlanVisitor(ConnectorSession session) { this.session = session; - this.preferBrokerQueries = preferBrokerQueries; + this.forbidBrokerQueries = PinotSessionProperties.isForbidBrokerQueries(session); } @Override @@ -400,7 +401,7 @@ public PinotQueryGeneratorContext visitAggregation(AggregationNode node, PinotQu PinotQueryGeneratorContext context = node.getSource().accept(this, contextIn.withVariablesInAggregation(variablesInAggregation)); requireNonNull(context, "context is null"); checkSupported(!node.getStep().isOutputPartial(), "partial aggregations are not supported in Pinot pushdown framework"); - checkSupported(preferBrokerQueries, "Cannot push aggregation in segment mode"); + checkSupported(!forbidBrokerQueries, "Cannot push aggregation in segment mode"); // 2nd pass LinkedHashMap newSelections = new LinkedHashMap<>(); @@ -448,7 +449,7 @@ public PinotQueryGeneratorContext visitAggregation(AggregationNode node, PinotQu public PinotQueryGeneratorContext visitLimit(LimitNode node, PinotQueryGeneratorContext context) { checkSupported(!node.isPartial(), String.format("pinot query generator cannot handle partial limit")); - checkSupported(preferBrokerQueries, "Cannot push limit in segment mode"); + checkSupported(!forbidBrokerQueries, "Cannot push limit in segment mode"); context = node.getSource().accept(this, context); requireNonNull(context, "context is null"); return context.withLimit(node.getCount()).withOutputColumns(node.getOutputVariables()); @@ -459,7 +460,7 @@ public PinotQueryGeneratorContext visitTopN(TopNNode node, PinotQueryGeneratorCo { context = node.getSource().accept(this, context); requireNonNull(context, "context is null"); - checkSupported(preferBrokerQueries, "Cannot push topn in segment mode"); + checkSupported(!forbidBrokerQueries, "Cannot push topn in segment mode"); checkSupported(node.getStep().equals(TopNNode.Step.SINGLE), "Can only push single logical topn in"); return context.withTopN(getOrderingScheme(node), node.getCount()).withOutputColumns(node.getOutputVariables()); } diff --git a/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/query/PinotQueryGeneratorContext.java b/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/query/PinotQueryGeneratorContext.java index e7388f3a3aea8..7a6d24684ff70 100644 --- a/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/query/PinotQueryGeneratorContext.java +++ b/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/query/PinotQueryGeneratorContext.java @@ -16,6 +16,8 @@ import com.facebook.presto.pinot.PinotColumnHandle; import com.facebook.presto.pinot.PinotConfig; import com.facebook.presto.pinot.PinotException; +import com.facebook.presto.pinot.PinotSessionProperties; +import com.facebook.presto.spi.ConnectorSession; import com.facebook.presto.spi.block.SortOrder; import com.facebook.presto.spi.relation.VariableReferenceExpression; import com.google.common.base.Joiner; @@ -23,6 +25,7 @@ import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; @@ -38,7 +41,6 @@ import static com.google.common.base.MoreObjects.toStringHelper; import static java.lang.StrictMath.toIntExact; import static java.lang.String.format; -import static java.util.Locale.ENGLISH; import static java.util.Objects.requireNonNull; /** @@ -60,18 +62,15 @@ public class PinotQueryGeneratorContext private final OptionalInt limit; private final int aggregations; - public boolean isQueryShort(int nonAggregateRowLimit) - { - return hasAggregation() || limit.orElse(Integer.MAX_VALUE) < nonAggregateRowLimit; - } - @Override public String toString() { return toStringHelper(this) .add("selections", selections) .add("groupByColumns", groupByColumns) + .add("topNColumnOrderingMap", topNColumnOrderingMap) .add("hiddenColumnSet", hiddenColumnSet) + .add("variablesInAggregation", variablesInAggregation) .add("from", from) .add("filter", filter) .add("limit", limit) @@ -261,9 +260,11 @@ Set getVariablesInAggregation() /** * Convert the current context to a PQL */ - public PinotQueryGenerator.GeneratedPql toQuery(PinotConfig pinotConfig, boolean preferBrokerQueries, boolean isQueryShort) + public PinotQueryGenerator.GeneratedPql toQuery(PinotConfig pinotConfig, ConnectorSession session) { - boolean forBroker = preferBrokerQueries && isQueryShort; + int nonAggregateShortQueryLimit = PinotSessionProperties.getNonAggregateLimitForBrokerQueries(session); + boolean isQueryShort = hasAggregation() || limit.orElse(Integer.MAX_VALUE) < nonAggregateShortQueryLimit; + boolean forBroker = !PinotSessionProperties.isForbidBrokerQueries(session) && isQueryShort; if (!pinotConfig.isAllowMultipleAggregations() && aggregations > 1 && !groupByColumns.isEmpty()) { throw new PinotException(PINOT_QUERY_GENERATOR_FAILURE, Optional.empty(), "Multiple aggregates in the presence of group by is forbidden"); } @@ -276,6 +277,9 @@ public PinotQueryGenerator.GeneratedPql toQuery(PinotConfig pinotConfig, boolean .filter(s -> !groupByColumns.contains(s.getKey())) // remove the group by columns from the query as Pinot barfs if the group by column is an expression .map(s -> s.getValue().getDefinition()) .collect(Collectors.joining(", ")); + if (expressions.isEmpty()) { + throw new PinotException(PINOT_QUERY_GENERATOR_FAILURE, Optional.empty(), "Empty PQL expressions: " + toString()); + } String tableName = from.orElseThrow(() -> new PinotException(PINOT_QUERY_GENERATOR_FAILURE, Optional.empty(), "Table name not encountered yet")); String query = "SELECT " + expressions + " FROM " + tableName + (forBroker ? "" : TABLE_NAME_SUFFIX_TEMPLATE); @@ -338,11 +342,11 @@ else if (!groupByColumns.isEmpty()) { query += " " + limitKeyWord + " " + queryLimit; } - List columnHandles = ImmutableList.copyOf(getAssignments().values()); - return new PinotQueryGenerator.GeneratedPql(tableName, query, getIndicesMappingFromPinotSchemaToPrestoSchema(query, columnHandles), groupByColumns.size(), filter.isPresent(), isQueryShort); + List indices = getIndicesMappingFromPinotSchemaToPrestoSchema(query, getAssignments()); + return new PinotQueryGenerator.GeneratedPql(tableName, query, indices, groupByColumns.size(), filter.isPresent(), isQueryShort); } - private List getIndicesMappingFromPinotSchemaToPrestoSchema(String query, List handles) + private List getIndicesMappingFromPinotSchemaToPrestoSchema(String query, Map assignments) { LinkedHashMap expressionsInPinotOrder = new LinkedHashMap<>(); for (VariableReferenceExpression groupByColumn : groupByColumns) { @@ -358,33 +362,37 @@ private List getIndicesMappingFromPinotSchemaToPrestoSchema(String quer expressionsInPinotOrder.putAll(selections); checkSupported( - handles.size() == expressionsInPinotOrder.keySet().stream().filter(key -> !hiddenColumnSet.contains(key)).count(), + assignments.size() == expressionsInPinotOrder.keySet().stream().filter(key -> !hiddenColumnSet.contains(key)).count(), "Expected returned expressions %s to match selections %s", - Joiner.on(",").withKeyValueSeparator(":").join(expressionsInPinotOrder), Joiner.on(",").join(handles)); - - Map nameToIndex = new HashMap<>(); - for (int i = 0; i < handles.size(); i++) { - PinotColumnHandle columnHandle = handles.get(i); - VariableReferenceExpression columnName = new VariableReferenceExpression(columnHandle.getColumnName().toLowerCase(ENGLISH), columnHandle.getDataType()); - Integer previous = nameToIndex.put(columnName, i); + Joiner.on(",").withKeyValueSeparator(":").join(expressionsInPinotOrder), + Joiner.on(",").withKeyValueSeparator("=").join(assignments)); + + Map assignmentToIndex = new HashMap<>(); + Iterator> assignmentsIterator = assignments.entrySet().iterator(); + for (int i = 0; i < assignments.size(); i++) { + VariableReferenceExpression key = assignmentsIterator.next().getKey(); + Integer previous = assignmentToIndex.put(key, i); if (previous != null) { - throw new PinotException(PINOT_UNSUPPORTED_EXPRESSION, Optional.of(query), format("Expected Pinot column handle %s to occur only once, but we have: %s", columnName, Joiner.on(",").join(handles))); + throw new PinotException(PINOT_UNSUPPORTED_EXPRESSION, Optional.of(query), format("Expected Pinot column handle %s to occur only once, but we have: %s", key, Joiner.on(",").withKeyValueSeparator("=").join(assignments))); } } ImmutableList.Builder outputIndices = ImmutableList.builder(); for (Map.Entry expression : expressionsInPinotOrder.entrySet()) { - Integer index = nameToIndex.get(expression.getKey()); + Integer index; if (hiddenColumnSet.contains(expression.getKey())) { index = -1; // negative output index means to skip this value returned by pinot at query time } + else { + index = assignmentToIndex.get(expression.getKey()); + } if (index == null) { throw new PinotException( PINOT_UNSUPPORTED_EXPRESSION, Optional.of(query), format( "Expected to find a Pinot column handle for the expression %s, but we have %s", expression, - Joiner.on(",").withKeyValueSeparator(":").join(nameToIndex))); + Joiner.on(",").withKeyValueSeparator(":").join(assignmentToIndex))); } outputIndices.add(index); } diff --git a/presto-pinot-toolkit/src/test/java/com/facebook/presto/pinot/TestPinotConfig.java b/presto-pinot-toolkit/src/test/java/com/facebook/presto/pinot/TestPinotConfig.java index fe3bae05965c7..371f5d2a553a0 100644 --- a/presto-pinot-toolkit/src/test/java/com/facebook/presto/pinot/TestPinotConfig.java +++ b/presto-pinot-toolkit/src/test/java/com/facebook/presto/pinot/TestPinotConfig.java @@ -45,7 +45,7 @@ public void testDefaults() .setCallerHeaderParam("RPC-Caller") .setMetadataCacheExpiry(new Duration(2, TimeUnit.MINUTES)) .setAllowMultipleAggregations(false) - .setPreferBrokerQueries(true) + .setForbidBrokerQueries(false) .setRestProxyServiceForQuery(null) .setRestProxyUrl(null) .setNumSegmentsPerSplit(1) @@ -77,7 +77,7 @@ public void testExplicitPropertyMappings() .put("pinot.caller-header-param", "myParam") .put("pinot.service-header-param", "myServiceHeader") .put("pinot.allow-multiple-aggregations", "true") - .put("pinot.prefer-broker-queries", "false") + .put("pinot.forbid-broker-queries", "true") .put("pinot.rest-proxy-url", "localhost:1111") .put("pinot.rest-proxy-service-for-query", "pinot-rest-proxy-service") .put("pinot.num-segments-per-split", "2") @@ -108,7 +108,7 @@ public void testExplicitPropertyMappings() .setCallerHeaderParam("myParam") .setMetadataCacheExpiry(new Duration(1, TimeUnit.MINUTES)) .setAllowMultipleAggregations(true) - .setPreferBrokerQueries(false) + .setForbidBrokerQueries(true) .setRestProxyServiceForQuery("pinot-rest-proxy-service") .setNumSegmentsPerSplit(2) .setIgnoreEmptyResponses(true) diff --git a/presto-pinot-toolkit/src/test/java/com/facebook/presto/pinot/TestPinotSplitManager.java b/presto-pinot-toolkit/src/test/java/com/facebook/presto/pinot/TestPinotSplitManager.java index f52aac2aff924..80751d9f61977 100644 --- a/presto-pinot-toolkit/src/test/java/com/facebook/presto/pinot/TestPinotSplitManager.java +++ b/presto-pinot-toolkit/src/test/java/com/facebook/presto/pinot/TestPinotSplitManager.java @@ -57,7 +57,7 @@ public void testRealtimeSegmentSplitsOneSegmentPerServer() private void testSegmentSplitsHelperNoFilter(PinotTableHandle table, int segmentsPerSplit, int expectedNumSplits, boolean expectFilter) { - PinotConfig pinotConfig = new PinotConfig().setPreferBrokerQueries(false); + PinotConfig pinotConfig = new PinotConfig().setForbidBrokerQueries(false); SessionHolder sessionHolder = new SessionHolder(pinotConfig); PlanBuilder planBuilder = createPlanBuilder(sessionHolder); PlanNode plan = tableScan(planBuilder, table, regionId, city, fare, secondsSinceEpoch); @@ -70,7 +70,7 @@ private void testSegmentSplitsHelperNoFilter(PinotTableHandle table, int segment private void testSegmentSplitsHelperWithFilter(PinotTableHandle table, int segmentsPerSplit, int expectedNumSplits) { - PinotConfig pinotConfig = new PinotConfig().setPreferBrokerQueries(false); + PinotConfig pinotConfig = new PinotConfig().setForbidBrokerQueries(false); SessionHolder sessionHolder = new SessionHolder(pinotConfig); PlanBuilder planBuilder = createPlanBuilder(sessionHolder); PlanNode plan = filter(planBuilder, tableScan(planBuilder, table, regionId, city, fare, secondsSinceEpoch), getRowExpression("city = 'Boston'", sessionHolder)); diff --git a/presto-pinot-toolkit/src/test/java/com/facebook/presto/pinot/query/TestPinotConnectorPlanOptimizer.java b/presto-pinot-toolkit/src/test/java/com/facebook/presto/pinot/query/TestPinotConnectorPlanOptimizer.java index 70ef43fb39bb8..efb7b30b2bda8 100644 --- a/presto-pinot-toolkit/src/test/java/com/facebook/presto/pinot/query/TestPinotConnectorPlanOptimizer.java +++ b/presto-pinot-toolkit/src/test/java/com/facebook/presto/pinot/query/TestPinotConnectorPlanOptimizer.java @@ -191,7 +191,7 @@ public void testLimitPushdownWithStarSelection() { PlanBuilder pb = createPlanBuilder(defaultSessionHolder); PlanNode originalPlan = limit(pb, 50L, tableScan(pb, pinotTable, regionId, city, fare, secondsSinceEpoch)); - PlanNode optimized = getOptimizedPlan(pb, originalPlan, true); + PlanNode optimized = getOptimizedPlan(pb, originalPlan); assertPlanMatch(optimized, PinotTableScanMatcher.match(pinotTable, Optional.of("SELECT regionId, city, fare, secondsSinceEpoch FROM hybrid LIMIT 50"), Optional.of(false), originalPlan.getOutputVariables()), typeProvider); } @@ -202,7 +202,7 @@ public void testPartialPredicatePushdown() TableScanNode tableScanNode = tableScan(pb, pinotTable, regionId, city, fare, secondsSinceEpoch); FilterNode filter = filter(pb, tableScanNode, getRowExpression("lower(substr(city, 0, 3)) = 'del' AND fare > 100", defaultSessionHolder)); PlanNode originalPlan = limit(pb, 50L, filter); - PlanNode optimized = getOptimizedPlan(pb, originalPlan, true); + PlanNode optimized = getOptimizedPlan(pb, originalPlan); PlanMatchPattern tableScanMatcher = PinotTableScanMatcher.match(pinotTable, Optional.of("SELECT regionId, city, fare, secondsSinceEpoch FROM hybrid__TABLE_NAME_SUFFIX_TEMPLATE__ WHERE \\(fare > 100\\).*"), Optional.of(true), filter.getOutputVariables()); assertPlanMatch(optimized, PlanMatchPattern.limit(50L, PlanMatchPattern.filter("lower(substr(city, 0, 3)) = 'del'", tableScanMatcher)), typeProvider); } @@ -217,15 +217,15 @@ public void testUnsupportedPredicatePushdown() PlanNode limit = limit(planBuilder, 50L, tableScan(planBuilder, pinotTable, regionId, city, fare, secondsSinceEpoch)); PlanNode originalPlan = planBuilder.aggregation(builder -> builder.source(limit).globalGrouping().addAggregation(new VariableReferenceExpression("count", BIGINT), getRowExpression("count(*)", defaultSessionHolder))); - PlanNode optimized = getOptimizedPlan(planBuilder, originalPlan, true); + PlanNode optimized = getOptimizedPlan(planBuilder, originalPlan); PlanMatchPattern tableScanMatcher = PinotTableScanMatcher.match(pinotTable, Optional.of("SELECT regionId, city, fare, secondsSinceEpoch FROM hybrid LIMIT 50"), Optional.of(false), originalPlan.getOutputVariables()); assertPlanMatch(optimized, aggregation(aggregationsSecond, tableScanMatcher), typeProvider); } - private PlanNode getOptimizedPlan(PlanBuilder planBuilder, PlanNode originalPlan, boolean scanParallelism) + private PlanNode getOptimizedPlan(PlanBuilder planBuilder, PlanNode originalPlan) { - PinotConfig pinotConfig = new PinotConfig().setPreferBrokerQueries(scanParallelism); + PinotConfig pinotConfig = new PinotConfig(); PinotQueryGenerator pinotQueryGenerator = new PinotQueryGenerator(pinotConfig, typeManager, functionMetadataManager, standardFunctionResolution); PinotConnectorPlanOptimizer optimizer = new PinotConnectorPlanOptimizer(pinotQueryGenerator, typeManager, functionMetadataManager, logicalRowExpressions, standardFunctionResolution); return optimizer.optimize(originalPlan, defaultSessionHolder.getConnectorSession(), new PlanVariableAllocator(), planBuilder.getIdAllocator());