types = ImmutableList.builder();
+ columnHandles.stream()
+ .map(PinotPageSource::getTypeForBlock)
+ .forEach(types::add);
+ columnTypes = types.build();
+ readTimeNanos += System.nanoTime() - startTimeNanos;
+ isPinotDataFetched = true;
+ }
+
+    @Override
+    public void close()
+    {
+        // Closing is idempotent: only the first call flips the flag, later calls do nothing.
+        if (!closed) {
+            closed = true;
+        }
+    }
+
+    /**
+     * Generates the {@link com.facebook.presto.spi.block.Block} for the specific column from the {@link #currentDataTable}.
+     *
+     * Dispatches on the Java type backing the Presto column type and delegates to the matching
+     * write*Block helper, so Pinot values are written as Presto-supported values, e.g.
+     * FLOAT -> Double, INT -> Long, String -> Slice.
+     *
+     * @param blockBuilder blockBuilder for the current column
+     * @param columnType type of the column
+     * @param columnIdx column index
+     * @throws PrestoException with PINOT_UNSUPPORTED_COLUMN_TYPE when the Java type is not boolean, long, double or Slice
+     */
+    private void writeBlock(BlockBuilder blockBuilder, Type columnType, int columnIdx)
+    {
+        Class<?> javaType = columnType.getJavaType();
+        if (javaType.equals(boolean.class)) {
+            writeBooleanBlock(blockBuilder, columnType, columnIdx);
+        }
+        else if (javaType.equals(long.class)) {
+            writeLongBlock(blockBuilder, columnType, columnIdx);
+        }
+        else if (javaType.equals(double.class)) {
+            writeDoubleBlock(blockBuilder, columnType, columnIdx);
+        }
+        else if (javaType.equals(Slice.class)) {
+            writeSliceBlock(blockBuilder, columnType, columnIdx);
+        }
+        else {
+            // The Pinot-side type is only needed for the diagnostic message, so it is looked up on the failure path only.
+            DataSchema.ColumnDataType pinotColumnType = currentDataTable.getDataTable().getDataSchema().getColumnDataType(columnIdx);
+            throw new PrestoException(
+                    PINOT_UNSUPPORTED_COLUMN_TYPE,
+                    String.format(
+                            "Failed to write column %s. pinotColumnType %s, javaType %s",
+                            columnHandles.get(columnIdx).getColumnName(), pinotColumnType, javaType));
+        }
+    }
+
+    /**
+     * Appends every row of the given column to the block builder as a boolean,
+     * adding one to {@code completedBytes} per value written.
+     */
+    private void writeBooleanBlock(BlockBuilder blockBuilder, Type columnType, int columnIdx)
+    {
+        int rowCount = currentDataTable.getDataTable().getNumberOfRows();
+        for (int row = 0; row < rowCount; row++) {
+            columnType.writeBoolean(blockBuilder, getBoolean(row, columnIdx));
+            completedBytes++;
+        }
+    }
+
+    /**
+     * Appends every row of the given column to the block builder as a long,
+     * adding {@code Long.BYTES} to {@code completedBytes} per value written.
+     */
+    private void writeLongBlock(BlockBuilder blockBuilder, Type columnType, int columnIdx)
+    {
+        int rowCount = currentDataTable.getDataTable().getNumberOfRows();
+        for (int row = 0; row < rowCount; row++) {
+            columnType.writeLong(blockBuilder, getLong(row, columnIdx));
+            completedBytes += Long.BYTES;
+        }
+    }
+
+    /**
+     * Appends every row of the given column to the block builder as a double,
+     * adding {@code Double.BYTES} to {@code completedBytes} per value written.
+     */
+    private void writeDoubleBlock(BlockBuilder blockBuilder, Type columnType, int columnIdx)
+    {
+        int rowCount = currentDataTable.getDataTable().getNumberOfRows();
+        for (int row = 0; row < rowCount; row++) {
+            columnType.writeDouble(blockBuilder, getDouble(row, columnIdx));
+            completedBytes += Double.BYTES;
+        }
+    }
+
+    /**
+     * Appends every row of the given column to the block builder as a slice,
+     * adding each slice's length in bytes to {@code completedBytes}.
+     *
+     * @param blockBuilder blockBuilder for the current column
+     * @param columnType type of the column
+     * @param columnIdx column index
+     */
+    private void writeSliceBlock(BlockBuilder blockBuilder, Type columnType, int columnIdx)
+    {
+        int rowCount = currentDataTable.getDataTable().getNumberOfRows();
+        for (int row = 0; row < rowCount; row++) {
+            Slice slice = getSlice(row, columnIdx);
+            int sliceLength = slice.length();
+            columnType.writeSlice(blockBuilder, slice, 0, sliceLength);
+            // The slice length is already its byte count; no need to copy the bytes out just to measure them.
+            completedBytes += sliceLength;
+        }
+    }
+
+    /**
+     * Returns the column type recorded in the column handle at the given index.
+     * Fails with IllegalArgumentException ("Invalid field index") when the index is not below the number of handles.
+     */
+    private static Type getType(int columnIndex)
+    {
+        int handleCount = columnHandles.size();
+        checkArgument(columnIndex < handleCount, "Invalid field index");
+        return columnHandles.get(columnIndex).getColumnType();
+    }
+
+    /**
+     * Reads the cell at the given position as a boolean.
+     *
+     * The cell is fetched as a string and parsed, so the result is {@code true} only when the
+     * text equals "true" ignoring case.
+     *
+     * @param rowIndex row of the cell in the current data table
+     * @param columnIndex column of the cell in the current data table
+     * @return the parsed boolean value
+     */
+    private boolean getBoolean(int rowIndex, int columnIndex)
+    {
+        // Boolean.parseBoolean parses the text itself; Boolean.getBoolean would instead look up a
+        // JVM system property whose name is the cell text.
+        return Boolean.parseBoolean(currentDataTable.getDataTable().getString(rowIndex, columnIndex));
+    }
+
+    /**
+     * Reads the cell at the given position as a long, converting from the type Pinot actually returned.
+     */
+    private long getLong(int rowIndex, int columnIndex)
+    {
+        DataTable table = currentDataTable.getDataTable();
+        // Note columnType in the dataTable could be different from the original columnType in the columnHandle.
+        // e.g. when original column type is int/long and aggregation value is requested, the returned dataType from Pinot would be double.
+        // So need to cast it back to the original columnType.
+        switch (table.getDataSchema().getColumnDataType(columnIndex)) {
+            case DOUBLE:
+                return (long) table.getDouble(rowIndex, columnIndex);
+            case INT:
+                return table.getInt(rowIndex, columnIndex);
+            default:
+                return table.getLong(rowIndex, columnIndex);
+        }
+    }
+
+    /**
+     * Reads the cell at the given position as a double.
+     *
+     * A column that Pinot returned as FLOAT is read with {@code getFloat} and widened; every other
+     * column is read with {@code getDouble}.
+     *
+     * @param rowIndex row of the cell in the current data table
+     * @param columnIndex column of the cell in the current data table
+     * @return the cell value as a double
+     */
+    private double getDouble(int rowIndex, int columnIndex)
+    {
+        DataSchema.ColumnDataType dataType = currentDataTable.getDataTable().getDataSchema().getColumnDataType(columnIndex);
+        // Compare against the same enum the schema returns; a constant from a different enum type can never be equal.
+        if (dataType.equals(DataSchema.ColumnDataType.FLOAT)) {
+            return currentDataTable.getDataTable().getFloat(rowIndex, columnIndex);
+        }
+        return currentDataTable.getDataTable().getDouble(rowIndex, columnIndex);
+    }
+
+    /**
+     * Reads the cell at the given position as a UTF-8 slice.
+     *
+     * The Presto column must be VARCHAR. Pinot array columns are rendered with {@code Arrays.toString};
+     * a null or empty STRING cell, and any other Pinot type, yields the empty slice.
+     */
+    private Slice getSlice(int rowIndex, int columnIndex)
+    {
+        checkColumnType(columnIndex, VARCHAR);
+        DataTable table = currentDataTable.getDataTable();
+        DataSchema.ColumnDataType pinotType = table.getDataSchema().getColumnDataType(columnIndex);
+        switch (pinotType) {
+            case INT_ARRAY:
+                return utf8Slice(Arrays.toString(table.getIntArray(rowIndex, columnIndex)));
+            case LONG_ARRAY:
+                return utf8Slice(Arrays.toString(table.getLongArray(rowIndex, columnIndex)));
+            case FLOAT_ARRAY:
+                return utf8Slice(Arrays.toString(table.getFloatArray(rowIndex, columnIndex)));
+            case DOUBLE_ARRAY:
+                return utf8Slice(Arrays.toString(table.getDoubleArray(rowIndex, columnIndex)));
+            case STRING_ARRAY:
+                return utf8Slice(Arrays.toString(table.getStringArray(rowIndex, columnIndex)));
+            case STRING:
+                String text = table.getString(rowIndex, columnIndex);
+                boolean blank = text == null || text.isEmpty();
+                return blank ? Slices.EMPTY_SLICE : Slices.utf8Slice(text);
+            default:
+                return Slices.EMPTY_SLICE;
+        }
+    }
+
+    /**
+     * Estimates the size in bytes of one value of a Pinot column.
+     * Numeric types use their fixed width; everything else uses the configured estimate to save calculation.
+     *
+     * @param dataType data type of the Pinot column as reported by the data schema.
+     * @return estimated size in bytes.
+     */
+    private int getEstimatedColumnSizeInBytes(DataSchema.ColumnDataType dataType)
+    {
+        if (!dataType.isNumber()) {
+            return pinotConfig.getEstimatedSizeInBytesForNonNumericColumn();
+        }
+        switch (dataType) {
+            case LONG:
+                return Long.BYTES;
+            case FLOAT:
+                return Float.BYTES;
+            case DOUBLE:
+                return Double.BYTES;
+            default:
+                // INT and any other numeric type
+                return Integer.BYTES;
+        }
+    }
+
+    /**
+     * Verifies that the column at the given index has the expected Presto type,
+     * failing with IllegalArgumentException otherwise.
+     */
+    private static void checkColumnType(int columnIndex, Type expected)
+    {
+        Type declared = getType(columnIndex);
+        boolean matches = declared.equals(expected);
+        checkArgument(matches, "Expected column %s to be type %s but is %s", columnIndex, expected, declared);
+    }
+
+    /**
+     * Returns the Presto type used when building blocks for the column: INTEGER columns are
+     * widened to BIGINT, every other column keeps the type from its handle.
+     */
+    private static Type getTypeForBlock(PinotColumnHandle pinotColumnHandle)
+    {
+        Type declaredType = pinotColumnHandle.getColumnType();
+        return declaredType.equals(INTEGER) ? BIGINT : declaredType;
+    }
+
+    /**
+     * Pairs a Pinot {@link DataTable} with a size estimate in bytes supplied by whoever creates the pair.
+     */
+    private static class PinotDataTableWithSize
+    {
+        final DataTable dataTable;
+        int estimatedSizeInBytes;
+
+        PinotDataTableWithSize(DataTable dataTable, int estimatedSizeInBytes)
+        {
+            // The data table is mandatory; the size estimate is stored as given.
+            requireNonNull(dataTable);
+            this.dataTable = dataTable;
+            this.estimatedSizeInBytes = estimatedSizeInBytes;
+        }
+
+        public int getEstimatedSizeInBytes()
+        {
+            return this.estimatedSizeInBytes;
+        }
+
+        public DataTable getDataTable()
+        {
+            return this.dataTable;
+        }
+    }
+}
diff --git a/presto-pinot/src/main/java/com/facebook/presto/pinot/PinotPageSourceProvider.java b/presto-pinot/src/main/java/com/facebook/presto/pinot/PinotPageSourceProvider.java
new file mode 100644
index 0000000000000..a25b120e78e93
--- /dev/null
+++ b/presto-pinot/src/main/java/com/facebook/presto/pinot/PinotPageSourceProvider.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.pinot;
+
+import com.facebook.presto.spi.ColumnHandle;
+import com.facebook.presto.spi.ConnectorPageSource;
+import com.facebook.presto.spi.ConnectorSession;
+import com.facebook.presto.spi.ConnectorSplit;
+import com.facebook.presto.spi.connector.ConnectorPageSourceProvider;
+import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
+import com.google.common.collect.ImmutableList;
+
+import javax.inject.Inject;
+
+import java.util.List;
+
+import static com.facebook.presto.pinot.PinotUtils.checkType;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Creates a {@link PinotPageSource} for each Pinot split handed to the connector.
+ */
+public class PinotPageSourceProvider
+        implements ConnectorPageSourceProvider
+{
+    private final PinotConfig pinotConfig;
+    private final PinotScatterGatherQueryClient pinotQueryClient;
+
+    @Inject
+    public PinotPageSourceProvider(PinotConfig pinotConfig, PinotScatterGatherQueryClient pinotQueryClient)
+    {
+        this.pinotConfig = requireNonNull(pinotConfig, "pinotConfig is null");
+        this.pinotQueryClient = requireNonNull(pinotQueryClient, "pinotQueryClient is null");
+    }
+
+    /**
+     * Builds the page source for one split.
+     *
+     * @param transactionHandle not used by this implementation
+     * @param session not used by this implementation
+     * @param split the split to read; must be a {@link PinotSplit}
+     * @param columns the columns to read; every element must be a {@link PinotColumnHandle}.
+     * When empty, the split's time column is selected instead.
+     * @return a page source reading the requested columns from the split
+     */
+    @Override
+    public ConnectorPageSource createPageSource(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorSplit split, List<ColumnHandle> columns)
+    {
+        requireNonNull(split, "split is null");
+        requireNonNull(columns, "columns is null");
+        PinotSplit pinotSplit = checkType(split, PinotSplit.class, "split");
+
+        ImmutableList.Builder<PinotColumnHandle> handlesBuilder = ImmutableList.builder();
+        if (columns.isEmpty()) {
+            // For COUNT(*) and COUNT(1), no columns are passed down to Pinot
+            // Since this is the only known type of queries for this scenario, we just select time column from Pinot to facilitate the COUNT
+            handlesBuilder.add(new PinotColumnHandle(pinotSplit.getTimeColumn().getName(), pinotSplit.getTimeColumn().getType(), 0));
+        }
+        else {
+            for (ColumnHandle handle : columns) {
+                handlesBuilder.add(checkType(handle, PinotColumnHandle.class, "handle"));
+            }
+        }
+        return new PinotPageSource(pinotConfig, pinotQueryClient, pinotSplit, handlesBuilder.build());
+    }
+}
diff --git a/presto-pinot/src/main/java/com/facebook/presto/pinot/PinotPlugin.java b/presto-pinot/src/main/java/com/facebook/presto/pinot/PinotPlugin.java
new file mode 100644
index 0000000000000..d2875462b73a5
--- /dev/null
+++ b/presto-pinot/src/main/java/com/facebook/presto/pinot/PinotPlugin.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.pinot;
+
+import com.facebook.presto.spi.Plugin;
+import com.facebook.presto.spi.connector.ConnectorFactory;
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Presto plugin entry point that exposes the Pinot connector factory.
+ */
+public class PinotPlugin
+        implements Plugin
+{
+    /**
+     * @return a single-element list holding a new {@link PinotConnectorFactory}
+     */
+    @Override
+    public synchronized Iterable<ConnectorFactory> getConnectorFactories()
+    {
+        return ImmutableList.of(new PinotConnectorFactory());
+    }
+}
diff --git a/presto-pinot/src/main/java/com/facebook/presto/pinot/PinotQueryBuilder.java b/presto-pinot/src/main/java/com/facebook/presto/pinot/PinotQueryBuilder.java
new file mode 100644
index 0000000000000..e40cd68d14603
--- /dev/null
+++ b/presto-pinot/src/main/java/com/facebook/presto/pinot/PinotQueryBuilder.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.pinot;
+
+import com.facebook.presto.spi.block.Block;
+import com.facebook.presto.spi.predicate.Domain;
+import com.facebook.presto.spi.predicate.Marker;
+import com.facebook.presto.spi.predicate.Range;
+import com.facebook.presto.spi.type.VarcharType;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+import io.airlift.log.Logger;
+import io.airlift.slice.Slice;
+import org.apache.pinot.common.utils.CommonConstants;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.StringJoiner;
+import java.util.stream.Stream;
+
+import static com.facebook.presto.pinot.PinotUtils.QUOTE;
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static java.util.Objects.requireNonNull;
+import static java.util.stream.Collectors.joining;
+
+/**
+ * This class manages how to generate the query to send to Pinot servers.
+ */
+public final class PinotQueryBuilder
+{
+ // Logger used to print the generated PQL at debug level.
+ private static final Logger log = Logger.get(PinotQueryBuilder.class);
+
+ // Not instantiable: the constructor is private and the class only exposes static methods.
+ private PinotQueryBuilder()
+ {
+ }
+
+ /**
+ * QUERY_TEMPLATE looks like this:
+ * SELECT $fields FROM $tableName $predicate LIMIT $limit.
+ *
+ * Note $predicate is optional, and we intentionally add a space between $tableName $predicate for readability.
+ * When $predicate is absent, there would be 2 spaces between $tableName and LIMIT, which should not hurt the query itself.
+ */
+ private static final String QUERY_TEMPLATE = "SELECT %s FROM %s %s LIMIT %d";
+
+    /**
+     * Returns the Pinot Query to send for each split.
+     *
+     * <p>Pinot Query would be constructed based on {@link #QUERY_TEMPLATE} and predicates (WHERE ...).
+     * The two filters are each wrapped in parentheses and joined with AND; a {@code null} or empty
+     * filter is treated as absent.
+     *
+     * @param pinotConfig connector configuration; supplies the limit used when {@code splitLimit} is not positive
+     * @param columnHandles columns to select, in output order
+     * @param pinotFilter predicate pushed down for the table, or empty/null when there is none
+     * @param timeFilter time-boundary predicate, or empty/null when there is none
+     * @param tableName Pinot table name to query
+     * @param splitLimit row limit for the split; values {@code <= 0} fall back to {@code pinotConfig.getLimitAll()}
+     * @return the constructed Pinot Query
+     */
+    static String getPinotQuery(PinotConfig pinotConfig, List<PinotColumnHandle> columnHandles, String pinotFilter, String timeFilter, String tableName, long splitLimit)
+    {
+        requireNonNull(pinotConfig, "pinotConfig is null");
+        StringJoiner fieldsJoiner = new StringJoiner(", ");
+        for (PinotColumnHandle columnHandle : columnHandles) {
+            // No aggregation pushdown
+            fieldsJoiner.add(columnHandle.getColumnName());
+        }
+
+        // Add predicates. A filter may legitimately be null (getTimePredicate returns null for an
+        // unrecognized table type), so null is handled the same way as an empty string.
+        StringJoiner predicatesJoiner = new StringJoiner(" AND ");
+        if (pinotFilter != null && !pinotFilter.isEmpty()) {
+            predicatesJoiner.add(String.format("(%s)", pinotFilter));
+        }
+        if (timeFilter != null && !timeFilter.isEmpty()) {
+            predicatesJoiner.add(String.format("(%s)", timeFilter));
+        }
+
+        // Note pinotPredicate is optional. It would be empty when no predicates are pushed down.
+        String pinotPredicate = "";
+        if (predicatesJoiner.length() > 0) {
+            pinotPredicate = "WHERE " + predicatesJoiner.toString();
+        }
+
+        long limit = splitLimit > 0 ? splitLimit : pinotConfig.getLimitAll();
+
+        String finalQuery = String.format(QUERY_TEMPLATE, fieldsJoiner.toString(), tableName, pinotPredicate, limit);
+        log.debug("Plan to send PQL : %s", finalQuery);
+        return finalQuery;
+    }
+
+ /**
+ * Get the predicates for a column in string format, for constructing Pinot queries directly
+ *
+ * Sorted ranges are rendered as bound comparisons (for non-single-value ranges) plus one IN list (for single-value
+ * ranges), OR'ed together; discrete value sets are rendered as IN / NOT IN; an all-or-none value set yields an empty string.
+ *
+ * NOTE(review): only domain.getValues() is consulted here; any other property of the domain is not translated into
+ * the predicate - confirm that is intended.
+ *
+ * @param domain Domain whose value set describes the allowed values for a column
+ * @param columnName Pinot column name
+ * @return Predicate in Pinot Query Language for the column. Empty string would be returned if no constraints
+ */
+ @VisibleForTesting
+ static String getColumnPredicate(Domain domain, String columnName)
+ {
+ // Accumulators filled by whichever value-set branch below is taken.
+ List discreteConstraintList = new ArrayList<>();
+ List singleValueRangeConstraintList = new ArrayList<>();
+ List rangeConstraintList = new ArrayList<>();
+
+ return domain.getValues().getValuesProcessor().transform(
+ ranges ->
+ {
+ for (Range range : ranges.getOrderedRanges()) {
+ if (range.isSingleValue()) {
+ // Single-value ranges are collected and emitted together as one IN list below.
+ singleValueRangeConstraintList.add(getMarkerValue(range.getLow()));
+ }
+ else {
+ StringBuilder builder = new StringBuilder();
+ ImmutableList.Builder bounds = ImmutableList.builder();
+ // Get low bound
+ // An EXACTLY bound renders as "<= " (inclusive); any other bound renders as "< " (exclusive).
+ String equationMark = (range.getLow().getBound() == Marker.Bound.EXACTLY) ? "= " : " ";
+ if (!range.getLow().isLowerUnbounded()) {
+ bounds.add(getMarkerValue(range.getLow()) + " <" + equationMark + columnName);
+ }
+ // Get high bound
+ equationMark = (range.getHigh().getBound() == Marker.Bound.EXACTLY) ? "= " : " ";
+ if (!range.getHigh().isUpperUnbounded()) {
+ bounds.add(columnName + " <" + equationMark + getMarkerValue(range.getHigh()));
+ }
+ // Use AND to combine bounds within the same range
+ // NOTE(review): if neither bound was added (range unbounded on both sides) this appends "()" - confirm such a range never reaches here.
+ builder.append("(").append(Joiner.on(" AND ").join(bounds.build())).append(")");
+ rangeConstraintList.add(builder.toString());
+ }
+ }
+ // Multiple ranges on the same column are OR'ed together.
+ String rangeConstraint = Joiner.on(" OR ").join(rangeConstraintList);
+ String discreteConstraint = getDiscretePredicate(true, columnName, singleValueRangeConstraintList);
+
+ // Either part may be empty; only the non-empty parts are joined.
+ return Stream.of(rangeConstraint, discreteConstraint)
+ .filter(s -> !s.isEmpty())
+ .collect(joining(" OR "));
+ },
+ discreteValues ->
+ {
+ /*
+ * For some types like {@link com.facebook.presto.type.ColorType} that are not orderable, discreteValues would appear here.
+ * For most regular types like boolean, char, number, the discrete values would be converted to singleValues in ranges above,
+ * and would not appear here. So far the column types supported by Pinot all fall in that category.
+ */
+ // Values are rendered with toString() and are not quoted here.
+ discreteConstraintList.addAll(discreteValues.getValues().stream().map(Object::toString).collect(toImmutableList()));
+ return getDiscretePredicate(discreteValues.isWhiteList(), columnName, discreteConstraintList);
+ },
+ allOrNone ->
+ {
+ // no-op
+ return "";
+ });
+ }
+
+    /**
+     * Builds the IN / NOT IN clause for a set of discrete values.
+     *
+     * @param isWhitelist true for IN predicate, false for NOT IN predicate
+     * @param columnName name of the column
+     * @param discreteConstraintList list of allowed or not allowed values, already rendered as query text
+     * @return the clause with the values joined by commas, or an empty string when the list is empty
+     */
+    static String getDiscretePredicate(boolean isWhitelist, String columnName, List discreteConstraintList)
+    {
+        if (discreteConstraintList.isEmpty()) {
+            return "";
+        }
+        String operator = isWhitelist ? " IN (" : " NOT IN (";
+        String values = Joiner.on(',').join(discreteConstraintList);
+        return columnName + operator + values + ")";
+    }
+
+    /**
+     * Renders the value held by a marker as query text.
+     *
+     * NOTE(review): a varchar value is placed between QUOTE characters verbatim; a value that itself
+     * contains the quote character is not escaped - confirm whether Pinot needs escaping here.
+     *
+     * @param marker marker in the Domain
+     * @return for varchar markers, the UTF-8 text wrapped in QUOTE; otherwise {@code toString()} of the marker value
+     */
+    private static String getMarkerValue(Marker marker)
+    {
+        if (!(marker.getType() instanceof VarcharType)) {
+            return marker.getValue().toString();
+        }
+        // Varchar values are read straight from the marker's value block as a slice.
+        Block valueBlock = marker.getValueBlock().get();
+        int valueLength = valueBlock.getSliceLength(0);
+        Slice value = valueBlock.getSlice(0, 0, valueLength);
+        return QUOTE + value.toStringUtf8() + QUOTE;
+    }
+
+    /**
+     * Builds the time-boundary predicate for a table of the given type.
+     *
+     * @param tableType table type name, compared case-insensitively against OFFLINE and REALTIME
+     * @param timeColumn name of the time column
+     * @param maxTimeStamp boundary value, inserted into the predicate as-is
+     * @return {@code timeColumn < maxTimeStamp} for OFFLINE, {@code timeColumn >= maxTimeStamp} for REALTIME,
+     * and {@code null} for any other table type
+     */
+    static String getTimePredicate(String tableType, String timeColumn, String maxTimeStamp)
+    {
+        String offlineType = CommonConstants.Helix.TableType.OFFLINE.toString();
+        if (offlineType.equalsIgnoreCase(tableType)) {
+            return String.format("%s < %s", timeColumn, maxTimeStamp);
+        }
+        String realtimeType = CommonConstants.Helix.TableType.REALTIME.toString();
+        if (realtimeType.equalsIgnoreCase(tableType)) {
+            return String.format("%s >= %s", timeColumn, maxTimeStamp);
+        }
+        // Unrecognized table type: no time predicate.
+        return null;
+    }
+}
diff --git a/presto-pinot/src/main/java/com/facebook/presto/pinot/PinotScatterGatherQueryClient.java b/presto-pinot/src/main/java/com/facebook/presto/pinot/PinotScatterGatherQueryClient.java
new file mode 100644
index 0000000000000..3117cff46075e
--- /dev/null
+++ b/presto-pinot/src/main/java/com/facebook/presto/pinot/PinotScatterGatherQueryClient.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.pinot;
+
+import com.facebook.presto.spi.PrestoException;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.Inject;
+import com.yammer.metrics.core.MetricsRegistry;
+import io.airlift.log.Logger;
+import io.airlift.units.Duration;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.util.HashedWheelTimer;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.common.request.InstanceRequest;
+import org.apache.pinot.common.response.ProcessingException;
+import org.apache.pinot.common.response.ServerInstance;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.common.datatable.DataTableFactory;
+import org.apache.pinot.pql.parsers.Pql2Compiler;
+import org.apache.pinot.serde.SerDe;
+import org.apache.pinot.transport.common.CompositeFuture;
+import org.apache.pinot.transport.metrics.NettyClientMetrics;
+import org.apache.pinot.transport.netty.PooledNettyClientResourceManager;
+import org.apache.pinot.transport.pool.KeyedPool;
+import org.apache.pinot.transport.pool.KeyedPoolImpl;
+import org.apache.pinot.transport.scattergather.ScatterGather;
+import org.apache.pinot.transport.scattergather.ScatterGatherImpl;
+import org.apache.pinot.transport.scattergather.ScatterGatherRequest;
+import org.apache.pinot.transport.scattergather.ScatterGatherStats;
+import org.apache.thrift.protocol.TCompactProtocol;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static com.facebook.presto.pinot.PinotErrorCode.PINOT_FAILURE_QUERYING_DATA;
+import static com.google.common.base.Preconditions.checkState;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * This class acts as the Pinot broker, fetches data from Pinot segments, gathers and returns the result.
+ * Many components were taken from ConnectionPoolBrokerRequestHandler
+ */
+public class PinotScatterGatherQueryClient
+{
+ private static final Logger log = Logger.get(PinotScatterGatherQueryClient.class);
+
+ // Compiles PQL text into a BrokerRequest; shared by all queries.
+ private static final Pql2Compiler REQUEST_COMPILER = new Pql2Compiler();
+ // Prefix of the id this client reports to Pinot; the local host name is appended when it can be resolved.
+ private static final String PRESTO_HOST_PREFIX = "presto-pinot-master";
+ // Passed to the BrokerMetrics constructor.
+ private static final boolean DEFAULT_EMIT_TABLE_LEVEL_METRICS = true;
+
+ // Incremented once per call to queryPinotServerForDataTable.
+ private final AtomicLong requestIdGenerator;
+ private final String prestoHostId;
+ private final BrokerMetrics brokerMetrics;
+ private final ScatterGather scatterGatherer;
+
+ // Netty Specific
+ // NOTE(review): assigned only in the constructor in this class; could presumably be final.
+ private EventLoopGroup eventLoopGroup;
+
+ // Used, in milliseconds, as the request timeout of every scatter-gather request.
+ // NOTE(review): assigned only in the constructor in this class; could presumably be final.
+ private Duration connectionTimeout;
+
+ /**
+ * Wires up the metrics, Netty event loop, connection pool and scatter-gather client used to query Pinot servers.
+ *
+ * NOTE(review): the event loop group, the HashedWheelTimer and the two executors created here are not shut down
+ * by any method of this class - confirm whether a close/destroy hook is needed.
+ *
+ * @param pinotConfig supplies thread-pool sizes, per-server connection limits, idle timeout and connection timeout
+ */
+ @Inject
+ public PinotScatterGatherQueryClient(PinotConfig pinotConfig)
+ {
+ requestIdGenerator = new AtomicLong(0);
+ prestoHostId = getDefaultPrestoId();
+
+ // One registry is shared by the broker metrics, the Netty client metrics and the connection pool.
+ final MetricsRegistry registry = new MetricsRegistry();
+ brokerMetrics = new BrokerMetrics(registry, DEFAULT_EMIT_TABLE_LEVEL_METRICS);
+ brokerMetrics.initializeGlobalMeters();
+ eventLoopGroup = new NioEventLoopGroup();
+
+ /*
+ * Some of the client metrics uses histogram which is doing synchronous operation.
+ * These are fixed overhead per request/response.
+ * TODO: Measure the overhead of this.
+ */
+ final NettyClientMetrics clientMetrics = new NettyClientMetrics(registry, "presto_pinot_client_");
+
+ // Setup Netty Connection Pool
+ PooledNettyClientResourceManager resourceManager = new PooledNettyClientResourceManager(eventLoopGroup, new HashedWheelTimer(), clientMetrics);
+ // Connection Pool Related
+ ExecutorService requestSenderPool = Executors.newFixedThreadPool(pinotConfig.getThreadPoolSize());
+ ScheduledThreadPoolExecutor poolTimeoutExecutor = new ScheduledThreadPoolExecutor(pinotConfig.getCorePoolSize());
+ connectionTimeout = pinotConfig.getConnectionTimeout();
+ KeyedPool connectionPool = new KeyedPoolImpl<>(
+ pinotConfig.getMinConnectionsPerServer(),
+ pinotConfig.getMaxConnectionsPerServer(),
+ pinotConfig.getIdleTimeout().toMillis(),
+ pinotConfig.getMaxBacklogPerServer(),
+ resourceManager,
+ poolTimeoutExecutor,
+ requestSenderPool,
+ registry);
+
+ // The resource manager and the pool reference each other, so the pool is handed back after construction.
+ resourceManager.setPool(connectionPool);
+ // Setup ScatterGather
+ // The same executor serves the connection pool and the scatter-gather client.
+ scatterGatherer = new ScatterGatherImpl(connectionPool, requestSenderPool);
+ }
+
+    /**
+     * Builds the id this client reports to Pinot servers as its broker id.
+     *
+     * @return {@code PRESTO_HOST_PREFIX} followed by the local host name, or just the prefix when
+     * the local host cannot be resolved
+     */
+    private String getDefaultPrestoId()
+    {
+        try {
+            return PRESTO_HOST_PREFIX + InetAddress.getLocalHost().getHostName();
+        }
+        catch (UnknownHostException e) {
+            // Pass the exception as the throwable argument so its stack trace is logged;
+            // as a trailing format argument it would be ignored by the placeholder-free message.
+            log.error(e, "Caught exception while getting default broker id");
+            return PRESTO_HOST_PREFIX;
+        }
+    }
+
+    /**
+     * Sends one PQL query for one segment to one Pinot server and returns the deserialized result.
+     *
+     * @param pql the query text; compiled into a broker request before sending
+     * @param serverHost the server to route the request to
+     * @param segment the single segment to search on that server
+     * @return the data tables keyed by the responding server instance
+     * @throws PrestoException with PINOT_FAILURE_QUERYING_DATA when the PQL cannot be compiled, the
+     * scatter phase yields no future, or gathering/deserializing the responses fails
+     */
+    public Map<ServerInstance, DataTable> queryPinotServerForDataTable(String pql, String serverHost, String segment)
+    {
+        long requestId = requestIdGenerator.incrementAndGet();
+        BrokerRequest brokerRequest;
+        try {
+            brokerRequest = REQUEST_COMPILER.compileToBrokerRequest(pql);
+        }
+        catch (Exception e) {
+            throw new PrestoException(
+                    PINOT_FAILURE_QUERYING_DATA,
+                    String.format("Parsing error on requestId %d, PQL = %s", requestId, pql),
+                    e);
+        }
+
+        // The routing table maps the single target server to the single segment to search.
+        ImmutableMap.Builder<String, List<String>> routingTableBuilder = ImmutableMap.builder();
+        List<String> segmentList = Arrays.asList(segment);
+        routingTableBuilder.put(serverHost, segmentList);
+        // Send the generated request id (not a constant) so the id in error messages matches the one the server sees.
+        ScatterGatherRequest scatterRequest = new SimpleScatterGatherRequest(brokerRequest, routingTableBuilder.build(), requestId, connectionTimeout.toMillis(), prestoHostId);
+
+        ScatterGatherStats scatterGatherStats = new ScatterGatherStats();
+        CompositeFuture<byte[]> compositeFuture = routeScatterGather(scatterRequest, scatterGatherStats);
+
+        if (compositeFuture == null) {
+            throw new PrestoException(
+                    PINOT_FAILURE_QUERYING_DATA,
+                    String.format("Failed to get data from table. PQL = %s.", pql));
+        }
+
+        ImmutableMap.Builder<ServerInstance, DataTable> dataTableMapBuilder = ImmutableMap.builder();
+        ImmutableList.Builder<ProcessingException> processingExceptionsBuilder = ImmutableList.builder();
+        String tableNameWithType = brokerRequest.getQuerySource().getTableName();
+        // The offline flag is always passed as true here, regardless of the table being queried.
+        Map<ServerInstance, byte[]> serverResponseMap = gatherServerResponses(compositeFuture, scatterGatherStats, true, tableNameWithType, processingExceptionsBuilder);
+
+        deserializeServerResponses(serverResponseMap, dataTableMapBuilder, tableNameWithType, processingExceptionsBuilder);
+        return dataTableMapBuilder.build();
+    }
+
+ /**
+ * Gather responses from servers, append processing exceptions to the processing exception list passed in.
+ *
+ * Any exception raised while waiting for or validating the responses (including the empty-response check)
+ * is counted in the broker metrics and rethrown as a PrestoException with PINOT_FAILURE_QUERYING_DATA.
+ *
+ * @param compositeFuture composite future returned from scatter phase.
+ * @param scatterGatherStats scatter-gather statistics.
+ * @param isOfflineTable whether the scatter-gather target is an OFFLINE table.
+ * @param tableNameWithType table name with type suffix.
+ * @param processingExceptionsBuilder list of processing exceptions.
+ * @return server response map.
+ */
+ private Map gatherServerResponses(
+ CompositeFuture compositeFuture,
+ ScatterGatherStats scatterGatherStats,
+ boolean isOfflineTable,
+ String tableNameWithType,
+ ImmutableList.Builder processingExceptionsBuilder)
+ {
+ try {
+ // Blocks until the scatter phase completes.
+ Map serverResponseMap = compositeFuture.get();
+ for (Map.Entry entry : serverResponseMap.entrySet()) {
+ // An empty payload from any server is treated as a failure; the IllegalStateException is caught below.
+ checkState(entry.getValue().length > 0, "Got empty data for table: %s in server %s.", tableNameWithType, entry.getKey().getShortHostName());
+ }
+ Map responseTimes = compositeFuture.getResponseTimes();
+ scatterGatherStats.setResponseTimeMillis(responseTimes, isOfflineTable);
+ return serverResponseMap;
+ }
+ catch (Exception e) {
+ // NOTE(review): this broad catch also covers interruption, if compositeFuture.get() can throw it, without restoring the interrupt flag - confirm.
+ brokerMetrics.addMeteredTableValue(tableNameWithType, BrokerMeter.RESPONSE_FETCH_EXCEPTIONS, 1L);
+ processingExceptionsBuilder.add(QueryException.getException(QueryException.BROKER_GATHER_ERROR, e));
+ throw new PrestoException(
+ PINOT_FAILURE_QUERYING_DATA,
+ String.format("Caught exception while fetching responses for table: %s. Processing Exceptions: %s", tableNameWithType, processingExceptionsBuilder.build().toString()),
+ e);
+ }
+ }
+
+ /**
+ * Deserialize the server responses, put the de-serialized data table into the data table map passed in, append
+ * processing exceptions to the processing exception list passed in.
+ * For hybrid use case, multiple responses might be from the same instance. Use response sequence to distinguish
+ * them.
+ *
+ * NOTE(review): the code below keys the result by the server instance only and does not use a response sequence -
+ * confirm whether the hybrid remark above still applies.
+ *
+ * The first deserialization failure is counted in the broker metrics and rethrown as a PrestoException with
+ * PINOT_FAILURE_QUERYING_DATA; remaining responses are not processed.
+ *
+ * @param responseMap map from server to response.
+ * @param dataTableMapBuilder map from server to data table.
+ * @param tableNameWithType table name with type suffix.
+ * @param processingExceptionsBuilder list of processing exceptions.
+ */
+ private void deserializeServerResponses(
+ Map responseMap,
+ ImmutableMap.Builder dataTableMapBuilder,
+ String tableNameWithType,
+ ImmutableList.Builder processingExceptionsBuilder)
+ {
+ for (Map.Entry entry : responseMap.entrySet()) {
+ ServerInstance serverInstance = entry.getKey();
+ try {
+ dataTableMapBuilder.put(serverInstance, DataTableFactory.getDataTable(entry.getValue()));
+ }
+ catch (Exception e) {
+ brokerMetrics.addMeteredTableValue(tableNameWithType, BrokerMeter.DATA_TABLE_DESERIALIZATION_EXCEPTIONS, 1L);
+ processingExceptionsBuilder.add(QueryException.getException(QueryException.DATA_TABLE_DESERIALIZATION_ERROR, e));
+ throw new PrestoException(
+ PINOT_FAILURE_QUERYING_DATA,
+ String.format("Caught exceptions while deserializing response for table: %s from server: %s", tableNameWithType, serverInstance),
+ e);
+ }
+ }
+ }
+
+    /**
+     * Runs the scatter phase for the request.
+     *
+     * @param scatterRequest the request to send
+     * @param scatterGatherStats statistics object handed to the scatter-gather client
+     * @return the composite future returned by the scatter-gather client
+     * @throws PrestoException with PINOT_FAILURE_QUERYING_DATA when the calling thread is interrupted
+     */
+    private CompositeFuture routeScatterGather(ScatterGatherRequest scatterRequest, ScatterGatherStats scatterGatherStats)
+    {
+        try {
+            return scatterGatherer.scatterGather(scatterRequest, scatterGatherStats, true, brokerMetrics);
+        }
+        catch (InterruptedException e) {
+            // Restore the interrupt flag so code further up the stack can still observe the interruption.
+            Thread.currentThread().interrupt();
+            throw new PrestoException(
+                    PINOT_FAILURE_QUERYING_DATA,
+                    "Caught exception querying Pinot servers",
+                    e);
+        }
+    }
+
+    /**
+     * Minimal {@link ScatterGatherRequest} that carries a compiled broker request, its routing table,
+     * the request id, the timeout and the id of the sender.
+     */
+    private static class SimpleScatterGatherRequest
+            implements ScatterGatherRequest
+    {
+        private final BrokerRequest brokerRequest;
+        private final Map<String, List<String>> routingTable;
+        private final long requestId;
+        private final long requestTimeoutMs;
+        private final String brokerId;
+
+        public SimpleScatterGatherRequest(BrokerRequest request, Map<String, List<String>> routingTable, long requestId, long requestTimeoutMs, String brokerId)
+        {
+            this.brokerRequest = requireNonNull(request, "request is null");
+            this.routingTable = requireNonNull(routingTable, "routingTable is null");
+            // requestId and requestTimeoutMs are primitives and can never be null, so they are stored directly.
+            this.requestId = requestId;
+            this.requestTimeoutMs = requestTimeoutMs;
+            this.brokerId = requireNonNull(brokerId, "brokerId is null");
+        }
+
+        @Override
+        public Map<String, List<String>> getRoutingTable()
+        {
+            return routingTable;
+        }
+
+        /**
+         * Serializes an instance request for the given segments, copying the request id, trace flag,
+         * query and broker id from this request.
+         */
+        @Override
+        public byte[] getRequestForService(List<String> segments)
+        {
+            InstanceRequest request = new InstanceRequest();
+            request.setRequestId(requestId);
+            request.setEnableTrace(brokerRequest.isEnableTrace());
+            request.setQuery(brokerRequest);
+            request.setSearchSegments(segments);
+            request.setBrokerId(brokerId);
+            return new SerDe(new TCompactProtocol.Factory()).serialize(request);
+        }
+
+        @Override
+        public long getRequestId()
+        {
+            return requestId;
+        }
+
+        @Override
+        public long getRequestTimeoutMs()
+        {
+            return requestTimeoutMs;
+        }
+
+        @Override
+        public BrokerRequest getBrokerRequest()
+        {
+            return brokerRequest;
+        }
+    }
+}
diff --git a/presto-pinot/src/main/java/com/facebook/presto/pinot/PinotSplit.java b/presto-pinot/src/main/java/com/facebook/presto/pinot/PinotSplit.java
new file mode 100644
index 0000000000000..2f1cd1c0fdf47
--- /dev/null
+++ b/presto-pinot/src/main/java/com/facebook/presto/pinot/PinotSplit.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.pinot;
+
+import com.facebook.presto.spi.ConnectorSplit;
+import com.facebook.presto.spi.HostAddress;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * JSON-serializable split describing one Pinot segment on one host, together with
+ * the time column, the filters and the limit carried alongside it.
+ */
+public class PinotSplit
+        implements ConnectorSplit
+{
+    private final String tableName;
+    private final String host;
+    private final String segment;
+    private final List<HostAddress> addresses;
+    private final PinotColumn timeColumn;
+    private final String timeFilter;
+    private final String pinotFilter;
+    private final long limit;
+
+    /**
+     * @param tableName table name; must not be null
+     * @param host host of the segment; must not be null
+     * @param segment segment name; must not be null
+     * @param timeColumn time column of the table; must not be null
+     * @param timeFilter time filter string; stored as given, not null-checked
+     * @param pinotFilter filter string; stored as given, not null-checked
+     * @param limit limit value; stored as given
+     */
+    @JsonCreator
+    public PinotSplit(
+            @JsonProperty("tableName") String tableName,
+            @JsonProperty("host") String host,
+            @JsonProperty("segment") String segment,
+            @JsonProperty("timeColumn") PinotColumn timeColumn,
+            @JsonProperty("timeFilter") String timeFilter,
+            @JsonProperty("pinotFilter") String pinotFilter,
+            @JsonProperty("limit") long limit)
+    {
+        this.tableName = requireNonNull(tableName, "table name is null");
+        this.host = requireNonNull(host, "host is null");
+        this.segment = requireNonNull(segment, "segment is null");
+        this.timeColumn = requireNonNull(timeColumn, "timeColumn is null");
+        // No preferred addresses are recorded; use an empty list rather than null so that
+        // callers of getAddresses() can iterate the result safely.
+        this.addresses = java.util.Collections.emptyList();
+        this.pinotFilter = pinotFilter;
+        this.timeFilter = timeFilter;
+        this.limit = limit;
+    }
+
+    @JsonProperty
+    public String getTableName()
+    {
+        return tableName;
+    }
+
+    @JsonProperty
+    public String getHost()
+    {
+        return host;
+    }
+
+    @JsonProperty
+    public String getSegment()
+    {
+        return segment;
+    }
+
+    @JsonProperty
+    public PinotColumn getTimeColumn()
+    {
+        return timeColumn;
+    }
+
+    @JsonProperty
+    public String getTimeFilter()
+    {
+        return timeFilter;
+    }
+
+    @JsonProperty
+    public String getPinotFilter()
+    {
+        return pinotFilter;
+    }
+
+    @JsonProperty
+    public long getLimit()
+    {
+        return limit;
+    }
+
+    @Override
+    public boolean isRemotelyAccessible()
+    {
+        // Every split is reported as remotely accessible.
+        return true;
+    }
+
+    /**
+     * @return an empty, immutable list; this split never records addresses
+     */
+    @Override
+    public List<HostAddress> getAddresses()
+    {
+        return addresses;
+    }
+
+    @Override
+    public Object getInfo()
+    {
+        return this;
+    }
+}
diff --git a/presto-pinot/src/main/java/com/facebook/presto/pinot/PinotSplitManager.java b/presto-pinot/src/main/java/com/facebook/presto/pinot/PinotSplitManager.java
new file mode 100644
index 0000000000000..93e79254cd720
--- /dev/null
+++ b/presto-pinot/src/main/java/com/facebook/presto/pinot/PinotSplitManager.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.pinot;
+
+import com.facebook.presto.spi.ColumnHandle;
+import com.facebook.presto.spi.ConnectorSession;
+import com.facebook.presto.spi.ConnectorSplit;
+import com.facebook.presto.spi.ConnectorSplitSource;
+import com.facebook.presto.spi.ConnectorTableLayoutHandle;
+import com.facebook.presto.spi.FixedSplitSource;
+import com.facebook.presto.spi.PrestoException;
+import com.facebook.presto.spi.connector.ConnectorSplitManager;
+import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
+import com.facebook.presto.spi.predicate.Domain;
+import com.facebook.presto.spi.predicate.TupleDomain;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+import org.apache.pinot.common.config.TableNameBuilder;
+
+import javax.inject.Inject;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static com.facebook.presto.pinot.PinotErrorCode.PINOT_FAILURE_GETTING_TABLE;
+import static com.facebook.presto.pinot.PinotQueryBuilder.getColumnPredicate;
+import static com.facebook.presto.pinot.PinotQueryBuilder.getTimePredicate;
+import static com.facebook.presto.pinot.PinotUtils.TIME_COLUMN_NAME;
+import static com.facebook.presto.pinot.PinotUtils.TIME_COLUMN_VALUE;
+import static com.facebook.presto.pinot.PinotUtils.checkType;
+import static com.google.common.base.Preconditions.checkState;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Creates one {@link PinotSplit} per (host, segment) pair found in the routing table
+ * for the OFFLINE and REALTIME variants of the requested table.
+ */
+public class PinotSplitManager
+        implements ConnectorSplitManager
+{
+    private final PinotConnection pinotPrestoConnection;
+
+    @Inject
+    public PinotSplitManager(PinotConnection pinotPrestoConnection)
+    {
+        this.pinotPrestoConnection = requireNonNull(pinotPrestoConnection, "pinotPrestoConnection is null");
+    }
+
+    /**
+     * Looks up the table, its time column, routing table and time boundary, then builds
+     * the splits in shuffled order.
+     *
+     * @throws PrestoException with {@code PINOT_FAILURE_GETTING_TABLE} if any lookup fails
+     *         or the table no longer exists
+     */
+    @Override
+    public ConnectorSplitSource getSplits(
+            ConnectorTransactionHandle transactionHandle,
+            ConnectorSession session,
+            ConnectorTableLayoutHandle layout,
+            SplitSchedulingContext splitSchedulingContext)
+    {
+        PinotTableLayoutHandle layoutHandle = checkType(layout, PinotTableLayoutHandle.class, "layout");
+        PinotTableHandle tableHandle = layoutHandle.getTable();
+        String tableName = tableHandle.getTableName();
+        PinotColumn timeColumn;
+        Map<String, Map<String, List<String>>> routingTable;
+        Map<String, String> timeBoundary;
+        try {
+            PinotTable table = pinotPrestoConnection.getTable(tableName);
+            // this can happen if table is removed during a query; check before issuing further lookups
+            checkState(table != null, "Table %s no longer exists", tableName);
+            timeColumn = pinotPrestoConnection.getPinotTimeColumnForTable(tableName);
+            routingTable = pinotPrestoConnection.getRoutingTable(tableName);
+            timeBoundary = pinotPrestoConnection.getTimeBoundary(tableName);
+        }
+        catch (Exception e) {
+            throw new PrestoException(
+                    PINOT_FAILURE_GETTING_TABLE,
+                    "Failed to fetch table status for Pinot table: " + tableName,
+                    e);
+        }
+
+        List<ConnectorSplit> splits = new ArrayList<>();
+        if (!routingTable.isEmpty()) {
+            TupleDomain<ColumnHandle> constraintSummary = tableHandle.getConstraintSummary();
+            setSplits(splits, timeColumn, routingTable, timeBoundary, TableNameBuilder.OFFLINE.tableNameWithType(tableName), constraintSummary);
+            setSplits(splits, timeColumn, routingTable, timeBoundary, TableNameBuilder.REALTIME.tableNameWithType(tableName), constraintSummary);
+        }
+
+        Collections.shuffle(splits);
+
+        return new FixedSplitSource(splits);
+    }
+
+    /**
+     * Get the predicates for Pinot columns in string format, for constructing Pinot queries directly.
+     * Note that for predicates like UDF (WHERE ROUND(fare) > 10), column comparison (WHERE colA - colB > 10, WHERE col/100 > 5),
+     * constraintSummary passed to Pinot will be empty, since those predicates would be in remainingExpression and not passed here.
+     *
+     * @param constraintSummary TupleDomain representing the allowed ranges for Pinot columns
+     * @return Predicate in Pinot Query Language for Pinot columns; the per-column predicates are
+     *         parenthesized and joined with " AND ", and columns with an empty predicate are omitted
+     */
+    @VisibleForTesting
+    static String getPinotPredicate(TupleDomain<ColumnHandle> constraintSummary)
+    {
+        ImmutableList.Builder<String> pinotFilterBuilder = ImmutableList.builder();
+
+        // NOTE(review): get() throws if the Optional returned by getDomains() is empty — confirm callers never pass such a TupleDomain.
+        Map<ColumnHandle, Domain> columnHandleDomainMap = constraintSummary.getDomains().get();
+        for (Map.Entry<ColumnHandle, Domain> entry : columnHandleDomainMap.entrySet()) {
+            String columnName = ((PinotColumnHandle) entry.getKey()).getColumnName();
+            String columnPredicate = getColumnPredicate(entry.getValue(), columnName);
+            if (!columnPredicate.isEmpty()) {
+                pinotFilterBuilder.add("(" + columnPredicate + ")");
+            }
+        }
+        return Joiner.on(" AND ").join(pinotFilterBuilder.build());
+    }
+
+    /**
+     * Appends one split per (host, segment) pair for every routing table entry whose name
+     * matches {@code tableName} ignoring case.
+     *
+     * @param splits list the new splits are appended to
+     * @param timeColumn time column stored on every split
+     * @param routingTable routing table name -> host -> segments
+     * @param timeBoundary map that may hold {@code TIME_COLUMN_NAME} and {@code TIME_COLUMN_VALUE};
+     *        a time filter is generated only when both keys are present
+     * @param tableName table name with type suffix to match against the routing table
+     * @param constraintSummary constraint converted into the filter string stored on every split
+     */
+    private void setSplits(List<ConnectorSplit> splits, PinotColumn timeColumn, Map<String, Map<String, List<String>>> routingTable, Map<String, String> timeBoundary, String tableName, TupleDomain<ColumnHandle> constraintSummary)
+    {
+        String pinotFilter = getPinotPredicate(constraintSummary);
+        String timeFilter = "";
+        long limit = -1;
+        if (timeBoundary.containsKey(TIME_COLUMN_NAME) && timeBoundary.containsKey(TIME_COLUMN_VALUE)) {
+            timeFilter = getTimePredicate(TableNameBuilder.getTableTypeFromTableName(tableName).toString(), timeBoundary.get(TIME_COLUMN_NAME), timeBoundary.get(TIME_COLUMN_VALUE));
+        }
+        for (Map.Entry<String, Map<String, List<String>>> routingEntry : routingTable.entrySet()) {
+            String routingTableName = routingEntry.getKey();
+            if (!routingTableName.equalsIgnoreCase(tableName)) {
+                continue;
+            }
+            for (Map.Entry<String, List<String>> hostEntry : routingEntry.getValue().entrySet()) {
+                String host = hostEntry.getKey();
+                for (String segment : hostEntry.getValue()) {
+                    splits.add(new PinotSplit(routingTableName, host, segment, timeColumn, timeFilter, pinotFilter, limit));
+                }
+            }
+        }
+    }
+}
diff --git a/presto-pinot/src/main/java/com/facebook/presto/pinot/PinotTable.java b/presto-pinot/src/main/java/com/facebook/presto/pinot/PinotTable.java
new file mode 100644
index 0000000000000..b25294e575498
--- /dev/null
+++ b/presto-pinot/src/main/java/com/facebook/presto/pinot/PinotTable.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.pinot;
+
+import com.facebook.presto.spi.ColumnMetadata;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Strings.isNullOrEmpty;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Immutable description of a Pinot table: its name, its columns and the column
+ * metadata derived from those columns at construction time.
+ */
+public class PinotTable
+{
+    private final String name;
+    private final List<PinotColumn> columns;
+    private final List<ColumnMetadata> columnsMetadata;
+
+    /**
+     * @param name table name; must be neither null nor empty
+     * @param columns table columns; must not be null, copied defensively
+     * @throws IllegalArgumentException if {@code name} is null or empty
+     * @throws NullPointerException if {@code columns} is null
+     */
+    @JsonCreator
+    public PinotTable(
+            @JsonProperty("name") String name,
+            @JsonProperty("columns") List<PinotColumn> columns)
+    {
+        // checkArgument already rejects a null name, so no separate null check is needed.
+        checkArgument(!isNullOrEmpty(name), "name is null or is empty");
+        this.name = name;
+        this.columns = ImmutableList.copyOf(requireNonNull(columns, "columns is null"));
+
+        ImmutableList.Builder<ColumnMetadata> columnsMetadata = ImmutableList.builder();
+        for (PinotColumn column : this.columns) {
+            columnsMetadata.add(new PinotColumnMetadata(column.getName(), column.getType()));
+        }
+        this.columnsMetadata = columnsMetadata.build();
+    }
+
+    @JsonProperty
+    public String getName()
+    {
+        return name;
+    }
+
+    @JsonProperty
+    public List<PinotColumn> getColumns()
+    {
+        return columns;
+    }
+
+    /**
+     * @return one metadata entry per column, in column order; not part of the JSON form
+     */
+    public List<ColumnMetadata> getColumnsMetadata()
+    {
+        return columnsMetadata;
+    }
+}
diff --git a/presto-pinot/src/main/java/com/facebook/presto/pinot/PinotTableHandle.java b/presto-pinot/src/main/java/com/facebook/presto/pinot/PinotTableHandle.java
new file mode 100644
index 0000000000000..44d55af122921
--- /dev/null
+++ b/presto-pinot/src/main/java/com/facebook/presto/pinot/PinotTableHandle.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.pinot;
+
+import com.facebook.presto.spi.ColumnHandle;
+import com.facebook.presto.spi.ConnectorTableHandle;
+import com.facebook.presto.spi.SchemaTableName;
+import com.facebook.presto.spi.predicate.TupleDomain;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Joiner;
+
+import java.util.Objects;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Table handle identified by schema name and table name.
+ *
+ * The handle also carries a mutable constraint summary. That field is not a JSON
+ * property and takes no part in {@link #equals(Object)} or {@link #hashCode()}.
+ */
+public final class PinotTableHandle
+        implements ConnectorTableHandle
+{
+    private final String schemaName;
+    private final String tableName;
+    // Not set by the constructor: stays null until setConstraintSummary is called.
+    private TupleDomain<ColumnHandle> constraintSummary;
+
+    @JsonCreator
+    public PinotTableHandle(
+            @JsonProperty("schemaName") String schemaName,
+            @JsonProperty("tableName") String tableName)
+    {
+        this.schemaName = requireNonNull(schemaName, "schemaName is null");
+        this.tableName = requireNonNull(tableName, "tableName is null");
+    }
+
+    @JsonProperty
+    public String getSchemaName()
+    {
+        return schemaName;
+    }
+
+    @JsonProperty
+    public String getTableName()
+    {
+        return tableName;
+    }
+
+    /**
+     * @return the constraint summary last passed to {@link #setConstraintSummary}, or null if none was set
+     */
+    public TupleDomain<ColumnHandle> getConstraintSummary()
+    {
+        return constraintSummary;
+    }
+
+    public void setConstraintSummary(TupleDomain<ColumnHandle> constraintSummary)
+    {
+        this.constraintSummary = constraintSummary;
+    }
+
+    public SchemaTableName toSchemaTableName()
+    {
+        return new SchemaTableName(schemaName, tableName);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(schemaName, tableName);
+    }
+
+    @Override
+    public boolean equals(Object obj)
+    {
+        if (this == obj) {
+            return true;
+        }
+        if ((obj == null) || (getClass() != obj.getClass())) {
+            return false;
+        }
+
+        PinotTableHandle other = (PinotTableHandle) obj;
+        return Objects.equals(this.schemaName, other.schemaName) && Objects.equals(this.tableName, other.tableName);
+    }
+
+    /**
+     * @return schema name and table name joined with ":"
+     */
+    @Override
+    public String toString()
+    {
+        return Joiner.on(":").join(schemaName, tableName);
+    }
+}
diff --git a/presto-pinot/src/main/java/com/facebook/presto/pinot/PinotTableLayoutHandle.java b/presto-pinot/src/main/java/com/facebook/presto/pinot/PinotTableLayoutHandle.java
new file mode 100644
index 0000000000000..713f0b79d266e
--- /dev/null
+++ b/presto-pinot/src/main/java/com/facebook/presto/pinot/PinotTableLayoutHandle.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.pinot;
+
+import com.facebook.presto.spi.ConnectorTableLayoutHandle;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
+/**
+ * Layout handle that wraps a single {@link PinotTableHandle}; equality, hash code and
+ * string form all delegate to that table handle.
+ */
+public class PinotTableLayoutHandle
+        implements ConnectorTableLayoutHandle
+{
+    private final PinotTableHandle table;
+
+    /**
+     * @param table wrapped table handle; must not be null
+     * @throws NullPointerException if {@code table} is null
+     */
+    @JsonCreator
+    public PinotTableLayoutHandle(
+            @JsonProperty("table") PinotTableHandle table)
+    {
+        // Reject null here so the failure points at construction instead of a later toString() call.
+        this.table = Objects.requireNonNull(table, "table is null");
+    }
+
+    @JsonProperty
+    public PinotTableHandle getTable()
+    {
+        return table;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        PinotTableLayoutHandle that = (PinotTableLayoutHandle) o;
+        return Objects.equals(table, that.table);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(table);
+    }
+
+    @Override
+    public String toString()
+    {
+        return table.toString();
+    }
+}
diff --git a/presto-pinot/src/main/java/com/facebook/presto/pinot/PinotTransactionHandle.java b/presto-pinot/src/main/java/com/facebook/presto/pinot/PinotTransactionHandle.java
new file mode 100644
index 0000000000000..5e0fe3f7b31e2
--- /dev/null
+++ b/presto-pinot/src/main/java/com/facebook/presto/pinot/PinotTransactionHandle.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.pinot;
+
+import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
+
+/**
+ * Transaction handle for the Pinot connector. The enum declares a single constant,
+ * {@link #INSTANCE}, so every use refers to the same handle.
+ */
+public enum PinotTransactionHandle
+ implements ConnectorTransactionHandle
+{
+ INSTANCE
+}
diff --git a/presto-pinot/src/main/java/com/facebook/presto/pinot/PinotUtils.java b/presto-pinot/src/main/java/com/facebook/presto/pinot/PinotUtils.java
new file mode 100644
index 0000000000000..ee3d47e56f73d
--- /dev/null
+++ b/presto-pinot/src/main/java/com/facebook/presto/pinot/PinotUtils.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.pinot;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Shared constants and helpers for the Pinot connector; not instantiable.
+ */
+final class PinotUtils
+{
+    public static final String TIME_COLUMN_NAME = "timeColumnName";
+    public static final String TIME_COLUMN_VALUE = "timeColumnValue";
+    public static final String QUOTE = "\"";
+
+    private PinotUtils()
+    {
+    }
+
+    /**
+     * Casts {@code value} to {@code target} after verifying that it is an instance of that class.
+     *
+     * @param value object to cast; must not be null
+     * @param target class the value must be an instance of
+     * @param name name used for the value in error messages
+     * @return {@code value} cast to {@code target}
+     * @throws NullPointerException if {@code value} is null
+     * @throws IllegalArgumentException if {@code value} is not an instance of {@code target}
+     */
+    public static <A, B extends A> B checkType(A value, Class<B> target, String name)
+    {
+        // Check for null first so the message arguments below can safely call value.getClass().
+        requireNonNull(value, name + " is null");
+        checkArgument(target.isInstance(value), "%s must be of type %s, not %s", name, target.getName(), value.getClass().getName());
+        return target.cast(value);
+    }
+}
diff --git a/presto-pinot/src/main/resources/META-INF/services/com.facebook.presto.spi.Plugin b/presto-pinot/src/main/resources/META-INF/services/com.facebook.presto.spi.Plugin
new file mode 100644
index 0000000000000..0a21e8a372311
--- /dev/null
+++ b/presto-pinot/src/main/resources/META-INF/services/com.facebook.presto.spi.Plugin
@@ -0,0 +1 @@
+com.facebook.presto.pinot.PinotPlugin
diff --git a/presto-pinot/src/test/java/com/facebook/presto/pinot/TestPinotColumnHandle.java b/presto-pinot/src/test/java/com/facebook/presto/pinot/TestPinotColumnHandle.java
new file mode 100644
index 0000000000000..914cf78d04b03
--- /dev/null
+++ b/presto-pinot/src/test/java/com/facebook/presto/pinot/TestPinotColumnHandle.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.pinot;
+
+import io.airlift.testing.EquivalenceTester;
+import org.testng.annotations.Test;
+
+import static com.facebook.presto.pinot.TestPinotMetadataUtil.COLUMN_CODEC;
+import static com.facebook.presto.spi.type.BigintType.BIGINT;
+import static com.facebook.presto.spi.type.VarcharType.VARCHAR;
+import static org.testng.Assert.assertEquals;
+
+/**
+ * Tests JSON round-tripping and equality of {@link PinotColumnHandle}.
+ */
+public class TestPinotColumnHandle
+{
+    private static final PinotColumnHandle columnHandle = new PinotColumnHandle("columnName", VARCHAR, 0);
+
+    /**
+     * Serializing a handle and reading it back must yield an equal handle.
+     */
+    @Test
+    public void testJsonRoundTrip()
+    {
+        PinotColumnHandle roundTripped = COLUMN_CODEC.fromJson(COLUMN_CODEC.toJson(columnHandle));
+        assertEquals(roundTripped, columnHandle);
+    }
+
+    /**
+     * Handles in the same group share a column name and are expected to be equal to each
+     * other even when their type or third constructor argument differs; handles from
+     * different groups are expected to be unequal.
+     */
+    @Test
+    public void testEquivalence()
+    {
+        EquivalenceTester.equivalenceTester()
+                .addEquivalentGroup(
+                        new PinotColumnHandle("columnName", VARCHAR, 0),
+                        new PinotColumnHandle("columnName", VARCHAR, 0),
+                        new PinotColumnHandle("columnName", BIGINT, 0),
+                        new PinotColumnHandle("columnName", VARCHAR, 1))
+                .addEquivalentGroup(
+                        new PinotColumnHandle("columnNameX", VARCHAR, 0),
+                        new PinotColumnHandle("columnNameX", VARCHAR, 0),
+                        new PinotColumnHandle("columnNameX", BIGINT, 0),
+                        new PinotColumnHandle("columnNameX", VARCHAR, 1))
+                .check();
+    }
+}
diff --git a/presto-pinot/src/test/java/com/facebook/presto/pinot/TestPinotConfig.java b/presto-pinot/src/test/java/com/facebook/presto/pinot/TestPinotConfig.java
new file mode 100644
index 0000000000000..5607fff5abe0e
--- /dev/null
+++ b/presto-pinot/src/test/java/com/facebook/presto/pinot/TestPinotConfig.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.pinot;
+
+import com.google.common.collect.ImmutableMap;
+import io.airlift.configuration.testing.ConfigAssertions;
+import io.airlift.units.Duration;
+import org.testng.annotations.Test;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Verifies the default values and the property-name mappings of {@link PinotConfig}.
+ */
+public class TestPinotConfig
+{
+    /**
+     * Asserts the defaults recorded for a freshly created {@link PinotConfig}.
+     */
+    @Test
+    public void testDefaults()
+    {
+        ConfigAssertions.assertRecordedDefaults(ConfigAssertions.recordDefaults(PinotConfig.class)
+                .setZookeeperUrl(null)
+                .setPinotCluster(null)
+                .setControllerUrl(null)
+                .setIdleTimeout(new Duration(5, TimeUnit.MINUTES))
+                .setLimitAll(null)
+                .setMaxBacklogPerServer(null)
+                .setMaxConnectionsPerServer(null)
+                .setMinConnectionsPerServer(null)
+                .setCorePoolSize("50")
+                .setThreadPoolSize("64")
+                .setEstimatedSizeInBytesForNonNumericColumn(20)
+                .setConnectionTimeout(new Duration(1, TimeUnit.MINUTES)));
+    }
+
+    /**
+     * Asserts that every property key maps onto the matching {@link PinotConfig} setter.
+     */
+    @Test
+    public void testExplicitPropertyMappings()
+    {
+        Map<String, String> properties = new ImmutableMap.Builder<String, String>()
+                .put("zookeeper-uri", "localhost:2181")
+                .put("pinot-cluster", "pinot")
+                .put("controller-url", "localhost:12345")
+                .put("idle-timeout", "1h")
+                .put("limit-all", "2147483646")
+                .put("max-backlog-per-server", "15")
+                .put("max-connections-per-server", "10")
+                .put("min-connections-per-server", "1")
+                .put("core-pool-size", "100")
+                .put("thread-pool-size", "101")
+                .put("estimated-size-in-bytes-for-non-numeric-column", "30")
+                .put("connection-timeout", "8m")
+                .build();
+
+        PinotConfig expected = new PinotConfig()
+                .setZookeeperUrl("localhost:2181")
+                .setPinotCluster("pinot")
+                .setControllerUrl("localhost:12345")
+                .setIdleTimeout(new Duration(1, TimeUnit.HOURS))
+                .setLimitAll("2147483646")
+                .setMaxBacklogPerServer("15")
+                .setMaxConnectionsPerServer("10")
+                .setMinConnectionsPerServer("1")
+                .setCorePoolSize("100")
+                .setThreadPoolSize("101")
+                .setEstimatedSizeInBytesForNonNumericColumn(30)
+                .setConnectionTimeout(new Duration(8, TimeUnit.MINUTES));
+
+        ConfigAssertions.assertFullMapping(properties, expected);
+    }
+}
diff --git a/presto-pinot/src/test/java/com/facebook/presto/pinot/TestPinotMetadataUtil.java b/presto-pinot/src/test/java/com/facebook/presto/pinot/TestPinotMetadataUtil.java
new file mode 100644
index 0000000000000..980a7d9ccc264
--- /dev/null
+++ b/presto-pinot/src/test/java/com/facebook/presto/pinot/TestPinotMetadataUtil.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.pinot;
+
+import com.facebook.presto.spi.type.StandardTypes;
+import com.facebook.presto.spi.type.Type;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.deser.std.FromStringDeserializer;
+import com.google.common.collect.ImmutableMap;
+import io.airlift.json.JsonCodec;
+import io.airlift.json.JsonCodecFactory;
+import io.airlift.json.ObjectMapperProvider;
+
+import java.util.List;
+import java.util.Map;
+
+import static com.facebook.presto.spi.type.BigintType.BIGINT;
+import static com.facebook.presto.spi.type.BooleanType.BOOLEAN;
+import static com.facebook.presto.spi.type.DoubleType.DOUBLE;
+import static com.facebook.presto.spi.type.IntegerType.INTEGER;
+import static com.facebook.presto.spi.type.VarcharType.VARCHAR;
+import static io.airlift.json.JsonCodec.listJsonCodec;
+import static java.util.Locale.ENGLISH;
+import static java.util.Objects.requireNonNull;
+
+public final class TestPinotMetadataUtil
+{
+ public static final JsonCodec COLUMN_CODEC;
+
+ private static final JsonCodec