diff --git a/plugin/trino-pinot/pom.xml b/plugin/trino-pinot/pom.xml index 7920cb92a220..a3fc84322c01 100755 --- a/plugin/trino-pinot/pom.xml +++ b/plugin/trino-pinot/pom.xml @@ -125,6 +125,11 @@ guice + + com.google.protobuf + protobuf-java + + com.squareup.okhttp3 diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotConfig.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotConfig.java index 26a7304f6a0c..9df8da6675d0 100755 --- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotConfig.java +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotConfig.java @@ -17,6 +17,8 @@ import com.google.common.collect.ImmutableList; import io.airlift.configuration.Config; import io.airlift.configuration.ConfigDescription; +import io.airlift.configuration.DefunctConfig; +import io.airlift.units.DataSize; import io.airlift.units.Duration; import io.airlift.units.MinDuration; @@ -30,22 +32,24 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import static com.google.common.collect.ImmutableList.toImmutableList; - +import static io.airlift.units.DataSize.Unit.MEGABYTE; + +@DefunctConfig({ + "pinot.thread-pool-size", + "pinot.idle-timeout", + "pinot.max-backlog-per-server", + "pinot.max-connections-per-server", + "pinot.min-connections-per-server", + "pinot.request-timeout" +}) public class PinotConfig { private static final Splitter LIST_SPLITTER = Splitter.on(",").trimResults().omitEmptyStrings(); - private int maxConnectionsPerServer = 30; - private List controllerUrls = ImmutableList.of(); - private Duration idleTimeout = new Duration(5, TimeUnit.MINUTES); private Duration connectionTimeout = new Duration(1, TimeUnit.MINUTES); - private Duration requestTimeout = new Duration(30, TimeUnit.SECONDS); - private int threadPoolSize = 30; - private int minConnectionsPerServer = 10; - private int maxBacklogPerServer = 30; private int 
estimatedSizeInBytesForNonNumericColumn = 20; private Duration metadataCacheExpiry = new Duration(2, TimeUnit.MINUTES); @@ -54,10 +58,11 @@ public class PinotConfig private int segmentsPerSplit = 1; private int fetchRetryCount = 2; private int nonAggregateLimitForBrokerQueries = 25_000; - private int maxRowsPerSplitForSegmentQueries = 50_000; private int maxRowsForBrokerQueries = 50_000; private boolean aggregationPushdownEnabled = true; private boolean countDistinctPushdownEnabled = true; + private boolean grpcEnabled = true; + private DataSize targetSegmentPageSize = DataSize.of(1, MEGABYTE); @NotNull public List getControllerUrls() @@ -74,72 +79,6 @@ public PinotConfig setControllerUrls(String controllerUrl) return this; } - @NotNull - public int getThreadPoolSize() - { - return threadPoolSize; - } - - @Config("pinot.thread-pool-size") - public PinotConfig setThreadPoolSize(int threadPoolSize) - { - this.threadPoolSize = threadPoolSize; - return this; - } - - @NotNull - public int getMinConnectionsPerServer() - { - return minConnectionsPerServer; - } - - @Config("pinot.min-connections-per-server") - public PinotConfig setMinConnectionsPerServer(int minConnectionsPerServer) - { - this.minConnectionsPerServer = minConnectionsPerServer; - return this; - } - - @NotNull - public int getMaxConnectionsPerServer() - { - return maxConnectionsPerServer; - } - - @Config("pinot.max-connections-per-server") - public PinotConfig setMaxConnectionsPerServer(int maxConnectionsPerServer) - { - this.maxConnectionsPerServer = maxConnectionsPerServer; - return this; - } - - @NotNull - public int getMaxBacklogPerServer() - { - return maxBacklogPerServer; - } - - @Config("pinot.max-backlog-per-server") - public PinotConfig setMaxBacklogPerServer(int maxBacklogPerServer) - { - this.maxBacklogPerServer = maxBacklogPerServer; - return this; - } - - @MinDuration("15s") - @NotNull - public Duration getIdleTimeout() - { - return idleTimeout; - } - - @Config("pinot.idle-timeout") - public 
PinotConfig setIdleTimeout(Duration idleTimeout) - { - this.idleTimeout = idleTimeout; - return this; - } - @MinDuration("15s") @NotNull public Duration getConnectionTimeout() @@ -154,20 +93,6 @@ public PinotConfig setConnectionTimeout(Duration connectionTimeout) return this; } - @MinDuration("15s") - @NotNull - public Duration getRequestTimeout() - { - return requestTimeout; - } - - @Config("pinot.request-timeout") - public PinotConfig setRequestTimeout(Duration requestTimeout) - { - this.requestTimeout = requestTimeout; - return this; - } - @MinDuration("0s") @NotNull public Duration getMetadataCacheExpiry() @@ -256,18 +181,6 @@ public PinotConfig setNonAggregateLimitForBrokerQueries(int nonAggregateLimitFor return this; } - public int getMaxRowsPerSplitForSegmentQueries() - { - return maxRowsPerSplitForSegmentQueries; - } - - @Config("pinot.max-rows-per-split-for-segment-queries") - public PinotConfig setMaxRowsPerSplitForSegmentQueries(int maxRowsPerSplitForSegmentQueries) - { - this.maxRowsPerSplitForSegmentQueries = maxRowsPerSplitForSegmentQueries; - return this; - } - private static URI stringToUri(String server) { if (server.startsWith("http://") || server.startsWith("https://")) { @@ -313,6 +226,30 @@ public PinotConfig setCountDistinctPushdownEnabled(boolean countDistinctPushdown return this; } + public boolean isGrpcEnabled() + { + return grpcEnabled; + } + + @Config("pinot.grpc.enabled") + public PinotConfig setGrpcEnabled(boolean grpcEnabled) + { + this.grpcEnabled = grpcEnabled; + return this; + } + + public DataSize getTargetSegmentPageSize() + { + return this.targetSegmentPageSize; + } + + @Config("pinot.target-segment-page-size") + public PinotConfig setTargetSegmentPageSize(DataSize targetSegmentPageSize) + { + this.targetSegmentPageSize = targetSegmentPageSize; + return this; + } + @PostConstruct public void validate() { diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotModule.java 
b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotModule.java index f09c089211d3..e7709f8ba119 100755 --- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotModule.java +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotModule.java @@ -19,15 +19,20 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ArrayNode; import com.google.inject.Binder; -import com.google.inject.Module; import com.google.inject.Scopes; +import io.airlift.configuration.AbstractConfigurationAwareModule; import io.airlift.units.DataSize; import io.airlift.units.Duration; import io.trino.plugin.base.jmx.RebindSafeMBeanServer; import io.trino.plugin.pinot.client.IdentityPinotHostMapper; import io.trino.plugin.pinot.client.PinotClient; +import io.trino.plugin.pinot.client.PinotDataFetcher; +import io.trino.plugin.pinot.client.PinotGrpcDataFetcher; +import io.trino.plugin.pinot.client.PinotGrpcServerQueryClientConfig; +import io.trino.plugin.pinot.client.PinotGrpcServerQueryClientTlsConfig; import io.trino.plugin.pinot.client.PinotHostMapper; -import io.trino.plugin.pinot.client.PinotQueryClient; +import io.trino.plugin.pinot.client.PinotLegacyDataFetcher; +import io.trino.plugin.pinot.client.PinotLegacyServerQueryClientConfig; import io.trino.spi.NodeManager; import io.trino.spi.connector.ConnectorNodePartitioningProvider; import org.apache.pinot.common.utils.DataSchema; @@ -39,6 +44,7 @@ import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder; import static io.airlift.concurrent.Threads.threadsNamed; +import static io.airlift.configuration.ConditionalModule.conditionalModule; import static io.airlift.configuration.ConfigBinder.configBinder; import static io.airlift.http.client.HttpClientBinder.httpClientBinder; import static io.airlift.json.JsonBinder.jsonBinder; @@ -51,7 +57,7 @@ import static java.util.concurrent.TimeUnit.SECONDS; public class PinotModule - implements Module + extends 
AbstractConfigurationAwareModule { private final String catalogName; private final NodeManager nodeManager; @@ -63,7 +69,7 @@ public PinotModule(String catalogName, NodeManager nodeManager) } @Override - public void configure(Binder binder) + public void setup(Binder binder) { configBinder(binder).bindConfig(PinotConfig.class); binder.bind(PinotConnector.class).in(Scopes.SINGLETON); @@ -71,7 +77,6 @@ public void configure(Binder binder) binder.bind(PinotSplitManager.class).in(Scopes.SINGLETON); binder.bind(PinotPageSourceProvider.class).in(Scopes.SINGLETON); binder.bind(PinotClient.class).in(Scopes.SINGLETON); - binder.bind(PinotQueryClient.class).in(Scopes.SINGLETON); binder.bind(ExecutorService.class).annotatedWith(ForPinot.class) .toInstance(newCachedThreadPool(threadsNamed("pinot-metadata-fetcher-" + catalogName))); @@ -95,6 +100,12 @@ public void configure(Binder binder) binder.bind(NodeManager.class).toInstance(nodeManager); binder.bind(ConnectorNodePartitioningProvider.class).to(PinotNodePartitioningProvider.class).in(Scopes.SINGLETON); newOptionalBinder(binder, PinotHostMapper.class).setDefault().to(IdentityPinotHostMapper.class).in(Scopes.SINGLETON); + + install(conditionalModule( + PinotConfig.class, + config -> config.isGrpcEnabled(), + new PinotGrpcModule(), + new LegacyClientModule())); } public static final class DataSchemaDeserializer @@ -118,4 +129,34 @@ public DataSchema deserialize(JsonParser p, DeserializationContext ctxt) return new DataSchema(columnNames, columnTypes); } } + + public static class PinotGrpcModule + extends AbstractConfigurationAwareModule + { + @Override + public void setup(Binder binder) + { + configBinder(binder).bindConfig(PinotGrpcServerQueryClientConfig.class); + binder.bind(PinotDataFetcher.Factory.class).to(PinotGrpcDataFetcher.Factory.class).in(Scopes.SINGLETON); + install(conditionalModule( + PinotGrpcServerQueryClientConfig.class, + config -> config.isUsePlainText(), + plainTextBinder -> 
plainTextBinder.bind(PinotGrpcDataFetcher.GrpcQueryClientFactory.class).to(PinotGrpcDataFetcher.PlainTextGrpcQueryClientFactory.class).in(Scopes.SINGLETON), + tlsBinder -> { + configBinder(tlsBinder).bindConfig(PinotGrpcServerQueryClientTlsConfig.class); + tlsBinder.bind(PinotGrpcDataFetcher.GrpcQueryClientFactory.class).to(PinotGrpcDataFetcher.TlsGrpcQueryClientFactory.class).in(Scopes.SINGLETON); + })); + } + } + + public static class LegacyClientModule + extends AbstractConfigurationAwareModule + { + @Override + public void setup(Binder binder) + { + configBinder(binder).bindConfig(PinotLegacyServerQueryClientConfig.class); + binder.bind(PinotDataFetcher.Factory.class).to(PinotLegacyDataFetcher.Factory.class).in(Scopes.SINGLETON); + } + } } diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotPageSourceProvider.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotPageSourceProvider.java index d69816bc321b..b4c4efe25bd4 100755 --- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotPageSourceProvider.java +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotPageSourceProvider.java @@ -14,7 +14,7 @@ package io.trino.plugin.pinot; import io.trino.plugin.pinot.client.PinotClient; -import io.trino.plugin.pinot.client.PinotQueryClient; +import io.trino.plugin.pinot.client.PinotDataFetcher; import io.trino.plugin.pinot.query.DynamicTable; import io.trino.plugin.pinot.query.PinotQueryInfo; import io.trino.spi.connector.ColumnHandle; @@ -38,24 +38,24 @@ public class PinotPageSourceProvider implements ConnectorPageSourceProvider { - private final PinotQueryClient pinotQueryClient; private final PinotClient clusterInfoFetcher; private final int limitForSegmentQueries; private final int limitForBrokerQueries; - private final int estimatedNonNumericColumnSize; + private final long targetSegmentPageSizeBytes; + private final PinotDataFetcher.Factory pinotDataFetcherFactory; @Inject public PinotPageSourceProvider( 
PinotConfig pinotConfig, PinotClient clusterInfoFetcher, - PinotQueryClient pinotQueryClient) + PinotDataFetcher.Factory pinotDataFetcherFactory) { requireNonNull(pinotConfig, "pinotConfig is null"); - this.pinotQueryClient = requireNonNull(pinotQueryClient, "pinotQueryClient is null"); this.clusterInfoFetcher = requireNonNull(clusterInfoFetcher, "clusterInfoFetcher is null"); - this.limitForSegmentQueries = pinotConfig.getMaxRowsPerSplitForSegmentQueries(); + this.pinotDataFetcherFactory = requireNonNull(pinotDataFetcherFactory, "pinotDataFetcherFactory is null"); + this.limitForSegmentQueries = pinotDataFetcherFactory.getRowLimit(); this.limitForBrokerQueries = pinotConfig.getMaxRowsForBrokerQueries(); - estimatedNonNumericColumnSize = pinotConfig.getEstimatedSizeInBytesForNonNumericColumn(); + this.targetSegmentPageSizeBytes = pinotConfig.getTargetSegmentPageSize().toBytes(); } @Override @@ -80,14 +80,11 @@ public ConnectorPageSource createPageSource( switch (pinotSplit.getSplitType()) { case SEGMENT: + PinotDataFetcher pinotDataFetcher = pinotDataFetcherFactory.create(session, query, pinotSplit); return new PinotSegmentPageSource( - session, - estimatedNonNumericColumnSize, - limitForSegmentQueries, - this.pinotQueryClient, - pinotSplit, + targetSegmentPageSizeBytes, handles, - query); + pinotDataFetcher); case BROKER: PinotQueryInfo pinotQueryInfo; if (pinotTableHandle.getQuery().isPresent()) { diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotSegmentPageSource.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotSegmentPageSource.java index 5213508e5573..e0dc220caf8e 100755 --- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotSegmentPageSource.java +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotSegmentPageSource.java @@ -13,18 +13,16 @@ */ package io.trino.plugin.pinot; -import com.google.common.collect.ImmutableMap; -import io.airlift.log.Logger; import io.airlift.slice.Slice; import 
io.airlift.slice.Slices; -import io.trino.plugin.pinot.client.PinotQueryClient; +import io.trino.plugin.pinot.client.PinotDataFetcher; +import io.trino.plugin.pinot.client.PinotDataTableWithSize; import io.trino.spi.Page; import io.trino.spi.PageBuilder; import io.trino.spi.TrinoException; import io.trino.spi.block.Block; import io.trino.spi.block.BlockBuilder; import io.trino.spi.connector.ConnectorPageSource; -import io.trino.spi.connector.ConnectorSession; import io.trino.spi.type.Type; import io.trino.spi.type.VarbinaryType; import io.trino.spi.type.VarcharType; @@ -32,22 +30,14 @@ import org.apache.commons.codec.binary.Hex; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.common.utils.DataSchema.ColumnDataType; -import org.apache.pinot.common.utils.DataTable; -import org.apache.pinot.core.transport.ServerInstance; -import java.util.ArrayList; -import java.util.LinkedList; import java.util.List; -import java.util.Map; import java.util.Optional; -import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; -import java.util.stream.IntStream; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Strings.isNullOrEmpty; import static io.trino.plugin.pinot.PinotErrorCode.PINOT_DECODE_ERROR; -import static io.trino.plugin.pinot.PinotErrorCode.PINOT_EXCEPTION; import static io.trino.plugin.pinot.PinotErrorCode.PINOT_UNSUPPORTED_COLUMN_TYPE; import static java.lang.Float.floatToIntBits; import static java.lang.String.format; @@ -56,57 +46,28 @@ public class PinotSegmentPageSource implements ConnectorPageSource { - private static final Logger LOG = Logger.get(PinotSegmentPageSource.class); - private final List columnHandles; - private final PinotSplit split; - private final PinotQueryClient pinotQueryClient; - private final ConnectorSession session; - private final String query; - private final int limitForSegmentQueries; - private final AtomicLong currentRowCount = new 
AtomicLong(); - private final int estimatedNonNumericColumnSize; - private List columnTypes; - // dataTableList stores the dataTable returned from each server. Each dataTable is constructed to a Page, and then destroyed to save memory. - private LinkedList dataTableList = new LinkedList<>(); + private final List columnTypes; private long completedBytes; - private long readTimeNanos; private long estimatedMemoryUsageInBytes; private PinotDataTableWithSize currentDataTable; private boolean closed; - private boolean isPinotDataFetched; + private long targetSegmentPageSizeBytes; + private PinotDataFetcher pinotDataFetcher; public PinotSegmentPageSource( - ConnectorSession session, - int estimatedNonNumericColumnSize, - int limitForSegmentQueries, - PinotQueryClient pinotQueryClient, - PinotSplit split, + long targetSegmentPageSizeBytes, List columnHandles, - String query) + PinotDataFetcher pinotDataFetcher) { - this.limitForSegmentQueries = limitForSegmentQueries; - this.estimatedNonNumericColumnSize = estimatedNonNumericColumnSize; - this.split = requireNonNull(split, "split is null"); - this.pinotQueryClient = requireNonNull(pinotQueryClient, "pinotQueryClient is null"); this.columnHandles = requireNonNull(columnHandles, "columnHandles is null"); - this.session = requireNonNull(session, "session is null"); - this.query = requireNonNull(query, "query is null"); - } - - private static void checkExceptions(DataTable dataTable, PinotSplit split, String query) - { - Map metadata = dataTable.getMetadata(); - List exceptions = new ArrayList<>(); - metadata.forEach((k, v) -> { - if (k.startsWith(DataTable.EXCEPTION_METADATA_KEY)) { - exceptions.add(v); - } - }); - if (!exceptions.isEmpty()) { - throw new PinotException(PinotErrorCode.PINOT_EXCEPTION, Optional.of(query), format("Encountered %d pinot exceptions for split %s: %s", exceptions.size(), split, exceptions)); - } + this.columnTypes = columnHandles + .stream() + .map(columnHandle -> columnHandle.getDataType()) + 
.collect(Collectors.toList()); + this.targetSegmentPageSizeBytes = targetSegmentPageSizeBytes; + this.pinotDataFetcher = requireNonNull(pinotDataFetcher, "pinotDataFetcher is null"); } @Override @@ -118,7 +79,7 @@ public long getCompletedBytes() @Override public long getReadTimeNanos() { - return readTimeNanos; + return pinotDataFetcher.getReadTimeNanos(); } @Override @@ -133,7 +94,7 @@ public long getMemoryUsage() @Override public boolean isFinished() { - return closed || (isPinotDataFetched && dataTableList.isEmpty()); + return closed || (pinotDataFetcher.isDataFetched() && pinotDataFetcher.endOfData()); } /** @@ -146,87 +107,37 @@ public Page getNextPage() close(); return null; } - if (!isPinotDataFetched) { - fetchPinotData(); + if (!pinotDataFetcher.isDataFetched()) { + pinotDataFetcher.fetchData(); + estimatedMemoryUsageInBytes = pinotDataFetcher.getMemoryUsageBytes(); } - // To reduce memory usage, remove dataTable from dataTableList once it's processed. - if (currentDataTable != null) { - estimatedMemoryUsageInBytes -= currentDataTable.getEstimatedSizeInBytes(); - } - if (dataTableList.size() == 0) { + if (pinotDataFetcher.endOfData()) { close(); return null; } - currentDataTable = dataTableList.pop(); + long pageSizeBytes = 0L; PageBuilder pageBuilder = new PageBuilder(columnTypes); - // Note that declared positions in the Page should be the same with number of rows in each Block - pageBuilder.declarePositions(currentDataTable.getDataTable().getNumberOfRows()); - for (int columnHandleIdx = 0; columnHandleIdx < columnHandles.size(); columnHandleIdx++) { - BlockBuilder blockBuilder = pageBuilder.getBlockBuilder(columnHandleIdx); - Type columnType = columnTypes.get(columnHandleIdx); - // Write a block for each column in the original order. 
- writeBlock(blockBuilder, columnType, columnHandleIdx); + while (!pinotDataFetcher.endOfData() && pageSizeBytes < targetSegmentPageSizeBytes) { + // To reduce memory usage, remove dataTable from dataTableList once it's processed. + if (currentDataTable != null) { + estimatedMemoryUsageInBytes -= currentDataTable.getEstimatedSizeInBytes(); + } + currentDataTable = pinotDataFetcher.getNextDataTable(); + estimatedMemoryUsageInBytes += currentDataTable.getEstimatedSizeInBytes(); + pageSizeBytes += currentDataTable.getEstimatedSizeInBytes(); + pageBuilder.declarePositions(currentDataTable.getDataTable().getNumberOfRows()); + for (int columnHandleIdx = 0; columnHandleIdx < columnHandles.size(); columnHandleIdx++) { + BlockBuilder blockBuilder = pageBuilder.getBlockBuilder(columnHandleIdx); + Type columnType = columnTypes.get(columnHandleIdx); + // Write a block for each column in the original order. + writeBlock(blockBuilder, columnType, columnHandleIdx); + } } return pageBuilder.build(); } - /** - * Fetch data from Pinot for the current split and store the data returned from each Pinot server. - */ - private void fetchPinotData() - { - long startTimeNanos = System.nanoTime(); - try { - Map dataTableMap = queryPinot(); - dataTableMap.values().stream() - // ignore empty tables and tables with 0 rows - .filter(table -> table != null && table.getNumberOfRows() > 0) - .forEach(dataTable -> - { - checkExceptions(dataTable, split, query); - checkTooManyRows(dataTable); - // Store each dataTable which will later be constructed into Pages. 
- // Also update estimatedMemoryUsage, mostly represented by the size of all dataTables, using numberOfRows and fieldTypes combined as an estimate - int estimatedTableSizeInBytes = IntStream.rangeClosed(0, dataTable.getDataSchema().size() - 1) - .map(i -> getEstimatedColumnSizeInBytes(dataTable.getDataSchema().getColumnDataType(i)) * dataTable.getNumberOfRows()) - .reduce(0, Integer::sum); - dataTableList.add(new PinotDataTableWithSize(dataTable, estimatedTableSizeInBytes)); - estimatedMemoryUsageInBytes += estimatedTableSizeInBytes; - }); - - this.columnTypes = columnHandles - .stream() - .map(columnHandle -> columnHandle.getDataType()) - .collect(Collectors.toList()); - isPinotDataFetched = true; - } - finally { - readTimeNanos += System.nanoTime() - startTimeNanos; - } - } - - private void checkTooManyRows(DataTable dataTable) - { - if (currentRowCount.addAndGet(dataTable.getNumberOfRows()) > limitForSegmentQueries) { - throw new PinotException(PINOT_EXCEPTION, Optional.of(query), format("Segment query returned '%s' rows per split, maximum allowed is '%s' rows.", currentRowCount.get(), limitForSegmentQueries)); - } - } - - private Map queryPinot() - { - String host = split.getSegmentHost().orElseThrow(() -> new PinotException(PinotErrorCode.PINOT_INVALID_PQL_GENERATED, Optional.empty(), "Expected the segment split to contain the host")); - LOG.info("Query '%s' on host '%s' for segment splits: %s", query, split.getSegmentHost(), split.getSegments()); - return ImmutableMap.copyOf( - pinotQueryClient.queryPinotServerForDataTable( - query, - host, - split.getSegments(), - PinotSessionProperties.getConnectionTimeout(session).toMillis(), - PinotSessionProperties.getPinotRetryCount(session))); - } - @Override public void close() { @@ -317,18 +228,18 @@ private void writeArrayBlock(BlockBuilder blockBuilder, Type columnType, int col } } - Type getType(int columnIndex) + private Type getType(int columnIndex) { checkArgument(columnIndex < columnHandles.size(), "Invalid 
field index"); return columnHandles.get(columnIndex).getDataType(); } - boolean getBoolean(int rowIdx, int columnIndex) + private boolean getBoolean(int rowIdx, int columnIndex) { return currentDataTable.getDataTable().getInt(rowIdx, columnIndex) != 0; } - long getLong(int rowIndex, int columnIndex) + private long getLong(int rowIndex, int columnIndex) { DataSchema.ColumnDataType dataType = currentDataTable.getDataTable().getDataSchema().getColumnDataType(columnIndex); // Note columnType in the dataTable could be different from the original columnType in the columnHandle. @@ -348,7 +259,7 @@ long getLong(int rowIndex, int columnIndex) } } - double getDouble(int rowIndex, int columnIndex) + private double getDouble(int rowIndex, int columnIndex) { DataSchema.ColumnDataType dataType = currentDataTable.getDataTable().getDataSchema().getColumnDataType(columnIndex); if (dataType.equals(ColumnDataType.FLOAT)) { @@ -359,7 +270,7 @@ long getLong(int rowIndex, int columnIndex) } } - Block getArrayBlock(int rowIndex, int columnIndex) + private Block getArrayBlock(int rowIndex, int columnIndex) { Type trinoType = getType(columnIndex); Type elementType = trinoType.getTypeParameters().get(0); @@ -408,7 +319,7 @@ Block getArrayBlock(int rowIndex, int columnIndex) return blockBuilder.build(); } - Slice getSlice(int rowIndex, int columnIndex) + private Slice getSlice(int rowIndex, int columnIndex) { Type trinoType = getType(columnIndex); if (trinoType instanceof VarcharType) { @@ -421,7 +332,7 @@ else if (trinoType instanceof VarbinaryType) { return Slices.EMPTY_SLICE; } - static byte[] toBytes(String stringValue) + private static byte[] toBytes(String stringValue) { try { return Hex.decodeHex(stringValue.toCharArray()); @@ -431,58 +342,11 @@ static byte[] toBytes(String stringValue) } } - Slice getUtf8Slice(String value) + private Slice getUtf8Slice(String value) { if (isNullOrEmpty(value)) { return Slices.EMPTY_SLICE; } return Slices.utf8Slice(value); } - - /** - * Get estimated 
size in bytes for the Pinot column. - * Deterministic for numeric fields; use estimate for other types to save calculation. - * - * @param dataType FieldSpec.dataType for Pinot column. - * @return estimated size in bytes. - */ - private int getEstimatedColumnSizeInBytes(DataSchema.ColumnDataType dataType) - { - if (dataType.isNumber()) { - switch (dataType) { - case LONG: - return Long.BYTES; - case FLOAT: - return Float.BYTES; - case DOUBLE: - return Double.BYTES; - case INT: - default: - return Integer.BYTES; - } - } - return estimatedNonNumericColumnSize; - } - - private static class PinotDataTableWithSize - { - DataTable dataTable; - int estimatedSizeInBytes; - - PinotDataTableWithSize(DataTable dataTable, int estimatedSizeInBytes) - { - this.dataTable = dataTable; - this.estimatedSizeInBytes = estimatedSizeInBytes; - } - - DataTable getDataTable() - { - return dataTable; - } - - int getEstimatedSizeInBytes() - { - return estimatedSizeInBytes; - } - } } diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/IdentityPinotHostMapper.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/IdentityPinotHostMapper.java index ee2e040bef51..27328f02183d 100644 --- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/IdentityPinotHostMapper.java +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/IdentityPinotHostMapper.java @@ -13,6 +13,7 @@ */ package io.trino.plugin.pinot.client; +import com.google.common.net.HostAndPort; import org.apache.helix.model.InstanceConfig; import org.apache.pinot.core.transport.ServerInstance; @@ -32,4 +33,11 @@ public ServerInstance getServerInstance(String serverHost) { return new ServerInstance(InstanceConfig.toInstanceConfig(serverHost)); } + + @Override + public HostAndPort getServerGrpcHostAndPort(String serverHost, int grpcPort) + { + ServerInstance serverInstance = getServerInstance(serverHost); + return HostAndPort.fromParts(serverInstance.getHostname(), grpcPort); + } } 
diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotDataFetcher.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotDataFetcher.java new file mode 100644 index 000000000000..460a1039559e --- /dev/null +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotDataFetcher.java @@ -0,0 +1,83 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.pinot.client; + +import io.trino.plugin.pinot.PinotException; +import io.trino.plugin.pinot.PinotSplit; +import io.trino.spi.connector.ConnectorSession; +import org.apache.pinot.common.utils.DataTable; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicLong; + +import static com.google.common.collect.ImmutableList.toImmutableList; +import static io.trino.plugin.pinot.PinotErrorCode.PINOT_EXCEPTION; +import static java.lang.String.format; +import static java.util.Objects.requireNonNull; +import static org.apache.pinot.common.utils.DataTable.EXCEPTION_METADATA_KEY; + +public interface PinotDataFetcher +{ + default void checkExceptions(DataTable dataTable, PinotSplit split, String query) + { + List exceptions = dataTable.getMetadata().entrySet().stream() + .filter(metadataEntry -> metadataEntry.getKey().startsWith(EXCEPTION_METADATA_KEY)) + .map(Map.Entry::getValue) + .collect(toImmutableList()); + if (!exceptions.isEmpty()) { + throw new 
PinotException(PINOT_EXCEPTION, Optional.of(query), format("Encountered %d pinot exceptions for split %s: %s", exceptions.size(), split, exceptions)); + } + } + + long getReadTimeNanos(); + + long getMemoryUsageBytes(); + + boolean endOfData(); + + boolean isDataFetched(); + + void fetchData(); + + PinotDataTableWithSize getNextDataTable(); + + class RowCountChecker + { + private final AtomicLong currentRowCount = new AtomicLong(); + private final int limit; + private final String query; + + public RowCountChecker(int limit, String query) + { + this.limit = limit; + this.query = requireNonNull(query, "query is null"); + } + + public void checkTooManyRows(DataTable dataTable) + { + if (currentRowCount.addAndGet(dataTable.getNumberOfRows()) > limit) { + throw new PinotException(PINOT_EXCEPTION, Optional.of(query), format("Segment query returned '%s' rows per split, maximum allowed is '%s' rows.", currentRowCount.get(), limit)); + } + } + } + + interface Factory + { + PinotDataFetcher create(ConnectorSession session, String query, PinotSplit split); + + int getRowLimit(); + } +} diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotDataTableWithSize.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotDataTableWithSize.java new file mode 100644 index 000000000000..31881ee37d24 --- /dev/null +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotDataTableWithSize.java @@ -0,0 +1,38 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.pinot.client; + +import org.apache.pinot.common.utils.DataTable; + +public class PinotDataTableWithSize +{ + private final DataTable dataTable; + private final long estimatedSizeInBytes; + + public PinotDataTableWithSize(DataTable dataTable, long estimatedSizeInBytes) + { + this.dataTable = dataTable; + this.estimatedSizeInBytes = estimatedSizeInBytes; + } + + public DataTable getDataTable() + { + return dataTable; + } + + public long getEstimatedSizeInBytes() + { + return estimatedSizeInBytes; + } +} diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotGrpcDataFetcher.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotGrpcDataFetcher.java new file mode 100644 index 000000000000..eee143e123c4 --- /dev/null +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotGrpcDataFetcher.java @@ -0,0 +1,284 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.pinot.client; + +import com.google.common.collect.AbstractIterator; +import com.google.common.collect.ImmutableMap; +import com.google.common.io.Closer; +import com.google.common.net.HostAndPort; +import io.trino.plugin.pinot.PinotErrorCode; +import io.trino.plugin.pinot.PinotException; +import io.trino.plugin.pinot.PinotSplit; +import io.trino.spi.connector.ConnectorSession; +import org.apache.pinot.common.proto.Server; +import org.apache.pinot.common.request.BrokerRequest; +import org.apache.pinot.common.utils.grpc.GrpcQueryClient; +import org.apache.pinot.common.utils.grpc.GrpcRequestBuilder; +import org.apache.pinot.core.common.datatable.DataTableFactory; +import org.apache.pinot.spi.utils.CommonConstants.Query.Response.MetadataKeys; +import org.apache.pinot.spi.utils.CommonConstants.Query.Response.ResponseType; +import org.apache.pinot.sql.parsers.CalciteSqlCompiler; + +import javax.annotation.PreDestroy; +import javax.inject.Inject; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.ByteBuffer; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; + +import static java.util.Objects.requireNonNull; +import static org.apache.pinot.common.utils.grpc.GrpcQueryClient.Config.CONFIG_MAX_INBOUND_MESSAGE_BYTES_SIZE; +import static org.apache.pinot.common.utils.grpc.GrpcQueryClient.Config.CONFIG_USE_PLAIN_TEXT; +import static org.apache.pinot.common.utils.grpc.GrpcQueryClient.Config.GRPC_TLS_PREFIX; + +public class PinotGrpcDataFetcher + implements PinotDataFetcher +{ + private final PinotSplit split; + private final PinotGrpcServerQueryClient pinotGrpcClient; + private final String query; + private long readTimeNanos; + private Iterator responseIterator; + private boolean isPinotDataFetched; + private final RowCountChecker rowCountChecker; + private long estimatedMemoryUsageInBytes; + + public 
PinotGrpcDataFetcher(PinotGrpcServerQueryClient pinotGrpcClient, PinotSplit split, String query, RowCountChecker rowCountChecker) + { + this.pinotGrpcClient = requireNonNull(pinotGrpcClient, "pinotGrpcClient is null"); + this.split = requireNonNull(split, "split is null"); + this.query = requireNonNull(query, "query is null"); + this.rowCountChecker = requireNonNull(rowCountChecker, "rowCountChecker is null"); + } + + @Override + public long getReadTimeNanos() + { + return readTimeNanos; + } + + @Override + public long getMemoryUsageBytes() + { + return estimatedMemoryUsageInBytes; + } + + @Override + public boolean endOfData() + { + return !responseIterator.hasNext(); + } + + @Override + public boolean isDataFetched() + { + return isPinotDataFetched; + } + + @Override + public void fetchData() + { + long startTimeNanos = System.nanoTime(); + String serverHost = split.getSegmentHost().orElseThrow(() -> new PinotException(PinotErrorCode.PINOT_INVALID_PQL_GENERATED, Optional.empty(), "Expected the segment split to contain the host")); + this.responseIterator = pinotGrpcClient.queryPinot(null, query, serverHost, split.getSegments()); + readTimeNanos += System.nanoTime() - startTimeNanos; + isPinotDataFetched = true; + } + + @Override + public PinotDataTableWithSize getNextDataTable() + { + PinotDataTableWithSize dataTableWithSize = responseIterator.next(); + estimatedMemoryUsageInBytes = dataTableWithSize.getEstimatedSizeInBytes(); + rowCountChecker.checkTooManyRows(dataTableWithSize.getDataTable()); + checkExceptions(dataTableWithSize.getDataTable(), split, query); + return dataTableWithSize; + } + + public static class Factory + implements PinotDataFetcher.Factory + { + private final PinotGrpcServerQueryClient queryClient; + private final int limitForSegmentQueries; + private final Closer closer = Closer.create(); + + @Inject + public Factory(PinotHostMapper pinotHostMapper, PinotGrpcServerQueryClientConfig pinotGrpcServerQueryClientConfig, GrpcQueryClientFactory 
grpcQueryClientFactory) + { + requireNonNull(pinotHostMapper, "pinotHostMapper is null"); + requireNonNull(pinotGrpcServerQueryClientConfig, "pinotGrpcServerQueryClientConfig is null"); + this.limitForSegmentQueries = pinotGrpcServerQueryClientConfig.getMaxRowsPerSplitForSegmentQueries(); + this.queryClient = new PinotGrpcServerQueryClient(pinotHostMapper, pinotGrpcServerQueryClientConfig, grpcQueryClientFactory, closer); + } + + @PreDestroy + public void shutdown() + throws IOException + { + closer.close(); + } + + @Override + public PinotDataFetcher create(ConnectorSession session, String query, PinotSplit split) + { + return new PinotGrpcDataFetcher(queryClient, split, query, new RowCountChecker(limitForSegmentQueries, query)); + } + + @Override + public int getRowLimit() + { + return limitForSegmentQueries; + } + } + + public interface GrpcQueryClientFactory + { + GrpcQueryClient create(HostAndPort hostAndPort); + } + + public static class PlainTextGrpcQueryClientFactory + implements GrpcQueryClientFactory + { + private final GrpcQueryClient.Config config; + + @Inject + public PlainTextGrpcQueryClientFactory(PinotGrpcServerQueryClientConfig grpcClientConfig) + { + requireNonNull(grpcClientConfig, "grpcClientConfig is null"); + this.config = new GrpcQueryClient.Config(ImmutableMap.builder() + .put(CONFIG_MAX_INBOUND_MESSAGE_BYTES_SIZE, String.valueOf(grpcClientConfig.getMaxInboundMessageSize().toBytes())) + .put(CONFIG_USE_PLAIN_TEXT, String.valueOf(grpcClientConfig.isUsePlainText())) + .buildOrThrow()); + } + + @Override + public GrpcQueryClient create(HostAndPort hostAndPort) + { + return new GrpcQueryClient(hostAndPort.getHost(), hostAndPort.getPort(), config); + } + } + + public static class TlsGrpcQueryClientFactory + implements GrpcQueryClientFactory + { + // Extracted from org.apache.pinot.common.utils.TlsUtils + private static final String KEYSTORE_TYPE = "keystore.type"; + private static final String KEYSTORE_PATH = "keystore.path"; + private static 
final String KEYSTORE_PASSWORD = "keystore.password"; + private static final String TRUSTSTORE_TYPE = "truststore.type"; + private static final String TRUSTSTORE_PATH = "truststore.path"; + private static final String TRUSTSTORE_PASSWORD = "truststore.password"; + private static final String SSL_PROVIDER = "ssl.provider"; + + private final GrpcQueryClient.Config config; + + @Inject + public TlsGrpcQueryClientFactory(PinotGrpcServerQueryClientConfig grpcClientConfig, PinotGrpcServerQueryClientTlsConfig tlsConfig) + { + requireNonNull(grpcClientConfig, "grpcClientConfig is null"); + requireNonNull(tlsConfig, "tlsConfig is null"); + this.config = new GrpcQueryClient.Config(ImmutableMap.builder() + .put(CONFIG_MAX_INBOUND_MESSAGE_BYTES_SIZE, String.valueOf(grpcClientConfig.getMaxInboundMessageSize().toBytes())) + .put(CONFIG_USE_PLAIN_TEXT, String.valueOf(grpcClientConfig.isUsePlainText())) + .put(GRPC_TLS_PREFIX + "." + KEYSTORE_TYPE, tlsConfig.getKeystoreType()) + .put(GRPC_TLS_PREFIX + "." + KEYSTORE_PATH, tlsConfig.getKeystorePath()) + .put(GRPC_TLS_PREFIX + "." + KEYSTORE_PASSWORD, tlsConfig.getKeystorePassword()) + .put(GRPC_TLS_PREFIX + "." + TRUSTSTORE_TYPE, tlsConfig.getTruststoreType()) + .put(GRPC_TLS_PREFIX + "." + TRUSTSTORE_PATH, tlsConfig.getTruststorePath()) + .put(GRPC_TLS_PREFIX + "." + TRUSTSTORE_PASSWORD, tlsConfig.getTruststorePassword()) + .put(GRPC_TLS_PREFIX + "." 
+ SSL_PROVIDER, tlsConfig.getSslProvider()) + .buildOrThrow()); + } + + @Override + public GrpcQueryClient create(HostAndPort hostAndPort) + { + return new GrpcQueryClient(hostAndPort.getHost(), hostAndPort.getPort(), config); + } + } + + public static class PinotGrpcServerQueryClient + { + private static final CalciteSqlCompiler REQUEST_COMPILER = new CalciteSqlCompiler(); + + private final PinotHostMapper pinotHostMapper; + private final Map clientCache = new ConcurrentHashMap<>(); + private final int grpcPort; + private final GrpcQueryClientFactory grpcQueryClientFactory; + private final Closer closer; + + private PinotGrpcServerQueryClient(PinotHostMapper pinotHostMapper, PinotGrpcServerQueryClientConfig pinotGrpcServerQueryClientConfig, GrpcQueryClientFactory grpcQueryClientFactory, Closer closer) + { + this.pinotHostMapper = requireNonNull(pinotHostMapper, "pinotHostMapper is null"); + requireNonNull(pinotGrpcServerQueryClientConfig, "pinotGrpcServerQueryClientConfig is null"); + this.grpcPort = pinotGrpcServerQueryClientConfig.getGrpcPort(); + this.grpcQueryClientFactory = requireNonNull(grpcQueryClientFactory, "grpcQueryClientFactory is null"); + this.closer = requireNonNull(closer, "closer is null"); + } + + public Iterator queryPinot(ConnectorSession session, String query, String serverHost, List segments) + { + HostAndPort mappedHostAndPort = pinotHostMapper.getServerGrpcHostAndPort(serverHost, grpcPort); + // GrpcQueryClient does not implement Closeable. The idle timeout is 30 minutes (grpc default). 
+ GrpcQueryClient client = clientCache.computeIfAbsent(mappedHostAndPort, hostAndPort -> { + GrpcQueryClient queryClient = grpcQueryClientFactory.create(hostAndPort); + closer.register(queryClient::close); + return queryClient; + }); + BrokerRequest brokerRequest = REQUEST_COMPILER.compileToBrokerRequest(query); + GrpcRequestBuilder requestBuilder = new GrpcRequestBuilder() + .setSql(query) + .setSegments(segments) + .setEnableStreaming(true) + .setBrokerRequest(brokerRequest); + return new ResponseIterator(client.submit(requestBuilder.build())); + } + + public static class ResponseIterator + extends AbstractIterator + { + private final Iterator responseIterator; + + public ResponseIterator(Iterator responseIterator) + { + this.responseIterator = requireNonNull(responseIterator, "responseIterator is null"); + } + + @Override + protected PinotDataTableWithSize computeNext() + { + if (!responseIterator.hasNext()) { + return endOfData(); + } + Server.ServerResponse response = responseIterator.next(); + String responseType = response.getMetadataMap().get(MetadataKeys.RESPONSE_TYPE); + if (responseType.equals(ResponseType.METADATA)) { + return endOfData(); + } + ByteBuffer buffer = response.getPayload().asReadOnlyByteBuffer(); + try { + return new PinotDataTableWithSize(DataTableFactory.getDataTable(buffer), buffer.remaining()); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + } + } + } +} diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotGrpcServerQueryClientConfig.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotGrpcServerQueryClientConfig.java new file mode 100644 index 000000000000..df0c1739a613 --- /dev/null +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotGrpcServerQueryClientConfig.java @@ -0,0 +1,75 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.pinot.client; + +import io.airlift.configuration.Config; +import io.airlift.units.DataSize; + +import static org.apache.pinot.common.utils.grpc.GrpcQueryClient.Config.DEFAULT_MAX_INBOUND_MESSAGE_BYTES_SIZE; + +public class PinotGrpcServerQueryClientConfig +{ + private int maxRowsPerSplitForSegmentQueries = Integer.MAX_VALUE - 1; + private int grpcPort = 8090; + private DataSize maxInboundMessageSize = DataSize.ofBytes(DEFAULT_MAX_INBOUND_MESSAGE_BYTES_SIZE); + private boolean usePlainText = true; + + public int getMaxRowsPerSplitForSegmentQueries() + { + return maxRowsPerSplitForSegmentQueries; + } + + @Config("pinot.max-rows-per-split-for-segment-queries") + public PinotGrpcServerQueryClientConfig setMaxRowsPerSplitForSegmentQueries(int maxRowsPerSplitForSegmentQueries) + { + this.maxRowsPerSplitForSegmentQueries = maxRowsPerSplitForSegmentQueries; + return this; + } + + public int getGrpcPort() + { + return grpcPort; + } + + @Config("pinot.grpc.port") + public PinotGrpcServerQueryClientConfig setGrpcPort(int grpcPort) + { + this.grpcPort = grpcPort; + return this; + } + + public DataSize getMaxInboundMessageSize() + { + return maxInboundMessageSize; + } + + @Config("pinot.grpc.max-inbound-message-size") + public PinotGrpcServerQueryClientConfig setMaxInboundMessageSize(DataSize maxInboundMessageSize) + { + this.maxInboundMessageSize = maxInboundMessageSize; + return this; + } + + public boolean isUsePlainText() + { + return usePlainText; + } + + @Config("pinot.grpc.use-plain-text") + public 
PinotGrpcServerQueryClientConfig setUsePlainText(boolean usePlainText) + { + this.usePlainText = usePlainText; + return this; + } +} diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotGrpcServerQueryClientTlsConfig.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotGrpcServerQueryClientTlsConfig.java new file mode 100644 index 000000000000..0d931b8b05f1 --- /dev/null +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotGrpcServerQueryClientTlsConfig.java @@ -0,0 +1,113 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.pinot.client; + +import io.airlift.configuration.Config; + +import static io.trino.plugin.pinot.client.PinotKeystoreTrustStoreType.JKS; + +public class PinotGrpcServerQueryClientTlsConfig +{ + private PinotKeystoreTrustStoreType keystoreType = JKS; + private String keystorePath; + private String keystorePassword; + private PinotKeystoreTrustStoreType truststoreType = JKS; + private String truststorePath; + private String truststorePassword; + private String sslProvider = "JDK"; + + public PinotKeystoreTrustStoreType getKeystoreType() + { + return keystoreType; + } + + @Config("pinot.grpc.tls.keystore-type") + public PinotGrpcServerQueryClientTlsConfig setKeystoreType(PinotKeystoreTrustStoreType keystoreType) + { + this.keystoreType = keystoreType; + return this; + } + + public String getKeystorePath() + { + return keystorePath; + } + + @Config("pinot.grpc.tls.keystore-path") + public PinotGrpcServerQueryClientTlsConfig setKeystorePath(String keystorePath) + { + this.keystorePath = keystorePath; + return this; + } + + public String getKeystorePassword() + { + return keystorePassword; + } + + @Config("pinot.grpc.tls.keystore-password") + public PinotGrpcServerQueryClientTlsConfig setKeystorePassword(String keystorePassword) + { + this.keystorePassword = keystorePassword; + return this; + } + + public PinotKeystoreTrustStoreType getTruststoreType() + { + return truststoreType; + } + + @Config("pinot.grpc.tls.truststore-type") + public PinotGrpcServerQueryClientTlsConfig setTruststoreType(PinotKeystoreTrustStoreType truststoreType) + { + this.truststoreType = truststoreType; + return this; + } + + public String getTruststorePath() + { + return truststorePath; + } + + @Config("pinot.grpc.tls.truststore-path") + public PinotGrpcServerQueryClientTlsConfig setTruststorePath(String truststorePath) + { + this.truststorePath = truststorePath; + return this; + } + + public String getTruststorePassword() + { + return truststorePassword; + } + + 
@Config("pinot.grpc.tls.truststore-password") + public PinotGrpcServerQueryClientTlsConfig setTruststorePassword(String truststorePassword) + { + this.truststorePassword = truststorePassword; + return this; + } + + public String getSslProvider() + { + return sslProvider; + } + + @Config("pinot.grpc.tls.ssl-provider") + public PinotGrpcServerQueryClientTlsConfig setSslProvider(String sslProvider) + { + this.sslProvider = sslProvider; + return this; + } +} diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotHostMapper.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotHostMapper.java index d199daf53cae..76b43f14882d 100644 --- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotHostMapper.java +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotHostMapper.java @@ -13,6 +13,7 @@ */ package io.trino.plugin.pinot.client; +import com.google.common.net.HostAndPort; import org.apache.pinot.core.transport.ServerInstance; public interface PinotHostMapper @@ -20,4 +21,6 @@ public interface PinotHostMapper String getBrokerHost(String host, String port); ServerInstance getServerInstance(String serverHost); + + HostAndPort getServerGrpcHostAndPort(String serverHost, int grpcPort); } diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotKeystoreTrustStoreType.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotKeystoreTrustStoreType.java new file mode 100644 index 000000000000..036d622d1755 --- /dev/null +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotKeystoreTrustStoreType.java @@ -0,0 +1,20 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
/**
 * Keystore/truststore formats accepted by the Pinot gRPC TLS configuration.
 */
public enum PinotKeystoreTrustStoreType
{
    JKS,
    PKCS12
}
+ */ +package io.trino.plugin.pinot.client; + +import com.google.common.collect.ImmutableList; +import io.airlift.log.Logger; +import io.trino.plugin.pinot.PinotConfig; +import io.trino.plugin.pinot.PinotErrorCode; +import io.trino.plugin.pinot.PinotException; +import io.trino.plugin.pinot.PinotSessionProperties; +import io.trino.plugin.pinot.PinotSplit; +import io.trino.spi.connector.ConnectorSession; +import org.apache.pinot.common.metrics.BrokerMetrics; +import org.apache.pinot.common.metrics.PinotMetricUtils; +import org.apache.pinot.common.request.BrokerRequest; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.common.utils.DataTable; +import org.apache.pinot.core.transport.AsyncQueryResponse; +import org.apache.pinot.core.transport.QueryRouter; +import org.apache.pinot.core.transport.ServerInstance; +import org.apache.pinot.core.transport.ServerResponse; +import org.apache.pinot.core.transport.ServerRoutingInstance; +import org.apache.pinot.spi.metrics.PinotMetricsRegistry; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.apache.pinot.sql.parsers.CalciteSqlCompiler; +import org.apache.pinot.sql.parsers.SqlCompilationException; + +import javax.inject.Inject; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; +import java.util.stream.IntStream; + +import static io.trino.plugin.pinot.PinotErrorCode.PINOT_EXCEPTION; +import static io.trino.plugin.pinot.PinotErrorCode.PINOT_INVALID_PQL_GENERATED; +import static java.lang.String.format; +import static java.util.Objects.requireNonNull; + +public class PinotLegacyDataFetcher + implements PinotDataFetcher +{ + private static final Logger LOG = 
Logger.get(PinotLegacyDataFetcher.class); + + private final ConnectorSession session; + private final PinotLegacyServerQueryClient pinotQueryClient; + private final PinotSplit split; + + private final String query; + private final LinkedList dataTableList = new LinkedList<>(); + private final RowCountChecker rowCountChecker; + private long readTimeNanos; + private long estimatedMemoryUsageInBytes; + private boolean isPinotDataFetched; + + public PinotLegacyDataFetcher(ConnectorSession session, PinotLegacyServerQueryClient pinotQueryClient, PinotSplit split, String query, RowCountChecker rowCountChecker) + { + this.session = requireNonNull(session, "session is null"); + this.pinotQueryClient = requireNonNull(pinotQueryClient, "pinotQueryClient is null"); + this.split = requireNonNull(split, "split is null"); + this.query = requireNonNull(query, "query is null"); + this.rowCountChecker = requireNonNull(rowCountChecker, "rowCountChecker is null"); + } + + @Override + public long getReadTimeNanos() + { + return readTimeNanos; + } + + @Override + public long getMemoryUsageBytes() + { + return estimatedMemoryUsageInBytes; + } + + @Override + public boolean endOfData() + { + return dataTableList.isEmpty(); + } + + @Override + public boolean isDataFetched() + { + return isPinotDataFetched; + } + + @Override + public void fetchData() + { + long startTimeNanos = System.nanoTime(); + try { + queryPinot().forEachRemaining(dataTableWithSize -> { + checkExceptions(dataTableWithSize.getDataTable(), split, query); + rowCountChecker.checkTooManyRows(dataTableWithSize.getDataTable()); + dataTableList.add(dataTableWithSize); + estimatedMemoryUsageInBytes += dataTableWithSize.getEstimatedSizeInBytes(); + }); + + isPinotDataFetched = true; + } + finally { + readTimeNanos += System.nanoTime() - startTimeNanos; + } + } + + @Override + public PinotDataTableWithSize getNextDataTable() + { + PinotDataTableWithSize dataTableWithSize = dataTableList.pop(); + estimatedMemoryUsageInBytes -= 
dataTableWithSize.getEstimatedSizeInBytes(); + return dataTableWithSize; + } + + private Iterator queryPinot() + { + String host = split.getSegmentHost().orElseThrow(() -> new PinotException(PinotErrorCode.PINOT_INVALID_PQL_GENERATED, Optional.empty(), "Expected the segment split to contain the host")); + LOG.debug("Query '%s' on host '%s' for segment splits: %s", query, split.getSegmentHost(), split.getSegments()); + return pinotQueryClient.queryPinot( + session, + query, + host, + split.getSegments()); + } + + public static class Factory + implements PinotDataFetcher.Factory + { + private final PinotLegacyServerQueryClient queryClient; + private final int limitForSegmentQueries; + + @Inject + public Factory(PinotHostMapper pinotHostMapper, PinotConfig pinotConfig, PinotLegacyServerQueryClientConfig pinotLegacyServerQueryClientConfig) + { + requireNonNull(pinotHostMapper, "pinotHostMapper is null"); + requireNonNull(pinotConfig, "pinotConfig is null"); + this.limitForSegmentQueries = requireNonNull(pinotLegacyServerQueryClientConfig, "pinotLegacyServerQueryClientConfig is null").getMaxRowsPerSplitForSegmentQueries(); + this.queryClient = new PinotLegacyServerQueryClient(pinotHostMapper, pinotConfig); + } + + @Override + public PinotDataFetcher create(ConnectorSession session, String query, PinotSplit split) + { + return new PinotLegacyDataFetcher(session, queryClient, split, query, new RowCountChecker(limitForSegmentQueries, query)); + } + + @Override + public int getRowLimit() + { + return limitForSegmentQueries; + } + } + + public static class PinotLegacyServerQueryClient + { + private static final CalciteSqlCompiler REQUEST_COMPILER = new CalciteSqlCompiler(); + private static final String TRINO_HOST_PREFIX = "trino-pinot-master"; + + private final String trinoHostId; + private final BrokerMetrics brokerMetrics; + private final QueryRouter queryRouter; + private final PinotHostMapper pinotHostMapper; + private final AtomicLong requestIdGenerator = new 
AtomicLong(); + private final int estimatedNonNumericColumnSize; + + public PinotLegacyServerQueryClient(PinotHostMapper pinotHostMapper, PinotConfig pinotConfig) + { + requireNonNull(pinotConfig, "pinotConfig is null"); + trinoHostId = getDefaultTrinoId(); + this.pinotHostMapper = requireNonNull(pinotHostMapper, "pinotHostMapper is null"); + this.estimatedNonNumericColumnSize = pinotConfig.getEstimatedSizeInBytesForNonNumericColumn(); + PinotMetricsRegistry registry = PinotMetricUtils.getPinotMetricsRegistry(); + this.brokerMetrics = new BrokerMetrics(registry); + brokerMetrics.initializeGlobalMeters(); + queryRouter = new QueryRouter(trinoHostId, brokerMetrics); + } + + private static String getDefaultTrinoId() + { + String defaultBrokerId; + try { + defaultBrokerId = TRINO_HOST_PREFIX + InetAddress.getLocalHost().getHostName(); + } + catch (UnknownHostException e) { + defaultBrokerId = TRINO_HOST_PREFIX; + } + return defaultBrokerId; + } + + public Iterator queryPinot(ConnectorSession session, String query, String serverHost, List segments) + { + long connectionTimeoutInMillis = PinotSessionProperties.getConnectionTimeout(session).toMillis(); + int pinotRetryCount = PinotSessionProperties.getPinotRetryCount(session); + // TODO: separate into offline and realtime methods + BrokerRequest brokerRequest; + try { + brokerRequest = REQUEST_COMPILER.compileToBrokerRequest(query); + } + catch (SqlCompilationException e) { + throw new PinotException(PINOT_INVALID_PQL_GENERATED, Optional.of(query), format("Parsing error with on %s, Error = %s", serverHost, e.getMessage()), e); + } + ServerInstance serverInstance = pinotHostMapper.getServerInstance(serverHost); + Map> routingTable = new HashMap<>(); + routingTable.put(serverInstance, new ArrayList<>(segments)); + String tableName = brokerRequest.getQuerySource().getTableName(); + String rawTableName = TableNameBuilder.extractRawTableName(tableName); + Map> offlineRoutingTable = 
TableNameBuilder.isOfflineTableResource(tableName) ? routingTable : null; + Map> realtimeRoutingTable = TableNameBuilder.isRealtimeTableResource(tableName) ? routingTable : null; + BrokerRequest offlineBrokerRequest = TableNameBuilder.isOfflineTableResource(tableName) ? brokerRequest : null; + BrokerRequest realtimeBrokerRequest = TableNameBuilder.isRealtimeTableResource(tableName) ? brokerRequest : null; + AsyncQueryResponse asyncQueryResponse = + doWithRetries(pinotRetryCount, requestId -> queryRouter.submitQuery(requestId, rawTableName, offlineBrokerRequest, offlineRoutingTable, realtimeBrokerRequest, realtimeRoutingTable, connectionTimeoutInMillis)); + try { + Map response = asyncQueryResponse.getResponse(); + ImmutableList.Builder pinotDataTableWithSizeBuilder = ImmutableList.builder(); + for (Map.Entry entry : response.entrySet()) { + ServerResponse serverResponse = entry.getValue(); + DataTable dataTable = serverResponse.getDataTable(); + // ignore empty tables and tables with 0 rows + if (dataTable != null && dataTable.getNumberOfRows() > 0) { + // Store each dataTable which will later be constructed into Pages. 
+ // Also update estimatedMemoryUsage, mostly represented by the size of all dataTables, using numberOfRows and fieldTypes combined as an estimate + long estimatedTableSizeInBytes = IntStream.rangeClosed(0, dataTable.getDataSchema().size() - 1) + .mapToLong(i -> getEstimatedColumnSizeInBytes(dataTable.getDataSchema().getColumnDataType(i)) * dataTable.getNumberOfRows()) + .reduce(0, Long::sum); + pinotDataTableWithSizeBuilder.add(new PinotDataTableWithSize(dataTable, estimatedTableSizeInBytes)); + } + } + return pinotDataTableWithSizeBuilder.build().iterator(); + } + catch (InterruptedException e) { + throw new PinotException(PINOT_EXCEPTION, Optional.of(query), "Pinot query execution was interrupted", e); + } + } + + private T doWithRetries(int retries, Function caller) + { + PinotException firstError = null; + for (int i = 0; i < retries; ++i) { + try { + return caller.apply(requestIdGenerator.getAndIncrement()); + } + catch (PinotException e) { + if (firstError == null) { + firstError = e; + } + if (!e.isRetryable()) { + throw e; + } + } + } + throw firstError; + } + + /** + * Get estimated size in bytes for the Pinot column. + * Deterministic for numeric fields; use estimate for other types to save calculation. + * + * @param dataType FieldSpec.dataType for Pinot column. + * @return estimated size in bytes. 
+ */ + private long getEstimatedColumnSizeInBytes(DataSchema.ColumnDataType dataType) + { + if (dataType.isNumber()) { + switch (dataType) { + case LONG: + return Long.BYTES; + case FLOAT: + return Float.BYTES; + case DOUBLE: + return Double.BYTES; + case INT: + default: + return Integer.BYTES; + } + } + return estimatedNonNumericColumnSize; + } + } +} diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotLegacyServerQueryClientConfig.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotLegacyServerQueryClientConfig.java new file mode 100644 index 000000000000..42e1f274ed28 --- /dev/null +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotLegacyServerQueryClientConfig.java @@ -0,0 +1,33 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.pinot.client; + +import io.airlift.configuration.Config; + +public class PinotLegacyServerQueryClientConfig +{ + private int maxRowsPerSplitForSegmentQueries = 50_000; + + public int getMaxRowsPerSplitForSegmentQueries() + { + return maxRowsPerSplitForSegmentQueries; + } + + @Config("pinot.max-rows-per-split-for-segment-queries") + public PinotLegacyServerQueryClientConfig setMaxRowsPerSplitForSegmentQueries(int maxRowsPerSplitForSegmentQueries) + { + this.maxRowsPerSplitForSegmentQueries = maxRowsPerSplitForSegmentQueries; + return this; + } +} diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotQueryClient.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotQueryClient.java deleted file mode 100644 index d4fc30e170c1..000000000000 --- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/client/PinotQueryClient.java +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package io.trino.plugin.pinot.client; - -import io.trino.plugin.pinot.PinotException; -import org.apache.helix.model.InstanceConfig; -import org.apache.pinot.common.metrics.BrokerMetrics; -import org.apache.pinot.common.metrics.PinotMetricUtils; -import org.apache.pinot.common.request.BrokerRequest; -import org.apache.pinot.common.utils.DataTable; -import org.apache.pinot.core.transport.AsyncQueryResponse; -import org.apache.pinot.core.transport.QueryRouter; -import org.apache.pinot.core.transport.ServerInstance; -import org.apache.pinot.core.transport.ServerResponse; -import org.apache.pinot.core.transport.ServerRoutingInstance; -import org.apache.pinot.spi.metrics.PinotMetricsRegistry; -import org.apache.pinot.spi.utils.builder.TableNameBuilder; -import org.apache.pinot.sql.parsers.CalciteSqlCompiler; -import org.apache.pinot.sql.parsers.SqlCompilationException; - -import javax.inject.Inject; - -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Function; - -import static io.trino.plugin.pinot.PinotErrorCode.PINOT_EXCEPTION; -import static io.trino.plugin.pinot.PinotErrorCode.PINOT_INVALID_PQL_GENERATED; -import static java.lang.String.format; -import static java.util.Objects.requireNonNull; - -public class PinotQueryClient -{ - private static final CalciteSqlCompiler REQUEST_COMPILER = new CalciteSqlCompiler(); - private static final String TRINO_HOST_PREFIX = "trino-pinot-master"; - private static final String SERVER_INSTANCE_PREFIX = "Server"; - - private final String trinoHostId; - private final BrokerMetrics brokerMetrics; - private final QueryRouter queryRouter; - private final PinotHostMapper pinotHostMapper; - private final AtomicLong requestIdGenerator = new AtomicLong(); - - @Inject - public PinotQueryClient(PinotHostMapper 
pinotHostMapper) - { - trinoHostId = getDefaultTrinoId(); - this.pinotHostMapper = requireNonNull(pinotHostMapper, "pinotHostMapper is null"); - PinotMetricsRegistry registry = PinotMetricUtils.getPinotMetricsRegistry(); - this.brokerMetrics = new BrokerMetrics(registry); - brokerMetrics.initializeGlobalMeters(); - queryRouter = new QueryRouter(trinoHostId, brokerMetrics); - } - - private static String getDefaultTrinoId() - { - String defaultBrokerId; - try { - defaultBrokerId = TRINO_HOST_PREFIX + InetAddress.getLocalHost().getHostName(); - } - catch (UnknownHostException e) { - defaultBrokerId = TRINO_HOST_PREFIX; - } - return defaultBrokerId; - } - - public Map queryPinotServerForDataTable(String query, String serverHost, List segments, long connectionTimeoutInMillis, int pinotRetryCount) - { - // TODO: separate into offline and realtime methods - BrokerRequest brokerRequest; - try { - brokerRequest = REQUEST_COMPILER.compileToBrokerRequest(query); - } - catch (SqlCompilationException e) { - throw new PinotException(PINOT_INVALID_PQL_GENERATED, Optional.of(query), format("Parsing error with on %s, Error = %s", serverHost, e.getMessage()), e); - } - ServerInstance serverInstance = pinotHostMapper.getServerInstance(serverHost); - Map> routingTable = new HashMap<>(); - routingTable.put(serverInstance, new ArrayList<>(segments)); - String tableName = brokerRequest.getQuerySource().getTableName(); - String rawTableName = TableNameBuilder.extractRawTableName(tableName); - Map> offlineRoutingTable = TableNameBuilder.isOfflineTableResource(tableName) ? routingTable : null; - Map> realtimeRoutingTable = TableNameBuilder.isRealtimeTableResource(tableName) ? routingTable : null; - BrokerRequest offlineBrokerRequest = TableNameBuilder.isOfflineTableResource(tableName) ? brokerRequest : null; - BrokerRequest realtimeBrokerRequest = TableNameBuilder.isRealtimeTableResource(tableName) ? 
brokerRequest : null; - AsyncQueryResponse asyncQueryResponse = - doWithRetries(pinotRetryCount, requestId -> queryRouter.submitQuery(requestId, rawTableName, offlineBrokerRequest, offlineRoutingTable, realtimeBrokerRequest, realtimeRoutingTable, connectionTimeoutInMillis)); - try { - Map response = asyncQueryResponse.getResponse(); - Map dataTableMap = new HashMap<>(); - for (Map.Entry entry : response.entrySet()) { - ServerResponse serverResponse = entry.getValue(); - DataTable dataTable = serverResponse.getDataTable(); - dataTableMap.put(toServerInstance(entry.getKey()), dataTable); - } - return dataTableMap; - } - catch (InterruptedException e) { - throw new PinotException(PINOT_EXCEPTION, Optional.of(query), "Pinot query execution was interrupted", e); - } - } - - private static ServerInstance toServerInstance(ServerRoutingInstance serverRoutingInstance) - { - return new ServerInstance(InstanceConfig.toInstanceConfig(format("%s_%s_%s", SERVER_INSTANCE_PREFIX, serverRoutingInstance.getHostname(), serverRoutingInstance.getPort()))); - } - - private T doWithRetries(int retries, Function caller) - { - PinotException firstError = null; - for (int i = 0; i < retries; ++i) { - try { - return caller.apply(requestIdGenerator.getAndIncrement()); - } - catch (PinotException e) { - if (firstError == null) { - firstError = e; - } - if (!e.isRetryable()) { - throw e; - } - } - } - throw firstError; - } -} diff --git a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/AbstractPinotIntegrationSmokeTest.java b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/AbstractPinotIntegrationSmokeTest.java index 38454afc92b4..77749be289de 100644 --- a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/AbstractPinotIntegrationSmokeTest.java +++ b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/AbstractPinotIntegrationSmokeTest.java @@ -122,6 +122,11 @@ public abstract class AbstractPinotIntegrationSmokeTest protected abstract boolean isSecured(); + protected boolean 
isGrpcEnabled() + { + return true; + } + protected String getPinotImageName() { return PINOT_PREVIOUS_IMAGE_NAME; @@ -500,7 +505,7 @@ protected QueryRunner createQueryRunner() ImmutableMap.of(), pinotProperties(pinot), Optional.of(binder -> newOptionalBinder(binder, PinotHostMapper.class).setBinding() - .toInstance(new TestingPinotHostMapper(pinot.getBrokerHostAndPort(), pinot.getServerHostAndPort())))); + .toInstance(new TestingPinotHostMapper(pinot.getBrokerHostAndPort(), pinot.getServerHostAndPort(), pinot.getServerGrpcHostAndPort())))); } private Map pinotProperties(TestingPinotCluster pinot) @@ -515,6 +520,11 @@ private Map pinotProperties(TestingPinotCluster pinot) protected Map additionalPinotProperties() { + if (isGrpcEnabled()) { + return ImmutableMap.builder() + .put("pinot.grpc.enabled", "true") + .buildOrThrow(); + } return ImmutableMap.of(); } diff --git a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotConfig.java b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotConfig.java index 2d1cabf7484f..fcd75036696d 100755 --- a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotConfig.java +++ b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotConfig.java @@ -15,12 +15,14 @@ import com.google.common.collect.ImmutableMap; import io.airlift.configuration.testing.ConfigAssertions; +import io.airlift.units.DataSize; import io.airlift.units.Duration; import org.testng.annotations.Test; import java.util.Map; import java.util.concurrent.TimeUnit; +import static io.airlift.units.DataSize.Unit.MEGABYTE; import static org.assertj.core.api.Assertions.assertThatThrownBy; public class TestPinotConfig @@ -31,11 +33,6 @@ public void testDefaults() ConfigAssertions.assertRecordedDefaults( ConfigAssertions.recordDefaults(PinotConfig.class) .setControllerUrls("") - .setIdleTimeout(new Duration(5, TimeUnit.MINUTES)) - .setMaxBacklogPerServer(30) - .setMaxConnectionsPerServer(30) - .setMinConnectionsPerServer(10) - 
.setThreadPoolSize(30) .setEstimatedSizeInBytesForNonNumericColumn(20) .setConnectionTimeout(new Duration(1, TimeUnit.MINUTES)) .setMetadataCacheExpiry(new Duration(2, TimeUnit.MINUTES)) @@ -44,11 +41,11 @@ public void testDefaults() .setFetchRetryCount(2) .setForbidSegmentQueries(false) .setNonAggregateLimitForBrokerQueries(25_000) - .setRequestTimeout(new Duration(30, TimeUnit.SECONDS)) - .setMaxRowsPerSplitForSegmentQueries(50_000) .setMaxRowsForBrokerQueries(50_000) .setAggregationPushdownEnabled(true) - .setCountDistinctPushdownEnabled(true)); + .setCountDistinctPushdownEnabled(true) + .setGrpcEnabled(true) + .setTargetSegmentPageSize(DataSize.of(1, MEGABYTE))); } @Test @@ -56,11 +53,6 @@ public void testExplicitPropertyMappings() { Map properties = ImmutableMap.builder() .put("pinot.controller-urls", "host1:1111,host2:1111") - .put("pinot.idle-timeout", "1h") - .put("pinot.max-backlog-per-server", "15") - .put("pinot.max-connections-per-server", "10") - .put("pinot.min-connections-per-server", "1") - .put("pinot.thread-pool-size", "100") .put("pinot.estimated-size-in-bytes-for-non-numeric-column", "30") .put("pinot.connection-timeout", "8m") .put("pinot.metadata-expiry", "1m") @@ -69,20 +61,15 @@ public void testExplicitPropertyMappings() .put("pinot.fetch-retry-count", "3") .put("pinot.non-aggregate-limit-for-broker-queries", "10") .put("pinot.forbid-segment-queries", "true") - .put("pinot.request-timeout", "1m") - .put("pinot.max-rows-per-split-for-segment-queries", "10") .put("pinot.max-rows-for-broker-queries", "5000") .put("pinot.aggregation-pushdown.enabled", "false") .put("pinot.count-distinct-pushdown.enabled", "false") + .put("pinot.grpc.enabled", "false") + .put("pinot.target-segment-page-size", "2MB") .buildOrThrow(); PinotConfig expected = new PinotConfig() .setControllerUrls("host1:1111,host2:1111") - .setIdleTimeout(new Duration(1, TimeUnit.HOURS)) - .setMaxBacklogPerServer(15) - .setMaxConnectionsPerServer(10) - .setMinConnectionsPerServer(1) - 
.setThreadPoolSize(100) .setEstimatedSizeInBytesForNonNumericColumn(30) .setConnectionTimeout(new Duration(8, TimeUnit.MINUTES)) .setMetadataCacheExpiry(new Duration(1, TimeUnit.MINUTES)) @@ -91,11 +78,11 @@ public void testExplicitPropertyMappings() .setFetchRetryCount(3) .setNonAggregateLimitForBrokerQueries(10) .setForbidSegmentQueries(true) - .setRequestTimeout(new Duration(1, TimeUnit.MINUTES)) - .setMaxRowsPerSplitForSegmentQueries(10) .setMaxRowsForBrokerQueries(5000) .setAggregationPushdownEnabled(false) - .setCountDistinctPushdownEnabled(false); + .setCountDistinctPushdownEnabled(false) + .setGrpcEnabled(false) + .setTargetSegmentPageSize(DataSize.of(2, MEGABYTE)); ConfigAssertions.assertFullMapping(properties, expected); } diff --git a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotGrpcServerQueryClientConfig.java b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotGrpcServerQueryClientConfig.java new file mode 100644 index 000000000000..4375d828a076 --- /dev/null +++ b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotGrpcServerQueryClientConfig.java @@ -0,0 +1,55 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.pinot; + +import com.google.common.collect.ImmutableMap; +import io.airlift.configuration.testing.ConfigAssertions; +import io.airlift.units.DataSize; +import io.trino.plugin.pinot.client.PinotGrpcServerQueryClientConfig; +import org.testng.annotations.Test; + +import java.util.Map; + +import static org.apache.pinot.common.utils.grpc.GrpcQueryClient.Config.DEFAULT_MAX_INBOUND_MESSAGE_BYTES_SIZE; + +public class TestPinotGrpcServerQueryClientConfig +{ + @Test + public void testDefaults() + { + ConfigAssertions.assertRecordedDefaults( + ConfigAssertions.recordDefaults(PinotGrpcServerQueryClientConfig.class) + .setMaxRowsPerSplitForSegmentQueries(Integer.MAX_VALUE - 1) + .setGrpcPort(8090) + .setUsePlainText(true) + .setMaxInboundMessageSize(DataSize.ofBytes(DEFAULT_MAX_INBOUND_MESSAGE_BYTES_SIZE))); + } + + @Test + public void testExplicitPropertyMappings() + { + Map properties = ImmutableMap.builder() + .put("pinot.max-rows-per-split-for-segment-queries", "10") + .put("pinot.grpc.port", "8091") + .put("pinot.grpc.use-plain-text", "false") + .put("pinot.grpc.max-inbound-message-size", String.valueOf(DataSize.ofBytes(1))) + .buildOrThrow(); + PinotGrpcServerQueryClientConfig expected = new PinotGrpcServerQueryClientConfig() + .setMaxRowsPerSplitForSegmentQueries(10) + .setGrpcPort(8091) + .setUsePlainText(false) + .setMaxInboundMessageSize(DataSize.ofBytes(1)); + ConfigAssertions.assertFullMapping(properties, expected); + } +} diff --git a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotGrpcServerQueryClientTlsConfig.java b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotGrpcServerQueryClientTlsConfig.java new file mode 100644 index 000000000000..35dd18068086 --- /dev/null +++ b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotGrpcServerQueryClientTlsConfig.java @@ -0,0 +1,64 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in 
compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.pinot; + +import com.google.common.collect.ImmutableMap; +import io.airlift.configuration.testing.ConfigAssertions; +import io.trino.plugin.pinot.client.PinotGrpcServerQueryClientTlsConfig; +import org.testng.annotations.Test; + +import java.util.Map; + +import static io.trino.plugin.pinot.client.PinotKeystoreTrustStoreType.JKS; +import static io.trino.plugin.pinot.client.PinotKeystoreTrustStoreType.PKCS12; + +public class TestPinotGrpcServerQueryClientTlsConfig +{ + @Test + public void testDefaults() + { + ConfigAssertions.assertRecordedDefaults( + ConfigAssertions.recordDefaults(PinotGrpcServerQueryClientTlsConfig.class) + .setKeystoreType(JKS) + .setKeystorePath(null) + .setKeystorePassword(null) + .setTruststoreType(JKS) + .setTruststorePath(null) + .setTruststorePassword(null) + .setSslProvider("JDK")); + } + + @Test + public void testExplicitPropertyMappings() + { + Map properties = ImmutableMap.builder() + .put("pinot.grpc.tls.keystore-type", "PKCS12") + .put("pinot.grpc.tls.keystore-path", "/root") + .put("pinot.grpc.tls.keystore-password", "password") + .put("pinot.grpc.tls.truststore-type", "PKCS12") + .put("pinot.grpc.tls.truststore-path", "/root") + .put("pinot.grpc.tls.truststore-password", "password") + .put("pinot.grpc.tls.ssl-provider", "OPENSSL") + .buildOrThrow(); + PinotGrpcServerQueryClientTlsConfig expected = new PinotGrpcServerQueryClientTlsConfig() + .setKeystoreType(PKCS12) + .setKeystorePath("/root") + .setKeystorePassword("password") + .setTruststoreType(PKCS12) + 
.setTruststorePath("/root") + .setTruststorePassword("password") + .setSslProvider("OPENSSL"); + ConfigAssertions.assertFullMapping(properties, expected); + } +} diff --git a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotLegacyServerQueryClientConfig.java b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotLegacyServerQueryClientConfig.java new file mode 100644 index 000000000000..1ece0b27ce12 --- /dev/null +++ b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotLegacyServerQueryClientConfig.java @@ -0,0 +1,43 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.pinot; + +import com.google.common.collect.ImmutableMap; +import io.airlift.configuration.testing.ConfigAssertions; +import io.trino.plugin.pinot.client.PinotLegacyServerQueryClientConfig; +import org.testng.annotations.Test; + +import java.util.Map; + +public class TestPinotLegacyServerQueryClientConfig +{ + @Test + public void testDefaults() + { + ConfigAssertions.assertRecordedDefaults( + ConfigAssertions.recordDefaults(PinotLegacyServerQueryClientConfig.class) + .setMaxRowsPerSplitForSegmentQueries(50_000)); + } + + @Test + public void testExplicitPropertyMappings() + { + Map properties = ImmutableMap.builder() + .put("pinot.max-rows-per-split-for-segment-queries", "10") + .buildOrThrow(); + PinotLegacyServerQueryClientConfig expected = new PinotLegacyServerQueryClientConfig() + .setMaxRowsPerSplitForSegmentQueries(10); + ConfigAssertions.assertFullMapping(properties, expected); + } +} diff --git a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotWithoutAuthenticationIntegrationSmokeTestLatestVersionNoGrpc.java b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotWithoutAuthenticationIntegrationSmokeTestLatestVersionNoGrpc.java new file mode 100644 index 000000000000..979ed75d5c8b --- /dev/null +++ b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotWithoutAuthenticationIntegrationSmokeTestLatestVersionNoGrpc.java @@ -0,0 +1,38 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.pinot; + +import static io.trino.plugin.pinot.TestingPinotCluster.PINOT_LATEST_IMAGE_NAME; + +public class TestPinotWithoutAuthenticationIntegrationSmokeTestLatestVersionNoGrpc + extends AbstractPinotIntegrationSmokeTest +{ + @Override + protected boolean isSecured() + { + return false; + } + + @Override + protected String getPinotImageName() + { + return PINOT_LATEST_IMAGE_NAME; + } + + @Override + protected boolean isGrpcEnabled() + { + return false; + } +} diff --git a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestingPinotCluster.java b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestingPinotCluster.java index 07fa01bf5a1b..a98b87b3cd72 100644 --- a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestingPinotCluster.java +++ b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestingPinotCluster.java @@ -78,6 +78,7 @@ public class TestingPinotCluster public static final int BROKER_PORT = 8099; public static final int SERVER_ADMIN_PORT = 8097; public static final int SERVER_PORT = 8098; + public static final int GRPC_PORT = 8090; private final GenericContainer controller; private final GenericContainer broker; @@ -123,7 +124,7 @@ public TestingPinotCluster(Network network, boolean secured, String pinotImageNa .withEnv("JAVA_OPTS", "-Xmx512m -Dlog4j2.configurationFile=/opt/pinot/conf/pinot-server-log4j2.xml -Dplugins.dir=/opt/pinot/plugins") .withCommand("StartServer", "-clusterName", "pinot", "-zkAddress", getZookeeperInternalHostPort(), "-configFileName", "/var/pinot/server/config/pinot-server.conf") .withNetworkAliases("pinot-server", "localhost") - .withExposedPorts(SERVER_PORT, SERVER_ADMIN_PORT); + .withExposedPorts(SERVER_PORT, SERVER_ADMIN_PORT, GRPC_PORT); closer.register(server::stop); this.secured = secured; @@ -164,6 +165,11 @@ public HostAndPort getServerHostAndPort() return HostAndPort.fromParts(server.getContainerIpAddress(), server.getMappedPort(SERVER_PORT)); } + public HostAndPort 
getServerGrpcHostAndPort() + { + return HostAndPort.fromParts(server.getContainerIpAddress(), server.getMappedPort(GRPC_PORT)); + } + public void createSchema(InputStream tableSchemaSpec, String tableName) throws Exception { diff --git a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestingPinotHostMapper.java b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestingPinotHostMapper.java index 168213da70c6..61ae71a8b1f9 100644 --- a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestingPinotHostMapper.java +++ b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestingPinotHostMapper.java @@ -26,11 +26,13 @@ public class TestingPinotHostMapper { private final HostAndPort brokerHostAndPort; private final HostAndPort serverHostAndPort; + private final HostAndPort serverGrpcHostAndPort; - public TestingPinotHostMapper(HostAndPort brokerHostAndPort, HostAndPort serverHostAndPort) + public TestingPinotHostMapper(HostAndPort brokerHostAndPort, HostAndPort serverHostAndPort, HostAndPort serverGrpcHostAndPort) { this.brokerHostAndPort = requireNonNull(brokerHostAndPort, "brokerHostAndPort is null"); this.serverHostAndPort = requireNonNull(serverHostAndPort, "serverHostAndPort is null"); + this.serverGrpcHostAndPort = requireNonNull(serverGrpcHostAndPort, "serverGrpcHostAndPort is null"); } @Override @@ -47,4 +49,10 @@ public ServerInstance getServerInstance(String serverHost) instanceConfig.setPort(String.valueOf(serverHostAndPort.getPort())); return new ServerInstance(instanceConfig); } + + @Override + public HostAndPort getServerGrpcHostAndPort(String serverHost, int grpcPort) + { + return serverGrpcHostAndPort; + } } diff --git a/plugin/trino-pinot/src/test/resources/pinot-server/pinot-server.conf b/plugin/trino-pinot/src/test/resources/pinot-server/pinot-server.conf index 5a3c6b109c30..642d46031e11 100644 --- a/plugin/trino-pinot/src/test/resources/pinot-server/pinot-server.conf +++ 
b/plugin/trino-pinot/src/test/resources/pinot-server/pinot-server.conf @@ -3,3 +3,4 @@ pinot.server.adminapi.port=8097 pinot.server.instance.dataDir=/var/pinot/server/data/index pinot.server.instance.segmentTarDir=/var/pinot/server/data/segment pinot.set.instance.id.to.hostname=true +pinot.server.grpc.enable=true