Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions plugin/trino-pinot/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@
</dependencyManagement>

<dependencies>
<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-matching</artifactId>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-plugin-toolkit</artifactId>
Expand Down Expand Up @@ -110,6 +115,11 @@
<version>1.15</version>
</dependency>

<dependency>
<groupId>javax.annotation</groupId>
<artifactId>javax.annotation-api</artifactId>
</dependency>

<dependency>
<groupId>javax.inject</groupId>
<artifactId>javax.inject</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,12 @@
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.plugin.pinot.PinotErrorCode.PINOT_EXCEPTION;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;

public class PinotBrokerPageSource
Expand All @@ -45,20 +49,24 @@ public class PinotBrokerPageSource
private boolean finished;
private long readTimeNanos;
private long completedBytes;
private final AtomicLong currentRowCount = new AtomicLong();
private final int limitForBrokerQueries;

private Iterator<BrokerResultRow> resultIterator;

public PinotBrokerPageSource(
ConnectorSession session,
PinotQuery query,
List<PinotColumnHandle> columnHandles,
PinotClient pinotClient)
PinotClient pinotClient,
int limitForBrokerQueries)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this limitForBrokerQueries in PinotBrokerPageSource? I feel we should leverage this in the planning phase, if it's large than this number, then we can fall back to segment-level query without broker pushdown?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see how it's possible to do it in planning as you do not know how many rows will be returned ahead of time.
This is similar to how the segment queries handle rows exceeded, it was extracted from the elastic search connector, suggested by @martint - @martint , @xiangfu0 , lmk if this sounds correct.

{
this.query = requireNonNull(query, "query is null");
this.pinotClient = requireNonNull(pinotClient, "pinotClient is null");
this.session = requireNonNull(session, "session is null");
this.columnHandles = requireNonNull(columnHandles, "columnHandles is null");
this.decoders = createDecoders(columnHandles);
this.limitForBrokerQueries = limitForBrokerQueries;

this.columnBuilders = columnHandles.stream()
.map(PinotColumnHandle::getDataType)
Expand Down Expand Up @@ -113,6 +121,12 @@ public Page getNextPage()
int rowCount = 0;
while (size < PageBuilderStatus.DEFAULT_MAX_PAGE_SIZE_IN_BYTES && resultIterator.hasNext()) {
rowCount++;
// The limit for broker queries can be exceeded if the broker query was created via pushdown.
// The reason for the limit is that Pinot brokers allocate memory based on the limit size.
// This is a temporary workaround to address https://github.com/apache/pinot/issues/7110
if (currentRowCount.incrementAndGet() > limitForBrokerQueries) {
throw new PinotException(PINOT_EXCEPTION, Optional.of(query.getQuery()), format("Broker query returned '%s' rows, maximum allowed is '%s' rows.", currentRowCount.get(), limitForBrokerQueries));
}
BrokerResultRow row = resultIterator.next();
for (int i = 0; i < decoders.size(); i++) {
int fieldIndex = i;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,22 @@ public class PinotColumnHandle
{
private final String columnName;
private final Type dataType;
private final boolean returnNullOnEmptyGroup;

public PinotColumnHandle(String columnName, Type dataType)
{
this(columnName, dataType, true);
}

@JsonCreator
public PinotColumnHandle(
@JsonProperty("columnName") String columnName,
@JsonProperty("dataType") Type dataType)
@JsonProperty("dataType") Type dataType,
@JsonProperty("returnNullOnEmptyGroup") boolean returnNullOnEmptyGroup)
{
this.columnName = requireNonNull(columnName, "columnName is null");
this.dataType = requireNonNull(dataType, "dataType is null");
this.returnNullOnEmptyGroup = returnNullOnEmptyGroup;
}

@JsonProperty
Expand All @@ -56,6 +64,14 @@ public ColumnMetadata getColumnMetadata()
return new ColumnMetadata(getColumnName(), getDataType());
}

// Some aggregations should return null on empty group, ex. min/max
// If false then return the value from Pinot, ex. count(*)
@JsonProperty
public boolean isReturnNullOnEmptyGroup()
{
return returnNullOnEmptyGroup;
}

@Override
public boolean equals(Object o)
{
Expand Down Expand Up @@ -83,6 +99,7 @@ public String toString()
return toStringHelper(this)
.add("columnName", columnName)
.add("dataType", dataType)
.add("returnNullOnEmptyGroup", returnNullOnEmptyGroup)
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,19 @@
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import io.airlift.configuration.Config;
import io.airlift.configuration.ConfigDescription;
import io.airlift.units.Duration;
import io.airlift.units.MinDuration;

import javax.annotation.PostConstruct;
import javax.validation.constraints.NotNull;

import java.net.URI;
import java.util.List;
import java.util.concurrent.TimeUnit;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.ImmutableList.toImmutableList;

public class PinotConfig
Expand All @@ -52,6 +55,9 @@ public class PinotConfig
private int fetchRetryCount = 2;
private int nonAggregateLimitForBrokerQueries = 25_000;
private int maxRowsPerSplitForSegmentQueries = 50_000;
private int maxRowsForBrokerQueries = 50_000;
private boolean aggregationPushdownEnabled = true;
private boolean countDistinctPushdownEnabled = true;

@NotNull
public List<URI> getControllerUrls()
Expand Down Expand Up @@ -269,4 +275,49 @@ private static URI stringToUri(String server)
}
return URI.create("http://" + server);
}

public int getMaxRowsForBrokerQueries()
{
return maxRowsForBrokerQueries;
}

@Config("pinot.max-rows-for-broker-queries")
public PinotConfig setMaxRowsForBrokerQueries(int maxRowsForBrokerQueries)
{
this.maxRowsForBrokerQueries = maxRowsForBrokerQueries;
return this;
}

public boolean isAggregationPushdownEnabled()
{
return aggregationPushdownEnabled;
}

@Config("pinot.aggregation-pushdown.enabled")
public PinotConfig setAggregationPushdownEnabled(boolean aggregationPushdownEnabled)
{
this.aggregationPushdownEnabled = aggregationPushdownEnabled;
return this;
}

public boolean isCountDistinctPushdownEnabled()
{
return countDistinctPushdownEnabled;
}

@Config("pinot.count-distinct-pushdown.enabled")
@ConfigDescription("Controls whether distinct count is pushed down to Pinot. Distinct count pushdown can cause Pinot to do a full scan. Aggregation pushdown must also be enabled in addition to this parameter otherwise no pushdowns will be enabled.")
public PinotConfig setCountDistinctPushdownEnabled(boolean countDistinctPushdownEnabled)
{
this.countDistinctPushdownEnabled = countDistinctPushdownEnabled;
return this;
}

@PostConstruct
public void validate()
{
checkState(
!countDistinctPushdownEnabled || aggregationPushdownEnabled,
"Invalid configuration: pinot.aggregation-pushdown.enabled must be enabled if pinot.count-distinct-pushdown.enabled");
}
}
Loading