Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions plugin/trino-pinot/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,11 @@
<artifactId>guice</artifactId>
</dependency>

<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</dependency>

<!-- Used for basic authentication only -->
<dependency>
<groupId>com.squareup.okhttp3</groupId>
Expand Down
139 changes: 38 additions & 101 deletions plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import com.google.common.collect.ImmutableList;
import io.airlift.configuration.Config;
import io.airlift.configuration.ConfigDescription;
import io.airlift.configuration.DefunctConfig;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.airlift.units.MinDuration;

Expand All @@ -30,22 +32,24 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.ImmutableList.toImmutableList;

import static io.airlift.units.DataSize.Unit.MEGABYTE;

@DefunctConfig({
"pinot.thread-pool-size",
"pinot.idle-timeout",
"pinot.max-backlog-per-server",
"pinot.max-connections-per-server",
"pinot.min-connections-per-server",
"pinot.request-timeout"
})
public class PinotConfig
{
private static final Splitter LIST_SPLITTER = Splitter.on(",").trimResults().omitEmptyStrings();

private int maxConnectionsPerServer = 30;

private List<URI> controllerUrls = ImmutableList.of();

private Duration idleTimeout = new Duration(5, TimeUnit.MINUTES);
private Duration connectionTimeout = new Duration(1, TimeUnit.MINUTES);
private Duration requestTimeout = new Duration(30, TimeUnit.SECONDS);

private int threadPoolSize = 30;
private int minConnectionsPerServer = 10;
private int maxBacklogPerServer = 30;
private int estimatedSizeInBytesForNonNumericColumn = 20;
private Duration metadataCacheExpiry = new Duration(2, TimeUnit.MINUTES);

Expand All @@ -54,10 +58,11 @@ public class PinotConfig
private int segmentsPerSplit = 1;
private int fetchRetryCount = 2;
private int nonAggregateLimitForBrokerQueries = 25_000;
private int maxRowsPerSplitForSegmentQueries = 50_000;
private int maxRowsForBrokerQueries = 50_000;
private boolean aggregationPushdownEnabled = true;
private boolean countDistinctPushdownEnabled = true;
private boolean grpcEnabled = true;
private DataSize targetSegmentPageSize = DataSize.of(1, MEGABYTE);

@NotNull
public List<URI> getControllerUrls()
Expand All @@ -74,72 +79,6 @@ public PinotConfig setControllerUrls(String controllerUrl)
return this;
}

@NotNull
public int getThreadPoolSize()
{
return threadPoolSize;
}

@Config("pinot.thread-pool-size")
public PinotConfig setThreadPoolSize(int threadPoolSize)
{
this.threadPoolSize = threadPoolSize;
return this;
}

@NotNull
public int getMinConnectionsPerServer()
{
return minConnectionsPerServer;
}

@Config("pinot.min-connections-per-server")
public PinotConfig setMinConnectionsPerServer(int minConnectionsPerServer)
{
this.minConnectionsPerServer = minConnectionsPerServer;
return this;
}

@NotNull
public int getMaxConnectionsPerServer()
{
return maxConnectionsPerServer;
}

@Config("pinot.max-connections-per-server")
public PinotConfig setMaxConnectionsPerServer(int maxConnectionsPerServer)
{
this.maxConnectionsPerServer = maxConnectionsPerServer;
return this;
}

@NotNull
public int getMaxBacklogPerServer()
{
return maxBacklogPerServer;
}

@Config("pinot.max-backlog-per-server")
public PinotConfig setMaxBacklogPerServer(int maxBacklogPerServer)
{
this.maxBacklogPerServer = maxBacklogPerServer;
return this;
}

@MinDuration("15s")
@NotNull
public Duration getIdleTimeout()
{
return idleTimeout;
}

@Config("pinot.idle-timeout")
public PinotConfig setIdleTimeout(Duration idleTimeout)
{
this.idleTimeout = idleTimeout;
return this;
}

@MinDuration("15s")
@NotNull
public Duration getConnectionTimeout()
Expand All @@ -154,20 +93,6 @@ public PinotConfig setConnectionTimeout(Duration connectionTimeout)
return this;
}

@MinDuration("15s")
@NotNull
public Duration getRequestTimeout()
{
return requestTimeout;
}

@Config("pinot.request-timeout")
public PinotConfig setRequestTimeout(Duration requestTimeout)
{
this.requestTimeout = requestTimeout;
return this;
}

@MinDuration("0s")
@NotNull
public Duration getMetadataCacheExpiry()
Expand Down Expand Up @@ -256,18 +181,6 @@ public PinotConfig setNonAggregateLimitForBrokerQueries(int nonAggregateLimitFor
return this;
}

public int getMaxRowsPerSplitForSegmentQueries()
{
return maxRowsPerSplitForSegmentQueries;
}

@Config("pinot.max-rows-per-split-for-segment-queries")
public PinotConfig setMaxRowsPerSplitForSegmentQueries(int maxRowsPerSplitForSegmentQueries)
{
this.maxRowsPerSplitForSegmentQueries = maxRowsPerSplitForSegmentQueries;
return this;
}

private static URI stringToUri(String server)
{
if (server.startsWith("http://") || server.startsWith("https://")) {
Expand Down Expand Up @@ -313,6 +226,30 @@ public PinotConfig setCountDistinctPushdownEnabled(boolean countDistinctPushdown
return this;
}

public boolean isGrpcEnabled()
{
return grpcEnabled;
}

@Config("pinot.grpc.enabled")
public PinotConfig setGrpcEnabled(boolean grpcEnabled)
{
this.grpcEnabled = grpcEnabled;
return this;
}

public DataSize getTargetSegmentPageSize()
{
return this.targetSegmentPageSize;
}

@Config("pinot.target-segment-page-size")
public PinotConfig setTargetSegmentPageSize(DataSize targetSegmentPageSize)
{
this.targetSegmentPageSize = targetSegmentPageSize;
return this;
}

@PostConstruct
public void validate()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,20 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Scopes;
import io.airlift.configuration.AbstractConfigurationAwareModule;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.trino.plugin.base.jmx.RebindSafeMBeanServer;
import io.trino.plugin.pinot.client.IdentityPinotHostMapper;
import io.trino.plugin.pinot.client.PinotClient;
import io.trino.plugin.pinot.client.PinotDataFetcher;
import io.trino.plugin.pinot.client.PinotGrpcDataFetcher;
import io.trino.plugin.pinot.client.PinotGrpcServerQueryClientConfig;
import io.trino.plugin.pinot.client.PinotGrpcServerQueryClientTlsConfig;
import io.trino.plugin.pinot.client.PinotHostMapper;
import io.trino.plugin.pinot.client.PinotQueryClient;
import io.trino.plugin.pinot.client.PinotLegacyDataFetcher;
import io.trino.plugin.pinot.client.PinotLegacyServerQueryClientConfig;
import io.trino.spi.NodeManager;
import io.trino.spi.connector.ConnectorNodePartitioningProvider;
import org.apache.pinot.common.utils.DataSchema;
Expand All @@ -39,6 +44,7 @@

import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder;
import static io.airlift.concurrent.Threads.threadsNamed;
import static io.airlift.configuration.ConditionalModule.conditionalModule;
import static io.airlift.configuration.ConfigBinder.configBinder;
import static io.airlift.http.client.HttpClientBinder.httpClientBinder;
import static io.airlift.json.JsonBinder.jsonBinder;
Expand All @@ -51,7 +57,7 @@
import static java.util.concurrent.TimeUnit.SECONDS;

public class PinotModule
implements Module
extends AbstractConfigurationAwareModule
{
private final String catalogName;
private final NodeManager nodeManager;
Expand All @@ -63,15 +69,14 @@ public PinotModule(String catalogName, NodeManager nodeManager)
}

@Override
public void configure(Binder binder)
public void setup(Binder binder)
{
configBinder(binder).bindConfig(PinotConfig.class);
binder.bind(PinotConnector.class).in(Scopes.SINGLETON);
binder.bind(PinotMetadata.class).in(Scopes.SINGLETON);
binder.bind(PinotSplitManager.class).in(Scopes.SINGLETON);
binder.bind(PinotPageSourceProvider.class).in(Scopes.SINGLETON);
binder.bind(PinotClient.class).in(Scopes.SINGLETON);
binder.bind(PinotQueryClient.class).in(Scopes.SINGLETON);
binder.bind(ExecutorService.class).annotatedWith(ForPinot.class)
.toInstance(newCachedThreadPool(threadsNamed("pinot-metadata-fetcher-" + catalogName)));

Expand All @@ -95,6 +100,12 @@ public void configure(Binder binder)
binder.bind(NodeManager.class).toInstance(nodeManager);
binder.bind(ConnectorNodePartitioningProvider.class).to(PinotNodePartitioningProvider.class).in(Scopes.SINGLETON);
newOptionalBinder(binder, PinotHostMapper.class).setDefault().to(IdentityPinotHostMapper.class).in(Scopes.SINGLETON);

install(conditionalModule(
PinotConfig.class,
config -> config.isGrpcEnabled(),
new PinotGrpcModule(),
new LegacyClientModule()));
}

public static final class DataSchemaDeserializer
Expand All @@ -118,4 +129,34 @@ public DataSchema deserialize(JsonParser p, DeserializationContext ctxt)
return new DataSchema(columnNames, columnTypes);
}
}

public static class PinotGrpcModule
extends AbstractConfigurationAwareModule
{
@Override
public void setup(Binder binder)
{
configBinder(binder).bindConfig(PinotGrpcServerQueryClientConfig.class);
binder.bind(PinotDataFetcher.Factory.class).to(PinotGrpcDataFetcher.Factory.class).in(Scopes.SINGLETON);
install(conditionalModule(
PinotGrpcServerQueryClientConfig.class,
config -> config.isUsePlainText(),
plainTextBinder -> plainTextBinder.bind(PinotGrpcDataFetcher.GrpcQueryClientFactory.class).to(PinotGrpcDataFetcher.PlainTextGrpcQueryClientFactory.class).in(Scopes.SINGLETON),
tlsBinder -> {
configBinder(tlsBinder).bindConfig(PinotGrpcServerQueryClientTlsConfig.class);
tlsBinder.bind(PinotGrpcDataFetcher.GrpcQueryClientFactory.class).to(PinotGrpcDataFetcher.TlsGrpcQueryClientFactory.class).in(Scopes.SINGLETON);
}));
}
}

public static class LegacyClientModule
extends AbstractConfigurationAwareModule
{
@Override
public void setup(Binder binder)
{
configBinder(binder).bindConfig(PinotLegacyServerQueryClientConfig.class);
binder.bind(PinotDataFetcher.Factory.class).to(PinotLegacyDataFetcher.Factory.class).in(Scopes.SINGLETON);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
package io.trino.plugin.pinot;

import io.trino.plugin.pinot.client.PinotClient;
import io.trino.plugin.pinot.client.PinotQueryClient;
import io.trino.plugin.pinot.client.PinotDataFetcher;
import io.trino.plugin.pinot.query.DynamicTable;
import io.trino.plugin.pinot.query.PinotQueryInfo;
import io.trino.spi.connector.ColumnHandle;
Expand All @@ -38,24 +38,24 @@
public class PinotPageSourceProvider
implements ConnectorPageSourceProvider
{
private final PinotQueryClient pinotQueryClient;
private final PinotClient clusterInfoFetcher;
private final int limitForSegmentQueries;
private final int limitForBrokerQueries;
private final int estimatedNonNumericColumnSize;
private final long targetSegmentPageSizeBytes;
private final PinotDataFetcher.Factory pinotDataFetcherFactory;

@Inject
public PinotPageSourceProvider(
PinotConfig pinotConfig,
PinotClient clusterInfoFetcher,
PinotQueryClient pinotQueryClient)
PinotDataFetcher.Factory pinotDataFetcherFactory)
{
requireNonNull(pinotConfig, "pinotConfig is null");
this.pinotQueryClient = requireNonNull(pinotQueryClient, "pinotQueryClient is null");
this.clusterInfoFetcher = requireNonNull(clusterInfoFetcher, "clusterInfoFetcher is null");
this.limitForSegmentQueries = pinotConfig.getMaxRowsPerSplitForSegmentQueries();
this.pinotDataFetcherFactory = requireNonNull(pinotDataFetcherFactory, "pinotDataFetcherFactory is null");
this.limitForSegmentQueries = pinotDataFetcherFactory.getRowLimit();
this.limitForBrokerQueries = pinotConfig.getMaxRowsForBrokerQueries();
estimatedNonNumericColumnSize = pinotConfig.getEstimatedSizeInBytesForNonNumericColumn();
this.targetSegmentPageSizeBytes = pinotConfig.getTargetSegmentPageSize().toBytes();
}

@Override
Expand All @@ -80,14 +80,11 @@ public ConnectorPageSource createPageSource(

switch (pinotSplit.getSplitType()) {
case SEGMENT:
PinotDataFetcher pinotDataFetcher = pinotDataFetcherFactory.create(session, query, pinotSplit);
return new PinotSegmentPageSource(
session,
estimatedNonNumericColumnSize,
limitForSegmentQueries,
this.pinotQueryClient,
pinotSplit,
targetSegmentPageSizeBytes,
handles,
query);
pinotDataFetcher);
case BROKER:
PinotQueryInfo pinotQueryInfo;
if (pinotTableHandle.getQuery().isPresent()) {
Expand Down
Loading