Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,21 @@
package io.trino.plugin.hive;

import com.google.common.collect.ImmutableList;
import io.trino.plugin.hive.aws.athena.projection.ProjectionType;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could move this to a top level io.trino.plugin.hive.projection package. The two classes in aws need to be moved to the metastore.glue package anyway.

import io.trino.plugin.hive.projection.ProjectionType;
import io.trino.spi.session.PropertyMetadata;
import io.trino.spi.type.ArrayType;

import java.time.temporal.ChronoUnit;
import java.util.List;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.plugin.hive.aws.athena.PartitionProjectionProperties.COLUMN_PROJECTION_DIGITS;
import static io.trino.plugin.hive.aws.athena.PartitionProjectionProperties.COLUMN_PROJECTION_FORMAT;
import static io.trino.plugin.hive.aws.athena.PartitionProjectionProperties.COLUMN_PROJECTION_INTERVAL;
import static io.trino.plugin.hive.aws.athena.PartitionProjectionProperties.COLUMN_PROJECTION_INTERVAL_UNIT;
import static io.trino.plugin.hive.aws.athena.PartitionProjectionProperties.COLUMN_PROJECTION_RANGE;
import static io.trino.plugin.hive.aws.athena.PartitionProjectionProperties.COLUMN_PROJECTION_TYPE;
import static io.trino.plugin.hive.aws.athena.PartitionProjectionProperties.COLUMN_PROJECTION_VALUES;
import static io.trino.plugin.hive.projection.PartitionProjectionProperties.COLUMN_PROJECTION_DIGITS;
import static io.trino.plugin.hive.projection.PartitionProjectionProperties.COLUMN_PROJECTION_FORMAT;
import static io.trino.plugin.hive.projection.PartitionProjectionProperties.COLUMN_PROJECTION_INTERVAL;
import static io.trino.plugin.hive.projection.PartitionProjectionProperties.COLUMN_PROJECTION_INTERVAL_UNIT;
import static io.trino.plugin.hive.projection.PartitionProjectionProperties.COLUMN_PROJECTION_RANGE;
import static io.trino.plugin.hive.projection.PartitionProjectionProperties.COLUMN_PROJECTION_TYPE;
import static io.trino.plugin.hive.projection.PartitionProjectionProperties.COLUMN_PROJECTION_VALUES;
import static io.trino.spi.session.PropertyMetadata.enumProperty;
import static io.trino.spi.session.PropertyMetadata.integerProperty;
import static io.trino.spi.session.PropertyMetadata.stringProperty;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import io.trino.plugin.hive.LocationService.WriteInfo;
import io.trino.plugin.hive.acid.AcidOperation;
import io.trino.plugin.hive.acid.AcidTransaction;
import io.trino.plugin.hive.aws.athena.PartitionProjectionService;
import io.trino.plugin.hive.fs.DirectoryLister;
import io.trino.plugin.hive.metastore.Column;
import io.trino.plugin.hive.metastore.Database;
Expand Down Expand Up @@ -270,6 +269,9 @@
import static io.trino.plugin.hive.metastore.StorageFormat.VIEW_STORAGE_FORMAT;
import static io.trino.plugin.hive.metastore.StorageFormat.fromHiveStorageFormat;
import static io.trino.plugin.hive.metastore.thrift.ThriftMetastoreUtil.STATS_PROPERTIES;
import static io.trino.plugin.hive.projection.PartitionProjectionProperties.arePartitionProjectionPropertiesSet;
import static io.trino.plugin.hive.projection.PartitionProjectionProperties.getPartitionProjectionHiveTableProperties;
import static io.trino.plugin.hive.projection.PartitionProjectionProperties.getPartitionProjectionTrinoTableProperties;
import static io.trino.plugin.hive.type.Category.PRIMITIVE;
import static io.trino.plugin.hive.util.AcidTables.deltaSubdir;
import static io.trino.plugin.hive.util.AcidTables.isFullAcidTable;
Expand Down Expand Up @@ -301,6 +303,7 @@
import static io.trino.plugin.hive.util.Statistics.reduce;
import static io.trino.plugin.hive.util.SystemTables.getSourceTableNameFromSystemTable;
import static io.trino.spi.StandardErrorCode.INVALID_ANALYZE_PROPERTY;
import static io.trino.spi.StandardErrorCode.INVALID_COLUMN_PROPERTY;
import static io.trino.spi.StandardErrorCode.INVALID_SCHEMA_PROPERTY;
import static io.trino.spi.StandardErrorCode.INVALID_TABLE_PROPERTY;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
Expand Down Expand Up @@ -383,7 +386,7 @@ public class HiveMetadata
private final HiveMaterializedViewMetadata hiveMaterializedViewMetadata;
private final AccessControlMetadata accessControlMetadata;
private final DirectoryLister directoryLister;
private final PartitionProjectionService partitionProjectionService;
private final boolean partitionProjectionEnabled;
private final boolean allowTableRename;
private final long maxPartitionDropsPerQuery;
private final HiveTimestampPrecision hiveViewsTimestampPrecision;
Expand Down Expand Up @@ -411,7 +414,7 @@ public HiveMetadata(
HiveMaterializedViewMetadata hiveMaterializedViewMetadata,
AccessControlMetadata accessControlMetadata,
DirectoryLister directoryLister,
PartitionProjectionService partitionProjectionService,
boolean partitionProjectionEnabled,
boolean allowTableRename,
long maxPartitionDropsPerQuery,
HiveTimestampPrecision hiveViewsTimestampPrecision)
Expand All @@ -438,7 +441,7 @@ public HiveMetadata(
this.hiveMaterializedViewMetadata = requireNonNull(hiveMaterializedViewMetadata, "hiveMaterializedViewMetadata is null");
this.accessControlMetadata = requireNonNull(accessControlMetadata, "accessControlMetadata is null");
this.directoryLister = requireNonNull(directoryLister, "directoryLister is null");
this.partitionProjectionService = requireNonNull(partitionProjectionService, "partitionProjectionService is null");
this.partitionProjectionEnabled = partitionProjectionEnabled;
this.allowTableRename = allowTableRename;
this.maxPartitionDropsPerQuery = maxPartitionDropsPerQuery;
this.hiveViewsTimestampPrecision = requireNonNull(hiveViewsTimestampPrecision, "hiveViewsTimestampPrecision is null");
Expand Down Expand Up @@ -741,7 +744,7 @@ else if (isTrinoView || isTrinoMaterializedView) {
}

// Partition Projection specific properties
properties.putAll(partitionProjectionService.getPartitionProjectionTrinoTableProperties(table));
properties.putAll(getPartitionProjectionTrinoTableProperties(table));

return new ConnectorTableMetadata(tableName, columns, properties.buildOrThrow(), comment);
}
Expand Down Expand Up @@ -1219,7 +1222,14 @@ else if (avroSchemaLiteral != null) {
tableMetadata.getComment().ifPresent(value -> tableProperties.put(TABLE_COMMENT, value));

// Partition Projection specific properties
tableProperties.putAll(partitionProjectionService.getPartitionProjectionHiveTableProperties(tableMetadata));
if (partitionProjectionEnabled) {
tableProperties.putAll(getPartitionProjectionHiveTableProperties(tableMetadata));
}
else if (arePartitionProjectionPropertiesSet(tableMetadata)) {
throw new TrinoException(
INVALID_COLUMN_PROPERTY,
"Partition projection is disabled. Enable it in configuration by setting " + HiveConfig.CONFIGURATION_HIVE_PARTITION_PROJECTION_ENABLED + "=true");
}

Map<String, String> baseProperties = tableProperties.buildOrThrow();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import io.airlift.units.Duration;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.plugin.base.CatalogName;
import io.trino.plugin.hive.aws.athena.PartitionProjectionService;
import io.trino.plugin.hive.fs.DirectoryLister;
import io.trino.plugin.hive.fs.TransactionScopeCachingDirectoryListerFactory;
import io.trino.plugin.hive.metastore.HiveMetastoreConfig;
Expand Down Expand Up @@ -77,7 +76,7 @@ public class HiveMetadataFactory
private final ScheduledExecutorService heartbeatService;
private final DirectoryLister directoryLister;
private final TransactionScopeCachingDirectoryListerFactory transactionScopeCachingDirectoryListerFactory;
private final PartitionProjectionService partitionProjectionService;
private final boolean partitionProjectionEnabled;
private final boolean allowTableRename;
private final HiveTimestampPrecision hiveViewsTimestampPrecision;

Expand All @@ -103,7 +102,6 @@ public HiveMetadataFactory(
AccessControlMetadataFactory accessControlMetadataFactory,
DirectoryLister directoryLister,
TransactionScopeCachingDirectoryListerFactory transactionScopeCachingDirectoryListerFactory,
PartitionProjectionService partitionProjectionService,
@AllowHiveTableRename boolean allowTableRename)
{
this(
Expand Down Expand Up @@ -139,7 +137,7 @@ public HiveMetadataFactory(
accessControlMetadataFactory,
directoryLister,
transactionScopeCachingDirectoryListerFactory,
partitionProjectionService,
hiveConfig.isPartitionProjectionEnabled(),
allowTableRename,
hiveConfig.getTimestampPrecision());
}
Expand Down Expand Up @@ -177,7 +175,7 @@ public HiveMetadataFactory(
AccessControlMetadataFactory accessControlMetadataFactory,
DirectoryLister directoryLister,
TransactionScopeCachingDirectoryListerFactory transactionScopeCachingDirectoryListerFactory,
PartitionProjectionService partitionProjectionService,
boolean partitionProjectionEnabled,
boolean allowTableRename,
HiveTimestampPrecision hiveViewsTimestampPrecision)
{
Expand Down Expand Up @@ -220,7 +218,7 @@ public HiveMetadataFactory(
this.heartbeatService = requireNonNull(heartbeatService, "heartbeatService is null");
this.directoryLister = requireNonNull(directoryLister, "directoryLister is null");
this.transactionScopeCachingDirectoryListerFactory = requireNonNull(transactionScopeCachingDirectoryListerFactory, "transactionScopeCachingDirectoryListerFactory is null");
this.partitionProjectionService = requireNonNull(partitionProjectionService, "partitionProjectionService is null");
this.partitionProjectionEnabled = partitionProjectionEnabled;
this.allowTableRename = allowTableRename;
this.hiveViewsTimestampPrecision = requireNonNull(hiveViewsTimestampPrecision, "hiveViewsTimestampPrecision is null");
}
Expand All @@ -229,7 +227,7 @@ public HiveMetadataFactory(
public TransactionalMetadata create(ConnectorIdentity identity, boolean autoCommit)
{
CachingHiveMetastore cachingHiveMetastore = createPerTransactionCache(metastoreFactory.createMetastore(Optional.of(identity)), perTransactionCacheMaximumSize);
HiveMetastoreClosure hiveMetastoreClosure = new HiveMetastoreClosure(cachingHiveMetastore);
HiveMetastoreClosure hiveMetastoreClosure = new HiveMetastoreClosure(cachingHiveMetastore, typeManager, partitionProjectionEnabled);

DirectoryLister directoryLister = transactionScopeCachingDirectoryListerFactory.get(this.directoryLister);
SemiTransactionalHiveMetastore metastore = new SemiTransactionalHiveMetastore(
Expand Down Expand Up @@ -268,7 +266,7 @@ public TransactionalMetadata create(ConnectorIdentity identity, boolean autoComm
hiveMaterializedViewMetadataFactory.create(hiveMetastoreClosure),
accessControlMetadataFactory.create(metastore),
directoryLister,
partitionProjectionService,
partitionProjectionEnabled,
allowTableRename,
maxPartitionDropsPerQuery,
hiveViewsTimestampPrecision);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,16 @@
import io.trino.plugin.hive.metastore.PartitionWithStatistics;
import io.trino.plugin.hive.metastore.PrincipalPrivileges;
import io.trino.plugin.hive.metastore.Table;
import io.trino.plugin.hive.projection.PartitionProjection;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.TableNotFoundException;
import io.trino.spi.function.LanguageFunction;
import io.trino.spi.function.SchemaFunctionName;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.security.RoleGrant;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeManager;

import java.util.Collection;
import java.util.List;
Expand All @@ -47,20 +50,26 @@
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.Maps.immutableEntry;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_TABLE_DROPPED_DURING_QUERY;
import static io.trino.plugin.hive.HivePartitionManager.extractPartitionValues;
import static io.trino.plugin.hive.projection.PartitionProjectionProperties.getPartitionProjectionFromTable;
import static java.util.Objects.requireNonNull;

public class HiveMetastoreClosure
{
private final HiveMetastore delegate;
private final TypeManager typeManager;
private final boolean partitionProjectionEnabled;

/**
 * Do not use this directly. Instead, the closure should be fetched from the current SemiTransactionalHiveMetastore,
 * which can be fetched from the current HiveMetadata.
 *
 * @param delegate underlying metastore that all calls are forwarded to
 * @param typeManager used to resolve partition-column types when building a partition projection from table properties
 * @param partitionProjectionEnabled when true, tables that declare partition projection properties are answered
 *         from the projection definition instead of the metastore
 */
public HiveMetastoreClosure(HiveMetastore delegate, TypeManager typeManager, boolean partitionProjectionEnabled)
{
    this.delegate = requireNonNull(delegate, "delegate is null");
    this.typeManager = requireNonNull(typeManager, "typeManager is null");
    this.partitionProjectionEnabled = partitionProjectionEnabled;
}

public Optional<Database> getDatabase(String databaseName)
Expand Down Expand Up @@ -247,12 +256,21 @@ public Optional<List<String>> getPartitionNamesByFilter(
List<String> columnNames,
TupleDomain<String> partitionKeysFilter)
{
if (partitionProjectionEnabled) {
Table table = getTable(databaseName, tableName)
.orElseThrow(() -> new TrinoException(HIVE_TABLE_DROPPED_DURING_QUERY, "Table does not exists: " + tableName));

Optional<PartitionProjection> projection = getPartitionProjectionFromTable(table, typeManager);
if (projection.isPresent()) {
return projection.get().getProjectedPartitionNamesByFilter(columnNames, partitionKeysFilter);
}
}
return delegate.getPartitionNamesByFilter(databaseName, tableName, columnNames, partitionKeysFilter);
}

private List<Partition> getExistingPartitionsByNames(Table table, List<String> partitionNames)
{
Map<String, Partition> partitions = delegate.getPartitionsByNames(table, partitionNames).entrySet().stream()
Map<String, Partition> partitions = getPartitionsByNames(table, partitionNames).entrySet().stream()
.map(entry -> immutableEntry(entry.getKey(), entry.getValue().orElseThrow(() ->
new PartitionNotFoundException(table.getSchemaTableName(), extractPartitionValues(entry.getKey())))))
.collect(toImmutableMap(Map.Entry::getKey, Map.Entry::getValue));
Expand All @@ -265,11 +283,22 @@ private List<Partition> getExistingPartitionsByNames(Table table, List<String> p
/**
 * Loads the named partitions of {@code databaseName.tableName}.
 * <p>
 * If the table itself is gone, every requested name maps to {@code Optional.empty()}; otherwise resolution
 * is delegated to the projection-aware table-scoped overload. (The diff scrape had left the pre-change
 * {@code delegate.getPartitionsByNames} call interleaved here; only the new overload call is kept.)
 */
public Map<String, Optional<Partition>> getPartitionsByNames(String databaseName, String tableName, List<String> partitionNames)
{
    return delegate.getTable(databaseName, tableName)
            .map(table -> getPartitionsByNames(table, partitionNames))
            .orElseGet(() -> partitionNames.stream()
                    .collect(toImmutableMap(name -> name, name -> Optional.empty())));
}

/**
 * Resolves the named partitions for a table, preferring the table's partition-projection
 * definition (when the feature is enabled and the table declares one) over the metastore.
 */
private Map<String, Optional<Partition>> getPartitionsByNames(Table table, List<String> partitionNames)
{
    if (!partitionProjectionEnabled) {
        return delegate.getPartitionsByNames(table, partitionNames);
    }
    return getPartitionProjectionFromTable(table, typeManager)
            .map(projection -> projection.getProjectedPartitionsByNames(table, partitionNames))
            .orElseGet(() -> delegate.getPartitionsByNames(table, partitionNames));
}

public void addPartitions(String databaseName, String tableName, List<PartitionWithStatistics> partitions)
{
delegate.addPartitions(databaseName, tableName, partitions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ private HivePageSink createPageSink(HiveWritableTableHandle handle, boolean isCr
handle.getLocationHandle(),
locationService,
session.getQueryId(),
new HivePageSinkMetadataProvider(handle.getPageSinkMetadata(), new HiveMetastoreClosure(cachingHiveMetastore)),
new HivePageSinkMetadataProvider(handle.getPageSinkMetadata(), new HiveMetastoreClosure(cachingHiveMetastore, typeManager, false)),
typeManager,
pageSorter,
writerSortBufferSize,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@
import java.util.Optional;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.plugin.hive.aws.athena.PartitionProjectionProperties.PARTITION_PROJECTION_ENABLED;
import static io.trino.plugin.hive.aws.athena.PartitionProjectionProperties.PARTITION_PROJECTION_IGNORE;
import static io.trino.plugin.hive.aws.athena.PartitionProjectionProperties.PARTITION_PROJECTION_LOCATION_TEMPLATE;
import static io.trino.plugin.hive.projection.PartitionProjectionProperties.PARTITION_PROJECTION_ENABLED;
import static io.trino.plugin.hive.projection.PartitionProjectionProperties.PARTITION_PROJECTION_IGNORE;
import static io.trino.plugin.hive.projection.PartitionProjectionProperties.PARTITION_PROJECTION_LOCATION_TEMPLATE;
import static io.trino.plugin.hive.util.HiveBucketing.BucketingVersion.BUCKETING_V1;
import static io.trino.plugin.hive.util.HiveBucketing.BucketingVersion.BUCKETING_V2;
import static io.trino.spi.StandardErrorCode.INVALID_TABLE_PROPERTY;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import io.trino.plugin.base.jmx.ConnectorObjectNameGeneratorModule;
import io.trino.plugin.base.jmx.MBeanServerModule;
import io.trino.plugin.base.session.SessionPropertiesProvider;
import io.trino.plugin.hive.aws.athena.PartitionProjectionModule;
import io.trino.plugin.hive.fs.CachingDirectoryListerModule;
import io.trino.plugin.hive.fs.DirectoryLister;
import io.trino.plugin.hive.metastore.HiveMetastore;
Expand Down Expand Up @@ -106,7 +105,6 @@ public static Connector createConnector(
new JsonModule(),
new TypeDeserializerModule(context.getTypeManager()),
new HiveModule(),
new PartitionProjectionModule(),
new CachingDirectoryListerModule(directoryLister),
new HiveMetastoreModule(metastore),
new HiveSecurityModule(),
Expand Down
Loading