Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public enum PinotErrorCode
implements ErrorCodeSupplier
{
PINOT_UNSUPPORTED_COLUMN_TYPE(0, EXTERNAL), // schema issues
PINOT_AMBIGUOUS_TABLE_NAME(1, EXTERNAL), // Duplicate case insensitive table name
PINOT_INSUFFICIENT_SERVER_RESPONSE(2, EXTERNAL), // numServersResponded < numServersQueried
PINOT_EXCEPTION(3, EXTERNAL), // Exception reported by pinot
PINOT_HTTP_ERROR(4, EXTERNAL), // Some non okay http error code
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.airlift.log.Logger;
import io.trino.collect.cache.NonEvictableLoadingCache;
import io.trino.plugin.base.aggregation.AggregateFunctionRewriter;
import io.trino.plugin.base.aggregation.AggregateFunctionRule;
Expand Down Expand Up @@ -50,7 +49,6 @@
import io.trino.spi.connector.LimitApplicationResult;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.SchemaTablePrefix;
import io.trino.spi.connector.TableNotFoundException;
import io.trino.spi.expression.ConnectorExpression;
import io.trino.spi.expression.Variable;
import io.trino.spi.predicate.Domain;
Expand All @@ -66,7 +64,7 @@
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

Expand All @@ -89,14 +87,9 @@ public class PinotMetadata
implements ConnectorMetadata
{
public static final String PINOT_COLUMN_NAME_PROPERTY = "pinotColumnName";

private static final Logger log = Logger.get(PinotMetadata.class);

private static final Object ALL_TABLES_CACHE_KEY = new Object();
private static final String SCHEMA_NAME = "default";
public static final String SCHEMA_NAME = "default";

private final NonEvictableLoadingCache<String, List<ColumnMetadata>> pinotTableColumnCache;
private final NonEvictableLoadingCache<Object, List<String>> allTablesCache;
private final int maxRowsPerBrokerQuery;
private final AggregateFunctionRewriter<AggregateExpression, Void> aggregateFunctionRewriter;
private final ImplementCountDistinct implementCountDistinct;
Expand All @@ -106,15 +99,11 @@ public class PinotMetadata
public PinotMetadata(
PinotClient pinotClient,
PinotConfig pinotConfig,
@ForPinot Executor executor)
@ForPinot ExecutorService executor)
{
requireNonNull(pinotConfig, "pinot config");
this.pinotClient = requireNonNull(pinotClient, "pinotClient is null");
long metadataCacheExpiryMillis = pinotConfig.getMetadataCacheExpiry().roundTo(TimeUnit.MILLISECONDS);
this.allTablesCache = buildNonEvictableCache(
CacheBuilder.newBuilder()
.refreshAfterWrite(metadataCacheExpiryMillis, TimeUnit.MILLISECONDS),
asyncReloading(CacheLoader.from(pinotClient::getAllTables), executor));
this.pinotTableColumnCache = buildNonEvictableCache(
CacheBuilder.newBuilder()
.refreshAfterWrite(metadataCacheExpiryMillis, TimeUnit.MILLISECONDS),
Expand All @@ -129,7 +118,6 @@ public List<ColumnMetadata> load(String tableName)
}
}, executor));

executor.execute(() -> this.allTablesCache.refresh(ALL_TABLES_CACHE_KEY));
this.maxRowsPerBrokerQuery = pinotConfig.getMaxRowsForBrokerQueries();
Function<String, String> identifierQuote = identity(); // TODO identifier quoting not needed here?
this.implementCountDistinct = new ImplementCountDistinct(identifierQuote);
Expand Down Expand Up @@ -158,15 +146,11 @@ public PinotTableHandle getTableHandle(ConnectorSession session, SchemaTableName
DynamicTable dynamicTable = DynamicTableBuilder.buildFromPql(this, tableName, pinotClient);
return new PinotTableHandle(tableName.getSchemaName(), dynamicTable.getTableName(), TupleDomain.all(), OptionalLong.empty(), Optional.of(dynamicTable));
}

try {
String pinotTableName = getPinotTableNameFromTrinoTableName(tableName.getTableName());
return new PinotTableHandle(tableName.getSchemaName(), pinotTableName);
}
catch (TableNotFoundException e) {
log.debug(e, "Table not found: %s", tableName);
String pinotTableName = pinotClient.getPinotTableNameFromTrinoTableNameIfExists(tableName.getTableName());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: It might be helpful to use a name like toRemoteTableName and toRemoteSchemaName since many other connectors use those method names and it'll make it easier for readers to figure out what this is trying to do.

I don't have a strong opinion on this though.

if (pinotTableName == null) {
return null;
}
return new PinotTableHandle(tableName.getSchemaName(), pinotTableName);
}

@Override
Expand All @@ -192,11 +176,11 @@ public ConnectorTableMetadata getTableMetadata(ConnectorSession session, Connect
@Override
public List<SchemaTableName> listTables(ConnectorSession session, Optional<String> schemaNameOrNull)
{
ImmutableList.Builder<SchemaTableName> builder = ImmutableList.builder();
for (String table : getPinotTableNames()) {
ImmutableSet.Builder<SchemaTableName> builder = ImmutableSet.builder();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Won't this silently swallow colliding names?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I will push an update shortly that returns colliding table names. It is based off of #9098.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pushed. This is based off of #9098

for (String table : pinotClient.getPinotTableNames()) {
builder.add(new SchemaTableName(SCHEMA_NAME, table));
}
return builder.build();
return ImmutableList.copyOf(builder.build());
}

@Override
Expand Down Expand Up @@ -494,15 +478,10 @@ private static PinotColumnHandle resolveAggregateExpressionWithAlias(PinotColumn
@VisibleForTesting
public List<ColumnMetadata> getColumnsMetadata(String tableName)
{
String pinotTableName = getPinotTableNameFromTrinoTableName(tableName);
String pinotTableName = pinotClient.getPinotTableNameFromTrinoTableName(tableName);
return getFromCache(pinotTableColumnCache, pinotTableName);
}

private List<String> getPinotTableNames()
{
return getFromCache(allTablesCache, ALL_TABLES_CACHE_KEY);
}

private static <K, V> V getFromCache(LoadingCache<K, V> cache, K key)
{
try {
Expand All @@ -513,22 +492,6 @@ private static <K, V> V getFromCache(LoadingCache<K, V> cache, K key)
}
}

private String getPinotTableNameFromTrinoTableName(String trinoTableName)
{
List<String> allTables = getPinotTableNames();
String pinotTableName = null;
for (String candidate : allTables) {
if (trinoTableName.equalsIgnoreCase(candidate)) {
pinotTableName = candidate;
break;
}
}
if (pinotTableName == null) {
throw new TableNotFoundException(new SchemaTableName(SCHEMA_NAME, trinoTableName));
}
return pinotTableName;
}

private Map<String, ColumnHandle> getDynamicTableColumnHandles(PinotTableHandle pinotTableHandle)
{
checkState(pinotTableHandle.getQuery().isPresent(), "dynamic table not present");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import javax.management.MBeanServer;

import java.io.IOException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;

import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder;
import static io.airlift.concurrent.Threads.threadsNamed;
Expand Down Expand Up @@ -72,7 +72,7 @@ public void configure(Binder binder)
binder.bind(PinotPageSourceProvider.class).in(Scopes.SINGLETON);
binder.bind(PinotClient.class).in(Scopes.SINGLETON);
binder.bind(PinotQueryClient.class).in(Scopes.SINGLETON);
binder.bind(Executor.class).annotatedWith(ForPinot.class)
binder.bind(ExecutorService.class).annotatedWith(ForPinot.class)
.toInstance(newCachedThreadPool(threadsNamed("pinot-metadata-fetcher-" + catalogName)));

binder.bind(PinotSessionProperties.class).in(Scopes.SINGLETON);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.util.OptionalLong;

import static com.google.common.base.MoreObjects.toStringHelper;
import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;

public class PinotTableHandle
Expand All @@ -52,7 +51,7 @@ public PinotTableHandle(

{
this.schemaName = requireNonNull(schemaName, "schemaName is null");
this.tableName = requireNonNull(tableName, "tableName is null").toLowerCase(ENGLISH);
this.tableName = requireNonNull(tableName, "tableName is null");
this.constraint = requireNonNull(constraint, "constraint is null");
this.limit = requireNonNull(limit, "limit is null");
this.query = requireNonNull(query, "query is null");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.Multimap;
Expand All @@ -46,6 +48,8 @@
import io.trino.plugin.pinot.auth.PinotControllerAuthenticationProvider;
import io.trino.plugin.pinot.query.PinotQueryInfo;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.TableNotFoundException;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.spi.data.Schema;
Expand All @@ -55,12 +59,14 @@
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
Expand All @@ -70,6 +76,7 @@
import java.util.stream.IntStream;

import static com.google.common.base.Preconditions.checkState;
import static com.google.common.cache.CacheLoader.asyncReloading;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.Iterables.getOnlyElement;
import static com.google.common.net.HttpHeaders.ACCEPT;
Expand All @@ -81,13 +88,16 @@
import static io.airlift.json.JsonCodec.listJsonCodec;
import static io.airlift.json.JsonCodec.mapJsonCodec;
import static io.trino.collect.cache.SafeCaches.buildNonEvictableCache;
import static io.trino.plugin.pinot.PinotErrorCode.PINOT_AMBIGUOUS_TABLE_NAME;
import static io.trino.plugin.pinot.PinotErrorCode.PINOT_EXCEPTION;
import static io.trino.plugin.pinot.PinotErrorCode.PINOT_INVALID_CONFIGURATION;
import static io.trino.plugin.pinot.PinotErrorCode.PINOT_UNABLE_TO_FIND_BROKER;
import static io.trino.plugin.pinot.PinotMetadata.SCHEMA_NAME;
import static java.lang.String.format;
import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;
import static java.util.function.UnaryOperator.identity;
import static java.util.stream.Collectors.joining;
import static org.apache.pinot.spi.utils.builder.TableNameBuilder.extractRawTableName;

public class PinotClient
Expand All @@ -97,6 +107,7 @@ public class PinotClient
private static final Pattern BROKER_PATTERN = Pattern.compile("Broker_(.*)_(\\d+)");
private static final String TIME_BOUNDARY_NOT_FOUND_ERROR_CODE = "404";
private static final JsonCodec<Map<String, Map<String, List<String>>>> ROUTING_TABLE_CODEC = mapJsonCodec(String.class, mapJsonCodec(String.class, listJsonCodec(String.class)));
private static final Object ALL_TABLES_CACHE_KEY = new Object();
private static final JsonCodec<QueryRequest> QUERY_REQUEST_JSON_CODEC = jsonCodec(QueryRequest.class);

private static final String GET_ALL_TABLES_API_TEMPLATE = "tables";
Expand All @@ -111,6 +122,7 @@ public class PinotClient
private final PinotHostMapper pinotHostMapper;

private final NonEvictableLoadingCache<String, List<String>> brokersForTableCache;
private final NonEvictableLoadingCache<Object, Multimap<String, String>> allTablesCache;

private final JsonCodec<GetTables> tablesJsonCodec;
private final JsonCodec<BrokersForTable> brokersForTableJsonCodec;
Expand All @@ -125,6 +137,7 @@ public PinotClient(
PinotConfig config,
PinotHostMapper pinotHostMapper,
@ForPinot HttpClient httpClient,
@ForPinot ExecutorService executor,
JsonCodec<GetTables> tablesJsonCodec,
JsonCodec<BrokersForTable> brokersForTableJsonCodec,
JsonCodec<TimeBoundary> timeBoundaryJsonCodec,
Expand All @@ -150,6 +163,10 @@ public PinotClient(
CacheBuilder.newBuilder()
.expireAfterWrite(config.getMetadataCacheExpiry().roundTo(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS),
CacheLoader.from(this::getAllBrokersForTable));
this.allTablesCache = buildNonEvictableCache(
CacheBuilder.newBuilder()
.refreshAfterWrite(config.getMetadataCacheExpiry().roundTo(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS),
asyncReloading(CacheLoader.from(this::getAllTables), executor));
this.controllerAuthenticationProvider = controllerAuthenticationProvider;
this.brokerAuthenticationProvider = brokerAuthenticationProvider;
}
Expand Down Expand Up @@ -239,9 +256,14 @@ public List<String> getTables()
}
}

public List<String> getAllTables()
protected Multimap<String, String> getAllTables()
{
return sendHttpGetToControllerJson(GET_ALL_TABLES_API_TEMPLATE, tablesJsonCodec).getTables();
List<String> allTables = sendHttpGetToControllerJson(GET_ALL_TABLES_API_TEMPLATE, tablesJsonCodec).getTables();
ImmutableListMultimap.Builder<String, String> builder = ImmutableListMultimap.builder();
for (String table : allTables) {
builder.put(table.toLowerCase(ENGLISH), table);
}
return builder.build();
}

public Schema getTableSchema(String table)
Expand All @@ -250,6 +272,46 @@ public Schema getTableSchema(String table)
return sendHttpGetToControllerJson(format(TABLE_SCHEMA_API_TEMPLATE, table), schemaJsonCodec);
}

public List<String> getPinotTableNames()
{
return ImmutableList.copyOf(getFromCache(allTablesCache, ALL_TABLES_CACHE_KEY).keySet());
}

public static <K, V> V getFromCache(LoadingCache<K, V> cache, K key)
{
V value = cache.getIfPresent(key);
if (value != null) {
return value;
}
try {
return cache.get(key);
}
catch (ExecutionException e) {
throw new PinotException(PinotErrorCode.PINOT_UNCLASSIFIED_ERROR, Optional.empty(), "Cannot fetch from cache " + key, e.getCause());
}
}

public String getPinotTableNameFromTrinoTableNameIfExists(String trinoTableName)
{
Collection<String> candidates = getFromCache(allTablesCache, ALL_TABLES_CACHE_KEY).get(trinoTableName.toLowerCase(ENGLISH));
if (candidates.isEmpty()) {
return null;
}
if (candidates.size() == 1) {
return getOnlyElement(candidates);
}
throw new PinotException(PINOT_AMBIGUOUS_TABLE_NAME, Optional.empty(), format("Ambiguous table names: %s", candidates.stream().collect(joining(", "))));
}

public String getPinotTableNameFromTrinoTableName(String trinoTableName)
{
String pinotTableName = getPinotTableNameFromTrinoTableNameIfExists(trinoTableName);
if (pinotTableName == null) {
throw new TableNotFoundException(new SchemaTableName(SCHEMA_NAME, trinoTableName));
}
return pinotTableName;
}

public static class BrokersForTable
{
public static class InstancesInBroker
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,12 @@ public static DynamicTable buildFromPql(PinotMetadata pinotMetadata, SchemaTable
BrokerRequest request = REQUEST_COMPILER.compileToBrokerRequest(query);
PinotQuery pinotQuery = request.getPinotQuery();
QueryContext queryContext = BrokerRequestToQueryContextConverter.convert(request);
String pinotTableName = stripSuffix(request.getQuerySource().getTableName());
Optional<String> suffix = getSuffix(request.getQuerySource().getTableName());
String tableName = request.getQuerySource().getTableName();
String trinoTableName = stripSuffix(tableName).toLowerCase(ENGLISH);
String pinotTableName = pinotClient.getPinotTableNameFromTrinoTableName(trinoTableName);
Optional<String> suffix = getSuffix(tableName);

Map<String, ColumnHandle> columnHandles = pinotMetadata.getPinotColumnHandles(pinotTableName);
Map<String, ColumnHandle> columnHandles = pinotMetadata.getPinotColumnHandles(trinoTableName);
List<OrderByExpression> orderBy = ImmutableList.of();
PinotTypeResolver pinotTypeResolver = new PinotTypeResolver(pinotClient, pinotTableName);
List<PinotColumnHandle> selectColumns = ImmutableList.of();
Expand Down
Loading