Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions docs/src/main/sphinx/connector/pinot.rst
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,6 @@ Property name Required Description
Pinot returns hostnames and not IP addresses.
``pinot.connection-timeout`` No Pinot connection timeout, default is ``15s``.
``pinot.metadata-expiry`` No Pinot metadata expiration time, default is ``2m``.
``pinot.request-timeout`` No The timeout for Pinot requests. Increasing this can reduce timeouts if DNS
resolution is slow.
``pinot.controller.authentication.type`` No Pinot authentication method for controller requests. Allowed values are
``NONE`` and ``PASSWORD`` - defaults to ``NONE`` which is no authentication.
``pinot.controller.authentication.user`` No Controller username for basic authentication method.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,8 @@ public Optional<AggregationApplicationResult<ConnectorTableHandle>> applyAggrega
// can be pushed down: there are currently no subqueries in pinot.
// If there is an offset then do not push the aggregation down as the results will not be correct
if (tableHandle.getQuery().isPresent() &&
(!tableHandle.getQuery().get().getAggregateColumns().isEmpty() ||
(!isAggregationPushdownSupported(session, tableHandle.getQuery(), aggregates, assignments) ||
!tableHandle.getQuery().get().getAggregateColumns().isEmpty() ||
tableHandle.getQuery().get().isAggregateInProjections() ||
tableHandle.getQuery().get().getOffset().isPresent())) {
return Optional.empty();
Expand All @@ -368,10 +369,12 @@ public Optional<AggregationApplicationResult<ConnectorTableHandle>> applyAggrega
projections.add(new Variable(pinotColumnHandle.getColumnName(), pinotColumnHandle.getDataType()));
resultAssignments.add(new Assignment(pinotColumnHandle.getColumnName(), pinotColumnHandle, pinotColumnHandle.getDataType()));
}

List<PinotColumnHandle> groupingColumns = getOnlyElement(groupingSets).stream()
.map(PinotColumnHandle.class::cast)
.map(PinotMetadata::toNonAggregateColumnHandle)
.collect(toImmutableList());

OptionalLong limitForDynamicTable = OptionalLong.empty();
// Ensure that pinot default limit of 10 rows is not used
// By setting the limit to maxRowsPerBrokerQuery + 1 the connector will
Expand Down Expand Up @@ -421,28 +424,28 @@ public static PinotColumnHandle toNonAggregateColumnHandle(PinotColumnHandle col
return new PinotColumnHandle(columnHandle.getColumnName(), columnHandle.getDataType(), quoteIdentifier(columnHandle.getColumnName()), false, false, true, Optional.empty(), Optional.empty());
}

private Optional<AggregateExpression> applyCountDistinct(ConnectorSession session, AggregateFunction aggregate, Map<String, ColumnHandle> assignments, PinotTableHandle tableHandle, Optional<AggregateExpression> rewriteResult)
private boolean isAggregationPushdownSupported(ConnectorSession session, Optional<DynamicTable> dynamicTable, List<AggregateFunction> aggregates, Map<String, ColumnHandle> assignments)
{
AggregateFunctionRule.RewriteContext<Void> context = new AggregateFunctionRule.RewriteContext<>()
{
@Override
public Map<String, ColumnHandle> getAssignments()
{
return assignments;
}
if (dynamicTable.isEmpty()) {
return true;
}
List<PinotColumnHandle> groupingColumns = dynamicTable.get().getGroupingColumns();
if (groupingColumns.isEmpty()) {
return true;
}
// Either second pass of applyAggregation or dynamic table exists
if (aggregates.size() != 1) {
return false;
}
AggregateFunction aggregate = getOnlyElement(aggregates);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can aggregates be empty at this point?

Copy link
Copy Markdown
Member Author

@elonazoulay elonazoulay Mar 30, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now aggregates would only contain 1 aggregate.

AggregateFunctionRule.RewriteContext<Void> context = new CountDistinctContext(assignments, session);

@Override
public ConnectorSession getSession()
{
return session;
}
return implementCountDistinct.getPattern().matches(aggregate, context);
}

@Override
public Optional<Void> rewriteExpression(ConnectorExpression expression)
{
throw new UnsupportedOperationException();
}
};
private Optional<AggregateExpression> applyCountDistinct(ConnectorSession session, AggregateFunction aggregate, Map<String, ColumnHandle> assignments, PinotTableHandle tableHandle, Optional<AggregateExpression> rewriteResult)
{
AggregateFunctionRule.RewriteContext<Void> context = new CountDistinctContext(assignments, session);

if (implementCountDistinct.getPattern().matches(aggregate, context)) {
Variable argument = (Variable) getOnlyElement(aggregate.getArguments());
Expand Down Expand Up @@ -534,4 +537,35 @@ private List<SchemaTableName> listTables(ConnectorSession session, SchemaTablePr
}
return ImmutableList.of(new SchemaTableName(prefix.getSchema().get(), prefix.getTable().get()));
}

private static class CountDistinctContext
Comment thread
elonazoulay marked this conversation as resolved.
Outdated
implements AggregateFunctionRule.RewriteContext<Void>
{
private final Map<String, ColumnHandle> assignments;
private final ConnectorSession session;

CountDistinctContext(Map<String, ColumnHandle> assignments, ConnectorSession session)
{
this.assignments = requireNonNull(assignments, "assignments is null");
this.session = requireNonNull(session, "session is null");
}

@Override
public Map<String, ColumnHandle> getAssignments()
{
return assignments;
}

@Override
public ConnectorSession getSession()
{
return session;
}

@Override
public Optional<Void> rewriteExpression(ConnectorExpression expression)
{
throw new UnsupportedOperationException();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1864,6 +1864,91 @@ public void testAggregationPushdown()
.isThrownBy(() -> query("SELECT bool_col, COUNT(long_col) FROM \"SELECT bool_col, long_col FROM " + ALL_TYPES_TABLE + " GROUP BY bool_col, long_col\""))
.withRootCauseInstanceOf(RuntimeException.class)
.withMessage("Operation not supported for DISTINCT aggregation function");

// Verify that count(<column name>) is pushed down only when it matches a COUNT(DISTINCT <column name>) query
assertThat(query("""
SELECT COUNT(bool_col) FROM
(SELECT bool_col FROM alltypes GROUP BY bool_col)
"""))
.matches("VALUES (BIGINT '2')")
.isFullyPushedDown();
assertThat(query("""
SELECT bool_col, COUNT(long_col) FROM
(SELECT bool_col, long_col FROM alltypes GROUP BY bool_col, long_col)
GROUP BY bool_col
"""))
.matches("""
VALUES (FALSE, BIGINT '1'),
(TRUE, BIGINT '9')
""")
.isFullyPushedDown();
// Verify that count(1) is not pushed down when the subquery selects distinct values for a single column
assertThat(query("""
SELECT COUNT(1) FROM
(SELECT bool_col FROM alltypes GROUP BY bool_col)
"""))
.matches("VALUES (BIGINT '2')")
.isNotFullyPushedDown(AggregationNode.class);
// Verify that count(*) is not pushed down when the subquery selects distinct values for a single column
assertThat(query("""
SELECT COUNT(*) FROM
(SELECT bool_col FROM alltypes GROUP BY bool_col)
"""))
.matches("VALUES (BIGINT '2')")
.isNotFullyPushedDown(AggregationNode.class);
// Verify that other aggregation types are not pushed down when the subquery selects distinct values for a single column
assertThat(query("""
SELECT SUM(long_col) FROM
(SELECT long_col FROM alltypes GROUP BY long_col)
"""))
.matches("VALUES (BIGINT '-28327352787')")
.isNotFullyPushedDown(AggregationNode.class);
assertThat(query("""
SELECT bool_col, SUM(long_col) FROM
(SELECT bool_col, long_col FROM alltypes GROUP BY bool_col, long_col)
GROUP BY bool_col
"""))
.matches("VALUES (TRUE, BIGINT '-28327352787'), (FALSE, BIGINT '0')")
.isNotFullyPushedDown(ProjectNode.class, AggregationNode.class, ExchangeNode.class, ExchangeNode.class, AggregationNode.class, ProjectNode.class);
assertThat(query("""
SELECT AVG(long_col) FROM
(SELECT long_col FROM alltypes GROUP BY long_col)
"""))
.matches("VALUES (DOUBLE '-2.8327352787E9')")
.isNotFullyPushedDown(AggregationNode.class);
assertThat(query("""
SELECT bool_col, AVG(long_col) FROM
(SELECT bool_col, long_col FROM alltypes GROUP BY bool_col, long_col)
GROUP BY bool_col
"""))
.matches("VALUES (TRUE, DOUBLE '-3.147483643E9'), (FALSE, DOUBLE '0.0')")
.isNotFullyPushedDown(ProjectNode.class, AggregationNode.class, ExchangeNode.class, ExchangeNode.class, AggregationNode.class, ProjectNode.class);
assertThat(query("""
SELECT MIN(long_col) FROM
(SELECT long_col FROM alltypes GROUP BY long_col)
"""))
.matches("VALUES (BIGINT '-3147483647')")
.isNotFullyPushedDown(AggregationNode.class);
assertThat(query("""
SELECT bool_col, MIN(long_col) FROM
(SELECT bool_col, long_col FROM alltypes GROUP BY bool_col, long_col)
GROUP BY bool_col
"""))
.matches("VALUES (TRUE, BIGINT '-3147483647'), (FALSE, BIGINT '0')")
.isNotFullyPushedDown(ProjectNode.class, AggregationNode.class, ExchangeNode.class, ExchangeNode.class, AggregationNode.class, ProjectNode.class);
assertThat(query("""
SELECT MAX(long_col) FROM
(SELECT long_col FROM alltypes GROUP BY long_col)
"""))
.matches("VALUES (BIGINT '0')")
.isNotFullyPushedDown(AggregationNode.class);
assertThat(query("""
SELECT bool_col, MAX(long_col) FROM
(SELECT bool_col, long_col FROM alltypes GROUP BY bool_col, long_col)
GROUP BY bool_col
"""))
.matches("VALUES (TRUE, BIGINT '-3147483639'), (FALSE, BIGINT '0')")
.isNotFullyPushedDown(ProjectNode.class, AggregationNode.class, ExchangeNode.class, ExchangeNode.class, AggregationNode.class, ProjectNode.class);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ public static void main(String[] args)
Map<String, String> pinotProperties = ImmutableMap.<String, String>builder()
.put("pinot.controller-urls", pinot.getControllerConnectString())
.put("pinot.segments-per-split", "10")
.put("pinot.request-timeout", "3m")
Comment thread
elonazoulay marked this conversation as resolved.
.buildOrThrow();
DistributedQueryRunner queryRunner = createPinotQueryRunner(properties, pinotProperties, Optional.empty());
Thread.sleep(10);
Expand Down