diff --git a/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/util/UncheckedCloseable.java b/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/util/UncheckedCloseable.java new file mode 100644 index 000000000000..2e91f3bf37ba --- /dev/null +++ b/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/util/UncheckedCloseable.java @@ -0,0 +1,21 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.base.util; + +public interface UncheckedCloseable + extends AutoCloseable +{ + @Override + void close(); +} diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/functions/tablechanges/TableChangesFunction.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/functions/tablechanges/TableChangesFunction.java index 56cc9849ffb0..416401fee895 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/functions/tablechanges/TableChangesFunction.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/functions/tablechanges/TableChangesFunction.java @@ -15,6 +15,7 @@ import com.google.common.collect.ImmutableList; import io.airlift.slice.Slice; +import io.trino.plugin.base.util.UncheckedCloseable; import io.trino.plugin.deltalake.CorruptedDeltaLakeTableHandle; import io.trino.plugin.deltalake.DeltaLakeColumnHandle; import io.trino.plugin.deltalake.DeltaLakeMetadata; @@ -104,42 +105,45 @@ public TableFunctionAnalysis analyze( long firstReadVersion = sinceVersion + 1; // +1 to ensure that the since_version is exclusive; may overflow DeltaLakeMetadata deltaLakeMetadata = deltaLakeMetadataFactory.create(session.getIdentity()); - SchemaTableName schemaTableName = new SchemaTableName(schemaName, tableName); - ConnectorTableHandle connectorTableHandle = deltaLakeMetadata.getTableHandle(session, schemaTableName); - if (connectorTableHandle == null) { - throw new TableNotFoundException(schemaTableName); - } - if (connectorTableHandle instanceof CorruptedDeltaLakeTableHandle corruptedTableHandle) { - throw corruptedTableHandle.createException(); - } - DeltaLakeTableHandle tableHandle = (DeltaLakeTableHandle) connectorTableHandle; + deltaLakeMetadata.beginQuery(session); + try (UncheckedCloseable ignore = () -> deltaLakeMetadata.cleanupQuery(session)) { + SchemaTableName schemaTableName = new SchemaTableName(schemaName, tableName); + ConnectorTableHandle connectorTableHandle = deltaLakeMetadata.getTableHandle(session, schemaTableName); + if (connectorTableHandle == null) { + throw new TableNotFoundException(schemaTableName); + } + if (connectorTableHandle instanceof CorruptedDeltaLakeTableHandle corruptedTableHandle) { + throw corruptedTableHandle.createException(); + } + DeltaLakeTableHandle tableHandle = (DeltaLakeTableHandle) connectorTableHandle; - if (sinceVersion > tableHandle.getReadVersion()) { - throw new TrinoException(INVALID_FUNCTION_ARGUMENT, format("since_version: %d is higher then current table version: %d", sinceVersion, tableHandle.getReadVersion())); - } - List columnHandles = deltaLakeMetadata.getColumnHandles(session, tableHandle) - .values().stream() - .map(DeltaLakeColumnHandle.class::cast) - .filter(column -> column.getColumnType() != SYNTHESIZED) - .collect(toImmutableList()); - accessControl.checkCanSelectFromColumns(null, schemaTableName, columnHandles.stream() - // Lowercase column names because users don't know the original names - .map(column -> column.getColumnName().toLowerCase(ENGLISH)) - .collect(toImmutableSet())); + if (sinceVersion > tableHandle.getReadVersion()) { + throw new TrinoException(INVALID_FUNCTION_ARGUMENT, format("since_version: %d is higher then current table version: %d", sinceVersion, tableHandle.getReadVersion())); + } + List columnHandles = deltaLakeMetadata.getColumnHandles(session, tableHandle) + .values().stream() + .map(DeltaLakeColumnHandle.class::cast) + .filter(column -> column.getColumnType() != SYNTHESIZED) + .collect(toImmutableList()); + accessControl.checkCanSelectFromColumns(null, schemaTableName, columnHandles.stream() + // Lowercase column names because users don't know the original names + .map(column -> column.getColumnName().toLowerCase(ENGLISH)) + .collect(toImmutableSet())); - ImmutableList.Builder outputFields = ImmutableList.builder(); - columnHandles.stream() - .map(columnHandle -> new Descriptor.Field(columnHandle.getColumnName(), Optional.of(columnHandle.getType()))) - .forEach(outputFields::add); + ImmutableList.Builder outputFields = ImmutableList.builder(); + columnHandles.stream() + .map(columnHandle -> new Descriptor.Field(columnHandle.getColumnName(), Optional.of(columnHandle.getType()))) + .forEach(outputFields::add); - // add at the end to follow Delta Lake convention - outputFields.add(new Descriptor.Field(CHANGE_TYPE_COLUMN_NAME, Optional.of(VARCHAR))); - outputFields.add(new Descriptor.Field(COMMIT_VERSION_COLUMN_NAME, Optional.of(BIGINT))); - outputFields.add(new Descriptor.Field(COMMIT_TIMESTAMP_COLUMN_NAME, Optional.of(TIMESTAMP_TZ_MILLIS))); + // add at the end to follow Delta Lake convention + outputFields.add(new Descriptor.Field(CHANGE_TYPE_COLUMN_NAME, Optional.of(VARCHAR))); + outputFields.add(new Descriptor.Field(COMMIT_VERSION_COLUMN_NAME, Optional.of(BIGINT))); + outputFields.add(new Descriptor.Field(COMMIT_TIMESTAMP_COLUMN_NAME, Optional.of(TIMESTAMP_TZ_MILLIS))); - return TableFunctionAnalysis.builder() - .handle(new TableChangesTableFunctionHandle(schemaTableName, firstReadVersion, tableHandle.getReadVersion(), tableHandle.getLocation(), columnHandles)) - .returnedType(new Descriptor(outputFields.build())) - .build(); + return TableFunctionAnalysis.builder() + .handle(new TableChangesTableFunctionHandle(schemaTableName, firstReadVersion, tableHandle.getReadVersion(), tableHandle.getLocation(), columnHandles)) + .returnedType(new Descriptor(outputFields.build())) + .build(); + } } } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/DropExtendedStatsProcedure.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/DropExtendedStatsProcedure.java index 6728b496a9c6..bdb015b5fd02 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/DropExtendedStatsProcedure.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/DropExtendedStatsProcedure.java @@ -15,6 +15,7 @@ import com.google.inject.Inject; import com.google.inject.Provider; +import io.trino.plugin.base.util.UncheckedCloseable; import io.trino.plugin.deltalake.DeltaLakeMetadata; import io.trino.plugin.deltalake.DeltaLakeMetadataFactory; import io.trino.plugin.deltalake.LocatedTableHandle; @@ -79,11 +80,14 @@ public void dropStats(ConnectorSession session, ConnectorAccessControl accessCon SchemaTableName name = new SchemaTableName(schema, table); DeltaLakeMetadata metadata = metadataFactory.create(session.getIdentity()); - LocatedTableHandle tableHandle = metadata.getTableHandle(session, name); - if (tableHandle == null) { - throw new TrinoException(INVALID_PROCEDURE_ARGUMENT, format("Table '%s' does not exist", name)); + metadata.beginQuery(session); + try (UncheckedCloseable ignore = () -> metadata.cleanupQuery(session)) { + LocatedTableHandle tableHandle = metadata.getTableHandle(session, name); + if (tableHandle == null) { + throw new TrinoException(INVALID_PROCEDURE_ARGUMENT, format("Table '%s' does not exist", name)); + } + accessControl.checkCanInsertIntoTable(null, name); + statsAccess.deleteExtendedStatistics(session, name, tableHandle.location()); } - accessControl.checkCanInsertIntoTable(null, name); - statsAccess.deleteExtendedStatistics(session, name, tableHandle.location()); } } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/RegisterTableProcedure.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/RegisterTableProcedure.java index 4ac20e34b303..52429e5e891d 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/RegisterTableProcedure.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/RegisterTableProcedure.java @@ -19,7 +19,9 @@ import io.trino.filesystem.Location; import io.trino.filesystem.TrinoFileSystem; import io.trino.filesystem.TrinoFileSystemFactory; +import io.trino.plugin.base.util.UncheckedCloseable; import io.trino.plugin.deltalake.DeltaLakeConfig; +import io.trino.plugin.deltalake.DeltaLakeMetadata; import io.trino.plugin.deltalake.DeltaLakeMetadataFactory; import io.trino.plugin.deltalake.metastore.DeltaLakeMetastore; import io.trino.plugin.deltalake.statistics.CachingExtendedStatisticsAccess; @@ -140,60 +142,64 @@ private void doRegisterTable( checkProcedureArgument(!isNullOrEmpty(tableLocation), "table_location cannot be null or empty"); SchemaTableName schemaTableName = new SchemaTableName(schemaName, tableName); - DeltaLakeMetastore metastore = metadataFactory.create(session.getIdentity()).getMetastore(); + DeltaLakeMetadata metadata = metadataFactory.create(session.getIdentity()); + metadata.beginQuery(session); + try (UncheckedCloseable ignore = () -> metadata.cleanupQuery(session)) { + DeltaLakeMetastore metastore = metadata.getMetastore(); - if (metastore.getDatabase(schemaName).isEmpty()) { - throw new SchemaNotFoundException(schemaTableName.getSchemaName()); - } - - TrinoFileSystem fileSystem = fileSystemFactory.create(session); - try { - Location transactionLogDir = Location.of(getTransactionLogDir(tableLocation)); - if (!fileSystem.listFiles(transactionLogDir).hasNext()) { - throw new TrinoException(GENERIC_USER_ERROR, format("No transaction log found in location %s", transactionLogDir)); + if (metastore.getDatabase(schemaName).isEmpty()) { + throw new SchemaNotFoundException(schemaTableName.getSchemaName()); } - } - catch (IOException e) { - throw new TrinoException(DELTA_LAKE_FILESYSTEM_ERROR, format("Failed checking table location %s", tableLocation), e); - } - Table table = buildTable(session, schemaTableName, tableLocation, true); + TrinoFileSystem fileSystem = fileSystemFactory.create(session); + try { + Location transactionLogDir = Location.of(getTransactionLogDir(tableLocation)); + if (!fileSystem.listFiles(transactionLogDir).hasNext()) { + throw new TrinoException(GENERIC_USER_ERROR, format("No transaction log found in location %s", transactionLogDir)); + } + } + catch (IOException e) { + throw new TrinoException(DELTA_LAKE_FILESYSTEM_ERROR, format("Failed checking table location %s", tableLocation), e); + } - PrincipalPrivileges principalPrivileges = buildInitialPrivilegeSet(table.getOwner().orElseThrow()); - statisticsAccess.invalidateCache(schemaTableName, Optional.of(tableLocation)); - transactionLogAccess.invalidateCache(schemaTableName, Optional.of(tableLocation)); - // Verify we're registering a location with a valid table - try { - TableSnapshot tableSnapshot = transactionLogAccess.loadSnapshot(session, table.getSchemaTableName(), tableLocation); - transactionLogAccess.getMetadataEntry(tableSnapshot, session); // verify metadata exists - } - catch (TrinoException e) { - throw e; - } - catch (IOException | RuntimeException e) { - throw new TrinoException(DELTA_LAKE_INVALID_TABLE, "Failed to access table location: " + tableLocation, e); - } + Table table = buildTable(session, schemaTableName, tableLocation, true); - // Ensure the table has queryId set. This is relied on for exception handling - String queryId = session.getQueryId(); - verify( - getQueryId(table).orElseThrow(() -> new IllegalArgumentException("Query id is not present")).equals(queryId), - "Table '%s' does not have correct query id set", - table); - try { - metastore.createTable( - session, - table, - principalPrivileges); - } - catch (TableAlreadyExistsException e) { - // Ignore TableAlreadyExistsException when table looks like created by us. - // This may happen when an actually successful metastore create call is retried - // e.g. because of a timeout on our side. - Optional existingTable = metastore.getRawMetastoreTable(schemaName, tableName); - if (existingTable.isEmpty() || !isCreatedBy(existingTable.get(), queryId)) { + PrincipalPrivileges principalPrivileges = buildInitialPrivilegeSet(table.getOwner().orElseThrow()); + statisticsAccess.invalidateCache(schemaTableName, Optional.of(tableLocation)); + transactionLogAccess.invalidateCache(schemaTableName, Optional.of(tableLocation)); + // Verify we're registering a location with a valid table + try { + TableSnapshot tableSnapshot = transactionLogAccess.loadSnapshot(session, table.getSchemaTableName(), tableLocation); + transactionLogAccess.getMetadataEntry(tableSnapshot, session); // verify metadata exists + } + catch (TrinoException e) { throw e; } + catch (IOException | RuntimeException e) { + throw new TrinoException(DELTA_LAKE_INVALID_TABLE, "Failed to access table location: " + tableLocation, e); + } + + // Ensure the table has queryId set. This is relied on for exception handling + String queryId = session.getQueryId(); + verify( + getQueryId(table).orElseThrow(() -> new IllegalArgumentException("Query id is not present")).equals(queryId), + "Table '%s' does not have correct query id set", + table); + try { + metastore.createTable( + session, + table, + principalPrivileges); + } + catch (TableAlreadyExistsException e) { + // Ignore TableAlreadyExistsException when table looks like created by us. + // This may happen when an actually successful metastore create call is retried + // e.g. because of a timeout on our side. + Optional
existingTable = metastore.getRawMetastoreTable(schemaName, tableName); + if (existingTable.isEmpty() || !isCreatedBy(existingTable.get(), queryId)) { + throw e; + } + } } } } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/UnregisterTableProcedure.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/UnregisterTableProcedure.java index 9d800ec8d61e..2e2b62b14ccf 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/UnregisterTableProcedure.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/UnregisterTableProcedure.java @@ -16,6 +16,7 @@ import com.google.common.collect.ImmutableList; import com.google.inject.Inject; import com.google.inject.Provider; +import io.trino.plugin.base.util.UncheckedCloseable; import io.trino.plugin.deltalake.DeltaLakeMetadata; import io.trino.plugin.deltalake.DeltaLakeMetadataFactory; import io.trino.plugin.deltalake.LocatedTableHandle; @@ -96,14 +97,16 @@ private void doUnregisterTable(ConnectorAccessControl accessControl, ConnectorSe accessControl.checkCanDropTable(null, schemaTableName); DeltaLakeMetadata metadata = metadataFactory.create(session.getIdentity()); - - LocatedTableHandle tableHandle = metadata.getTableHandle(session, schemaTableName); - if (tableHandle == null) { - throw new TableNotFoundException(schemaTableName); + metadata.beginQuery(session); + try (UncheckedCloseable ignore = () -> metadata.cleanupQuery(session)) { + LocatedTableHandle tableHandle = metadata.getTableHandle(session, schemaTableName); + if (tableHandle == null) { + throw new TableNotFoundException(schemaTableName); + } + metadata.getMetastore().dropTable(session, schemaTableName, tableHandle.location(), false); + // As a precaution, clear the caches + statisticsAccess.invalidateCache(schemaTableName, Optional.of(tableHandle.location())); + transactionLogAccess.invalidateCache(schemaTableName, Optional.of(tableHandle.location())); } - metadata.getMetastore().dropTable(session, schemaTableName, tableHandle.location(), false); - // As a precaution, clear the caches - statisticsAccess.invalidateCache(schemaTableName, Optional.of(tableHandle.location())); - transactionLogAccess.invalidateCache(schemaTableName, Optional.of(tableHandle.location())); } } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/VacuumProcedure.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/VacuumProcedure.java index e8fb95bcfd55..088038075338 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/VacuumProcedure.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/procedure/VacuumProcedure.java @@ -25,6 +25,7 @@ import io.trino.filesystem.TrinoFileSystem; import io.trino.filesystem.TrinoFileSystemFactory; import io.trino.plugin.base.CatalogName; +import io.trino.plugin.base.util.UncheckedCloseable; import io.trino.plugin.deltalake.DeltaLakeConfig; import io.trino.plugin.deltalake.DeltaLakeMetadata; import io.trino.plugin.deltalake.DeltaLakeMetadataFactory; @@ -168,127 +169,130 @@ private void doVacuum( Instant threshold = Instant.now().minusMillis(retentionDuration.toMillis()); DeltaLakeMetadata metadata = metadataFactory.create(session.getIdentity()); - SchemaTableName tableName = new SchemaTableName(schema, table); - ConnectorTableHandle connectorTableHandle = metadata.getTableHandle(session, tableName); - checkProcedureArgument(connectorTableHandle != null, "Table '%s' does not exist", tableName); - DeltaLakeTableHandle handle = checkValidTableHandle(connectorTableHandle); + metadata.beginQuery(session); + try (UncheckedCloseable ignore = () -> metadata.cleanupQuery(session)) { + SchemaTableName tableName = new SchemaTableName(schema, table); + ConnectorTableHandle connectorTableHandle = metadata.getTableHandle(session, tableName); + checkProcedureArgument(connectorTableHandle != null, "Table '%s' does not exist", tableName); + DeltaLakeTableHandle handle = checkValidTableHandle(connectorTableHandle); - accessControl.checkCanInsertIntoTable(null, tableName); - accessControl.checkCanDeleteFromTable(null, tableName); + accessControl.checkCanInsertIntoTable(null, tableName); + accessControl.checkCanDeleteFromTable(null, tableName); - TableSnapshot tableSnapshot = metadata.getSnapshot(session, tableName, handle.getLocation(), handle.getReadVersion()); - ProtocolEntry protocolEntry = transactionLogAccess.getProtocolEntry(session, tableSnapshot); - if (protocolEntry.getMinWriterVersion() > MAX_WRITER_VERSION) { - throw new TrinoException(NOT_SUPPORTED, "Cannot execute vacuum procedure with %d writer version".formatted(protocolEntry.getMinWriterVersion())); - } - Set unsupportedWriterFeatures = unsupportedWriterFeatures(protocolEntry.getWriterFeatures().orElse(ImmutableSet.of())); - if (!unsupportedWriterFeatures.isEmpty()) { - throw new TrinoException(NOT_SUPPORTED, "Cannot execute vacuum procedure with %s writer features".formatted(unsupportedWriterFeatures)); - } + TableSnapshot tableSnapshot = metadata.getSnapshot(session, tableName, handle.getLocation(), handle.getReadVersion()); + ProtocolEntry protocolEntry = transactionLogAccess.getProtocolEntry(session, tableSnapshot); + if (protocolEntry.getMinWriterVersion() > MAX_WRITER_VERSION) { + throw new TrinoException(NOT_SUPPORTED, "Cannot execute vacuum procedure with %d writer version".formatted(protocolEntry.getMinWriterVersion())); + } + Set unsupportedWriterFeatures = unsupportedWriterFeatures(protocolEntry.getWriterFeatures().orElse(ImmutableSet.of())); + if (!unsupportedWriterFeatures.isEmpty()) { + throw new TrinoException(NOT_SUPPORTED, "Cannot execute vacuum procedure with %s writer features".formatted(unsupportedWriterFeatures)); + } - String tableLocation = tableSnapshot.getTableLocation(); - String transactionLogDir = getTransactionLogDir(tableLocation); - TrinoFileSystem fileSystem = fileSystemFactory.create(session); - String commonPathPrefix = tableLocation.endsWith("/") ? tableLocation : tableLocation + "/"; - String queryId = session.getQueryId(); + String tableLocation = tableSnapshot.getTableLocation(); + String transactionLogDir = getTransactionLogDir(tableLocation); + TrinoFileSystem fileSystem = fileSystemFactory.create(session); + String commonPathPrefix = tableLocation.endsWith("/") ? tableLocation : tableLocation + "/"; + String queryId = session.getQueryId(); - // Retain all active files and every file removed by a "recent" transaction (except for the oldest "recent"). - // Any remaining file are not live, and not needed to read any "recent" snapshot. - List recentVersions = transactionLogAccess.getPastTableVersions(fileSystem, transactionLogDir, threshold, tableSnapshot.getVersion()); - Set retainedPaths = Stream.concat( - transactionLogAccess.getActiveFiles(tableSnapshot, handle.getMetadataEntry(), handle.getProtocolEntry(), session).stream() - .map(AddFileEntry::getPath), - transactionLogAccess.getJsonEntries( - fileSystem, - transactionLogDir, - // discard oldest "recent" snapshot, since we take RemoveFileEntry only, to identify files that are no longer - // active files, but still needed to read a "recent" snapshot - recentVersions.stream().sorted(naturalOrder()) - .skip(1) - .collect(toImmutableList())) - .map(DeltaLakeTransactionLogEntry::getRemove) - .filter(Objects::nonNull) - .map(RemoveFileEntry::getPath)) - .peek(path -> checkState(!path.startsWith(tableLocation), "Unexpected absolute path in transaction log: %s", path)) - .collect(toImmutableSet()); + // Retain all active files and every file removed by a "recent" transaction (except for the oldest "recent"). + // Any remaining file are not live, and not needed to read any "recent" snapshot. + List recentVersions = transactionLogAccess.getPastTableVersions(fileSystem, transactionLogDir, threshold, tableSnapshot.getVersion()); + Set retainedPaths = Stream.concat( + transactionLogAccess.getActiveFiles(tableSnapshot, handle.getMetadataEntry(), handle.getProtocolEntry(), session).stream() + .map(AddFileEntry::getPath), + transactionLogAccess.getJsonEntries( + fileSystem, + transactionLogDir, + // discard oldest "recent" snapshot, since we take RemoveFileEntry only, to identify files that are no longer + // active files, but still needed to read a "recent" snapshot + recentVersions.stream().sorted(naturalOrder()) + .skip(1) + .collect(toImmutableList())) + .map(DeltaLakeTransactionLogEntry::getRemove) + .filter(Objects::nonNull) + .map(RemoveFileEntry::getPath)) + .peek(path -> checkState(!path.startsWith(tableLocation), "Unexpected absolute path in transaction log: %s", path)) + .collect(toImmutableSet()); - log.debug( - "[%s] attempting to vacuum table %s [%s] with %s retention (expiry threshold %s). %s data file paths marked for retention", - queryId, - tableName, - tableLocation, - retention, - threshold, - retainedPaths.size()); + log.debug( + "[%s] attempting to vacuum table %s [%s] with %s retention (expiry threshold %s). %s data file paths marked for retention", + queryId, + tableName, + tableLocation, + retention, + threshold, + retainedPaths.size()); - long allPathsChecked = 0; - long transactionLogFiles = 0; - long retainedKnownFiles = 0; - long retainedUnknownFiles = 0; - long removedFiles = 0; + long allPathsChecked = 0; + long transactionLogFiles = 0; + long retainedKnownFiles = 0; + long retainedUnknownFiles = 0; + long removedFiles = 0; - List filesToDelete = new ArrayList<>(); - FileIterator listing = fileSystem.listFiles(Location.of(tableLocation)); - while (listing.hasNext()) { - FileEntry entry = listing.next(); - String location = entry.location().toString(); - checkState( - location.startsWith(commonPathPrefix), - "Unexpected path [%s] returned when listing files under [%s]", - location, - tableLocation); - String relativePath = location.substring(commonPathPrefix.length()); - if (relativePath.isEmpty()) { - // A file returned for "tableLocation/", might be possible on S3. - continue; - } - allPathsChecked++; + List filesToDelete = new ArrayList<>(); + FileIterator listing = fileSystem.listFiles(Location.of(tableLocation)); + while (listing.hasNext()) { + FileEntry entry = listing.next(); + String location = entry.location().toString(); + checkState( + location.startsWith(commonPathPrefix), + "Unexpected path [%s] returned when listing files under [%s]", + location, + tableLocation); + String relativePath = location.substring(commonPathPrefix.length()); + if (relativePath.isEmpty()) { + // A file returned for "tableLocation/", might be possible on S3. + continue; + } + allPathsChecked++; - // ignore tableLocation/_delta_log/** - if (relativePath.equals(TRANSACTION_LOG_DIRECTORY) || relativePath.startsWith(TRANSACTION_LOG_DIRECTORY + "/")) { - log.debug("[%s] skipping a file inside transaction log dir: %s", queryId, location); - transactionLogFiles++; - continue; - } + // ignore tableLocation/_delta_log/** + if (relativePath.equals(TRANSACTION_LOG_DIRECTORY) || relativePath.startsWith(TRANSACTION_LOG_DIRECTORY + "/")) { + log.debug("[%s] skipping a file inside transaction log dir: %s", queryId, location); + transactionLogFiles++; + continue; + } - // skip retained files - if (retainedPaths.contains(relativePath)) { - log.debug("[%s] retaining a known file: %s", queryId, location); - retainedKnownFiles++; - continue; - } + // skip retained files + if (retainedPaths.contains(relativePath)) { + log.debug("[%s] retaining a known file: %s", queryId, location); + retainedKnownFiles++; + continue; + } + + // ignore recently created files + Instant modificationTime = entry.lastModified(); + if (!modificationTime.isBefore(threshold)) { + log.debug("[%s] retaining an unknown file %s with modification time %s", queryId, location, modificationTime); + retainedUnknownFiles++; + continue; + } - // ignore recently created files - Instant modificationTime = entry.lastModified(); - if (!modificationTime.isBefore(threshold)) { - log.debug("[%s] retaining an unknown file %s with modification time %s", queryId, location, modificationTime); - retainedUnknownFiles++; - continue; + log.debug("[%s] deleting file [%s] with modification time %s", queryId, location, modificationTime); + filesToDelete.add(entry.location()); + if (filesToDelete.size() == DELETE_BATCH_SIZE) { + fileSystem.deleteFiles(filesToDelete); + removedFiles += filesToDelete.size(); + filesToDelete.clear(); + } } - log.debug("[%s] deleting file [%s] with modification time %s", queryId, location, modificationTime); - filesToDelete.add(entry.location()); - if (filesToDelete.size() == DELETE_BATCH_SIZE) { + if (!filesToDelete.isEmpty()) { fileSystem.deleteFiles(filesToDelete); removedFiles += filesToDelete.size(); - filesToDelete.clear(); } - } - if (!filesToDelete.isEmpty()) { - fileSystem.deleteFiles(filesToDelete); - removedFiles += filesToDelete.size(); + log.info( + "[%s] finished vacuuming table %s [%s]: files checked: %s; metadata files: %s; retained known files: %s; retained unknown files: %s; removed files: %s", + queryId, + tableName, + tableLocation, + allPathsChecked, + transactionLogFiles, + retainedKnownFiles, + retainedUnknownFiles, + removedFiles); } - - log.info( - "[%s] finished vacuuming table %s [%s]: files checked: %s; metadata files: %s; retained known files: %s; retained unknown files: %s; removed files: %s", - queryId, - tableName, - tableLocation, - allPathsChecked, - transactionLogFiles, - retainedKnownFiles, - retainedUnknownFiles, - removedFiles); } } diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeMetadata.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeMetadata.java index 4f6e5fad33b5..1ab69d381f78 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeMetadata.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeMetadata.java @@ -116,10 +116,6 @@ public class TestDeltaLakeMetadata private static final RowType NESTED_ROW_FIELD = RowType.from(ImmutableList.of( new RowType.Field(Optional.of("child1"), INTEGER), new RowType.Field(Optional.of("child2"), INTEGER))); - private static final RowType HIGHLY_NESTED_ROW_FIELD = RowType.from(ImmutableList.of( - new RowType.Field(Optional.of("grandparent"), RowType.from(ImmutableList.of( - new RowType.Field(Optional.of("parent"), RowType.from(ImmutableList.of( - new RowType.Field(Optional.of("child"), INTEGER))))))))); private static final DeltaLakeColumnHandle BOOLEAN_COLUMN_HANDLE = new DeltaLakeColumnHandle("boolean_column_name", BooleanType.BOOLEAN, OptionalInt.empty(), "boolean_column_name", BooleanType.BOOLEAN, REGULAR, Optional.empty()); @@ -142,32 +138,12 @@ public class TestDeltaLakeMetadata NESTED_ROW_FIELD, REGULAR, Optional.of(new DeltaLakeColumnProjectionInfo(INTEGER, ImmutableList.of(1), ImmutableList.of("child2")))); - private static final DeltaLakeColumnHandle NESTED_COLUMN_HANDLE_WITH_PROJECTION = - new DeltaLakeColumnHandle( - "highly_nested_column_name", - HIGHLY_NESTED_ROW_FIELD, - OptionalInt.empty(), - "highly_nested_column_name", - HIGHLY_NESTED_ROW_FIELD, - REGULAR, - Optional.of(new DeltaLakeColumnProjectionInfo(INTEGER, ImmutableList.of(0, 0), ImmutableList.of("grandparent", "parent")))); - private static final DeltaLakeColumnHandle EXPECTED_NESTED_COLUMN_HANDLE_WITH_PROJECTION = - new DeltaLakeColumnHandle( - "highly_nested_column_name", - HIGHLY_NESTED_ROW_FIELD, - OptionalInt.empty(), - "highly_nested_column_name", - HIGHLY_NESTED_ROW_FIELD, - REGULAR, - Optional.of(new DeltaLakeColumnProjectionInfo(INTEGER, ImmutableList.of(0, 0, 0), ImmutableList.of("grandparent", "parent", "child")))); private static final Map SYNTHETIC_COLUMN_ASSIGNMENTS = ImmutableMap.of( "test_synthetic_column_name_1", BOGUS_COLUMN_HANDLE, "test_synthetic_column_name_2", VARCHAR_COLUMN_HANDLE); private static final Map NESTED_COLUMN_ASSIGNMENTS = ImmutableMap.of("nested_column_name", NESTED_COLUMN_HANDLE); private static final Map EXPECTED_NESTED_COLUMN_ASSIGNMENTS = ImmutableMap.of("nested_column_name#child2", EXPECTED_NESTED_COLUMN_HANDLE); - private static final Map HIGHLY_NESTED_COLUMN_ASSIGNMENTS = ImmutableMap.of("highly_nested_column_name#grandparent#parent", NESTED_COLUMN_HANDLE_WITH_PROJECTION); - private static final Map EXPECTED_HIGHLY_NESTED_COLUMN_ASSIGNMENTS = ImmutableMap.of("highly_nested_column_name#grandparent#parent#child", EXPECTED_NESTED_COLUMN_HANDLE_WITH_PROJECTION); private static final ConnectorExpression DOUBLE_PROJECTION = new Variable("double_projection", DoubleType.DOUBLE); private static final ConnectorExpression BOOLEAN_PROJECTION = new Variable("boolean_projection", BooleanType.BOOLEAN); @@ -182,13 +158,6 @@ public class TestDeltaLakeMetadata private static final ConnectorExpression EXPECTED_NESTED_DEREFERENCE_PROJECTION = new Variable( "nested_column_name#child2", INTEGER); - private static final ConnectorExpression HIGHLY_NESTED_DEREFERENCE_PROJECTION = new FieldDereference( - INTEGER, - new Variable("highly_nested_column_name#grandparent#parent", HIGHLY_NESTED_ROW_FIELD), - 0); - private static final ConnectorExpression EXPECTED_HIGHLY_NESTED_DEREFERENCE_PROJECTION = new Variable( - "highly_nested_column_name#grandparent#parent#child", - INTEGER); private static final List SIMPLE_COLUMN_PROJECTIONS = ImmutableList.of(DOUBLE_PROJECTION, BOOLEAN_PROJECTION); @@ -198,10 +167,6 @@ public class TestDeltaLakeMetadata ImmutableList.of(NESTED_DEREFERENCE_PROJECTION); private static final List EXPECTED_NESTED_DEREFERENCE_COLUMN_PROJECTIONS = ImmutableList.of(EXPECTED_NESTED_DEREFERENCE_PROJECTION); - private static final List HIGHLY_NESTED_DEREFERENCE_COLUMN_PROJECTIONS = - ImmutableList.of(HIGHLY_NESTED_DEREFERENCE_PROJECTION); - private static final List EXPECTED_HIGHLY_NESTED_DEREFERENCE_COLUMN_PROJECTIONS = - ImmutableList.of(EXPECTED_HIGHLY_NESTED_DEREFERENCE_PROJECTION); private static final Set PREDICATE_COLUMNS = ImmutableSet.of(BOOLEAN_COLUMN_HANDLE, DOUBLE_COLUMN_HANDLE); @@ -275,12 +240,12 @@ public void tearDown() @Test public void testGetNewTableLayout() { - Optional newTableLayout = deltaLakeMetadataFactory.create(SESSION.getIdentity()) - .getNewTableLayout( - SESSION, - newTableMetadata( - ImmutableList.of(BIGINT_COLUMN_1, BIGINT_COLUMN_2), - ImmutableList.of(BIGINT_COLUMN_2))); + DeltaLakeMetadata deltaLakeMetadata = deltaLakeMetadataFactory.create(SESSION.getIdentity()); + Optional newTableLayout = deltaLakeMetadata.getNewTableLayout( + SESSION, + newTableMetadata( + ImmutableList.of(BIGINT_COLUMN_1, BIGINT_COLUMN_2), + ImmutableList.of(BIGINT_COLUMN_2))); assertThat(newTableLayout).isPresent(); @@ -289,40 +254,45 @@ public void testGetNewTableLayout() assertThat(newTableLayout.get().getPartitionColumns()) .isEqualTo(ImmutableList.of(BIGINT_COLUMN_2.getName())); + + deltaLakeMetadata.cleanupQuery(SESSION); } @Test public void testGetNewTableLayoutNoPartitionColumns() { - assertThat(deltaLakeMetadataFactory.create(SESSION.getIdentity()) - .getNewTableLayout( - SESSION, - newTableMetadata( - ImmutableList.of(BIGINT_COLUMN_1, BIGINT_COLUMN_2), - ImmutableList.of()))) + DeltaLakeMetadata deltaLakeMetadata = deltaLakeMetadataFactory.create(SESSION.getIdentity()); + assertThat(deltaLakeMetadata.getNewTableLayout( + SESSION, + newTableMetadata( + ImmutableList.of(BIGINT_COLUMN_1, BIGINT_COLUMN_2), + ImmutableList.of()))) .isNotPresent(); + + deltaLakeMetadata.cleanupQuery(SESSION); } @Test public void testGetNewTableLayoutInvalidPartitionColumns() { - assertThatThrownBy(() -> deltaLakeMetadataFactory.create(SESSION.getIdentity()) - .getNewTableLayout( - SESSION, - newTableMetadata( - ImmutableList.of(BIGINT_COLUMN_1, BIGINT_COLUMN_2), - ImmutableList.of(BIGINT_COLUMN_2, MISSING_COLUMN)))) + DeltaLakeMetadata deltaLakeMetadata = deltaLakeMetadataFactory.create(SESSION.getIdentity()); + assertThatThrownBy(() -> deltaLakeMetadata.getNewTableLayout( + SESSION, + newTableMetadata( + ImmutableList.of(BIGINT_COLUMN_1, BIGINT_COLUMN_2), + ImmutableList.of(BIGINT_COLUMN_2, MISSING_COLUMN)))) .isInstanceOf(TrinoException.class) .hasMessage("Table property 'partition_by' contained column names which do not exist: [missing_column]"); - assertThatThrownBy(() -> deltaLakeMetadataFactory.create(SESSION.getIdentity()) - .getNewTableLayout( - SESSION, - newTableMetadata( - ImmutableList.of(TIMESTAMP_COLUMN, BIGINT_COLUMN_2), - ImmutableList.of(BIGINT_COLUMN_2)))) + assertThatThrownBy(() -> deltaLakeMetadata.getNewTableLayout( + SESSION, + newTableMetadata( + ImmutableList.of(TIMESTAMP_COLUMN, BIGINT_COLUMN_2), + ImmutableList.of(BIGINT_COLUMN_2)))) .isInstanceOf(TrinoException.class) .hasMessage("Unsupported type: timestamp(3)"); + + deltaLakeMetadata.cleanupQuery(SESSION); } @Test @@ -347,6 +317,8 @@ public void testGetInsertLayout() assertThat(insertLayout.get().getPartitionColumns()) .isEqualTo(getPartitionColumnNames(ImmutableList.of(BIGINT_COLUMN_1))); + + deltaLakeMetadata.cleanupQuery(SESSION); } private ConnectorTableMetadata newTableMetadata(List tableColumns, List partitionTableColumns) @@ -377,6 +349,8 @@ public void testGetInsertLayoutTableUnpartitioned() SESSION, deltaLakeMetadata.getTableHandle(SESSION, tableMetadata.getTable()))) .isNotPresent(); + + deltaLakeMetadata.cleanupQuery(SESSION); } @Test @@ -460,6 +434,8 @@ private void testApplyProjection( assertThat(projection.isPrecalculateStatistics()) .isFalse(); + + deltaLakeMetadata.cleanupQuery(SESSION); } @Test @@ -484,6 +460,8 @@ public void testApplyProjectionWithEmptyResult() ImmutableList.of(), ImmutableMap.of())) .isEmpty(); + + deltaLakeMetadata.cleanupQuery(SESSION); } @Test @@ -496,6 +474,7 @@ public void testGetInputInfoForPartitionedTable() deltaLakeMetadata.createTable(SESSION, tableMetadata, false); DeltaLakeTableHandle tableHandle = (DeltaLakeTableHandle) deltaLakeMetadata.getTableHandle(SESSION, tableMetadata.getTable()); assertThat(deltaLakeMetadata.getInfo(tableHandle)).isEqualTo(Optional.of(new DeltaLakeInputInfo(true))); + deltaLakeMetadata.cleanupQuery(SESSION); } @Test @@ -508,6 +487,7 @@ public void testGetInputInfoForUnPartitionedTable() deltaLakeMetadata.createTable(SESSION, tableMetadata, false); DeltaLakeTableHandle tableHandle = (DeltaLakeTableHandle) deltaLakeMetadata.getTableHandle(SESSION, tableMetadata.getTable()); assertThat(deltaLakeMetadata.getInfo(tableHandle)).isEqualTo(Optional.of(new DeltaLakeInputInfo(false))); + deltaLakeMetadata.cleanupQuery(SESSION); } private static DeltaLakeTableHandle createDeltaLakeTableHandle(Set projectedColumns, Set constrainedColumns) diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/metastore/glue/TestDeltaLakeGlueMetastore.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/metastore/glue/TestDeltaLakeGlueMetastore.java index 8f5cadb08bb1..c06f4a1cb959 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/metastore/glue/TestDeltaLakeGlueMetastore.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/metastore/glue/TestDeltaLakeGlueMetastore.java @@ -243,6 +243,8 @@ public void testHideNonDeltaLakeTable() .isEmpty(); assertThat(listTableColumns(metadata, new SchemaTablePrefix(databaseName, nonDeltaLakeView1.getTableName()))) .isEmpty(); + + metadata.cleanupQuery(session); } private Set listTableColumns(DeltaLakeMetadata metadata, SchemaTablePrefix tablePrefix) diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/procedure/CreateEmptyPartitionProcedure.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/procedure/CreateEmptyPartitionProcedure.java index 6a1096222001..1e9dc2fdadac 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/procedure/CreateEmptyPartitionProcedure.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/procedure/CreateEmptyPartitionProcedure.java @@ -19,6 +19,7 @@ import io.airlift.json.JsonCodec; import io.airlift.slice.Slice; import io.airlift.slice.Slices; +import io.trino.plugin.base.util.UncheckedCloseable; import io.trino.plugin.hive.HiveColumnHandle; import io.trino.plugin.hive.HiveInsertTableHandle; import io.trino.plugin.hive.HiveMetastoreClosure; @@ -109,46 +110,49 @@ private void doCreateEmptyPartition(ConnectorSession session, ConnectorAccessCon checkProcedureArgument(partitionValues != null, "partition_values cannot be null"); TransactionalMetadata hiveMetadata = hiveMetadataFactory.create(session.getIdentity(), true); - HiveTableHandle tableHandle = (HiveTableHandle) hiveMetadata.getTableHandle(session, new SchemaTableName(schemaName, tableName)); - if (tableHandle == null) { - throw new TrinoException(INVALID_PROCEDURE_ARGUMENT, format("Table '%s' does not exist", new SchemaTableName(schemaName, tableName))); + hiveMetadata.beginQuery(session); + try (UncheckedCloseable ignore = () -> hiveMetadata.cleanupQuery(session)) { + HiveTableHandle tableHandle = (HiveTableHandle) hiveMetadata.getTableHandle(session, new SchemaTableName(schemaName, tableName)); + if (tableHandle == null) { + throw new TrinoException(INVALID_PROCEDURE_ARGUMENT, format("Table '%s' does not exist", new SchemaTableName(schemaName, tableName))); + } + + accessControl.checkCanInsertIntoTable(null, new SchemaTableName(schemaName, tableName)); + + List actualPartitionColumnNames = tableHandle.getPartitionColumns().stream() + .map(HiveColumnHandle::getName) + .collect(toImmutableList()); + + if (!Objects.equals(partitionColumnNames, actualPartitionColumnNames)) { + throw new TrinoException(INVALID_PROCEDURE_ARGUMENT, "Provided partition column names do not match actual partition column names: " + actualPartitionColumnNames); + } + + HiveMetastoreClosure metastore = hiveMetadata.getMetastore().unsafeGetRawHiveMetastoreClosure(); + if (metastore.getPartition(schemaName, tableName, partitionValues).isPresent()) { + throw new TrinoException(ALREADY_EXISTS, "Partition already exists"); + } + HiveInsertTableHandle hiveInsertTableHandle = (HiveInsertTableHandle) hiveMetadata.beginInsert(session, tableHandle, ImmutableList.of(), NO_RETRIES); + String partitionName = makePartName(actualPartitionColumnNames, partitionValues); + + WriteInfo writeInfo = locationService.getPartitionWriteInfo(hiveInsertTableHandle.getLocationHandle(), Optional.empty(), partitionName); + Slice serializedPartitionUpdate = Slices.wrappedBuffer( + partitionUpdateJsonCodec.toJsonBytes( + new PartitionUpdate( + partitionName, + UpdateMode.NEW, + writeInfo.writePath().toString(), + writeInfo.targetPath().toString(), + ImmutableList.of(), + 0, + 0, + 0))); + + hiveMetadata.finishInsert( + session, + hiveInsertTableHandle, + ImmutableList.of(serializedPartitionUpdate), + ImmutableList.of()); + hiveMetadata.commit(); } - - accessControl.checkCanInsertIntoTable(null, new SchemaTableName(schemaName, tableName)); - - List actualPartitionColumnNames = tableHandle.getPartitionColumns().stream() - .map(HiveColumnHandle::getName) - .collect(toImmutableList()); - - if (!Objects.equals(partitionColumnNames, actualPartitionColumnNames)) { - throw new TrinoException(INVALID_PROCEDURE_ARGUMENT, "Provided partition column names do not match actual partition column names: " + actualPartitionColumnNames); - } - - HiveMetastoreClosure metastore = hiveMetadata.getMetastore().unsafeGetRawHiveMetastoreClosure(); - if (metastore.getPartition(schemaName, tableName, partitionValues).isPresent()) { - throw new TrinoException(ALREADY_EXISTS, "Partition already exists"); - } - HiveInsertTableHandle hiveInsertTableHandle = (HiveInsertTableHandle) hiveMetadata.beginInsert(session, tableHandle, ImmutableList.of(), NO_RETRIES); - String partitionName = makePartName(actualPartitionColumnNames, partitionValues); - - WriteInfo writeInfo = locationService.getPartitionWriteInfo(hiveInsertTableHandle.getLocationHandle(), Optional.empty(), partitionName); - Slice serializedPartitionUpdate = Slices.wrappedBuffer( - partitionUpdateJsonCodec.toJsonBytes( - new PartitionUpdate( - partitionName, - UpdateMode.NEW, - writeInfo.writePath().toString(), - writeInfo.targetPath().toString(), - ImmutableList.of(), - 0, - 0, - 0))); - - hiveMetadata.finishInsert( - session, - hiveInsertTableHandle, - ImmutableList.of(serializedPartitionUpdate), - ImmutableList.of()); - hiveMetadata.commit(); } } diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/procedure/DropStatsProcedure.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/procedure/DropStatsProcedure.java index 834784802597..be2e8e295441 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/procedure/DropStatsProcedure.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/procedure/DropStatsProcedure.java @@ -16,6 +16,7 @@ import com.google.common.collect.ImmutableList; import com.google.inject.Inject; import com.google.inject.Provider; +import io.trino.plugin.base.util.UncheckedCloseable; import io.trino.plugin.hive.HiveColumnHandle; import io.trino.plugin.hive.HiveMetastoreClosure; import io.trino.plugin.hive.HiveTableHandle; @@ -100,56 +101,59 @@ private void doDropStats(ConnectorSession session, ConnectorAccessControl access checkProcedureArgument(table != null, "table_name cannot be null"); TransactionalMetadata hiveMetadata = hiveMetadataFactory.create(session.getIdentity(), true); - HiveTableHandle handle = (HiveTableHandle) hiveMetadata.getTableHandle(session, new SchemaTableName(schema, table)); - if (handle == null) { - throw new TrinoException(INVALID_PROCEDURE_ARGUMENT, format("Table '%s' does not exist", new SchemaTableName(schema, table))); - } - - accessControl.checkCanInsertIntoTable(null, new SchemaTableName(schema, table)); + hiveMetadata.beginQuery(session); + try (UncheckedCloseable ignore = () -> hiveMetadata.cleanupQuery(session)) { + HiveTableHandle handle = (HiveTableHandle) hiveMetadata.getTableHandle(session, new SchemaTableName(schema, table)); + if (handle == null) { + throw new TrinoException(INVALID_PROCEDURE_ARGUMENT, format("Table '%s' does not exist", new SchemaTableName(schema, table))); + } - Map columns = hiveMetadata.getColumnHandles(session, handle); - List partitionColumns = columns.values().stream() - .map(HiveColumnHandle.class::cast) - .filter(HiveColumnHandle::isPartitionKey) - .map(HiveColumnHandle::getName) - .collect(toImmutableList()); + accessControl.checkCanInsertIntoTable(null, new SchemaTableName(schema, table)); - HiveMetastoreClosure metastore = hiveMetadata.getMetastore().unsafeGetRawHiveMetastoreClosure(); - if (partitionValues != null) { - // drop stats for specified partitions - List> partitionStringValues = partitionValues.stream() - .map(DropStatsProcedure::validateParameterType) + Map columns = hiveMetadata.getColumnHandles(session, handle); + List partitionColumns = columns.values().stream() + .map(HiveColumnHandle.class::cast) + .filter(HiveColumnHandle::isPartitionKey) + .map(HiveColumnHandle::getName) .collect(toImmutableList()); - validatePartitions(partitionStringValues, partitionColumns); - partitionStringValues.forEach(values -> metastore.updatePartitionStatistics( - schema, - table, - makePartName(partitionColumns, values), - stats -> PartitionStatistics.empty())); - } - else { - // no partition specified, so drop stats for the entire table - if (partitionColumns.isEmpty()) { - // for non-partitioned tables, just wipe table stats - metastore.updateTableStatistics( + HiveMetastoreClosure metastore = hiveMetadata.getMetastore().unsafeGetRawHiveMetastoreClosure(); + if (partitionValues != null) { + // drop stats for specified partitions + List> partitionStringValues = partitionValues.stream() + .map(DropStatsProcedure::validateParameterType) + .collect(toImmutableList()); + validatePartitions(partitionStringValues, partitionColumns); + + partitionStringValues.forEach(values -> metastore.updatePartitionStatistics( schema, table, - NO_ACID_TRANSACTION, - stats -> PartitionStatistics.empty()); + makePartName(partitionColumns, values), + stats -> PartitionStatistics.empty())); } else { - // the table is partitioned; remove stats for every partition - metastore.getPartitionNamesByFilter(handle.getSchemaName(), handle.getTableName(), partitionColumns, TupleDomain.all()) - .ifPresent(partitions -> partitions.forEach(partitionName -> metastore.updatePartitionStatistics( - schema, - table, - partitionName, - stats -> PartitionStatistics.empty()))); + // no partition specified, so drop stats for the entire table + if (partitionColumns.isEmpty()) { + // for non-partitioned tables, just wipe table stats + metastore.updateTableStatistics( + schema, + table, + NO_ACID_TRANSACTION, + stats -> PartitionStatistics.empty()); + } + else { + // the table is partitioned; remove stats for every partition + metastore.getPartitionNamesByFilter(handle.getSchemaName(), handle.getTableName(), partitionColumns, TupleDomain.all()) + .ifPresent(partitions -> partitions.forEach(partitionName -> metastore.updatePartitionStatistics( + schema, + table, + partitionName, + stats -> PartitionStatistics.empty()))); + } } - } - hiveMetadata.commit(); + hiveMetadata.commit(); + } } private static List validateParameterType(Object param) diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/procedure/RegisterPartitionProcedure.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/procedure/RegisterPartitionProcedure.java index eaf6295f5042..2dc267fcfe05 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/procedure/RegisterPartitionProcedure.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/procedure/RegisterPartitionProcedure.java @@ -20,8 +20,10 @@ import io.trino.filesystem.Location; import io.trino.filesystem.TrinoFileSystem; import io.trino.filesystem.TrinoFileSystemFactory; +import io.trino.plugin.base.util.UncheckedCloseable; import io.trino.plugin.hive.HiveConfig; import io.trino.plugin.hive.PartitionStatistics; +import io.trino.plugin.hive.TransactionalMetadata; import io.trino.plugin.hive.TransactionalMetadataFactory; import io.trino.plugin.hive.metastore.Partition; import io.trino.plugin.hive.metastore.SemiTransactionalHiveMetastore; @@ -113,49 +115,53 @@ private void doRegisterPartition(ConnectorSession session, ConnectorAccessContro throw new TrinoException(PERMISSION_DENIED, "register_partition procedure is disabled"); } - SemiTransactionalHiveMetastore metastore = hiveMetadataFactory.create(session.getIdentity(), true).getMetastore(); + TransactionalMetadata hiveMetadata = hiveMetadataFactory.create(session.getIdentity(), true); + hiveMetadata.beginQuery(session); + try (UncheckedCloseable ignore = () -> hiveMetadata.cleanupQuery(session)) { + SemiTransactionalHiveMetastore metastore = hiveMetadata.getMetastore(); - SchemaTableName schemaTableName = new SchemaTableName(schemaName, tableName); + SchemaTableName schemaTableName = new SchemaTableName(schemaName, tableName); - Table table = metastore.getTable(schemaName, tableName) - .orElseThrow(() -> new TableNotFoundException(schemaTableName)); + Table table = metastore.getTable(schemaName, tableName) + .orElseThrow(() -> new TableNotFoundException(schemaTableName)); - accessControl.checkCanInsertIntoTable(null, schemaTableName); + accessControl.checkCanInsertIntoTable(null, schemaTableName); - checkIsPartitionedTable(table); - checkPartitionColumns(table, partitionColumns); + checkIsPartitionedTable(table); + checkPartitionColumns(table, partitionColumns); - Optional partition = metastore.unsafeGetRawHiveMetastoreClosure().getPartition(schemaName, tableName, partitionValues); - if (partition.isPresent()) { - String partitionName = makePartName(partitionColumns, partitionValues); - throw new TrinoException(ALREADY_EXISTS, format("Partition [%s] is already registered with location %s", partitionName, partition.get().getStorage().getLocation())); - } + Optional partition = metastore.unsafeGetRawHiveMetastoreClosure().getPartition(schemaName, tableName, partitionValues); + if (partition.isPresent()) { + String partitionName = makePartName(partitionColumns, partitionValues); + throw new TrinoException(ALREADY_EXISTS, format("Partition [%s] is already registered with location %s", partitionName, partition.get().getStorage().getLocation())); + } - Location partitionLocation = Optional.ofNullable(location) - .map(Location::of) - .orElseGet(() -> Location.of(table.getStorage().getLocation()).appendPath(makePartName(partitionColumns, partitionValues))); + Location partitionLocation = Optional.ofNullable(location) + .map(Location::of) + .orElseGet(() -> Location.of(table.getStorage().getLocation()).appendPath(makePartName(partitionColumns, partitionValues))); - TrinoFileSystem fileSystem = fileSystemFactory.create(session); - try { - if (!fileSystem.directoryExists(partitionLocation).orElse(true)) { - throw new TrinoException(INVALID_PROCEDURE_ARGUMENT, "Partition location does not exist: " + partitionLocation); + TrinoFileSystem fileSystem = fileSystemFactory.create(session); + try { + if (!fileSystem.directoryExists(partitionLocation).orElse(true)) { + throw new TrinoException(INVALID_PROCEDURE_ARGUMENT, "Partition location does not exist: " + partitionLocation); + } + } + catch (IOException e) { + throw new TrinoException(HIVE_FILESYSTEM_ERROR, "Failed checking partition location: " + partitionLocation, e); } - } - catch (IOException e) { - throw new TrinoException(HIVE_FILESYSTEM_ERROR, "Failed checking partition location: " + partitionLocation, e); - } - metastore.addPartition( - session, - table.getDatabaseName(), - table.getTableName(), - buildPartitionObject(session, table, partitionValues, partitionLocation), - partitionLocation, - Optional.empty(), // no need for failed attempts cleanup - PartitionStatistics.empty(), - false); - - metastore.commit(); + metastore.addPartition( + session, + table.getDatabaseName(), + table.getTableName(), + buildPartitionObject(session, table, partitionValues, partitionLocation), + partitionLocation, + Optional.empty(), // no need for failed attempts cleanup + PartitionStatistics.empty(), + false); + + metastore.commit(); + } } private static Partition buildPartitionObject(ConnectorSession session, Table table, List partitionValues, Location location) diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/procedure/SyncPartitionMetadataProcedure.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/procedure/SyncPartitionMetadataProcedure.java index 3be1d4d10d3b..47282bd11a27 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/procedure/SyncPartitionMetadataProcedure.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/procedure/SyncPartitionMetadataProcedure.java @@ -21,7 +21,9 @@ import io.trino.filesystem.Location; import io.trino.filesystem.TrinoFileSystem; import io.trino.filesystem.TrinoFileSystemFactory; +import io.trino.plugin.base.util.UncheckedCloseable; import io.trino.plugin.hive.PartitionStatistics; +import io.trino.plugin.hive.TransactionalMetadata; import io.trino.plugin.hive.TransactionalMetadataFactory; import io.trino.plugin.hive.metastore.Column; import io.trino.plugin.hive.metastore.Partition; @@ -116,36 +118,40 @@ private void doSyncPartitionMetadata(ConnectorSession session, ConnectorAccessCo checkProcedureArgument(mode != null, "mode cannot be null"); SyncMode syncMode = toSyncMode(mode); - SemiTransactionalHiveMetastore metastore = hiveMetadataFactory.create(session.getIdentity(), true).getMetastore(); - SchemaTableName schemaTableName = new SchemaTableName(schemaName, tableName); + TransactionalMetadata hiveMetadata = hiveMetadataFactory.create(session.getIdentity(), true); + hiveMetadata.beginQuery(session); + try (UncheckedCloseable ignore = () -> hiveMetadata.cleanupQuery(session)) { + SemiTransactionalHiveMetastore metastore = hiveMetadata.getMetastore(); + SchemaTableName schemaTableName = new SchemaTableName(schemaName, tableName); - Table table = metastore.getTable(schemaName, tableName) - .orElseThrow(() -> new TableNotFoundException(schemaTableName)); - if (table.getPartitionColumns().isEmpty()) { - throw new TrinoException(INVALID_PROCEDURE_ARGUMENT, "Table is not partitioned: " + schemaTableName); - } + Table table = metastore.getTable(schemaName, tableName) + .orElseThrow(() -> new TableNotFoundException(schemaTableName)); + if (table.getPartitionColumns().isEmpty()) { + throw new TrinoException(INVALID_PROCEDURE_ARGUMENT, "Table is not partitioned: " + schemaTableName); + } - if (syncMode == SyncMode.ADD || syncMode == SyncMode.FULL) { - accessControl.checkCanInsertIntoTable(null, new SchemaTableName(schemaName, tableName)); - } - if (syncMode == SyncMode.DROP || syncMode == SyncMode.FULL) { - accessControl.checkCanDeleteFromTable(null, new SchemaTableName(schemaName, tableName)); - } + if (syncMode == SyncMode.ADD || syncMode == SyncMode.FULL) { + accessControl.checkCanInsertIntoTable(null, new SchemaTableName(schemaName, tableName)); + } + if (syncMode == SyncMode.DROP || syncMode == SyncMode.FULL) { + accessControl.checkCanDeleteFromTable(null, new SchemaTableName(schemaName, tableName)); + } - Location tableLocation = Location.of(table.getStorage().getLocation()); + Location tableLocation = Location.of(table.getStorage().getLocation()); - Set partitionsInMetastore = metastore.getPartitionNames(schemaName, tableName) - .map(ImmutableSet::copyOf) - .orElseThrow(() -> new TableNotFoundException(schemaTableName)); - Set partitionsInFileSystem = listPartitions(fileSystemFactory.create(session), tableLocation, table.getPartitionColumns(), caseSensitive); + Set partitionsInMetastore = metastore.getPartitionNames(schemaName, tableName) + .map(ImmutableSet::copyOf) + .orElseThrow(() -> new TableNotFoundException(schemaTableName)); + Set partitionsInFileSystem = listPartitions(fileSystemFactory.create(session), tableLocation, table.getPartitionColumns(), caseSensitive); - // partitions in file system but not in metastore - Set partitionsToAdd = difference(partitionsInFileSystem, partitionsInMetastore); + // partitions in file system but not in metastore + Set partitionsToAdd = difference(partitionsInFileSystem, partitionsInMetastore); - // partitions in metastore but not in file system - Set partitionsToDrop = difference(partitionsInMetastore, partitionsInFileSystem); + // partitions in metastore but not in file system + Set partitionsToDrop = difference(partitionsInMetastore, partitionsInFileSystem); - syncPartitions(partitionsToAdd, partitionsToDrop, syncMode, metastore, session, table); + syncPartitions(partitionsToAdd, partitionsToDrop, syncMode, metastore, session, table); + } } private static Set listPartitions(TrinoFileSystem fileSystem, Location directory, List partitionColumns, boolean caseSensitive) diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/procedure/UnregisterPartitionProcedure.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/procedure/UnregisterPartitionProcedure.java index 2ff444f2f30e..d5e03b0332c7 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/procedure/UnregisterPartitionProcedure.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/procedure/UnregisterPartitionProcedure.java @@ -16,6 +16,8 @@ import com.google.common.collect.ImmutableList; import com.google.inject.Inject; import com.google.inject.Provider; +import io.trino.plugin.base.util.UncheckedCloseable; +import io.trino.plugin.hive.TransactionalMetadata; import io.trino.plugin.hive.TransactionalMetadataFactory; import io.trino.plugin.hive.metastore.Partition; import io.trino.plugin.hive.metastore.SemiTransactionalHiveMetastore; @@ -94,28 +96,32 @@ private void doUnregisterPartition(ConnectorSession session, ConnectorAccessCont SchemaTableName schemaTableName = new SchemaTableName(schemaName, tableName); - SemiTransactionalHiveMetastore metastore = hiveMetadataFactory.create(session.getIdentity(), true).getMetastore(); + TransactionalMetadata hiveMetadata = hiveMetadataFactory.create(session.getIdentity(), true); + hiveMetadata.beginQuery(session); + try (UncheckedCloseable ignore = () -> hiveMetadata.cleanupQuery(session)) { + SemiTransactionalHiveMetastore metastore = hiveMetadata.getMetastore(); - Table table = metastore.getTable(schemaName, tableName) - .orElseThrow(() -> new TableNotFoundException(schemaTableName)); + Table table = metastore.getTable(schemaName, tableName) + .orElseThrow(() -> new TableNotFoundException(schemaTableName)); - accessControl.checkCanDeleteFromTable(null, schemaTableName); + accessControl.checkCanDeleteFromTable(null, schemaTableName); - checkIsPartitionedTable(table); - checkPartitionColumns(table, partitionColumns); + checkIsPartitionedTable(table); + checkPartitionColumns(table, partitionColumns); - String partitionName = makePartName(partitionColumns, partitionValues); + String partitionName = makePartName(partitionColumns, partitionValues); - Partition partition = metastore.unsafeGetRawHiveMetastoreClosure().getPartition(schemaName, tableName, partitionValues) - .orElseThrow(() -> new TrinoException(NOT_FOUND, format("Partition '%s' does not exist", partitionName))); + Partition partition = metastore.unsafeGetRawHiveMetastoreClosure().getPartition(schemaName, tableName, partitionValues) + .orElseThrow(() -> new TrinoException(NOT_FOUND, format("Partition '%s' does not exist", partitionName))); - metastore.dropPartition( - session, - table.getDatabaseName(), - table.getTableName(), - partition.getValues(), - false); + metastore.dropPartition( + session, + table.getDatabaseName(), + table.getTableName(), + partition.getValues(), + false); - metastore.commit(); + metastore.commit(); + } } } diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseS3AndGlueMetastoreTest.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseS3AndGlueMetastoreTest.java index f43d7a7f72ca..fcf9f741a9a7 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseS3AndGlueMetastoreTest.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseS3AndGlueMetastoreTest.java @@ -18,6 +18,7 @@ import com.amazonaws.services.s3.model.ListObjectsV2Request; import com.amazonaws.services.s3.model.S3ObjectSummary; import io.trino.Session; +import io.trino.plugin.base.util.UncheckedCloseable; import io.trino.plugin.hive.metastore.HiveMetastore; import io.trino.spi.connector.SchemaNotFoundException; import io.trino.testing.AbstractTestQueryFramework; @@ -346,11 +347,4 @@ public String locationForTable(String bucketName, String schemaName, String tabl return locationPattern.formatted(bucketName, schemaName, tableName); } } - - protected interface UncheckedCloseable - extends AutoCloseable - { - @Override - void close(); - } } diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveS3AndGlueMetastoreTest.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveS3AndGlueMetastoreTest.java index 15ff9fc407a9..d45371b16b0b 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveS3AndGlueMetastoreTest.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveS3AndGlueMetastoreTest.java @@ -15,6 +15,7 @@ import com.google.common.collect.ImmutableMap; import io.trino.Session; +import io.trino.plugin.base.util.UncheckedCloseable; import io.trino.spi.security.Identity; import io.trino.spi.security.SelectedRole; import io.trino.testing.DistributedQueryRunner; diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergS3AndGlueMetastoreTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergS3AndGlueMetastoreTest.java index 7c16726b970d..326b38011ba3 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergS3AndGlueMetastoreTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergS3AndGlueMetastoreTest.java @@ -14,6 +14,7 @@ package io.trino.plugin.iceberg.catalog.glue; import com.google.common.collect.ImmutableMap; +import io.trino.plugin.base.util.UncheckedCloseable; import io.trino.plugin.hive.BaseS3AndGlueMetastoreTest; import io.trino.plugin.iceberg.IcebergQueryRunner; import io.trino.testing.DistributedQueryRunner;