queryPinot(ConnectorSession session, PinotSplit split)
+ {
+ String pql = split.getSegmentPql().orElseThrow(() -> new PinotException(PinotErrorCode.PINOT_INVALID_PQL_GENERATED, Optional.empty(), "Expected the segment split to contain the pql"));
+ String host = split.getSegmentHost().orElseThrow(() -> new PinotException(PinotErrorCode.PINOT_INVALID_PQL_GENERATED, Optional.empty(), "Expected the segment split to contain the host"));
+ try {
+ return ImmutableMap.copyOf(
+ pinotQueryClient.queryPinotServerForDataTable(
+ pql,
+ host,
+ split.getSegments(),
+ PinotSessionProperties.getConnectionTimeout(session).toMillis(),
+ PinotSessionProperties.isIgnoreEmptyResponses(session),
+ PinotSessionProperties.getPinotRetryCount(session)));
+ }
+ catch (PinotScatterGatherQueryClient.PinotException pe) {
+ throw new PinotException(PINOT_ERROR_CODE_MAP.getOrDefault(pe.getErrorCode(), PinotErrorCode.PINOT_UNCLASSIFIED_ERROR), Optional.of(pql), String.format("Error when hitting host %s", host), pe);
+ }
+ }
+
+ @Override
+ public void close()
+ {
+ if (closed) {
+ return;
+ }
+ closed = true;
+ }
+
+ /**
+ * Generates the {@link com.facebook.presto.spi.block.Block} for the specific column from the {@link #currentDataTable}.
+ *
+ * Based on the original Pinot column types, write as Presto-supported values to {@link com.facebook.presto.spi.block.BlockBuilder}, e.g.
+ * FLOAT -> Double, INT -> Long, String -> Slice.
+ *
+ * @param blockBuilder blockBuilder for the current column
+ * @param columnType type of the column
+ * @param columnIdx column index
+ */
+
+ private void writeBlock(BlockBuilder blockBuilder, Type columnType, int columnIdx)
+ {
+ Class> javaType = columnType.getJavaType();
+ DataSchema.ColumnDataType pinotColumnType = currentDataTable.getDataTable().getDataSchema().getColumnDataType(columnIdx);
+ if (javaType.equals(boolean.class)) {
+ writeBooleanBlock(blockBuilder, columnType, columnIdx);
+ }
+ else if (javaType.equals(long.class)) {
+ writeLongBlock(blockBuilder, columnType, columnIdx);
+ }
+ else if (javaType.equals(double.class)) {
+ writeDoubleBlock(blockBuilder, columnType, columnIdx);
+ }
+ else if (javaType.equals(Slice.class)) {
+ writeSliceBlock(blockBuilder, columnType, columnIdx);
+ }
+ else {
+ throw new PrestoException(
+ PINOT_UNSUPPORTED_COLUMN_TYPE,
+ String.format(
+ "Failed to write column %s. pinotColumnType %s, javaType %s",
+ columnHandles.get(columnIdx).getColumnName(), pinotColumnType, javaType));
+ }
+ }
+
+ private void writeBooleanBlock(BlockBuilder blockBuilder, Type columnType, int columnIndex)
+ {
+ for (int i = 0; i < currentDataTable.getDataTable().getNumberOfRows(); i++) {
+ columnType.writeBoolean(blockBuilder, getBoolean(i, columnIndex));
+ completedBytes++;
+ }
+ }
+
+ private void writeLongBlock(BlockBuilder blockBuilder, Type columnType, int columnIndex)
+ {
+ for (int i = 0; i < currentDataTable.getDataTable().getNumberOfRows(); i++) {
+ columnType.writeLong(blockBuilder, getLong(i, columnIndex));
+ completedBytes += Long.BYTES;
+ }
+ }
+
+ private void writeDoubleBlock(BlockBuilder blockBuilder, Type columnType, int columnIndex)
+ {
+ for (int i = 0; i < currentDataTable.getDataTable().getNumberOfRows(); i++) {
+ columnType.writeDouble(blockBuilder, getDouble(i, columnIndex));
+ completedBytes += Double.BYTES;
+ }
+ }
+
+ private void writeSliceBlock(BlockBuilder blockBuilder, Type columnType, int columnIndex)
+ {
+ for (int i = 0; i < currentDataTable.getDataTable().getNumberOfRows(); i++) {
+ Slice slice = getSlice(i, columnIndex);
+ columnType.writeSlice(blockBuilder, slice, 0, slice.length());
+ completedBytes += slice.getBytes().length;
+ }
+ }
+
+ Type getType(int columnIndex)
+ {
+ checkArgument(columnIndex < columnHandles.size(), "Invalid field index");
+ return columnHandles.get(columnIndex).getDataType();
+ }
+
+ boolean getBoolean(int rowIdx, int columnIndex)
+ {
+ return Boolean.getBoolean(currentDataTable.getDataTable().getString(rowIdx, columnIndex));
+ }
+
+ long getLong(int rowIndex, int columnIndex)
+ {
+ DataSchema.ColumnDataType dataType = currentDataTable.getDataTable().getDataSchema().getColumnDataType(columnIndex);
+ // Note columnType in the dataTable could be different from the original columnType in the columnHandle.
+ // e.g. when original column type is int/long and aggregation value is requested, the returned dataType from Pinot would be double.
+ // So need to cast it back to the original columnType.
+ if (dataType.equals(DataType.DOUBLE)) {
+ return (long) currentDataTable.getDataTable().getDouble(rowIndex, columnIndex);
+ }
+ if (dataType.equals(DataType.INT)) {
+ return (long) currentDataTable.getDataTable().getInt(rowIndex, columnIndex);
+ }
+ else {
+ return currentDataTable.getDataTable().getLong(rowIndex, columnIndex);
+ }
+ }
+
+ double getDouble(int rowIndex, int columnIndex)
+ {
+ DataSchema.ColumnDataType dataType = currentDataTable.getDataTable().getDataSchema().getColumnDataType(columnIndex);
+ if (dataType.equals(DataType.FLOAT)) {
+ return currentDataTable.getDataTable().getFloat(rowIndex, columnIndex);
+ }
+ else {
+ return currentDataTable.getDataTable().getDouble(rowIndex, columnIndex);
+ }
+ }
+
+ Slice getSlice(int rowIndex, int columnIndex)
+ {
+ checkColumnType(columnIndex, VARCHAR);
+ DataSchema.ColumnDataType columnType = currentDataTable.getDataTable().getDataSchema().getColumnDataType(columnIndex);
+ switch (columnType) {
+ case INT_ARRAY:
+ int[] intArray = currentDataTable.getDataTable().getIntArray(rowIndex, columnIndex);
+ return utf8Slice(Arrays.toString(intArray));
+ case LONG_ARRAY:
+ long[] longArray = currentDataTable.getDataTable().getLongArray(rowIndex, columnIndex);
+ return utf8Slice(Arrays.toString(longArray));
+ case FLOAT_ARRAY:
+ float[] floatArray = currentDataTable.getDataTable().getFloatArray(rowIndex, columnIndex);
+ return utf8Slice(Arrays.toString(floatArray));
+ case DOUBLE_ARRAY:
+ double[] doubleArray = currentDataTable.getDataTable().getDoubleArray(rowIndex, columnIndex);
+ return utf8Slice(Arrays.toString(doubleArray));
+ case STRING_ARRAY:
+ String[] stringArray = currentDataTable.getDataTable().getStringArray(rowIndex, columnIndex);
+ return utf8Slice(Arrays.toString(stringArray));
+ case STRING:
+ String field = currentDataTable.getDataTable().getString(rowIndex, columnIndex);
+ if (field == null || field.isEmpty()) {
+ return Slices.EMPTY_SLICE;
+ }
+ return Slices.utf8Slice(field);
+ }
+ return Slices.EMPTY_SLICE;
+ }
+
+ /**
+ * Get estimated size in bytes for the Pinot column.
+ * Deterministic for numeric fields; use estimate for other types to save calculation.
+ *
+ * @param dataType FieldSpec.dataType for Pinot column.
+ * @return estimated size in bytes.
+ */
+ private int getEstimatedColumnSizeInBytes(DataSchema.ColumnDataType dataType)
+ {
+ if (dataType.isNumber()) {
+ switch (dataType) {
+ case LONG:
+ return Long.BYTES;
+ case FLOAT:
+ return Float.BYTES;
+ case DOUBLE:
+ return Double.BYTES;
+ case INT:
+ default:
+ return Integer.BYTES;
+ }
+ }
+ return pinotConfig.getEstimatedSizeInBytesForNonNumericColumn();
+ }
+
+ void checkColumnType(int columnIndex, Type expected)
+ {
+ Type actual = getType(columnIndex);
+ checkArgument(actual.equals(expected), "Expected column %s to be type %s but is %s", columnIndex, expected, actual);
+ }
+
+ Type getTypeForBlock(PinotColumnHandle pinotColumnHandle)
+ {
+ if (pinotColumnHandle.getDataType().equals(INTEGER)) {
+ return BIGINT;
+ }
+ return pinotColumnHandle.getDataType();
+ }
+
+ @Override
+ public long getCompletedPositions()
+ {
+ return 0;
+ }
+
+ private static class PinotDataTableWithSize
+ {
+ DataTable dataTable;
+ int estimatedSizeInBytes;
+
+ PinotDataTableWithSize(DataTable dataTable, int estimatedSizeInBytes)
+ {
+ this.dataTable = dataTable;
+ this.estimatedSizeInBytes = estimatedSizeInBytes;
+ }
+
+ DataTable getDataTable()
+ {
+ return dataTable;
+ }
+
+ int getEstimatedSizeInBytes()
+ {
+ return estimatedSizeInBytes;
+ }
+ }
+}
diff --git a/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/PinotSessionProperties.java b/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/PinotSessionProperties.java
new file mode 100644
index 0000000000000..d6bfaff0d49ce
--- /dev/null
+++ b/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/PinotSessionProperties.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.facebook.presto.pinot;
+
+import com.facebook.presto.spi.ConnectorSession;
+import com.facebook.presto.spi.session.PropertyMetadata;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import io.airlift.units.Duration;
+
+import javax.inject.Inject;
+
+import java.util.List;
+
+import static com.facebook.presto.spi.session.PropertyMetadata.booleanProperty;
+import static com.facebook.presto.spi.session.PropertyMetadata.integerProperty;
+import static com.facebook.presto.spi.type.IntegerType.INTEGER;
+import static com.facebook.presto.spi.type.VarcharType.createUnboundedVarcharType;
+import static com.google.common.base.Preconditions.checkArgument;
+
+public class PinotSessionProperties
+{
+ private static final String CONNECTION_TIMEOUT = "connection_timeout";
+ private static final String PREFER_BROKER_QUERIES = "prefer_broker_queries";
+ private static final String IGNORE_EMPTY_RESPONSES = "ignore_empty_responses";
+ private static final String RETRY_COUNT = "retry_count";
+ private static final String USE_DATE_TRUNC = "use_date_trunc";
+ private static final String NON_AGGREGATE_LIMIT_FOR_BROKER_QUERIES = "non_aggregate_limit_for_broker_queries";
+
+ @VisibleForTesting
+ public static final String FORBID_SEGMENT_QUERIES = "forbid_segment_queries";
+
+ @VisibleForTesting
+ public static final String NUM_SEGMENTS_PER_SPLIT = "num_segments_per_split";
+
+ private final List> sessionProperties;
+
+ public static int getNumSegmentsPerSplit(ConnectorSession session)
+ {
+ int segmentsPerSplit = session.getProperty(NUM_SEGMENTS_PER_SPLIT, Integer.class);
+ return segmentsPerSplit <= 0 ? Integer.MAX_VALUE : segmentsPerSplit;
+ }
+
+ public static boolean isPreferBrokerQueries(ConnectorSession session)
+ {
+ return session.getProperty(PREFER_BROKER_QUERIES, Boolean.class);
+ }
+
+ public static boolean isForbidSegmentQueries(ConnectorSession session)
+ {
+ return session.getProperty(FORBID_SEGMENT_QUERIES, Boolean.class);
+ }
+
+ public static Duration getConnectionTimeout(ConnectorSession session)
+ {
+ return session.getProperty(CONNECTION_TIMEOUT, Duration.class);
+ }
+
+ public static boolean isIgnoreEmptyResponses(ConnectorSession session)
+ {
+ return session.getProperty(IGNORE_EMPTY_RESPONSES, Boolean.class);
+ }
+
+ public static int getPinotRetryCount(ConnectorSession session)
+ {
+ return session.getProperty(RETRY_COUNT, Integer.class);
+ }
+
+ public static boolean isUseDateTruncation(ConnectorSession session)
+ {
+ return session.getProperty(USE_DATE_TRUNC, Boolean.class);
+ }
+
+ public static int getNonAggregateLimitForBrokerQueries(ConnectorSession session)
+ {
+ return session.getProperty(NON_AGGREGATE_LIMIT_FOR_BROKER_QUERIES, Integer.class);
+ }
+
+ @Inject
+ public PinotSessionProperties(PinotConfig pinotConfig)
+ {
+ sessionProperties = ImmutableList.of(
+ booleanProperty(
+ PREFER_BROKER_QUERIES,
+ "Prefer queries to broker even when parallel scan is enabled for aggregation queries",
+ pinotConfig.isPreferBrokerQueries(),
+ false),
+ booleanProperty(
+ FORBID_SEGMENT_QUERIES,
+ "Forbid segment queries",
+ pinotConfig.isForbidSegmentQueries(),
+ false),
+ booleanProperty(
+ IGNORE_EMPTY_RESPONSES,
+ "Ignore empty or missing pinot server responses",
+ pinotConfig.isIgnoreEmptyResponses(),
+ false),
+ integerProperty(
+ RETRY_COUNT,
+ "Retry count for retriable pinot data fetch calls",
+ pinotConfig.getFetchRetryCount(),
+ false),
+ integerProperty(
+ NON_AGGREGATE_LIMIT_FOR_BROKER_QUERIES,
+ "Max limit for non aggregate queries to the pinot broker",
+ pinotConfig.getNonAggregateLimitForBrokerQueries(),
+ false),
+ booleanProperty(
+ USE_DATE_TRUNC,
+ "Use the new UDF dateTrunc in pinot that is more presto compatible",
+ pinotConfig.isUseDateTrunc(),
+ false),
+ new PropertyMetadata<>(
+ CONNECTION_TIMEOUT,
+ "Connection Timeout to talk to Pinot servers",
+ createUnboundedVarcharType(),
+ Duration.class,
+ pinotConfig.getConnectionTimeout(),
+ false,
+ value -> Duration.valueOf((String) value),
+ Duration::toString),
+ new PropertyMetadata<>(
+ NUM_SEGMENTS_PER_SPLIT,
+ "Number of segments of the same host per split",
+ INTEGER,
+ Integer.class,
+ pinotConfig.getNumSegmentsPerSplit(),
+ false,
+ value -> {
+ int ret = ((Number) value).intValue();
+ checkArgument(ret > 0, "Number of segments per split must be more than zero");
+ return ret;
+ },
+ object -> object));
+ }
+
+ public List> getSessionProperties()
+ {
+ return sessionProperties;
+ }
+}
diff --git a/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/PinotSplit.java b/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/PinotSplit.java
new file mode 100644
index 0000000000000..6f02c311cb1e4
--- /dev/null
+++ b/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/PinotSplit.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.pinot;
+
+import com.facebook.presto.pinot.query.PinotQueryGenerator;
+import com.facebook.presto.spi.ConnectorSplit;
+import com.facebook.presto.spi.HostAddress;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+import java.util.Optional;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
+
+public class PinotSplit
+ implements ConnectorSplit
+{
+ private final String connectorId;
+ private final SplitType splitType;
+
+ // Properties needed for broker split type
+ private final Optional brokerPql;
+
+ // Properties needed for segment split type
+ private final Optional segmentPql;
+ private final List segments;
+ private final Optional segmentHost;
+
+ @JsonCreator
+ public PinotSplit(
+ @JsonProperty("connectorId") String connectorId,
+ @JsonProperty("splitType") SplitType splitType,
+ @JsonProperty("brokerPql") Optional brokerPql,
+ @JsonProperty("segmentPql") Optional segmentPql,
+ @JsonProperty("segments") List segments,
+ @JsonProperty("segmentHost") Optional segmentHost)
+ {
+ this.connectorId = requireNonNull(connectorId, "connector id is null");
+ this.splitType = requireNonNull(splitType, "splitType id is null");
+ this.brokerPql = requireNonNull(brokerPql, "brokerPql is null");
+ this.segmentPql = requireNonNull(segmentPql, "table name is null");
+ this.segments = ImmutableList.copyOf(requireNonNull(segments, "segment is null"));
+ this.segmentHost = requireNonNull(segmentHost, "host is null");
+
+ // make sure the segment properties are present when the split type is segment
+ if (splitType == SplitType.SEGMENT) {
+ checkArgument(segmentPql.isPresent(), "Table name is missing from the split");
+ checkArgument(!segments.isEmpty(), "Segments are missing from the split");
+ checkArgument(segmentHost.isPresent(), "Segment host address is missing from the split");
+ }
+ else {
+ checkArgument(brokerPql.isPresent(), "brokerPql is missing from the split");
+ }
+ }
+
+ public static PinotSplit createBrokerSplit(String connectorId, PinotQueryGenerator.GeneratedPql brokerPql)
+ {
+ return new PinotSplit(
+ requireNonNull(connectorId, "connector id is null"),
+ SplitType.BROKER,
+ Optional.of(requireNonNull(brokerPql, "brokerPql is null")),
+ Optional.empty(),
+ ImmutableList.of(),
+ Optional.empty());
+ }
+
+ public static PinotSplit createSegmentSplit(String connectorId, String pql, List segments, String segmentHost)
+ {
+ return new PinotSplit(
+ requireNonNull(connectorId, "connector id is null"),
+ SplitType.SEGMENT,
+ Optional.empty(),
+ Optional.of(requireNonNull(pql, "pql is null")),
+ requireNonNull(segments, "segments are null"),
+ Optional.of(requireNonNull(segmentHost, "segmentHost is null")));
+ }
+
+ @JsonProperty
+ public String getConnectorId()
+ {
+ return connectorId;
+ }
+
+ @JsonProperty
+ public SplitType getSplitType()
+ {
+ return splitType;
+ }
+
+ @JsonProperty
+ public Optional getBrokerPql()
+ {
+ return brokerPql;
+ }
+
+ @JsonProperty
+ public Optional getSegmentPql()
+ {
+ return segmentPql;
+ }
+
+ @JsonProperty
+ public Optional getSegmentHost()
+ {
+ return segmentHost;
+ }
+
+ @JsonProperty
+ public List getSegments()
+ {
+ return segments;
+ }
+
+ @Override
+ public String toString()
+ {
+ return toStringHelper(this)
+ .add("connectorId", connectorId)
+ .add("splitType", splitType)
+ .add("segmentPql", segmentPql)
+ .add("brokerPql", brokerPql)
+ .add("segments", segments)
+ .add("segmentHost", segmentHost)
+ .toString();
+ }
+
+ @Override
+ public boolean isRemotelyAccessible()
+ {
+ return true;
+ }
+
+ @Override
+ public List getAddresses()
+ {
+ return null;
+ }
+
+ @Override
+ public Object getInfo()
+ {
+ return this;
+ }
+
+ public enum SplitType
+ {
+ SEGMENT,
+ BROKER,
+ }
+}
diff --git a/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/PinotSplitManager.java b/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/PinotSplitManager.java
new file mode 100644
index 0000000000000..34bb63bc83ac2
--- /dev/null
+++ b/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/PinotSplitManager.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.pinot;
+
+import com.facebook.presto.pinot.query.PinotQueryGenerator.GeneratedPql;
+import com.facebook.presto.spi.ConnectorId;
+import com.facebook.presto.spi.ConnectorSession;
+import com.facebook.presto.spi.ConnectorSplit;
+import com.facebook.presto.spi.ConnectorSplitSource;
+import com.facebook.presto.spi.ConnectorTableHandle;
+import com.facebook.presto.spi.ConnectorTableLayoutHandle;
+import com.facebook.presto.spi.ErrorCode;
+import com.facebook.presto.spi.ErrorCodeSupplier;
+import com.facebook.presto.spi.ErrorType;
+import com.facebook.presto.spi.FixedSplitSource;
+import com.facebook.presto.spi.PrestoException;
+import com.facebook.presto.spi.connector.ConnectorSplitManager;
+import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
+import com.google.common.collect.Iterables;
+
+import javax.inject.Inject;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Supplier;
+
+import static com.facebook.presto.pinot.PinotSplit.createBrokerSplit;
+import static com.facebook.presto.pinot.PinotSplit.createSegmentSplit;
+import static com.facebook.presto.pinot.query.PinotQueryGeneratorContext.TABLE_NAME_SUFFIX_TEMPLATE;
+import static com.facebook.presto.pinot.query.PinotQueryGeneratorContext.TIME_BOUNDARY_FILTER_TEMPLATE;
+import static com.facebook.presto.spi.ErrorType.USER_ERROR;
+import static java.util.Collections.singletonList;
+import static java.util.Objects.requireNonNull;
+
+public class PinotSplitManager
+ implements ConnectorSplitManager
+{
+ private final String connectorId;
+ private final PinotConnection pinotPrestoConnection;
+
+ @Inject
+ public PinotSplitManager(ConnectorId connectorId, PinotConnection pinotPrestoConnection)
+ {
+ this.connectorId = requireNonNull(connectorId, "connectorId is null").toString();
+ this.pinotPrestoConnection = requireNonNull(pinotPrestoConnection, "pinotPrestoConnection is null");
+ }
+
+ protected ConnectorSplitSource generateSplitForBrokerBasedScan(GeneratedPql brokerPql)
+ {
+ return new FixedSplitSource(singletonList(createBrokerSplit(connectorId, brokerPql)));
+ }
+
+ protected ConnectorSplitSource generateSplitsForSegmentBasedScan(
+ PinotTableLayoutHandle pinotLayoutHandle,
+ ConnectorSession session)
+ {
+ PinotTableHandle tableHandle = pinotLayoutHandle.getTable();
+ String tableName = tableHandle.getTableName();
+ Map>> routingTable;
+
+ routingTable = pinotPrestoConnection.getRoutingTable(tableName);
+
+ List splits = new ArrayList<>();
+ if (!routingTable.isEmpty()) {
+ GeneratedPql segmentPql = tableHandle.getPql().orElseThrow(() -> new PinotException(PinotErrorCode.PINOT_UNSUPPORTED_EXPRESSION, Optional.empty(), "Expected to find realtime and offline pql in " + tableHandle));
+ PinotClusterInfoFetcher.TimeBoundary timeBoundary = pinotPrestoConnection.getTimeBoundary(tableName);
+ String realtime = getSegmentPql(segmentPql, "_REALTIME", timeBoundary.getOnlineTimePredicate());
+ String offline = getSegmentPql(segmentPql, "_OFFLINE", timeBoundary.getOfflineTimePredicate());
+ generateSegmentSplits(splits, routingTable, tableName, "_REALTIME", session, realtime);
+ generateSegmentSplits(splits, routingTable, tableName, "_OFFLINE", session, offline);
+ }
+
+ Collections.shuffle(splits);
+ return new FixedSplitSource(splits);
+ }
+
+ private String getSegmentPql(GeneratedPql basePql, String suffix, Optional timePredicate)
+ {
+ String pql = basePql.getPql().replace(TABLE_NAME_SUFFIX_TEMPLATE, suffix);
+ if (timePredicate.isPresent()) {
+ String tp = timePredicate.get();
+ pql = pql.replace(TIME_BOUNDARY_FILTER_TEMPLATE, basePql.isHaveFilter() ? tp : " WHERE " + tp);
+ }
+ else {
+ pql = pql.replace(TIME_BOUNDARY_FILTER_TEMPLATE, "");
+ }
+ return pql;
+ }
+
+ protected void generateSegmentSplits(
+ List splits,
+ Map>> routingTable,
+ String tableName,
+ String tableNameSuffix,
+ ConnectorSession session,
+ String pql)
+ {
+ final String finalTableName = tableName + tableNameSuffix;
+ int segmentsPerSplitConfigured = PinotSessionProperties.getNumSegmentsPerSplit(session);
+ for (String routingTableName : routingTable.keySet()) {
+ if (!routingTableName.equalsIgnoreCase(finalTableName)) {
+ continue;
+ }
+
+ Map> hostToSegmentsMap = routingTable.get(routingTableName);
+ hostToSegmentsMap.forEach((host, segments) -> {
+ int numSegmentsInThisSplit = Math.min(segments.size(), segmentsPerSplitConfigured);
+ // segments is already shuffled
+ Iterables.partition(segments, numSegmentsInThisSplit).forEach(
+ segmentsForThisSplit -> splits.add(
+ createSegmentSplit(connectorId, pql, segmentsForThisSplit, host)));
+ });
+ }
+ }
+
+ public enum QueryNotAdequatelyPushedDownErrorCode
+ implements ErrorCodeSupplier
+ {
+ PQL_NOT_PRESENT(1, USER_ERROR, "Query uses unsupported expressions that cannot be pushed into the storage engine. Please see https://XXX for more details");
+
+ private final ErrorCode errorCode;
+
+ QueryNotAdequatelyPushedDownErrorCode(int code, ErrorType type, String guidance)
+ {
+ errorCode = new ErrorCode(code + 0x0625_0000, name() + ": " + guidance, type);
+ }
+
+ @Override
+ public ErrorCode toErrorCode()
+ {
+ return errorCode;
+ }
+ }
+
+ public static class QueryNotAdequatelyPushedDownException
+ extends PrestoException
+ {
+ private final String connectorId;
+ private final ConnectorTableHandle connectorTableHandle;
+
+ public QueryNotAdequatelyPushedDownException(
+ QueryNotAdequatelyPushedDownErrorCode errorCode,
+ ConnectorTableHandle connectorTableHandle,
+ String connectorId)
+ {
+ super(requireNonNull(errorCode, "error code is null"), (String) null);
+ this.connectorId = requireNonNull(connectorId, "connector id is null");
+ this.connectorTableHandle = requireNonNull(connectorTableHandle, "connector table handle is null");
+ }
+
+ @Override
+ public String getMessage()
+ {
+ return super.getMessage() + String.format(" table: %s:%s", connectorId, connectorTableHandle);
+ }
+ }
+
+ @Override
+ public ConnectorSplitSource getSplits(
+ ConnectorTransactionHandle transactionHandle,
+ ConnectorSession session,
+ ConnectorTableLayoutHandle layout,
+ SplitSchedulingContext splitSchedulingContext)
+ {
+ PinotTableLayoutHandle pinotLayoutHandle = (PinotTableLayoutHandle) layout;
+ PinotTableHandle pinotTableHandle = pinotLayoutHandle.getTable();
+ Supplier errorSupplier = () -> new QueryNotAdequatelyPushedDownException(QueryNotAdequatelyPushedDownErrorCode.PQL_NOT_PRESENT, pinotTableHandle, connectorId);
+ if (!pinotTableHandle.getIsQueryShort().orElseThrow(errorSupplier)) {
+ if (PinotSessionProperties.isForbidSegmentQueries(session)) {
+ throw errorSupplier.get();
+ }
+ return generateSplitsForSegmentBasedScan(pinotLayoutHandle, session);
+ }
+ else {
+ return generateSplitForBrokerBasedScan(pinotTableHandle.getPql().orElseThrow(errorSupplier));
+ }
+ }
+}
diff --git a/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/PinotTable.java b/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/PinotTable.java
new file mode 100644
index 0000000000000..79fcdddd80b47
--- /dev/null
+++ b/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/PinotTable.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.pinot;
+
+import com.facebook.presto.spi.ColumnMetadata;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Strings.isNullOrEmpty;
+import static java.util.Objects.requireNonNull;
+
+public class PinotTable
+{
+ private final String name;
+ private final List columns;
+ private final List columnsMetadata;
+
+ @JsonCreator
+ public PinotTable(
+ @JsonProperty("name") String name,
+ @JsonProperty("columns") List columns)
+ {
+ checkArgument(!isNullOrEmpty(name), "name is null or is empty");
+ this.name = requireNonNull(name, "name is null");
+ this.columns = ImmutableList.copyOf(requireNonNull(columns, "columns is null"));
+
+ this.columnsMetadata = columns.stream().map(c -> new PinotColumnMetadata(c.getName(), c.getType())).collect(Collectors.toList());
+ }
+
+ @JsonProperty
+ public String getName()
+ {
+ return name;
+ }
+
+ @JsonProperty
+ public List getColumns()
+ {
+ return columns;
+ }
+
+ public List getColumnsMetadata()
+ {
+ return columnsMetadata;
+ }
+}
diff --git a/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/PinotTableHandle.java b/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/PinotTableHandle.java
new file mode 100644
index 0000000000000..0e7f134608339
--- /dev/null
+++ b/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/PinotTableHandle.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.pinot;
+
+import com.facebook.presto.pinot.query.PinotQueryGenerator;
+import com.facebook.presto.spi.ConnectorTableHandle;
+import com.facebook.presto.spi.SchemaTableName;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+import java.util.Optional;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+import static java.util.Objects.requireNonNull;
+
+public final class PinotTableHandle
+ implements ConnectorTableHandle
+{
+ private final String connectorId;
+ private final String schemaName;
+ private final String tableName;
+ private final Optional isQueryShort;
+ private final Optional pql;
+
+ public PinotTableHandle(
+ String connectorId,
+ String schemaName,
+ String tableName)
+ {
+ this(connectorId, schemaName, tableName, Optional.empty(), Optional.empty());
+ }
+
+ @JsonCreator
+ public PinotTableHandle(
+ @JsonProperty("connectorId") String connectorId,
+ @JsonProperty("schemaName") String schemaName,
+ @JsonProperty("tableName") String tableName,
+ @JsonProperty("isQueryShort") Optional isQueryShort,
+ @JsonProperty("pql") Optional pql)
+ {
+ this.connectorId = requireNonNull(connectorId, "connectorId is null");
+ this.schemaName = requireNonNull(schemaName, "schemaName is null");
+ this.tableName = requireNonNull(tableName, "tableName is null");
+ this.isQueryShort = requireNonNull(isQueryShort, "safe to execute is null");
+ this.pql = requireNonNull(pql, "broker pql is null");
+ }
+
+ @JsonProperty
+ public Optional getPql()
+ {
+ return pql;
+ }
+
+ @JsonProperty
+ public String getConnectorId()
+ {
+ return connectorId;
+ }
+
+ @JsonProperty
+ public String getSchemaName()
+ {
+ return schemaName;
+ }
+
+ @JsonProperty
+ public String getTableName()
+ {
+ return tableName;
+ }
+
+ @JsonProperty
+ public Optional getIsQueryShort()
+ {
+ return isQueryShort;
+ }
+
+ public SchemaTableName toSchemaTableName()
+ {
+ return new SchemaTableName(schemaName, tableName);
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ PinotTableHandle that = (PinotTableHandle) o;
+ return Objects.equals(connectorId, that.connectorId) &&
+ Objects.equals(schemaName, that.schemaName) &&
+ Objects.equals(tableName, that.tableName) &&
+ Objects.equals(isQueryShort, that.isQueryShort) &&
+ Objects.equals(pql, that.pql);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(connectorId, schemaName, tableName, isQueryShort, pql);
+ }
+
+ @Override
+ public String toString()
+ {
+ return toStringHelper(this)
+ .add("connectorId", connectorId)
+ .add("schemaName", schemaName)
+ .add("tableName", tableName)
+ .add("isQueryShort", isQueryShort)
+ .add("pql", pql)
+ .toString();
+ }
+}
diff --git a/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/PinotTableLayoutHandle.java b/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/PinotTableLayoutHandle.java
new file mode 100644
index 0000000000000..dd234c6b7c185
--- /dev/null
+++ b/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/PinotTableLayoutHandle.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.pinot;
+
+import com.facebook.presto.spi.ConnectorTableLayoutHandle;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
+import static java.util.Objects.requireNonNull;
+
+public class PinotTableLayoutHandle
+ implements ConnectorTableLayoutHandle
+{
+ private final PinotTableHandle table;
+
+ @JsonCreator
+ public PinotTableLayoutHandle(
+ @JsonProperty("table") PinotTableHandle table)
+ {
+ this.table = requireNonNull(table, "table is null");
+ }
+
+ @JsonProperty
+ public PinotTableHandle getTable()
+ {
+ return table;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ PinotTableLayoutHandle that = (PinotTableLayoutHandle) o;
+ return Objects.equals(table, that.table);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(table);
+ }
+
+ @Override
+ public String toString()
+ {
+ return table.toString();
+ }
+}
diff --git a/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/PinotTransactionHandle.java b/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/PinotTransactionHandle.java
new file mode 100644
index 0000000000000..5e0fe3f7b31e2
--- /dev/null
+++ b/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/PinotTransactionHandle.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.pinot;
+
+import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
+
+public enum PinotTransactionHandle
+ implements ConnectorTransactionHandle
+{
+ INSTANCE
+}
diff --git a/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/PinotUtils.java b/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/PinotUtils.java
new file mode 100644
index 0000000000000..1f13c7e97b478
--- /dev/null
+++ b/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/PinotUtils.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.pinot;
+
+import com.google.common.base.Preconditions;
+
+import java.util.function.Function;
+
+import static java.net.HttpURLConnection.HTTP_MULT_CHOICE;
+import static java.net.HttpURLConnection.HTTP_OK;
+
+public class PinotUtils
+{
+ private PinotUtils()
+ {
+ }
+
+ static boolean isValidPinotHttpResponseCode(int status)
+ {
+ return status >= HTTP_OK && status < HTTP_MULT_CHOICE;
+ }
+
+ public static T doWithRetries(int retries, Function caller)
+ {
+ PinotException firstError = null;
+ Preconditions.checkState(retries > 0, "Invalid num of retries %d", retries);
+ for (int i = 0; i < retries; ++i) {
+ try {
+ return caller.apply(i);
+ }
+ catch (PinotException e) {
+ if (firstError == null) {
+ firstError = e;
+ }
+ if (!e.getPinotErrorCode().isRetriable()) {
+ throw e;
+ }
+ }
+ }
+ throw firstError;
+ }
+}
diff --git a/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/RebindSafeMBeanServer.java b/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/RebindSafeMBeanServer.java
new file mode 100644
index 0000000000000..1b7f8b8312660
--- /dev/null
+++ b/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/RebindSafeMBeanServer.java
@@ -0,0 +1,344 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.pinot;
+
+import javax.annotation.concurrent.ThreadSafe;
+import javax.management.Attribute;
+import javax.management.AttributeList;
+import javax.management.AttributeNotFoundException;
+import javax.management.InstanceAlreadyExistsException;
+import javax.management.InstanceNotFoundException;
+import javax.management.IntrospectionException;
+import javax.management.InvalidAttributeValueException;
+import javax.management.ListenerNotFoundException;
+import javax.management.MBeanException;
+import javax.management.MBeanInfo;
+import javax.management.MBeanRegistrationException;
+import javax.management.MBeanServer;
+import javax.management.NotCompliantMBeanException;
+import javax.management.NotificationFilter;
+import javax.management.NotificationListener;
+import javax.management.ObjectInstance;
+import javax.management.ObjectName;
+import javax.management.OperationsException;
+import javax.management.QueryExp;
+import javax.management.ReflectionException;
+import javax.management.loading.ClassLoaderRepository;
+
+import java.io.ObjectInputStream;
+import java.util.Set;
+
+/**
+ * MBeanServer wrapper that a ignores calls to registerMBean when there is already
+ * a MBean registered with the specified object name.
+ *
+ * This originally existed in hive, raptor and cassandra and I am promoting it to SPI
+ */
+@ThreadSafe
+public class RebindSafeMBeanServer
+ implements MBeanServer
+{
+ private final MBeanServer mbeanServer;
+
+ public RebindSafeMBeanServer(MBeanServer mbeanServer)
+ {
+ this.mbeanServer = mbeanServer;
+ }
+
+ /**
+ * Delegates to the wrapped mbean server, but if a mbean is already registered
+ * with the specified name, the existing instance is returned.
+ */
+ @Override
+ public ObjectInstance registerMBean(Object object, ObjectName name)
+ throws MBeanRegistrationException, NotCompliantMBeanException
+ {
+ while (true) {
+ try {
+ // try to register the mbean
+ return mbeanServer.registerMBean(object, name);
+ }
+ catch (InstanceAlreadyExistsException ignored) {
+ }
+
+ try {
+ // a mbean is already installed, try to return the already registered instance
+ ObjectInstance objectInstance = mbeanServer.getObjectInstance(name);
+ return objectInstance;
+ }
+ catch (InstanceNotFoundException ignored) {
+ // the mbean was removed before we could get the reference
+ // start the whole process over again
+ }
+ }
+ }
+
+ @Override
+ public void unregisterMBean(ObjectName name)
+ throws InstanceNotFoundException, MBeanRegistrationException
+ {
+ mbeanServer.unregisterMBean(name);
+ }
+
+ @Override
+ public ObjectInstance getObjectInstance(ObjectName name)
+ throws InstanceNotFoundException
+ {
+ return mbeanServer.getObjectInstance(name);
+ }
+
+ @Override
+ public Set queryMBeans(ObjectName name, QueryExp query)
+ {
+ return mbeanServer.queryMBeans(name, query);
+ }
+
+ @Override
+ public Set queryNames(ObjectName name, QueryExp query)
+ {
+ return mbeanServer.queryNames(name, query);
+ }
+
+ @Override
+ public boolean isRegistered(ObjectName name)
+ {
+ return mbeanServer.isRegistered(name);
+ }
+
+ @Override
+ public Integer getMBeanCount()
+ {
+ return mbeanServer.getMBeanCount();
+ }
+
+ @Override
+ public Object getAttribute(ObjectName name, String attribute)
+ throws MBeanException, AttributeNotFoundException, InstanceNotFoundException, ReflectionException
+ {
+ return mbeanServer.getAttribute(name, attribute);
+ }
+
+ @Override
+ public AttributeList getAttributes(ObjectName name, String[] attributes)
+ throws InstanceNotFoundException, ReflectionException
+ {
+ return mbeanServer.getAttributes(name, attributes);
+ }
+
+ @Override
+ public void setAttribute(ObjectName name, Attribute attribute)
+ throws InstanceNotFoundException, AttributeNotFoundException, InvalidAttributeValueException, MBeanException, ReflectionException
+ {
+ mbeanServer.setAttribute(name, attribute);
+ }
+
+ @Override
+ public AttributeList setAttributes(ObjectName name, AttributeList attributes)
+ throws InstanceNotFoundException, ReflectionException
+ {
+ return mbeanServer.setAttributes(name, attributes);
+ }
+
+ @Override
+ public Object invoke(ObjectName name, String operationName, Object[] params, String[] signature)
+ throws InstanceNotFoundException, MBeanException, ReflectionException
+ {
+ return mbeanServer.invoke(name, operationName, params, signature);
+ }
+
+ @Override
+ public String getDefaultDomain()
+ {
+ return mbeanServer.getDefaultDomain();
+ }
+
+ @Override
+ public String[] getDomains()
+ {
+ return mbeanServer.getDomains();
+ }
+
+ @Override
+ public void addNotificationListener(ObjectName name, NotificationListener listener, NotificationFilter filter, Object context)
+ throws InstanceNotFoundException
+ {
+ mbeanServer.addNotificationListener(name, listener, filter, context);
+ }
+
+ @Override
+ public void addNotificationListener(ObjectName name, ObjectName listener, NotificationFilter filter, Object context)
+ throws InstanceNotFoundException
+ {
+ mbeanServer.addNotificationListener(name, listener, filter, context);
+ }
+
+ @Override
+ public void removeNotificationListener(ObjectName name, ObjectName listener)
+ throws InstanceNotFoundException, ListenerNotFoundException
+ {
+ mbeanServer.removeNotificationListener(name, listener);
+ }
+
+ @Override
+ public void removeNotificationListener(ObjectName name, ObjectName listener, NotificationFilter filter, Object context)
+ throws InstanceNotFoundException, ListenerNotFoundException
+ {
+ mbeanServer.removeNotificationListener(name, listener, filter, context);
+ }
+
+ @Override
+ public void removeNotificationListener(ObjectName name, NotificationListener listener)
+ throws InstanceNotFoundException, ListenerNotFoundException
+ {
+ mbeanServer.removeNotificationListener(name, listener);
+ }
+
+ @Override
+ public void removeNotificationListener(ObjectName name, NotificationListener listener, NotificationFilter filter, Object context)
+ throws InstanceNotFoundException, ListenerNotFoundException
+ {
+ mbeanServer.removeNotificationListener(name, listener, filter, context);
+ }
+
+ @Override
+ public MBeanInfo getMBeanInfo(ObjectName name)
+ throws InstanceNotFoundException, IntrospectionException, ReflectionException
+ {
+ return mbeanServer.getMBeanInfo(name);
+ }
+
+ @Override
+ public boolean isInstanceOf(ObjectName name, String className)
+ throws InstanceNotFoundException
+ {
+ return mbeanServer.isInstanceOf(name, className);
+ }
+
+ @Override
+ public Object instantiate(String className)
+ throws ReflectionException, MBeanException
+ {
+ return mbeanServer.instantiate(className);
+ }
+
+ @Override
+ public Object instantiate(String className, ObjectName loaderName)
+ throws ReflectionException, MBeanException, InstanceNotFoundException
+ {
+ return mbeanServer.instantiate(className, loaderName);
+ }
+
+ @Override
+ public Object instantiate(String className, Object[] params, String[] signature)
+ throws ReflectionException, MBeanException
+ {
+ return mbeanServer.instantiate(className, params, signature);
+ }
+
+ @Override
+ public Object instantiate(String className, ObjectName loaderName, Object[] params, String[] signature)
+ throws ReflectionException, MBeanException, InstanceNotFoundException
+ {
+ return mbeanServer.instantiate(className, loaderName, params, signature);
+ }
+
+ @Override
+ @Deprecated
+ @SuppressWarnings("deprecation")
+ public ObjectInputStream deserialize(ObjectName name, byte[] data)
+ throws OperationsException
+ {
+ return mbeanServer.deserialize(name, data);
+ }
+
+ @Override
+ @Deprecated
+ @SuppressWarnings("deprecation")
+ public ObjectInputStream deserialize(String className, byte[] data)
+ throws OperationsException, ReflectionException
+ {
+ return mbeanServer.deserialize(className, data);
+ }
+
+ @Override
+ @Deprecated
+ @SuppressWarnings("deprecation")
+ public ObjectInputStream deserialize(String className, ObjectName loaderName, byte[] data)
+ throws OperationsException, ReflectionException
+ {
+ return mbeanServer.deserialize(className, loaderName, data);
+ }
+
+ @Override
+ public ClassLoader getClassLoaderFor(ObjectName mbeanName)
+ throws InstanceNotFoundException
+ {
+ return mbeanServer.getClassLoaderFor(mbeanName);
+ }
+
+ @Override
+ public ClassLoader getClassLoader(ObjectName loaderName)
+ throws InstanceNotFoundException
+ {
+ return mbeanServer.getClassLoader(loaderName);
+ }
+
+ @Override
+ public ClassLoaderRepository getClassLoaderRepository()
+ {
+ return mbeanServer.getClassLoaderRepository();
+ }
+
+ @Override
+ public ObjectInstance createMBean(String className, ObjectName name)
+ throws ReflectionException, InstanceAlreadyExistsException, MBeanException, NotCompliantMBeanException
+ {
+ return mbeanServer.createMBean(className, name);
+ }
+
+ @Override
+ public ObjectInstance createMBean(String className, ObjectName name, ObjectName loaderName)
+ throws ReflectionException, InstanceAlreadyExistsException, MBeanException, NotCompliantMBeanException, InstanceNotFoundException
+ {
+ return mbeanServer.createMBean(className, name, loaderName);
+ }
+
+ @Override
+ public ObjectInstance createMBean(String className, ObjectName name, Object[] params, String[] signature)
+ throws ReflectionException, InstanceAlreadyExistsException, MBeanException, NotCompliantMBeanException
+ {
+ return mbeanServer.createMBean(className, name, params, signature);
+ }
+
+ @Override
+ public ObjectInstance createMBean(String className, ObjectName name, ObjectName loaderName, Object[] params, String[] signature)
+ throws ReflectionException, InstanceAlreadyExistsException, MBeanException, NotCompliantMBeanException, InstanceNotFoundException
+ {
+ return mbeanServer.createMBean(className, name, loaderName, params, signature);
+ }
+}
diff --git a/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/query/PinotAggregationProjectConverter.java b/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/query/PinotAggregationProjectConverter.java
new file mode 100644
index 0000000000000..308b028ba3553
--- /dev/null
+++ b/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/query/PinotAggregationProjectConverter.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.pinot.query;
+
+import com.facebook.presto.pinot.PinotException;
+import com.facebook.presto.pinot.PinotSessionProperties;
+import com.facebook.presto.spi.ConnectorSession;
+import com.facebook.presto.spi.function.FunctionMetadata;
+import com.facebook.presto.spi.function.FunctionMetadataManager;
+import com.facebook.presto.spi.function.OperatorType;
+import com.facebook.presto.spi.function.StandardFunctionResolution;
+import com.facebook.presto.spi.relation.CallExpression;
+import com.facebook.presto.spi.relation.ConstantExpression;
+import com.facebook.presto.spi.relation.RowExpression;
+import com.facebook.presto.spi.relation.VariableReferenceExpression;
+import com.facebook.presto.spi.type.TypeManager;
+import com.google.common.collect.ImmutableMap;
+import io.airlift.slice.Slice;
+import org.joda.time.DateTimeZone;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static com.facebook.presto.pinot.PinotErrorCode.PINOT_UNSUPPORTED_EXPRESSION;
+import static com.facebook.presto.pinot.PinotPushdownUtils.getLiteralAsString;
+import static com.facebook.presto.pinot.query.PinotExpression.derived;
+import static java.lang.String.format;
+import static java.util.Locale.ENGLISH;
+import static java.util.Objects.requireNonNull;
+
+public class PinotAggregationProjectConverter
+ extends PinotProjectExpressionConverter
+{
+ // Pinot does not support modulus yet
+ private static final Map PRESTO_TO_PINOT_OPERATORS = ImmutableMap.of(
+ "-", "SUB",
+ "+", "ADD",
+ "*", "MULT",
+ "/", "DIV");
+ private static final String FROM_UNIXTIME = "from_unixtime";
+
+ private final FunctionMetadataManager functionMetadataManager;
+ private final ConnectorSession session;
+
+ public PinotAggregationProjectConverter(TypeManager typeManager, FunctionMetadataManager functionMetadataManager, StandardFunctionResolution standardFunctionResolution, ConnectorSession session)
+ {
+ super(typeManager, standardFunctionResolution);
+ this.functionMetadataManager = requireNonNull(functionMetadataManager, "functionMetadataManager is null");
+ this.session = requireNonNull(session, "session is null");
+ }
+
+ @Override
+ public PinotExpression visitCall(
+ CallExpression call,
+ Map context)
+ {
+ Optional basicCallHandlingResult = basicCallHandling(call, context);
+ if (basicCallHandlingResult.isPresent()) {
+ return basicCallHandlingResult.get();
+ }
+ FunctionMetadata functionMetadata = functionMetadataManager.getFunctionMetadata(call.getFunctionHandle());
+ Optional operatorTypeOptional = functionMetadata.getOperatorType();
+ if (operatorTypeOptional.isPresent()) {
+ OperatorType operatorType = operatorTypeOptional.get();
+ if (operatorType.isArithmeticOperator()) {
+ return handleArithmeticExpression(call, operatorType, context);
+ }
+ if (operatorType.isComparisonOperator()) {
+ throw new PinotException(PINOT_UNSUPPORTED_EXPRESSION, Optional.empty(), "Comparison operator not supported: " + call);
+ }
+ }
+ return handleFunction(call, context);
+ }
+
+ @Override
+ public PinotExpression visitConstant(
+ ConstantExpression literal,
+ Map context)
+ {
+ return new PinotExpression(getLiteralAsString(literal), PinotQueryGeneratorContext.Origin.LITERAL);
+ }
+
+ private PinotExpression handleDateTruncationViaDateTimeConvert(
+ CallExpression function,
+ Map context)
+ {
+ // Convert SQL standard function `DATE_TRUNC(INTERVAL, DATE/TIMESTAMP COLUMN)` to
+ // Pinot's equivalent function `dateTimeConvert(columnName, inputFormat, outputFormat, outputGranularity)`
+ // Pinot doesn't have a DATE/TIMESTAMP type. That means the input column (second argument) has been converted from numeric type to DATE/TIMESTAMP using one of the
+ // conversion functions in SQL. First step is find the function and find its input column units (seconds, secondsSinceEpoch etc.)
+ RowExpression timeInputParameter = function.getArguments().get(1);
+ String inputColumn;
+ String inputFormat;
+
+ CallExpression timeConversion = getExpressionAsFunction(timeInputParameter, timeInputParameter);
+ switch (timeConversion.getDisplayName().toLowerCase(ENGLISH)) {
+ case FROM_UNIXTIME:
+ inputColumn = timeConversion.getArguments().get(0).accept(this, context).getDefinition();
+ inputFormat = "'1:SECONDS:EPOCH'";
+ break;
+ default:
+ throw new PinotException(PINOT_UNSUPPORTED_EXPRESSION, Optional.empty(), "not supported: " + timeConversion.getDisplayName());
+ }
+
+ String outputFormat = "'1:MILLISECONDS:EPOCH'";
+ String outputGranularity;
+
+ RowExpression intervalParameter = function.getArguments().get(0);
+ if (!(intervalParameter instanceof ConstantExpression)) {
+ throw new PinotException(PINOT_UNSUPPORTED_EXPRESSION, Optional.empty(),
+ "interval unit in date_trunc is not supported: " + intervalParameter);
+ }
+
+ String value = getStringFromConstant(intervalParameter);
+ switch (value) {
+ case "second":
+ outputGranularity = "'1:SECONDS'";
+ break;
+ case "minute":
+ outputGranularity = "'1:MINUTES'";
+ break;
+ case "hour":
+ outputGranularity = "'1:HOURS'";
+ break;
+ case "day":
+ outputGranularity = "'1:DAYS'";
+ break;
+ case "week":
+ outputGranularity = "'1:WEEKS'";
+ break;
+ case "month":
+ outputGranularity = "'1:MONTHS'";
+ break;
+ case "quarter":
+ outputGranularity = "'1:QUARTERS'";
+ break;
+ case "year":
+ outputGranularity = "'1:YEARS'";
+ break;
+ default:
+ throw new PinotException(
+ PINOT_UNSUPPORTED_EXPRESSION,
+ Optional.empty(),
+ "interval in date_trunc is not supported: " + value);
+ }
+
+ return derived("dateTimeConvert(" + inputColumn + ", " + inputFormat + ", " + outputFormat + ", " + outputGranularity + ")");
+ }
+
+ private PinotExpression handleDateTruncationViaDateTruncation(
+ CallExpression function,
+ Map context)
+ {
+ RowExpression timeInputParameter = function.getArguments().get(1);
+ String inputColumn;
+ String inputTimeZone;
+ String inputFormat;
+
+ CallExpression timeConversion = getExpressionAsFunction(timeInputParameter, timeInputParameter);
+ switch (timeConversion.getDisplayName().toLowerCase(ENGLISH)) {
+ case FROM_UNIXTIME:
+ inputColumn = timeConversion.getArguments().get(0).accept(this, context).getDefinition();
+ inputTimeZone = timeConversion.getArguments().size() > 1 ? getStringFromConstant(timeConversion.getArguments().get(1)) : DateTimeZone.UTC.getID();
+ inputFormat = "seconds";
+ break;
+ default:
+ throw new PinotException(PINOT_UNSUPPORTED_EXPRESSION, Optional.empty(), "not supported: " + timeConversion.getDisplayName());
+ }
+
+ RowExpression intervalParameter = function.getArguments().get(0);
+ if (!(intervalParameter instanceof ConstantExpression)) {
+ throw new PinotException(
+ PINOT_UNSUPPORTED_EXPRESSION,
+ Optional.empty(),
+ "interval unit in date_trunc is not supported: " + intervalParameter);
+ }
+
+ return derived("dateTrunc(" + inputColumn + "," + inputFormat + ", " + inputTimeZone + ", " + getStringFromConstant(intervalParameter) + ")");
+ }
+
+ private PinotExpression handleArithmeticExpression(
+ CallExpression expression,
+ OperatorType operatorType,
+ Map context)
+ {
+ List arguments = expression.getArguments();
+ if (arguments.size() == 1) {
+ String prefix = operatorType == OperatorType.NEGATION ? "-" : "";
+ return derived(prefix + arguments.get(0).accept(this, context).getDefinition());
+ }
+ if (arguments.size() == 2) {
+ PinotExpression left = arguments.get(0).accept(this, context);
+ PinotExpression right = arguments.get(1).accept(this, context);
+ String prestoOperator = operatorType.getOperator();
+ String pinotOperator = PRESTO_TO_PINOT_OPERATORS.get(prestoOperator);
+ if (pinotOperator == null) {
+ throw new PinotException(PINOT_UNSUPPORTED_EXPRESSION, Optional.empty(), "Unsupported binary expression " + prestoOperator);
+ }
+ return derived(format("%s(%s, %s)", pinotOperator, left.getDefinition(), right.getDefinition()));
+ }
+ throw new PinotException(PINOT_UNSUPPORTED_EXPRESSION, Optional.empty(), format("Don't know how to interpret %s as an arithmetic expression", expression));
+ }
+
+ private PinotExpression handleFunction(
+ CallExpression function,
+ Map context)
+ {
+ switch (function.getDisplayName().toLowerCase(ENGLISH)) {
+ case "date_trunc":
+ boolean useDateTruncation = PinotSessionProperties.isUseDateTruncation(session);
+ return useDateTruncation ?
+ handleDateTruncationViaDateTruncation(function, context) :
+ handleDateTruncationViaDateTimeConvert(function, context);
+ default:
+ throw new PinotException(PINOT_UNSUPPORTED_EXPRESSION, Optional.empty(), format("function %s not supported yet", function.getDisplayName()));
+ }
+ }
+
+ private static String getStringFromConstant(RowExpression expression)
+ {
+ if (expression instanceof ConstantExpression) {
+ Object value = ((ConstantExpression) expression).getValue();
+ if (value instanceof String) {
+ return (String) value;
+ }
+ if (value instanceof Slice) {
+ return ((Slice) value).toStringUtf8();
+ }
+ }
+ throw new PinotException(PINOT_UNSUPPORTED_EXPRESSION, Optional.empty(), "Expected string literal but found " + expression);
+ }
+
+ private CallExpression getExpressionAsFunction(
+ RowExpression originalExpression,
+ RowExpression expression)
+ {
+ if (expression instanceof CallExpression) {
+ CallExpression call = (CallExpression) expression;
+ if (standardFunctionResolution.isCastFunction(call.getFunctionHandle())) {
+ if (isImplicitCast(call.getArguments().get(0).getType(), call.getType())) {
+ return getExpressionAsFunction(originalExpression, call.getArguments().get(0));
+ }
+ }
+ else {
+ return call;
+ }
+ }
+ throw new PinotException(PINOT_UNSUPPORTED_EXPRESSION, Optional.empty(), "Could not dig function out of expression: " + originalExpression + ", inside of " + expression);
+ }
+}
diff --git a/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/query/PinotExpression.java b/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/query/PinotExpression.java
new file mode 100644
index 0000000000000..13deee4f76eb3
--- /dev/null
+++ b/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/query/PinotExpression.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.pinot.query;
+
+import com.facebook.presto.pinot.query.PinotQueryGeneratorContext.Origin;
+
+import static java.util.Objects.requireNonNull;
+
+public class PinotExpression
+{
+ private final String definition;
+ private final Origin origin;
+
+ PinotExpression(String definition, Origin origin)
+ {
+ this.definition = requireNonNull(definition, "definition is null");
+ this.origin = requireNonNull(origin, "origin is null");
+ }
+
+ static PinotExpression derived(String definition)
+ {
+ return new PinotExpression(definition, Origin.DERIVED);
+ }
+
+ public String getDefinition()
+ {
+ return definition;
+ }
+
+ public Origin getOrigin()
+ {
+ return origin;
+ }
+}
diff --git a/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/query/PinotFilterExpressionConverter.java b/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/query/PinotFilterExpressionConverter.java
new file mode 100644
index 0000000000000..736521dc30c87
--- /dev/null
+++ b/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/query/PinotFilterExpressionConverter.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.pinot.query;
+
+import com.facebook.presto.pinot.PinotException;
+import com.facebook.presto.pinot.query.PinotQueryGeneratorContext.Origin;
+import com.facebook.presto.pinot.query.PinotQueryGeneratorContext.Selection;
+import com.facebook.presto.spi.function.FunctionHandle;
+import com.facebook.presto.spi.function.FunctionMetadata;
+import com.facebook.presto.spi.function.FunctionMetadataManager;
+import com.facebook.presto.spi.function.OperatorType;
+import com.facebook.presto.spi.function.StandardFunctionResolution;
+import com.facebook.presto.spi.relation.CallExpression;
+import com.facebook.presto.spi.relation.ConstantExpression;
+import com.facebook.presto.spi.relation.InputReferenceExpression;
+import com.facebook.presto.spi.relation.LambdaDefinitionExpression;
+import com.facebook.presto.spi.relation.RowExpression;
+import com.facebook.presto.spi.relation.RowExpressionVisitor;
+import com.facebook.presto.spi.relation.SpecialFormExpression;
+import com.facebook.presto.spi.relation.VariableReferenceExpression;
+import com.facebook.presto.spi.type.Type;
+import com.facebook.presto.spi.type.TypeManager;
+import com.google.common.collect.ImmutableSet;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static com.facebook.presto.pinot.PinotErrorCode.PINOT_UNSUPPORTED_EXPRESSION;
+import static com.facebook.presto.pinot.PinotPushdownUtils.getLiteralAsString;
+import static com.facebook.presto.pinot.query.PinotExpression.derived;
+import static java.lang.String.format;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Convert {@link RowExpression} in filter into Pinot complaint expression text
+ */
+public class PinotFilterExpressionConverter
+ implements RowExpressionVisitor>
+{
+ private static final Set LOGICAL_BINARY_OPS_FILTER = ImmutableSet.of("=", "<", "<=", ">", ">=", "<>");
+ private final TypeManager typeManager;
+ private final FunctionMetadataManager functionMetadataManager;
+ private final StandardFunctionResolution standardFunctionResolution;
+
+ public PinotFilterExpressionConverter(
+ TypeManager typeManager,
+ FunctionMetadataManager functionMetadataManager,
+ StandardFunctionResolution standardFunctionResolution)
+ {
+ this.typeManager = requireNonNull(typeManager, "type manager is null");
+ this.functionMetadataManager = requireNonNull(functionMetadataManager, "function metadata manager is null");
+ this.standardFunctionResolution = requireNonNull(standardFunctionResolution, "standardFunctionResolution is null");
+ }
+
+ private PinotExpression handleIn(
+ SpecialFormExpression specialForm,
+ boolean isWhitelist,
+ Function context)
+ {
+ return derived(format("(%s %s (%s))",
+ specialForm.getArguments().get(0).accept(this, context).getDefinition(),
+ isWhitelist ? "IN" : "NOT IN",
+ specialForm.getArguments().subList(1, specialForm.getArguments().size()).stream()
+ .map(argument -> argument.accept(this, context).getDefinition())
+ .collect(Collectors.joining(", "))));
+ }
+
+ private PinotExpression handleLogicalBinary(
+ String operator,
+ CallExpression call,
+ Function context)
+ {
+ if (!LOGICAL_BINARY_OPS_FILTER.contains(operator)) {
+ throw new PinotException(PINOT_UNSUPPORTED_EXPRESSION, Optional.empty(), format("'%s' is not supported in filter", operator));
+ }
+ List arguments = call.getArguments();
+ if (arguments.size() == 2) {
+ return derived(format(
+ "(%s %s %s)",
+ arguments.get(0).accept(this, context).getDefinition(),
+ operator,
+ arguments.get(1).accept(this, context).getDefinition()));
+ }
+ throw new PinotException(PINOT_UNSUPPORTED_EXPRESSION, Optional.empty(), format("Unknown logical binary: '%s'", call));
+ }
+
+ private PinotExpression handleBetween(
+ CallExpression between,
+ Function context)
+ {
+ if (between.getArguments().size() == 3) {
+ RowExpression value = between.getArguments().get(0);
+ RowExpression min = between.getArguments().get(1);
+ RowExpression max = between.getArguments().get(2);
+
+ return derived(format(
+ "(%s BETWEEN %s AND %s)",
+ value.accept(this, context).getDefinition(),
+ min.accept(this, context).getDefinition(),
+ max.accept(this, context).getDefinition()));
+ }
+
+ throw new PinotException(PINOT_UNSUPPORTED_EXPRESSION, Optional.empty(), format("Between operator not supported: %s", between));
+ }
+
+ private PinotExpression handleNot(CallExpression not, Function context)
+ {
+ if (not.getArguments().size() == 1) {
+ RowExpression input = not.getArguments().get(0);
+ if (input instanceof SpecialFormExpression) {
+ SpecialFormExpression specialFormExpression = (SpecialFormExpression) input;
+ // NOT operator is only supported on top of the IN expression
+ if (specialFormExpression.getForm() == SpecialFormExpression.Form.IN) {
+ return handleIn(specialFormExpression, false, context);
+ }
+ }
+ }
+
+ throw new PinotException(PINOT_UNSUPPORTED_EXPRESSION, Optional.empty(), format("NOT operator is supported only on top of IN operator. Received: %s", not));
+ }
+
+ private PinotExpression handleCast(CallExpression cast, Function context)
+ {
+ if (cast.getArguments().size() == 1) {
+ RowExpression input = cast.getArguments().get(0);
+ Type expectedType = cast.getType();
+ if (typeManager.canCoerce(input.getType(), expectedType)) {
+ return input.accept(this, context);
+ }
+ throw new PinotException(PINOT_UNSUPPORTED_EXPRESSION, Optional.empty(), "Non implicit casts not supported: " + cast);
+ }
+
+ throw new PinotException(PINOT_UNSUPPORTED_EXPRESSION, Optional.empty(), format("This type of CAST operator not supported. Received: %s", cast));
+ }
+
+ @Override
+ public PinotExpression visitCall(CallExpression call, Function context)
+ {
+ FunctionHandle functionHandle = call.getFunctionHandle();
+ if (standardFunctionResolution.isNotFunction(functionHandle)) {
+ return handleNot(call, context);
+ }
+ if (standardFunctionResolution.isCastFunction(functionHandle)) {
+ return handleCast(call, context);
+ }
+ if (standardFunctionResolution.isBetweenFunction(functionHandle)) {
+ return handleBetween(call, context);
+ }
+ FunctionMetadata functionMetadata = functionMetadataManager.getFunctionMetadata(call.getFunctionHandle());
+ Optional operatorTypeOptional = functionMetadata.getOperatorType();
+ if (operatorTypeOptional.isPresent()) {
+ OperatorType operatorType = operatorTypeOptional.get();
+ if (operatorType.isArithmeticOperator()) {
+ throw new PinotException(PINOT_UNSUPPORTED_EXPRESSION, Optional.empty(), "Arithmetic expressions are not supported in filter: " + call);
+ }
+ if (operatorType.isComparisonOperator()) {
+ return handleLogicalBinary(operatorType.getOperator(), call, context);
+ }
+ }
+ throw new PinotException(PINOT_UNSUPPORTED_EXPRESSION, Optional.empty(), format("function %s not supported in filter", call));
+ }
+
+ @Override
+ public PinotExpression visitInputReference(InputReferenceExpression reference, Function context)
+ {
+ throw new PinotException(PINOT_UNSUPPORTED_EXPRESSION, Optional.empty(), "Pinot does not support struct dereferencing " + reference);
+ }
+
+ @Override
+ public PinotExpression visitConstant(ConstantExpression literal, Function context)
+ {
+ return new PinotExpression(getLiteralAsString(literal), Origin.LITERAL);
+ }
+
+ @Override
+ public PinotExpression visitLambda(LambdaDefinitionExpression lambda, Function context)
+ {
+ throw new PinotException(PINOT_UNSUPPORTED_EXPRESSION, Optional.empty(), "Pinot does not support lambda " + lambda);
+ }
+
+ @Override
+ public PinotExpression visitVariableReference(VariableReferenceExpression reference, Function context)
+ {
+ Selection input = requireNonNull(context.apply(reference), format("Input column %s does not exist in the input: %s", reference, context));
+ return new PinotExpression(input.getDefinition(), input.getOrigin());
+ }
+
+ @Override
+ public PinotExpression visitSpecialForm(SpecialFormExpression specialForm, Function context)
+ {
+ switch (specialForm.getForm()) {
+ case IF:
+ case NULL_IF:
+ case SWITCH:
+ case WHEN:
+ case IS_NULL:
+ case COALESCE:
+ case DEREFERENCE:
+ case ROW_CONSTRUCTOR:
+ case BIND:
+ throw new PinotException(PINOT_UNSUPPORTED_EXPRESSION, Optional.empty(), "Pinot does not support the special form " + specialForm);
+ case IN:
+ return handleIn(specialForm, true, context);
+ case AND:
+ case OR:
+ return derived(format(
+ "(%s %s %s)",
+ specialForm.getArguments().get(0).accept(this, context).getDefinition(),
+ specialForm.getForm().toString(),
+ specialForm.getArguments().get(1).accept(this, context).getDefinition()));
+ default:
+ throw new PinotException(PINOT_UNSUPPORTED_EXPRESSION, Optional.empty(), "Unexpected special form: " + specialForm);
+ }
+ }
+}
diff --git a/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/query/PinotProjectExpressionConverter.java b/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/query/PinotProjectExpressionConverter.java
new file mode 100644
index 0000000000000..51e0ba8f20679
--- /dev/null
+++ b/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/query/PinotProjectExpressionConverter.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.pinot.query;
+
+import com.facebook.presto.pinot.PinotException;
+import com.facebook.presto.pinot.query.PinotQueryGeneratorContext.Selection;
+import com.facebook.presto.spi.function.FunctionHandle;
+import com.facebook.presto.spi.function.StandardFunctionResolution;
+import com.facebook.presto.spi.relation.CallExpression;
+import com.facebook.presto.spi.relation.ConstantExpression;
+import com.facebook.presto.spi.relation.InputReferenceExpression;
+import com.facebook.presto.spi.relation.LambdaDefinitionExpression;
+import com.facebook.presto.spi.relation.RowExpression;
+import com.facebook.presto.spi.relation.RowExpressionVisitor;
+import com.facebook.presto.spi.relation.SpecialFormExpression;
+import com.facebook.presto.spi.relation.VariableReferenceExpression;
+import com.facebook.presto.spi.type.StandardTypes;
+import com.facebook.presto.spi.type.Type;
+import com.facebook.presto.spi.type.TypeManager;
+import com.google.common.collect.ImmutableSet;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static com.facebook.presto.pinot.PinotErrorCode.PINOT_UNSUPPORTED_EXPRESSION;
+import static java.lang.String.format;
+import static java.util.Objects.requireNonNull;
+
+class PinotProjectExpressionConverter
+ implements RowExpressionVisitor>
+{
+ private static final Set TIME_EQUIVALENT_TYPES = ImmutableSet.of(StandardTypes.BIGINT, StandardTypes.INTEGER, StandardTypes.TINYINT, StandardTypes.SMALLINT);
+
+ protected final TypeManager typeManager;
+ protected final StandardFunctionResolution standardFunctionResolution;
+
+ public PinotProjectExpressionConverter(
+ TypeManager typeManager,
+ StandardFunctionResolution standardFunctionResolution)
+ {
+ this.typeManager = requireNonNull(typeManager, "type manager");
+ this.standardFunctionResolution = requireNonNull(standardFunctionResolution, "standardFunctionResolution is null");
+ }
+
+ @Override
+ public PinotExpression visitVariableReference(
+ VariableReferenceExpression reference,
+ Map context)
+ {
+ Selection input = requireNonNull(context.get(reference), format("Input column %s does not exist in the input", reference));
+ return new PinotExpression(input.getDefinition(), input.getOrigin());
+ }
+
+ @Override
+ public PinotExpression visitLambda(
+ LambdaDefinitionExpression lambda,
+ Map context)
+ {
+ throw new PinotException(PINOT_UNSUPPORTED_EXPRESSION, Optional.empty(), "Pinot does not support lambda " + lambda);
+ }
+
+ protected boolean isImplicitCast(Type inputType, Type resultType)
+ {
+ if (typeManager.canCoerce(inputType, resultType)) {
+ return true;
+ }
+ return resultType.getTypeSignature().getBase().equals(StandardTypes.TIMESTAMP) && TIME_EQUIVALENT_TYPES.contains(inputType.getTypeSignature().getBase());
+ }
+
+ private PinotExpression handleCast(
+ CallExpression cast,
+ Map context)
+ {
+ if (cast.getArguments().size() == 1) {
+ RowExpression input = cast.getArguments().get(0);
+ Type expectedType = cast.getType();
+ if (isImplicitCast(input.getType(), expectedType)) {
+ return input.accept(this, context);
+ }
+ throw new PinotException(PINOT_UNSUPPORTED_EXPRESSION, Optional.empty(), "Non implicit casts not supported: " + cast);
+ }
+
+ throw new PinotException(PINOT_UNSUPPORTED_EXPRESSION, Optional.empty(), format("This type of CAST operator not supported. Received: %s", cast));
+ }
+
+ protected Optional basicCallHandling(CallExpression call, Map context)
+ {
+ FunctionHandle functionHandle = call.getFunctionHandle();
+ if (standardFunctionResolution.isNotFunction(functionHandle) || standardFunctionResolution.isBetweenFunction(functionHandle)) {
+ throw new PinotException(PINOT_UNSUPPORTED_EXPRESSION, Optional.empty(), "Unsupported function in pinot aggregation: " + functionHandle);
+ }
+ if (standardFunctionResolution.isCastFunction(functionHandle)) {
+ return Optional.of(handleCast(call, context));
+ }
+ return Optional.empty();
+ }
+
+ @Override
+ public PinotExpression visitInputReference(
+ InputReferenceExpression reference,
+ Map context)
+ {
+ throw new PinotException(PINOT_UNSUPPORTED_EXPRESSION, Optional.empty(), "Input reference not supported: " + reference);
+ }
+
+ @Override
+ public PinotExpression visitSpecialForm(
+ SpecialFormExpression specialForm,
+ Map context)
+ {
+ throw new PinotException(PINOT_UNSUPPORTED_EXPRESSION, Optional.empty(), "Special form not supported: " + specialForm);
+ }
+
+ @Override
+ public PinotExpression visitCall(CallExpression call, Map context)
+ {
+ return basicCallHandling(call, context).orElseThrow(() -> new PinotException(PINOT_UNSUPPORTED_EXPRESSION, Optional.empty(), "Call not supported: " + call));
+ }
+
+ @Override
+ public PinotExpression visitConstant(ConstantExpression literal, Map context)
+ {
+ throw new PinotException(PINOT_UNSUPPORTED_EXPRESSION, Optional.empty(), "Constant not supported: " + literal);
+ }
+}
diff --git a/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/query/PinotQueryGenerator.java b/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/query/PinotQueryGenerator.java
new file mode 100644
index 0000000000000..c98e409c323e2
--- /dev/null
+++ b/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/query/PinotQueryGenerator.java
@@ -0,0 +1,467 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.pinot.query;
+
+import com.facebook.airlift.log.Logger;
+import com.facebook.presto.pinot.PinotColumnHandle;
+import com.facebook.presto.pinot.PinotConfig;
+import com.facebook.presto.pinot.PinotException;
+import com.facebook.presto.pinot.PinotPushdownUtils.AggregationColumnNode;
+import com.facebook.presto.pinot.PinotPushdownUtils.AggregationFunctionColumnNode;
+import com.facebook.presto.pinot.PinotPushdownUtils.GroupByColumnNode;
+import com.facebook.presto.pinot.PinotSessionProperties;
+import com.facebook.presto.pinot.PinotTableHandle;
+import com.facebook.presto.pinot.query.PinotQueryGeneratorContext.Selection;
+import com.facebook.presto.spi.ConnectorSession;
+import com.facebook.presto.spi.PrestoException;
+import com.facebook.presto.spi.function.FunctionMetadataManager;
+import com.facebook.presto.spi.function.StandardFunctionResolution;
+import com.facebook.presto.spi.plan.AggregationNode;
+import com.facebook.presto.spi.plan.FilterNode;
+import com.facebook.presto.spi.plan.LimitNode;
+import com.facebook.presto.spi.plan.PlanNode;
+import com.facebook.presto.spi.plan.PlanVisitor;
+import com.facebook.presto.spi.plan.ProjectNode;
+import com.facebook.presto.spi.plan.TableScanNode;
+import com.facebook.presto.spi.plan.TopNNode;
+import com.facebook.presto.spi.relation.CallExpression;
+import com.facebook.presto.spi.relation.ConstantExpression;
+import com.facebook.presto.spi.relation.RowExpression;
+import com.facebook.presto.spi.relation.VariableReferenceExpression;
+import com.facebook.presto.spi.type.BigintType;
+import com.facebook.presto.spi.type.TypeManager;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.ImmutableMap;
+
+import javax.inject.Inject;
+
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static com.facebook.presto.pinot.PinotErrorCode.PINOT_UNSUPPORTED_EXPRESSION;
+import static com.facebook.presto.pinot.PinotPushdownUtils.checkSupported;
+import static com.facebook.presto.pinot.PinotPushdownUtils.computeAggregationNodes;
+import static com.facebook.presto.pinot.PinotPushdownUtils.getLiteralAsString;
+import static com.facebook.presto.pinot.PinotPushdownUtils.getOrderingScheme;
+import static com.facebook.presto.pinot.query.PinotQueryGeneratorContext.Origin.DERIVED;
+import static com.facebook.presto.pinot.query.PinotQueryGeneratorContext.Origin.LITERAL;
+import static com.facebook.presto.pinot.query.PinotQueryGeneratorContext.Origin.TABLE_COLUMN;
+import static com.facebook.presto.spi.StandardErrorCode.INVALID_FUNCTION_ARGUMENT;
+import static com.google.common.base.MoreObjects.toStringHelper;
+import static java.lang.String.format;
+import static java.util.Locale.ENGLISH;
+import static java.util.Objects.requireNonNull;
+
+public class PinotQueryGenerator
+{
+ private static final Logger log = Logger.get(PinotQueryGenerator.class);
+ private static final Map UNARY_AGGREGATION_MAP = ImmutableMap.of(
+ "min", "min",
+ "max", "max",
+ "avg", "avg",
+ "sum", "sum",
+ "approx_distinct", "DISTINCTCOUNTHLL");
+
+ private final PinotConfig pinotConfig;
+ private final TypeManager typeManager;
+ private final FunctionMetadataManager functionMetadataManager;
+ private final StandardFunctionResolution standardFunctionResolution;
+ private final PinotFilterExpressionConverter pinotFilterExpressionConverter;
+ private final PinotProjectExpressionConverter pinotProjectExpressionConverter;
+
+ @Inject
+ public PinotQueryGenerator(
+ PinotConfig pinotConfig,
+ TypeManager typeManager,
+ FunctionMetadataManager functionMetadataManager,
+ StandardFunctionResolution standardFunctionResolution)
+ {
+ this.pinotConfig = requireNonNull(pinotConfig, "pinot config is null");
+ this.typeManager = requireNonNull(typeManager, "type manager is null");
+ this.functionMetadataManager = requireNonNull(functionMetadataManager, "function metadata manager is null");
+ this.standardFunctionResolution = requireNonNull(standardFunctionResolution, "standardFunctionResolution is null");
+ this.pinotFilterExpressionConverter = new PinotFilterExpressionConverter(this.typeManager, this.functionMetadataManager, standardFunctionResolution);
+ this.pinotProjectExpressionConverter = new PinotProjectExpressionConverter(typeManager, standardFunctionResolution);
+ }
+
+ public static class PinotQueryGeneratorResult
+ {
+ private final GeneratedPql generatedPql;
+ private final PinotQueryGeneratorContext context;
+
+ public PinotQueryGeneratorResult(
+ GeneratedPql generatedPql,
+ PinotQueryGeneratorContext context)
+ {
+ this.generatedPql = requireNonNull(generatedPql, "generatedPql is null");
+ this.context = requireNonNull(context, "context is null");
+ }
+
+ public GeneratedPql getGeneratedPql()
+ {
+ return generatedPql;
+ }
+
+ public PinotQueryGeneratorContext getContext()
+ {
+ return context;
+ }
+ }
+
+ public Optional generate(PlanNode plan, ConnectorSession session)
+ {
+ try {
+ boolean preferBrokerQueries = PinotSessionProperties.isPreferBrokerQueries(session);
+ PinotQueryGeneratorContext context = requireNonNull(plan.accept(new PinotQueryPlanVisitor(session, preferBrokerQueries), new PinotQueryGeneratorContext()), "Resulting context is null");
+ boolean isQueryShort = context.isQueryShort(PinotSessionProperties.getNonAggregateLimitForBrokerQueries(session));
+ return Optional.of(new PinotQueryGeneratorResult(context.toQuery(pinotConfig, preferBrokerQueries, isQueryShort), context));
+ }
+ catch (PinotException e) {
+ log.debug(e, "Possibly benign error when pushing plan into scan node %s", plan);
+ return Optional.empty();
+ }
+ }
+
+ public static class GeneratedPql
+ {
+ final String table;
+ final String pql;
+ final List expectedColumnIndices;
+ final int groupByClauses;
+ final boolean haveFilter;
+ final boolean isQueryShort;
+
+ @JsonCreator
+ public GeneratedPql(
+ @JsonProperty("table") String table,
+ @JsonProperty("pql") String pql,
+ @JsonProperty("expectedColumnIndices") List expectedColumnIndices,
+ @JsonProperty("groupByClauses") int groupByClauses,
+ @JsonProperty("haveFilter") boolean haveFilter,
+ @JsonProperty("isQueryShort") boolean isQueryShort)
+ {
+ this.table = table;
+ this.pql = pql;
+ this.expectedColumnIndices = expectedColumnIndices;
+ this.groupByClauses = groupByClauses;
+ this.haveFilter = haveFilter;
+ this.isQueryShort = isQueryShort;
+ }
+
+ @JsonProperty("pql")
+ public String getPql()
+ {
+ return pql;
+ }
+
+ @JsonProperty("expectedColumnIndices")
+ public List getExpectedColumnIndices()
+ {
+ return expectedColumnIndices;
+ }
+
+ @JsonProperty("groupByClauses")
+ public int getGroupByClauses()
+ {
+ return groupByClauses;
+ }
+
+ @JsonProperty("table")
+ public String getTable()
+ {
+ return table;
+ }
+
+ @JsonProperty("haveFilter")
+ public boolean isHaveFilter()
+ {
+ return haveFilter;
+ }
+
+ @JsonProperty("isQueryShort")
+ public boolean isQueryShort()
+ {
+ return isQueryShort;
+ }
+
+ @Override
+ public String toString()
+ {
+ return toStringHelper(this)
+ .add("pql", pql)
+ .add("table", table)
+ .add("expectedColumnIndices", expectedColumnIndices)
+ .add("groupByClauses", groupByClauses)
+ .add("haveFilter", haveFilter)
+ .add("isQueryShort", isQueryShort)
+ .toString();
+ }
+ }
+
+ class PinotQueryPlanVisitor
+ extends PlanVisitor
+ {
+ private final ConnectorSession session;
+ private final boolean preferBrokerQueries;
+
+ protected PinotQueryPlanVisitor(ConnectorSession session, boolean preferBrokerQueries)
+ {
+ this.session = session;
+ this.preferBrokerQueries = preferBrokerQueries;
+ }
+
+ @Override
+ public PinotQueryGeneratorContext visitPlan(PlanNode node, PinotQueryGeneratorContext context)
+ {
+ throw new PinotException(PINOT_UNSUPPORTED_EXPRESSION, Optional.empty(), "Don't know how to handle plan node of type " + node);
+ }
+
+ protected VariableReferenceExpression getVariableReference(RowExpression expression)
+ {
+ if (expression instanceof VariableReferenceExpression) {
+ return ((VariableReferenceExpression) expression);
+ }
+ throw new PinotException(PINOT_UNSUPPORTED_EXPRESSION, Optional.empty(), "Expected a variable reference but got " + expression);
+ }
+
+ @Override
+ public PinotQueryGeneratorContext visitFilter(FilterNode node, PinotQueryGeneratorContext context)
+ {
+ context = node.getSource().accept(this, context);
+ requireNonNull(context, "context is null");
+ LinkedHashMap selections = context.getSelections();
+ String filter = node.getPredicate().accept(pinotFilterExpressionConverter, selections::get).getDefinition();
+ return context.withFilter(filter).withOutputColumns(node.getOutputVariables());
+ }
+
+ @Override
+ public PinotQueryGeneratorContext visitProject(ProjectNode node, PinotQueryGeneratorContext contextIn)
+ {
+ PinotQueryGeneratorContext context = node.getSource().accept(this, contextIn);
+ requireNonNull(context, "context is null");
+ LinkedHashMap newSelections = new LinkedHashMap<>();
+
+ node.getOutputVariables().forEach(variable -> {
+ RowExpression expression = node.getAssignments().get(variable);
+ PinotExpression pinotExpression = expression.accept(
+ contextIn.getVariablesInAggregation().contains(variable) ?
+ new PinotAggregationProjectConverter(typeManager, functionMetadataManager, standardFunctionResolution, session) : pinotProjectExpressionConverter,
+ context.getSelections());
+ newSelections.put(
+ variable,
+ new Selection(pinotExpression.getDefinition(), pinotExpression.getOrigin()));
+ });
+ return context.withProject(newSelections);
+ }
+
+ @Override
+ public PinotQueryGeneratorContext visitTableScan(TableScanNode node, PinotQueryGeneratorContext contextIn)
+ {
+ PinotTableHandle tableHandle = (PinotTableHandle) node.getTable().getConnectorHandle();
+ checkSupported(!tableHandle.getPql().isPresent(), "Expect to see no existing pql");
+ checkSupported(!tableHandle.getIsQueryShort().isPresent(), "Expect to see no existing pql");
+ LinkedHashMap selections = new LinkedHashMap<>();
+ node.getOutputVariables().forEach(outputColumn -> {
+ PinotColumnHandle pinotColumn = (PinotColumnHandle) (node.getAssignments().get(outputColumn));
+ checkSupported(pinotColumn.getType().equals(PinotColumnHandle.PinotColumnType.REGULAR), "Unexpected pinot column handle that is not regular: %s", pinotColumn);
+ selections.put(outputColumn, new Selection(pinotColumn.getColumnName(), TABLE_COLUMN));
+ });
+ return new PinotQueryGeneratorContext(selections, tableHandle.getTableName());
+ }
+
+ private String handleAggregationFunction(CallExpression aggregation, Map inputSelections)
+ {
+ String prestoAggregation = aggregation.getDisplayName().toLowerCase(ENGLISH);
+ List parameters = aggregation.getArguments();
+ switch (prestoAggregation) {
+ case "count":
+ if (parameters.size() <= 1) {
+ return format("count(%s)", parameters.isEmpty() ? "*" : inputSelections.get(getVariableReference(parameters.get(0))));
+ }
+ break;
+ case "approx_percentile":
+ return handleApproxPercentile(aggregation, inputSelections);
+ default:
+ if (UNARY_AGGREGATION_MAP.containsKey(prestoAggregation) && aggregation.getArguments().size() == 1) {
+ return format("%s(%s)", UNARY_AGGREGATION_MAP.get(prestoAggregation), inputSelections.get(getVariableReference(parameters.get(0))));
+ }
+ }
+
+ throw new PinotException(PINOT_UNSUPPORTED_EXPRESSION, Optional.empty(), format("aggregation function '%s' not supported yet", aggregation));
+ }
+
+ private String handleApproxPercentile(CallExpression aggregation, Map inputSelections)
+ {
+ List inputs = aggregation.getArguments();
+ if (inputs.size() != 2) {
+ throw new PinotException(PINOT_UNSUPPORTED_EXPRESSION, Optional.empty(), "Cannot handle approx_percentile function " + aggregation);
+ }
+
+ String fractionString;
+ RowExpression fractionInput = inputs.get(1);
+
+ if (fractionInput instanceof ConstantExpression) {
+ fractionString = getLiteralAsString((ConstantExpression) fractionInput);
+ }
+ else if (fractionInput instanceof VariableReferenceExpression) {
+ Selection fraction = inputSelections.get(fractionInput);
+ if (fraction.getOrigin() != LITERAL) {
+ throw new PinotException(PINOT_UNSUPPORTED_EXPRESSION, Optional.empty(),
+ "Cannot handle approx_percentile percentage argument be a non literal " + aggregation);
+ }
+ fractionString = fraction.getDefinition();
+ }
+ else {
+ throw new PinotException(PINOT_UNSUPPORTED_EXPRESSION, Optional.empty(), "Expected the fraction to be a constant or a variable " + fractionInput);
+ }
+
+ int percentile = getValidPercentile(fractionString);
+ if (percentile < 0) {
+ throw new PinotException(PINOT_UNSUPPORTED_EXPRESSION, Optional.empty(),
+ format("Cannot handle approx_percentile parsed as %d from input %s (function %s)", percentile, fractionString, aggregation));
+ }
+ return format("PERCENTILEEST%d(%s)", percentile, inputSelections.get(getVariableReference(inputs.get(0))));
+ }
+
+ private int getValidPercentile(String fraction)
+ {
+ try {
+ double percent = Double.parseDouble(fraction);
+ if (percent < 0 || percent > 1) {
+ throw new PrestoException(INVALID_FUNCTION_ARGUMENT, "Percentile must be between 0 and 1");
+ }
+ percent = percent * 100.0;
+ if (percent == Math.floor(percent)) {
+ return (int) percent;
+ }
+ }
+ catch (NumberFormatException ne) {
+ // Skip
+ }
+ return -1;
+ }
+
+ @Override
+ public PinotQueryGeneratorContext visitAggregation(AggregationNode node, PinotQueryGeneratorContext contextIn)
+ {
+ List aggregationColumnNodes = computeAggregationNodes(node);
+
+ // Make two passes over the aggregatinColumnNodes: In the first pass identify all the variables that will be used
+ // Then pass that context to the source
+ // And finally, in the second pass actually generate the PQL
+
+ // 1st pass
+ Set variablesInAggregation = new HashSet<>();
+ for (AggregationColumnNode expression : aggregationColumnNodes) {
+ switch (expression.getExpressionType()) {
+ case GROUP_BY: {
+ GroupByColumnNode groupByColumn = (GroupByColumnNode) expression;
+ VariableReferenceExpression groupByInputColumn = getVariableReference(groupByColumn.getInputColumn());
+ variablesInAggregation.add(groupByInputColumn);
+ break;
+ }
+ case AGGREGATE: {
+ AggregationFunctionColumnNode aggregationNode = (AggregationFunctionColumnNode) expression;
+ variablesInAggregation.addAll(
+ aggregationNode
+ .getCallExpression()
+ .getArguments()
+ .stream()
+ .filter(argument -> argument instanceof VariableReferenceExpression)
+ .map(argument -> (VariableReferenceExpression) argument)
+ .collect(Collectors.toList()));
+ break;
+ }
+ default:
+ throw new PinotException(PINOT_UNSUPPORTED_EXPRESSION, Optional.empty(), "unknown aggregation expression: " + expression.getExpressionType());
+ }
+ }
+
+ // now visit the child project node
+ PinotQueryGeneratorContext context = node.getSource().accept(this, contextIn.withVariablesInAggregation(variablesInAggregation));
+ requireNonNull(context, "context is null");
+ checkSupported(!node.getStep().isOutputPartial(), "partial aggregations are not supported in Pinot pushdown framework");
+ checkSupported(preferBrokerQueries, "Cannot push aggregation in segment mode");
+
+ // 2nd pass
+ LinkedHashMap newSelections = new LinkedHashMap<>();
+ LinkedHashSet groupByColumns = new LinkedHashSet<>();
+ Set hiddenColumnSet = new HashSet<>(context.getHiddenColumnSet());
+ int aggregations = 0;
+ boolean groupByExists = false;
+
+ for (AggregationColumnNode expression : aggregationColumnNodes) {
+ switch (expression.getExpressionType()) {
+ case GROUP_BY: {
+ GroupByColumnNode groupByColumn = (GroupByColumnNode) expression;
+ VariableReferenceExpression groupByInputColumn = getVariableReference(groupByColumn.getInputColumn());
+ VariableReferenceExpression outputColumn = getVariableReference(groupByColumn.getOutputColumn());
+ Selection pinotColumn = requireNonNull(context.getSelections().get(groupByInputColumn), "Group By column " + groupByInputColumn + " doesn't exist in input " + context.getSelections());
+
+ newSelections.put(outputColumn, new Selection(pinotColumn.getDefinition(), pinotColumn.getOrigin()));
+ groupByColumns.add(outputColumn);
+ groupByExists = true;
+ break;
+ }
+ case AGGREGATE: {
+ AggregationFunctionColumnNode aggregationNode = (AggregationFunctionColumnNode) expression;
+ String pinotAggFunction = handleAggregationFunction(aggregationNode.getCallExpression(), context.getSelections());
+ newSelections.put(getVariableReference(aggregationNode.getOutputColumn()), new Selection(pinotAggFunction, DERIVED));
+ aggregations++;
+ break;
+ }
+ default:
+ throw new PinotException(PINOT_UNSUPPORTED_EXPRESSION, Optional.empty(), "unknown aggregation expression: " + expression.getExpressionType());
+ }
+ }
+
+ // Handling non-aggregated group by
+ if (groupByExists && aggregations == 0) {
+ VariableReferenceExpression hidden = new VariableReferenceExpression(UUID.randomUUID().toString(), BigintType.BIGINT);
+ newSelections.put(hidden, new Selection("count(*)", DERIVED));
+ hiddenColumnSet.add(hidden);
+ aggregations++;
+ }
+ return context.withAggregation(newSelections, groupByColumns, aggregations, hiddenColumnSet);
+ }
+
+ @Override
+ public PinotQueryGeneratorContext visitLimit(LimitNode node, PinotQueryGeneratorContext context)
+ {
+ checkSupported(!node.isPartial(), String.format("pinot query generator cannot handle partial limit"));
+ checkSupported(preferBrokerQueries, "Cannot push limit in segment mode");
+ context = node.getSource().accept(this, context);
+ requireNonNull(context, "context is null");
+ return context.withLimit(node.getCount()).withOutputColumns(node.getOutputVariables());
+ }
+
+ @Override
+ public PinotQueryGeneratorContext visitTopN(TopNNode node, PinotQueryGeneratorContext context)
+ {
+ context = node.getSource().accept(this, context);
+ requireNonNull(context, "context is null");
+ checkSupported(preferBrokerQueries, "Cannot push topn in segment mode");
+ checkSupported(node.getStep().equals(TopNNode.Step.SINGLE), "Can only push single logical topn in");
+ return context.withTopN(getOrderingScheme(node), node.getCount()).withOutputColumns(node.getOutputVariables());
+ }
+ }
+}
diff --git a/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/query/PinotQueryGeneratorContext.java b/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/query/PinotQueryGeneratorContext.java
new file mode 100644
index 0000000000000..e7388f3a3aea8
--- /dev/null
+++ b/presto-pinot-toolkit/src/main/java/com/facebook/presto/pinot/query/PinotQueryGeneratorContext.java
@@ -0,0 +1,468 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.pinot.query;
+
+import com.facebook.presto.pinot.PinotColumnHandle;
+import com.facebook.presto.pinot.PinotConfig;
+import com.facebook.presto.pinot.PinotException;
+import com.facebook.presto.spi.block.SortOrder;
+import com.facebook.presto.spi.relation.VariableReferenceExpression;
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static com.facebook.presto.pinot.PinotErrorCode.PINOT_QUERY_GENERATOR_FAILURE;
+import static com.facebook.presto.pinot.PinotErrorCode.PINOT_UNSUPPORTED_EXPRESSION;
+import static com.facebook.presto.pinot.PinotPushdownUtils.checkSupported;
+import static com.google.common.base.MoreObjects.toStringHelper;
+import static java.lang.StrictMath.toIntExact;
+import static java.lang.String.format;
+import static java.util.Locale.ENGLISH;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Encapsulates the components needed to construct a PQL query and provides methods to update the current context with new operations.
+ */
+public class PinotQueryGeneratorContext
+{
+ public static final String TIME_BOUNDARY_FILTER_TEMPLATE = "__TIME_BOUNDARY_FILTER_TEMPLATE__";
+ public static final String TABLE_NAME_SUFFIX_TEMPLATE = "__TABLE_NAME_SUFFIX_TEMPLATE__";
+ // Fields defining the query
+ // order map that maps the column definition in terms of input relation column(s)
+ private final LinkedHashMap selections;
+ private final LinkedHashSet groupByColumns;
+ private final LinkedHashMap topNColumnOrderingMap;
+ private final Set hiddenColumnSet;
+ private final Set variablesInAggregation;
+ private final Optional from;
+ private final Optional filter;
+ private final OptionalInt limit;
+ private final int aggregations;
+
+ public boolean isQueryShort(int nonAggregateRowLimit)
+ {
+ return hasAggregation() || limit.orElse(Integer.MAX_VALUE) < nonAggregateRowLimit;
+ }
+
+ @Override
+ public String toString()
+ {
+ return toStringHelper(this)
+ .add("selections", selections)
+ .add("groupByColumns", groupByColumns)
+ .add("hiddenColumnSet", hiddenColumnSet)
+ .add("from", from)
+ .add("filter", filter)
+ .add("limit", limit)
+ .add("aggregations", aggregations)
+ .toString();
+ }
+
+ PinotQueryGeneratorContext()
+ {
+ this(new LinkedHashMap<>(), null);
+ }
+
+ PinotQueryGeneratorContext(
+ LinkedHashMap selections,
+ String from)
+ {
+ this(
+ selections,
+ Optional.ofNullable(from),
+ Optional.empty(),
+ 0,
+ new LinkedHashSet<>(),
+ new LinkedHashMap<>(),
+ OptionalInt.empty(),
+ new HashSet<>(),
+ new HashSet<>());
+ }
+
+ private PinotQueryGeneratorContext(
+ LinkedHashMap selections,
+ Optional from,
+ Optional filter,
+ int aggregations,
+ LinkedHashSet groupByColumns,
+ LinkedHashMap topNColumnOrderingMap,
+ OptionalInt limit,
+ Set variablesInAggregation,
+ Set hiddenColumnSet)
+ {
+ this.selections = new LinkedHashMap<>(requireNonNull(selections, "selections can't be null"));
+ this.from = requireNonNull(from, "source can't be null");
+ this.aggregations = aggregations;
+ this.groupByColumns = new LinkedHashSet<>(requireNonNull(groupByColumns, "groupByColumns can't be null. It could be empty if not available"));
+ this.topNColumnOrderingMap = new LinkedHashMap<>(requireNonNull(topNColumnOrderingMap, "topNColumnOrderingMap can't be null. It could be empty if not available"));
+ this.filter = requireNonNull(filter, "filter is null");
+ this.limit = requireNonNull(limit, "limit is null");
+ this.hiddenColumnSet = requireNonNull(hiddenColumnSet, "hidden column set is null");
+ this.variablesInAggregation = requireNonNull(variablesInAggregation, "variables in aggregation is null");
+ }
+
+ /**
+ * Apply the given filter to current context and return the updated context. Throws error for invalid operations.
+ */
+ public PinotQueryGeneratorContext withFilter(String filter)
+ {
+ checkSupported(!hasFilter(), "There already exists a filter. Pinot doesn't support filters at multiple levels");
+ checkSupported(!hasAggregation(), "Pinot doesn't support filtering the results of aggregation");
+ checkSupported(!hasLimit(), "Pinot doesn't support filtering on top of the limit");
+ return new PinotQueryGeneratorContext(
+ selections,
+ from,
+ Optional.of(filter),
+ aggregations,
+ groupByColumns,
+ topNColumnOrderingMap,
+ limit,
+ variablesInAggregation,
+ hiddenColumnSet);
+ }
+
+ /**
+ * Apply the aggregation to current context and return the updated context. Throws error for invalid operations.
+ */
+ public PinotQueryGeneratorContext withAggregation(
+ LinkedHashMap newSelections,
+ LinkedHashSet groupByColumns,
+ int aggregations,
+ Set hiddenColumnSet)
+ {
+ // there is only one aggregation supported.
+ checkSupported(!hasAggregation(), "Pinot doesn't support aggregation on top of the aggregated data");
+ checkSupported(!hasLimit(), "Pinot doesn't support aggregation on top of the limit");
+ checkSupported(aggregations > 0, "Invalid number of aggregations");
+ return new PinotQueryGeneratorContext(newSelections, from, filter, aggregations, groupByColumns, topNColumnOrderingMap, limit, variablesInAggregation, hiddenColumnSet);
+ }
+
+ /**
+ * Apply new selections/project to current context and return the updated context. Throws error for invalid operations.
+ */
+ public PinotQueryGeneratorContext withProject(LinkedHashMap newSelections)
+ {
+ checkSupported(groupByColumns.isEmpty(), "Pinot doesn't yet support new selections on top of the grouped by data");
+ return new PinotQueryGeneratorContext(
+ newSelections,
+ from,
+ filter,
+ aggregations,
+ groupByColumns,
+ topNColumnOrderingMap,
+ limit,
+ variablesInAggregation,
+ hiddenColumnSet);
+ }
+
+ private static int checkForValidLimit(long limit)
+ {
+ if (limit <= 0 || limit > Integer.MAX_VALUE) {
+ throw new PinotException(PINOT_QUERY_GENERATOR_FAILURE, Optional.empty(), "Limit " + limit + " not supported: Limit is not being pushed down");
+ }
+ return toIntExact(limit);
+ }
+
+ /**
+ * Apply limit to current context and return the updated context. Throws error for invalid operations.
+ */
+ public PinotQueryGeneratorContext withLimit(long limit)
+ {
+ int intLimit = checkForValidLimit(limit);
+ checkSupported(!hasLimit(), "Limit already exists. Pinot doesn't support limit on top of another limit");
+ return new PinotQueryGeneratorContext(
+ selections,
+ from,
+ filter,
+ aggregations,
+ groupByColumns,
+ topNColumnOrderingMap,
+ OptionalInt.of(intLimit),
+ variablesInAggregation,
+ hiddenColumnSet);
+ }
+
+ /**
+ * Apply order by to current context and return the updated context. Throws error for invalid operations.
+ */
+ public PinotQueryGeneratorContext withTopN(LinkedHashMap orderByColumnOrderingMap, long limit)
+ {
+ checkSupported(!hasLimit(), "Limit already exists. Pinot doesn't support order by limit on top of another limit");
+ checkSupported(!hasAggregation(), "Pinot doesn't support ordering on top of the aggregated data");
+ int intLimit = checkForValidLimit(limit);
+ return new PinotQueryGeneratorContext(
+ selections,
+ from,
+ filter,
+ aggregations,
+ groupByColumns,
+ orderByColumnOrderingMap,
+ OptionalInt.of(intLimit),
+ variablesInAggregation,
+ hiddenColumnSet);
+ }
+
+ private boolean hasFilter()
+ {
+ return filter.isPresent();
+ }
+
+ private boolean hasLimit()
+ {
+ return limit.isPresent();
+ }
+
+ private boolean hasAggregation()
+ {
+ return aggregations > 0;
+ }
+
+ private boolean hasOrderBy()
+ {
+ return !topNColumnOrderingMap.isEmpty();
+ }
+
+ public LinkedHashMap getSelections()
+ {
+ return selections;
+ }
+
+ public Set getHiddenColumnSet()
+ {
+ return hiddenColumnSet;
+ }
+
+ Set getVariablesInAggregation()
+ {
+ return variablesInAggregation;
+ }
+
+ /**
+ * Convert the current context to a PQL
+ */
+ public PinotQueryGenerator.GeneratedPql toQuery(PinotConfig pinotConfig, boolean preferBrokerQueries, boolean isQueryShort)
+ {
+ boolean forBroker = preferBrokerQueries && isQueryShort;
+ if (!pinotConfig.isAllowMultipleAggregations() && aggregations > 1 && !groupByColumns.isEmpty()) {
+ throw new PinotException(PINOT_QUERY_GENERATOR_FAILURE, Optional.empty(), "Multiple aggregates in the presence of group by is forbidden");
+ }
+
+ if (hasLimit() && aggregations > 1 && !groupByColumns.isEmpty()) {
+ throw new PinotException(PINOT_QUERY_GENERATOR_FAILURE, Optional.empty(), "Multiple aggregates in the presence of group by and limit is forbidden");
+ }
+
+ String expressions = selections.entrySet().stream()
+ .filter(s -> !groupByColumns.contains(s.getKey())) // remove the group by columns from the query as Pinot barfs if the group by column is an expression
+ .map(s -> s.getValue().getDefinition())
+ .collect(Collectors.joining(", "));
+
+ String tableName = from.orElseThrow(() -> new PinotException(PINOT_QUERY_GENERATOR_FAILURE, Optional.empty(), "Table name not encountered yet"));
+ String query = "SELECT " + expressions + " FROM " + tableName + (forBroker ? "" : TABLE_NAME_SUFFIX_TEMPLATE);
+ if (filter.isPresent()) {
+ String filterString = filter.get();
+ // this is hack!!!. Ideally we want to clone the scan pipeline and create/update the filter in the scan pipeline to contain this filter and
+ // at the same time add the time column to scan so that the query generator doesn't fail when it looks up the time column in scan output columns
+ query += format(" WHERE %s%s", filterString, forBroker ? "" : TIME_BOUNDARY_FILTER_TEMPLATE);
+ }
+ else if (!forBroker) {
+ query += TIME_BOUNDARY_FILTER_TEMPLATE;
+ }
+
+ if (!groupByColumns.isEmpty()) {
+ String groupByExpr = groupByColumns.stream().map(x -> selections.get(x).getDefinition()).collect(Collectors.joining(", "));
+ query = query + " GROUP BY " + groupByExpr;
+ }
+
+ if (hasOrderBy()) {
+ String orderByExpressions = topNColumnOrderingMap.entrySet().stream().map(entry -> selections.get(entry.getKey()).getDefinition() + (entry.getValue().isAscending() ? "" : " DESC")).collect(Collectors.joining(", "));
+ query = query + " ORDER BY " + orderByExpressions;
+ }
+ // Rules for limit:
+ // - If its a selection query:
+ // + given limit or configured limit
+ // - Else if has group by:
+ // + ensure that only one aggregation
+ // + default limit or configured top limit
+ // - Fail if limit is invalid
+
+ String limitKeyWord = "";
+ int queryLimit = -1;
+
+ if (!hasAggregation()) {
+ if (!limit.isPresent() && forBroker) {
+ throw new PinotException(PINOT_QUERY_GENERATOR_FAILURE, Optional.empty(), "Broker non aggregate queries have to have a limit");
+ }
+ else {
+ queryLimit = limit.orElseGet(pinotConfig::getLimitLargeForSegment);
+ }
+ limitKeyWord = "LIMIT";
+ }
+ else if (!groupByColumns.isEmpty()) {
+ limitKeyWord = "TOP";
+ if (limit.isPresent()) {
+ if (aggregations > 1) {
+ throw new PinotException(PINOT_QUERY_GENERATOR_FAILURE, Optional.of(query),
+ "Pinot has weird semantics with group by and multiple aggregation functions and limits");
+ }
+ else {
+ queryLimit = limit.getAsInt();
+ }
+ }
+ else {
+ queryLimit = pinotConfig.getTopNLarge();
+ }
+ }
+
+ if (!limitKeyWord.isEmpty()) {
+ query += " " + limitKeyWord + " " + queryLimit;
+ }
+
+ List columnHandles = ImmutableList.copyOf(getAssignments().values());
+ return new PinotQueryGenerator.GeneratedPql(tableName, query, getIndicesMappingFromPinotSchemaToPrestoSchema(query, columnHandles), groupByColumns.size(), filter.isPresent(), isQueryShort);
+ }
+
+ private List