diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java index 6030f392ca23..85bea5b5aec0 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java @@ -255,6 +255,7 @@ import static io.trino.plugin.hive.metastore.MetastoreUtil.verifyOnline; import static io.trino.plugin.hive.metastore.PrincipalPrivileges.NO_PRIVILEGES; import static io.trino.plugin.hive.metastore.PrincipalPrivileges.fromHivePrivilegeInfos; +import static io.trino.plugin.hive.metastore.SemiTransactionalHiveMetastore.cleanExtraOutputFiles; import static io.trino.plugin.hive.metastore.StorageFormat.VIEW_STORAGE_FORMAT; import static io.trino.plugin.hive.metastore.StorageFormat.fromHiveStorageFormat; import static io.trino.plugin.hive.util.CompressionConfigUtil.configureCompression; @@ -1903,6 +1904,10 @@ else if (partitionUpdate.getUpdateMode() == NEW || partitionUpdate.getUpdateMode if (partitionUpdate.getUpdateMode() == OVERWRITE) { if (handle.getLocationHandle().getWriteMode() == DIRECT_TO_TARGET_EXISTING_DIRECTORY) { removeNonCurrentQueryFiles(session, partitionUpdate.getTargetPath()); + if (handle.isRetriesEnabled()) { + HdfsContext hdfsContext = new HdfsContext(session); + cleanExtraOutputFiles(hdfsEnvironment, hdfsContext, session.getQueryId(), partitionUpdate.getTargetPath(), ImmutableSet.copyOf(partitionUpdate.getFileNames())); + } } else { metastore.dropPartition(session, handle.getSchemaName(), handle.getTableName(), partition.getValues(), true); diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/SemiTransactionalHiveMetastore.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/SemiTransactionalHiveMetastore.java index 36741733c426..f2a7788b0e11 100644 --- 
a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/SemiTransactionalHiveMetastore.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/SemiTransactionalHiveMetastore.java @@ -1946,7 +1946,7 @@ private void cleanExtraOutputFiles(HdfsContext hdfsContext, String queryId, Part } verify(partitionAndMore.hasFileNames(), "fileNames expected to be set if isCleanExtraOutputFilesOnCommit is true"); - cleanExtraOutputFiles(hdfsContext, queryId, partitionAndMore.getCurrentLocation(), ImmutableSet.copyOf(partitionAndMore.getFileNames())); + SemiTransactionalHiveMetastore.cleanExtraOutputFiles(hdfsEnvironment, hdfsContext, queryId, partitionAndMore.getCurrentLocation(), ImmutableSet.copyOf(partitionAndMore.getFileNames())); } private void cleanExtraOutputFiles(HdfsContext hdfsContext, String queryId, TableAndMore tableAndMore) @@ -1956,59 +1956,7 @@ private void cleanExtraOutputFiles(HdfsContext hdfsContext, String queryId, Tabl } Path tableLocation = tableAndMore.getCurrentLocation().orElseThrow(() -> new IllegalArgumentException("currentLocation expected to be set if isCleanExtraOutputFilesOnCommit is true")); List files = tableAndMore.getFileNames().orElseThrow(() -> new IllegalArgumentException("fileNames expected to be set if isCleanExtraOutputFilesOnCommit is true")); - cleanExtraOutputFiles(hdfsContext, queryId, tableLocation, ImmutableSet.copyOf(files)); - } - - private void cleanExtraOutputFiles(HdfsContext hdfsContext, String queryId, Path path, Set filesToKeep) - { - List filesToDelete = new LinkedList<>(); - try { - log.debug("Deleting failed attempt files from %s for query %s", path, queryId); - FileSystem fileSystem = hdfsEnvironment.getFileSystem(hdfsContext, path); - if (!fileSystem.exists(path)) { - // directory may nat exit if no files were actually written - return; - } - - // files are written flat in a single directory so we do not need to list recursively - RemoteIterator iterator = fileSystem.listFiles(path, false); - 
while (iterator.hasNext()) { - Path file = iterator.next().getPath(); - if (isFileCreatedByQuery(file.getName(), queryId) && !filesToKeep.contains(file.getName())) { - filesToDelete.add(file.getName()); - } - } - - ImmutableList.Builder deletedFilesBuilder = ImmutableList.builder(); - Iterator filesToDeleteIterator = filesToDelete.iterator(); - while (filesToDeleteIterator.hasNext()) { - String fileName = filesToDeleteIterator.next(); - log.debug("Deleting failed attempt file %s/%s for query %s", path, fileName, queryId); - fileSystem.delete(new Path(path, fileName), false); - deletedFilesBuilder.add(fileName); - filesToDeleteIterator.remove(); - } - - List deletedFiles = deletedFilesBuilder.build(); - if (!deletedFiles.isEmpty()) { - log.info("Deleted failed attempt files %s from %s for query %s", deletedFiles, path, queryId); - } - } - catch (IOException e) { - // If we fail here query will be rolled back. The optimal outcome would be for rollback to complete successfully and clean up everything for query. - // Yet if we have problem here, probably rollback will also fail. - // - // Thrown exception is listing files which we could not delete. So those can be cleaned up later by user manually. - // Note it is not a bullet-proof solution. - // The rollback routine will still fire and try to cleanup the changes query made. It will cleanup some, leave some behind probably. - // It is not obvious that if at this point user cleans up the failed attempt files the table would be in the expected state. - // - // Still we cannot do much better for non-transactional Hive tables. 
- throw new TrinoException( - HIVE_FILESYSTEM_ERROR, - format("Error deleting failed retry attempt files from %s; remaining files %s; manual cleanup may be required", path, filesToDelete), - e); - } + SemiTransactionalHiveMetastore.cleanExtraOutputFiles(hdfsEnvironment, hdfsContext, queryId, tableLocation, ImmutableSet.copyOf(files)); } private PartitionStatistics getExistingPartitionStatistics(Partition partition, String partitionName) @@ -3687,4 +3635,56 @@ public void commitTransaction(long transactionId) { delegate.commitTransaction(transactionId); } + + public static void cleanExtraOutputFiles(HdfsEnvironment hdfsEnvironment, HdfsContext hdfsContext, String queryId, Path path, Set filesToKeep) + { + List filesToDelete = new LinkedList<>(); + try { + log.debug("Deleting failed attempt files from %s for query %s", path, queryId); + FileSystem fileSystem = hdfsEnvironment.getFileSystem(hdfsContext, path); + if (!fileSystem.exists(path)) { + // directory may not exist if no files were actually written + return; + } + + // files are written flat in a single directory so we do not need to list recursively + RemoteIterator iterator = fileSystem.listFiles(path, false); + while (iterator.hasNext()) { + Path file = iterator.next().getPath(); + if (isFileCreatedByQuery(file.getName(), queryId) && !filesToKeep.contains(file.getName())) { + filesToDelete.add(file.getName()); + } + } + + ImmutableList.Builder deletedFilesBuilder = ImmutableList.builder(); + Iterator filesToDeleteIterator = filesToDelete.iterator(); + while (filesToDeleteIterator.hasNext()) { + String fileName = filesToDeleteIterator.next(); + log.debug("Deleting failed attempt file %s/%s for query %s", path, fileName, queryId); + fileSystem.delete(new Path(path, fileName), false); + deletedFilesBuilder.add(fileName); + filesToDeleteIterator.remove(); + } + + List deletedFiles = deletedFilesBuilder.build(); + if (!deletedFiles.isEmpty()) { + log.info("Deleted failed attempt files %s from %s for query %s", 
deletedFiles, path, queryId); + } + } + catch (IOException e) { + // If we fail here query will be rolled back. The optimal outcome would be for rollback to complete successfully and clean up everything for query. + // Yet if we have problem here, probably rollback will also fail. + // + // Thrown exception is listing files which we could not delete. So those can be cleaned up later by user manually. + // Note it is not a bullet-proof solution. + // The rollback routine will still fire and try to cleanup the changes query made. It will cleanup some, leave some behind probably. + // It is not obvious that if at this point user cleans up the failed attempt files the table would be in the expected state. + // + // Still we cannot do much better for non-transactional Hive tables. + throw new TrinoException( + HIVE_FILESYSTEM_ERROR, + format("Error deleting failed retry attempt files from %s; remaining files %s; manual cleanup may be required", path, filesToDelete), + e); + } + } }