Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -473,39 +473,38 @@ public LocatedTableHandle getTableHandle(ConnectorSession session, SchemaTableNa
// Pretend the table does not exist to produce better error message in case of table redirects to Hive
return null;
}
SchemaTableName dataTableName = new SchemaTableName(tableName.getSchemaName(), tableName.getTableName());
Optional<DeltaMetastoreTable> table = metastore.getTable(dataTableName.getSchemaName(), dataTableName.getTableName());
Optional<DeltaMetastoreTable> table = metastore.getTable(tableName.getSchemaName(), tableName.getTableName());
if (table.isEmpty()) {
return null;
}
boolean managed = table.get().managed();

String tableLocation = table.get().location();
TableSnapshot tableSnapshot = getSnapshot(dataTableName, tableLocation, session);
TableSnapshot tableSnapshot = getSnapshot(tableName, tableLocation, session);
MetadataEntry metadataEntry;
try {
metadataEntry = transactionLogAccess.getMetadataEntry(tableSnapshot, session);
}
catch (TrinoException e) {
if (e.getErrorCode().equals(DELTA_LAKE_INVALID_SCHEMA.toErrorCode())) {
return new CorruptedDeltaLakeTableHandle(dataTableName, managed, tableLocation, e);
return new CorruptedDeltaLakeTableHandle(tableName, managed, tableLocation, e);
}
throw e;
}
ProtocolEntry protocolEntry = transactionLogAccess.getProtocolEntry(session, tableSnapshot);
if (protocolEntry.getMinReaderVersion() > MAX_READER_VERSION) {
LOG.debug("Skip %s because the reader version is unsupported: %d", dataTableName, protocolEntry.getMinReaderVersion());
LOG.debug("Skip %s because the reader version is unsupported: %d", tableName, protocolEntry.getMinReaderVersion());
return null;
}
Set<String> unsupportedReaderFeatures = unsupportedReaderFeatures(protocolEntry.getReaderFeatures().orElse(ImmutableSet.of()));
if (!unsupportedReaderFeatures.isEmpty()) {
LOG.debug("Skip %s because the table contains unsupported reader features: %s", dataTableName, unsupportedReaderFeatures);
LOG.debug("Skip %s because the table contains unsupported reader features: %s", tableName, unsupportedReaderFeatures);
return null;
}
verifySupportedColumnMapping(getColumnMappingMode(metadataEntry));
return new DeltaLakeTableHandle(
dataTableName.getSchemaName(),
dataTableName.getTableName(),
tableName.getSchemaName(),
tableName.getTableName(),
managed,
tableLocation,
metadataEntry,
Expand Down Expand Up @@ -546,16 +545,12 @@ public ConnectorTableMetadata getTableMetadata(ConnectorSession session, Connect
// This method does not calculate column metadata for the projected columns
checkArgument(tableHandle.getProjectedColumns().isEmpty(), "Unexpected projected columns");
MetadataEntry metadataEntry = tableHandle.getMetadataEntry();
Map<String, String> columnComments = getColumnComments(metadataEntry);
Map<String, Boolean> columnsNullability = getColumnsNullability(metadataEntry);
Map<String, String> columnGenerations = getGeneratedColumnExpressions(metadataEntry);

List<String> constraints = ImmutableList.<String>builder()
.addAll(getCheckConstraints(metadataEntry).values())
.addAll(getColumnInvariants(metadataEntry).values()) // The internal logic for column invariants in Delta Lake is same as check constraints
.build();
List<ColumnMetadata> columns = getColumns(metadataEntry).stream()
.map(column -> getColumnMetadata(column, columnComments.get(column.getBaseColumnName()), columnsNullability.getOrDefault(column.getBaseColumnName(), true), columnGenerations.get(column.getBaseColumnName())))
.collect(toImmutableList());
List<ColumnMetadata> columns = getTableColumnMetadata(metadataEntry);

ImmutableMap.Builder<String, Object> properties = ImmutableMap.<String, Object>builder()
.put(LOCATION_PROPERTY, tableHandle.getLocation());
Expand Down Expand Up @@ -592,6 +587,17 @@ public ConnectorTableMetadata getTableMetadata(ConnectorSession session, Connect
.collect(toImmutableList()));
}

/**
 * Builds {@link ColumnMetadata} for every column described by the given Delta metadata entry,
 * attaching each column's comment, nullability (nullable by default when unspecified) and
 * generated-column expression, if any.
 */
private List<ColumnMetadata> getTableColumnMetadata(MetadataEntry metadataEntry)
{
    Map<String, String> columnComments = getColumnComments(metadataEntry);
    Map<String, Boolean> columnsNullability = getColumnsNullability(metadataEntry);
    Map<String, String> columnGenerations = getGeneratedColumnExpressions(metadataEntry);
    // Return the stream result directly; the previous intermediate local added nothing
    return getColumns(metadataEntry).stream()
            .map(column -> getColumnMetadata(column, columnComments.get(column.getBaseColumnName()), columnsNullability.getOrDefault(column.getBaseColumnName(), true), columnGenerations.get(column.getBaseColumnName())))
            .collect(toImmutableList());
}

@Override
public List<SchemaTableName> listTables(ConnectorSession session, Optional<String> schemaName)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,9 +279,9 @@
import static io.trino.plugin.hive.util.AcidTables.writeAcidVersionFile;
import static io.trino.plugin.hive.util.HiveBucketing.getHiveBucketHandle;
import static io.trino.plugin.hive.util.HiveBucketing.isSupportedBucketing;
import static io.trino.plugin.hive.util.HiveUtil.columnMetadataGetter;
import static io.trino.plugin.hive.util.HiveUtil.getPartitionKeyColumnHandles;
import static io.trino.plugin.hive.util.HiveUtil.getRegularColumnHandles;
import static io.trino.plugin.hive.util.HiveUtil.getTableColumnMetadata;
import static io.trino.plugin.hive.util.HiveUtil.hiveColumnHandles;
import static io.trino.plugin.hive.util.HiveUtil.isDeltaLakeTable;
import static io.trino.plugin.hive.util.HiveUtil.isHiveSystemSchema;
Expand Down Expand Up @@ -634,11 +634,7 @@ else if (isTrinoView || isTrinoMaterializedView) {
throw new TableNotFoundException(tableName);
}

Function<HiveColumnHandle, ColumnMetadata> metadataGetter = columnMetadataGetter(table);
ImmutableList.Builder<ColumnMetadata> columns = ImmutableList.builder();
for (HiveColumnHandle columnHandle : hiveColumnHandles(table, typeManager, getTimestampPrecision(session))) {
columns.add(metadataGetter.apply(columnHandle));
}
List<ColumnMetadata> columns = getTableColumnMetadata(session, table, typeManager);

// External location property
ImmutableMap.Builder<String, Object> properties = ImmutableMap.builder();
Expand Down Expand Up @@ -737,7 +733,7 @@ else if (isTrinoView || isTrinoMaterializedView) {
// Partition Projection specific properties
properties.putAll(partitionProjectionService.getPartitionProjectionTrinoTableProperties(table));

return new ConnectorTableMetadata(tableName, columns.build(), properties.buildOrThrow(), comment);
return new ConnectorTableMetadata(tableName, columns, properties.buildOrThrow(), comment);
}

private static Optional<String> getCsvSerdeProperty(Table table, String key)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@
import java.util.concurrent.Future;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Stream;

import static com.google.common.base.Strings.isNullOrEmpty;
import static com.google.common.base.Verify.verify;
Expand Down Expand Up @@ -462,15 +463,7 @@ public Optional<List<SchemaTableName>> getAllViews()
private List<String> getTableNames(String databaseName, Predicate<com.amazonaws.services.glue.model.Table> filter)
{
try {
List<String> tableNames = getPaginatedResults(
glueClient::getTables,
new GetTablesRequest()
.withDatabaseName(databaseName),
GetTablesRequest::setNextToken,
GetTablesResult::getNextToken,
stats.getGetTables())
.map(GetTablesResult::getTableList)
.flatMap(List::stream)
List<String> tableNames = getGlueTables(databaseName)
.filter(filter)
.map(com.amazonaws.services.glue.model.Table::getName)
.collect(toImmutableList());
Expand Down Expand Up @@ -1164,6 +1157,19 @@ public void checkSupportsTransactions()
throw new TrinoException(NOT_SUPPORTED, "Glue does not support ACID tables");
}

/**
 * Streams every Glue table of the given database, transparently following
 * Glue's pagination tokens and recording call stats.
 */
private Stream<com.amazonaws.services.glue.model.Table> getGlueTables(String databaseName)
{
    GetTablesRequest request = new GetTablesRequest()
            .withDatabaseName(databaseName);
    return getPaginatedResults(
            glueClient::getTables,
            request,
            GetTablesRequest::setNextToken,
            GetTablesResult::getNextToken,
            stats.getGetTables())
            .map(GetTablesResult::getTableList)
            .flatMap(List::stream);
}

static class StatsRecordingAsyncHandler<Request extends AmazonWebServiceRequest, Result>
implements AsyncHandler<Request, Result>
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.trino.spi.ErrorCodeSupplier;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ColumnMetadata;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.predicate.NullableValue;
import io.trino.spi.type.ArrayType;
import io.trino.spi.type.CharType;
Expand Down Expand Up @@ -93,6 +94,7 @@
import static io.trino.plugin.hive.HiveMetadata.SKIP_FOOTER_COUNT_KEY;
import static io.trino.plugin.hive.HiveMetadata.SKIP_HEADER_COUNT_KEY;
import static io.trino.plugin.hive.HivePartitionKey.HIVE_DEFAULT_DYNAMIC_PARTITION;
import static io.trino.plugin.hive.HiveSessionProperties.getTimestampPrecision;
import static io.trino.plugin.hive.HiveTableProperties.ORC_BLOOM_FILTER_FPP;
import static io.trino.plugin.hive.HiveType.toHiveTypes;
import static io.trino.plugin.hive.metastore.SortingColumn.Order.ASCENDING;
Expand Down Expand Up @@ -511,6 +513,13 @@ private static Slice charPartitionKey(String value, String name, Type columnType
return partitionKey;
}

/**
 * Returns {@link ColumnMetadata} for all column handles of the given table,
 * resolving timestamp types at the precision configured in the session.
 */
public static List<ColumnMetadata> getTableColumnMetadata(ConnectorSession session, Table table, TypeManager typeManager)
{
    List<HiveColumnHandle> columnHandles = hiveColumnHandles(table, typeManager, getTimestampPrecision(session));
    return columnHandles.stream()
            .map(columnMetadataGetter(table))
            .collect(toImmutableList());
}

public static List<HiveColumnHandle> hiveColumnHandles(Table table, TypeManager typeManager, HiveTimestampPrecision timestampPrecision)
{
ImmutableList.Builder<HiveColumnHandle> columns = ImmutableList.builder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.google.common.io.RecursiveDeleteOption;
import com.google.common.reflect.ClassPath;
import io.airlift.log.Logger;
import io.trino.filesystem.Location;
import io.trino.plugin.hive.metastore.Column;
import io.trino.plugin.hive.metastore.Database;
import io.trino.plugin.hive.metastore.HiveMetastore;
Expand Down Expand Up @@ -47,17 +48,23 @@
import java.net.URI;
import java.nio.file.Files;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;

import static com.google.common.io.MoreFiles.deleteRecursively;
import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE;
import static io.trino.plugin.hive.HiveMetadata.PRESTO_QUERY_ID_NAME;
import static io.trino.plugin.hive.HiveMetadata.PRESTO_VERSION_NAME;
import static io.trino.plugin.hive.HiveMetadata.TABLE_COMMENT;
import static io.trino.plugin.hive.HiveStorageFormat.ORC;
import static io.trino.plugin.hive.HiveStorageFormat.TEXTFILE;
import static io.trino.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT;
import static io.trino.plugin.hive.HiveType.HIVE_INT;
import static io.trino.plugin.hive.HiveType.HIVE_STRING;
import static io.trino.plugin.hive.TableType.MANAGED_TABLE;
import static io.trino.plugin.hive.metastore.PrincipalPrivileges.NO_PRIVILEGES;
import static io.trino.plugin.hive.metastore.StorageFormat.fromHiveStorageFormat;
import static io.trino.plugin.hive.util.HiveBucketing.BucketingVersion.BUCKETING_V1;
import static io.trino.plugin.hive.util.HiveUtil.SPARK_TABLE_PROVIDER_KEY;
import static java.nio.file.Files.copy;
Expand Down Expand Up @@ -105,14 +112,73 @@ public void initialize()
.setRcfileTimeZone("America/Los_Angeles");

setup(testDbName, hiveConfig, metastore, HDFS_ENVIRONMENT);

createTestTables();
}

/**
 * Registers in the metastore the tables this test class relies on, mirroring the
 * definitions that create-test.sql would normally provision. Table directories are
 * placed under the test database's location.
 */
protected void createTestTables()
        throws Exception
{
    // Base location of the test database; each table directory is appended to it
    Location location = Location.of((metastoreClient.getDatabase(database).orElseThrow()
            .getLocation().orElseThrow()));

    createTestTable(
            // Matches create-test.sql » trino_test_partition_format
            Table.builder()
                    .setDatabaseName(database)
                    .setTableName(tablePartitionFormat.getTableName())
                    .setTableType(MANAGED_TABLE.name())
                    .setOwner(Optional.empty())
                    .setDataColumns(List.of(
                            new Column("t_string", HiveType.HIVE_STRING, Optional.empty(), Map.of()),
                            new Column("t_tinyint", HiveType.HIVE_BYTE, Optional.empty(), Map.of()),
                            new Column("t_smallint", HiveType.HIVE_SHORT, Optional.empty(), Map.of()),
                            new Column("t_int", HiveType.HIVE_INT, Optional.empty(), Map.of()),
                            new Column("t_bigint", HiveType.HIVE_LONG, Optional.empty(), Map.of()),
                            new Column("t_float", HiveType.HIVE_FLOAT, Optional.empty(), Map.of()),
                            new Column("t_boolean", HiveType.HIVE_BOOLEAN, Optional.empty(), Map.of())))
                    .setPartitionColumns(List.of(
                            new Column("ds", HiveType.HIVE_STRING, Optional.empty(), Map.of()),
                            new Column("file_format", HiveType.HIVE_STRING, Optional.empty(), Map.of()),
                            new Column("dummy", HiveType.HIVE_INT, Optional.empty(), Map.of())))
                    .setParameter(TABLE_COMMENT, "Presto test data")
                    // Storage format follows the configured default, matching create-test.sql
                    .withStorage(storage -> storage
                            .setStorageFormat(fromHiveStorageFormat(new HiveConfig().getHiveStorageFormat()))
                            .setLocation(Optional.of(location.appendPath(tablePartitionFormat.getTableName()).toString())))
                    .build());

    createTestTable(
            // Matches create-test.sql » trino_test_unpartitioned (was a copy-paste of the comment above)
            Table.builder()
                    .setDatabaseName(database)
                    .setTableName(tableUnpartitioned.getTableName())
                    .setTableType(MANAGED_TABLE.name())
                    .setOwner(Optional.empty())
                    .setDataColumns(List.of(
                            new Column("t_string", HiveType.HIVE_STRING, Optional.empty(), Map.of()),
                            new Column("t_tinyint", HiveType.HIVE_BYTE, Optional.empty(), Map.of())))
                    .setParameter(TABLE_COMMENT, "Presto test data")
                    .withStorage(storage -> storage
                            .setStorageFormat(fromHiveStorageFormat(TEXTFILE))
                            .setLocation(Optional.of(location.appendPath(tableUnpartitioned.getTableName()).toString())))
                    .build());
}

/**
 * Registers the given table in the metastore without granting any privileges.
 * Protected so subclasses can prepare prerequisites (e.g. the table directory)
 * before delegating here.
 */
protected void createTestTable(Table table)
        throws Exception
{
    metastoreClient.createTable(table, NO_PRIVILEGES);
}

@AfterClass(alwaysRun = true)
public void cleanup()
throws IOException
{
try {
getMetastoreClient().dropDatabase(testDbName, true);
for (String tableName : metastoreClient.getAllTables(database)) {
metastoreClient.dropTable(database, tableName, true);
}
metastoreClient.dropDatabase(testDbName, true);
}
finally {
deleteRecursively(tempDir.toPath(), ALLOW_INSECURE);
Expand All @@ -128,12 +194,6 @@ protected ConnectorTableHandle getTableHandle(ConnectorMetadata metadata, Schema
throw new SkipException("tests using existing tables are not supported");
}

@Override
public void testGetAllTableNames()
{
throw new SkipException("Test disabled for this subclass");
}

@Override
public void testGetAllTableColumns()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,17 @@
package io.trino.plugin.hive;

import io.trino.plugin.hive.metastore.HiveMetastore;
import io.trino.plugin.hive.metastore.Table;
import io.trino.plugin.hive.metastore.thrift.BridgingHiveMetastore;
import io.trino.plugin.hive.metastore.thrift.InMemoryThriftMetastore;
import io.trino.plugin.hive.metastore.thrift.ThriftMetastoreConfig;
import org.testng.SkipException;
import org.testng.annotations.Test;

import java.io.File;
import java.net.URI;

import static java.nio.file.Files.createDirectories;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

// staging directory is shared mutable state
Expand All @@ -38,6 +41,14 @@ protected HiveMetastore createMetastore(File tempDir)
return new BridgingHiveMetastore(hiveMetastore);
}

@Override
protected void createTestTable(Table table)
        throws Exception
{
    // InMemoryThriftMetastore rejects tables whose directory does not exist,
    // so materialize the location on disk before registering the table.
    File tableDirectory = new File(URI.create(table.getStorage().getLocation()));
    createDirectories(tableDirectory.toPath());
    super.createTestTable(table);
}

@Test
public void forceTestNgToRespectSingleThreaded()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public enum Method
GET_TABLE,
GET_ALL_TABLES,
GET_ALL_TABLES_FROM_DATABASE,
GET_TABLE_WITH_PARAMETER,
GET_TABLES_WITH_PARAMETER,
GET_TABLE_STATISTICS,
GET_ALL_VIEWS,
GET_ALL_VIEWS_FROM_DATABASE,
Expand Down Expand Up @@ -113,7 +113,7 @@ public Optional<Database> getDatabase(String databaseName)
@Override
public List<String> getTablesWithParameter(String databaseName, String parameterKey, String parameterValue)
{
methodInvocations.add(Method.GET_TABLE_WITH_PARAMETER);
methodInvocations.add(Method.GET_TABLES_WITH_PARAMETER);
return delegate.getTablesWithParameter(databaseName, parameterKey, parameterValue);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ public synchronized void createTable(Table table)
}
else {
File directory = new File(new Path(table.getSd().getLocation()).toUri());
checkArgument(directory.exists(), "Table directory does not exist");
checkArgument(directory.exists(), "Table directory [%s] does not exist", directory);
if (tableType == MANAGED_TABLE) {
checkArgument(isParentDir(directory, baseDirectory), "Table directory must be inside of the metastore base directory");
}
Expand All @@ -202,7 +202,7 @@ public synchronized void createTable(Table table)
}

PrincipalPrivilegeSet privileges = table.getPrivileges();
if (privileges != null) {
if (privileges != null && (!privileges.getUserPrivileges().isEmpty() || !privileges.getGroupPrivileges().isEmpty() || !privileges.getRolePrivileges().isEmpty())) {
throw new UnsupportedOperationException();
}
}
Expand Down
Loading